Kafka Technical

Kafka 실전 코드 (with Strimzi)

지난 포스트에서 Kafka의 개념에 대해 다루었고, 이번에는 예정대로 Kafka를 설치하고 Topic을 구성해서 데이터를 전달하는 과정까지 취급해 보고자 한다. kafka는 제공하는 Vendor도 다양하며, Community 와 Enterprise 형태로 제공하고 있다고 함축 할 수 있겠지만, 이번에는 컨테이너 트랜드에 맞춰 보다 쉽게 사용할 수 있게 제공되는 Strimzi 기반으로 진행하고자 한다.

  • Kafka 개요
  • Kafka 실전 코드
  • Kafka 사례

0. 시작하며

사전에 언급해야 할 사항들이 있는데, Kafka는 현재 Apache 에서 기본적으로 제공하고 있고, Kafka Connect 를 제공하여 보다 더 쉽게 Plugin을 확장하여 생산성과 사용형태를 극대화 한 Confluent Kafka 가 있다. 물론 Confluent Kafka는 UI 인터페이스를 제공하여 Apache Kafka 대비 터미널 환경에서 커맨드 작업을 대체 할 수 있을 만큼 제공하고 있고, 일부 고급 기능(RBAC 권한 등)은 Enterprise 형태로 제공하고 있다.

기본에 충실하려면, Apache Kafka나 Confluent Kafka를 기반으로 준비도 고려 하였으나, 기술은 또다시 발전을 거듭해서 귀찮은 내부 로직은 머리속에만 남겨두고 더 쉽게 쓰게 해주는 Strimzi 라는 컨테이너 환경 특화용 Kafka가 등장했다.

분산환경에 최적화된 Kubernetes에 Operator 라는 CRD 속성을 듬뿍 사용하여, 이제는 resource 유형으로 “Kafka”, “KafkaTopic”, “KafkaUser” 정도로 내가 사용할 Topic을 만들고 Partition과 복제 수준도 정의한다.

포스팅에 중점은 Kafka에 맞춰서 진행하며, 부가적으로 Kubernetes 설명은 이와 같은 사유로 생략되거나 간략히 될 수 있음을 안내하고 진행한다.

1. Strimzi

Kafka Concept (지난 복습)

아래 그림에서 이제 각 표기되는 명칭들의 의미가 어떤 것인지 생각해 보자. Broker 를 설명하였고, Topic은 1개로 보이며, Partition은 3개 Repication도 3개로 했음을 알 수 있겠다. 또한 각 Brocker에 Partition 0, 1, 2 는 Leader로써 역할을 수행하고, 나머지는 Follower가 되겠다. 물론 이 그림만으로는 최소 복제 수 설정은 알 수 없지만, 대략적인 구성 개념으로 지난 포스트를 복귀하고 다음으로 진행하겠다.

https://strimzi.io/docs/overview/latest/images/overview/kafka-concepts-key-concepts.png

Strimzi Overview

Strimzi Overview 문서에서 보면, ‘Kafka Component Architecture’로 안내하고 있다. 서론에서 잠깐 설명한 Kafka Connect가 보이고 자료를 제공하는 Source와 전달받는 Sink가 존재한다. 우측에 또 다른 Kafka Cluster 인 ‘Source Kafka Cluster’는 ‘Kafka MirrorMaker’에 의해 Kafka Cluster 레벨에서 데이터 복제를 수행한다. 이는 문서상에서는 DW 등을 위한 구조적인 Architecture를 제공하기 위한 것으로 설명되어 있다. 추가로 좌측에 ‘Kafka Bridge’는 Kafka에 Http 기반으로 Client에게 주고/받고를 전달하기 위한 Component가 존재한다. Java Client 특화된 Kafka 에서 Http 프로토콜을 제공함으로써, 다양한 종단 방식이 가능하고 IoT 장치로 부터 연속적인 데이터 처리에도 편리하게 접근할 수 있겠다.

https://strimzi.io/docs/overview/latest/images/overview/kafka-concepts-supporting-components.png

Strimzi Operator

