In a previous blog post we showed how easy it is to integrate Camel Kafka Connectors with Strimzi by configuring a KafkaConnect custom resource. That approach had one limitation: you had to build your own Kafka Connect image and reference it in the custom resource. This step is no longer needed thanks to a feature introduced in Strimzi 0.21 that lets Strimzi build custom Kafka Connect images directly.

How does it work?

Strimzi builds the image in one of two ways, depending on the underlying cluster. If you use Kubernetes, the image is built by kaniko. kaniko is a tool to build container images from a Dockerfile, inside a container or Kubernetes cluster. kaniko doesn’t depend on a Docker daemon and executes each command within a Dockerfile completely in userspace. This enables building container images in environments that can’t easily or securely run a Docker daemon, such as a standard Kubernetes cluster. If you use OpenShift, the image is built by OpenShift Builds.

In both cases, Strimzi spins up a build pod, which builds the image based on the configuration from the custom resource. The final image is then pushed to the container registry or image stream named in that configuration. The Kafka Connect cluster defined by the same custom resource then uses the newly built image.

Kafka Connect configuration

A new build section in the KafkaConnect resource lets you configure a list of custom connectors, which are downloaded and baked into a new Kafka Connect image that you specify.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  ...
  build: # (1)
    output: # (2)
      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins: # (3)
      - name: debezium-postgres-connector
        artifacts:
          - type: zip
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.4.2.Final/debezium-connector-postgres-1.4.2.Final-plugin.zip
            sha512sum: ef1620547e6ddf5be010271849b6a87a19f6e6beee93b379c80883815b8f37ec5137095b2c99975d7704cbf957e6a33d76c61109582cad57c7cbbfae43adc86c
      - name: camel-timer
        artifacts:
          - type: tgz
            url: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-timer-kafka-connector/0.8.0/camel-timer-kafka-connector-0.8.0-package.tar.gz
            sha512sum: c0102700ae176b1ef078bdb86d640935a1333865e58b870f6b0ef3d9dad067221122aee6bf52852dad209a7c2051f359e0434fc7fbd783fb50e21707c7577ef9
      - name: echo-connector
        artifacts:
          - type: jar
            url: https://github.com/scholzj/echo-sink/releases/download/1.1.0/echo-sink-1.1.0.jar
            sha512sum: b7da48d5ecd1e4199886d169ced1bf702ffbdfd704d69e0da97e78ff63c1bcece2f59c2c6c751f9c20be73472b8cb6a31b6fd4f75558c1cb9d96daa9e9e603d2

In this example KafkaConnect configuration, you can see the build specification:

  • (1) - Build configuration, which contains the output information and the list of plugins.
  • (2) - Configuration of the registry where the new image will be pushed.
  • (3) - List of plugins that will be downloaded and added to your Connect image.

There are several options that you can configure for the build. For the output property, you need to specify the type either as docker to push into a container registry like Docker Hub or Quay, or as imagestream to push the image into an internal OpenShift registry (OpenShift only). The output configuration also requires an image name in the image property. The pushSecret property is optional and is needed only when the registry is protected. The secret has to be deployed in the same namespace as the KafkaConnect resource. You can find more information about how to create this secret in the Kubernetes documentation.
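As a rough sketch of how the push secret could be created, the helper below wraps kubectl create secret docker-registry. The function name create_push_secret is hypothetical, the kafka namespace is an assumption taken from the consumer command later in this post, and the registry server, user, and password are placeholders you would replace with your own.

```shell
# Hypothetical helper: creates the secret referenced by pushSecret.
# Assumes the KafkaConnect resource lives in the "kafka" namespace.
# Arguments: registry server, user name, password (all placeholders).
create_push_secret() {
  kubectl create secret docker-registry my-registry-credentials \
    --docker-server="$1" \
    --docker-username="$2" \
    --docker-password="$3" \
    --namespace kafka
}

# Usage:
#   create_push_secret my-registry.io my-user "$REGISTRY_PASSWORD"
```

Reading the password from an environment variable, as in the usage line, keeps it out of your shell history.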

For the plugins configuration, you need to list all connectors that you want to have in your image. You can also use this to add custom SMTs or converters. For each plugin, you need to specify its name and, in the artifacts property, the information needed to download and unpack it. The artifact type can be zip, tgz, or jar and has to match the file that the url points to. sha512sum is an optional property that Strimzi uses to validate the downloaded artifact.
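If the project you download from does not publish a checksum, you can compute the sha512sum value yourself. The sketch below assumes a Linux machine with GNU coreutils. The function name artifact_sha512 is made up for this example.

```shell
# Hypothetical helper: prints only the SHA-512 hex digest of a local file.
# sha512sum comes from GNU coreutils; on macOS, "shasum -a 512" is the
# usual equivalent.
artifact_sha512() {
  sha512sum "$1" | awk '{print $1}'
}

# Usage, with the Camel timer artifact from the example above:
#   curl -sSLO https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-timer-kafka-connector/0.8.0/camel-timer-kafka-connector-0.8.0-package.tar.gz
#   artifact_sha512 camel-timer-kafka-connector-0.8.0-package.tar.gz
```

Paste the printed digest into the sha512sum property of the matching artifact.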

Quick example

Now let’s go through a quick example of how to make the build work in your cluster. First, you need Strimzi and a Kafka cluster up and running. Then you can create a KafkaConnect resource with the following configuration (just change the registry and organization in the image name, and the name of the secret):

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    group.id: my-connect-cluster
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
  build:
    output:
      type: docker
      image: my-reg.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins: 
      - name: camel-timer
        artifacts:
          - type: tgz
            url: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-timer-kafka-connector/0.8.0/camel-timer-kafka-connector-0.8.0-package.tar.gz
            sha512sum: c0102700ae176b1ef078bdb86d640935a1333865e58b870f6b0ef3d9dad067221122aee6bf52852dad209a7c2051f359e0434fc7fbd783fb50e21707c7577ef9
  template:
    pod:
      imagePullSecrets:
        - name: my-registry-credentials

Strimzi spins up a Kafka Connect build pod, and when the build is finished, a regular Kafka Connect pod is created.

NAME                                          READY   STATUS      RESTARTS   AGE
my-cluster-entity-operator-7d7f49cbc-b47cw    3/3     Running     0          4m10s
my-cluster-kafka-0                            1/1     Running     0          4m43s
my-cluster-kafka-1                            1/1     Running     0          4m43s
my-cluster-kafka-2                            1/1     Running     0          4m43s
my-cluster-zookeeper-0                        1/1     Running     0          5m14s
my-cluster-zookeeper-1                        1/1     Running     0          5m14s
my-cluster-zookeeper-2                        1/1     Running     0          5m14s
my-connect-cluster-connect-69bc4bc47c-tvjzh   1/1     Running     0          74s
my-connect-cluster-connect-build              0/1     Completed   0          2m2s
strimzi-cluster-operator-769f57cb64-7cbvm     1/1     Running     0          6m3s
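If the build pod fails, or you just want to see what it did, its logs are a good first stop. The helper below is a minimal sketch. The function name show_build_logs is hypothetical, and the default pod name and the kafka namespace are taken from the listing above, so adjust them for your cluster.

```shell
# Hypothetical helper: prints the logs of the Kafka Connect build pod.
# Arguments (both optional): pod name, namespace.
show_build_logs() {
  kubectl logs "${1:-my-connect-cluster-connect-build}" --namespace "${2:-kafka}"
}

# Usage:
#   show_build_logs
```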

You can also double-check the status of the KafkaConnect resource, which now lists all available connector plugins:

...
  status:
    conditions:
    - lastTransitionTime: "2021-03-23T16:31:11.204228Z"
      status: "True"
      type: Ready
    connectorPlugins:
    - class: org.apache.camel.kafkaconnector.CamelSinkConnector
      type: sink
      version: 0.8.0
    - class: org.apache.camel.kafkaconnector.CamelSourceConnector
      type: source
      version: 0.8.0
    - class: org.apache.camel.kafkaconnector.timer.CamelTimerSourceConnector
      type: source
      version: 0.8.0
    - class: org.apache.kafka.connect.file.FileStreamSinkConnector
      type: sink
      version: 2.7.0
    - class: org.apache.kafka.connect.file.FileStreamSourceConnector
      type: source
      version: 2.7.0
    - class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
      type: source
      version: "1"
    - class: org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
      type: source
      version: "1"
    - class: org.apache.kafka.connect.mirror.MirrorSourceConnector
      type: source
      version: "1"
    labelSelector: strimzi.io/cluster=my-connect-cluster,strimzi.io/name=my-connect-cluster-connect,strimzi.io/kind=KafkaConnect
    observedGeneration: 1
    replicas: 1
    url: http://my-connect-cluster-connect-api.kafka.svc:8083
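To pull just the connector classes out of that status, a JSONPath query can help. This is a sketch that assumes the resource name and kafka namespace used in this post. The function name list_connector_plugins is made up for illustration.

```shell
# Hypothetical helper: prints one connector plugin class per line,
# read from the connectorPlugins list in the KafkaConnect status.
# Arguments (both optional): KafkaConnect name, namespace.
list_connector_plugins() {
  kubectl get kafkaconnect "${1:-my-connect-cluster}" --namespace "${2:-kafka}" \
    -o jsonpath='{range .status.connectorPlugins[*]}{.class}{"\n"}{end}'
}

# Usage:
#   list_connector_plugins
```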

Now you can create a KafkaConnector resource and use any of the connectors which you added to the build section.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: telegram-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.timer.CamelTimerSourceConnector
  tasksMax: 5
  config:
    camel.source.path.timerName: timer
    topics: timer-topic
    value.converter.schemas.enable: false
    transforms: HoistField,InsertField,ReplaceField
    transforms.HoistField.type: org.apache.kafka.connect.transforms.HoistField$Value
    transforms.HoistField.field: originalValue
    transforms.InsertField.type: org.apache.kafka.connect.transforms.InsertField$Value
    transforms.InsertField.timestamp.field: timestamp
    transforms.InsertField.static.field: message
    transforms.InsertField.static.value: 'Hello World'
    transforms.ReplaceField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.ReplaceField.blacklist: originalValue
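Once the KafkaConnector resource is created, you may want to confirm that the connector actually started. The sketch below reads the connector state from the resource status. The function name connector_state is hypothetical, and the field path status.connectorStatus.connector.state is an assumption about how Strimzi mirrors the Kafka Connect REST status, so check it against your Strimzi version.

```shell
# Hypothetical helper: prints the connector state (for example RUNNING)
# from the KafkaConnector status. The field path is an assumption.
# Arguments: KafkaConnector name, namespace (optional, defaults to kafka).
connector_state() {
  kubectl get kafkaconnector "$1" --namespace "${2:-kafka}" \
    -o jsonpath='{.status.connectorStatus.connector.state}'
}

# Usage:
#   connector_state telegram-connector
```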

You can verify that a source connector is working by attaching a Kafka client to a connector topic:

kubectl run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.22.1-kafka-2.7.0 --rm=true --restart=Never -n kafka -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic timer-topic

{"schema":null,"payload":{"message":"Hello World","timestamp":1616517487920}}
{"schema":null,"payload":{"message":"Hello World","timestamp":1616517487914}}
{"schema":null,"payload":{"message":"Hello World","timestamp":1616517488925}}
...

The connector generates messages and sends them to Kafka, where the console consumer reads them from the timer-topic topic.

Conclusion

In this blog post we showed how easy it is to set up Kafka Connect with your custom connectors using only kubectl and Strimzi. You don’t need to download anything. You don’t need to build anything. You just need to create the KafkaConnect resource and use it!
For more information, please see the KafkaConnect build reference in our documentation.