In this exercise you will learn to use Kafka connectors.
Kafka Connect is a tool from Confluent Kafka that lets you integrate Kafka with third-party systems.
There are two different types of connectors:
- Source: reads from a third-party system and writes to a Kafka topic
- Sink: reads from a Kafka topic and writes to a third-party system
See the list of connectors.
$ git clone https://github.com/jotamayo/MongoDB-Kafka-Connector-first-steps.git
$ cd MongoDB-Kafka-Connector-first-steps
$ docker-compose up -d
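As a quick sanity check, you can verify that all containers started correctly (the container names depend on the docker-compose.yml in the repository):
$ docker-compose ps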
Once all the containers are up and running, you can:
- Open Control Center
- Connect to the MongoDB database and check it
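Before creating any connectors, you can also verify that the Kafka Connect worker is reachable and that the MongoDB connector plugin is installed (both are standard Kafka Connect REST API endpoints):
curl --location --request GET 'http://localhost:8083/'
curl --location --request GET 'http://localhost:8083/connector-plugins'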
docker exec -it mongodb bash
mongosh "mongodb://localhost:27017" --username admin --authenticationDatabase admin
test> use cop
cop>
cop> db.test.insertOne({ msg: "message" })
cop> db.getCollectionNames()
[ 'test' ]
cop> db.test.drop()
cop> db.getCollectionNames()
[ ]
cop> db.firstcollection.insertOne({ msg: "hello world" })
{
  acknowledged: true,
  insertedId: ObjectId("6208db954d9b42dc099b6043")
}
cop> db.firstcollection.countDocuments()
1
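You can also inspect the document itself (the output corresponds to the document inserted above):
cop> db.firstcollection.find()
[ { _id: ObjectId("6208db954d9b42dc099b6043"), msg: 'hello world' } ]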
Next we will create a new topic in Kafka; you first need to download the Kafka binaries (see Download), which contain the command-line tools.
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first-topic --partitions 1 --replication-factor 1
Created topic first-topic.
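You can confirm the topic was created by describing it:
./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first-topic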
It's time to create our first sink connector. (Note: several properties in this config, such as copy.existing, pipeline, publish.full.document.only and the output.* options, are MongoDB source connector settings; the sink connector simply ignores them.)
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "MongoDBFirstSink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "output.format.value": "schema",
    "topics": "first-topic",
    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
    "output.schema.infer.value": true,
    "connection.uri": "mongodb://admin:123@mongodb:27017/admin?authSource=admin&readPreference=primary&appname=MongoDB%20Compass&directConnection=true&ssl=false",
    "copy.existing": "true",
    "pipeline": "[{\"$match\": {}}]",
    "publish.full.document.only": "true",
    "copy.existing.max.threads": "1",
    "copy.existing.queue.size": "160000",
    "database": "cop",
    "collection": "firstcollection"
  }
}'
We can now check the status of the new connector:
curl --location --request GET 'http://localhost:8083/connectors/MongoDBFirstSink/status'
{
  "name": "MongoDBFirstSink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "kafka-connect:8083"
    }
  ],
  "type": "sink"
}
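If the connector or one of its tasks ends up in a FAILED state, you can restart them through the same REST API:
curl --location --request POST 'http://localhost:8083/connectors/MongoDBFirstSink/restart'
curl --location --request POST 'http://localhost:8083/connectors/MongoDBFirstSink/tasks/0/restart'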
If you want to delete it, run:
curl --location --request DELETE 'http://localhost:8083/connectors/MongoDBFirstSink'
Or list all connectors at this endpoint:
curl --location --request GET 'http://localhost:8083/connectors'
[
  "MongoDBFirstSink"
]
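You can also retrieve the configuration of a running connector:
curl --location --request GET 'http://localhost:8083/connectors/MongoDBFirstSink/config'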
Now we launch a producer to send messages to the topic:
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic --property parse.key=true --property key.separator=: < exampleWithKey1000.data
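Since parse.key=true and key.separator=:, each line of the data file must contain a key and a value separated by a colon. A line might look something like this (illustrative only; the actual contents of exampleWithKey1000.data are in the repository, and the expected value format depends on the converters configured in the Connect worker):
key1:{"msg": "message 1"}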
Now we can see the new messages arriving in the first-topic topic, and the sink connector writing them into the firstcollection collection:
cop> db.firstcollection.countDocuments()
1008
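To double-check on the Kafka side, you can consume the topic from the beginning, printing the message keys as well:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --from-beginning --property print.key=true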