Skip to content

Latest commit

 

History

History
295 lines (207 loc) · 12.8 KB

File metadata and controls

295 lines (207 loc) · 12.8 KB

Kafka Connect Modülü

English Documentation

İngilizce dokümantasyon için README.md dosyasına bakınız.


Genel Bakış

Bu modül, Ansible, Docker Compose ve AWS S3'teki Terraform state'ten dinamik konfigürasyon ile tam otomatik, üretim seviyesinde Apache Kafka Connect kümesi kurulumu sağlar. Güvenli, ölçeklenebilir ve gözlemlenebilir Kafka Connect node'ları, otomatik connector yönetimi ve GitHub Actions ile CI/CD içerir.


Özellikler

  • Otomatik Çoklu Node Kafka Connect Kurulumu: İki node'lu cluster, Docker Compose, dinamik envanter ve güvenli konfigürasyon.
  • Terraform/S3'ten Dinamik Konfigürasyon: Envanter, .env ve connector konfigürasyonları canlı altyapıdan üretilir.
  • Güvenli SASL/SSL Entegrasyonu: Tüm bağlantılar SASL_SSL ile, dinamik şifre ve sertifika yönetimiyle sağlanır.
  • Connector Yaşam Döngüsü Otomasyonu: Connector oluşturma, güncelleme, silme ve yeniden başlatma için scriptler.
  • Connector API bağlantısı: Connector, Kafka API'ye bağlanırken Authorization: Bearer <token> başlığı ile token tabanlı güvenli kimlik doğrulama kullanır.
  • Prometheus İzleme: JMX Exporter ile tüm metrikler Prometheus'a açılır.
  • GitHub Actions ile CI/CD: Her push'ta uçtan uca otomasyon, dağıtım ve konfigürasyon güncellemesi.

Dizin Yapısı

  • ansible_kafka_connect.yml — Tüm kurulum için Ansible playbook'u
  • generate-inventory.sh — S3 Terraform state'ten envanter, .env ve connector konfigürasyonlarını üretir
  • inventory.yml — Dinamik Ansible envanteri (otomatik üretilir)
  • docker-compose-1.yml, docker-compose-2.yml — Her node için Docker Compose dosyaları
  • connectors/ — Connector konfigürasyon şablonları ve üretilen dosyalar
  • scripts/ — Connector ve node yönetimi için shell scriptler
  • jmx-exporter-config.yml — Prometheus JMX Exporter konfigürasyonu

Otomatik İş Akışı

  1. Altyapı Bilgisi Çekme: generate-inventory.sh S3'ten Kafka Connect ve broker bilgilerini çeker.
  2. Konfigürasyon Üretimi: Envanter, .env ve connector JSON dinamik olarak üretilir.
  3. Ansible Playbook: ansible_kafka_connect.yml bağımlılıkları kurar, konfigürasyonları kopyalar ve her node'da Docker Compose'u çalıştırır.
  4. Connector Yönetimi: scripts/ altındaki scriptlerle connector oluşturma, güncelleme, silme ve yeniden başlatma işlemleri yapılır.
  5. İzleme: JMX Exporter ile metrikler Prometheus'a açılır.
  6. CI/CD: GitHub Actions workflow'u tüm süreci her push'ta otomatikleştirir.

Temel Dosya ve Scriptler

  • Ansible Playbook: Kafka Connect node'larını kurar, SSL yönetir ve idempotent çalışır.
  • Docker Compose: Tüm güvenlik, cluster ve izleme ayarlarıyla Kafka Connect'i çalıştırır.
  • Connector Scriptleri:
    • create-connector.sh — Connector oluşturur ve doğrular
    • delete-connector.sh — Connector siler
    • restart-task.sh — Connector task'ını yeniden başlatır
    • setup-kafka-connect-1.sh / setup-kafka-connect-2.sh — Node kurulumu ve plugin yönetimi
  • Connector Konfigürasyonları: Şablonlar, generate-inventory.sh ile dinamik olarak doldurulur.
  • JMX Exporter: Prometheus için detaylı metrikleri açar.

