To select which collections the connector should listen to for updates (and therefore which topics it publishes to), you can use an aggregation pipeline like this:

{"$match": {"ns.coll": {"$in": ["masstypes", "locations", "plans"]}}}

When using $project to extract only the fields of interest, you must keep the ns structure; otherwise the connector will not be able to resolve the topic to publish the output to.

    "ns": {
        "db": "flow",
        "coll": "locations"
    },

The full JSON config should look like this:

{
    "change.stream.full.document": "updateLookup",
    "connection.password": "<sensitive>",
    "connection.uri": "<sensitive>",
    "connection.url": "<sensitive>",
    "connection.username": "mongobongo",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "database": "flow",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "true",
    "name": "flow-manifest-mongodb-source-connector",
    "output.schema.infer.value": "false",
    "pipeline": "[{\"$match\": {\"ns.coll\": {\"$in\": [\"masstypes\", \"locations\", \"plans\"]}}}, {\"$addFields\": {\"collection\": \"$ns.coll\", \"company_id\": \"$fullDocument.company_id\", \"resource_id\": \"$fullDocument._id\", \"action\": \"$operationType\"}}, {\"$project\": {\"ns\": 1, \"collection\": 1, \"company_id\": 1, \"resource_id\": 1, \"action\": 1}}]",
    "publish.full.document.only": "false",
    "publish.full.document.only.tombstone.on.delete": "false",
    "startup.mode": "latest",
    "topic.creation.default.partitions": "1",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.enable": "true",
    "topic.prefix": "connector",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": "true"
}

Schema Example:

I want to apply this schema:

// ConnectorEvent is the value produced for each change-stream event by the
// MongoDB Kafka source connector pipeline above: the $addFields stage maps
// $ns.coll -> collection, $fullDocument.company_id -> company_id,
// $fullDocument._id -> resource_id and $operationType -> action.
type ConnectorEvent struct {
	Collection string `json:"collection"` // source collection name ($ns.coll)
	CompanyId  string `json:"company_id"` // company id taken from the full document
	ResourceId string `json:"resource_id"` // _id of the changed document
	Action     string `json:"action"` // change-stream operationType (e.g. insert/update — confirm full set against MongoDB docs)
}
 
{
    "connection.password": "<sensitive>",
    "connection.uri": "<sensitive>",
    "connection.url": "<sensitive>",
    "connection.username": "mongobongo",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "database": "flow",
    "collection": "",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "true",
    "name": "flow-manifest-mongodb-source-connector",
    "output.format.value": "schema",
    "output.schema.value": "{\"type\": \"record\", \"name\": \"ConnectorEvent\", \"fields\": [{\"name\": \"collection\", \"type\": \"string\"}, {\"name\": \"company_id\", \"type\": \"string\"}, {\"name\": \"resource_id\", \"type\": \"string\"}, {\"name\": \"action\", \"type\": \"string\"}]}",
    "pipeline": "[{\"$match\": {\"ns.coll\": {\"$in\": [\"masstypes\", \"locations\", \"plans\"]}}}]",
    "publish.full.document.only": "true",
    "publish.full.document.only.tombstone.on.delete": "false",
    "startup.mode": "latest",
    "startup.mode.copy.existing.namespace.regex": "flow\\.(masstypes|locations|plans)",
    "topic.creation.default.partitions": "1",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.enable": "true",
    "topic.prefix": "connector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
}

🌱 Back to Garden