Kafka

Kafka Strimzi Drill Down

0.개요

업무중 Kafka Strimizi 를 다룬적이 있어 관련내용을 정리해 볼까 한다.

먼저 개요에 대한 썰을 살짝 풀자면…
Kafka를 공부하면서 세상에 많고 많은 메세징 시스템중에 왜 kafka가 평정하게 된 것인가를 이해할 수 있었다.
높은실시간 처리량, 다양한 제품과 연동의 유연성, 데이터분실리스크에 대한 신뢰성, 데이터처리의 유연성 등 무장한 kafka 는 기존에 흔히들 사용되는 메세지 큐 제품과는 큰 차이를 두고 있다.

kafka 를 운용하려면 통상 N개(최소3대) 이상의 broker를 결합한 cluster 구성이 필요한데 바로 Container 환경이 kafka cluster 를 구성할 수 있는 적절한 수단이 될 수 있으며 이러한 배경이 Strimzi 를 탄생시키게 된듯 하다. Strmizi 를 이용하면 kubernetes 에서 어렵지 않게 kafka cluster 를 구성하고 사용할 수 있다. 실제로 kubernetes 를 실행하는 yaml 파일 하나로 cluster 가 손쉽게 구성되고 topic 또한 CRD로 구성된 resource 를 이요하면 쉽게 생성이 가능하다.

kafka 개요는 앞서 Joy 님의 글이 잘 정리되어 있으니 이쯤으로 마무리한다. 또한 Strimzi 기본 코드(topic 생성 등) 또한 앞 글로 대체하며 이번 글에서는 좀더 심화 과정인 Kafka Srimzi 의 보안과 Restful Bridge, monitoring 등의 내용으로 정리하고자 한다.

그러면 보안 부터 살펴보자.

1. 보안

실제 kafka 를 좀더 다양한 비즈니스 환경에서 운영하려면 반드시 보안기능이 제공되어야 한다. 예를들어 서로 이해관계가 서로 다른 부서에 운영중인 Topic 들은 그 자체가 상호간에 지켜야 할 비밀일 수 있다. 그러므로 반드시 권한을 받은 사용자 또는 App 만이 특정 Topic 에 접근해야 한다. Kafka Strimzi 에서는 기본적으로 SASL 프레임워크를 사용할 수 있다.

1) SASL이란 무엇인가?

위키피디아의 설명(https://ko.wikipedia.org/wiki/SASL)에 따르면 "인터넷 프로토콜에서 인증과 데이터 보안을 위한 프레임워크이며, 애플리케이션 프로토콜들로부터 인증 메커니즘을 분리시킨다." 라고 설명하고 있다. 또한 SASL로 제공된 어떤 인증 메커니즘도 허용하며, SASL에 의해 제공된 서비스를 보완하기 위해 전송 계층 보안(TLS)도 지원한다고 설명하고 있다.

Introduction to Simple Authentication Security Layer (SASL) - Developer's  Guide to Oracle® Solaris 11.3 Security

좀더 간단히 설명하자면 Kafka가 지원하는 Kerberos, PLAIN, SCRAM, OAUTHBEARER 등의 메커니즘을 사용하여 인증/인가를 할 수 있도록 해주며, 인증/인가 교환이 성공했을 때, 후속 데이터 교환을 데이터 보안 계층 위에서 할 수 있도록 해 주는 기술이다.

SASL이 지원하는 Kafka 메커니즘

  • SASL/GSSAPI (Kerberos) – 0.9.0.0 부터
  • SASL/PLAIN – 0.10.0.0 부터
  • SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 – 0.10.2.0 부터
  • SASL/OAUTHBEARER – 2.0 부터

이중 SASL/SCRAM-SHA-512 인증 방식의 샘플을 살펴보자.

2) Cluster 생성

kafka cluster 생성시에 인증메커니즘을 지정할 수 있으며 이때 지정된 인증 방식으로 접근을 제어하게 된다.
예를 들면 authentication.type이 scram-sha-512 일 경우 반드시 인증(user/pass)이 필요하다.

$ cat 11.kafka-my-cluster.yaml
---
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    authorization:
      type: simple
    config:
      log.message.format.version: '2.4'
      offsets.topic.replication.factor: 1
      transaction.state.log.min.isr: 1
      transaction.state.log.replication.factor: 1
    listeners:
      plain:
        authentication:
          type: scram-sha-512
      tls: {}
    replicas: 3
    storage:
      type: ephemeral
    version: 2.4.0
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  • 인증메커니즘
    • 인증방식은 SASL 이 지원하는 메커니즘 중 하나인 scram-sha-512 방식이며 Broker 를 SASL 방식으로 구성하게 된다.
  • cluster 생성