공식적인 Kafka를 사용할 경우 Broker를 서버당 보통 1식으로 구성하고, kafka client를 통해 Topic 생성 명령을 수행하는 등 사용하기 위해 생각보다 손이 많이 가는 작업이며, Kafka 공식 문서에 있는 설정값(기동할 때 현대 설정값이 쭉쭉 올라간다)을 보면 A4 용지 1-2장은 거뜬히 채워보일듯한 내용으로 올라간다.

그렇기에, 최근에는 조금 더 편리하게 접근하고 안정성에 더 높은 점수를 받고 있는 Kubernetes 환경에서 시도를 많이 하게 되었는데, 시도 해본사람은 알겠지만, 이 역시 외부 Producer, Consumer를 고려하고 Node 장애를 고려하여 affinity 방안까지 고려한다면, 구성 방식을 만족하기가 쉽지 않다. (흔한말로 Kafka 전문가 아니면 컨테이너 환경은 시작하지 말란 소문도… 물론 전문가의 정의가 애매하지만…)

이런때에 Strimzi는 필요한 부분에 적절하게 공략한 듯 하다. 단순화 시켜버린 CRD 몇 종을 제공하면서 ‘Kafka 구성’ Resource 선언으로 Zookeeper와 Kafka Broker가 동작되고, ‘Topic 만들기’ 선언으로 Partition과 Repication 정의가 되며, ‘User 만들기’로 Topic에 접근을 정의할 수 있다. 모든 사항은 Kubernetes의 선언적 사용 방식에 맞게 해당 수준을 정의하면, Kubernetes는 위 사항을 맞추기 위해 노력해 주고 있고, 우리는 제공된 접점만으로 쉽게 사용 가능 하게 되었다.

https://strimzi.io/docs/overview/latest/images/operators.png

2. Install

Strimzi는 포스팅 작업일자 기준으로 0.17.0 버전이다. 버전을 보면, 아직 가야할 길이 먼 것 같지만 Kafka 역시 1.0.0에 도래 하지 않을 때 부터 기업에서도 많이 사용되었다.

Download

Strimzi는 Homepage를 통해 Github에서 한달에 몇번씩 release를 지속하여 제공하고 있다. 아래 내용을 다운로드 받고 압축을 풀자.

curl -o strimzi-0.17.0.tar.gz https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.17.0/strimzi-0.17.0.tar.gz

사전작업

우선 아래와 같이 Strimzi Kafka Cluster Operator를 위한 관리목적의 Kafka Namespace를 생성하자.

kubectl create ns kafka

