Airflow
How to connect UltiHash to Airflow
To start an Airflow session and integrate it with UltiHash, users need to ensure that the right pip packages are installed, then create the connection.
# Check that these packages are installed
pip3 install 'apache-airflow[amazon]'
pip3 install apache-airflow-providers-amazon
# Create the connection between Airflow and UltiHash
airflow connections add 'ultihash' --conn-json '{
"conn_type": "aws",
"login": "ACCESS_KEY_ID",
"password": "AWS_SECRET_KEY",
"extra": {
"endpoint_url": "<endpoint-url>",
"verify": "False",
"service_config": {
"s3": {
"endpoint_url": "<endpoint-url>"
}
}
}
}'
# The output should be:
# Successfully added `conn_id`=ultihash : aws://ACCESS_KEY_ID:******@:
Below is a DAG that leverages the connection with the UltiHash cluster created above:
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
# Name of the short-lived bucket this DAG creates, deletes, and probes.
BUCKET_NAME="bucket-test"
# Airflow connection id pointing at the UltiHash cluster (created with
# `airflow connections add 'ultihash' ...`).
AWS_CONN_ID="ultihash" # Specify the connection with UltiHash
def bucket_exists():
    """Verify that the test bucket was actually deleted.

    Probes the UltiHash cluster through an S3Hook bound to AWS_CONN_ID.
    Prints a confirmation when the bucket is gone; raises to fail the
    task if the bucket is still present after removal.
    """
    hook = S3Hook(AWS_CONN_ID)  # Hook must use the UltiHash connection
    if not hook.check_for_bucket(BUCKET_NAME):
        print(f"Bucket {BUCKET_NAME} has been successfully removed")
        return
    raise Exception(f"Bucket {BUCKET_NAME} still exists after removal")
with DAG(
    dag_id='create_delete_bucket',
    schedule_interval=None,   # no schedule: trigger manually
    start_date=days_ago(2),
    max_active_runs=1,
    tags=['testing'],
) as dag:
    # Step 1: create the test bucket on the UltiHash cluster.
    create_bucket = S3CreateBucketOperator(
        task_id='create_s3_bucket',
        bucket_name=BUCKET_NAME,
        aws_conn_id=AWS_CONN_ID,  # route the operator through the UltiHash connection
    )

    # Step 2: delete the bucket created above.
    delete_bucket = S3DeleteBucketOperator(
        task_id='delete_s3_bucket',
        bucket_name=BUCKET_NAME,
        aws_conn_id=AWS_CONN_ID,  # route the operator through the UltiHash connection
    )

    # Step 3: confirm the bucket no longer exists.
    check_bucket = PythonOperator(
        task_id='check_if_bucket_exists',
        python_callable=bucket_exists,
    )

    create_bucket >> delete_bucket >> check_bucket
See all information about the integration on GitHub here: https://github.com/UltiHash/scripts/tree/main/airflow
Last updated
Was this helpful?