Setup:

First, set up a Kafka Connect cluster using the Strimzi Kafka operator.

connect-cluster.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: <kafka_svc_name>.<kafka_namespace>.svc.cluster.local:<kafka_svc_port>
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
  build:
    output:
      type: docker
      image: <container-registry-name>.azurecr.io/debezium-connector-sink-jdbc:latest
      pushSecret: <container-registry-secret>
    plugins: 
      - name: debezium-connector-sink-jdbc
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/2.3.3.Final/debezium-connector-jdbc-2.3.3.Final-plugin.tar.gz
  template:
    pod:
      imagePullSecrets:
        - name: azurecr-secret

Then define a connector to be deployed to the Kafka Connect cluster:

connector.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-sink-jdbc
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: io.debezium.connector.jdbc.JdbcSinkConnector
  tasksMax: 5
  config:
    # tasks.max is taken from spec.tasksMax above, so it is not repeated in config.
    connection.url: jdbc:mysql://basic-tidb.tidb-cluster.svc.cluster.local:4000/test
    connection.username: <db-username>
    connection.password: <db-password>
    # '${topic}' is replaced with the Kafka topic name at runtime.
    # The resolved value must match the destination table name.
    table.name.format: <table_prefix>_${topic}
    primary.key.fields: <table-pk>
    primary.key.mode: record_key
    insert.mode: upsert
    delete.enabled: true
    schema.evolution: basic
    database.time_zone: UTC

Note: the table.name.format connector configuration property lets you add a prefix or suffix to the destination table name.

  • For example, to prefix all table names with jdbc_, set table.name.format to jdbc_${topic}. If the connector is subscribed to a topic called orders, the resulting table is created as jdbc_orders.
  • You can choose not to add anything around the ${topic} placeholder, but then your topic name MUST match the name of the table that the connector writes to.
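The placeholder substitution described above can be sketched in a few lines of Python. This is only an illustration of the naming rule, not the connector's actual implementation; resolve_table_name is a hypothetical helper, and any extra sanitizing the connector may apply to topic names is not modelled here.

```python
def resolve_table_name(table_name_format: str, topic: str) -> str:
    """Illustrative only: expand the ${topic} placeholder in table.name.format."""
    return table_name_format.replace("${topic}", topic)


# Prefix case from the example above.
print(resolve_table_name("jdbc_${topic}", "orders"))  # jdbc_orders

# Placeholder alone: the table name is simply the topic name.
print(resolve_table_name("${topic}", "orders"))  # orders
```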

Publish messages

If the destination table has a primary key (PK), the PK must be sent in the key of the Kafka message, using the same schema/payload structure as the message value.

Kafka Message Key:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "dummy-id"
            }
        ],
        "optional": false,
        "name": "your_key_schema_name"
    },
    "payload": {
        "dummy-id": 2
    }
}

Kafka Message Value:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "dummy-date"
            }
        ],
        "optional": false,
        "name": "your_value_schema_name"
    },
    "payload": {
        "dummy-date": 13
    }
}
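Writing these envelopes by hand is error-prone, so a small helper can build them. The following is a minimal sketch using only the Python standard library; the envelope function and the schema names are hypothetical placeholders, and it assumes the JsonConverter settings with schemas.enable set to true from the Kafka Connect cluster above.

```python
import json


def envelope(schema_name: str, field_types: dict, payload: dict) -> dict:
    """Wrap a payload in the schema/payload structure shown above."""
    return {
        "schema": {
            "type": "struct",
            "fields": [
                {"type": type_name, "optional": False, "field": field_name}
                for field_name, type_name in field_types.items()
            ],
            "optional": False,
            "name": schema_name,
        },
        "payload": payload,
    }


# Same field names and values as the sample messages above.
key = envelope("your_key_schema_name", {"dummy-id": "int32"}, {"dummy-id": 2})
value = envelope("your_value_schema_name", {"dummy-date": "int32"}, {"dummy-date": 13})

# Serialize to UTF-8 JSON bytes, ready to be handed to a Kafka producer client.
key_bytes = json.dumps(key).encode("utf-8")
value_bytes = json.dumps(value).encode("utf-8")
print(key_bytes.decode("utf-8"))
```

The resulting key_bytes and value_bytes can be passed as the key and value of a record in whichever Kafka producer client you use.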
