kafka-connect-zeebe image:

berndruecker/kafka-connect-zeebe

Deploying the Zeebe connector using Helm charts:

# Save this file as "helm-values-kafka-connect.yaml"
# Run:
#    git clone https://github.com/confluentinc/cp-helm-charts.git
#    helm install -f helm-values-kafka-connect.yaml --name kafka cp-helm-charts
 
## ------------------------------------------------------
## REST Proxy
## ------------------------------------------------------
cp-kafka-rest:
  enabled: true
  image: confluentinc/cp-kafka-rest
  imageTag: 5.3.1
  heapOptions: "-Xms512M -Xmx512M"
  resources: {}

## ------------------------------------------------------
## Kafka Connect
## ------------------------------------------------------
cp-kafka-connect:
  enabled: true

  # Custom Docker image: Confluent Connect worker with the Zeebe connector .jar embedded
  image: berndruecker/kafka-connect-zeebe

  imageTag: latest
  imagePullPolicy: Always
  heapOptions: "-Xms512M -Xmx512M"
  resources: {}

  ## Kafka Connect properties
  ## ref: https://docs.confluent.io/current/connect/userguide.html#configuring-workers
  configurationOverrides:

    # "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster"
    "bootstrap.servers": "<host1:port1,host2:port2,...>"

    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    "key.converter.schemas.enable": "false"
    "value.converter.schemas.enable": "false"
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"

Settings inside ‘confluentinc/cp-helm-charts’

http://www.github.com/confluentinc

Inside the cp-kafka chart:

"offsets.topic.replication.factor": "1"

(Must be less than or equal to the number of Kafka broker replicas)
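As a quick sanity check before installing, that constraint can be verified up front; a minimal sketch with example values (1 broker, replication factor 1):

```shell
# Sketch: fail fast if the offsets-topic replication factor exceeds the
# number of Kafka brokers the chart will deploy (example values).
BROKERS=1        # cp-kafka "brokers" value in the chart
OFFSETS_RF=1     # "offsets.topic.replication.factor" override

if [ "$OFFSETS_RF" -le "$BROKERS" ]; then
  echo "ok: replication factor $OFFSETS_RF fits $BROKERS broker(s)"
else
  echo "error: replication factor $OFFSETS_RF > $BROKERS broker(s)" >&2
  exit 1
fi
```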

Deploy connectors

List connectors:

curl -X GET http://localhost:8083/connectors

Create source connector:

curl -X POST -H "Content-Type: application/json" --data @payment-source.json http://localhost:8083/connectors

Create sink connector:

curl -X POST -H "Content-Type: application/json" --data @payment-sink.json http://localhost:8083/connectors

Helm Zeebe Kafka Connect

https://github.com/zeebe-io/zeebe-helm

“Microservice Orchestration” example

https://github.com/zeebe-io/kafka-connect-zeebe/tree/master/examples/microservices-orchestration#deploy-connectors

Zeebe Hazelcast Exporter:

Deploying the Zeebe broker + Hazelcast exporter using a Helm chart:

https://github.com/zeebe-io/zeebe-cluster-helm

HOW TO ADD EXPORTERS TO ZEEBE VIA HELM:

  • Create a file named config.yaml to add the exporters (overriding the chart’s init-container values).

  • In config.yaml, write:

extraInitContainers: |
  - name: init-exporters-hazelcast
    image: busybox:1.28
    command: ['/bin/sh', '-c']
    args: ['wget --no-check-certificate https://repo1.maven.org/maven2/io/zeebe/hazelcast/zeebe-hazelcast-exporter/0.8.0-alpha1/zeebe-hazelcast-exporter-0.8.0-alpha1-jar-with-dependencies.jar -O /exporters/zeebe-hazelcast-exporter.jar; ls -al']
    volumeMounts:
    - name: exporters
      mountPath: /exporters/
 
zeebeCfg: |- 
  [[exporters]]
  id = "hazelcast"
  className = "io.zeebe.hazelcast.exporter.HazelcastExporter"
    [exporters.args]
    enabledValueTypes = "JOB,WORKFLOW_INSTANCE,DEPLOYMENT,INCIDENT,TIMER,VARIABLE,MESSAGE,MESSAGE_SUBSCRIPTION,MESSAGE_START_EVENT_SUBSCRIPTION"
    updatePosition = false
  • Deploy the chart by running in the terminal:
helm install -f config.yaml zeebe/zeebe-cluster --generate-name --namespace zeebe
  • Check that the exporter JAR actually ended up in the /exporters directory inside the Kubernetes pod:
kubectl --namespace zeebe exec --stdin --tty <zeebe-pod-name> -- /bin/bash
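A couple of quick checks from outside the pod can also confirm the exporter was fetched and loaded; a sketch (the pod name is a placeholder, and the log pattern is an assumption):

```shell
# List the JAR the init container downloaded into the shared volume
kubectl --namespace zeebe exec <zeebe-pod-name> -- ls -al /exporters/

# Look for the exporter being registered in the broker logs
kubectl --namespace zeebe logs <zeebe-pod-name> | grep -i hazelcast
```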

Deploying ‘Kafka Connect’ with connectors from Confluent Hub:

kafka-connect-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.21.0 (992df58d8)
  creationTimestamp: null
  labels:
    io.kompose.service: kafka-connect
  name: kafka-connect
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: kafka-connect
  strategy:
    type: Recreate
  template:
    metadata:
      annotations:
        kompose.cmd: kompose convert
        kompose.version: 1.21.0 (992df58d8)
      creationTimestamp: null
      labels:
        io.kompose.service: kafka-connect
    spec:
      containers:
      - args:
        - bash
        - -c
        - "echo \"Installing connector plugins\"\nconfluent-hub install --no-prompt zeebe-io/kafka-connect-zeebe:0.22.0\n#\necho \"Launching Kafka Connect worker\"\n/etc/confluent/docker/run
          & \n#\nsleep infinity\n"
        env:
        - name: CONNECT_BOOTSTRAP_SERVERS
          value: kafka:9092
        - name: CONNECT_CONFIG_PROVIDERS
          value: file
        - name: CONNECT_CONFIG_PROVIDERS_FILE_CLASS
          value: org.apache.kafka.common.config.provider.FileConfigProvider
        - name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
          value: "1"
        - name: CONNECT_CONFIG_STORAGE_TOPIC
          value: _kafka-connect-group-01-configs
        - name: CONNECT_CUB_KAFKA_TIMEOUT
          value: "300"
        - name: CONNECT_GROUP_ID
          value: kafka-connect-group-01
        - name: CONNECT_INTERNAL_KEY_CONVERTER
          value: org.apache.kafka.connect.json.JsonConverter
        - name: CONNECT_INTERNAL_VALUE_CONVERTER
          value: org.apache.kafka.connect.json.JsonConverter
        - name: CONNECT_KEY_CONVERTER
          value: org.apache.kafka.connect.storage.StringConverter
        - name: CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN
          value: '[%d] %p %X{connector.context}%m (%c:%L)%n'
        - name: CONNECT_LOG4J_LOGGERS
          value: org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
        - name: CONNECT_LOG4J_ROOT_LOGLEVEL
          value: INFO
        - name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
          value: "1"
        - name: CONNECT_OFFSET_STORAGE_TOPIC
          value: _kafka-connect-group-01-offsets
        - name: CONNECT_PLUGIN_PATH
          value: /usr/share/java,/usr/share/confluent-hub-components/,/data/connect-jars
        - name: CONNECT_REST_ADVERTISED_HOST_NAME
          value: kafka-connect
        - name: CONNECT_REST_PORT
          value: "8083"
        - name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
          value: "1"
        - name: CONNECT_STATUS_STORAGE_TOPIC
          value: _kafka-connect-group-01-status
        - name: CONNECT_VALUE_CONVERTER
          value: io.confluent.connect.avro.AvroConverter
        - name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
          value: http://schema-registry:8081
        image: confluentinc/cp-kafka-connect:5.4.1
        imagePullPolicy: ""
        name: kafka-connect
        ports:
        - containerPort: 8083
        resources: {}
        volumeMounts:
        - mountPath: /data/credentials.properties
          name: kafka-connect-claim0
      restartPolicy: Always
      serviceAccountName: ""
      volumes:
      - name: kafka-connect-claim0
        persistentVolumeClaim:
          claimName: kafka-connect-claim0
status: {}
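The escaped `args` string in the manifest above is hard to read; unescaped, the container startup script amounts to:

```shell
echo "Installing connector plugins"
confluent-hub install --no-prompt zeebe-io/kafka-connect-zeebe:0.22.0
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity
```

The worker is launched in the background and `sleep infinity` keeps the container alive, so a crash of the worker process does not restart the pod by itself.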

kafka-connect-volume-claim.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  creationTimestamp: null
  labels:
    io.kompose.service: kafka-connect-claim0
  name: kafka-connect-claim0
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 100Mi
status: {}

kafka-connect-service.yaml

apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.21.0 (992df58d8)
  creationTimestamp: null
  labels:
    io.kompose.service: kafka-connect
  name: kafka-connect
spec:
  ports:
  - name: "8083"
    port: 8083
    targetPort: 8083
  selector:
    io.kompose.service: kafka-connect
status:
  loadBalancer: {}

VALUES.YAML ZEEBE-FULL:

helm install zeebe zeebe/zeebe-full -n zeebe -f values.yaml

# Default values for zeebe-cluster.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
 
global: 
  elasticsearch:
    host: "elasticsearch-master"
    port: 9200 
  zeebe: "{{ .Release.Name }}-zeebe"
 
image:
  repository: camunda/zeebe
  tag: 0.24.3
  pullPolicy: IfNotPresent
 
clusterSize: "3"
partitionCount: "3"
replicationFactor: "3"
cpuThreadCount: "2"
ioThreadCount: "2"
pvcSize: "100Gi"
pvcAccessModes: [ "ReadWriteOnce" ]
env: []
logLevel: debug
 
gateway:
  replicas: 1
  logLevel: debug
  env: []
  podDisruptionBudget:
    enabled: false
    minAvailable: 1
    maxUnavailable:
  resources:  {}
 
elasticsearch:
  enabled: true
  imageTag: 6.8.5
 
kibana:
  enabled: true
  imageTag: 6.8.5
 
prometheus:
  enabled: false
  servicemonitor:
    enabled: false
 
JavaOpts: >-
  -XX:MaxRAMPercentage=25.0
  -XX:+HeapDumpOnOutOfMemoryError
  -XX:HeapDumpPath=/usr/local/zeebe/data
  -XX:ErrorFile=/usr/local/zeebe/data/zeebe_error%p.log
  -XX:+ExitOnOutOfMemoryError
 
labels:
  app: zeebe    
serviceType: ClusterIP
serviceHttpPort: 9600
serviceGatewayPort: 26500    
serviceCommandPort: 26501     
serviceInternalPort: 26502  
resources: 
  requests:
    cpu: 250m
    memory: 1Gi
  limits:
    cpu: 500m
    memory: 2Gi
probePath: /ready
readinessProbe:
  failureThreshold: 1
  periodSeconds: 10
  successThreshold: 1
  timeoutSeconds: 1
podDisruptionBudget:
  enabled: false
  minAvailable:
  maxUnavailable: 1
 
nodeSelector: {}
 
tolerations: []
 
affinity: {}
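Once the release is up, the gateway can be reached locally with a port-forward; a sketch, assuming the service follows the `<release>-zeebe-gateway` naming (with the release name `zeebe` used above):

```shell
# Forward the gateway's gRPC port to localhost
kubectl --namespace zeebe port-forward svc/zeebe-zeebe-gateway 26500:26500
# A Zeebe client can then connect to localhost:26500 (plaintext)
```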

ZEEBE-KAFKA-CONNECTORS JSONs

[ ALL HAIL THE CONFLUENT CONTROL CENTER <3 ]

SOURCE CONNECTOR:

{
    "name": "poc-source",
    "config": {
        "zeebe.client.job.worker": "kafka-connector",
        "zeebe.client.job.pollinterval": "2000",
        "name": "poc-source",
        "connector.class": "io.zeebe.kafka.connect.ZeebeSourceConnector",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "zeebe.client.broker.contactPoint": "cluster-1-zeebe-gateway.zeebe.svc.cluster.local:26500",
        "zeebe.client.requestTimeout": "10000",
        "zeebe.client.security.plaintext": "true",
        "zeebe.client.worker.name": "kafka-connector",
        "zeebe.client.worker.maxJobsActive": "1000",
        "zeebe.client.job.timeout": "5000",
        "job.types": "chamadaInicial",
        "job.header.topics": "kafka-topic"
    }
}

The value of “job.header.topics” must match the header “key” set in the modeler.
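In the modeler, that corresponds roughly to a service task like the one below (a sketch; the IDs and topic name are illustrative):

```xml
<bpmn:serviceTask id="sendToKafka" name="Send to Kafka">
  <bpmn:extensionElements>
    <!-- must match the connector's "job.types" -->
    <zeebe:taskDefinition type="chamadaInicial"/>
    <zeebe:taskHeaders>
      <!-- header key must equal the connector's "job.header.topics" value -->
      <zeebe:header key="kafka-topic" value="my-topic"/>
    </zeebe:taskHeaders>
  </bpmn:extensionElements>
</bpmn:serviceTask>
```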

SINK CONNECTOR:

{
    "name": "flow-retail-sink",
    "config": {
      "connector.class": "io.zeebe.kafka.connect.ZeebeSinkConnector",
      "tasks.max": "1",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "topics": "payment-confirm",
  
      "errors.tolerance": "all",
      "errors.deadletterqueue.topic.name":"deadletterqueue",
      "errors.deadletterqueue.topic.replication.factor": 1,
  
      "zeebe.client.broker.contactPoint": "cluster-1-zeebe-gateway.zeebe.svc.cluster.local:26500",
      "zeebe.client.requestTimeout": "10000",
      "zeebe.client.security.plaintext": true,
  
      "message.path.messageName": "$.eventType",
      "message.path.correlationKey": "$.orderId",
      "message.path.variables": "$.['amount', 'orderId']"
    }
  }

If “message.path.variables” would contain only the variable already used by “message.path.correlationKey”, delete the “message.path.variables” line.
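To see what those JSONPath settings pull out of a record, here is a local sketch against a sample value (extraction done with grep/cut purely for illustration; the connector itself evaluates real JSONPath, and the field values are made up):

```shell
# Sample Kafka record value, as the sink would receive it from the
# "payment-confirm" topic:
RECORD='{"eventType":"paymentConfirmed","orderId":"42","amount":"99.90"}'

# "$.eventType" -> the Zeebe message name to publish
MESSAGE_NAME=$(printf '%s' "$RECORD" | grep -o '"eventType":"[^"]*"' | cut -d'"' -f4)

# "$.orderId" -> the correlation key used to match a waiting workflow instance
CORRELATION_KEY=$(printf '%s' "$RECORD" | grep -o '"orderId":"[^"]*"' | cut -d'"' -f4)

echo "publish '$MESSAGE_NAME' correlated by '$CORRELATION_KEY'"
```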

Tests:

Test 1 - Run the Zeebe examples

Test 2 - PoC Gateway

Deploy all connectors in the folder:

find . -iname '*.json' -exec curl -X POST -H "Content-Type: application/json" --data @{} http://localhost:8083/connectors \;

Delete all connectors:

for i in $(curl -s -X GET http://localhost:8083/connectors | jq -r '.[]'); do curl -X DELETE http://localhost:8083/connectors/$i; done
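The loop only needs the connector names out of the JSON array that `GET /connectors` returns; a local sketch of that parsing step against a simulated response, using pure POSIX tools in case `jq` is not available:

```shell
# Simulated response from GET http://localhost:8083/connectors
RESPONSE='["poc-source","flow-retail-sink"]'

# Strip brackets and quotes, split on commas -> one name per iteration
for name in $(printf '%s' "$RESPONSE" | tr -d '[]"' | tr ',' ' '); do
  echo "DELETE http://localhost:8083/connectors/$name"
done
```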

Helm chart: Zeebe + Operate + Elasticsearch + Kibana + Hazelcast + ZeeQS + Mancenter

zeebe-deployment.tar.xz