GitHub Actions CI/CD

  • Workflow: .github/workflows/ansible_kafka_connect.yaml
  • Adımlar:
    • Kodu checkout et
    • Ansible'ı Docker içinde AWS ve SSH secret'larıyla çalıştır
    • S3'ten envanter ve konfigürasyonları üret
    • Kafka Connect node'larını kur ve test et
    • Healthcheck ve metrik doğrulaması yap actions

Güvenlik ve En İyi Pratikler

  • Tüm şifreler ve kimlik bilgileri environment variable veya GitHub Actions secret olarak aktarılır.
  • Tüm Kafka bağlantıları için SASL/SSL zorunludur.
  • Sertifikalar ve hassas dosyalar versiyon kontrolüne eklenmez.
  • Tüm script ve playbook'lar idempotent ve tekrar çalıştırmaya uygundur.

İzleme ve Gözlemlenebilirlik

  • JMX Exporter, Kafka Connect, connector ve task metriklerini açar.
  • Prometheus her node'dan 9404 portundan metrikleri çekebilir.

Kafka Connect REST API İşlemleri

Cluster Bilgileri:

Kafka Connect Cluster Bilgisini Getirme

komut:

curl -s http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/

çıktı:

{"version":"7.5.0-ccs","commit":"ff3c201baa948d97889dc26c99d7cdc23d038f2e","kafka_cluster_id":"D6vOXFfVQSaiuc9BoZ6bnA"}

Connector Plugin’lerini Listele

komut:

curl -s http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connector-plugins | jq

çıktı:

[
  {
    "class": "com.github.castorm.kafka.connect.http.HttpSourceConnector",
    "type": "source",
    "version": "0.0.0.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "7.5.0-ccs"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "7.5.0-ccs"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "7.5.0-ccs"
  }
]

Connector Konfigürasyonunu Doğrula (Validate)

komut:

curl -X PUT -H "Content-Type: application/json" http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connector-plugins/com.github.castorm.kafka.connect.http.HttpSourceConnector/config/validate --data @/home/ubuntu/kafka_connect/connectors/http-source-connector-validate.json

çıktı:

