Imagem do kafka-connect-zeebe:
berndruecker/kafka-connect-zeebe
Deploy do zeebe connector utilizando Helm Charts:
# Save this file as "helm-values-kafka-connect.yaml"
# Run:
# git clone https://github.com/confluentinc/cp-helm-charts.git
# helm install -f helm-values-kafka-connect.yaml --name kafka cp-helm-charts
## ------------------------------------------------------
## REST Proxy
## ------------------------------------------------------
cp-kafka-rest:
enabled: true
image: confluentinc/cp-kafka-rest
imageTag: 5.3.1
heapOptions: "-Xms512M -Xmx512M"
resources: {}
## ------------------------------------------------------
## Kafka Connect
## ------------------------------------------------------
cp-kafka-connect:
enabled: true
#Custom docker image of confluent connector + zeebe connector .jar embedded
image: berndruecker/kafka-connect-zeebe
imageTag: latest
imagePullPolicy: Always
heapOptions: "-Xms512M -Xmx512M"
resources: {}
## Kafka Connect properties
## ref: https://docs.confluent.io/current/connect/userguide.html#configuring-workers
configurationOverrides:
# "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster"
"bootstrap.servers": "<host1:port1,host2:port2,...>"
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
"key.converter.schemas.enable": "false"
"value.converter.schemas.enable": "false"
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"Configurações dentro do ‘confluentinc/cp-helm-charts’
http://www.github.com/confluentinc
Dentro do chart do cp-kafka:
"offsets.topic.replication.factor": "1"(Deve ser menor ou igual a quantidade de réplicas do kafka)
Deploy connectors
List connectors:
curl -X GET http://localhost:8083/connectorsCreate source connector:
curl -X POST -H "Content-Type: application/json" --data @payment-source.json http://localhost:8083/connectorsCreate sink connector:
curl -X POST -H "Content-Type: application/json" --data @payment-sink.json http://localhost:8083/connectorsHelm Zeebe Kafka Connect
https://github.com/zeebe-io/zeebe-helm
Exemplo “Microservice Orchestration”
Zeebe Hazelcast Exporter:
Deploy do zeebe broker + hazelcast exporter utilizando helm chart:
https://github.com/zeebe-io/zeebe-cluster-helm
COMO ADICIONAR EXPORTERS PARA O ZEEBE PELO HELM:
-
Criar um arquivo chamado ***config.yaml ***para adicionar os exporters (sobrescrevendo os valores de “InitContainer”).
-
No arquivo ***config.yaml ***escreva o código:
extraInitContainers: |
- name: init-exporters-hazelcast
image: busybox:1.28
command: ['/bin/sh', '-c']
args: ['wget --no-check-certificate https://repo1.maven.org/maven2/io/zeebe/hazelcast/zeebe-hazelcast-exporter/0.8.0-alpha1/zeebe-hazelcast-exporter-0.8.0-alpha1-jar-with-dependencies.jar -O /exporters/zeebe-hazelcast-exporter.jar; ls -al']
volumeMounts:
- name: exporters
mountPath: /exporters/
zeebeCfg: |-
[[exporters]]
id = "hazelcast"
className = "io.zeebe.hazelcast.exporter.HazelcastExporter"
[exporters.args]
enabledValueTypes = "JOB,WORKFLOW_INSTANCE,DEPLOYMENT,INCIDENT,TIMER,VARIABLE,MESSAGE,MESSAGE_SUBSCRIPTION,MESSAGE_START_EVENT_SUBSCRIPTION"
updatePosition = false- Faça o deploy do chart executando no terminal:
helm install -f config.yaml zeebe/zeebe-cluster --generate-name --namespace zeebe- Verifique se o JAR do exporter realmente se encontra dentro do diretório /exporters no pod do kubernetes:
kubectl --namespace zeebe exec --stdin --tty <nome-do-pod-zeebe> -- /bin/bash
/Untitled-495.png)
Deploy ‘Kafka Connector’ com connectors do ‘confluent hub’:
kafka-connect-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
annotations:
kompose.cmd: kompose convert
kompose.version: 1.21.0 (992df58d8)
creationTimestamp: null
labels:
io.kompose.service: kafka-connect
name: kafka-connect
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: kafka-connect
strategy:
type: Recreate
template:
metadata:
annotations:
kompose.cmd: kompose convert
kompose.version: 1.21.0 (992df58d8)
creationTimestamp: null
labels:
io.kompose.service: kafka-connect
spec:
containers:
- args:
- bash
- -c
- "echo \"Installing connector plugins\"\nconfluent-hub install --no-prompt zeebe-io/kafka-connect-zeebe:0.22.0\n#\necho \"Launching Kafka Connect worker\"\n/etc/confluent/docker/run
& \n#\nsleep infinity\n"
env:
- name: CONNECT_BOOTSTRAP_SERVERS
value: kafka:9092
- name: CONNECT_CONFIG_PROVIDERS
value: file
- name: CONNECT_CONFIG_PROVIDERS_FILE_CLASS
value: org.apache.kafka.common.config.provider.FileConfigProvider
- name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
value: "1"
- name: CONNECT_CONFIG_STORAGE_TOPIC
value: _kafka-connect-group-01-configs
- name: CONNECT_CUB_KAFKA_TIMEOUT
value: "300"
- name: CONNECT_GROUP_ID
value: kafka-connect-group-01
- name: CONNECT_INTERNAL_KEY_CONVERTER
value: org.apache.kafka.connect.json.JsonConverter
- name: CONNECT_INTERNAL_VALUE_CONVERTER
value: org.apache.kafka.connect.json.JsonConverter
- name: CONNECT_KEY_CONVERTER
value: org.apache.kafka.connect.storage.StringConverter
- name: CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN
value: '[%d] %p %X{connector.context}%m (%c:%L)%n'
- name: CONNECT_LOG4J_LOGGERS
value: org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
- name: CONNECT_LOG4J_ROOT_LOGLEVEL
value: INFO
- name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
value: "1"
- name: CONNECT_OFFSET_STORAGE_TOPIC
value: _kafka-connect-group-01-offsets
- name: CONNECT_PLUGIN_PATH
value: /usr/share/java,/usr/share/confluent-hub-components/,/data/connect-jars
- name: CONNECT_REST_ADVERTISED_HOST_NAME
value: kafka-connect
- name: CONNECT_REST_PORT
value: "8083"
- name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
value: "1"
- name: CONNECT_STATUS_STORAGE_TOPIC
value: _kafka-connect-group-01-status
- name: CONNECT_VALUE_CONVERTER
value: io.confluent.connect.avro.AvroConverter
- name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://schema-registry:8081
image: confluentinc/cp-kafka-connect:5.4.1
imagePullPolicy: ""
name: kafka-connect
ports:
- containerPort: 8083
resources: {}
volumeMounts:
- mountPath: /data/credentials.properties
name: kafka-connect-claim0
restartPolicy: Always
serviceAccountName: ""
volumes:
- name: kafka-connect-claim0
persistentVolumeClaim:
claimName: kafka-connect-claim0
status: {}kafka-connect-volume-claim.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
creationTimestamp: null
labels:
io.kompose.service: kafka-connect-claim0
name: kafka-connect-claim0
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Mi
status: {}kafka-connect-service.yaml
apiVersion: v1
kind: Service
metadata:
annotations:
kompose.cmd: kompose convert
kompose.version: 1.21.0 (992df58d8)
creationTimestamp: null
labels:
io.kompose.service: kafka-connect
name: kafka-connect
spec:
ports:
- name: "8083"
port: 8083
targetPort: 8083
selector:
io.kompose.service: kafka-connect
status:
loadBalancer: {}VALUES.YAML ZEEBE-FULL:
helm install zeebe zeebe/zeebe-full -n zeebe -f values.yaml
# Default values for zeebe-cluster.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
global:
elasticsearch:
host: "elasticsearch-master"
port: 9200
zeebe: "{{ .Release.Name }}-zeebe"
image:
repository: camunda/zeebe
tag: 0.24.3
pullPolicy: IfNotPresent
clusterSize: "3"
partitionCount: "3"
replicationFactor: "3"
cpuThreadCount: "2"
ioThreadCount: "2"
pvcSize: "100Gi"
pvcAccessModes: [ "ReadWriteOnce" ]
env: []
logLevel: debug
gateway:
replicas: 1
logLevel: debug
env: []
podDisruptionBudget:
enabled: false
minAvailable: 1
maxUnavailable:
resources: {}
elasticsearch:
enabled: true
imageTag: 6.8.5
kibana:
enabled: true
imageTag: 6.8.5
prometheus:
enabled: false
servicemonitor:
enabled: false
JavaOpts: >-
-XX:MaxRAMPercentage=25.0
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/usr/local/zeebe/data
-XX:ErrorFile=/usr/local/zeebe/data/zeebe_error%p.log
-XX:+ExitOnOutOfMemoryError
labels:
app: zeebe
serviceType: ClusterIP
serviceHttpPort: 9600
serviceGatewayPort: 26500
serviceCommandPort: 26501
serviceInternalPort: 26502
resources:
requests:
cpu: 250m
memory: 1Gi
limits:
cpu: 500m
memory: 2Gi
probePath: /ready
readinessProbe:
failureThreshold: 1
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 1
podDisruptionBudget:
enabled: false
minAvailable:
maxUnavailable: 1
nodeSelector: {}
tolerations: []
affinity: {}ZEEBE-KAFKA-CONNECTORS JSONs
[ ALL HAIL THE CONFLUENT CONTROL CENTER <3 ]
SOURCE CONNECTOR:
{
"name": "poc-source",
"config": {
"zeebe.client.job.worker": "kafka-connector",
"zeebe.client.job.pollinterval": "2000",
"name": "poc-source",
"connector.class": "io.zeebe.kafka.connect.ZeebeSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"zeebe.client.broker.contactPoint": "cluster-1-zeebe-gateway.zeebe.svc.cluster.local:26500",
"zeebe.client.requestTimeout": "10000",
"zeebe.client.security.plaintext": "true",
"zeebe.client.worker.name": "kafka-connector",
"zeebe.client.worker.maxJobsActive": "1000",
"zeebe.client.job.timeout": "5000",
"job.types": "chamadaInicial",
"job.header.topics": "kafka-topic"
}
}A chave de “job.header.topics” deve ter o mesmo de “key” no modelador.
/Untitled-496.png)
SINK CONNECTOR:
{
"name": "flow-retail-sink",
"config": {
"connector.class": "io.zeebe.kafka.connect.ZeebeSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"topics": "payment-confirm",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name":"deadletterqueue",
"errors.deadletterqueue.topic.replication.factor": 1,
"zeebe.client.broker.contactPoint": "cluster-1-zeebe-gateway.zeebe.svc.cluster.local:26500",
"zeebe.client.requestTimeout": "10000",
"zeebe.client.security.plaintext": true,
"message.path.messageName": "$.eventType",
"message.path.correlationKey": "$.orderId",
"message.path.variables": "$.['amount', 'orderId']"
}
}Se em “message.path.variables” houver somente a variável de “message.path.correlationKey”, apagar a linha “message.path.variables”.
Testes:
Teste 1 - Subir os exemplos do zeebe
Deploy de todos os connectors na pasta:
find . -iname '*.json' -exec curl -X POST -H "Content-Type: application/json" --data @{} http://localhost:8083/connectors \;Delete all connectors:
for i in $(curl -X GET http://localhost:8083/connectors | jq -r '.[]'| sed -E 's/(^ *)"([^"]*)":/\1\2:/'); do curl -X DELETE http://localhost:8083/connectors/$i; done