oc -n kafka create -f 11.kafka-my-cluster.yaml
oc -n kafka delete -f 11.kafka-my-cluster.yaml

3) User 생성

kafkaUser 를 strimzi 에서 제공되는 resource 중 하나이며 scram-sha-512 방식의 인증 사용시 해당 User / Password 를 사용하게 된다. KafkaUser 를 생성하면 Opaque type의 secret 이 생성되며 이 값이 인증 password 로 사용된다. 또한 kafkaUser생성시 어떤 topic 에 접근 가능할지, 또는 어떤 Group 에 접근할지에 대한 권한을 부여할 수 있다. 그럼 아래와 같이 kafkaUser 를 생성해 보자.

cat > 11.KafkaUser-my-bridge-user-my-topic.yaml
---
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: millet-user
  labels:
    strimzi.io/cluster: my-cluster
  namespace: kafka
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      - operation: All
        resource:
          type: topic
          name: millet
          patternType: prefix
      - operation: All
        resource:
          name: millet
          patternType: prefix
          type: group

acls 항목을 보면 접근가능한 topic 과 group 을 명시했는데 아래와 같이 해석된다.

  • millet로 시작하는 topic 을 모두 처리가능
    • ex) millet.my-topic, millet.my-topic2
  • millet 로 시작하는 consumer group 에 접근가능
    • ex) millet-consumer-group

위 yaml 파일을 이용해서 kafkauUser 를 생성해 보자.

$ oc -n kafka create -f 11.KafkaUser-millet-user-my-topic.yaml

$ oc -n kafka get secret millet-user
NAME             TYPE      DATA      AGE
millet-user   Opaque    1         26d

# user/pass 
  user : millet-user
  pass : XVDMDpGgaTTu

이제 kafka broker 에 접근가능한 millet-user 와 secret 에 명시된 password 가 부여 되었다.

4) [Client Connect] – kafka shell producer set

(1) 인증 file 셋팅

$ cat > kafka_client_jaas.conf
---
KafkaClient {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="millet-user"
  password="XVDMDpGgaTTu";
};
---


$ cat > client-acl.properties
---
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
---

(2) kafka-console-producer-acl

$ cat > kafka-console-producer-acl.sh
---
#!/bin/bash

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec /opt/kafka/bin/kafka-run-class.sh -Djava.security.auth.login.config=./kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"
---

$ chmod 777 kafka-console-producer-acl.sh

(3) 실행

./kafka-console-producer-acl.sh --broker-list my-cluster-kafka-bootstrap:9092 --producer.config ./client-acl.properties --topic my-topic

5) [Client Connect] -python Code Sample

위에서 생성한 User/passsword 를 이용하여 kafkacluster 에 접근하는 간단한 python code 작성해 보자.

(1) producer

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='my-cluster-kafka-bootstrap.kafka.svc:9092',
                        security_protocol="SASL_PLAINTEXT",
                        sasl_mechanism='SCRAM-SHA-512',
                        sasl_plain_username='millet-user',
                        sasl_plain_password='XVDMDpGgaTTu')

producer.send('millet-my-topic', b'python test2')
producer.send('millet-my-topic', b'python test3')
producer.send('millet-my-topic', b'python test4')

(2) consumer

from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers='my-cluster-kafka-bootstrap.kafka.svc:9092',
                        security_protocol="SASL_PLAINTEXT",
                        sasl_mechanism='SCRAM-SHA-512',
                        sasl_plain_username='millet-user',
                        sasl_plain_password='XVDMDpGgaTTu',
                        auto_offset_reset='earliest',
                        enable_auto_commit= True,
                        group_id='millet-consumer-group')
# topic 확인
consumer.topics()

# 사용할 topic 지정(구독)
consumer.subscribe("millet-my-topic")
consumer.subscription()    ## {'millet-my-topic'}

# 메세지 읽기
for message in consumer:
   print("topic=%s partition=%d offset=%d: key=%s value=%s" %
        (message.topic,
          message.partition,
          message.offset,
          message.key,
          message.value))

실행결과

topic=millet-my-topic3 partition=0 offset=16: key=None value=b'python test2'
topic=millet-my-topic3 partition=2 offset=7: key=None value=b'python test3'
topic=millet-my-topic3 partition=0 offset=17: key=None value=b'python test4'

2. Restful Bridge

