Setup:
First, you need to set up a Kafka Connect cluster using the kafka-operator from Strimzi.
connect-cluster.yaml

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: <kafka_svc_name>.<kafka_namespace>.svc.cluster.local:<kafka_svc_port>
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
  build:
    output:
      type: docker
      image: <container-registry-name>.azurecr.io/debezium-connector-sink-jdbc:latest
      pushSecret: <container-registry-secret>
    plugins:
      - name: debezium-connector-sink-jdbc
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/2.3.3.Final/debezium-connector-jdbc-2.3.3.Final-plugin.tar.gz
  template:
    pod:
      imagePullSecrets:
        - name: azurecr-secret
```

Then, you can define a new connector to be shipped to the Kafka Connect cluster:
connector.yaml

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-sink-jdbc
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: io.debezium.connector.jdbc.JdbcSinkConnector
  tasksMax: 5
  config:
    connection.url: jdbc:mysql://basic-tidb.tidb-cluster.svc.cluster.local:4000/test
    connection.username: <db-username>
    connection.password: <db-password>
    # The '${topic}' placeholder is replaced by the Kafka topic at runtime.
    # The final value of this property (at runtime) must match the
    # destination table name.
    table.name.format: <table_prefix>_${topic}
    primary.key.fields: <table-pk>
    primary.key.mode: record_key
    insert.mode: upsert
    delete.enabled: true
    schema.evolution: basic
    database.time_zone: UTC
```

Note: the table.name.format connector configuration property applies whatever prefix or suffix you want.
- For example, to prefix all table names with jdbc_, set the table.name.format configuration property to jdbc_${topic}. If the connector is subscribed to a topic called orders, the resulting table is created as jdbc_orders.
- You can choose not to use the ${topic} placeholder, but then your topic name MUST match the table name the connector will write to.
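The placeholder substitution described above can be sketched as a plain string replacement. This is only an illustration of the observable behavior, not Debezium's actual implementation:

```python
# Illustration only: mimics how the JDBC sink resolves the ${topic}
# placeholder in table.name.format at runtime.
def resolve_table_name(table_name_format: str, topic: str) -> str:
    return table_name_format.replace("${topic}", topic)

# Prefix example from the text: format "jdbc_${topic}", topic "orders".
print(resolve_table_name("jdbc_${topic}", "orders"))  # jdbc_orders

# Without the placeholder, the configured value is used verbatim,
# so it must equal the destination table name.
print(resolve_table_name("orders", "orders"))  # orders
```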
Publish messages
If the database schema uses a PK, the PK must be sent in the "Key" field of the Kafka message, following a schema structure similar to that of the "Value" field.
Kafka Message Key:

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "dummy-id"
      }
    ],
    "optional": false,
    "name": "your_key_schema_name"
  },
  "payload": {
    "dummy-id": 2
  }
}
```

Kafka Message Value:
```json
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "dummy-date"
      }
    ],
    "optional": false,
    "name": "your_value_schema_name"
  },
  "payload": {
    "dummy-date": 13
  }
}
```
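The two envelopes above share the same schema-plus-payload shape, so they can be built with one small helper. This is a sketch: the helper name, the schema names, and the use of the kafka-python library are assumptions, not part of the setup above:

```python
import json

def envelope(schema_name: str, field: str, field_type: str, value):
    """Build a Connect JSON envelope (schema + payload) for a single field."""
    return {
        "schema": {
            "type": "struct",
            "fields": [{"type": field_type, "optional": False, "field": field}],
            "optional": False,
            "name": schema_name,
        },
        "payload": {field: value},
    }

key = envelope("your_key_schema_name", "dummy-id", "int32", 2)
value = envelope("your_value_schema_name", "dummy-date", "int32", 13)

# Publishing with kafka-python (assumed library; addresses are placeholders):
# from kafka import KafkaProducer
# producer = KafkaProducer(
#     bootstrap_servers="<kafka_svc_name>.<kafka_namespace>.svc.cluster.local:<kafka_svc_port>",
#     key_serializer=lambda d: json.dumps(d).encode("utf-8"),
#     value_serializer=lambda d: json.dumps(d).encode("utf-8"),
# )
# producer.send("<topic>", key=key, value=value)
# producer.flush()

print(key["payload"])  # {'dummy-id': 2}
```

Because schemas.enable is true for both converters in the Connect cluster config, every message must carry this envelope; the sink uses the key payload to fill the primary.key.fields columns.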