본문 바로가기
카테고리 없음

실시간 CDC 스트리밍 파이프라인 as Code

by frontier12 2025. 5. 29.


Debezium → Apache Kafka (Strimzi) → Apache Flink Operator → KEDA → Argo CD 기반으로, 데이터베이스 변경(CDC)을 실시간으로 캡처·스트리밍 처리하고, 지연(lag)·처리량(metric) 기준 자동 확장·자동 복구까지 모두 코드로 선언·관리하는 차세대 스트리밍 DataOps 파이프라인입니다.



🎯 목표
1. 데이터베이스 변경을 Debezium으로 캡처해 Kafka에 실시간 전송
2. Flink로 스트림 처리(조인·윈도우·집계)
3. KEDA로 Kafka lag, 처리 지연 기준 자동 스케일링
4. Argo CD로 모든 스트리밍 애플리케이션과 인프라를 GitOps로 선언·동기화
5. 장애 시 자동 롤백·Self-Heal까지 완전 자동 운영



⚙️ 핵심 컴포넌트

계층 도구 / CRD 역할
CDC 파이프라인 Debezium Operator MySQL/Postgres 변경 이벤트를 Kafka 토픽으로 스트리밍
메시징 레이어 Strimzi Kafka Cluster 고가용성 Kafka 클러스터 운영
스트림 처리 Apache Flink Operator Kubernetes 위 Flink Job(StreamingJob CR) 관리
오토스케일링 KEDA ScaledObject Kafka lag(kafka_consumer_group_lag)기반 Pod 자동 증감
배포·동기화 Argo CD Application & ApplicationSet 모든 CR/매니페스트 Git 선언 & 실시간 동기화
모니터링 & 알림 Prometheus + Alertmanager 처리 지연·오류율 모니터링 → Slack/PagerDuty 알림
자가복구(Self-Heal) Argo CD Self-Heal + Rollout Git 상태와 클러스터 상태 불일치 시 자동 복구




🏗️ 파이프라인 워크플로우

[Git Push: Debezium, Kafka, Flink, KEDA 매니페스트]
               ↓
     ┌─────────────────────────┐
     │  Argo CD Sync Infra CRs │
     └────────────┬────────────┘
                  ↓
       Debezium → Kafka 토픽 구성
                  ↓
        Flink StreamingJob CR 배포
                  ↓
        Flink Operator가 Job 실행
                  ↓
  Prometheus 수집: kafka_consumer_group_lag
                  ↓
      ┌───────────┴───────────┐
      │ KEDA ScaledObject     │
      │ triggers on lag > X   │
      └───────────┬───────────┘
                  ↓
     Flink TaskManager / JobManager 자동 증감
                  ↓
  처리 실패 또는 Self-Heal 이벤트 발생 시
                  ↓
  Argo CD Self-Heal이 Git 상태로 복구




🧪 실전 매니페스트 예시

1) Debezium Connector CR (KafkaConnect)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-mysql
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.hostname: mysql.default.svc.cluster.local
    database.port: "3306"
    database.user: debezium
    database.password: ${file:/opt/kafka/secret/password}
    database.server.id: "85744"
    database.server.name: mysql-connector
    table.whitelist: inventory.customers
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory

2) Flink StreamingJob CR

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: inventory-aggregate
spec:
  flinkVersion: v1_13
  jarURI: "oci://registry/my-flink-jobs@sha256:..."
  parallelism: 2
  args:
    - "--input-topic"
    - "mysql-connector.inventory.customers"
    - "--output-topic"
    - "customers-aggregates"

3) KEDA ScaledObject (Kafka lag 기준)

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: flink-customer-lag-scaled
spec:
  scaleTargetRef:
    name: inventory-aggregate-job
  minReplicaCount: 1
  maxReplicaCount: 10
  pollingInterval: 30
  cooldownPeriod: 300
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: my-cluster-kafka-bootstrap:9092
        consumerGroup: flink-customer-group
        lagThreshold: "1000"

4) Argo CD ApplicationSet 예시

apiVersion: argoproj.io/v1alpha1
kind: ApplicationSet
metadata:
  name: streaming-data-pipeline
spec:
  generators:
    - git:
        repoURL: https://github.com/myorg/streaming-pipeline
        directories:
          - path: 'infrastructure/*'
  template:
    metadata:
      name: '{{path.basename}}'
    spec:
      source:
        repoURL: https://github.com/myorg/streaming-pipeline
        path: '{{path}}'
        targetRevision: main
      destination:
        server: https://kubernetes.default.svc
        namespace: streaming
      syncPolicy:
        automated:
          prune: true
          selfHeal: true




✅ 기대 효과
• 실시간 데이터 통합: OLTP DB 변경 → 실시간 집계·알림 제공
• 운영 자동화: Kafka lag에 맞춰 Flink 리소스 자동 조정
• GitOps 일관성: 모든 매니페스트 Git 선언 → Argo CD Sync로 보장
• 자가복구: 클러스터 상태 Drift 발생 시 즉시 Self-Heal

.