For too long our Kafka Connect story hasn’t been quite as “Kubernetes-native” as it could have been.
We had a KafkaConnect resource to configure a Kafka Connect cluster, but you still had to use the Kafka Connect REST API to actually create a connector within it.
While this wasn’t especially difficult using something like curl, it stood out because everything else could be done using kubectl, and it meant that connectors didn’t fit into our Kubernetes-native vision.
With the help of a contribution from the community, Strimzi now supports a KafkaConnector custom resource, and the rest of this blog post explains how to use it, with Debezium as an example.
As if that wasn’t enough, there’s some awesome ASCII art to help explain how it all fits together. So read on to find out about this new Kubernetes-native way of managing connectors.
Debezi-what?
If you haven’t heard of Debezium before, it is an open source project for applying the Change Data Capture (CDC) pattern to your applications using Kafka.
But what is CDC? You probably have several databases in your organization; silos full of business-critical data. While you can query those databases any time you want, the essence of your business revolves around how that data gets modified. Those modifications trigger, and are triggered by, real-world business events (e.g. a phone call from a customer, or a successful payment, or whatever). In order to reflect this, modern application architectures are frequently event-based. And so asking for the current state of some tables is not enough; what these architectures need is a stream of events representing modifications to those tables. That’s what CDC is: capturing the changes to the state data as event data.
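To make that concrete, here’s a deliberately simplified sketch (not Debezium’s exact format, which we’ll see later in this post) of the kind of event a CDC tool might emit when a row in a customers table is updated:

{
  "before": { "id": 1004, "first_name": "Anne" },
  "after": { "id": 1004, "first_name": "Anne Marie" },
  "op": "u",
  "ts_ms": 1574090237089
}

An event like this tells downstream applications not just what the row looks like now, but what changed, and when.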
Concretely, Debezium works with a number of common DBMSs (MySQL, MongoDB, PostgreSQL, Oracle, SQL Server and Cassandra) and runs as a source connector within a Kafka Connect cluster. How Debezium works on the database side depends on which database it’s connected to. For example, for MySQL it reads the binlog in order to know what transactions are happening, but for MongoDB it hooks into the native replication mechanism. In any case, the changes are represented by default as JSON events (other serializations are also possible) which are sent to Kafka.
It should be apparent, then, that Debezium provides a route for getting events out of database applications (which otherwise might not expose any kind of event-based API) and making them available to Kafka applications.
So that’s what Debezium is and what it does. Its role in this blog post is just to be an example connector to use with the KafkaConnector resource, which is new in Strimzi 0.16.
Let’s get cracking with the walkthrough…
Fire up MySQL
Let’s follow the steps from the Debezium tutorial for getting a demo MySQL server up and running.
First we fire up the database server in a Docker container:
docker run -it --rm --name mysql -p 3306:3306 \
  -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser \
  -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0
Once we see the following we know the server is ready:
...
2020-01-24T12:20:18.183194Z 0 [Note] mysqld: ready for connections.
Version: '5.7.29-log' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server (GPL)
Then, in another terminal, we can run the command line client:
docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh \
  -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" \
  -P"$MYSQL_PORT_3306_TCP_PORT" \
  -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
At the mysql> prompt we can switch to the “inventory” database and show the tables in it:
mysql> use inventory;
mysql> show tables;
The output will look like this:
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
6 rows in set (0.01 sec)
Don’t worry, this isn’t the awesome ASCII art.
If you want, you can have a poke around this demo database, but when you’re done leave this MySQL client running in its own terminal window, so you can come back to it later.
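For example, you could look at the structure of the products table and see what’s in stock (any read-only queries are safe to run here):

mysql> DESCRIBE products;
mysql> SELECT * FROM products_on_hand;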
Create a Kafka cluster
Now we can follow some of the Strimzi quickstart to create a Kafka cluster running inside minikube.
First start minikube:
minikube start --memory=4096
When the command finishes you can create a namespace for the resources we’re going to create:
kubectl create namespace kafka
Then install the cluster operator and associated resources:
curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.16.1/strimzi-cluster-operator-0.16.1.yaml \
  | sed 's/namespace: .*/namespace: kafka/' \
  | kubectl apply -f - -n kafka
And spin up a Kafka cluster, waiting until it’s ready:
kubectl -n kafka \
  apply -f https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/0.16.1/examples/kafka/kafka-persistent-single.yaml \
  && kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka
This can take a while, depending on the speed of your connection.
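If you want to see what’s going on while you wait, you can watch the pods come up in another terminal:

kubectl -n kafka get pods --watch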
What we’ve got so far looks like this:

┌────────────────────────────────┐      ┌────────────────────────────────┐
│ minikube, namespace: kafka     │      │ docker                         │
│                                │      │                                │
│   Kafka                        │      │   MySQL                        │
│     name: my-cluster           │      │                                │
│                                │      └────────────────────────────────┘
│                                │
│                                │
│                                │
└────────────────────────────────┘
Don’t worry, the ASCII art gets a lot better than this!
What’s missing from this picture is Kafka Connect in the minikube box.
Kafka Connect image
The next step is to create a Strimzi Kafka Connect image which includes the Debezium MySQL connector and its dependencies.
First download and extract the Debezium MySQL connector archive:

curl https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.0.0.Final/debezium-connector-mysql-1.0.0.Final-plugin.tar.gz \
  | tar xvz
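This should leave you with a debezium-connector-mysql directory containing the connector and its dependency JARs, which you can check with:

ls debezium-connector-mysql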
Prepare a Dockerfile which adds those connector files to the Strimzi Kafka Connect image:
cat <<EOF >Dockerfile
FROM strimzi/kafka:0.16.1-kafka-2.4.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./debezium-connector-mysql/ /opt/kafka/plugins/debezium/
USER 1001
EOF
Then build the image from that Dockerfile and push it to Docker Hub:
# You can use your own dockerhub organization
export DOCKER_ORG=tjbentley
docker build . -t ${DOCKER_ORG}/connect-debezium
docker push ${DOCKER_ORG}/connect-debezium
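If the push is rejected you probably just need to authenticate to Docker Hub first:

docker login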
Secure the database credentials
To make this a bit more realistic we’re going to use Kafka’s config.providers mechanism to avoid having to pass secret information over the Kafka Connect REST interface (which uses unencrypted HTTP).
We’ll use a Kubernetes Secret called my-sql-credentials to store the database credentials.
This will be mounted as a secret volume within the connect pods.
We can then configure the connector with the path to this file.
Let’s create the secret:
cat <<EOF > debezium-mysql-credentials.properties
mysql_username: debezium
mysql_password: dbz
EOF
kubectl -n kafka create secret generic my-sql-credentials \
  --from-file=debezium-mysql-credentials.properties
rm debezium-mysql-credentials.properties
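If you want to double-check what was stored you can read the Secret back (note that the data values come back base64-encoded):

kubectl -n kafka get secret my-sql-credentials -o yaml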
Create the Connect cluster
Now we can create a KafkaConnect cluster in Kubernetes:
cat <<EOF | kubectl -n kafka apply -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  image: ${DOCKER_ORG}/connect-debezium
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: connector-config
        secret:
          secretName: my-sql-credentials
EOF
It’s worth pointing out a few things about the above resource:

- In the metadata.annotations, the strimzi.io/use-connector-resources: "true" annotation tells the cluster operator that KafkaConnector resources will be used to configure connectors within this Kafka Connect cluster.
- The spec.image is the image we created with docker.
- In the config we’re using a replication factor of 1 because we created a single-broker Kafka cluster.
- In the externalConfiguration we’re referencing the secret we just created.
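Before moving on it’s worth checking that the Connect cluster has actually come up. Strimzi labels the pods it manages with strimzi.io/cluster, so (assuming that labelling) you can select the Connect pod like this:

kubectl -n kafka get pods -l strimzi.io/cluster=my-connect-cluster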
Create the connector
The last piece is to create the KafkaConnector resource, configured to connect to our “inventory” database in MySQL.
Here’s what the KafkaConnector resource looks like:
# Note: the quoted 'EOF' delimiter stops the shell from expanding
# the ${file:...} placeholders, which must reach Kafka Connect verbatim
cat <<'EOF' | kubectl -n kafka apply -f -
apiVersion: "kafka.strimzi.io/v1alpha1"
kind: "KafkaConnector"
metadata:
  name: "inventory-connector"
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: 192.168.99.1
    database.port: "3306"
    database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
    database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"
    database.server.id: "184054"
    database.server.name: "dbserver1"
    database.whitelist: "inventory"
    database.history.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
    database.history.kafka.topic: "schema-changes.inventory"
    include.schema.changes: "true"
EOF
In metadata.labels, strimzi.io/cluster names the KafkaConnect cluster which this connector will be created in.
The spec.class names the Debezium MySQL connector and spec.tasksMax must be 1 because that’s all this connector ever uses.
The spec.config object contains the rest of the connector configuration.
The Debezium documentation explains the available properties, but it’s worth calling out some specifically:
- I’m using database.hostname: 192.168.99.1 as the IP address for connecting to MySQL because I’m using minikube with the virtualbox VM driver. If you’re using a different VM driver with minikube you might need a different IP address (see the note after this list for one way to check).
- The database.port: "3306" works because of the -p 3306:3306 argument we used when we started up the MySQL server.
- The ${file:...} used for the database.user and database.password is a placeholder which gets replaced with the referenced property from the given file in the secret we created.
- The database.whitelist: "inventory" tells Debezium to watch only the inventory database.
- The database.history.kafka.topic: "schema-changes.inventory" configures Debezium to use the schema-changes.inventory topic to store the database schema history.
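If you’re not sure which address your minikube VM uses to reach the host, one way to check (on the assumption that the VM’s default route points back at the host, as it does with the virtualbox driver) is:

minikube ssh 'ip route | grep default'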
A while after you’ve created this connector you can have a look at its status, using kubectl -n kafka get kctr inventory-connector -o yaml:
#...
status:
  conditions:
  - lastTransitionTime: "2020-01-24T14:28:32.406Z"
    status: "True"
    type: Ready
  connectorStatus:
    connector:
      state: RUNNING
      worker_id: 172.17.0.9:8083
    name: inventory-connector
    tasks:
    - id: 0
      state: RUNNING
      worker_id: 172.17.0.9:8083
    type: source
  observedGeneration: 3
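If you just want the connector state rather than the whole resource, a jsonpath query against the same status works too:

kubectl -n kafka get kctr inventory-connector -o jsonpath='{.status.connectorStatus.connector.state}'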
The Ready condition and the RUNNING states tell us that the connector is running within the KafkaConnect cluster we created in the last step.
To summarise, we now have the complete picture, with the connector talking to MySQL:
┌────────────────────────────────┐      ┌────────────────────────────────┐
│ minikube, namespace: kafka     │      │ docker                         │
│                                │      │                                │
│   Kafka                        │   ┏━━┿━▶ MySQL                        │
│     name: my-cluster           │   ┃  │                                │
│                                │   ┃  └────────────────────────────────┘
│   KafkaConnect                 │   ┃
│     name: my-connect-cluster   │   ┃
│                                │   ┃
│   KafkaConnector               │   ┃
│     name: inventory-connector ◀┿━━━┛
│                                │
└────────────────────────────────┘
OK, I admit it: I was lying about the ASCII art being awesome.
Showtime!
If you list the topics, for example using kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-topics.sh --bootstrap-server localhost:9092 --list, you should see:
__consumer_offsets
connect-cluster-configs
connect-cluster-offsets
connect-cluster-status
dbserver1
dbserver1.inventory.addresses
dbserver1.inventory.customers
dbserver1.inventory.geom
dbserver1.inventory.orders
dbserver1.inventory.products
dbserver1.inventory.products_on_hand
schema-changes.inventory
The connect-cluster-* topics are the usual internal Kafka Connect topics.
Debezium has created a topic for the server itself (dbserver1), and one for each table within the inventory database.
Let’s start consuming from one of those change topics:
kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- \
  bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic dbserver1.inventory.customers
This will block, waiting for messages. To produce those messages we have to make some changes in the database. So, back in the terminal window where we left the MySQL command line client running, let’s first look at the existing customers:
mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
Now let’s change the first_name of the last customer:
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
Switching terminal window again, we should be able to see this name change event in the dbserver1.inventory.customers topic:
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":{"id":1004,"first_name":"Anne Mary","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.10.0.Final","connector":"mysql","name":"dbserver1","ts_ms":1574090237000,"snapshot":"false","db":"inventory","table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":4311,"row":0,"thread":3,"query":null},"op":"u","ts_ms":1574090237089}}
which is rather a lot of JSON, but if we reformat it (e.g. by copying and pasting it into jq) we see:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope"
  },
"payload": {
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne Mary",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "1.0.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1574090237000,
"snapshot": "false",
"db": "inventory",
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 4311,
"row": 0,
"thread": 3,
"query": null
},
"op": "u",
"ts_ms": 1574090237089
}
}
The schema object describes the schema of the actual event payload.
What’s more interesting for this post is the payload itself. Working backwards we have:

- ts_ms is the timestamp of when the change happened
- op tells us this was an update (u; an insert would be c and a delete would be d)
- the source tells us exactly which table of which database in which server got changed
- before and after are pretty self-explanatory, describing the row before and after the update
You can of course experiment with inserting and deleting rows in different tables (remember, there’s a topic for each table).
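For example, you could insert a brand new customer and then delete them again, watching the corresponding events arrive on the dbserver1.inventory.customers topic (the id and details here are made up; any unused id will do):

mysql> INSERT INTO customers VALUES (1005, 'Sarah', 'Thompson', 'sarah@example.com');
mysql> DELETE FROM customers WHERE id=1005;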
But what about that dbserver1 topic which I glossed over? If we look at the messages in there (using kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1 --from-beginning) we can see records representing the DDL which was used (when the Docker image was created) to create the database.
For example:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      },
      {
        "type": "string",
        "optional": false,
        "field": "ddl"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.mysql.SchemaChangeValue"
  },
  "payload": {
    "source": {
      "version": "1.0.0.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 0,
      "snapshot": "true",
      "db": "inventory",
      "table": "products_on_hand",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 2324,
      "row": 0,
      "thread": null,
      "query": null
    },
    "databaseName": "inventory",
    "ddl": "CREATE TABLE `products_on_hand` (\n `product_id` int(11) NOT NULL,\n `quantity` int(11) NOT NULL,\n PRIMARY KEY (`product_id`),\n CONSTRAINT `products_on_hand_ibfk_1` FOREIGN KEY (`product_id`) REFERENCES `products` (`id`)\n) ENGINE=InnoDB DEFAULT CHARSET=latin1"
  }
}
Again we have a schema and a payload. This time the payload has:

- source, like before
- databaseName, which tells us which database this record is for
- the ddl string, which tells us how the products_on_hand table was created
Conclusion
In this post we’ve learned that Strimzi now supports a KafkaConnector custom resource which you can use to define connectors.
We’ve demonstrated this generic functionality using the Debezium connector as an example.
By creating a KafkaConnector resource, linked to our KafkaConnect cluster via the strimzi.io/cluster label, we were able to observe changes made in a MySQL database as records in a Kafka topic.
And finally, we’ve been left with a feeling of disappointment about the unfulfilled promise of awesome ASCII art.