Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 138 additions & 97 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ services:
- 8888:8888
command: sh -c "pip install questdb psycopg[binary] psycopg2-binary PyGithub kafka-python confluent-kafka \"confluent-kafka[avro]\" fastavro requests && start-notebook.sh --NotebookApp.password= --NotebookApp.token="
depends_on:
- questdb
- broker-1
- broker-2
questdb:
condition: service_started
broker-1:
condition: service_healthy
broker-2:
condition: service_healthy
extra_hosts:
- "host.docker.internal:host-gateway"
environment:
Expand All @@ -125,7 +128,6 @@ services:
- QDB_CLIENT_USER=${QDB_CLIENT_USER:-admin}
- QDB_CLIENT_PASSWORD=${QDB_CLIENT_PASSWORD:-quest}


broker-1:
<<: *kafka-broker-common
hostname: broker
Expand All @@ -148,6 +150,12 @@ services:
KAFKA_JMX_PORT: 9101
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_OPTS: -javaagent:/usr/jolokia/agents/jolokia-jvm-1.7.2.jar=port=8778,host=0.0.0.0 #Jolokia agent for MX monitoring metrics
healthcheck:
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
interval: 30s
timeout: 5s
retries: 10
start_period: 15s

broker-2:
<<: *kafka-broker-common
Expand All @@ -171,6 +179,12 @@ services:
KAFKA_JMX_PORT: 9102
KAFKA_LISTENERS: 'PLAINTEXT://broker-2:29092,CONTROLLER://broker-2:29093,PLAINTEXT_HOST://0.0.0.0:9093'
KAFKA_OPTS: -javaagent:/usr/jolokia/agents/jolokia-jvm-1.7.2.jar=port=8779,host=0.0.0.0 #Jolokia agent for MX monitoring metrics
healthcheck:
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9093"]
interval: 30s
timeout: 5s
retries: 10
start_period: 15s

schema_registry:
image: confluentinc/cp-schema-registry:7.7.0
Expand All @@ -188,6 +202,22 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092,broker-2:29092'
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,OPTIONS'
entrypoint: /bin/sh
command:
- -c
- |
echo 'Waiting for Kafka brokers to become available...'
while ! nc -z broker 29092 || ! nc -z broker-2 29092; do
echo 'Kafka brokers not yet available. Retrying in 5s...'
sleep 5
done
echo 'Kafka brokers are up! Starting Schema Registry...'
/etc/confluent/docker/run
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8081"]
interval: 30s
timeout: 5s
retries: 20

kafka-connect-1:
<<: *kafka-connect-common
Expand All @@ -199,107 +229,118 @@ services:
<<: *kafka-connect-env
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_LISTENERS: http://0.0.0.0:8083
depends_on:
broker-1:
condition: service_healthy
broker-2:
condition: service_healthy
schema_registry:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8083/connectors"]
interval: 30s
timeout: 5s
retries: 20
command:
- bash
- -c
- |
# Launch Kafka Connect
/etc/confluent/docker/run &
#
# Wait for Kafka Connect listener
echo "Waiting for Kafka Connect to start listening on localhost ⏳"
while : ; do
curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
if [ $$curl_status -eq 200 ] ; then
break
fi
sleep 5
done
- bash
- -c
- |
echo "Starting Kafka Connect..."
/etc/confluent/docker/run &

echo "Waiting for Kafka Connect listener to be ready on localhost:8083"
while : ; do
curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
if [ $$curl_status -eq 200 ] ; then
break
fi
sleep 5
done

echo -e "\n--\n+> Registering QuestDB Connector"
echo -e "\n--\n+> Registering QuestDB Connectors"

curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/questdb-iot/config -d '{
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"tasks.max": "5",
"topics": "iot_data",
"client.conf.string": "http::addr=${QUESTDB_HTTP_ENDPOINT:-host.docker.internal:9000};",
"name": "questdb-iot",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"include.key": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"table": "iot_data",
"symbols": "device_type",
"value.converter.schemas.enable": false
}'
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/questdb-iot/config -d '{
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"tasks.max": "5",
"topics": "iot_data",
"client.conf.string": "http::addr=${QUESTDB_HTTP_ENDPOINT:-host.docker.internal:9000};",
"name": "questdb-iot",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"include.key": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"table": "iot_data",
"symbols": "device_type",
"value.converter.schemas.enable": false
}'

curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/questdb-github/config -d '{
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"tasks.max": "5",
"topics": "github_events",
"client.conf.string": "http::addr=${QUESTDB_HTTP_ENDPOINT:-host.docker.internal:9000};",
"name": "questdb-github",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"include.key": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"table": "github_events",
"symbols": "type,repo",
"timestamp.field.name": "created_at",
"value.converter.schemas.enable": false
}'
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/questdb-github/config -d '{
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"tasks.max": "5",
"topics": "github_events",
"client.conf.string": "http::addr=${QUESTDB_HTTP_ENDPOINT:-host.docker.internal:9000};",
"name": "questdb-github",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"include.key": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"table": "github_events",
"symbols": "type,repo",
"timestamp.field.name": "created_at",
"value.converter.schemas.enable": false
}'

curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/questdb-trades/config -d '{
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"tasks.max": "5",
"topics": "trades",
"client.conf.string": "http::addr=${QUESTDB_HTTP_ENDPOINT:-host.docker.internal:9000};",
"name": "questdb-trades",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema_registry:8081",
"include.key": false,
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema_registry:8081",
"table": "trades",
"symbols": "symbol, side",
"timestamp.field.name": "timestamp",
"value.converter.schemas.enable": true
}'
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/questdb-trades/config -d '{
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"tasks.max": "5",
"topics": "trades",
"client.conf.string": "http::addr=${QUESTDB_HTTP_ENDPOINT:-host.docker.internal:9000};",
"name": "questdb-trades",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema_registry:8081",
"include.key": false,
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema_registry:8081",
"table": "trades",
"symbols": "symbol, side",
"timestamp.field.name": "timestamp",
"value.converter.schemas.enable": true
}'

curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/questdb-smart-meters/config -d '{
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"tasks.max": "5",
"topics": "smart-meters",
"client.conf.string": "http::addr=${QUESTDB_HTTP_ENDPOINT:-host.docker.internal:9000};",
"name": "questdb-smart-meters",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema_registry:8081",
"include.key": false,
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema_registry:8081",
"table": "smart_meters",
"symbols": "device_id, mark_model, status",
"timestamp.field.name": "timestamp",
"value.converter.schemas.enable": true
}'
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/questdb-smart-meters/config -d '{
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"tasks.max": "5",
"topics": "smart-meters",
"client.conf.string": "http::addr=${QUESTDB_HTTP_ENDPOINT:-host.docker.internal:9000};",
"name": "questdb-smart-meters",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema_registry:8081",
"include.key": false,
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema_registry:8081",
"table": "smart_meters",
"symbols": "device_id, mark_model, status",
"timestamp.field.name": "timestamp",
"value.converter.schemas.enable": true
}'

curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/questdb-transactions/config -d '{
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"tasks.max": "5",
"topics": "transactions",
"client.conf.string": "http::addr=${QUESTDB_HTTP_ENDPOINT:-host.docker.internal:9000};",
"name": "questdb-transactions",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema_registry:8081",
"include.key": false,
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema_registry:8081",
"table": "transactions",
"symbols": "merchant, category, gender, city, state",
"timestamp.field.name": "timestamp",
"value.converter.schemas.enable": true
}'
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/questdb-transactions/config -d '{
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"tasks.max": "5",
"topics": "transactions",
"client.conf.string": "http::addr=${QUESTDB_HTTP_ENDPOINT:-host.docker.internal:9000};",
"name": "questdb-transactions",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema_registry:8081",
"include.key": false,
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema_registry:8081",
"table": "transactions",
"symbols": "merchant, category, gender, city, state",
"timestamp.field.name": "timestamp",
"value.converter.schemas.enable": true
}'

sleep infinity
sleep infinity

kafka-connect-2:
<<: *kafka-connect-common
Expand Down