Kafka Connect Module

Turkish Documentation

For the Turkish version of this documentation, see README.tr.md


Overview

This module provides a fully automated, production-grade Apache Kafka Connect cluster setup using Ansible, Docker Compose, and dynamic configuration from Terraform state in AWS S3. It features secure, scalable, and observable Kafka Connect nodes, automated connector management, and CI/CD with GitHub Actions.


Features

  • Automated Multi-Node Kafka Connect Setup: Two-node cluster, Docker Compose, dynamic inventory, and secure configuration.
  • Dynamic Configuration from Terraform/S3: Inventory, .env, and connector configurations are generated from live infrastructure.
  • Secure SASL/SSL Integration: All connections use SASL_SSL with dynamic password and certificate management.
  • Connector Lifecycle Automation: Scripts for creating, updating, deleting, and restarting connectors.
  • Prometheus Monitoring: All metrics are exposed to Prometheus via JMX Exporter.
  • CI/CD with GitHub Actions: End-to-end automation, deployment, and configuration update on every push.
  • Connector API Security: The HTTP source connector authenticates to the API it polls using token-based authentication, sending an Authorization: Bearer <token> header with each request.

Directory Structure

  • ansible_kafka_connect.yml — Ansible playbook for full setup
  • generate-inventory.sh — Generates inventory, .env, and connector configs from S3 Terraform state
  • inventory.yml — Dynamic Ansible inventory (auto-generated)
  • docker-compose-1.yml, docker-compose-2.yml — Docker Compose files for each node
  • connectors/ — Connector configuration templates and generated files
  • scripts/ — Shell scripts for connector and node management
  • jmx-exporter-config.yml — Prometheus JMX Exporter configuration

Automated Workflow

  1. Fetch Infrastructure Info: generate-inventory.sh pulls Kafka Connect and broker info from S3.
  2. Generate Configurations: Inventory, .env, and connector JSON are generated dynamically.
  3. Ansible Playbook: ansible_kafka_connect.yml installs dependencies, copies configs, and runs Docker Compose on each node.
  4. Connector Management: Use scripts in scripts/ to create, update, delete, and restart connectors.
  5. Monitoring: JMX Exporter exposes metrics to Prometheus.
  6. CI/CD: GitHub Actions workflow automates the entire process on every push.
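Step 2 above can be sketched roughly as follows. This is a minimal, self-contained illustration of the idea, not the module's actual generate-inventory.sh: the Terraform output names (kafka_connect_public_ips, kafka_broker_endpoints), the sample state, and the inventory layout are assumptions.

```shell
#!/usr/bin/env sh
# In the real workflow the state would be fetched from S3 first, e.g.:
#   aws s3 cp "s3://$STATE_BUCKET/terraform.tfstate" terraform.tfstate
# Here an inline sample keeps the sketch self-contained.
cat > terraform.tfstate <<'EOF'
{
  "outputs": {
    "kafka_connect_public_ips": { "value": ["10.0.1.10", "10.0.1.11"] },
    "kafka_broker_endpoints":   { "value": ["broker-1:9093"] }
  }
}
EOF

# Render a minimal Ansible inventory from the state outputs with jq.
{
  echo "all:"
  echo "  hosts:"
  jq -r '.outputs.kafka_connect_public_ips.value[] | "    " + . + ":"' terraform.tfstate
} > inventory.yml

cat inventory.yml
```

The same jq-over-state pattern extends naturally to producing the .env file and filling connector JSON templates.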

Key Files and Scripts

  • Ansible Playbook: Installs Kafka Connect nodes, manages SSL, and is idempotent.
  • Docker Compose: Runs Kafka Connect with all security, cluster, and monitoring settings.
  • Connector Scripts:
    • create-connector.sh — Creates and validates a connector
    • delete-connector.sh — Deletes a connector
    • restart-task.sh — Restarts a connector task
    • setup-kafka-connect-1.sh / setup-kafka-connect-2.sh — Node setup and plugin management
  • Connector Configurations: Templates are dynamically filled by generate-inventory.sh.
  • JMX Exporter: Exposes detailed metrics for Prometheus.
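The template-filling step can be sketched as below. The placeholder names (__KAFKA_TOPIC__, __SOURCE_URL__) and the template file name are illustrative assumptions, not the module's actual conventions:

```shell
#!/usr/bin/env sh
# Hypothetical connector template; real templates live under connectors/.
cat > http-source-connector.json.tpl <<'EOF'
{
  "name": "http-source-topics-connector",
  "config": {
    "connector.class": "com.github.castorm.kafka.connect.http.HttpSourceConnector",
    "kafka.topic": "__KAFKA_TOPIC__",
    "http.request.url": "__SOURCE_URL__"
  }
}
EOF

# Values like these would come from the Terraform state in the real flow.
KAFKA_TOPIC="topic-1"
SOURCE_URL="http://example.com:2020/topics"

# Substitute placeholders; '|' as the sed delimiter avoids clashing with
# the slashes in the URL.
sed -e "s|__KAFKA_TOPIC__|$KAFKA_TOPIC|" \
    -e "s|__SOURCE_URL__|$SOURCE_URL|" \
    http-source-connector.json.tpl > http-source-connector.json

cat http-source-connector.json
```

The generated JSON is then what create-connector.sh posts to the Connect REST API.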

GitHub Actions CI/CD

  • Workflow: .github/workflows/ansible_kafka_connect.yaml
  • Steps:
    • Checkout code
    • Run Ansible in Docker with AWS and SSH secrets
    • Generate inventory and configs from S3
    • Install and test Kafka Connect nodes
    • Run healthcheck and metrics validation against each node
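The healthcheck step amounts to polling the Connect REST API until the node answers. A generic retry helper like the one below is one way to express that; the helper and the endpoint in the comment are illustrative, not taken from the workflow itself:

```shell
#!/usr/bin/env sh
# retry N CMD... : run CMD up to N times, pausing 1s between attempts;
# succeeds as soon as CMD does, fails if all attempts fail.
retry() {
  attempts=$1; shift
  i=1
  while [ "$i" -le "$attempts" ]; do
    if "$@"; then
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  return 1
}

# In CI this would wrap the REST call, e.g.:
#   retry 30 curl -sf http://localhost:8083/ >/dev/null
retry 3 true && echo "node healthy"
```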

Security and Best Practices

  • All passwords and credentials are passed as environment variables or GitHub Actions secrets.
  • SASL/SSL is required for all Kafka connections.
  • Certificates and sensitive files are not added to version control.
  • All scripts and playbooks are idempotent and safe to re-run.

Observability

  • JMX Exporter exposes Kafka Connect, connector, and task metrics.
  • Prometheus can scrape metrics from each node on port 9404.
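A minimal Prometheus scrape job for the two nodes might look like the fragment below; the target hostnames are placeholders, since in this setup the actual addresses come from the generated inventory:

```yaml
scrape_configs:
  - job_name: kafka-connect
    static_configs:
      - targets:
          - kafka-connect-1:9404
          - kafka-connect-2:9404
```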

Kafka Connect REST API Operations

Cluster Information

Get Kafka Connect Cluster Info

Command:

curl -s http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/

Output:

{"version":"7.5.0-ccs","commit":"ff3c201baa948d97889dc26c99d7cdc23d038f2e","kafka_cluster_id":"D6vOXFfVQSaiuc9BoZ6bnA"}

List Connector Plugins

Command:

curl -s http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connector-plugins | jq

Output:

[
  {
    "class": "com.github.castorm.kafka.connect.http.HttpSourceConnector",
    "type": "source",
    "version": "0.0.0.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "7.5.0-ccs"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "7.5.0-ccs"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "7.5.0-ccs"
  }
]

Validate Connector Configuration

Command:

curl -X PUT -H "Content-Type: application/json" http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connector-plugins/com.github.castorm.kafka.connect.http.HttpSourceConnector/config/validate --data @/home/ubuntu/kafka_connect/connectors/http-source-connector-validate.json

Output:

{"name":"com.github.castorm.kafka.connect.http.HttpSourceConnector","error_count":0,"groups":["Common","Transforms","Predicates","Error Handling","Topic Creation","Exactly Once Support","offsets.topic"],"configs":[{"definition":{"name":"name","type":"STRING","required":true,"default_value":null,"importance":"HIGH","documentation":"Globally unique name to use for this connector.","group":"Common","width":"MEDIUM","display_name":"Connector name","dependents":[],"order":1},"value":{"name":"name","value":"http-source-topics-connector","recommended_values":[],"errors":[],"visible":true}},{"definition":{"name":"connector.class","type":"STRING","required":true,"default_value":null,"importance":"HIGH","documentation":"Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. If th
… (output truncated)

Connector CRUD Operations

Create Connector

Command:

curl -X POST -H "Content-Type: application/json" http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors --data @/home/ubuntu/kafka_connect/connectors/http-source-connector.json

Output:

{"name":"http-source-topics-connector","config":{"connector.class":"com.github.castorm.kafka.connect.http.HttpSourceConnector","tasks.max":"1","http.request.url":"http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:2020/topics","http.request.method":"GET","http.request.headers":"Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MjA4MTAyNjAwNn0.PPGtxAJVOXAzE1UO8Jn0Gxo-H8A_1vH1GaTO9xGSK2k","http.timer.interval.millis":"60000","http.timer.catchup.interval.millis":"1000","http.response.policy.class":"com.github.castorm.kafka.connect.http.response.PolicyHttpResponsePolicy","http.response.policy.policy":"com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisTimestampPolicy","kafka.topic":"topic-1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","http.offset.initial":"Offset{timestamp=0}","http.auth.type":"None","name":"http-source-topics-connector"},"tasks":[],"type":"source"}

List Connectors

Command:

curl -X GET \
http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors

Output:

["http-source-topics-connector"]

Get Connector Details

Command:

curl -X GET \
http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors/http-source-topics-connector

Output:

{"name":"http-source-topics-connector","config":{"connector.class":"com.github.castorm.kafka.connect.http.HttpSourceConnector","http.auth.type":"None","tasks.max":"1","http.request.headers":"Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MjA4MTAyNjAwNn0.PPGtxAJVOXAzE1UO8Jn0Gxo-H8A_1vH1GaTO9xGSK2k","http.timer.catchup.interval.millis":"1000","http.request.url":"http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:2020/topics","http.offset.initial":"Offset{timestamp=0}","http.request.method":"GET","value.converter.schemas.enable":"false","name":"http-source-topics-connector","http.timer.interval.millis":"60000","http.response.policy.class":"com.github.castorm.kafka.connect.http.response.PolicyHttpResponsePolicy","kafka.topic":"topic-1","http.response.policy.policy":"com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisTimestampPolicy","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.storage.StringConverter"},"tasks":[],"type":"source"}

Get Connector Status

Command:

curl -X GET \
http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors/http-source-topics-connector/status

Output:

{"name":"http-source-topics-connector","connector":{"state":"RUNNING","worker_id":"kafka-connect-2:8083"},"tasks":[],"type":"source"}

Update Connector Configuration

Command:

curl -X PUT -H "Content-Type: application/json" http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors/http-source-topics-connector/config --data @/home/ubuntu/kafka_connect/connectors/http-source-connector-validate.json

Output:

{"name":"http-source-topics-connector","config":{"name":"http-source-topics-connector","connector.class":"com.github.castorm.kafka.connect.http.HttpSourceConnector","tasks.max":"1","http.request.url":"http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:2020/topics","http.request.method":"GET","http.request.headers":"Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MjA4MTAyNjAwNn0.PPGtxAJVOXAzE1UO8Jn0Gxo-H8A_1vH1GaTO9xGSK2k","http.timer.interval.millis":"60000","http.timer.catchup.interval.millis":"1000","http.response.policy.class":"com.github.castorm.kafka.connect.http.response.PolicyHttpResponsePolicy","http.response.policy.policy":"com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisTimestampPolicy","kafka.topic":"topic-1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","http.offset.initial":"Offset{timestamp=0}","http.auth.type":"None"},"tasks":[],"type":"source"}

Delete Connector

Command:

curl -X DELETE \
http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors/http-source-topics-connector

Output:

(empty; the REST API returns HTTP 204 No Content on a successful delete)

Connector Task Operations

List Connector Tasks

Command:

curl -X GET \
http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors/http-source-topics-connector/tasks

Output:

[{"id":{"connector":"http-source-topics-connector","task":0},"config":{"connector.class":"com.github.castorm.kafka.connect.http.HttpSourceConnector","http.auth.type":"None","tasks.max":"1","http.request.headers":"Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MjA4MTAyNjAwNn0.PPGtxAJVOXAzE1UO8Jn0Gxo-H8A_1vH1GaTO9xGSK2k","http.timer.catchup.interval.millis":"1000","http.request.url":"http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:2020/topics","http.offset.initial":"Offset{timestamp=0}","http.request.method":"GET","task.class":"com.github.castorm.kafka.connect.http.HttpSourceTask","value.converter.schemas.enable":"false","name":"http-source-topics-connector","http.timer.interval.millis":"60000","http.response.policy.class":"com.github.castorm.kafka.connect.http.response.PolicyHttpResponsePolicy","kafka.topic":"topic-1","http.response.policy.policy":"com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisTimestampPolicy","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}]

Get Task Status (Task 0)

Command:

curl -X GET \
http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors/http-source-topics-connector/tasks/0/status

Output:

{"id":0,"state":"RUNNING","worker_id":"ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083"}

Restart Task

Command:

curl -X POST \
http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors/http-source-topics-connector/tasks/0/restart

Output:

(empty; the REST API returns HTTP 204 No Content when the task restart succeeds)