{"name":"com.github.castorm.kafka.connect.http.HttpSourceConnector","error_count":0,"groups":["Common","Transforms","Predicates","Error Handling","Topic Creation","Exactly Once Support","offsets.topic"],"configs":[{"definition":{"name":"name","type":"STRING","required":true,"default_value":null,"importance":"HIGH","documentation":"Globally unique name to use for this connector.","group":"Common","width":"MEDIUM","display_name":"Connector name","dependents":[],"order":1},"value":{"name":"name","value":"http-source-topics-connector","recommended_values":[],"errors":[],"visible":true}},{"definition":{"name":"connector.class","type":"STRING","required":true,"default_value":null,"importance":"HIGH","documentation":"Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. If th
....
..
.

Connector CRUD İşlemleri

Connector Oluştur

komut:

curl -X POST -H "Content-Type: application/json" http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors --data @/home/ubuntu/kafka_connect/connectors/http-source-connector.json

çıktı:

{"name":"http-source-topics-connector","config":{"connector.class":"com.github.castorm.kafka.connect.http.HttpSourceConnector","tasks.max":"1","http.request.url":"http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:2020/topics","http.request.method":"GET","http.request.headers":"Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MjA4MTAyNjAwNn0.PPGtxAJVOXAzE1UO8Jn0Gxo-H8A_1vH1GaTO9xGSK2k","http.timer.interval.millis":"60000","http.timer.catchup.interval.millis":"1000","http.response.policy.class":"com.github.castorm.kafka.connect.http.response.PolicyHttpResponsePolicy","http.response.policy.policy":"com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisTimestampPolicy","kafka.topic":"topic-1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","http.offset.initial":"Offset{timestamp=0}","http.auth.type":"None","name":"http-source-topics-connector"},"tasks":[],"type":"source"}

Connector’ları Listele

komut:

curl -X GET \
http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors

çıktı:

["http-source-topics-connector"]

Connector Detaylarını Getir

komut:

curl -X GET \
http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors/http-source-topics-connector

çıktı:

{"name":"http-source-topics-connector","config":{"connector.class":"com.github.castorm.kafka.connect.http.HttpSourceConnector","http.auth.type":"None","tasks.max":"1","http.request.headers":"Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MjA4MTAyNjAwNn0.PPGtxAJVOXAzE1UO8Jn0Gxo-H8A_1vH1GaTO9xGSK2k","http.timer.catchup.interval.millis":"1000","http.request.url":"http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:2020/topics","http.offset.initial":"Offset{timestamp=0}","http.request.method":"GET","value.converter.schemas.enable":"false","name":"http-source-topics-connector","http.timer.interval.millis":"60000","http.response.policy.class":"com.github.castorm.kafka.connect.http.response.PolicyHttpResponsePolicy","kafka.topic":"topic-1","http.response.policy.policy":"com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisTimestampPolicy","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.storage.StringConverter"},"tasks":[],"type":"source"}

Connector Durumunu Getir

komut:

curl -X GET \
http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors/http-source-topics-connector/status

çıktı:

{"name":"http-source-topics-connector","connector":{"state":"RUNNING","worker_id":"kafka-connect-2:8083"},"tasks":[],"type":"source"}

Connector Konfigürasyonunu Güncelle

komut:

curl -X PUT -H "Content-Type: application/json" http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors/http-source-topics-connector/config --data @/home/ubuntu/kafka_connect/connectors/http-source-connector-validate.json

çıktı:

{"name":"http-source-topics-connector","config":{"name":"http-source-topics-connector","connector.class":"com.github.castorm.kafka.connect.http.HttpSourceConnector","tasks.max":"1","http.request.url":"http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:2020/topics","http.request.method":"GET","http.request.headers":"Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MjA4MTAyNjAwNn0.PPGtxAJVOXAzE1UO8Jn0Gxo-H8A_1vH1GaTO9xGSK2k","http.timer.interval.millis":"60000","http.timer.catchup.interval.millis":"1000","http.response.policy.class":"com.github.castorm.kafka.connect.http.response.PolicyHttpResponsePolicy","http.response.policy.policy":"com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisTimestampPolicy","kafka.topic":"topic-1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","http.offset.initial":"Offset{timestamp=0}","http.auth.type":"None"},"tasks":[],"type":"source"}

Connector Sil

komut:

curl -X DELETE \
http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors/http-source-topics-connector

çıktı:

Connector Task İşlemleri

Connector Task’lerini Listele

komut:

curl -X GET \
http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors/http-source-topics-connector/tasks

çıktı:

[{"id":{"connector":"http-source-topics-connector","task":0},"config":{"connector.class":"com.github.castorm.kafka.connect.http.HttpSourceConnector","http.auth.type":"None","tasks.max":"1","http.request.headers":"Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MjA4MTAyNjAwNn0.PPGtxAJVOXAzE1UO8Jn0Gxo-H8A_1vH1GaTO9xGSK2k","http.timer.catchup.interval.millis":"1000","http.request.url":"http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:2020/topics","http.offset.initial":"Offset{timestamp=0}","http.request.method":"GET","task.class":"com.github.castorm.kafka.connect.http.HttpSourceTask","value.converter.schemas.enable":"false","name":"http-source-topics-connector","http.timer.interval.millis":"60000","http.response.policy.class":"com.github.castorm.kafka.connect.http.response.PolicyHttpResponsePolicy","kafka.topic":"topic-1","http.response.policy.policy":"com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisTimestampPolicy","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}]

Task Durumunu Getir (Task 0)

komut:

curl -X GET \
http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors/http-source-topics-connector/tasks/0/status

çıktı:

{"id":0,"state":"RUNNING","worker_id":"ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083"}

Task Yeniden Başlat (Restart)

komut:

curl -X POST \
http://ec2-3-255-139-80.eu-west-1.compute.amazonaws.com:8083/connectors/http-source-topics-connector/tasks/0/restart

çıktı:

Task çıktısı

task