To select which topics the connector should listen to for updates, you can use an aggregation pipeline like this:
{"$match": {"ns.coll": {"$in": ["masstypes", "locations", "plans"]}}}

When using $project to extract only the fields of interest, you must keep the ns structure; otherwise the connector will not find the topic to publish the output to.
"ns": {
"db": "flow",
"coll": "locations"
},

The full JSON config should look like this:
{
"change.stream.full.document": "updateLookup",
"connection.password": "<sensitive>",
"connection.uri": "<sensitive>",
"connection.url": "<sensitive>",
"connection.username": "mongobongo",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"database": "flow",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "true",
"name": "flow-manifest-mongodb-source-connector",
"output.schema.infer.value": "false",
"pipeline": "[{\"$match\": {\"ns.coll\": {\"$in\": [\"masstypes\", \"locations\", \"plans\"]}}}, {\"$addFields\": {\"collection\": \"$ns.coll\", \"company_id\": \"$fullDocument.company_id\", \"resource_id\": \"$fullDocument._id\", \"action\": \"$operationType\"}}, {\"$project\": {\"operationType\": 1, \"fullDocument._id\": 1, \"fullDocument.company_id\": 1, \"ns\": 1, \"documentKey\": 1, \"collection\": 1, \"company_id\": 1, \"resource_id\": 1, \"action\": 1}}]",
"publish.full.document.only": "false",
"publish.full.document.only.tombstone.on.delete": "true",
"startup.mode": "latest",
"topic.creation.default.partitions": "1",
"topic.creation.default.replication.factor": "1",
"topic.creation.enable": "true",
"topic.prefix": "connector",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": "true"
}

Schema Example:
I want to apply this schema:
// ConnectorEvent is the message shape produced by the connector pipeline's
// $addFields stage, which maps ns.coll -> collection,
// fullDocument.company_id -> company_id, fullDocument._id -> resource_id,
// and operationType -> action.
type ConnectorEvent struct {
Collection string `json:"collection"` // source collection name (from ns.coll)
CompanyId string `json:"company_id"` // from fullDocument.company_id
ResourceId string `json:"resource_id"` // from fullDocument._id
Action string `json:"action"` // change stream operationType (e.g. insert, update, delete)
}
{
"connection.password": "<sensitive>",
"connection.uri": "<sensitive>",
"connection.url": "<sensitive>",
"connection.username": "mongobongo",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"database": "flow",
"collection": "purchases",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "true",
"name": "flow-manifest-mongodb-source-connector",
"output.format.value": "schema",
"output.schema.value": "{\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]}",
"pipeline": "[{\"$match\": {\"ns.coll\": {\"$in\": [\"masstypes\", \"locations\", \"plans\"]}}}]",
"publish.full.document.only": "true",
"publish.full.document.only.tombstone.on.delete": "false",
"startup.mode": "latest",
"startup.mode.copy.existing.namespace.regex": "flow\\.(masstypes|locations|plans)",
"topic.creation.default.partitions": "1",
"topic.creation.default.replication.factor": "1",
"topic.creation.enable": "true",
"topic.prefix": "connector",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"
}