Kafka

This guide walks you through setting up Kafka S3 sink connector to continuously write messages to UltiHash cluster. You’ll learn how to configure Kafka S3 sink connector, launch a local Kafka setup using Docker, produce Kafka messages, and write them to an UltiHash bucket continuously.

Download the S3 Sink connector locally

To use the S3 Sink connector with Kafka, you’ll need to download it locally. You can either use the command below, or do it manually via this link. Make sure to keep track of the target path file (it needs to reside in the right folder)

mkdir -p ./connect-plugins
cd connect-plugins

curl -O <https://packages.confluent.io/maven/io/confluent/kafka-connect-s3/10.7.1/kafka-connect-s3-10.7.1.jar>

Create a docker-compose.kafka-connect.yml file

Create the following yml file. Make sure that the path to the plugins (S3 Sink Connector) is accurate.

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: confluentinc/cp-kafka-connect:7.5.0
    container_name: connect
    ports:
      - "8083:8083"
    depends_on:
      - kafka
    environment:
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_PLUGIN_PATH: "/usr/share/java/plugins"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_RUNNABLE_PRECONDITION_CLASS:
      CONNECT_DISABLE_PRECHECKS: "true"
    volumes:
      - ./plugins:/usr/share/java/plugins

Start Kafka

docker compose -f docker-compose.kafka-connect.yml up -d

docker ps #check that your containers are running

Configure the S3 Sink Connector

Use the following command to configure your S3 Sink Connector. The target bucket in UltiHash needs to exist in your UltiHash Cluster prior to this configuration.

  • "connector.class": "io.confluent.connect.s3.S3SinkConnector" tells Kafka Connect which connector to use. Here, we’re using the S3 sink connector to push data to object storage.

  • "tasks.max" sets the maximum number of parallel tasks the connector can run. Set it to 1 for a simple setup. Increase it if you have multiple partitions or topics and need higher throughput.

  • "topics" tells the connector which Kafka topics to pull messages from. If you're working with more than one topic, just list them separated by commas.

  • "s3.bucket.name" is the name of the bucket in your UltiHash cluster where the messages will be stored.

  • "s3.region" is required by the connector even if you’re running locally. Can be set to any valid AWS-style region.

  • "store.url" is the URL of your S3-compatible storage (UltiHash). host.docker.internal bridges Docker to your local host.

  • "s3.part.size" corresponds to the minimum size (in bytes) before splitting large files into parts. Mostly relevant for big messages.

  • "flush.size" determines how many Kafka messages are grouped together into a single JSON file before being written to UltiHash.

    • smaller values write more frequently, creating more (but smaller) files,

    • larger values reduce write frequency and create fewer, larger files — which can be more efficient for batch processing or compression.

  • "storage.class" specifies how data is written to S3. This should always match the connector type.

  • "format.class" defines the format for stored messages. JSON keeps things readable and lightweight.

  • "schema.compatibility" disables schema enforcement. Good if your messages don’t use Avro or schema registries.

  • "key.converter" converts message keys into strings for storage.

  • "value.converter" converts message values into strings for storage.

  • "aws.access.key.id" is your UltiHash access key (works like AWS credentials).

  • "aws.secret.access.key" is your UltiHash secret key.

curl -X PUT <http://localhost:8083/connectors/s3-sink-connector/config> \\
  -H "Content-Type: application/json" \\
  -d '{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "test-topic",
    "s3.bucket.name": "kafka",
    "s3.region": "us-east-1",
    "store.url": "<http://host.docker.internal:8080>", 
    "s3.part.size": 5242880,
    "flush.size": 3,
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.compatibility": "NONE",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "aws.access.key.id": "<UltiHash access key id>",
    "aws.secret.access.key": "<UltiHash secret access key>"
  }'

Check the S3 Sink Connector status

Check the status of the S3 Sink Connector, it should be “RUNNING”

curl <http://localhost:8083/connectors/s3-sink-connector/status> | jq 

Access the CLI within docker

You should enter the Docker container to interact with Kafka directly.

docker exec -it kafka bash

Write a message to Kafka (which will be communicated to UltiHash automatically)

First, access the Kafka console:

kafka-console-producer --broker-list kafka:9092 --topic test-topic

Now, you can write your messages:

hello-world
this-is-working
🦾💪

#hit enter after each line

#Ctrl+C when done

exit

Read your message from UltiHash

You can read from UltiHash by seeing the JSON files that were written by Kafka to UltiHash. In that case, you’re seeing a list of JSON files where the amount of JSON files depends on the flush size in the S3 Sink Connector configuration and the amount of messages sent (#messages sent/flush size = total #objects). This list will provide you with the objects’ keys. This will enable you to read the contents of the JSON files, as described below:

#list object in the bucket 
curl -X GET <http://localhost:8080/kafka/>

#read specific JSON files
curl <http://localhost:8080/kafka/><key>

Last updated

Was this helpful?