Iceberg

A lakehouse architecture combines two components: an open table format such as Apache Iceberg and scalable object storage such as UltiHash. In this setup, you typically start with structured data (for example, CSV files), convert it to the Iceberg format, and store it in UltiHash. This gives you efficient querying, schema evolution, and ACID guarantees on top of object storage.

Below is a step-by-step guide using PySpark to convert CSV data into an Iceberg table and store it in UltiHash.

Start a PySpark Session

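To get started, launch a PySpark session with the required dependencies and configuration:

  • Make sure your target bucket exists on UltiHash. The command below uses a bucket named iceberg as the warehouse.

  • Include the necessary Iceberg, Hadoop AWS, and AWS SDK packages.

  • Configure the S3A connector to reach UltiHash’s endpoint. The endpoint, access key, and secret key shown are example values; replace them with your own.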
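The bucket check from the first item can be done from Python before launching the session. The following is a minimal sketch, assuming UltiHash accepts standard S3 bucket calls from boto3 at the same endpoint and with the same credentials as the S3A settings; bucket_from_warehouse and ensure_bucket are hypothetical helper names, and the region value is only a placeholder that boto3 needs for request signing.

```python
from urllib.parse import urlparse


def bucket_from_warehouse(warehouse_uri):
    """Extract the bucket name from a warehouse URI such as s3a://iceberg."""
    parsed = urlparse(warehouse_uri)
    if parsed.scheme not in ("s3", "s3a") or not parsed.netloc:
        raise ValueError(f"not an S3-style warehouse URI: {warehouse_uri!r}")
    return parsed.netloc


def ensure_bucket(warehouse_uri, endpoint_url, access_key, secret_key):
    """Create the warehouse bucket if a HEAD request reports it missing."""
    # Imported lazily so the URI helper works without boto3 installed.
    import boto3
    from botocore.exceptions import ClientError

    bucket = bucket_from_warehouse(warehouse_uri)
    s3 = boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="us-east-1",  # placeholder region (assumption)
    )
    try:
        s3.head_bucket(Bucket=bucket)
    except ClientError as err:
        # Assumption: a missing bucket is reported with a 404-style code,
        # as on Amazon S3. Any other error is re-raised unchanged.
        if err.response["Error"]["Code"] not in ("404", "NoSuchBucket"):
            raise
        s3.create_bucket(Bucket=bucket)
    return bucket
```

With the example values used on this page, the call would be ensure_bucket("s3a://iceberg", "http://127.0.0.1:8080", "TEST-USER", "SECRET").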

pyspark \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262 \
  --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.iceberg.type=hadoop \
  --conf spark.sql.catalog.iceberg.warehouse=s3a://iceberg \
  --conf spark.hadoop.fs.s3a.endpoint=http://127.0.0.1:8080 \
  --conf spark.hadoop.fs.s3a.access.key=TEST-USER \
  --conf spark.hadoop.fs.s3a.secret.key=SECRET \
  --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf spark.hadoop.fs.s3a.path.style.access=true \
  --conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
  --conf spark.driver.bindAddress=127.0.0.1 \
  --conf spark.driver.host=127.0.0.1
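
The same settings can also be supplied from a standalone Python script instead of the pyspark launcher. This is a sketch, assuming pyspark is installed locally; ultihash_iceberg_conf and create_session are hypothetical helper names, and the package versions are copied from the command above. Deriving the SSL flag from the endpoint scheme is a convenience added here, not something the launcher does.

```python
# Package coordinates copied from the --packages flag of the launcher command.
ICEBERG_PACKAGES = [
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.262",
]


def ultihash_iceberg_conf(endpoint, access_key, secret_key, warehouse="s3a://iceberg"):
    """Return the launcher's Spark settings as a plain dict."""
    use_ssl = endpoint.startswith("https://")
    return {
        # spark.jars.packages is the configuration key behind --packages.
        "spark.jars.packages": ",".join(ICEBERG_PACKAGES),
        "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.iceberg.type": "hadoop",
        "spark.sql.catalog.iceberg.warehouse": warehouse,
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.connection.ssl.enabled": str(use_ssl).lower(),
        "spark.driver.bindAddress": "127.0.0.1",
        "spark.driver.host": "127.0.0.1",
    }


def create_session(conf, app_name="ultihash-iceberg"):
    """Build a SparkSession from the settings dict."""
    # Imported lazily so the dict can be inspected without pyspark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

For the example values on this page, spark = create_session(ultihash_iceberg_conf("http://127.0.0.1:8080", "TEST-USER", "SECRET")) would give a session equivalent to the launcher command.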

Create an Iceberg Table

Once the session is running, start by creating a namespace if it doesn’t already exist. This serves as the logical container for your tables.

spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.ulti")

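Now define your Iceberg table, specifying the schema and any table-level properties. In this example, format-version selects version 2 of the Iceberg table spec, and write.metadata.previous-versions-max sets how many previous metadata files to keep: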

spark.sql("""
    CREATE TABLE IF NOT EXISTS iceberg.ulti.test_iceberg_table (
        id INT,
        name STRING,
        price DOUBLE,
        category STRING,
        ts TIMESTAMP
    )
    USING iceberg
    TBLPROPERTIES (
        'format-version'='2',
        'write.metadata.previous-versions-max'='5'
    )
""")

To verify the table was created successfully:

spark.sql("SHOW TABLES IN iceberg.ulti").show()

Load data from a structured format (e.g. CSV)

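Read your CSV data into a DataFrame, enabling header detection and schema inference. Then compare the printed schema with the table definition, since the inferred column types need to be compatible with the table’s: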

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/Users/ultihash/Downloads/iceberg/test_data.csv")

df.show()
df.printSchema()
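
Schema inference needs an extra pass over the file and can pick types that differ from the table definition (for example, a price column containing only whole numbers is inferred as an integer). One alternative, sketched here, is to pass an explicit schema as a DDL string. The column list mirrors the table created earlier; ddl_schema and read_csv_with_schema are hypothetical helpers, and the sketch assumes the CSV columns appear in the same order as the table columns.

```python
# Column names and types copied from the CREATE TABLE statement.
TABLE_COLUMNS = [
    ("id", "INT"),
    ("name", "STRING"),
    ("price", "DOUBLE"),
    ("category", "STRING"),
    ("ts", "TIMESTAMP"),
]


def ddl_schema(columns):
    """Join (name, type) pairs into a DDL schema string for DataFrameReader.schema."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns)


def read_csv_with_schema(spark, path, columns=TABLE_COLUMNS):
    """Read a CSV file with a header row using a fixed schema instead of inference."""
    return (
        spark.read.format("csv")
        .option("header", "true")
        .schema(ddl_schema(columns))
        .load(path)
    )
```

Calling read_csv_with_schema(spark, "/Users/ultihash/Downloads/iceberg/test_data.csv") would then replace the inferSchema read above.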

Write data to UltiHash in Iceberg format

With your DataFrame ready, append the records to the Iceberg table stored in your UltiHash bucket:

df.write.format("iceberg") \
    .mode("append") \
    .save("iceberg.ulti.test_iceberg_table")

Read Iceberg data from UltiHash

To confirm the data was written correctly, query the table:

spark.sql("SELECT * FROM iceberg.ulti.test_iceberg_table").show()
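
Beyond displaying the rows, a quick check can compare row counts and list the table's snapshots. This is a sketch, assuming spark and df are the session and DataFrame from the earlier steps; table_has_source_rows and show_snapshots are hypothetical helpers, and the .snapshots suffix refers to Iceberg's snapshots metadata table.

```python
def table_has_source_rows(spark, source_df, table_name):
    """Return True when the table holds at least as many rows as the source DataFrame.

    Because the write uses append mode, repeated runs leave the table with
    more rows than a single CSV load, so this checks >= rather than ==.
    """
    return spark.table(table_name).count() >= source_df.count()


def show_snapshots(spark, table_name):
    """Print one row per commit from the Iceberg snapshots metadata table."""
    spark.sql(
        f"SELECT committed_at, snapshot_id, operation FROM {table_name}.snapshots"
    ).show(truncate=False)
```

For example, table_has_source_rows(spark, df, "iceberg.ulti.test_iceberg_table") should return True after the append, and show_snapshots(spark, "iceberg.ulti.test_iceberg_table") should list an append operation for each write.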