Strimzi 로 Cluster 를 생성했다고 표현 했지만 사실 내부는 kafka 구성된 일반 적인 cluster 의 모습과 크게 다르지 않다. 다만 kubernetes 사상에 맞도록 접근할 수 있는 endpoint 는 Service로 , zookeeper 와 broker 는 POD 구성되어 서로 cluster를 이루고 있을 뿐이다. 그러므로 Broker Service 를 이용한다면 kafka broker 에 native 접근이 가능한데 보통 아래와 같은 형식이다.

my-cluster-kafka-bootstrap.kafka.svc:9092

중요한 점은 위 샘플과 같이 TCP 통신으로 접근한다는 것이다. kubernetes cluster 내에서라면 통신하는데 문제가 없지만 cluster 외부에서 접근하고자 한다면 설정이 복잡해 진다. 더구나 사내에서만 사용되는 패쇄망일 경우에는 방화벽과 같은 좀더 많은 제약사항이 존재한다. (kubernetes 내 pod간의 통신은 RESTFUL 이 권장되고 있다.) 다행스럽게도 Strimzi 에서는 Rest 로 접근가능한 Bridge 라는 리소스를 제공한다. Cluster 외에 존재하는 원격지 Cluster나 로컬에서 이 Bridge를 이용한다면 비교적 쉽게 접근이 가능하다.

1) Bridge 구조

Using Strimzi (0.19.0)

그럼 어떻게 Bridge 를 생성하고 접근하는지 확인해 보자.

2) Bridge 생성

cat > 11.kafka-bridge-mybridge-sha-512.yaml
---
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaBridge
metadata:
  name: millet
  namespace: kafka
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  http:
    port: 8080
  authentication:
    type: scram-sha-512
    username: millet-user
    passwordSecret:
      secretName: millet-user
      password: password
---

확인 위 yaml 을 실행하면 kafka namespace 에 bridge pod 와 service 가 생성된다. 또한 route 나 nodeport 등으로 서비스를 외부로 노출시킨다면 local에서도 접근이 가능해 진다. route 를 생성 후 curl 로 health check 를 시도해보자.

curl -i -X GET http://millet-bridge.ktdscoe.myds.me/healthy

3) Bridge를 이용한 Consumer 처리

Consumer 생성

# Create a Kafka Bridge consumer group
$ curl -i -X POST http://millet-bridge.ktdscoe.myds.me/consumers/millet-consumer-group \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "name": "millet-my-topic-consumer",
    "auto.offset.reset": "earliest",
    "format": "json",
    "enable.auto.commit": true,
    "fetch.min.bytes": 512,
    "consumer.request.timeout.ms": 30000
  }'
  
HTTP/1.1 200 OK
content-type: application/vnd.kafka.v2+json
content-length: 144
Set-Cookie: 1a0e244938f08e012e4d52b665258d7a=7500577add36080bbdc1931ca15dbcc0; path=/; HttpOnly

{"instance_id":"millet-consumer","base_uri":"http://millet-bridge.ktdscoe.myds.me:80/consumers/millet-consumer-group/instances/millet-consumer"}     
---
  • "enable.auto.commit": false 로 설정했다면 offset commit 이 필요하다.

Consumer 구독

  • 구독
$ curl -i -X POST http://millet-bridge.ktdscoe.myds.me/consumers/millet-consumer-group/instances/millet-my-topic-consumer/subscription \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "topics": [
        "millet-my-topic"
    ]
  }'

HTTP/1.1 204 No Content   ← 성공
  • 토픽은 싱글또는 멀티를 포함할 수 있다.
  • 멀티타픽 구독을 원할 경우 정규식을 이용할 수 있다.
  • topic-array 대신 topic_pattern 을 사용할 수 있다.

구독중인 토픽 읽어오기

  • 구독중인 토픽 읽어오기
# pattern
  --> /consumers/{groupid}instances/{name}/subscription

$ curl -i -X GET http://millet-bridge.ktdscoe.myds.me/consumers/millet-consumer-group/instances/millet-my-topic-consumer/subscription \
  -H 'content-type: application/vnd.kafka.json.v2+json'
  
HTTP/1.1 200 OK
content-type: application/vnd.kafka.v2+json
content-length: 159

{
"topics":["millet-my-topic","millet-my-topic3","millet.my-topic"],
"partitions":[{"millet-my-topic":[0]},{"millet.my-topic":[0]},{"millet-my-topic3":[0,2,1]}]
}

consumer 확인

$ curl -i -X GET http://millet-bridge.ktdscoe.myds.me/consumers/millet-consumer-group2instances/millet-my-topic-consumer/records \
  -H 'accept: application/vnd.kafka.json.v2+json'

