# Airflow

To integrate Airflow with UltiHash, first make sure the required pip packages are installed, then create an Airflow connection that points to your UltiHash cluster.

```bash
# Check that these packages are installed
pip3 install 'apache-airflow[amazon]'
pip3 install apache-airflow-providers-amazon

# Create the connection between Airflow and UltiHash
airflow connections add 'ultihash' --conn-json '{  
        "conn_type": "aws",
        "login": "ACCESS_KEY_ID",
        "password": "AWS_SECRET_KEY",
        "extra": {
            "endpoint_url": "<endpoint-url>",  
            "verify": "False",
            "service_config": {
              "s3": {
                "endpoint_url": "<endpoint-url>"
              }
            }
        }
    }'
    
# The output should be:
# Successfully added `conn_id`=ultihash : aws://ACCESS_KEY_ID:******@:  
```
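
The endpoint URL appears twice in the JSON payload, which is easy to get wrong when editing by hand. As a minimal sketch, the payload can be generated instead. The helper names `build_conn_json` and `build_cli_command` and the endpoint `http://localhost:8080` are illustrative assumptions, not part of the UltiHash integration; the fields themselves are copied from the command above.

```python
import json
import shlex


def build_conn_json(endpoint_url, access_key_id, secret_key):
    """Return the JSON payload for an Airflow 'aws' connection to UltiHash."""
    payload = {
        "conn_type": "aws",
        "login": access_key_id,
        "password": secret_key,
        "extra": {
            "endpoint_url": endpoint_url,
            "verify": "False",  # copied as-is from the command above
            "service_config": {"s3": {"endpoint_url": endpoint_url}},
        },
    }
    return json.dumps(payload)


def build_cli_command(conn_id, conn_json):
    """Shell-quote the arguments so the command can be pasted into a terminal."""
    return (
        f"airflow connections add {shlex.quote(conn_id)} "
        f"--conn-json {shlex.quote(conn_json)}"
    )


# Placeholder values (assumptions); substitute your own endpoint and credentials.
conn_json = build_conn_json("http://localhost:8080", "ACCESS_KEY_ID", "AWS_SECRET_KEY")
print(build_cli_command("ultihash", conn_json))
```

Running the script prints a ready-to-paste `airflow connections add` command in which both `endpoint_url` entries are guaranteed to match.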

Below is a DAG that uses the connection to the UltiHash cluster created above. It creates a bucket, deletes it, then checks that the bucket no longer exists:

```python
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

BUCKET_NAME="bucket-test"
AWS_CONN_ID="ultihash"    # Specify the connection with UltiHash 

def bucket_exists():
    s3 = S3Hook(AWS_CONN_ID)  # Make sure your S3 Hooks leverage the connection with UltiHash.
    if s3.check_for_bucket(BUCKET_NAME):
        raise Exception("Bucket %s still exists after removal" % BUCKET_NAME)
    else:
        print("Bucket %s has been successfully removed" % BUCKET_NAME)
 
with DAG(
    dag_id='create_delete_bucket',
    schedule_interval=None,
    start_date=days_ago(2),
    max_active_runs=1,
    tags=['testing'],
) as dag:

    # Create a bucket with BUCKET_NAME on UltiHash
    create_bucket = S3CreateBucketOperator(
        task_id='create_s3_bucket',
        bucket_name=BUCKET_NAME,
        aws_conn_id=AWS_CONN_ID   # Make sure your S3 Operators leverage the connection with UltiHash
    )

    # Delete the previously created bucket
    delete_bucket = S3DeleteBucketOperator(
        task_id='delete_s3_bucket',
        bucket_name=BUCKET_NAME,
        aws_conn_id=AWS_CONN_ID   # Make sure your S3 Operators leverage the connection with UltiHash
    )

    # Check if the bucket still exists
    check_bucket = PythonOperator(
        task_id='check_if_bucket_exists',
        python_callable=bucket_exists
    )

    create_bucket >> delete_bucket >> check_bucket
```
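
The verification step only relies on the hook's `check_for_bucket` method, so its logic can be exercised without a running cluster by passing in a stand-in object. The following is a sketch under that assumption; `FakeHook` and `assert_bucket_removed` are hypothetical names introduced for illustration and are not part of Airflow or the UltiHash scripts.

```python
class FakeHook:
    """Hypothetical stand-in for S3Hook that only implements check_for_bucket."""

    def __init__(self, existing_buckets):
        self._buckets = set(existing_buckets)

    def check_for_bucket(self, bucket_name):
        return bucket_name in self._buckets


def assert_bucket_removed(hook, bucket_name):
    """Raise if the bucket is still present; otherwise return a status message."""
    if hook.check_for_bucket(bucket_name):
        raise RuntimeError(f"Bucket {bucket_name} still exists after removal")
    return f"Bucket {bucket_name} has been successfully removed"


# With no buckets present, the check passes and returns the success message.
print(assert_bucket_removed(FakeHook([]), "bucket-test"))
```

In a real DAG, the same function could be called with `S3Hook(AWS_CONN_ID)` in place of the stand-in.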

> For the complete integration scripts, see the UltiHash repository on GitHub: <https://github.com/UltiHash/scripts/tree/main/airflow>

