# Airflow

To integrate Airflow with UltiHash, first make sure the required pip packages are installed, then create an Airflow connection pointing at your UltiHash cluster.

```bash
# Install the required packages (the [amazon] extra already pulls in the
# provider package, so the second line is only a safeguard)
pip3 install 'apache-airflow[amazon]'
pip3 install apache-airflow-providers-amazon

# Create the connection between Airflow and UltiHash
airflow connections add 'ultihash' --conn-json '{  
        "conn_type": "aws",
        "login": "ACCESS_KEY_ID",
        "password": "AWS_SECRET_KEY",
        "extra": {
            "endpoint_url": "<endpoint-url>",  
            "verify": "False",
            "service_config": {
              "s3": {
                "endpoint_url": "<endpoint-url>"
              }
            }
        }
    }'
    
# The output should be:
# Successfully added `conn_id`=ultihash : aws://ACCESS_KEY_ID:******@:  
```
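
To confirm the connection was registered correctly, you can read it back through the Airflow CLI. This is a minimal check, assuming Airflow 2.1 or later, where `airflow connections get` is available:

```bash
# Print the stored connection to verify the endpoint URL and credentials
airflow connections get ultihash
```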

Below is a DAG that leverages the connection to the UltiHash cluster created above:

```python
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

BUCKET_NAME="bucket-test"
AWS_CONN_ID="ultihash"    # Specify the connection with UltiHash 

def bucket_exists():
    s3 = S3Hook(AWS_CONN_ID)  # Make sure your S3 Hooks leverage the connection with UltiHash.
    if s3.check_for_bucket(BUCKET_NAME):
        raise Exception(f"Bucket {BUCKET_NAME} still exists after removal")
    else:
        print(f"Bucket {BUCKET_NAME} has been successfully removed")
 
with DAG(
    dag_id='create_delete_bucket',
    schedule_interval=None,
    start_date=days_ago(2),
    max_active_runs=1,
    tags=['testing'],
) as dag:

    # Create a bucket with BUCKET_NAME on Ultihash
    create_bucket = S3CreateBucketOperator(
        task_id='create_s3_bucket',
        bucket_name=BUCKET_NAME,
        aws_conn_id=AWS_CONN_ID   # Make sure your S3 Operators leverage the connection with UltiHash
    )

    # Delete the previously created bucket
    delete_bucket = S3DeleteBucketOperator(
        task_id='delete_s3_bucket',
        bucket_name=BUCKET_NAME,
        aws_conn_id=AWS_CONN_ID   # Make sure your S3 Operators leverage the connection with UltiHash
    )

    # Check if the bucket still exists
    check_bucket = PythonOperator(
        task_id='check_if_bucket_exists',
        python_callable=bucket_exists
    )

    create_bucket >> delete_bucket >> check_bucket
```
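
Once the file is placed in your DAGs folder, the DAG can be exercised end to end without involving the scheduler. A minimal sketch using the standard Airflow CLI; the date argument is an arbitrary logical date:

```bash
# Run one complete DAG execution locally, task by task, without the scheduler
airflow dags test create_delete_bucket 2024-01-01

# Alternatively, queue a run through the scheduler once the DAG has been parsed
airflow dags trigger create_delete_bucket
```

If all three tasks succeed, the bucket was created and deleted on the UltiHash cluster, and the final check confirmed it no longer exists.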

> See the full integration scripts on GitHub: <https://github.com/UltiHash/scripts/tree/main/airflow>