[
{"topic":"millet-my-topic","key":null,"value":"millet-test-0001","partition":0,"offset":0}
]

[
{"topic":"millet-my-topic","key":null,"value":"sales-lead-0002","partition":2,"offset":0},
{"topic":"millet-my-topic","key":null,"value":"millet-test-0001","partition":0,"offset":1},
{"topic":"millet-my-topic","key":"my-key","value":"sales-lead-0001","partition":0,"offset":2},
{"topic":"millet-my-topic","key":null,"value":"sales-lead-0003","partition":0,"offset":3}
]

[
{"topic":"millet-my-topic","key":null,"value":"millet-test-0001","partition":2,"offset":2},
{"topic":"millet-my-topic","key":null,"value":"millet-test-0001","partition":2,"offset":3},
{"topic":"millet-my-topic","key":null,"value":"millet-test-0001","partition":1,"offset":0},
{"topic":"millet-my-topic","key":null,"value":"millet-test-0001","partition":0,"offset":6}
]

seek offset

특정한 메세지를 읽어오기 위해 offset 을 이동시킨다.

$ curl -X POST http://localhost:8080/consumers/bridge-consumer-group/instances/bridge-consumer/positions \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "offsets": [
        {
            "topic": "millet-my-topic",
            "partition": 0,
            "offset": 2
        }
    ]
}'

4) Bridge 를 이용한 Producer 처리

$ curl -X POST http://millet-bridge.ktdscoe.myds.me/topics/millet-my-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{ 
    "records": [ 
        { 
            "key": "my-key",
            "value": "millet-test-0001" 
        } 
    ] 
  }'

3. 모니터링

Strimzi는 간단한 설정만으로 KafkaCluster 에 대한 모니터링 시스템을 구축할 수 있다. Kafka Exporter라는 agent 를 제공하여 Kafka metric 를 수집하고 Prometheus를 통해 집결 시킨후 Grafana 를 통해 그래프를 확인하는 구성이다.

1) 프로 메테우스 모니터링

Prometheus 는 오픈 소스 모니터링 솔루션으로 클라우드 네이티브 세계에서 통계 및 경고에 대한 사실상의 표준이 되었다. 다른 많은 모니터링 시스템과 달리 Prometheus를 사용하면 응용 프로그램이 메트릭을 Prometheus로 푸시하지 않아도 된다. 대신 Prometheus는 응용 프로그램에서 메트릭을 스크랩 (수집)하여 시계열 데이터베이스에 저장한다. Prometheus가 이러한 데이터를 스크랩하는 인터페이스는 메트릭과 함께 텍스트 출력을 제공하는 간단한 HTTP 엔드 포인트이며 Strimzi 에서는 Exporter 가 그 역할을 수행한다.

Monitoring Kafka on Kubernetes with Prometheus | by Agraj Mangal | Medium

2) Exporter 설치

모니터링이 필요할 경우 kafka Cluster 를 설치시 Exporter 를 함께 구성한다.

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
  ...
  kafka:
  ...
  zookeeper:
  ...
  # 아래와 같이 추가
  kafkaExporter:
    groupRegex: ".*"
    topicRegex: ".*"
    resources:
      requests:
        cpu: 200m
        memory: 64Mi
      limits:
        cpu: 500m
        memory: 128Mi
    logging: debug
    enableSaramaLogging: true
    template:
      pod:
        metadata:
          labels:
            label1: value1
        imagePullSecrets:
          - name: my-docker-credentials
        securityContext:
          runAsUser: 1000001
          fsGroup: 0
        terminationGracePeriodSeconds: 120
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
  • exporter 전용이미지는 기본적으로 Strimzi 가 제공하는 이미지에 포함되어 있으므로 생략해도 된다. 하지만 별도의 이미지 사용을 원할때는 아래와 같이 명시해야 한다.
    • image: strimzi/kafka:0.17.0-kafka-2.4.0
  • Kafka Exporter 사용이 kafka cluster 를 설치이후에 결정되었다면 위와 같이 yaml 파일만 수정후 apply 해도 추가된다.(재설치 하지 않아도 된다.)

3) grafana 확인

  • prometheus / grafana 는 일반적인 내용과 동일하다.
  • grafana 에서는 strimzi dashboard 를 찾아서 import 한다. Kafka Exporter dashboard

1 comment

댓글 남기기

이 사이트는 스팸을 줄이는 아키스밋을 사용합니다. 댓글이 어떻게 처리되는지 알아보십시오.

%d 블로거가 이것을 좋아합니다: