Strimzi

Strimzi Documentation (0.4.0)

Strimzi provides a way to run an Apache Kafka cluster on Kubernetes or OpenShift in various deployment configurations. This guide describes how to install and use Strimzi.

1. Overview

Apache Kafka is a popular platform for streaming data delivery and processing. For more details about Apache Kafka itself, visit the Apache Kafka website. The aim of Strimzi is to make it easy to run Apache Kafka on Kubernetes and OpenShift.

Strimzi consists of two main components:

Cluster Operator

Responsible for deploying and managing Apache Kafka clusters within a Kubernetes or OpenShift cluster

Topic Operator

Responsible for managing Kafka topics within a Kafka cluster running within Kubernetes or OpenShift

2. Getting started

2.1. Prerequisites

A Kubernetes or OpenShift cluster is required to deploy Strimzi. Strimzi supports all kinds of clusters - from public and private clouds down to local deployments intended for development purposes. This guide expects that a Kubernetes or OpenShift cluster is available and the kubectl or oc command line tools are installed and configured to connect to the running cluster.

When no existing Kubernetes or OpenShift cluster is available, Minikube or Minishift can be used to create a local cluster. More details can be found in Appendix B, Installing Kubernetes and OpenShift cluster.

In order to execute the commands in this guide, your Kubernetes/OpenShift user needs to have the rights to create and manage RBAC resources (Roles and Role Bindings).

2.2. Cluster Operator

Strimzi uses a component called the Cluster Operator to deploy and manage Kafka (including Zookeeper) and Kafka Connect clusters. The Cluster Operator is deployed as a process running inside your Kubernetes or OpenShift cluster. To deploy a Kafka cluster, a ConfigMap with the cluster configuration has to be created. Based on the information in that ConfigMap, the Cluster Operator will deploy a corresponding Kafka cluster. By default, the ConfigMap needs to be labeled with the following labels:

strimzi.io/type: kafka
strimzi.io/kind: cluster

and contain the cluster configuration in a specific format. The ConfigMap format is described in Section 3.2, “Format of the cluster ConfigMap”.

Strimzi contains example YAML files which make deploying a Cluster Operator easier.

2.2.1. Deploying to Kubernetes

To deploy the Cluster Operator on Kubernetes, the following command should be executed:

kubectl create -f examples/install/cluster-operator

To verify whether the Cluster Operator has been deployed successfully, the Kubernetes Dashboard or the following command can be used:

kubectl describe all

2.2.2. Deploying to OpenShift

To deploy the Cluster Operator on OpenShift, the following commands should be executed:

oc create -f examples/install/cluster-operator
oc create -f examples/templates/cluster-operator

To verify whether the Cluster Operator has been deployed successfully, the OpenShift console or the following command can be used:

oc describe all

2.3. Kafka broker

Strimzi uses the StatefulSets feature of Kubernetes/OpenShift to deploy Kafka brokers. With StatefulSets, the pods receive a unique name and network identity and that makes it easier to identify the individual Kafka broker pods and set their identity (broker ID). The deployment uses regular and headless services:

  • regular services can be used as bootstrap servers for Kafka clients

  • headless services are needed to have DNS resolve the pods' IP addresses directly

As well as Kafka, Strimzi also installs a Zookeeper cluster and configures the Kafka brokers to connect to it. The Zookeeper cluster also uses StatefulSets.

Strimzi provides two flavors of Kafka broker deployment: ephemeral and persistent.

The ephemeral flavor is suitable only for development and testing purposes and not for production. It uses emptyDir volumes for storing broker information (Zookeeper) and topics/partitions (Kafka). Using an emptyDir volume means that its content is strictly tied to the pod life cycle (it is deleted when the pod goes down). This makes the ephemeral deployment well-suited to development and testing because you don’t have to provide persistent volumes.

The persistent flavor uses PersistentVolumes to store Zookeeper and Kafka data. The PersistentVolume is acquired using a PersistentVolumeClaim, which makes it independent of the actual type of the PersistentVolume. For example, it can use HostPath volumes on Minikube or Amazon EBS volumes in Amazon AWS deployments without any changes in the YAML files. The PersistentVolumeClaim can use a StorageClass to trigger automatic volume provisioning.

To deploy a Kafka cluster, a ConfigMap with the cluster configuration has to be created. The ConfigMap should have the following labels:

strimzi.io/type: kafka
strimzi.io/kind: cluster

Example ConfigMaps and the details about the ConfigMap format are in Section 3.2.1, “Kafka”.

2.3.1. Deploying to Kubernetes

To deploy a Kafka broker on Kubernetes, the corresponding ConfigMap has to be created. To create the ephemeral cluster using the provided example ConfigMap, the following command should be executed:

kubectl apply -f examples/configmaps/cluster-operator/kafka-ephemeral.yaml

Another example ConfigMap is provided for a persistent Kafka cluster. To deploy it, the following command should be run:

kubectl apply -f examples/configmaps/cluster-operator/kafka-persistent.yaml

2.3.2. Deploying to OpenShift

For OpenShift, the Kafka broker is provided in the form of a template. The cluster can be deployed from the template either using the command line or using the OpenShift console. To create the ephemeral cluster, the following command should be executed:

oc new-app strimzi-ephemeral

Similarly, to deploy a persistent Kafka cluster the following command should be run:

oc new-app strimzi-persistent

2.4. Kafka Connect

The Cluster Operator can also deploy a Kafka Connect cluster which can be used with either of the Kafka broker deployments described above. It is implemented as a Deployment with a configurable number of workers. The default image currently contains only the connectors distributed with Apache Kafka Connect: FileStreamSinkConnector and FileStreamSourceConnector. The REST interface for managing the Kafka Connect cluster is exposed internally within the Kubernetes/OpenShift cluster as the kafka-connect service on port 8083.

Example ConfigMaps and the details about the ConfigMap format for deploying Kafka Connect can be found in Section 3.2.2, “Kafka Connect”.

2.4.1. Deploying to Kubernetes

To deploy Kafka Connect on Kubernetes, the corresponding ConfigMap has to be created. An example ConfigMap can be created using the following command:

kubectl apply -f examples/configmaps/cluster-operator/kafka-connect.yaml

2.4.2. Deploying to OpenShift

On OpenShift, Kafka Connect is provided in the form of a template. It can be deployed from the template either using the command line or using the OpenShift console. To create a Kafka Connect cluster from the command line, the following command should be run:

oc new-app strimzi-connect

2.4.3. Using Kafka Connect with additional plugins

Strimzi Docker images for Kafka Connect contain, by default, only the FileStreamSinkConnector and FileStreamSourceConnector connectors which are part of Apache Kafka.

To facilitate deployment with third-party connectors, Kafka Connect is configured to automatically load all plugins/connectors which are present in the /opt/kafka/plugins directory during startup. There are two ways of adding custom plugins into this directory:

  • Using a custom Docker image

  • Using the OpenShift build system with the Strimzi S2I image

Create a new image based on strimzi/kafka-connect

Strimzi provides its own Docker image for running Kafka Connect which can be found on Docker Hub as strimzi/kafka-connect. This image can be used as a base image for building a new custom image with additional plugins. The following steps describe the process for creating such a custom image:

  1. Create a new Dockerfile which uses strimzi/kafka-connect as the base image

    FROM strimzi/kafka-connect:latest
    USER root:root
    COPY ./my-plugin/ /opt/kafka/plugins/
    USER kafka:kafka
  2. Build the Docker image and upload it to the appropriate Docker repository

  3. Use the new Docker image in the Kafka Connect deployment:

    • On OpenShift, the template parameters IMAGE_REPO_NAME, IMAGE_NAME and IMAGE_TAG can be changed to point to the new image when the Kafka Connect cluster is being deployed

    • On Kubernetes, the Kafka Connect ConfigMap has to be modified to use the new image

Using OpenShift Build and S2I image

OpenShift supports Builds which can be used together with the Source-to-Image (S2I) framework to create new Docker images. An OpenShift Build takes a builder image with S2I support together with source code and/or binaries provided by the user and uses them to build a new Docker image. The newly created Docker image will be stored in OpenShift’s local Docker repository and can then be used in deployments. Strimzi provides a Kafka Connect builder image, strimzi/kafka-connect-s2i, with such S2I support. It takes user-provided binaries (with plugins and connectors) and creates a new Kafka Connect image. This enhanced Kafka Connect image can be used with the Kafka Connect deployment.

The S2I deployment is again provided as an OpenShift template. It can be deployed from the template either using the command line or using the OpenShift console. To create a Kafka Connect S2I cluster from the command line, the following command should be run:

oc new-app strimzi-connect-s2i

Once the cluster is deployed, a new Build can be triggered from the command line:

  1. A directory with Kafka Connect plugins has to be prepared first. For example:

    $ tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    │   ├── bson-3.4.2.jar
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mongodb-0.7.1.jar
    │   ├── debezium-core-0.7.1.jar
    │   ├── LICENSE.txt
    │   ├── mongodb-driver-3.4.2.jar
    │   ├── mongodb-driver-core-3.4.2.jar
    │   └── README.md
    ├── debezium-connector-mysql
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mysql-0.7.1.jar
    │   ├── debezium-core-0.7.1.jar
    │   ├── LICENSE.txt
    │   ├── mysql-binlog-connector-java-0.13.0.jar
    │   ├── mysql-connector-java-5.1.40.jar
    │   ├── README.md
    │   └── wkb-1.0.2.jar
    └── debezium-connector-postgres
        ├── CHANGELOG.md
        ├── CONTRIBUTE.md
        ├── COPYRIGHT.txt
        ├── debezium-connector-postgres-0.7.1.jar
        ├── debezium-core-0.7.1.jar
        ├── LICENSE.txt
        ├── postgresql-42.0.0.jar
        ├── protobuf-java-2.6.1.jar
        └── README.md
  2. To start a new image build using the prepared directory, the following command has to be run:

    oc start-build my-connect-cluster-connect --from-dir ./my-plugins/

    The name of the build should be changed according to the cluster name of the deployed Kafka Connect cluster.

  3. Once the build is finished, the new image will be used automatically by the Kafka Connect deployment.

2.5. Topic Operator

Strimzi uses a component called the Topic Operator to manage topics in the Kafka cluster. The Topic Operator is deployed as a process running inside a Kubernetes/OpenShift cluster. To create a new Kafka topic, a ConfigMap with the related configuration (name, partitions, replication factor, …​) has to be created. Based on the information in that ConfigMap, the Topic Operator will create a corresponding Kafka topic in the cluster.

Deleting a topic ConfigMap triggers the deletion of the corresponding Kafka topic as well.

The Cluster Operator is able to deploy a Topic Operator, which can be configured in the cluster ConfigMap. Alternatively, it is possible to deploy a Topic Operator manually, rather than having it deployed by the Cluster Operator.

2.5.1. Deploying through the Cluster Operator

To deploy the Topic Operator through the Cluster Operator, its configuration needs to be provided in the cluster ConfigMap in the topic-operator-config field as a JSON string.

For more information on the JSON configuration format see Section 3.2.1.7, “Topic Operator”.

2.5.2. Deploying standalone Topic Operator

If you are not going to deploy the Kafka cluster using the Cluster Operator but you already have a Kafka cluster deployed on Kubernetes or OpenShift, it can be useful to deploy the Topic Operator using the provided YAML files. In that case you can still use the Topic Operator to manage Kafka topics through the related ConfigMaps.

Deploying to Kubernetes

To deploy the Topic Operator on Kubernetes (not through the Cluster Operator), the following command should be executed:

kubectl create -f examples/install/topic-operator.yaml

To verify whether the Topic Operator has been deployed successfully, the Kubernetes Dashboard or the following command can be used:

kubectl describe all

Deploying to OpenShift

To deploy the Topic Operator on OpenShift (not through the Cluster Operator), the following command should be executed:

oc create -f examples/install/topic-operator

To verify whether the Topic Operator has been deployed successfully, the OpenShift console or the following command can be used:

oc describe all

2.5.3. Topic ConfigMap

When the Topic Operator is deployed by the Cluster Operator it will be configured to watch for "topic ConfigMaps" which are those with the following labels:

strimzi.io/cluster: <cluster-name>
strimzi.io/kind: topic
Note
When the Topic Operator is deployed manually the strimzi.io/cluster label is not necessary.

The topic ConfigMap contains the topic configuration in a specific format. The ConfigMap format is described in Section 4.3, “Format of the ConfigMap”.

3. Cluster Operator

The cluster operator is in charge of deploying a Kafka cluster alongside a Zookeeper ensemble. As part of the Kafka cluster, it can also deploy the topic operator which provides operator-style topic management via ConfigMaps. The cluster operator is also able to deploy a Kafka Connect cluster which connects to an existing Kafka cluster. On OpenShift such a cluster can be deployed using the Source-to-Image (S2I) feature, providing an easy way of including more connectors.

Cluster Operator

When the cluster operator is up, it starts to "watch" for ConfigMaps containing the Kafka or Kafka Connect cluster configuration. Such ConfigMaps need to have a specific label, which is strimzi.io/kind=cluster by default (as described later) and can be changed through a corresponding environment variable.

When a new ConfigMap is created in the Kubernetes/OpenShift cluster, the operator gets the cluster configuration from its data section and starts creating a new Kafka or Kafka Connect cluster by creating the necessary Kubernetes/OpenShift resources, such as StatefulSets, ConfigMaps, Services etc.

Every time the ConfigMap is updated by the user with some changes in the data section, the operator performs corresponding updates on the Kubernetes/OpenShift resources which make up the Kafka or Kafka Connect cluster. Resources are either patched or deleted and then re-created in order to make the Kafka or Kafka Connect cluster reflect the state of the cluster ConfigMap. This might cause a rolling update which might lead to service disruption.

Finally, when the ConfigMap is deleted, the operator starts to un-deploy the cluster by deleting all the related Kubernetes/OpenShift resources.

3.1. Reconciliation

The operator reacts to all notifications about the cluster ConfigMaps received from the Kubernetes/OpenShift cluster. However, if the operator is not running, or if a notification is not received for any reason, the ConfigMaps can get out of sync with the state of the running Kubernetes/OpenShift cluster.

In order to handle failovers properly, a periodic reconciliation process is executed by the cluster operator so that it can compare the state of the ConfigMaps with the current cluster deployment in order to have a consistent state across all of them.

3.2. Format of the cluster ConfigMap

The operator watches for ConfigMaps having the label strimzi.io/kind=cluster in order to find and get configuration for a Kafka or Kafka Connect cluster to deploy.

In order to distinguish which "type" of cluster to deploy, Kafka or Kafka Connect, the operator checks the strimzi.io/type label which can have one of the following values:

  • kafka: the ConfigMap provides configuration for a Kafka cluster (with Zookeeper ensemble) deployment

  • kafka-connect: the ConfigMap provides configuration for a Kafka Connect cluster deployment

  • kafka-connect-s2i: the ConfigMap provides configuration for a Kafka Connect cluster deployment using the Build and Source-to-Image (S2I) features (works only with OpenShift)

Any other labels applied to the ConfigMap will also be applied to the Kubernetes/OpenShift resources making up the Kafka or Kafka Connect cluster. This provides a convenient mechanism for those resources to be labelled in whatever way the user requires.
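As an illustration of the label propagation described above, the following sketch adds two user-defined labels to a cluster ConfigMap. The label names team and environment and their values are hypothetical; the remaining fields are taken from the examples in this guide.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-cluster
  labels:
    # Labels the operator looks for
    strimzi.io/kind: cluster
    strimzi.io/type: kafka
    # Hypothetical user-defined labels; according to the text above they
    # would also be applied to the resources created for this cluster
    team: streaming
    environment: dev
data:
  kafka-nodes: "3"
  kafka-storage: |-
    { "type": "ephemeral" }
  zookeeper-nodes: "1"
  zookeeper-storage: |-
    { "type": "ephemeral" }
```

Assuming the labels are propagated as described, the generated resources could then be listed with a label selector such as team=streaming.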

The data section of such ConfigMaps contains different keys depending on the "type" of deployment as described in the following sections.

3.2.1. Kafka

In order to configure a Kafka cluster deployment, it’s possible to specify the following fields in the data section of the related ConfigMap:

  • kafka-nodes: number of Kafka broker nodes. Default is 3

  • kafka-image: the Docker image to use for the Kafka brokers. Default is determined by the value of the STRIMZI_DEFAULT_KAFKA_IMAGE environment variable of the Cluster Operator.

  • kafka-healthcheck-delay: the initial delay for the liveness and readiness probes for each Kafka broker node. Default is 15

  • kafka-healthcheck-timeout: the timeout on the liveness and readiness probes for each Kafka broker node. Default is 5

  • kafka-config: a JSON string with Kafka configuration. See Section 3.2.1.1, “Kafka Configuration” for more details.

  • kafka-storage: a JSON string representing the storage configuration for the Kafka broker nodes. See Section 3.2.1.3, “Storage” for more details.

  • kafka-metrics-config: a JSON string representing the JMX exporter configuration for exposing metrics from Kafka broker nodes. Removing this field means having no metrics exposed.

  • kafka-resources: a JSON string configuring the resource limits and requests for Kafka broker containers. The accepted JSON format is described in Section 3.2.1.5, “Resource limits and requests”.

  • kafka-jvmOptions: a JSON string allowing the JVM running Kafka to be configured. The accepted JSON format is described in Section 3.2.1.6, “JVM Options”.

  • zookeeper-nodes: number of Zookeeper nodes

  • zookeeper-image: the Docker image to use for the Zookeeper nodes. Default is determined by the value of the STRIMZI_DEFAULT_ZOOKEEPER_IMAGE environment variable of the Cluster Operator.

  • zookeeper-healthcheck-delay: the initial delay for the liveness and readiness probes for each Zookeeper node. Default is 15

  • zookeeper-healthcheck-timeout: the timeout on the liveness and readiness probes for each Zookeeper node. Default is 5

  • zookeeper-config: a JSON string with Zookeeper configuration. See Section 3.2.1.2, “Zookeeper Configuration” for more details.

  • zookeeper-storage: a JSON string representing the storage configuration for the Zookeeper nodes. See Section 3.2.1.3, “Storage” for more details.

  • zookeeper-metrics-config: a JSON string representing the JMX exporter configuration for exposing metrics from Zookeeper nodes. Removing this field means having no metrics exposed.

  • zookeeper-resources: a JSON string configuring the resource limits and requests for Zookeeper containers. The accepted JSON format is described in Section 3.2.1.5, “Resource limits and requests”.

  • zookeeper-jvmOptions: a JSON string allowing the JVM running Zookeeper to be configured. The accepted JSON format is described in Section 3.2.1.6, “JVM Options”.

  • topic-operator-config: a JSON string representing the topic operator configuration. See Section 3.2.1.7, “Topic Operator” for further details. For more information about the topic operator itself, see Section 2.5, “Topic Operator”.

The following is an example of a ConfigMap for a Kafka cluster.

Example Kafka cluster ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-cluster
  labels:
    strimzi.io/kind: cluster
    strimzi.io/type: kafka
data:
  kafka-nodes: "3"
  kafka-image: "strimzi/kafka:latest"
  kafka-healthcheck-delay: "15"
  kafka-healthcheck-timeout: "5"
  kafka-config: |-
    {
      "offsets.topic.replication.factor": 3,
      "transaction.state.log.replication.factor": 3,
      "transaction.state.log.min.isr": 2
    }
  kafka-storage: |-
    { "type": "ephemeral" }
  kafka-metrics-config: |-
    {
      "lowercaseOutputName": true,
      "rules": [
          {
            "pattern": "kafka.server<type=(.+), name=(.+)PerSec\\w*><>Count",
            "name": "kafka_server_$1_$2_total"
          },
          {
            "pattern": "kafka.server<type=(.+), name=(.+)PerSec\\w*, topic=(.+)><>Count",
            "name": "kafka_server_$1_$2_total",
            "labels":
            {
              "topic": "$3"
            }
          }
      ]
    }
  zookeeper-nodes: "1"
  zookeeper-image: "strimzi/zookeeper:latest"
  zookeeper-healthcheck-delay: "15"
  zookeeper-healthcheck-timeout: "5"
  zookeeper-config: |-
    {
      "timeTick": 2000,
      "initLimit": 5,
      "syncLimit": 2,
      "autopurge.purgeInterval": 1
    }
  zookeeper-storage: |-
    { "type": "ephemeral" }
  zookeeper-metrics-config: |-
    {
      "lowercaseOutputName": true
    }

The cluster operator creates the following resources in the Kubernetes/OpenShift cluster:

  • [cluster-name]-zookeeper StatefulSet which is in charge of creating the Zookeeper node pods

  • [cluster-name]-kafka StatefulSet which is in charge of creating the Kafka broker pods

  • [cluster-name]-zookeeper-headless Service needed to have DNS resolve the Zookeeper pods' IP addresses directly

  • [cluster-name]-kafka-headless Service needed to have DNS resolve the Kafka broker pods' IP addresses directly

  • [cluster-name]-zookeeper Service used by Kafka brokers to connect to Zookeeper nodes as clients

  • [cluster-name]-kafka Service which can be used as bootstrap servers for Kafka clients

  • [cluster-name]-zookeeper-metrics-config ConfigMap which contains the Zookeeper metrics configuration and is mounted as a volume by the Zookeeper node pods

  • [cluster-name]-kafka-metrics-config ConfigMap which contains the Kafka metrics configuration and is mounted as a volume by the Kafka broker pods

Kafka Configuration

The kafka-config field allows detailed configuration of Apache Kafka. This field should contain a JSON object with Kafka configuration options as keys. The values can be of one of the following JSON types:

  • String

  • Number

  • Boolean

The kafka-config field supports all Kafka configuration options with the exception of options related to:

  • Security (Encryption, Authentication and Authorization)

  • Listener configuration

  • Broker ID configuration

  • Configuration of log data directories

  • Inter-broker communication

  • Zookeeper connectivity

Specifically, all configuration options with keys starting with one of the following strings will be ignored:

  • listeners

  • advertised.

  • broker.

  • listener.

  • host.name

  • port

  • inter.broker.listener.name

  • sasl.

  • ssl.

  • security.

  • password.

  • principal.builder.class

  • log.dir

  • zookeeper.connect

  • zookeeper.set.acl

  • authorizer.

  • super.user

All other options will be passed to Kafka. A list of all the available options can be found on the Kafka website. An example kafka-config field is provided below.

Example Kafka configuration
{
  "num.partitions": 1,
  "num.recovery.threads.per.data.dir": 1,
  "default.replication.factor": 3,
  "offsets.topic.replication.factor": 3,
  "transaction.state.log.replication.factor": 3,
  "transaction.state.log.min.isr": 1,
  "log.retention.hours": 168,
  "log.segment.bytes": 1073741824,
  "log.retention.check.interval.ms": 300000,
  "num.network.threads": 3,
  "num.io.threads": 8,
  "socket.send.buffer.bytes": 102400,
  "socket.receive.buffer.bytes": 102400,
  "socket.request.max.bytes": 104857600,
  "group.initial.rebalance.delay.ms": 0
}
NOTE

The cluster operator doesn’t validate the provided configuration. When invalid configuration is provided, the Kafka cluster might not start or might become unstable. In such cases, the configuration in the kafka-config field should be fixed and the cluster operator will roll out the new configuration to all Kafka brokers.
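To illustrate the prefix rule listed above, the following sketch shows a kafka-config field that mixes ignored and accepted options. The option values are hypothetical and chosen only for illustration.

```yaml
data:
  # Keys matching a prefix from the list above ("broker.", "log.dir",
  # "zookeeper.connect") would be ignored:
  #   broker.id, log.dirs, zookeeper.connect
  # Keys matching none of the prefixes would be passed to Kafka:
  #   num.partitions, log.retention.hours
  kafka-config: |-
    {
      "broker.id": 7,
      "log.dirs": "/tmp/kafka-logs",
      "zookeeper.connect": "localhost:2181",
      "num.partitions": 3,
      "log.retention.hours": 72
    }
```

Note that the match is on the start of the key, so log.dirs is covered by the log.dir prefix while log.retention.hours is not.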

Zookeeper Configuration

The zookeeper-config field allows detailed configuration of Apache Zookeeper. This field should contain a JSON object with Zookeeper configuration options as keys. The values can be of one of the following JSON types:

  • String

  • Number

  • Boolean

The zookeeper-config field supports all Zookeeper configuration options with the exception of options related to:

  • Security (Encryption, Authentication and Authorization)

  • Listener configuration

  • Configuration of data directories

  • Zookeeper cluster composition

Specifically, all configuration options with keys starting with one of the following strings will be ignored:

  • server.

  • dataDir

  • dataLogDir

  • clientPort

  • authProvider

  • quorum.auth

  • requireClientAuthScheme

All other options will be passed to Zookeeper. A list of all the available options can be found on the Zookeeper website. An example zookeeper-config field is provided below.

Example Zookeeper configuration
{
  "timeTick": 2000,
  "initLimit": 5,
  "syncLimit": 2,
  "quorumListenOnAllIPs": true,
  "maxClientCnxns": 0,
  "autopurge.snapRetainCount": 3,
  "autopurge.purgeInterval": 1
}

Selected options have default values:

  • timeTick with default value 2000

  • initLimit with default value 5

  • syncLimit with default value 2

  • autopurge.purgeInterval with default value 1

These options will be automatically configured in case they are not present in the zookeeper-config field.

NOTE

The cluster operator doesn’t validate the provided configuration. When invalid configuration is provided, the Zookeeper cluster might not start or might become unstable. In such cases, the configuration in the zookeeper-config field should be fixed and the cluster operator will roll out the new configuration to all Zookeeper nodes.
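The handling of default values described above can be sketched with a zookeeper-config field that sets only some options. The values shown are hypothetical.

```yaml
data:
  # Only two options are set explicitly. According to the list of defaults
  # above, timeTick (2000), initLimit (5) and syncLimit (2) are absent and
  # would therefore be configured automatically. autopurge.purgeInterval is
  # present, so the explicit value is expected to be used instead of the
  # default of 1.
  zookeeper-config: |-
    {
      "maxClientCnxns": 0,
      "autopurge.purgeInterval": 24
    }
```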

Storage

Both Kafka and Zookeeper save data to files.

Strimzi allows such data to be saved in an "ephemeral" way (using emptyDir) or in a "persistent-claim" way using persistent volumes. It’s possible to provide the storage configuration in the related ConfigMap using a JSON string as the value for the kafka-storage and zookeeper-storage fields.

Important
The kafka-storage and zookeeper-storage fields can’t be changed when the cluster is up.

The JSON representation has a mandatory type field for specifying the type of storage to use ("ephemeral" or "persistent-claim").

The "ephemeral" storage is really simple to configure and the related JSON string has the following structure.

Ephemeral storage JSON
{ "type": "ephemeral" }
Warning
If the Zookeeper cluster is deployed using "ephemeral" storage, the Kafka brokers can have problems dealing with Zookeeper node restarts which could happen via updates in the cluster ConfigMap.

In case of the "persistent-claim" type, the following fields can be provided as well:

  • size: defines the size of the persistent volume claim (e.g. 1Gi) - mandatory

  • class: the Kubernetes/OpenShift storage class to use for dynamic volume allocation - optional

  • selector: allows a specific persistent volume to be selected. It contains a matchLabels field which defines an inner JSON object with key:value pairs representing labels for selecting such a volume - optional

  • delete-claim: boolean value which specifies whether the persistent volume claim has to be deleted when the cluster is un-deployed. Default is false - optional

Persistent storage JSON with 1Gi as size
{ "type": "persistent-claim", "size": "1Gi" }

This example demonstrates use of a storage class.

Persistent storage JSON using "storage class"
{
  "type": "persistent-claim",
  "size": "1Gi",
  "class": "my-storage-class"
}

Finally, a selector can be used in order to select a specific labeled persistent volume which provides some needed features (e.g. an SSD).

Persistent storage JSON with "match labels" selector
{
  "type": "persistent-claim",
  "size": "1Gi",
  "selector":
  {
    "matchLabels":
    {
      "hdd-type": "ssd"
    }
  },
  "delete-claim": true
}
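For the selector above to match, a persistent volume carrying the hdd-type: ssd label has to exist in the cluster. The following is a minimal sketch of such a volume using a HostPath volume, as on Minikube. The volume name, the host path, the access mode and the reclaim policy are assumptions made for illustration and are not prescribed by Strimzi.

```yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-ssd-volume-0          # hypothetical name
  labels:
    hdd-type: ssd                   # matched by the selector's matchLabels
spec:
  capacity:
    storage: 1Gi                    # at least the "size" requested by the claim
  accessModes:
    - ReadWriteOnce                 # assumed access mode
  persistentVolumeReclaimPolicy: Retain
  hostPath:
    path: /mnt/ssd/kafka-0          # hypothetical path on the node
```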

When the "persistent-claim" type is used, the following resources are generated in addition to the resources already described in Section 3.2.1, “Kafka”:

  • data-[cluster-name]-kafka-[idx] Persistent Volume Claim for the volume used for storing data for the Kafka broker pod [idx]

  • data-[cluster-name]-zookeeper-[idx] Persistent Volume Claim for the volume used for storing data for the Zookeeper node pod [idx]
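The storage JSON shown above is embedded in the cluster ConfigMap as a string value. The following sketch combines the earlier example ConfigMap with persistent storage. The cluster name my-cluster and the storage class my-storage-class are the placeholder names used elsewhere in this guide.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-cluster
  labels:
    strimzi.io/kind: cluster
    strimzi.io/type: kafka
data:
  kafka-nodes: "3"
  kafka-storage: |-
    {
      "type": "persistent-claim",
      "size": "1Gi",
      "class": "my-storage-class",
      "delete-claim": false
    }
  zookeeper-nodes: "1"
  zookeeper-storage: |-
    { "type": "persistent-claim", "size": "1Gi", "class": "my-storage-class" }
```

Following the naming pattern above, and assuming that [idx] starts at 0 like StatefulSet pod ordinals, the claims for this cluster would be named data-my-cluster-kafka-0 to data-my-cluster-kafka-2 and data-my-cluster-zookeeper-0.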

Metrics

Because Strimzi uses the [JMX exporter](https://github.com/prometheus/jmx_exporter) in order to expose metrics on each node, the JSON string used for metrics configuration in the cluster ConfigMap reflects the related JMX exporter configuration file. For this reason, you can find more information on how to use it in the corresponding GitHub repo.

For more information about using the metrics with Prometheus and Grafana, see Appendix C, Metrics.

Resource limits and requests

It is possible to configure Kubernetes resource limits and requests on several containers using a JSON object. The object may have a requests and a limits property, each having the same schema, consisting of cpu and memory properties. The same syntax as Kubernetes' is used for the values of cpu and memory.

Example Resource limits and requests JSON configuration
{
  "requests": {
    "cpu": "1",
    "memory": "2Gi"
  },
  "limits": {
    "cpu": "1",
    "memory": "2Gi"
  }
}
requests/memory

the memory request for the container, corresponding directly to Kubernetes' spec.containers[].resources.requests.memory setting. Kubernetes/OpenShift will ensure the containers have at least this much memory by running the pod on a node with at least as much free memory as all the containers require. Optional with no default.

requests/cpu

the cpu request for the container, corresponding directly to Kubernetes' spec.containers[].resources.requests.cpu setting. Kubernetes/OpenShift will ensure the containers have at least this much CPU by running the pod on a node with at least as much uncommitted CPU as all the containers require. Optional with no default.

limits/memory

the memory limit for the container, corresponding directly to Kubernetes' spec.containers[].resources.limits.memory setting. Kubernetes/OpenShift will limit the containers to this much memory, potentially terminating their pod if they use more. Optional with no default.

limits/cpu

the cpu limit for the container, corresponding directly to Kubernetes' spec.containers[].resources.limits.cpu setting. Kubernetes/OpenShift will cap the containers' CPU usage to this limit. Optional with no default.
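In the cluster ConfigMap, the resources JSON is supplied as a string value of the kafka-resources and zookeeper-resources fields. The following sketch shows one possible combination. The amounts are hypothetical and are not sizing recommendations.

```yaml
data:
  kafka-resources: |-
    {
      "requests": { "cpu": "500m", "memory": "2Gi" },
      "limits":   { "cpu": "1",    "memory": "2Gi" }
    }
  zookeeper-resources: |-
    {
      "requests": { "cpu": "250m", "memory": "512Mi" },
      "limits":   { "cpu": "500m", "memory": "1Gi" }
    }
```

Because each property is optional, a JSON object containing only requests or only limits should be acceptable as well.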

Minimum Resource Requirements

Testing has shown that the cluster operator functions adequately with 256Mi of memory and 200m CPU when watching two clusters. It is therefore recommended to use these as a minimum when configuring resource requests and not to run it with lower limits than these. Configuring more generous limits is recommended, especially when it’s controlling multiple clusters.

JVM Options

It is possible to configure a subset of available JVM options on Kafka, Zookeeper and Kafka Connect containers using a JSON object. The object has a property for each JVM (java) option which can be configured:

-Xmx

The maximum heap size. See Section 3.2.1.6.1, “Setting -Xmx” for further details.

-Xms

The initial heap size. Setting the same value for initial and maximum (-Xmx) heap sizes avoids the JVM having to allocate memory after startup, at the cost of possibly allocating more heap than is really needed. For Kafka and Zookeeper pods such allocation could cause unwanted latency. For Kafka Connect avoiding over allocation may be the more important concern, especially in distributed mode where the effects of over-allocation will be multiplied by the number of consumers.

Note
The units accepted by JVM settings such as -Xmx and -Xms are those accepted by the OpenJDK java binary in the corresponding image. Accordingly, 1g or 1G means 1,073,741,824 bytes, and Gi is not a valid unit suffix. This is in contrast to the units used for memory limits and requests, which follow the Kubernetes convention where 1G means 1,000,000,000 bytes, and 1Gi means 1,073,741,824 bytes.

Example JVM options JSON configuration
{
  "-Xmx": "2g",
  "-Xms": "2g"
}

In the above example, the JVM will use 2 GiB (=2,147,483,648 bytes) for its heap. Its total memory usage will be approximately 8 GiB (about 4 × the maximum heap).
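
The difference between the two unit conventions can be checked with a small Python sketch. It is only an illustration: the function names jvm_bytes and k8s_bytes are hypothetical, and only the suffixes discussed in this section are handled.

```python
# Sketch: compare JVM heap-size suffixes with Kubernetes memory quantity suffixes.
# Assumption: only k/m/g (JVM) and M/G/Mi/Gi (Kubernetes) are supported here.

JVM_UNITS = {"k": 1024, "m": 1024**2, "g": 1024**3}  # java treats these case-insensitively
K8S_UNITS = {
    "M": 1000**2, "G": 1000**3,    # decimal suffixes
    "Mi": 1024**2, "Gi": 1024**3,  # binary suffixes
}


def jvm_bytes(value: str) -> int:
    """Convert a -Xmx/-Xms style value such as '2g' to bytes."""
    suffix = value[-1].lower()
    if suffix in JVM_UNITS:
        return int(value[:-1]) * JVM_UNITS[suffix]
    # No known suffix: treat as plain bytes. '2Gi' ends up here and raises
    # ValueError, mirroring the fact that Gi is not a valid JVM suffix.
    return int(value)


def k8s_bytes(value: str) -> int:
    """Convert a Kubernetes memory quantity such as '2Gi' or '2G' to bytes."""
    for suffix in ("Mi", "Gi", "M", "G"):  # check two-letter suffixes first
        if value.endswith(suffix):
            return int(value[: -len(suffix)]) * K8S_UNITS[suffix]
    return int(value)


print(jvm_bytes("2g"))   # 2147483648
print(k8s_bytes("2Gi"))  # 2147483648
print(k8s_bytes("2G"))   # 2000000000
```

A JVM value of 2g therefore corresponds to a Kubernetes quantity of 2Gi, not 2G.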

Setting -Xmx

The default value used for -Xmx depends on whether there is a memory limit for the container:

  • If there is a memory limit, the JVM’s maximum memory will be limited according to the kind of pod (Kafka, Zookeeper, TopicOperator) to an appropriate value less than the limit.

  • Otherwise, when there is no memory limit, the JVM’s maximum memory will be set according to the kind of pod and the RAM available to the container.

Important

Setting -Xmx explicitly requires some care:

  • The JVM’s overall memory usage will be approximately 4 × the maximum heap, as configured by -Xmx.

  • If -Xmx is set without also setting an appropriate Kubernetes memory limit, it is possible that the container will be killed should the Kubernetes node experience memory pressure (from other Pods running on it).

  • If -Xmx is set without also setting an appropriate Kubernetes memory request, it is possible that the container will be scheduled to a node with insufficient memory. In this case the container will start but crash (immediately if -Xms is set to -Xmx, or some later time if not).

When setting -Xmx explicitly, it is recommended to:

  • set the memory request and the memory limit to the same value,

  • use a memory request that is at least 4.5 × the -Xmx,

  • consider setting -Xms to the same value as -Xmx.

Furthermore, containers doing lots of disk I/O (such as Kafka broker containers) will need to leave some memory available for use as operating system page cache. On such containers, the memory request should be substantially more than the memory used by the JVM.
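
As a worked example of the recommendations above, the following Python sketch derives a matching resources object from an -Xmx value. It is a minimal sketch, not part of Strimzi: the function names are hypothetical, the 4.5 factor is the lower bound recommended above, and the result does not include any extra allowance for page cache.

```python
import math

# Assumption: only the k/m/g JVM suffixes are handled (case-insensitive).
JVM_UNITS = {"k": 1024, "m": 1024**2, "g": 1024**3}
MIB = 1024**2


def recommended_memory_request(xmx: str, factor: float = 4.5) -> str:
    """Return a Kubernetes memory quantity of at least factor x the -Xmx value."""
    suffix = xmx[-1].lower()
    if suffix in JVM_UNITS:
        heap_bytes = int(xmx[:-1]) * JVM_UNITS[suffix]
    else:
        heap_bytes = int(xmx)  # no suffix: plain bytes
    # Round up to whole mebibytes so the request is never below the target.
    return f"{math.ceil(heap_bytes * factor / MIB)}Mi"


def resources_for(xmx: str) -> dict:
    """Build a resources object with the memory request equal to the limit."""
    memory = recommended_memory_request(xmx)
    return {"requests": {"memory": memory}, "limits": {"memory": memory}}


print(resources_for("2g"))
# {'requests': {'memory': '9216Mi'}, 'limits': {'memory': '9216Mi'}}
```

For Kafka broker containers the computed value should be treated as a starting point and increased to leave room for the page cache.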

Topic Operator

Alongside the Kafka cluster and the Zookeeper ensemble, the cluster operator can also deploy the topic operator. In order to do that, the topic-operator-config field has to be put into the data section of the cluster ConfigMap. This field is a JSON string containing the topic operator configuration. Without this field, the cluster operator doesn’t deploy the topic operator. It is still possible to deploy the topic operator by creating appropriate Kubernetes/OpenShift resources.

The JSON representation of topic-operator-config has no mandatory fields. If the value is an empty object (just "{ }"), the cluster operator will deploy the topic operator with a default configuration.

The configurable fields are the following:

image

Docker image to use for the topic operator. Default is determined by the value of the STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE environment variable of the Cluster Operator.

watchedNamespace

The Kubernetes namespace (OpenShift project) in which the topic operator watches for topic ConfigMaps. Default is the namespace where the topic operator is running.

reconciliationIntervalMs

The interval between periodic reconciliations in milliseconds. Default is 900000 (15 minutes).

zookeeperSessionTimeoutMs

The Zookeeper session timeout in milliseconds. Default is 20000 milliseconds (20 seconds).

topicMetadataMaxAttempts

The number of attempts for getting topics metadata from Kafka. The time between each attempt is defined as an exponential back-off. You might want to increase this value when topic creation could take more time due to its larger size (i.e. many partitions/replicas). Default 6.

resources

An object configuring the resource limits and requests for the topic operator container. The accepted JSON format is described in Section 3.2.1.5, “Resource limits and requests”.

Example Topic Operator JSON configuration
{ "reconciliationIntervalMs": "900000", "zookeeperSessionTimeoutMs": "20000" }

More information about these configuration parameters can be found in Section 2.5, “Topic Operator”.
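
Because topic-operator-config is a JSON string inside the ConfigMap data, it can be convenient to generate it rather than write it by hand. The following Python sketch shows one way to do that. The variable names are hypothetical, and the resource values are simply the minimums mentioned in Section 4.7.1, used here for illustration.

```python
import json

# Topic operator settings, using field names listed in this section.
topic_operator_config = {
    "reconciliationIntervalMs": "900000",  # 15 minutes
    "zookeeperSessionTimeoutMs": "20000",  # 20 seconds
    "resources": {
        "requests": {"cpu": "100m", "memory": "96Mi"},
        "limits": {"cpu": "100m", "memory": "96Mi"},
    },
}

# The value stored in the ConfigMap must be a string, so the object is
# serialized before being placed under the topic-operator-config key.
configmap_data = {"topic-operator-config": json.dumps(topic_operator_config)}

print(configmap_data["topic-operator-config"])
```

The resulting string is what goes into the data section of the cluster ConfigMap under the topic-operator-config key.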

3.2.2. Kafka Connect

In order to configure a Kafka Connect cluster deployment, it’s possible to specify the following fields in the data section of the related ConfigMap:

nodes

Number of Kafka Connect worker nodes. Default is 1.

image

The Docker image to use for the Kafka Connect workers. Default is determined by the value of the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE environment variable of the Cluster Operator. If S2I is used (only on OpenShift), then it should be the related S2I image.

healthcheck-delay

The initial delay for the liveness and readiness probes for each Kafka Connect worker node. Default is 60.

healthcheck-timeout

The timeout on the liveness and readiness probes for each Kafka Connect worker node. Default is 5.

resources

A JSON string configuring the resource limits and requests for Kafka Connect containers. The accepted JSON format is described in the Section 3.2.1.5, “Resource limits and requests” section.

jvmOptions

A JSON string allowing the JVM running Kafka Connect to be configured. The accepted JSON format is described in the Section 3.2.1.6, “JVM Options” section.

connect-config

A JSON string with Kafka Connect configuration. See Section 3.2.2.1, “Kafka Connect configuration” for more details.

The following is an example of a Kafka Connect cluster ConfigMap.

Example Kafka Connect cluster ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-connect-cluster
  labels:
    strimzi.io/kind: cluster
    strimzi.io/type: kafka-connect
data:
  nodes: "1"
  image: "strimzi/kafka-connect:latest"
  healthcheck-delay: "60"
  healthcheck-timeout: "5"
  connect-config: |-
    {
      "bootstrap.servers": "my-cluster-kafka:9092"
    }

The cluster operator creates the following resources in the Kubernetes/OpenShift cluster:

  • [connect-cluster-name]-connect Deployment which is in charge of creating the Kafka Connect worker node pods

  • [connect-cluster-name]-connect Service which exposes the REST interface for managing the Kafka Connect cluster

Kafka Connect configuration

The connect-config field allows detailed configuration of Apache Kafka Connect and Connect S2I. This field should contain a JSON object with Kafka Connect configuration options as keys. The values can be of one of the following JSON types:

  • String

  • Number

  • Boolean

The connect-config field supports all Kafka Connect configuration options with the exception of options related to:

  • Security (Encryption, Authentication and Authorization)

  • Listener / REST interface configuration

  • Plugin path configuration

Specifically, all configuration options with keys starting with one of the following strings will be ignored:

  • ssl.

  • sasl.

  • security.

  • listeners

  • plugin.path

  • rest.

All other options will be passed to Kafka Connect. A list of all the available options can be found on the Kafka website. An example connect-config field is provided below.

Example Kafka Connect configuration
{
  "bootstrap.servers": "my-cluster-kafka:9092",
  "group.id": "my-connect-cluster",
  "offset.storage.topic": "my-connect-cluster-offsets",
  "config.storage.topic": "my-connect-cluster-configs",
  "status.storage.topic": "my-connect-cluster-status",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": true,
  "value.converter.schemas.enable": true,
  "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "internal.key.converter.schemas.enable": false,
  "internal.value.converter.schemas.enable": false,
  "config.storage.replication.factor": 3,
  "offset.storage.replication.factor": 3,
  "status.storage.replication.factor": 3
}

Selected options have default values:

  • group.id with default value connect-cluster

  • offset.storage.topic with default value connect-cluster-offsets

  • config.storage.topic with default value connect-cluster-configs

  • status.storage.topic with default value connect-cluster-status

  • key.converter with default value org.apache.kafka.connect.json.JsonConverter

  • value.converter with default value org.apache.kafka.connect.json.JsonConverter

  • internal.key.converter with default value org.apache.kafka.connect.json.JsonConverter

  • internal.value.converter with default value org.apache.kafka.connect.json.JsonConverter

  • internal.key.converter.schemas.enable with default value false

  • internal.value.converter.schemas.enable with default value false

These options will be automatically configured in case they are not present in the connect-config field.
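
The combined effect of the ignored prefixes and the default values can be illustrated with a short Python sketch. This is not the cluster operator's code; the function name effective_connect_config is hypothetical, and the sketch only mirrors the rules listed in this section.

```python
# Key prefixes that are ignored, as listed above.
IGNORED_PREFIXES = ("ssl.", "sasl.", "security.", "listeners", "plugin.path", "rest.")

# Default values applied when an option is absent, as listed above.
DEFAULTS = {
    "group.id": "connect-cluster",
    "offset.storage.topic": "connect-cluster-offsets",
    "config.storage.topic": "connect-cluster-configs",
    "status.storage.topic": "connect-cluster-status",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.key.converter.schemas.enable": False,
    "internal.value.converter.schemas.enable": False,
}


def effective_connect_config(user_config: dict) -> dict:
    """Drop ignored options, then let user values override the defaults."""
    effective = dict(DEFAULTS)
    for key, value in user_config.items():
        if key.startswith(IGNORED_PREFIXES):  # str.startswith accepts a tuple
            continue
        effective[key] = value
    return effective


config = effective_connect_config({
    "bootstrap.servers": "my-cluster-kafka:9092",
    "group.id": "my-connect-cluster",
    "ssl.keystore.location": "/tmp/keystore.jks",  # ignored: starts with "ssl."
})
print(config["group.id"])                 # my-connect-cluster
print("ssl.keystore.location" in config)  # False
```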

INFO

The cluster operator doesn’t validate the provided configuration. When invalid configuration is provided, the Kafka Connect cluster might not start or might become unstable. In such cases, the configuration in the connect-config field should be fixed and the cluster operator will roll out the new configuration to all Kafka Connect instances.

3.3. Provisioning Role-Based Access Control (RBAC) for the operator

For the operator to function it needs permission within the Kubernetes/OpenShift cluster to interact with the resources it manages (ConfigMaps, Pods, Deployments, StatefulSets, Services etc). Such permission is described in terms of Kubernetes/OpenShift role-based access controls.

3.3.1. Using a ServiceAccount

The operator is best run using a ServiceAccount:

Example ServiceAccount for the Cluster Operator
apiVersion: v1
kind: ServiceAccount
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi

The Deployment of the operator then needs to specify this in the serviceAccountName of its template's spec:

Partial example Deployment for the Cluster Operator
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: strimzi-cluster-operator
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: strimzi-cluster-operator
    spec:
      serviceAccountName: strimzi-cluster-operator
      containers:
# etc ...

Note line 12, where the strimzi-cluster-operator ServiceAccount is specified as the serviceAccountName.

3.3.2. Defining a Role

The operator needs to operate using a Role that gives it access to the necessary resources:

Example Role for the Cluster Operator
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: Role
metadata:
  name: strimzi-cluster-operator-role
  labels:
    app: strimzi
rules:
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - get
  - list
  - watch
  - delete
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - endpoints
  verbs:
  - get
  - list
  - watch
- apiGroups:
  - "extensions"
  resources:
  - deployments
  - deployments/scale
  - replicasets
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - "apps"
  resources:
  - deployments
  - deployments/scale
  - deployments/status
  - statefulsets
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - events
  verbs:
  - create
# OpenShift S2I requirements
- apiGroups:
  - "extensions"
  resources:
  - replicationcontrollers
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - apps.openshift.io
  resources:
  - deploymentconfigs
  - deploymentconfigs/scale
  - deploymentconfigs/status
  - deploymentconfigs/finalizers
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - build.openshift.io
  resources:
  - buildconfigs
  - builds
  verbs:
  - create
  - delete
  - get
  - list
  - patch
  - watch
  - update
- apiGroups:
  - image.openshift.io
  resources:
  - imagestreams
  - imagestreams/status
  verbs:
  - create
  - delete
  - get
  - list
  - watch
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - replicationcontrollers
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update

3.3.3. Defining a RoleBinding

Finally, the operator needs a RoleBinding which associates its Role with its ServiceAccount:

Example RoleBinding for the Cluster Operator
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator-binding
  labels:
    app: strimzi
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
roleRef:
  kind: Role
  name: strimzi-cluster-operator-role
  apiGroup: rbac.authorization.k8s.io

3.4. Operator configuration

The operator itself can be configured through the following environment variables.

STRIMZI_NAMESPACE

Required. A comma-separated list of namespaces that the operator should operate in. The Cluster Operator deployment might use the Kubernetes Downward API to set this automatically to the namespace the Cluster Operator is deployed in. See the example below:

env:
  - name: STRIMZI_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace

STRIMZI_FULL_RECONCILIATION_INTERVAL_MS

Optional, default: 120000 ms. The interval between periodic reconciliations, in milliseconds.

STRIMZI_OPERATION_TIMEOUT_MS

Optional, default: 60000 ms. The timeout for internal operations, in milliseconds. This value should be increased when using Strimzi on clusters where regular Kubernetes operations take longer than usual (because of slow downloading of Docker images, for example).

STRIMZI_DEFAULT_KAFKA_IMAGE

Optional, default strimzi/kafka:latest. The image name to use as a default when deploying Kafka, if no image is specified as the kafka-image in the Kafka cluster ConfigMap.

STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE

Optional, default strimzi/kafka-connect:latest. The image name to use as a default when deploying Kafka Connect, if no image is specified as the image in the Kafka Connect cluster ConfigMap.

STRIMZI_DEFAULT_KAFKA_CONNECT_S2I_IMAGE

Optional, default strimzi/kafka-connect-s2i:latest. The image name to use as a default when deploying Kafka Connect S2I, if no image is specified as the image in the cluster ConfigMap.

STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE

Optional, default strimzi/topic-operator:latest. The image name to use as a default when deploying the topic operator, if no image is specified as the image in the topic operator config of the Kafka cluster ConfigMap.

STRIMZI_DEFAULT_ZOOKEEPER_IMAGE

Optional, default strimzi/zookeeper:latest. The image name to use as a default when deploying Zookeeper, if no image is specified as the zookeeper-image in the Kafka cluster ConfigMap.

3.4.1. Watching multiple namespaces

The STRIMZI_NAMESPACE environment variable can be used to configure a single operator instance to operate in multiple namespaces. For each namespace given, the operator will watch for cluster ConfigMaps and perform periodic reconciliation. To be able to do this, the operator’s ServiceAccount needs access to the necessary resources in those other namespaces. This can be done by creating an additional RoleBinding in each of those namespaces, associating the operator’s ServiceAccount (strimzi-cluster-operator in the examples) with the operator’s Role (strimzi-cluster-operator-role in the examples).

Suppose, for example, that an operator deployed in namespace foo needs to operate in namespace bar. The following RoleBinding would grant the necessary permissions:

Example RoleBinding for an operator to operate in namespace bar
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator-binding-bar
  namespace: bar
  labels:
    app: strimzi
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: foo
roleRef:
  kind: Role
  name: strimzi-cluster-operator-role
  apiGroup: rbac.authorization.k8s.io
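
When many namespaces are watched, the additional RoleBindings can be generated from the STRIMZI_NAMESPACE value. The Python sketch below is one hedged way to do this. The function name role_bindings is hypothetical, the manifests use the rbac.authorization.k8s.io/v1beta1 API version from Section 3.3.3, and the sketch assumes that a Role named strimzi-cluster-operator-role is available in each target namespace.

```python
import json


def role_bindings(strimzi_namespace: str, operator_namespace: str) -> list:
    """Build one RoleBinding manifest per watched namespace other than the operator's own."""
    namespaces = [ns.strip() for ns in strimzi_namespace.split(",") if ns.strip()]
    bindings = []
    for ns in namespaces:
        if ns == operator_namespace:
            continue  # covered by the RoleBinding from Section 3.3.3
        bindings.append({
            "apiVersion": "rbac.authorization.k8s.io/v1beta1",
            "kind": "RoleBinding",
            "metadata": {
                "name": f"strimzi-cluster-operator-binding-{ns}",
                "namespace": ns,
                "labels": {"app": "strimzi"},
            },
            "subjects": [{
                "kind": "ServiceAccount",
                "name": "strimzi-cluster-operator",
                "namespace": operator_namespace,
            }],
            "roleRef": {
                "kind": "Role",
                "name": "strimzi-cluster-operator-role",
                "apiGroup": "rbac.authorization.k8s.io",
            },
        })
    return bindings


# Example: operator deployed in foo, watching foo and bar.
for binding in role_bindings("foo,bar", operator_namespace="foo"):
    print(json.dumps(binding, indent=2))
```

The printed JSON can be saved to a file and created with kubectl create -f or oc create -f, since both tools accept JSON as well as YAML.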

4. Topic Operator

The Topic Operator is in charge of managing topics in a Kafka cluster. The Topic Operator is deployed as a process running inside a Kubernetes/OpenShift cluster. It can be deployed through the Cluster Operator or "manually" through provided YAML files.

Topic Operator

The role of the topic operator is to keep a set of Kubernetes ConfigMaps describing Kafka topics in-sync with corresponding Kafka topics.

Specifically:

  • if a config map is created, the operator will create the topic it describes

  • if a config map is deleted, the operator will delete the topic it describes

  • if a config map is changed, the operator will update the topic it describes

And also, in the other direction:

  • if a topic is created, the operator will create a config map describing it

  • if a topic is deleted, the operator will delete the config map describing it

  • if a topic is changed, the operator will update the config map describing it

This is beneficial to a Kubernetes/OpenShift-centric style of deploying applications, because it allows you to declare a ConfigMap as part of your application's deployment and the operator will take care of creating the topic for you, so your application just needs to deal with producing and/or consuming from the necessary topics.

Should the topic be reconfigured, reassigned to different Kafka nodes etc, the ConfigMap will always be up to date.

4.1. Reconciliation

A fundamental problem that the operator has to solve is that there is no single source of truth: Both the ConfigMap and the topic can be modified independently of the operator. Complicating this, the topic operator might not always be able to observe changes at each end in real time (the operator might be down etc).

To resolve this, the operator maintains its own private copy of the information about each topic. When a change happens either in the Kafka cluster, or in Kubernetes/OpenShift, it looks at both the state of the other system, and at its private copy in order to determine what needs to change to keep everything in sync. The same thing happens whenever the operator starts, and periodically while it's running.

For example, suppose the topic operator is not running, and a ConfigMap "my-topic" gets created. When the operator starts it will lack a private copy of "my-topic", so it can infer that the ConfigMap has been created since it was last running. The operator will create the topic corresponding to "my-topic" and also store a private copy of the metadata for "my-topic".

The private copy allows the operator to cope with scenarios where the topic config gets changed both in Kafka and in Kubernetes/OpenShift, so long as the changes are not incompatible (e.g. both changing the same topic config key, but to different values). In the case of incompatible changes, the Kafka configuration wins, and the ConfigMap will be updated to reflect that. Defaulting to the Kafka configuration ensures that, in the worst case, data won’t be lost.

The private copy is held in the same ZooKeeper ensemble used by Kafka itself. This mitigates availability concerns, because if ZooKeeper is not running then Kafka itself cannot run, so the operator will be no less available than it would be if it were stateless.
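
The role of the private copy can be illustrated with a minimal three-way merge in Python. This is a sketch of the idea only, not the Topic Operator's actual implementation: the function name merge_topic_config and the use of plain dictionaries are assumptions made for illustration, and only topic config keys are considered.

```python
_MISSING = object()  # marks a key that is absent on one side


def merge_topic_config(private: dict, kafka: dict, configmap: dict) -> dict:
    """Merge topic config using the private copy as the common ancestor.

    A key changed on only one side takes that side's value. If both sides
    changed the same key to different values, the Kafka value wins.
    """
    merged = {}
    for key in set(private) | set(kafka) | set(configmap):
        base = private.get(key, _MISSING)
        kafka_value = kafka.get(key, _MISSING)
        configmap_value = configmap.get(key, _MISSING)
        if kafka_value == base:
            # Kafka is unchanged, so any change came from the ConfigMap.
            chosen = configmap_value
        else:
            # Kafka changed: either the ConfigMap is unchanged, or both
            # changed and the Kafka configuration wins.
            chosen = kafka_value
        if chosen is not _MISSING:
            merged[key] = chosen
    return merged


private = {"retention.ms": "100"}
kafka = {"retention.ms": "100", "cleanup.policy": "compact"}  # key added in Kafka
configmap = {"retention.ms": "200"}                           # value changed in the ConfigMap
print(sorted(merge_topic_config(private, kafka, configmap).items()))
# [('cleanup.policy', 'compact'), ('retention.ms', '200')]
```

Without the private copy, the operator could not tell which side had changed and would have no basis for combining compatible changes.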

4.2. Usage Recommendations

  1. Try to either always operate on ConfigMaps or always operate directly on topics.

  2. When creating a ConfigMap:

    • Remember that you can’t easily change the name later.

    • Choose a name for the ConfigMap that reflects the name of the topic it describes.

    • Ideally the ConfigMap’s metadata.name should be the same as its data.name. To do this, the topic name will have to be a valid Kubernetes resource name.

  3. When creating a topic:

    • Remember that you can’t change the name later.

    • It’s best to use a name that is a valid Kubernetes resource name, otherwise the operator will have to sanitize the name when creating the corresponding ConfigMap.
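
Whether a topic name is also usable as a ConfigMap name can be checked up front. The Python sketch below applies the Kubernetes rule for DNS subdomain names (lowercase alphanumerics, '-' and '.', starting and ending with an alphanumeric character, at most 253 characters). The function name is hypothetical, and the sketch does not reproduce how the operator sanitizes names.

```python
import re

# One DNS label: lowercase alphanumerics and '-', alphanumeric at both ends.
_LABEL = r"[a-z0-9]([-a-z0-9]*[a-z0-9])?"
# A DNS subdomain: one or more labels separated by dots.
_SUBDOMAIN = re.compile(rf"{_LABEL}(\.{_LABEL})*")


def is_valid_resource_name(name: str) -> bool:
    """Return True if name can be used as a ConfigMap metadata.name."""
    return len(name) <= 253 and _SUBDOMAIN.fullmatch(name) is not None


print(is_valid_resource_name("orders"))     # True
print(is_valid_resource_name("My_Orders"))  # False: uppercase and '_' are not allowed
```

Kafka itself accepts uppercase letters and underscores in topic names, which is why a valid topic name is not necessarily a valid ConfigMap name.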

4.3. Format of the ConfigMap

By default, the operator only considers ConfigMaps having the label strimzi.io/kind=topic, but this is configurable via the STRIMZI_CONFIGMAP_LABELS environment variable.

The data of such ConfigMaps supports the following keys:

  • name The name of the topic. Optional; if this is absent the name of the ConfigMap itself is used.

  • partitions The number of partitions of the Kafka topic. This can be increased, but not decreased. Required.

  • replicas The number of replicas of the Kafka topic. This cannot be larger than the number of nodes in the Kafka cluster. Required.

  • config A string in JSON format representing the topic configuration. Optional, defaulting to the empty set.

4.4. Example

Suppose you want to create a topic called "orders" with 10 partitions and 2 replicas.

You would first prepare a ConfigMap:

Topic declaration ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
  name: orders
  labels:
    strimzi.io/kind: topic
    strimzi.io/cluster: my-cluster
data:
  name: orders
  partitions: "10"
  replicas: "2"

Note
When the Topic Operator is deployed manually, the strimzi.io/cluster label is not necessary.

Because the config key is omitted from the data, the topic’s config will be empty, and thus default to the Kafka broker default.

You would then create this ConfigMap in Kubernetes:

kubectl create -f orders-topic.yaml

Or in OpenShift:

oc create -f orders-topic.yaml

That’s it! The operator will create the topic "orders".

Suppose you later want to change the log segment retention time to 4 days (345,600,000 ms). You can update orders-topic.yaml like this:

Topic declaration ConfigMap with "config" update
apiVersion: v1
kind: ConfigMap
metadata:
  name: orders
  labels:
    strimzi.io/kind: topic
    strimzi.io/cluster: my-cluster
data:
  name: orders
  partitions: "10"
  replicas: "2"
  config: '{ "retention.ms":"345600000" }'

Then use oc replace -f or kubectl replace -f to update the ConfigMap in OpenShift/Kubernetes.
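
The retention.ms value and the JSON-in-a-string config field are easy to get wrong by hand, so the following Python sketch shows how the same ConfigMap could be generated. The function name topic_configmap and its parameters are hypothetical and exist only for illustration.

```python
import json


def topic_configmap(name, partitions, replicas, retention_days=None, cluster="my-cluster"):
    """Build a topic ConfigMap manifest in the format described in Section 4.3."""
    data = {
        "name": name,
        "partitions": str(partitions),  # ConfigMap data values must be strings
        "replicas": str(replicas),
    }
    if retention_days is not None:
        # days -> hours -> minutes -> seconds -> milliseconds
        retention_ms = retention_days * 24 * 60 * 60 * 1000
        # The config key holds a JSON document serialized as a string.
        data["config"] = json.dumps({"retention.ms": str(retention_ms)})
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {
            "name": name,
            "labels": {"strimzi.io/kind": "topic", "strimzi.io/cluster": cluster},
        },
        "data": data,
    }


manifest = topic_configmap("orders", partitions=10, replicas=2, retention_days=4)
print(manifest["data"]["config"])  # {"retention.ms": "345600000"}
```

Serializing the manifest with json.dumps gives a file that oc and kubectl accept in place of the YAML shown above.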

4.5. Unsupported operations

  • You can’t change the data.name key in a ConfigMap, because Kafka doesn’t support changing topic names.

  • You can’t decrease the data.partitions, because Kafka doesn’t support this.

  • You should exercise caution in increasing data.partitions for topics with keys, as it will change how records are partitioned.

4.6. Operator environment

The operator is configured from environment variables:

  • STRIMZI_CONFIGMAP_LABELS – The Kubernetes label selector used to identify ConfigMaps to be managed by the operator. Default: strimzi.io/kind=topic.

  • STRIMZI_ZOOKEEPER_SESSION_TIMEOUT_MS – The Zookeeper session timeout, in milliseconds. For example 10000. Default: 20000 (20 seconds).

  • STRIMZI_KAFKA_BOOTSTRAP_SERVERS – The list of Kafka bootstrap servers. This variable is mandatory.

  • STRIMZI_ZOOKEEPER_CONNECT – The Zookeeper connection information. This variable is mandatory.

  • STRIMZI_FULL_RECONCILIATION_INTERVAL_MS – The interval between periodic reconciliations, in milliseconds.

  • STRIMZI_TOPIC_METADATA_MAX_ATTEMPTS – The number of attempts for getting topics metadata from Kafka. The time between each attempt is defined as an exponential back-off. You might want to increase this value when topic creation could take more time due to its larger size (i.e. many partitions/replicas). Default 6.

If the operator configuration needs to be changed the process must be killed and restarted. Since the operator is intended to execute within Kubernetes, this can be achieved by deleting the pod.

4.7. Resource limits and requests

The Topic Operator can run with resource limits:

  • When it is deployed by the Cluster Operator these can be specified in the resources key of the topic-operator-config.

  • When it is not deployed by the Cluster Operator these can be specified on the Deployment in the usual way.

4.7.1. Minimum Resource Requirements

Testing has shown that the topic operator functions adequately with 96Mi of memory and 100m CPU when watching two topics. It is therefore recommended to use these as a minimum when configuring resource requests and not to run it with lower limits than these. If the Kafka cluster has more than a handful of topics, more generous requests and limits will be necessary.

Appendix A: Frequently Asked Questions

A.1. Cluster Operator

A.1.1. Log contains warnings about failing to acquire lock

For each cluster, the Cluster Operator always executes only one operation at a time. The Cluster Operator uses locks to make sure that there are never two parallel operations running for the same cluster. In case an operation requires more time to complete, other operations will wait until it is completed and the lock is released.

INFO

Examples of cluster operations are cluster creation, rolling update, scale down or scale up etc.

If the wait for the lock takes too long, the operation times out and the following warning message will be printed to the log:

---
2018-03-04 17:09:24 WARNING AbstractClusterOperations:290 - Failed to acquire lock for kafka cluster lock::kafka::myproject::my-cluster
---

Depending on the exact configuration of STRIMZI_FULL_RECONCILIATION_INTERVAL_MS and STRIMZI_OPERATION_TIMEOUT_MS, this warning message may appear regularly without indicating any problems. The operations which time out will be picked up by the next periodic reconciliation. It will try to acquire the lock again and execute.

Should this message appear periodically even in situations when there should be no other operations running for a given cluster, it might indicate that due to some error the lock was not properly released. In such cases it is recommended to restart the cluster operator.

Appendix B: Installing Kubernetes and OpenShift cluster

The easiest way to get started with Kubernetes or OpenShift is using the Minikube, Minishift or oc cluster up utilities. This section provides basic guidance on how to use them. More details are provided on the websites of the tools themselves.

B.1. Kubernetes

In order to interact with a Kubernetes cluster the kubectl utility needs to be installed.

The easiest way to get a running Kubernetes cluster is using Minikube. Minikube can be downloaded and installed from the Kubernetes website. Depending on the number of brokers you want to deploy inside the cluster and if you need Kafka Connect running as well, it could be worth running Minikube at least with 4 GB of RAM instead of the default 2 GB. Once installed, it can be started using:

minikube start --memory 4096

B.2. OpenShift

In order to interact with an OpenShift cluster, the oc utility is needed.

An OpenShift cluster can be started in two different ways. The oc utility can start a cluster locally using the command:

oc cluster up

This command requires Docker to be installed. More information about this way can be found here.

Another option is to use Minishift. Minishift is an OpenShift installation within a VM. It can be downloaded and installed from the Minishift website. Depending on the number of brokers you want to deploy inside the cluster and whether you need Kafka Connect running as well, it could be worth running Minishift with at least 4 GB of RAM instead of the default 2 GB. Once installed, Minishift can be started using the following command:

minishift start --memory 4GB

Appendix C: Metrics

This section describes how to deploy a Prometheus server for scraping metrics from the Kafka cluster and showing them using a Grafana dashboard. The resources provided are examples to show how Kafka metrics can be stored in Prometheus: They are not a recommended configuration, and further support should be available from the Prometheus and Grafana communities.

When adding Prometheus and Grafana servers to an Apache Kafka deployment using minikube or minishift, the memory available to the virtual machine should be increased (to 4 GB of RAM, for example, instead of the default 2 GB). Information on how to increase the default amount of memory can be found in the following section Appendix B, Installing Kubernetes and OpenShift cluster.

C.1. Deploying on OpenShift

C.1.1. Prometheus

The Prometheus server configuration uses a service discovery feature in order to discover the pods in the cluster from which it gets metrics. In order to have this feature working, it’s necessary for the service account used for running the Prometheus service pod to have access to the API server to get the pod list. By default the service account prometheus-server is used.

export NAMESPACE=[namespace]
oc login -u system:admin
oc create sa prometheus-server
oc adm policy add-cluster-role-to-user cluster-reader system:serviceaccount:${NAMESPACE}:prometheus-server
oc login -u developer

where [namespace] is the namespace/project where the Apache Kafka cluster was deployed.

Finally, create the Prometheus service by running

oc create -f https://raw.githubusercontent.com/strimzi/strimzi/master/metrics/examples/prometheus/kubernetes.yaml

C.1.2. Grafana

A Grafana server is necessary only to get a visualisation of the Prometheus metrics.

To deploy Grafana on OpenShift, the following command should be executed:

oc create -f https://raw.githubusercontent.com/strimzi/strimzi/master/metrics/examples/grafana/kubernetes.yaml

C.2. Deploying on Kubernetes

C.2.1. Prometheus

The Prometheus server configuration uses a service discovery feature in order to discover the pods in the cluster from which it gets metrics. If RBAC is enabled in your Kubernetes deployment, then for this feature to work the service account used for running the Prometheus service pod must have access to the API server to get the pod list. By default the service account prometheus-server is used.

export NAMESPACE=[namespace]
kubectl create sa prometheus-server
kubectl create -f https://raw.githubusercontent.com/strimzi/strimzi/master/metrics/examples/prometheus/cluster-reader.yaml
kubectl create clusterrolebinding read-pods-binding --clusterrole=cluster-reader --serviceaccount=${NAMESPACE}:prometheus-server

where [namespace] is the namespace/project where the Apache Kafka cluster was deployed.

Finally, create the Prometheus service by running

kubectl apply -f https://raw.githubusercontent.com/strimzi/strimzi/master/metrics/examples/prometheus/kubernetes.yaml

C.2.2. Grafana

A Grafana server is necessary only to get a visualisation of Prometheus metrics.

To deploy Grafana on Kubernetes, the following command should be executed:

kubectl apply -f https://raw.githubusercontent.com/strimzi/strimzi/master/metrics/examples/grafana/kubernetes.yaml

C.3. Grafana dashboard

As an example, and in order to visualize the exported metrics in Grafana, the simple dashboard kafka-dashboard.json file is provided. The Prometheus data source, and the above dashboard, can be set up in Grafana by following these steps.

Note
To access the dashboard, you can use the port-forward command to forward traffic from the Grafana pod to the host. For example, you can access the Grafana UI by running oc port-forward grafana-1-fbl7s 3000:3000 (or using kubectl instead of oc) and then pointing a browser to http://localhost:3000.

  1. Access the Grafana UI using admin/admin credentials.

    Grafana login
  2. Click on the "Add data source" button from the Grafana home in order to add Prometheus as data source.

    Grafana home
  3. Fill in the information about the Prometheus data source, specifying a name and "Prometheus" as type. In the URL field, the connection string to the Prometheus server (i.e. http://prometheus:9090) should be specified. After "Add" is clicked, Grafana will test the connection to the data source.

    Add Prometheus data source
  4. From the top left menu, click on "Dashboards" and then "Import" to open the "Import Dashboard" window where the provided kafka-dashboard.json file can be imported or its content pasted.

    Add Grafana dashboard
  5. After importing the dashboard, the Grafana home should show with some initial metrics about CPU and JVM memory usage. When the Kafka cluster is used (creating topics and exchanging messages) the other metrics, like messages in and bytes in/out per topic, will be shown.

    Kafka dashboard