Airflow
To integrate Airflow with UltiHash, first make sure the required pip packages are installed, then create the connection.
# Make sure these packages are installed
pip3 install 'apache-airflow[amazon]'
pip3 install apache-airflow-providers-amazon
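Note that the amazon extra of apache-airflow already pulls in apache-airflow-providers-amazon, so the second command mainly acts as a safeguard. As a quick sanity check, you can confirm the provider is registered with the Airflow CLI (output formatting varies between Airflow versions):

# Optional: confirm the Amazon provider is installed and registered
airflow providers list | grep amazon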
# Create the connection between Airflow and UltiHash
airflow connections add 'ultihash' --conn-json '{
    "conn_type": "aws",
    "login": "ACCESS_KEY_ID",
    "password": "AWS_SECRET_KEY",
    "extra": {
        "endpoint_url": "<endpoint-url>",
        "verify": "False",
        "service_config": {
            "s3": {
                "endpoint_url": "<endpoint-url>"
            }
        }
    }
}'
# The output should be:
# Successfully added `conn_id`=ultihash : aws://ACCESS_KEY_ID:******@:
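To verify that the connection was stored correctly, retrieve it with the Airflow CLI (the exact output format depends on your Airflow version):

# Optional: inspect the stored connection
airflow connections get ultihash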
Below is a DAG that leverages the connection to the UltiHash cluster created above:
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

BUCKET_NAME = "bucket-test"
AWS_CONN_ID = "ultihash"  # Specify the connection with UltiHash

def bucket_exists():
    s3 = S3Hook(AWS_CONN_ID)  # Make sure your S3 hooks leverage the connection with UltiHash
    if s3.check_for_bucket(BUCKET_NAME):
        raise Exception("Bucket %s still exists after removal" % BUCKET_NAME)
    else:
        print("Bucket %s has been successfully removed" % BUCKET_NAME)

with DAG(
    dag_id='create_delete_bucket',
    schedule_interval=None,
    start_date=days_ago(2),
    max_active_runs=1,
    tags=['testing'],
) as dag:
    # Create a bucket with BUCKET_NAME on UltiHash
    create_bucket = S3CreateBucketOperator(
        task_id='create_s3_bucket',
        bucket_name=BUCKET_NAME,
        aws_conn_id=AWS_CONN_ID  # Make sure your S3 operators leverage the connection with UltiHash
    )

    # Delete the previously created bucket
    delete_bucket = S3DeleteBucketOperator(
        task_id='delete_s3_bucket',
        bucket_name=BUCKET_NAME,
        aws_conn_id=AWS_CONN_ID  # Make sure your S3 operators leverage the connection with UltiHash
    )

    # Check that the bucket no longer exists
    check_bucket = PythonOperator(
        task_id='check_if_bucket_exists',
        python_callable=bucket_exists
    )

    create_bucket >> delete_bucket >> check_bucket
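To try the DAG without waiting on the scheduler, one option is airflow dags test, which performs a single DAG run in-process (the execution date argument is optional on newer Airflow 2.x releases and is shown here for compatibility with older ones):

# Run the DAG once locally for testing
airflow dags test create_delete_bucket 2024-01-01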
See the full integration example on GitHub: https://github.com/UltiHash/scripts/tree/main/airflow