Strimzi에는 아직 배포 패키징이 뚜렷해 지지 않아 일부 설치 파일에 직접적인 수정이 필요한 요소가 있다. Kubernetes의 namespace 지정부분은 OS에 따라 sed 명령의 옵션차이로 아래와 같이 조정할 것을 안내하고 있다.

  • On linux : sed -i ‘s/namespace: .*/namespace: kafka/’ install/cluster-operator/*RoleBinding*.yaml
  • On Mac : sed -i ” ‘s/namespace: .*/namespace: kafka/’ install/cluster-operator/*RoleBinding*.yaml

Kafka Cluster 사용목적의 Kubernetes Namespace 추가

Kafka Cluster를 구성할 Namespace를 생성하고, 아래의 특정 Manifest에 Namespace를 변경하자.

kubectl create ns my-kafka-project

‘install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml’ 파일 27라인 수준에 아래와 같이 ‘value: my-kafka-project’로 명시하자.

# ...
env:
- name: STRIMZI_NAMESPACE
  value: my-kafka-project
# ...

CRD 생성

아래 명령으로 CRD를 kafka Namespace에서 생성하고, RBAC 활용하여 CRD를 관리 할 수 있게 실행하자.

kubectl apply -f install/cluster-operator/ -n kafka

RBAC 권한 생성

kafka(관리 Namespace) 와 my-kafka-project(Kafka Cluster Namespace) Rolebinding을 부여하여 Operator가 my-kafka-project Namespace에 Resource를 제어할 수 있게 권한을 부여한다.

kubectl apply -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n my-kafka-project
kubectl apply -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n my-kafka-project
kubectl apply -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n my-kafka-project

3. Kafka Cluster Configuration

이 단계에 들어오면, Kafka Cluster를 구성하기 직전까지의 준비과정이 완료되었다. Kafka CRD를 생성했고, Cluster를 구축할 작업공간인 Kubernetes Namespace에 RBAC 권한 부여까지가 위 과정이다. 남은 과정에서는 Kubernetes 특성에 따른 저장공간(Persistent Volume)을 생성하고, 정의된 CRD 에 맞는 Resource 선언만으로 사용이 완료 된다.

Persistent Volume 생성

Kubernetes가 아닌 VM 기반 또는 Dedicated Server에서 Kafka를 구성한다면, 물리적인 저장공간은 서버 디스크 또는 스토리지등 마운트 포인트를 기준으로 지정위치를 선언하는 것으로 편리하지만, 컨테이너 환경에서는 휘발성이 전제되어 있기에 저장공간을 하나의 Resource로써 관리하여 사용하고 있다. (Manifest의 일부 주소는 환경에 마다 다를 수 있음으로 PV 생성에 대한 Kubernetes 정의 방식에 따라 참고만 바란다)

cat << EOF | kubectl create -f -
apiVersion: v1
kind: PersistentVolume
metadata:
  labels:
    app: kafka-0
  name: kafka-0
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 10Gi
  storageClassName: nfs-client
  nfs:
    path: /volume2/kubernetes/kafka
    server: 192.168.10.211
  persistentVolumeReclaimPolicy: Retain
---
apiVersion: v1
kind: PersistentVolume
metadata:
  labels:
    app: zookeeper-0
  name: zookeeper-0
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 10Gi
  claimRef:
    apiVersion: v1
    kind: PersistentVolumeClaim
    name: data-my-cluster-zookeeper-0
    namespace: my-kafka-project
  storageClassName: nfs-client
  nfs:
    path: /volume2/kubernetes/zookeeper
    server: 192.168.10.211
  persistentVolumeReclaimPolicy: Retain
EOF

Kafka Cluster 생성

Kubernets Resource로 정의된 아래의 ‘Kafka’ Resource 구성요소를 살펴보면, Kafka의 Broker 수를 의미하는 replicas를 정의할 수 있고, zookeeper 역시 수를 정의할 수 있다. 또한 Config 섹션에서 옵션을 정의 할 수 있다.

cat << EOF | kubectl create -n my-kafka-project -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      plain: {}
      tls: {}
      external:
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        class: nfs-client
        size: 10Gi
        deleteClaim: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      class: nfs-client
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF

Kafka Topic 생성

‘KafkaTopic’ Resource 정의에는 Partition의 수와 Replicas의 복제수를 정의 할 수 있다. 참고로 Partition을 늘리는 것은 가능하지만, 줄이는 것은 불가하다고 안내되어 있다. (https://strimzi.io/docs/latest/#type-KafkaTopicSpec-reference)

cat << EOF | kubectl create -n my-kafka-project -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 3
  replicas: 1
EOF

생성 결과

Kubernetes 명령어로 생성된 Resource를 확인하면, 아래와 같이 확인 할 수 있다. ‘statefulset’ 으로 kafka와 zookeeper가 동작되었고, 외부 접근이 가능한 Kubernetes NodePort 타입으로 Kafka 외부 접근 주소가 제공되며, 내부접근 용도는 별도 분리되어 있다.

추가로 CRD로 생성한 ‘kafkatopic’, ‘kafka’ 정보도 확인하면, 생성한 Partition과 Replication 수와 일치한 정보를 제공한다. (Kafka는 Consumer에 의한 offset 처리도 내부적인 topic으로 관리하기에 kafkatopic CRD상에 보여진다)

❯ kubectl get all
NAME                                              READY   STATUS    RESTARTS   AGE
pod/my-cluster-entity-operator-79cfb4cdf8-vqstb   3/3     Running   3          139m
pod/my-cluster-kafka-0                            2/2     Running   2          141m
pod/my-cluster-zookeeper-0                        2/2     Running   0          142m

NAME                                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
service/my-cluster-kafka-0                    NodePort    10.100.111.218   <none>        9094:30275/TCP               141m
service/my-cluster-kafka-bootstrap            ClusterIP   10.99.255.72     <none>        9091/TCP,9092/TCP,9093/TCP   141m
service/my-cluster-kafka-brokers              ClusterIP   None             <none>        9091/TCP,9092/TCP,9093/TCP   141m
service/my-cluster-kafka-external-bootstrap   NodePort    10.111.36.105    <none>        9094:32167/TCP               141m
service/my-cluster-zookeeper-client           ClusterIP   10.99.213.150    <none>        2181/TCP                     142m
service/my-cluster-zookeeper-nodes            ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   142m

NAME                                         READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/my-cluster-entity-operator   1/1     1            1           139m

NAME                                                    DESIRED   CURRENT   READY   AGE
replicaset.apps/my-cluster-entity-operator-79cfb4cdf8   1         1         1       139m

NAME                                    READY   AGE
statefulset.apps/my-cluster-kafka       1/1     141m
statefulset.apps/my-cluster-zookeeper   1/1     142m

❯ kubectl get kafkatopic
NAME                                                          PARTITIONS   REPLICATION FACTOR
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a   50           1
my-topic                                                      3            1

❯ kubectl get kafka
NAME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS
my-cluster   1                        1

4. Testing

모든 Kafka Cluster 구성은 완료 되었고, 실제 데이터 전달과정을 테스트 하기 위해서는 Apache Kafka를 다운로드 하여, Producer, Consumer 쉘을 사용하여 터미널에서 확인할 수 있다.

Download Apache Kafka

http://kafka.apache.org/downloads 주소에서 Binary 버전의 Kafka를 다운받을 수 있다. 다운받은 후 압축을 풀면 된다.

http://kafka.apache.org/downloads

Producer 실행

Apache Kafka 위치에서 아래와 같은 명령을 실행한다. ‘broker-list’는 생성결과에서 NodePort의 정보가 되어 Kubernetes Node IP와 Port 정보를 기술한다. ‘topic’ 정보는 생성한 Topic 명칭이다. Producer는 데이터 전달의 역할이기 때문에 실행하면, 터미널 창은 입력 대기 상태가 되어 key를 입력받을 수 있고, key를 입력하면 이제 Consumer 실행 이후에 데이터가 전달 된다.

❯ bin/kafka-console-producer.sh --broker-list 192.168.10.31:32167 --topic my-topic
> hello world!
> 

Consumer 실행

새로운 터미널 창에서 Apache Kafka 위치에서 아래와 같이 명령을 실행한다. ‘bootstrap-server’ 항목은 broker 정보를 기술하고, topic정보도 동일하다. 실행하면, 메세지가 출력된다.

❯ bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.31:32167 --topic my-topic --from-beginning
hello world!

5. 마무리

간략한 사용방법과 결과를 정리하여 메시지 전송결과가 바로 확인되는 내용으로는 쉽게 보였지만, 1장 개요에서 언급한 내용 수준의 오류가 발생된 사항과 전달보장을 위한 옵션과 그에 대한 상황 발생되는 정도의 시나리오가 부족해서 다소 유흥이 약하다. 보다 손쉬운 데이터를 발생시키고 전달하면서 데이터 복제를 확인하고, 장애상황을 유발시키는 재미를 가미하여, ‘심층 테스트’ 목차를 추가하는 내용으로 포스팅 내용을 보완할 예정이다.

그전에 우선 Kafka Cluster 환경을 가장 손쉽게 구축하는 방법으로 Strimzi를 소개하고 적용하는 수준으로 정리하며, 3장 사례에서 다음 이야기를 계속했으면 한다.

댓글 남기기

이 사이트는 스팸을 줄이는 아키스밋을 사용합니다. 댓글이 어떻게 처리되는지 알아보십시오.

%d 블로거가 이것을 좋아합니다: