When building data pipelines using Kafka Connect, or replicating data using MirrorMaker, offsets are used to keep track of the flow of data. Sink connectors use Kafka’s standard consumer offset mechanism, while source connectors store offsets in a custom format within a Kafka topic.
To manage connector offsets, rather than directly interacting with the underlying Kafka topics, you should make use of the following endpoints from the Connect REST API:
- GET /connectors/{connector}/offsets to list offsets
- PATCH /connectors/{connector}/offsets to alter offsets
- DELETE /connectors/{connector}/offsets to reset offsets
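For reference, the three endpoints can be called with curl roughly as sketched below. The function names and the CONNECT_URL variable are illustrative assumptions, not part of Kafka Connect or Strimzi. The sketch assumes the REST API is reachable on port 8083 and that the connector has already been stopped before altering or resetting its offsets.

```shell
# Base URL of the Connect REST API (assumption: default port 8083).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# List the current offsets of a connector.
list_offsets() {
  curl -sS "${CONNECT_URL}/connectors/$1/offsets"
}

# Alter offsets; $2 is a JSON document with the same shape as the list output.
alter_offsets() {
  curl -sS -X PATCH -H 'Content-Type: application/json' --data "$2" \
    "${CONNECT_URL}/connectors/$1/offsets"
}

# Reset (delete) all offsets of a connector.
reset_offsets() {
  curl -sS -X DELETE "${CONNECT_URL}/connectors/$1/offsets"
}
```

With Strimzi, these calls would typically be run from inside a Connect pod, for example through kubectl exec.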
Some common use-cases for managing connector offsets are:
- skipping a poison record
- replaying records
- specifying a starting offset for a new connector
From Strimzi 0.44 onwards you can manage connector offsets directly in the KafkaConnector and KafkaMirrorMaker2 custom resources.
This blog post steps through how you can use this new functionality.
The process is very similar for both Kafka Connect and MirrorMaker, so we’ll demonstrate how to do it with Kafka Connect and then explain how the process applies to MirrorMaker.
Before we begin
If you want to follow along with the steps in this blog post, you need a Kubernetes cluster containing the Strimzi operator, a Kafka cluster, a Connect cluster, and a running connector. First run through the Strimzi quickstart guide to deploy your Strimzi operator and Kafka cluster.
Once you have a Kafka cluster you can deploy Connect and a connector using the following commands:
- Deploy a Connect cluster
kubectl apply -f https://strimzi.io/examples/latest/connect/kafka-connect-build.yaml -n kafka
- Wait for Connect to be ready
kubectl wait kafkaconnect/my-connect-cluster --for=condition=Ready --timeout=300s -n kafka
- Enable KafkaConnector resources on your Connect cluster
kubectl annotate kafkaconnect my-connect-cluster strimzi.io/use-connector-resources=true -n kafka
- Create a source connector
kubectl apply -f https://strimzi.io/examples/latest/connect/source-connector.yaml -n kafka
- Wait for the source connector to be ready
kubectl wait kafkaconnector/my-source-connector --for=condition=Ready --timeout=300s -n kafka
Listing offsets
To list offsets, edit your KafkaConnector resource by running kubectl edit kafkaconnector my-source-connector -n kafka and add the configuration to specify where the offsets should be output:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
# ...
spec:
  #...
  listOffsets:
    toConfigMap:
      name: my-connector-offsets
  #...
This tells Strimzi to write the offsets to a ConfigMap called my-connector-offsets.
To trigger Strimzi to get the latest offsets, annotate your KafkaConnector resource:
$ kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=list -n kafka
Once you have annotated the resource, Strimzi creates a new ConfigMap containing the offsets:
$ kubectl get configmap my-connector-offsets -n kafka -oyaml
apiVersion: v1
kind: ConfigMap
metadata:
  creationTimestamp: "2024-09-26T10:20:15Z" # (1)
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: my-connector-offsets
  namespace: kafka
  ownerReferences: # (2)
  - apiVersion: kafka.strimzi.io/v1beta2
    blockOwnerDeletion: false
    controller: false
    kind: KafkaConnector
    name: my-source-connector
    uid: 637e3be7-bd96-43ab-abde-c55b4c4550e0
  resourceVersion: "66951"
  uid: 641d60a9-36eb-4f29-9895-8f2c1eb9638e
data: # (3)
  offsets.json: |-
    {
      "offsets" : [ {
        "partition" : {
          "filename" : "/opt/kafka/LICENSE"
        },
        "offset" : {
          "position" : 15295
        }
      } ]
    }
(1) If the ConfigMap doesn’t already exist, Strimzi will create it automatically.
(2) The owner reference points to your KafkaConnector resource. To provide a custom owner reference, for example to prevent the ConfigMap being deleted when the KafkaConnector resource is, create the ConfigMap in advance and set an owner reference manually.
(3) Strimzi puts the offsets into a field called offsets.json. It doesn’t overwrite any other fields when updating an existing ConfigMap.
You can check that the output matches the results from the Connect REST API by calling the GET /connectors/{connector}/offsets endpoint directly:
$ kubectl exec -n kafka -it my-connect-cluster-connect-0 -- curl localhost:8083/connectors/my-source-connector/offsets
{"offsets":[{"partition":{"filename":"/opt/kafka/LICENSE"},"offset":{"position":15295}}]}
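The comparison above can also be scripted. The sketch below is one possible approach, not something Strimzi provides: the function names are hypothetical, it assumes the pod name my-connect-cluster-connect-0 used in this post, and it compares the two documents crudely by stripping all whitespace, which only works when both list the same keys in the same order.

```shell
NAMESPACE="${NAMESPACE:-kafka}"

# Read the offsets.json field from the ConfigMap; the dot in the key
# has to be escaped in the JSONPath expression.
configmap_offsets() {
  kubectl get configmap "$1" -n "$NAMESPACE" -o jsonpath='{.data.offsets\.json}'
}

# Ask the Connect REST API for the offsets from inside a Connect pod.
rest_offsets() {
  kubectl exec -n "$NAMESPACE" my-connect-cluster-connect-0 -- \
    curl -s "localhost:8083/connectors/$1/offsets"
}

# Remove all whitespace so pretty-printed and compact JSON can be compared.
strip_whitespace() {
  tr -d '[:space:]'
}

# Succeeds when the ConfigMap ($1) and the REST API output for connector ($2) agree.
offsets_match() {
  [ "$(configmap_offsets "$1" | strip_whitespace)" = "$(rest_offsets "$2" | strip_whitespace)" ]
}
```

For example, offsets_match my-connector-offsets my-source-connector returns a zero exit code when the two sources agree.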
Altering offsets
To alter a connector’s offsets, you need to create a ConfigMap that tells Strimzi which offsets to apply. All sink connectors use the same offset format, but the format for source connectors varies from connector to connector. The easiest way to create a ConfigMap for altering offsets is to reuse the one that Strimzi originally wrote the offsets to.
The connector must also be stopped before its offsets can be altered.
Edit the KafkaConnector resource to set the my-connector-offsets ConfigMap as the source of offsets for the alter operation, and set the state as stopped:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
# ...
spec:
  #...
  state: stopped
  alterOffsets:
    fromConfigMap:
      name: my-connector-offsets
  #...
The status in the KafkaConnector is updated once Strimzi has stopped the connector.
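If you prefer not to edit the resource interactively, the same change can be applied with kubectl patch and the status polled until the connector reports that it has stopped. This is a sketch under assumptions: the function names and POLL_INTERVAL are hypothetical, and the status path .status.connectorStatus.connector.state and the value STOPPED are assumed to mirror what the Connect REST API reports, so check them against your own resource.

```shell
NAMESPACE="${NAMESPACE:-kafka}"

# Stop the connector and point alterOffsets at the given ConfigMap.
stop_for_alter() {
  local connector="$1" configmap="$2"
  kubectl patch kafkaconnector "$connector" -n "$NAMESPACE" --type merge \
    -p "{\"spec\":{\"state\":\"stopped\",\"alterOffsets\":{\"fromConfigMap\":{\"name\":\"${configmap}\"}}}}"
}

# Print the connector state recorded in the resource status (assumed path).
connector_state() {
  kubectl get kafkaconnector "$1" -n "$NAMESPACE" \
    -o jsonpath='{.status.connectorStatus.connector.state}'
}

# Poll until the connector is stopped; fails after the given number of attempts.
wait_until_stopped() {
  local connector="$1" attempts="${2:-30}" i=0
  while [ "$i" -lt "$attempts" ]; do
    if [ "$(connector_state "$connector")" = "STOPPED" ]; then
      return 0
    fi
    i=$((i + 1))
    sleep "${POLL_INTERVAL:-2}"
  done
  return 1
}
```

A typical call would be stop_for_alter my-source-connector my-connector-offsets followed by wait_until_stopped my-source-connector.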
Now edit the my-connector-offsets ConfigMap to change the position to, for example, 10:
apiVersion: v1
kind: ConfigMap
# ...
data:
  offsets.json: |-
    {
      "offsets" : [ {
        "partition" : {
          "filename" : "/opt/kafka/LICENSE"
        },
        "offset" : {
          "position" : 10
        }
      } ]
    }
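The ConfigMap edit can also be made non-interactively. The sketch below builds the offsets document for the file source connector used in this post and writes it into the offsets.json field with a merge patch, which leaves other fields of the ConfigMap alone. All function names are hypothetical, and the partition and offset keys (filename, position) are specific to this example connector; other source connectors use different formats.

```shell
NAMESPACE="${NAMESPACE:-kafka}"

# Build an offsets document for a file source connector: $1 = filename, $2 = position.
offsets_json() {
  printf '{"offsets":[{"partition":{"filename":"%s"},"offset":{"position":%d}}]}' "$1" "$2"
}

# Escape backslashes and double quotes so a single-line document
# can be embedded as a JSON string value.
json_escape() {
  sed 's/\\/\\\\/g; s/"/\\"/g'
}

# Write the document ($2) into the offsets.json field of the ConfigMap ($1).
set_configmap_offsets() {
  local configmap="$1" doc="$2" escaped
  escaped="$(printf '%s' "$doc" | json_escape)"
  kubectl patch configmap "$configmap" -n "$NAMESPACE" --type merge \
    -p "{\"data\":{\"offsets.json\":\"${escaped}\"}}"
}
```

For example, set_configmap_offsets my-connector-offsets "$(offsets_json /opt/kafka/LICENSE 10)" sets the position to 10.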
Finally, trigger the operator to alter offsets by annotating the resource:
$ kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=alter -n kafka
Strimzi removes the strimzi.io/connector-offsets annotation from the resource once the offsets have been successfully updated.
You can also verify this directly:
$ kubectl exec -n kafka -it my-connect-cluster-connect-0 -- curl localhost:8083/connectors/my-source-connector/offsets
{"offsets":[{"partition":{"filename":"/opt/kafka/LICENSE"},"offset":{"position":10}}]}
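Because Strimzi removes the annotation when the operation succeeds, a script can wait for it to disappear before moving on. The sketch below polls the annotation. The function names and POLL_INTERVAL are hypothetical, and a timeout may simply mean the operation has not finished or did not succeed, in which case the status of the KafkaConnector resource is a reasonable place to look next.

```shell
NAMESPACE="${NAMESPACE:-kafka}"

# Print the current value of the strimzi.io/connector-offsets annotation
# (empty when the annotation is absent). Dots in the key are escaped for JSONPath.
offsets_annotation() {
  kubectl get kafkaconnector "$1" -n "$NAMESPACE" \
    -o jsonpath='{.metadata.annotations.strimzi\.io/connector-offsets}'
}

# Poll until the annotation has been removed; fails after the given number of attempts.
wait_for_offsets_action() {
  local connector="$1" attempts="${2:-30}" i=0
  while [ "$i" -lt "$attempts" ]; do
    if [ -z "$(offsets_annotation "$connector")" ]; then
      return 0
    fi
    i=$((i + 1))
    sleep "${POLL_INTERVAL:-2}"
  done
  return 1
}
```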
Resuming the connector
Once Strimzi has altered the offsets you need to manually resume the connector.
Edit the KafkaConnector resource to set the state to running:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
# ...
spec:
  #...
  state: running
  #...
Resetting offsets
The final action you can perform is to reset the connector offsets.
For this action the connector also needs to be in a stopped state, but you don’t need a ConfigMap.
Reset the offsets for your connector by annotating the resource:
$ kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=reset -n kafka
Once completed, the connector offsets will be empty:
$ kubectl exec -n kafka -it my-connect-cluster-connect-0 -- curl localhost:8083/connectors/my-source-connector/offsets
{"offsets":[]}
Similar to altering offsets, make sure you manually resume the connector by changing the state in KafkaConnector to running.
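Put together, the reset flow is: stop the connector, wait for it to stop, annotate the resource, and resume once the reset has completed. The sketch below strings the first three steps together and keeps the resume as a separate function so that it can be called only after the offsets are confirmed empty. The function names are hypothetical, and the status path used with kubectl wait is an assumption about how Strimzi surfaces the connector state; kubectl wait --for=jsonpath also requires a reasonably recent kubectl.

```shell
NAMESPACE="${NAMESPACE:-kafka}"

# Stop the connector, wait until it reports STOPPED, then request the reset.
# Each step only runs if the previous one succeeded.
stop_and_reset_offsets() {
  local connector="$1"
  kubectl patch kafkaconnector "$connector" -n "$NAMESPACE" --type merge \
    -p '{"spec":{"state":"stopped"}}' &&
    kubectl wait "kafkaconnector/${connector}" -n "$NAMESPACE" \
      --for=jsonpath='{.status.connectorStatus.connector.state}'=STOPPED \
      --timeout=120s &&
    kubectl annotate kafkaconnector "$connector" -n "$NAMESPACE" \
      strimzi.io/connector-offsets=reset
}

# Resume the connector once the reset has completed.
resume_connector() {
  kubectl patch kafkaconnector "$1" -n "$NAMESPACE" --type merge \
    -p '{"spec":{"state":"running"}}'
}
```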
Managing offsets for MirrorMaker
In addition to connectors managed via the KafkaConnector resource, you can also manage the connectors that are deployed as part of a KafkaMirrorMaker2 resource.
When using a KafkaMirrorMaker2 resource, the configurations for the ConfigMaps for listing and altering offsets, as well as the state configuration for stopping and resuming the connector, are specified on a per-connector basis. For example:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker-2
spec:
  # ...
  mirrors:
  - # ...
    sourceConnector:
      listOffsets:
        toConfigMap:
          name: my-connector-offsets
      alterOffsets:
        fromConfigMap:
          name: my-connector-offsets
      state: stopped
      # ...
Strimzi allows you to list, alter and reset offsets for all the MirrorMaker connectors (MirrorSourceConnector, MirrorCheckpointConnector, MirrorHeartbeatConnector).
However, be aware that the only connector that currently actively uses its offsets is the MirrorSourceConnector.
Listing offsets can be useful for the MirrorCheckpointConnector and MirrorHeartbeatConnector to track progress.
However, altering or resetting offsets for these connectors is rarely necessary.
Strimzi only allows you to perform actions on one connector within a single mirror at a time.
Therefore, to initiate an action from a KafkaMirrorMaker2 resource you must apply two annotations:
- strimzi.io/connector-offsets
- strimzi.io/mirrormaker-connector
Set strimzi.io/connector-offsets to one of list, alter or reset.
At the same time set strimzi.io/mirrormaker-connector to the name of your connector.
Strimzi names the connectors using the format [SOURCE_ALIAS]->[TARGET_ALIAS].[CONNECTOR_TYPE], for example east-kafka->west-kafka.MirrorSourceConnector.
You can use a single command to annotate the resource with both annotations.
For example, this command lists offsets for a connector called east-kafka->west-kafka.MirrorSourceConnector:
$ kubectl annotate kafkamirrormaker2 my-mirror-maker-2 strimzi.io/connector-offsets=list strimzi.io/mirrormaker-connector="east-kafka->west-kafka.MirrorSourceConnector" -n kafka
When listing and altering offsets for MirrorMaker connectors, Strimzi uses the connector name in the data field, replacing -> with --.
For example, the above command to list offsets for the connector east-kafka->west-kafka.MirrorSourceConnector results in a ConfigMap containing:
apiVersion: v1
kind: ConfigMap
# ...
data:
  east-kafka--west-kafka.MirrorSourceConnector.json: |
    {
      "offsets": [
        {
          "partition": {
            "cluster": "east-kafka",
            "partition": 0,
            "topic": "mirrormaker2-cluster-configs"
          },
          "offset": {
            "offset": 0
          }
        }
      ]
    }
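The naming rules above are easy to get wrong by hand, so a script can derive both the connector name and the ConfigMap key. The helper functions below are hypothetical and simply encode the format described in this post, with the .json suffix taken from the example output. The dots in the key are escaped so that it can be used in a kubectl JSONPath expression.

```shell
NAMESPACE="${NAMESPACE:-kafka}"

# Build the connector name: $1 = source alias, $2 = target alias, $3 = connector type.
mm2_connector_name() {
  printf '%s->%s.%s' "$1" "$2" "$3"
}

# Derive the ConfigMap data key for a connector name: "->" becomes "--", plus ".json".
mm2_configmap_key() {
  printf '%s.json' "$1" | sed 's/->/--/g'
}

# Print the offsets of a MirrorMaker connector ($2) stored in the ConfigMap ($1).
mm2_offsets() {
  local configmap="$1" key escaped_key
  key="$(mm2_configmap_key "$2")"
  escaped_key="$(printf '%s' "$key" | sed 's/\./\\./g')"
  kubectl get configmap "$configmap" -n "$NAMESPACE" -o "jsonpath={.data.${escaped_key}}"
}
```

For example, mm2_offsets my-connector-offsets "$(mm2_connector_name east-kafka west-kafka MirrorSourceConnector)" reads the field shown above.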
Conclusion
Now you can list, alter, and reset offsets of your connectors using the KafkaConnector and KafkaMirrorMaker2 custom resources.
Use this new feature to manage the flow of data in your Connect data pipelines, whether that’s to skip poison records, replay records, or even to check on the status of your connector.