Strimzi Kafka Bridge Documentation (In Development)

1. Kafka Bridge overview

Use the Kafka Bridge to make HTTP requests to a Kafka cluster.

You can use the Kafka Bridge to integrate HTTP client applications with your Kafka cluster.

HTTP client integration

Internal and external HTTP producers and consumers exchange data with the Kafka brokers through the Kafka Bridge

1.1. Running the Kafka Bridge

Install the Kafka Bridge to run in the same environment as your Kafka cluster.

You can download and add the Kafka Bridge installation artifacts to your host machine. To try out the Kafka Bridge in your local environment, see the Kafka Bridge quickstart.

Each instance of the Kafka Bridge maintains its own set of in-memory consumers (and subscriptions) that connect to the Kafka brokers on behalf of the HTTP clients. This means that each HTTP client must maintain affinity to the same Kafka Bridge instance in order to access any subscriptions that it creates. Additionally, when an instance of the Kafka Bridge restarts, the in-memory consumers and subscriptions are lost. The HTTP client is responsible for recreating any consumers and subscriptions if the Kafka Bridge restarts.

1.1.1. Running the Kafka Bridge on Kubernetes

If you deployed Strimzi on Kubernetes, you can use the Strimzi Cluster Operator to deploy the Kafka Bridge to the Kubernetes cluster. Configure and deploy the Kafka Bridge as a KafkaBridge resource. You’ll need a running Kafka cluster that was deployed by the Cluster Operator in a Kubernetes namespace. You can configure your deployment to access the Kafka Bridge outside the Kubernetes cluster.

HTTP clients must maintain affinity to the same instance of the Kafka Bridge to access any consumers or subscriptions that they create. Hence, running multiple replicas of the Kafka Bridge per Kubernetes Deployment is not recommended. If the Kafka Bridge pod restarts (for instance, due to Kubernetes relocating the workload to another node), the HTTP client must recreate any consumers or subscriptions.

For information on deploying and configuring the Kafka Bridge as a KafkaBridge resource, see the Strimzi documentation.

1.2. Kafka Bridge interface

The Kafka Bridge provides a RESTful interface that allows HTTP-based clients to interact with a Kafka cluster. It offers the advantages of a web API connection to Strimzi, without the need for client applications to interpret the Kafka protocol.

The API has two main resources — consumers and topics — that are exposed and made accessible through endpoints to interact with consumers and producers in your Kafka cluster. The resources relate only to the Kafka Bridge, not the consumers and producers connected directly to Kafka.

1.2.1. HTTP requests

The Kafka Bridge supports HTTP requests to a Kafka cluster, with methods to:

  • Send messages to a topic.

  • Retrieve messages from topics.

  • Retrieve a list of partitions for a topic.

  • Create and delete consumers.

  • Subscribe consumers to topics, so that they start receiving messages from those topics.

  • Retrieve a list of topics that a consumer is subscribed to.

  • Unsubscribe consumers from topics.

  • Assign partitions to consumers.

  • Commit a list of consumer offsets.

  • Seek on a partition, so that a consumer starts receiving messages from the first or last offset position, or a given offset position.

The methods provide JSON responses and HTTP response code error handling. Messages can be sent in JSON or binary formats.

Clients can produce and consume messages without the requirement to use the native Kafka protocol.

1.3. Kafka Bridge OpenAPI specification

Kafka Bridge APIs use the OpenAPI Specification (OAS). OAS provides a standard framework for describing and implementing HTTP APIs.

The Kafka Bridge OpenAPI specification is in JSON format. You can find the OpenAPI JSON files in the src/main/resources/ folder of the Kafka Bridge source download files. The download files are available from the GitHub release page.

You can also use the GET /openapi method to retrieve the OpenAPI v2 specification in JSON format.

1.4. Securing connectivity to the Kafka cluster

You can configure the following between the Kafka Bridge and your Kafka cluster:

  • TLS or SASL-based authentication

  • A TLS-encrypted connection

You configure the Kafka Bridge for authentication through its properties file.

You can also use ACLs in Kafka brokers to restrict the topics that can be consumed and produced using the Kafka Bridge.

Note
Use the KafkaBridge resource to configure authentication when you are running the Kafka Bridge on Kubernetes.

1.5. Securing the Kafka Bridge HTTP interface

Authentication and encryption between HTTP clients and the Kafka Bridge are not supported directly by the Kafka Bridge. Requests sent from clients to the Kafka Bridge are sent without authentication or encryption. Requests must use HTTP rather than HTTPS.

You can combine the Kafka Bridge with the following tools to secure it:

  • Network policies and firewalls that define which pods can access the Kafka Bridge

  • Reverse proxies (for example, a proxy that enforces OAuth 2.0)

  • API gateways

1.6. Requests to the Kafka Bridge

Specify data formats and HTTP headers to ensure valid requests are submitted to the Kafka Bridge.

1.6.1. Content Type headers

API request and response bodies are always encoded as JSON.

  • When performing consumer operations, POST requests must provide the following Content-Type header if there is a non-empty body:

    Content-Type: application/vnd.kafka.v2+json

  • When performing producer operations, POST requests must provide Content-Type headers specifying the embedded data format of the messages produced. This can be either json, binary or text.

    Embedded data format    Content-Type header
    JSON                    Content-Type: application/vnd.kafka.json.v2+json
    Binary                  Content-Type: application/vnd.kafka.binary.v2+json
    Text                    Content-Type: application/vnd.kafka.text.v2+json

The embedded data format is set per consumer, as described in the next section.

The Content-Type must not be set if the POST request has an empty body. An empty body can be used to create a consumer with the default values.
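The header rules above can be captured in a small client-side helper. The following Python sketch is illustrative only: the function names are hypothetical and are not part of the Kafka Bridge.

```python
from typing import Optional

# Embedded data formats listed in this section.
EMBEDDED_FORMATS = ("json", "binary", "text")


def producer_content_type(embedded_format: str) -> str:
    """Return the Content-Type value for a producer POST request."""
    fmt = embedded_format.lower()
    if fmt not in EMBEDDED_FORMATS:
        raise ValueError(f"unsupported embedded data format: {embedded_format!r}")
    return f"application/vnd.kafka.{fmt}.v2+json"


def consumer_content_type(body: Optional[bytes]) -> Optional[str]:
    """Return the Content-Type value for a consumer POST request.

    Returns None for an empty body, because the header must not be set then.
    """
    if not body:
        return None
    return "application/vnd.kafka.v2+json"
```

A client would call `producer_content_type("binary")` before sending to a topic, and would skip the header entirely when `consumer_content_type` returns `None`.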

1.6.2. Embedded data format

The embedded data format is the format of the Kafka messages that are transmitted, over HTTP, from a producer to a consumer using the Kafka Bridge. Three embedded data formats are supported: JSON, binary and text.

When creating a consumer using the /consumers/groupid endpoint, the POST request body must specify an embedded data format of either JSON, binary or text. This is specified in the format field, for example:

{
  "name": "my-consumer",
  "format": "binary", # (1)
  # ...
}
  1. A binary embedded data format.

The embedded data format specified when creating a consumer must match the data format of the Kafka messages it will consume.

If you choose to specify a binary embedded data format, subsequent producer requests must provide the binary data in the request body as Base64-encoded strings. For example, when sending messages using the /topics/topicname endpoint, records.value must be encoded in Base64:

{
  "records": [
    {
      "key": "my-key",
      "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ="
    }
  ]
}

Producer requests must also provide a Content-Type header that corresponds to the embedded data format, for example, Content-Type: application/vnd.kafka.binary.v2+json.
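As a rough illustration of the Base64 requirement, the following Python sketch encodes raw bytes into a request body for the binary embedded data format. The helper names are hypothetical, and only the value field is encoded because that is the field this section describes.

```python
import base64
import json
from typing import Iterable


def encode_binary_value(raw: bytes) -> str:
    """Base64-encode raw bytes for use as records.value."""
    return base64.b64encode(raw).decode("ascii")


def binary_produce_body(raw_values: Iterable[bytes]) -> str:
    """Build a JSON request body with one record per raw value."""
    records = [{"value": encode_binary_value(raw)} for raw in raw_values]
    return json.dumps({"records": records})
```

Encoding the bytes `b"edwardthethreeleggedcat"` yields the value shown in the example request body above.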

1.6.3. Message format

When sending messages using the /topics endpoint, you enter the message payload in the request body, in the records parameter.

The records parameter can contain any of these optional fields:

  • Message headers

  • Message key

  • Message value

  • Destination partition

Example POST request to /topics
curl -X POST \
  http://localhost:8080/topics/my-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "sales-lead-0001",
            "partition": 2,
            "headers": [
              {
                "key": "key1",
                "value": "QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ==" (1)
              }
            ]
        }
    ]
}'
  1. The header value in binary format and encoded as Base64.

If your consumer is configured to use the text embedded data format, the value and key fields in the records parameter must be strings and not JSON objects.
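The optional fields of a record can also be assembled programmatically. This Python sketch builds one entry for the records parameter and Base64-encodes header values, as in the curl example above. The function name and signature are hypothetical.

```python
import base64
from typing import Any, Dict, Optional


def make_record(value: Any = None,
                key: Any = None,
                partition: Optional[int] = None,
                headers: Optional[Dict[str, bytes]] = None) -> dict:
    """Build one entry for the records parameter, omitting unset fields."""
    record: dict = {}
    if key is not None:
        record["key"] = key
    if value is not None:
        record["value"] = value
    if partition is not None:
        record["partition"] = partition
    if headers:
        # Header values are binary, so they are sent as Base64 strings.
        record["headers"] = [
            {"key": name, "value": base64.b64encode(raw).decode("ascii")}
            for name, raw in headers.items()
        ]
    return record
```

Passing the result in a list under a `"records"` key and serializing it with `json.dumps` produces a body with the same shape as the example request.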

1.6.4. Accept headers

After creating a consumer, all subsequent GET requests must provide an Accept header in the following format:

Accept: application/vnd.kafka.EMBEDDED-DATA-FORMAT.v2+json

The EMBEDDED-DATA-FORMAT is either json, binary or text.

For example, when retrieving records for a subscribed consumer using an embedded data format of JSON, include this Accept header:

Accept: application/vnd.kafka.json.v2+json

1.7. CORS

In general, browser-based HTTP clients cannot issue requests across different domains.

For example, suppose the Kafka Bridge you deployed alongside a Kafka cluster is accessible using the http://my-bridge.io domain. HTTP clients can use the URL to interact with the Kafka Bridge and exchange messages through the Kafka cluster. However, your client is running as a web application in the http://my-web-application.io domain. The client (source) domain is different from the Kafka Bridge (target) domain. Because of same-origin policy restrictions, requests from the client fail. You can avoid this situation by using Cross-Origin Resource Sharing (CORS).

CORS allows for simple and preflighted requests between origin sources on different domains.

Simple requests are suitable for standard requests using GET, HEAD, POST methods.

A preflighted request sends an HTTP OPTIONS request as an initial check that the actual request is safe to send. On confirmation, the actual request is sent. Preflight requests are suitable for methods that require greater safeguards, such as PUT and DELETE, and for requests that use non-standard headers.

All requests require an Origin value in their header, which is the source of the HTTP request.

CORS allows you to specify allowed methods and originating URLs for accessing the Kafka cluster in your Kafka Bridge HTTP configuration.

Example CORS configuration for Kafka Bridge
# ...
http.cors.enabled=true
http.cors.allowedOrigins=http://my-web-application.io
http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH
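To make the effect of these properties concrete, the following Python sketch parses the example configuration and checks an origin and method against it. It is not the Kafka Bridge implementation: the function names are hypothetical, and exact string comparison of origins is an assumption (the bridge may apply different matching rules).

```python
EXAMPLE_CONFIG = """\
http.cors.enabled=true
http.cors.allowedOrigins=http://my-web-application.io
http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH
"""


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def is_request_allowed(props: dict, origin: str, method: str) -> bool:
    """Illustrative check of an origin and method against the CORS settings."""
    if props.get("http.cors.enabled", "false").lower() != "true":
        return False
    origins = [o.strip() for o in props.get("http.cors.allowedOrigins", "").split(",")]
    methods = [m.strip().upper() for m in props.get("http.cors.allowedMethods", "").split(",")]
    # Assumption: origins are compared as exact strings, with "*" as a wildcard.
    return ("*" in origins or origin in origins) and method.upper() in methods
```

With the example configuration, a POST from http://my-web-application.io passes the check, while a request from any other origin does not.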

1.7.1. Simple request

For example, this simple request header specifies the origin as http://my-web-application.io.

Origin: http://my-web-application.io

The header information is added to the request to consume records.

curl -v -X GET HTTP-BRIDGE-ADDRESS/consumers/my-group/instances/my-consumer/records \
-H 'Origin: http://my-web-application.io' \
-H 'content-type: application/vnd.kafka.v2+json'

In the response from the Kafka Bridge, an Access-Control-Allow-Origin header is returned. It contains the list of domains from which HTTP requests can be issued to the bridge.

HTTP/1.1 200 OK
Access-Control-Allow-Origin: * (1)
  1. Returning an asterisk (*) shows the resource can be accessed by any domain.

1.7.2. Preflighted request

An initial preflight request is sent to Kafka Bridge using an OPTIONS method. The HTTP OPTIONS request sends header information to check that Kafka Bridge will allow the actual request.

Here the preflight request checks that a POST request is valid from http://my-web-application.io.

OPTIONS /consumers/my-group/instances/my-consumer/subscription HTTP/1.1
Origin: http://my-web-application.io
Access-Control-Request-Method: POST (1)
Access-Control-Request-Headers: Content-Type (2)
  1. Kafka Bridge is alerted that the actual request is a POST request.

  2. The actual request will be sent with a Content-Type header.

The preflight request is sent with curl using the OPTIONS method.

curl -v -X OPTIONS HTTP-BRIDGE-ADDRESS/consumers/my-group/instances/my-consumer/subscription \
-H 'Origin: http://my-web-application.io' \
-H 'Access-Control-Request-Method: POST' \
-H 'content-type: application/vnd.kafka.v2+json'

Kafka Bridge responds to the initial request to confirm that the request will be accepted. The response header returns allowed origins, methods and headers.

HTTP/1.1 200 OK
Access-Control-Allow-Origin: http://my-web-application.io
Access-Control-Allow-Methods: GET,POST,PUT,DELETE,OPTIONS,PATCH
Access-Control-Allow-Headers: content-type

If the origin or method is rejected, an error message is returned.

The actual request does not require the Access-Control-Request-Method header, as it was confirmed in the preflight request, but it does require the Origin header.

curl -v -X POST HTTP-BRIDGE-ADDRESS/topics/bridge-topic \
-H 'Origin: http://my-web-application.io' \
-H 'content-type: application/vnd.kafka.v2+json'

The response shows the originating URL is allowed.

HTTP/1.1 200 OK
Access-Control-Allow-Origin: http://my-web-application.io

1.8. Configuring loggers for the Kafka Bridge

You can set a different log level for each operation that is defined by the Kafka Bridge OpenAPI specification.

Each operation has a corresponding API endpoint through which the bridge receives requests from HTTP clients. You can change the log level on each endpoint to produce more or less fine-grained logging information about the incoming and outgoing HTTP requests.

Loggers are defined in the log4j2.properties file, which has the following default configuration for healthy and ready endpoints:

logger.healthy.name = http.openapi.operation.healthy
logger.healthy.level = WARN
logger.ready.name = http.openapi.operation.ready
logger.ready.level = WARN

The log level of all other operations is set to INFO by default. Loggers are formatted as follows:

logger.<operation_id>.name = http.openapi.operation.<operation_id>
logger.<operation_id>.level = <LOG_LEVEL>

Where <operation_id> is the identifier of the specific operation.

List of operations defined by the OpenAPI specification
  • createConsumer

  • deleteConsumer

  • subscribe

  • unsubscribe

  • poll

  • assign

  • commit

  • send

  • sendToPartition

  • seekToBeginning

  • seekToEnd

  • seek

  • healthy

  • ready

  • openapi

Where <LOG_LEVEL> is the logging level as defined by log4j2 (for example, INFO or DEBUG).
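The logger format can be generated for any operation in the list. The following Python sketch is a hypothetical helper, not part of the Kafka Bridge. It validates the operation ID against the list above and the level against the standard log4j2 levels.

```python
# Operation IDs copied from the list in this section.
OPERATIONS = {
    "createConsumer", "deleteConsumer", "subscribe", "unsubscribe", "poll",
    "assign", "commit", "send", "sendToPartition", "seekToBeginning",
    "seekToEnd", "seek", "healthy", "ready", "openapi",
}

# Standard log4j2 levels.
LOG_LEVELS = {"OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE", "ALL"}


def logger_lines(operation_id: str, level: str) -> list:
    """Return the two log4j2.properties lines for one operation."""
    if operation_id not in OPERATIONS:
        raise ValueError(f"unknown operation: {operation_id!r}")
    level = level.upper()
    if level not in LOG_LEVELS:
        raise ValueError(f"unknown log level: {level!r}")
    return [
        f"logger.{operation_id}.name = http.openapi.operation.{operation_id}",
        f"logger.{operation_id}.level = {level}",
    ]
```

For example, `logger_lines("send", "debug")` returns the two lines needed to turn on DEBUG logging for the send operation.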

2. Kafka Bridge quickstart

Use this quickstart to try out the Kafka Bridge in your local development environment.

You will learn how to do the following:

  • Produce messages to topics and partitions in your Kafka cluster

  • Create a Kafka Bridge consumer

  • Perform basic consumer operations, such as subscribing the consumer to topics and retrieving the messages that you produced

In this quickstart, HTTP requests are formatted as curl commands that you can copy and paste to your terminal.

Ensure you have the prerequisites and then follow the tasks in the order provided in this chapter.

In this quickstart, you will produce and consume messages in JSON format.

Prerequisites for the quickstart
  • A Kafka cluster is running on the host machine.

2.1. Downloading a Kafka Bridge archive

A zipped distribution of the Kafka Bridge is available for download.

Procedure
  • Download the latest version of the Kafka Bridge archive from the GitHub release page.

2.2. Installing the Kafka Bridge

Use the script provided with the Kafka Bridge archive to install the Kafka Bridge. The application.properties file provided with the installation archive provides default configuration settings.

The following default property values configure the Kafka Bridge to listen for requests on port 8080.

Default configuration properties
http.host=0.0.0.0
http.port=8080
Procedure
  1. If you have not already done so, unzip the Kafka Bridge installation archive to any directory.

  2. Run the Kafka Bridge script using the configuration properties as a parameter:

    For example:

    ./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
  3. Check the log to confirm that the installation was successful.

    HTTP-Kafka Bridge started and listening on port 8080
    HTTP-Kafka Bridge bootstrap servers localhost:9092

2.3. Producing messages to topics and partitions

Use the Kafka Bridge to produce messages to a Kafka topic in JSON format by using the topics endpoint.

You can specify destination partitions for messages in the request body. The partitions endpoint provides an alternative method for specifying a single destination partition for all messages as a path parameter.

In this procedure, messages are produced to a topic called bridge-quickstart-topic.

Prerequisites
  • The Kafka cluster has a topic with three partitions.

    You can use the kafka-topics.sh utility to create topics.

    Example topic creation with three partitions
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1
    Verifying the topic was created
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic
Note
If you deployed Strimzi on Kubernetes, you can create a topic using the KafkaTopic custom resource.
Procedure
  1. Using the Kafka Bridge, produce three messages to the topic you created:

    curl -X POST \
      http://localhost:8080/topics/bridge-quickstart-topic \
      -H 'content-type: application/vnd.kafka.json.v2+json' \
      -d '{
        "records": [
            {
                "key": "my-key",
                "value": "sales-lead-0001"
            },
            {
                "value": "sales-lead-0002",
                "partition": 2
            },
            {
                "value": "sales-lead-0003"
            }
        ]
    }'
    • sales-lead-0001 is sent to a partition based on the hash of the key.

    • sales-lead-0002 is sent directly to partition 2.

    • sales-lead-0003 is sent to a partition in the bridge-quickstart-topic topic using a round-robin method.

  2. If the request is successful, the Kafka Bridge returns an offsets array, along with a 200 code and a content-type header of application/vnd.kafka.v2+json. For each message, the offsets array describes:

    • The partition that the message was sent to

    • The current message offset of the partition

      Example response
      #...
      {
        "offsets":[
          {
            "partition":0,
            "offset":0
          },
          {
            "partition":2,
            "offset":0
          },
          {
            "partition":0,
            "offset":1
          }
        ]
      }
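The same produce request can be prepared from a script. This Python sketch uses only the standard library to build the request (without sending it) and to group the returned offsets by partition. The function names are hypothetical, and the address is the localhost URL assumed throughout this quickstart.

```python
import json
import urllib.request
from collections import defaultdict

BRIDGE_URL = "http://localhost:8080"  # address assumed in this quickstart


def build_produce_request(topic: str, records: list) -> urllib.request.Request:
    """Prepare a POST to the topics endpoint with JSON-embedded records."""
    body = json.dumps({"records": records}).encode("utf-8")
    return urllib.request.Request(
        f"{BRIDGE_URL}/topics/{topic}",
        data=body,
        headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
        method="POST",
    )


def offsets_by_partition(response_body: str) -> dict:
    """Group the offsets array of a produce response by partition."""
    grouped = defaultdict(list)
    for entry in json.loads(response_body)["offsets"]:
        grouped[entry["partition"]].append(entry["offset"])
    return dict(grouped)
```

To send the request, pass the returned object to `urllib.request.urlopen` and read the response body. For the example response above, `offsets_by_partition` gives offsets 0 and 1 for partition 0 and offset 0 for partition 2.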
Additional topic requests

Make other curl requests to find information on topics and partitions.

List topics
curl -X GET \
  http://localhost:8080/topics
Example response
[
  "__strimzi_store_topic",
  "__strimzi-topic-operator-kstreams-topic-store-changelog",
  "bridge-quickstart-topic",
  "my-topic"
]
Get topic configuration and partition details
curl -X GET \
  http://localhost:8080/topics/bridge-quickstart-topic
Example response
{
  "name": "bridge-quickstart-topic",
  "configs": {
    "compression.type": "producer",
    "leader.replication.throttled.replicas": "",
    "min.insync.replicas": "1",
    "message.downconversion.enable": "true",
    "segment.jitter.ms": "0",
    "cleanup.policy": "delete",
    "flush.ms": "9223372036854775807",
    "follower.replication.throttled.replicas": "",
    "segment.bytes": "1073741824",
    "retention.ms": "604800000",
    "flush.messages": "9223372036854775807",
    "message.format.version": "2.8-IV1",
    "max.compaction.lag.ms": "9223372036854775807",
    "file.delete.delay.ms": "60000",
    "max.message.bytes": "1048588",
    "min.compaction.lag.ms": "0",
    "message.timestamp.type": "CreateTime",
    "preallocate": "false",
    "index.interval.bytes": "4096",
    "min.cleanable.dirty.ratio": "0.5",
    "unclean.leader.election.enable": "false",
    "retention.bytes": "-1",
    "delete.retention.ms": "86400000",
    "segment.ms": "604800000",
    "message.timestamp.difference.max.ms": "9223372036854775807",
    "segment.index.bytes": "10485760"
  },
  "partitions": [
    {
      "partition": 0,
      "leader": 0,
      "replicas": [
        {
          "broker": 0,
          "leader": true,
          "in_sync": true
        },
        {
          "broker": 1,
          "leader": false,
          "in_sync": true
        },
        {
          "broker": 2,
          "leader": false,
          "in_sync": true
        }
      ]
    },
    {
      "partition": 1,
      "leader": 2,
      "replicas": [
        {
          "broker": 2,
          "leader": true,
          "in_sync": true
        },
        {
          "broker": 0,
          "leader": false,
          "in_sync": true
        },
        {
          "broker": 1,
          "leader": false,
          "in_sync": true
        }
      ]
    },
    {
      "partition": 2,
      "leader": 1,
      "replicas": [
        {
          "broker": 1,
          "leader": true,
          "in_sync": true
        },
        {
          "broker": 2,
          "leader": false,
          "in_sync": true
        },
        {
          "broker": 0,
          "leader": false,
          "in_sync": true
        }
      ]
    }
  ]
}
List the partitions of a specific topic
curl -X GET \
  http://localhost:8080/topics/bridge-quickstart-topic/partitions
Example response
[
  {
    "partition": 0,
    "leader": 0,
    "replicas": [
      {
        "broker": 0,
        "leader": true,
        "in_sync": true
      },
      {
        "broker": 1,
        "leader": false,
        "in_sync": true
      },
      {
        "broker": 2,
        "leader": false,
        "in_sync": true
      }
    ]
  },
  {
    "partition": 1,
    "leader": 2,
    "replicas": [
      {
        "broker": 2,
        "leader": true,
        "in_sync": true
      },
      {
        "broker": 0,
        "leader": false,
        "in_sync": true
      },
      {
        "broker": 1,
        "leader": false,
        "in_sync": true
      }
    ]
  },
  {
    "partition": 2,
    "leader": 1,
    "replicas": [
      {
        "broker": 1,
        "leader": true,
        "in_sync": true
      },
      {
        "broker": 2,
        "leader": false,
        "in_sync": true
      },
      {
        "broker": 0,
        "leader": false,
        "in_sync": true
      }
    ]
  }
]
List the details of a specific topic partition
curl -X GET \
  http://localhost:8080/topics/bridge-quickstart-topic/partitions/0
Example response
{
  "partition": 0,
  "leader": 0,
  "replicas": [
    {
      "broker": 0,
      "leader": true,
      "in_sync": true
    },
    {
      "broker": 1,
      "leader": false,
      "in_sync": true
    },
    {
      "broker": 2,
      "leader": false,
      "in_sync": true
    }
  ]
}
List the offsets of a specific topic partition
curl -X GET \
  http://localhost:8080/topics/bridge-quickstart-topic/partitions/0/offsets
Example response
{
  "beginning_offset": 0,
  "end_offset": 1
}
What to do next

After producing messages to topics and partitions, create a Kafka Bridge consumer.

2.4. Creating a Kafka Bridge consumer

Before you can perform any consumer operations in the Kafka cluster, you must first create a consumer by using the consumers endpoint. The consumer is referred to as a Kafka Bridge consumer.

Procedure
  1. Create a Kafka Bridge consumer in a new consumer group named bridge-quickstart-consumer-group:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "name": "bridge-quickstart-consumer",
        "auto.offset.reset": "earliest",
        "format": "json",
        "enable.auto.commit": false,
        "fetch.min.bytes": 512,
        "consumer.request.timeout.ms": 30000
      }'
    • The consumer is named bridge-quickstart-consumer and the embedded data format is set as json.

    • Some basic configuration settings are defined.

    • The consumer will not commit offsets to the log automatically because the enable.auto.commit setting is false. You will commit the offsets manually later in this quickstart.

      If the request is successful, the Kafka Bridge returns the consumer ID (instance_id) and base URL (base_uri) in the response body, along with a 200 code.

      Example response
      #...
      {
        "instance_id": "bridge-quickstart-consumer",
        "base_uri":"http://<bridge_id>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer"
      }
  2. Copy the base URL (base_uri) to use in the other consumer operations in this quickstart.
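If you script these steps, the base URL can be read from the response instead of copied by hand. The following Python sketch derives the URLs of the consumer endpoints used later in this quickstart. The function name is hypothetical.

```python
import json

# Consumer endpoints used in the remaining quickstart procedures.
CONSUMER_OPERATIONS = ("subscription", "records", "offsets", "positions")


def consumer_urls(response_body: str) -> dict:
    """Map each consumer operation to its URL under base_uri."""
    base_uri = json.loads(response_body)["base_uri"].rstrip("/")
    return {name: f"{base_uri}/{name}" for name in CONSUMER_OPERATIONS}
```

The returned dictionary gives one URL per operation, for example the records URL to use when polling for messages.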

What to do next

Now that you have created a Kafka Bridge consumer, you can subscribe it to topics.

2.5. Subscribing a Kafka Bridge consumer to topics

After you have created a Kafka Bridge consumer, subscribe it to one or more topics by using the subscription endpoint. When subscribed, the consumer starts receiving all messages that are produced to the topic.

Procedure
  • Subscribe the consumer to the bridge-quickstart-topic topic that you created earlier, in Producing messages to topics and partitions:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "topics": [
            "bridge-quickstart-topic"
        ]
    }'

    The topics array can contain a single topic (as shown here) or multiple topics. If you want to subscribe the consumer to multiple topics that match a regular expression, you can use the topic_pattern string instead of the topics array.

    If the request is successful, the Kafka Bridge returns a 204 (No Content) code only.

As with an Apache Kafka client, the HTTP subscribe operation only adds topics to the local consumer’s subscriptions. The consumer joins the consumer group and obtains partition assignments only after one or more HTTP poll operations, which start the partition rebalance and join-group process. As a result, the initial HTTP poll operations may not return any records.
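The request body for the subscription endpoint takes either a topics array or a topic_pattern string, as described in the procedure above. This Python sketch builds either form. The function name is hypothetical, and rejecting a body that sets both fields is an assumption based on the wording "instead of".

```python
import json
from typing import Optional, Sequence


def subscription_body(topics: Optional[Sequence[str]] = None,
                      topic_pattern: Optional[str] = None) -> str:
    """Build the JSON body for a subscription request."""
    # Assumption: exactly one of the two fields is provided.
    if (topics is None) == (topic_pattern is None):
        raise ValueError("specify exactly one of topics or topic_pattern")
    if topics is not None:
        return json.dumps({"topics": list(topics)})
    return json.dumps({"topic_pattern": topic_pattern})
```

For example, `subscription_body(topics=["bridge-quickstart-topic"])` produces the body used in the curl command above.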

What to do next

After subscribing a Kafka Bridge consumer to topics, you can retrieve messages from the consumer.

2.6. Retrieving the latest messages from a Kafka Bridge consumer

Retrieve the latest messages from the Kafka Bridge consumer by requesting data from the records endpoint. In production, HTTP clients can call this endpoint repeatedly (in a loop).

Procedure
  1. Produce additional messages to the bridge-quickstart-topic topic, as described in Producing messages to topics and partitions.

  2. Submit a GET request to the records endpoint:

    curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
      -H 'accept: application/vnd.kafka.json.v2+json'

    After creating and subscribing a Kafka Bridge consumer, the first GET request returns an empty response because the poll operation starts a rebalancing process to assign partitions.

  3. Repeat step two to retrieve messages from the Kafka Bridge consumer.

    The Kafka Bridge returns an array of messages — describing the topic name, key, value, partition, and offset — in the response body, along with a 200 code. Messages are retrieved from the latest offset by default.

    HTTP/1.1 200 OK
    content-type: application/vnd.kafka.json.v2+json
    #...
    [
      {
        "topic":"bridge-quickstart-topic",
        "key":"my-key",
        "value":"sales-lead-0001",
        "partition":0,
        "offset":0
      },
      {
        "topic":"bridge-quickstart-topic",
        "key":null,
        "value":"sales-lead-0003",
        "partition":0,
        "offset":1
      },
    #...
    Note
    If an empty response is returned, produce more records to the topic as described in Producing messages to topics and partitions, and then try retrieving messages again.
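Because the first polls can return an empty response, a client typically retries. The following Python sketch shows one way to do that with a bounded number of attempts. The function names, attempt count, and delay are hypothetical choices, and the URL is the localhost address assumed in this quickstart.

```python
import json
import time
import urllib.request
from typing import Callable, List

RECORDS_URL = (
    "http://localhost:8080/consumers/bridge-quickstart-consumer-group"
    "/instances/bridge-quickstart-consumer/records"
)


def fetch_records() -> List[dict]:
    """Send one GET request to the records endpoint and decode the body."""
    request = urllib.request.Request(
        RECORDS_URL, headers={"Accept": "application/vnd.kafka.json.v2+json"}
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read() or b"[]")


def poll_until_records(fetch: Callable[[], List[dict]],
                       max_attempts: int = 5,
                       delay_seconds: float = 1.0) -> List[dict]:
    """Call fetch() until it returns records or the attempts run out."""
    for attempt in range(max_attempts):
        records = fetch()
        if records:
            return records
        if attempt < max_attempts - 1:
            time.sleep(delay_seconds)
    return []
```

Calling `poll_until_records(fetch_records)` against a running bridge repeats the GET request until messages arrive. Passing the fetch function as a parameter keeps the retry logic independent of the HTTP call.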
What to do next

After retrieving messages from a Kafka Bridge consumer, try committing offsets to the log.

2.7. Committing offsets to the log

Use the offsets endpoint to manually commit offsets to the log for all messages received by the Kafka Bridge consumer. This is required because the Kafka Bridge consumer that you created earlier, in Creating a Kafka Bridge consumer, was configured with the enable.auto.commit setting as false.

Procedure
  • Commit offsets to the log for the bridge-quickstart-consumer:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets

    Because no request body is submitted, offsets are committed for all the records that have been received by the consumer. Alternatively, the request body can contain an array of OffsetCommitSeek objects that specify the topics and partitions that you want to commit offsets for.

    If the request is successful, the Kafka Bridge returns a 204 code only.

What to do next

After committing offsets to the log, try out the endpoints for seeking to offsets.

2.8. Seeking to offsets for a partition

Use the positions endpoints to configure the Kafka Bridge consumer to retrieve messages for a partition from a specific offset, and then from the latest offset. This is referred to in Apache Kafka as a seek operation.

Procedure
  1. Seek to a specific offset for partition 0 of the bridge-quickstart-topic topic:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "offsets": [
            {
                "topic": "bridge-quickstart-topic",
                "partition": 0,
                "offset": 2
            }
        ]
    }'

    If the request is successful, the Kafka Bridge returns a 204 code only.

  2. Submit a GET request to the records endpoint:

    curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
      -H 'accept: application/vnd.kafka.json.v2+json'

    The Kafka Bridge returns messages from the offset that you sought to.

  3. Restore the default message retrieval behavior by seeking to the last offset for the same partition. This time, use the positions/end endpoint.

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions/end \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "partitions": [
            {
                "topic": "bridge-quickstart-topic",
                "partition": 0
            }
        ]
    }'

    If the request is successful, the Kafka Bridge returns another 204 code.

Note
You can also use the positions/beginning endpoint to seek to the first offset for one or more partitions.
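The two request body shapes used in this procedure differ: positions takes an offsets array, while positions/end and positions/beginning take a partitions array. This Python sketch builds both. The function names are hypothetical.

```python
import json
from typing import Iterable


def seek_to_offset_body(topic: str, partition: int, offset: int) -> str:
    """Body for the positions endpoint: seek one partition to an offset."""
    if offset < 0:
        raise ValueError("offset must not be negative")
    return json.dumps(
        {"offsets": [{"topic": topic, "partition": partition, "offset": offset}]}
    )


def seek_to_edge_body(topic: str, partitions: Iterable[int]) -> str:
    """Body for positions/beginning or positions/end: list the partitions."""
    return json.dumps(
        {"partitions": [{"topic": topic, "partition": p} for p in partitions]}
    )
```

The first function reproduces the body from step 1 when called with `("bridge-quickstart-topic", 0, 2)`. The second reproduces the body from step 3 when called with `("bridge-quickstart-topic", [0])`.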
What to do next

In this quickstart, you have used the Kafka Bridge to perform several common operations on a Kafka cluster. You can now delete the Kafka Bridge consumer that you created earlier.

2.9. Deleting a Kafka Bridge consumer

Delete the Kafka Bridge consumer that you used throughout this quickstart.

Procedure
  • Delete the Kafka Bridge consumer by sending a DELETE request to the instances endpoint.

    curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer

    If the request is successful, the Kafka Bridge returns a 204 code.

3. Kafka Bridge configuration

Configure a deployment of the Kafka Bridge using configuration properties. Configure Kafka and specify the HTTP connection details needed to be able to interact with Kafka. You can also use configuration properties to enable and use distributed tracing with the Kafka Bridge. Distributed tracing allows you to track the progress of transactions between applications in a distributed system.

Note
Use the KafkaBridge resource to configure properties when you are running the Kafka Bridge on Kubernetes.

3.1. Configuring Kafka Bridge properties

This procedure describes how to configure the Kafka and HTTP connection properties used by the Kafka Bridge.

You configure the Kafka Bridge like any other Kafka client, using the appropriate prefixes for Kafka-related properties:

  • kafka. for general configuration that applies to producers and consumers, such as server connection and security.

  • kafka.consumer. for consumer-specific configuration passed only to the consumer.

  • kafka.producer. for producer-specific configuration passed only to the producer.
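
As an illustration of how the prefixes partition the configuration, the following Python sketch splits a set of properties into the settings a consumer and a producer would receive. It is not the Kafka Bridge implementation: the `split_kafka_properties` helper is hypothetical, and the rule that a specific setting overrides a common one of the same name is an assumption.

```python
def split_kafka_properties(props):
    """Split bridge properties into consumer and producer client configs."""
    common, consumer, producer = {}, {}, {}
    for key, value in props.items():
        if key.startswith("kafka.consumer."):
            consumer[key[len("kafka.consumer."):]] = value
        elif key.startswith("kafka.producer."):
            producer[key[len("kafka.producer."):]] = value
        elif key.startswith("kafka."):
            common[key[len("kafka."):]] = value
        # Anything else (for example http.*) is not a Kafka client property.
    # Assumption: specific settings take precedence over common ones.
    return {**common, **consumer}, {**common, **producer}

props = {
    "kafka.bootstrap.servers": "localhost:9092",
    "kafka.producer.acks": "1",
    "kafka.consumer.auto.offset.reset": "earliest",
    "http.port": "8080",
}
consumer_config, producer_config = split_kafka_properties(props)
```

In this sketch, both clients receive `bootstrap.servers`, while `acks` reaches only the producer and `auto.offset.reset` only the consumer.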

As well as enabling HTTP access to a Kafka cluster, HTTP properties provide the capability to enable and define access control for the Kafka Bridge through Cross-Origin Resource Sharing (CORS). CORS is an HTTP mechanism that allows browser access to selected resources from more than one origin. To configure CORS, you define a list of allowed resource origins and HTTP methods to access them. Additional HTTP headers in requests describe the CORS origins that are permitted access to the Kafka cluster.

Procedure
  1. Edit the application.properties file provided with the Kafka Bridge installation archive.

    Use the properties file to specify Kafka and HTTP-related properties.

    1. Configure standard Kafka-related properties, including properties specific to the Kafka consumers and producers.

      Use:

      • kafka.bootstrap.servers to define the host/port connections to the Kafka cluster

      • kafka.producer.acks to provide acknowledgments to the HTTP client

      • kafka.consumer.auto.offset.reset to determine how to manage reset of the offset in Kafka

        For more information on configuring Kafka properties, see the Apache Kafka website.

    2. Configure HTTP-related properties to enable HTTP access to the Kafka cluster.

      For example:

      bridge.id=my-bridge
      http.host=0.0.0.0
      http.port=8080 (1)
      http.cors.enabled=true (2)
      http.cors.allowedOrigins=https://strimzi.io (3)
      http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH (4)
      1. The default HTTP configuration for the Kafka Bridge to listen on port 8080.

      2. Set to true to enable CORS.

      3. Comma-separated list of allowed CORS origins. You can use a URL or a Java regular expression.

      4. Comma-separated list of allowed HTTP methods for CORS.

  2. Save the configuration file.
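
To check a CORS configuration such as the one above, you can imitate the preflight request that a browser issues before a cross-origin call. The following Python sketch only builds that request. The `preflight_request` helper and the `my-topic` topic name are hypothetical, and the bridge is assumed to listen on localhost:8080.

```python
import urllib.request

def preflight_request(bridge_url, origin, method):
    """Build the OPTIONS request a browser sends before a cross-origin call."""
    return urllib.request.Request(
        url=bridge_url,
        headers={
            "Origin": origin,                         # must match an allowed origin
            "Access-Control-Request-Method": method,  # must be an allowed method
        },
        method="OPTIONS",
    )

req = preflight_request(
    "http://localhost:8080/topics/my-topic",  # hypothetical topic
    "https://strimzi.io",
    "POST",
)
# urllib.request.urlopen(req) would send it to a running bridge.
```

If the origin and method are permitted, the response to such a request would normally carry the corresponding Access-Control-Allow-* headers.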

3.2. Configuring metrics

Enable metrics for the Kafka Bridge by setting the KAFKA_BRIDGE_METRICS_ENABLED environment variable.

Procedure
  1. Set the environment variable for enabling metrics to true.

    Environment variable for enabling metrics
    KAFKA_BRIDGE_METRICS_ENABLED=true
  2. Run the Kafka Bridge script to enable metrics.

    Running the Kafka Bridge to enable metrics
    ./bin/kafka_bridge_run.sh --config-file=<path>/application.properties

    With metrics enabled, you can send a GET request to the /metrics endpoint to retrieve Kafka Bridge metrics in Prometheus format.
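
The Prometheus text format is line oriented, so a response from the /metrics endpoint can be inspected with a few lines of code. The following Python sketch is a simplified parser that assumes samples carry no trailing timestamp. The metric names in the sample are invented for illustration and are not actual Kafka Bridge metrics.

```python
def parse_prometheus_text(text):
    """Parse samples from Prometheus text format into a name -> value mapping."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):  # skip blank, HELP and TYPE lines
            continue
        # The value is the last whitespace-separated token on the line.
        name, _, value = line.rpartition(" ")
        samples[name] = float(value)
    return samples

sample = """\
# HELP example_http_requests_total Hypothetical counter, not a real bridge metric.
# TYPE example_http_requests_total counter
example_http_requests_total{method="GET"} 12.0
example_connections 3
"""
metrics = parse_prometheus_text(sample)
```

For production use, a Prometheus server or an existing client library is a better fit than a hand-written parser.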

3.3. Configuring distributed tracing

Enable distributed tracing to trace messages consumed and produced by the Kafka Bridge, and HTTP requests from client applications.

Properties to enable tracing are present in the application.properties file. To enable distributed tracing, do the following:

  • Set the bridge.tracing property value to enable the tracing you want to use. The only possible value is opentelemetry.

  • Set environment variables for tracing.

With the default configuration, OpenTelemetry tracing uses OTLP as the exporter protocol. By configuring the OTLP endpoint, you can still use a Jaeger backend instance to get traces.

Note
Jaeger has supported the OTLP protocol since version 1.35. Older Jaeger versions cannot get traces using the OTLP protocol.

OpenTelemetry defines an API specification for collecting tracing data as spans of metrics data. Spans represent a specific operation. A trace is a collection of one or more spans.

Traces are generated when the Kafka Bridge does the following:

  • Sends messages from Kafka to consumer HTTP clients

  • Receives messages from producer HTTP clients to send to Kafka

Jaeger implements the required APIs and presents visualizations of the trace data in its user interface for analysis.

To have end-to-end tracing, you must configure tracing in your HTTP clients.
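
As a rough illustration of what tracing in an HTTP client involves, the following Python sketch creates a W3C Trace Context `traceparent` header for a new trace. It assumes that the bridge extracts context using W3C Trace Context propagation, which is the OpenTelemetry default; the `new_traceparent` helper is hypothetical. A real client would normally use an OpenTelemetry SDK, which also exports the client-side spans.

```python
import secrets

def new_traceparent():
    """Return a W3C Trace Context traceparent value for a new sampled trace."""
    trace_id = secrets.token_hex(16)  # 16 random bytes, 32 hex characters
    span_id = secrets.token_hex(8)    # 8 random bytes, 16 hex characters
    # Format: version "00", trace ID, parent span ID, flags ("01" = sampled).
    return f"00-{trace_id}-{span_id}-01"

headers = {
    "content-type": "application/vnd.kafka.json.v2+json",
    "traceparent": new_traceparent(),
}
```

Sending these headers with a produce request would let the spans created by the bridge join the client's trace, under the assumption stated above.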

Caution
Strimzi no longer supports OpenTracing. If you were previously using OpenTracing with the bridge.tracing=jaeger option, we encourage you to transition to using OpenTelemetry instead.

Procedure
  1. Edit the application.properties file provided with the Kafka Bridge installation archive.

    Use the bridge.tracing property to enable the tracing you want to use.

    Example configuration to enable OpenTelemetry
    bridge.tracing=opentelemetry # (1)
    1. The property for enabling OpenTelemetry is uncommented by removing the # at the beginning of the line.

      With tracing enabled, you initialize tracing when you run the Kafka Bridge script.

  2. Save the configuration file.

  3. Set the environment variables for tracing.

    Environment variables for OpenTelemetry
    OTEL_SERVICE_NAME=my-tracing-service # (1)
    OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 # (2)
    1. The name of the OpenTelemetry tracer service.

    2. The gRPC-based OTLP endpoint that listens for spans on port 4317.

  4. Run the Kafka Bridge script with the property enabled for tracing.

    Running the Kafka Bridge with OpenTelemetry enabled
    ./bin/kafka_bridge_run.sh --config-file=<path>/application.properties

    The internal consumers and producers of the Kafka Bridge are now enabled for tracing.

3.3.1. Specifying tracing systems with OpenTelemetry

Instead of the default OTLP tracing system, you can specify other tracing systems that are supported by OpenTelemetry.

If you want to use another tracing system with OpenTelemetry, do the following:

  1. Add the library of the tracing system to the Kafka classpath.

  2. Add the name of the tracing system as an additional exporter environment variable.

    Additional environment variable when not using OTLP
    OTEL_SERVICE_NAME=my-tracing-service
    OTEL_TRACES_EXPORTER=zipkin # (1)
    OTEL_EXPORTER_ZIPKIN_ENDPOINT=http://localhost:9411/api/v2/spans # (2)
    1. The name of the tracing system. In this example, Zipkin is specified.

    2. The endpoint of the specific selected exporter that listens for spans. In this example, a Zipkin endpoint is specified.

3.3.2. Supported Span attributes

In addition to the standard OpenTelemetry attributes, the Kafka Bridge adds the following attributes from the OpenTelemetry semantic conventions for HTTP to its spans.

Attribute key | Attribute value
peer.service | Hardcoded to kafka
http.request.method | The HTTP method used to make the request
url.scheme | The URI scheme component
url.path | The URI path component
url.query | The URI query component
messaging.destination.name | The name of the Kafka topic being produced to or read from
messaging.system | Hardcoded to kafka
http.response.status_code | ok for HTTP responses between 200 and 300. error for all other status codes


4. Kafka Bridge API Reference

4.1. Introduction

The Kafka Bridge provides a REST API for integrating HTTP-based client applications with a Kafka cluster. You can use the API to create and manage consumers and send and receive records over HTTP rather than the native Kafka protocol.

4.2. Endpoints

4.2.1. Consumers

assign

POST /consumers/{groupid}/instances/{name}/assignments

Description

Assigns one or more topic partitions to a consumer.

Parameters
Table 1. Path Parameters
Name | Description | Required | Default | Pattern
groupid | ID of the consumer group to which the consumer belongs. | X | null |
name | Name of the consumer to assign topic partitions to. | X | null |

Table 2. Body Parameter
Name | Description | Required | Default | Pattern
Partitions | List of topic partitions to assign to the consumer. | X | |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 3. HTTP Response Codes
Code | Message | Datatype
204 | Partitions assigned successfully. | -
404 | The specified consumer instance was not found. | Error
409 | Subscriptions to topics, partitions, and patterns are mutually exclusive. | Error

Example HTTP request
Request body
{
  "partitions" : [ {
    "topic" : "topic",
    "partition" : 0
  }, {
    "topic" : "topic",
    "partition" : 1
  } ]
}
Example HTTP response
Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
Response 409
{
  "error_code" : 409,
  "message" : "Subscriptions to topics, partitions, and patterns are mutually exclusive."
}
commit

POST /consumers/{groupid}/instances/{name}/offsets

Description

Commits a list of consumer offsets. To commit offsets for all records fetched by the consumer, leave the request body empty.

Parameters
Table 4. Path Parameters
Name | Description | Required | Default | Pattern
groupid | ID of the consumer group to which the consumer belongs. | X | null |
name | Name of the consumer. | X | null |

Table 5. Body Parameter
Name | Description | Required | Default | Pattern
OffsetCommitSeekList | List of consumer offsets to commit to the consumer offsets commit log. You can specify one or more topic partitions to commit offsets for. | - | |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 6. HTTP Response Codes
Code | Message | Datatype
204 | Commit made successfully. | -
404 | The specified consumer instance was not found. | Error

Example HTTP request
Request body
{
  "offsets" : [ {
    "topic" : "topic",
    "partition" : 0,
    "offset" : 15
  }, {
    "topic" : "topic",
    "partition" : 1,
    "offset" : 42
  } ]
}
Example HTTP response
Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
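
When committing explicit offsets, a client typically derives the request body from the records it has just processed. The following Python sketch builds such a body. The `commit_body` helper is hypothetical, and it assumes the usual Kafka convention that the committed offset is that of the next record to read (last processed offset plus one).

```python
def commit_body(records):
    """Build an OffsetCommitSeekList-style body from processed records."""
    next_offsets = {}
    for record in records:
        key = (record["topic"], record["partition"])
        # Assumption: commit the offset of the next record to read.
        next_offsets[key] = max(next_offsets.get(key, 0), record["offset"] + 1)
    return {
        "offsets": [
            {"topic": topic, "partition": partition, "offset": offset}
            for (topic, partition), offset in sorted(next_offsets.items())
        ]
    }

processed = [
    {"topic": "topic", "partition": 0, "offset": 14},
    {"topic": "topic", "partition": 1, "offset": 40},
    {"topic": "topic", "partition": 1, "offset": 41},
]
body = commit_body(processed)
```

For these records, the resulting body matches the example request above: offset 15 for partition 0 and offset 42 for partition 1.
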
createConsumer

POST /consumers/{groupid}

Description

Creates a consumer instance in the given consumer group. You can optionally specify a consumer name and supported configuration options. It returns a base URI which must be used to construct URLs for subsequent requests against this consumer instance.

Parameters
Table 7. Path Parameters
Name | Description | Required | Default | Pattern
groupid | ID of the consumer group in which to create the consumer. | X | null |

Table 8. Body Parameter
Name | Description | Required | Default | Pattern
Consumer | Name and configuration of the consumer. The name is unique within the scope of the consumer group. If a name is not specified, a randomly generated name is assigned. All parameters are optional. The supported configuration options are shown in the following example. | - | |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 9. HTTP Response Codes
Code | Message | Datatype
200 | Consumer created successfully. | CreatedConsumer
409 | A consumer instance with the specified name already exists in the Kafka Bridge. | Error
422 | One or more consumer configuration options have invalid values. | Error

Example HTTP request
Request body
{
  "name" : "consumer1",
  "format" : "binary",
  "auto.offset.reset" : "earliest",
  "enable.auto.commit" : false,
  "fetch.min.bytes" : 512,
  "consumer.request.timeout.ms" : 30000,
  "isolation.level" : "read_committed"
}
Example HTTP response
Response 200
{
  "instance_id" : "consumer1",
  "base_uri" : "http://localhost:8080/consumers/my-group/instances/consumer1"
}
Response 409
{
  "error_code" : 409,
  "message" : "A consumer instance with the specified name already exists in the Kafka Bridge."
}
Response 422
{
  "error_code" : 422,
  "message" : "One or more consumer configuration options have invalid values."
}
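
Because later requests must be built from the returned base URI, a client usually keeps the response of the create call. The following Python sketch builds the create request and derives the records URL from a response like the one above. Both helper functions are hypothetical, and the bridge address is assumed.

```python
import json
import urllib.request

def create_consumer_request(bridge_url, group_id, name, config=None):
    """Build (but do not send) a request that creates a consumer instance."""
    body = {"name": name, **(config or {})}
    return urllib.request.Request(
        url=f"{bridge_url}/consumers/{group_id}",
        data=json.dumps(body).encode("utf-8"),
        headers={"content-type": "application/vnd.kafka.v2+json"},
        method="POST",
    )

def records_url(created_consumer):
    """Derive the poll URL from the base_uri of a CreatedConsumer response."""
    return created_consumer["base_uri"].rstrip("/") + "/records"

req = create_consumer_request(
    "http://localhost:8080", "my-group", "consumer1",
    {"format": "json", "auto.offset.reset": "earliest"},
)
created = {
    "instance_id": "consumer1",
    "base_uri": "http://localhost:8080/consumers/my-group/instances/consumer1",
}
poll_url = records_url(created)
```

Using `base_uri` rather than rebuilding the URL keeps the client pointed at the bridge instance that holds the consumer.
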
deleteConsumer

DELETE /consumers/{groupid}/instances/{name}

Description

Deletes a specified consumer instance. The request for this operation MUST use the base URL (including the host and port) returned in the response from the POST request to /consumers/{groupid} that was used to create this consumer.

Parameters
Table 10. Path Parameters
Name | Description | Required | Default | Pattern
groupid | ID of the consumer group to which the consumer belongs. | X | null |
name | Name of the consumer to delete. | X | null |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 11. HTTP Response Codes
Code | Message | Datatype
204 | Consumer removed successfully. | -
404 | The specified consumer instance was not found. | Error

Example HTTP response
Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
listSubscriptions

GET /consumers/{groupid}/instances/{name}/subscription

Description

Retrieves a list of the topics to which the consumer is subscribed.

Parameters
Table 12. Path Parameters
Name | Description | Required | Default | Pattern
groupid | ID of the consumer group to which the subscribed consumer belongs. | X | null |
name | Name of the subscribed consumer. | X | null |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 13. HTTP Response Codes
Code | Message | Datatype
200 | List of subscribed topics and partitions. | SubscribedTopicList
404 | The specified consumer instance was not found. | Error

Example HTTP response
Response 200
{
  "topics" : [ "my-topic1", "my-topic2" ],
  "partitions" : [ {
    "my-topic1" : [ 1, 2, 3 ]
  }, {
    "my-topic2" : [ 1 ]
  } ]
}
Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
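
In the 200 response, partitions are reported as a list of single-entry objects keyed by topic name. The following Python sketch flattens that structure into one mapping; the `partitions_by_topic` helper is hypothetical and assumes the response shape shown above.

```python
def partitions_by_topic(subscription):
    """Flatten a SubscribedTopicList-style response into topic -> partitions."""
    result = {topic: [] for topic in subscription.get("topics", [])}
    for entry in subscription.get("partitions", []):
        for topic, partitions in entry.items():
            result.setdefault(topic, []).extend(partitions)
    return result

response = {
    "topics": ["my-topic1", "my-topic2"],
    "partitions": [{"my-topic1": [1, 2, 3]}, {"my-topic2": [1]}],
}
assigned = partitions_by_topic(response)
```

Topics that are subscribed but have no partitions listed yet appear in the result with an empty list.
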
poll

GET /consumers/{groupid}/instances/{name}/records

Description

Retrieves records for a subscribed consumer, including message values, topics, and partitions. The request for this operation MUST use the base URL (including the host and port) returned in the response from the POST request to /consumers/{groupid} that was used to create this consumer.

Parameters
Table 14. Path Parameters
Name | Description | Required | Default | Pattern
groupid | ID of the consumer group to which the subscribed consumer belongs. | X | null |
name | Name of the subscribed consumer to retrieve records from. | X | null |

Table 15. Query Parameters
Name | Description | Required | Default | Pattern
timeout | The maximum amount of time, in milliseconds, that the Kafka Bridge spends retrieving records before timing out the request. | - | null |
max_bytes | The maximum size, in bytes, of unencoded keys and values that can be included in the response. If the response would exceed this size, an error response with code 422 is returned. | - | null |

Content Type
  • application/vnd.kafka.json.v2+json

  • application/vnd.kafka.binary.v2+json

  • application/vnd.kafka.text.v2+json

  • application/vnd.kafka.v2+json

Responses
Table 16. HTTP Response Codes
Code | Message | Datatype
200 | Poll request executed successfully. | List[ConsumerRecord]
404 | The specified consumer instance was not found. | Error
406 | The `format` used in the consumer creation request does not match the embedded format in the Accept header of this request, or the bridge received a message from the topic that is not JSON encoded. | Error
422 | Response exceeds the maximum number of bytes the consumer can receive | Error

Example HTTP response
Response 200
[ {
  "topic" : "topic",
  "key" : "key1",
  "value" : {
    "foo" : "bar"
  },
  "partition" : 0,
  "offset" : 2
}, {
  "topic" : "topic",
  "key" : "key2",
  "value" : [ "foo2", "bar2" ],
  "partition" : 1,
  "offset" : 3
} ]
[
  {
    "topic": "test",
    "key": "a2V5",
    "value": "Y29uZmx1ZW50",
    "partition": 1,
    "offset": 100
  },
  {
    "topic": "test",
    "key": "a2V5",
    "value": "a2Fma2E=",
    "partition": 2,
    "offset": 101
  }
]
Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
Response 406
{
  "error_code" : 406,
  "message" : "The `format` used in the consumer creation request does not match the embedded format in the Accept header of this request."
}
Response 422
{
  "error_code" : 422,
  "message" : "Response exceeds the maximum number of bytes the consumer can receive"
}
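
The second 200 example above appears to use the binary embedded format, in which keys and values are Base64-encoded strings. The following Python sketch decodes such records back to bytes, assuming that interpretation; the `decode_binary_records` helper is hypothetical.

```python
import base64

def decode_binary_records(records):
    """Return copies of the records with Base64 keys and values decoded to bytes."""
    decoded = []
    for record in records:
        item = dict(record)
        for field in ("key", "value"):
            if item.get(field) is not None:  # keys and values may be absent or null
                item[field] = base64.b64decode(item[field])
        decoded.append(item)
    return decoded

records = [
    {"topic": "test", "key": "a2V5", "value": "Y29uZmx1ZW50", "partition": 1, "offset": 100},
    {"topic": "test", "key": "a2V5", "value": "a2Fma2E=", "partition": 2, "offset": 101},
]
decoded = decode_binary_records(records)
```

With the JSON embedded format, as in the first 200 example, no decoding step is needed because keys and values are returned as JSON.
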
seek

POST /consumers/{groupid}/instances/{name}/positions

Description

Configures a subscribed consumer to fetch records from a particular offset the next time it fetches a set of records from a given topic partition. This overrides the default fetch behavior for consumers. You can specify one or more topic partitions.

Parameters
Table 17. Path Parameters
Name | Description | Required | Default | Pattern
groupid | ID of the consumer group to which the consumer belongs. | X | null |
name | Name of the subscribed consumer. | X | null |

Table 18. Body Parameter
Name | Description | Required | Default | Pattern
OffsetCommitSeekList | List of partition offsets from which the subscribed consumer will next fetch records. | X | |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 19. HTTP Response Codes
Code | Message | Datatype
204 | Seek performed successfully. | -
404 | The specified consumer instance was not found, or the specified consumer instance did not have one of the specified partitions assigned. | Error

Example HTTP request
Request body
{
  "offsets" : [ {
    "topic" : "topic",
    "partition" : 0,
    "offset" : 15
  }, {
    "topic" : "topic",
    "partition" : 1,
    "offset" : 42
  } ]
}
Example HTTP response
Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
seekToBeginning

POST /consumers/{groupid}/instances/{name}/positions/beginning

Description

Configures a subscribed consumer to seek (and subsequently read from) the first offset in one or more given topic partitions.

Parameters
Table 20. Path Parameters
Name | Description | Required | Default | Pattern
groupid | ID of the consumer group to which the subscribed consumer belongs. | X | null |
name | Name of the subscribed consumer. | X | null |

Table 21. Body Parameter
Name | Description | Required | Default | Pattern
Partitions | List of topic partitions to which the consumer is subscribed. The consumer will seek the first offset in the specified partitions. | X | |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 22. HTTP Response Codes
Code | Message | Datatype
204 | Seek to the beginning performed successfully. | -
404 | The specified consumer instance was not found, or the specified consumer instance did not have one of the specified partitions assigned. | Error

Example HTTP request
Request body
{
  "partitions" : [ {
    "topic" : "topic",
    "partition" : 0
  }, {
    "topic" : "topic",
    "partition" : 1
  } ]
}
Example HTTP response
Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
seekToEnd

POST /consumers/{groupid}/instances/{name}/positions/end

Description

Configures a subscribed consumer to seek (and subsequently read from) the offset at the end of one or more of the given topic partitions.

Parameters
Table 23. Path Parameters
Name | Description | Required | Default | Pattern
groupid | ID of the consumer group to which the subscribed consumer belongs. | X | null |
name | Name of the subscribed consumer. | X | null |

Table 24. Body Parameter
Name | Description | Required | Default | Pattern
Partitions | List of topic partitions to which the consumer is subscribed. The consumer will seek the last offset in the specified partitions. | X | |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 25. HTTP Response Codes
Code | Message | Datatype
204 | Seek to the end performed successfully. | -
404 | The specified consumer instance was not found, or the specified consumer instance did not have one of the specified partitions assigned. | Error

Example HTTP request
Request body
{
  "partitions" : [ {
    "topic" : "topic",
    "partition" : 0
  }, {
    "topic" : "topic",
    "partition" : 1
  } ]
}
Example HTTP response
Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
subscribe

POST /consumers/{groupid}/instances/{name}/subscription

Description

Subscribes a consumer to one or more topics, specified either as a list of topics or as a topic_pattern.

Example HTTP request
Request body
{
  "topics" : [ "topic1", "topic2" ]
}
Example HTTP response
Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
Response 409
{
  "error_code" : 409,
  "message" : "Subscriptions to topics, partitions, and patterns are mutually exclusive."
}
Response 422
{
  "error_code" : 422,
  "message" : "A list (of Topics type) or a topic_pattern must be specified."
}
unsubscribe

DELETE /consumers/{groupid}/instances/{name}/subscription

Description

Unsubscribes a consumer from all topics.

Parameters
Table 29. Path Parameters
Name | Description | Required | Default | Pattern
groupid | ID of the consumer group to which the subscribed consumer belongs. | X | null |
name | Name of the consumer to unsubscribe from topics. | X | null |

Content Type
  • application/json

Responses
Table 30. HTTP Response Codes
Code | Message | Datatype
204 | Consumer unsubscribed successfully. | -
404 | The specified consumer instance was not found. | Error

Example HTTP response
Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}

4.2.2. Default

healthy

GET /healthy

Description

Check if the bridge is running. This does not necessarily imply that it is ready to accept requests.

Responses
Table 31. HTTP Response Codes
Code | Message | Datatype
204 | The bridge is healthy | -
500 | The bridge is not healthy | -

info

GET /

Description

Retrieves information about the Kafka Bridge instance, in JSON format.

Content Type
  • application/json

Responses
Table 32. HTTP Response Codes
Code | Message | Datatype
200 | Information about Kafka Bridge instance. | BridgeInfo

Example HTTP response
Response 200
{
  "bridge_version" : "0.31.0-SNAPSHOT"
}
metrics

GET /metrics

Description

Retrieves the bridge metrics in Prometheus format.

Content Type
  • text/plain

Responses
Table 33. HTTP Response Codes
Code | Message | Datatype
200 | Metrics in Prometheus format retrieved successfully. | [String]

openapi

GET /openapi

Description

Retrieves the OpenAPI v2 specification in JSON format.

Content Type
  • application/json

Responses
Table 34. HTTP Response Codes
Code | Message | Datatype
200 | OpenAPI v2 specification in JSON format retrieved successfully. | [String]

openapiv2

GET /openapi/v2

Description

Retrieves the OpenAPI v2 specification in JSON format.

Content Type
  • application/json

Responses
Table 35. HTTP Response Codes
Code | Message | Datatype
200 | OpenAPI v2 specification in JSON format retrieved successfully. | [String]

openapiv3

GET /openapi/v3

Description

Retrieves the OpenAPI v3 specification in JSON format.

Content Type
  • application/json

Responses
Table 36. HTTP Response Codes
Code | Message | Datatype
200 | OpenAPI v3 specification in JSON format retrieved successfully. | [String]

ready

GET /ready

Description

Check if the bridge is ready and can accept requests.

Responses
Table 37. HTTP Response Codes
Code | Message | Datatype
204 | The bridge is ready | -
500 | The bridge is not ready | -
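
Because /healthy only indicates that the bridge is running, while /ready indicates that it can accept requests, a client or probe may want to combine both results. The following Python sketch maps the two status codes to a single state; the `bridge_state` helper and the state names are hypothetical.

```python
def bridge_state(healthy_status, ready_status):
    """Combine the status codes of GET /healthy and GET /ready into one state."""
    if healthy_status != 204:
        return "unhealthy"
    if ready_status != 204:
        return "running, not ready"  # alive, but should not receive requests yet
    return "ready"

state = bridge_state(204, 500)
```

Here the bridge is reported as running but not ready, so a client would wait before sending requests.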

4.2.3. Producer

send

POST /topics/{topicname}

Description

Sends one or more records to a given topic, optionally specifying a partition, key, or both.

Parameters
Table 38. Path Parameters
Name | Description | Required | Default | Pattern
topicname | Name of the topic to send records to or retrieve metadata from. | X | null |

Table 39. Body Parameter
Name | Description | Required | Default | Pattern
ProducerRecordList | List of records to send to the topic. | X | |

Table 40. Query Parameters
Name | Description | Required | Default | Pattern
async | If true, the bridge does not wait for the metadata of the send operation and does not return it to the client. If not specified, the value is false and the metadata is returned. | - | null |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 41. HTTP Response Codes
Code | Message | Datatype
200 | Records sent successfully. | OffsetRecordSentList
404 | The specified topic was not found. | Error
422 | The record list is not valid. | Error

Example HTTP request
Request body
{
  "records" : [ {
    "key" : "key1",
    "value" : "value1"
  }, {
    "value" : "value2",
    "partition" : 1
  }, {
    "value" : "value3"
  } ]
}
Example HTTP response
Response 200
{
  "offsets" : [ {
    "partition" : 2,
    "offset" : 0
  }, {
    "partition" : 1,
    "offset" : 1
  }, {
    "partition" : 2,
    "offset" : 2
  } ]
}
Response 404
{
  "error_code" : 404,
  "message" : "The specified topic was not found."
}
Response 422
{
  "error_code" : 422,
  "message" : "The record list contains invalid records."
}
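
The following Python sketch builds (but does not send) a produce request with the optional async query parameter. The `send_request` helper and the `my-topic` topic name are hypothetical, the bridge address is assumed, and the request content type assumes JSON-encoded records.

```python
import json
import urllib.parse
import urllib.request

def send_request(bridge_url, topic, records, asynchronous=False):
    """Build a request that sends records to a topic through the bridge."""
    query = "?" + urllib.parse.urlencode({"async": "true"}) if asynchronous else ""
    return urllib.request.Request(
        url=f"{bridge_url}/topics/{urllib.parse.quote(topic, safe='')}{query}",
        data=json.dumps({"records": records}).encode("utf-8"),
        # Assumption: records use the JSON embedded format.
        headers={"content-type": "application/vnd.kafka.json.v2+json"},
        method="POST",
    )

records = [
    {"key": "key1", "value": "value1"},
    {"value": "value2", "partition": 1},
    {"value": "value3"},
]
req = send_request("http://localhost:8080", "my-topic", records, asynchronous=True)
```

With async enabled, the client should not expect offsets in the response, as described for the query parameter above.
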
sendToPartition

POST /topics/{topicname}/partitions/{partitionid}

Description

Sends one or more records to a given topic partition, optionally specifying a key.

Parameters
Table 42. Path Parameters
Name | Description | Required | Default | Pattern
topicname | Name of the topic to send records to or retrieve metadata from. | X | null |
partitionid | ID of the partition to send records to or retrieve metadata from. | X | null |

Table 43. Body Parameter
Name | Description | Required | Default | Pattern
ProducerRecordToPartitionList | List of records to send to a given topic partition, including a value (required) and a key (optional). | X | |

Table 44. Query Parameters
Name | Description | Required | Default | Pattern
async | Whether to return immediately upon sending records, instead of waiting for metadata. No offsets will be returned if specified. Defaults to false. | - | null |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 45. HTTP Response Codes
Code | Message | Datatype
200 | Records sent successfully. | OffsetRecordSentList
404 | The specified topic partition was not found. | Error
422 | The record is not valid. | Error

Example HTTP request
Request body
{
  "records" : [ {
    "key" : "key1",
    "value" : "value1"
  }, {
    "value" : "value2"
  } ]
}
Example HTTP response
Response 200
{
  "offsets" : [ {
    "partition" : 2,
    "offset" : 0
  }, {
    "partition" : 1,
    "offset" : 1
  }, {
    "partition" : 2,
    "offset" : 2
  } ]
}
Response 404
{
  "error_code" : 404,
  "message" : "The specified topic partition was not found."
}
Response 422
{
  "error_code" : 422,
  "message" : "The record is not valid."
}

4.2.4. Seek

seek

POST /consumers/{groupid}/instances/{name}/positions

Description

Configures a subscribed consumer to fetch records from a particular offset the next time it fetches a set of records from a given topic partition. This overrides the default fetch behavior for consumers. You can specify one or more topic partitions.

Parameters
Table 46. Path Parameters
Name | Description | Required | Default | Pattern
groupid | ID of the consumer group to which the consumer belongs. | X | null |
name | Name of the subscribed consumer. | X | null |

Table 47. Body Parameter
Name | Description | Required | Default | Pattern
OffsetCommitSeekList | List of partition offsets from which the subscribed consumer will next fetch records. | X | |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 48. HTTP Response Codes
Code | Message | Datatype
204 | Seek performed successfully. | -
404 | The specified consumer instance was not found, or the specified consumer instance did not have one of the specified partitions assigned. | Error

Example HTTP request
Request body
{
  "offsets" : [ {
    "topic" : "topic",
    "partition" : 0,
    "offset" : 15
  }, {
    "topic" : "topic",
    "partition" : 1,
    "offset" : 42
  } ]
}
Example HTTP response
Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
seekToBeginning

POST /consumers/{groupid}/instances/{name}/positions/beginning

Description

Configures a subscribed consumer to seek (and subsequently read from) the first offset in one or more given topic partitions.

Parameters
Table 49. Path Parameters
Name | Description | Required | Default | Pattern
groupid | ID of the consumer group to which the subscribed consumer belongs. | X | null |
name | Name of the subscribed consumer. | X | null |

Table 50. Body Parameter
Name | Description | Required | Default | Pattern
Partitions | List of topic partitions to which the consumer is subscribed. The consumer will seek the first offset in the specified partitions. | X | |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 51. HTTP Response Codes
Code | Message | Datatype
204 | Seek to the beginning performed successfully. | -
404 | The specified consumer instance was not found, or the specified consumer instance did not have one of the specified partitions assigned. | Error

Example HTTP request
Request body
{
  "partitions" : [ {
    "topic" : "topic",
    "partition" : 0
  }, {
    "topic" : "topic",
    "partition" : 1
  } ]
}
Example HTTP response
Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
seekToEnd

POST /consumers/{groupid}/instances/{name}/positions/end

Description

Configures a subscribed consumer to seek (and subsequently read from) the offset at the end of one or more of the given topic partitions.

Parameters
Table 52. Path Parameters
Name | Description | Required | Default | Pattern
groupid | ID of the consumer group to which the subscribed consumer belongs. | X | null |
name | Name of the subscribed consumer. | X | null |

Table 53. Body Parameter
Name | Description | Required | Default | Pattern
Partitions | List of topic partitions to which the consumer is subscribed. The consumer will seek the last offset in the specified partitions. | X | |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 54. HTTP Response Codes
Code | Message | Datatype
204 | Seek to the end performed successfully. | -
404 | The specified consumer instance was not found, or the specified consumer instance did not have one of the specified partitions assigned. | Error

Example HTTP request
Request body
{
  "partitions" : [ {
    "topic" : "topic",
    "partition" : 0
  }, {
    "topic" : "topic",
    "partition" : 1
  } ]
}
Example HTTP response
Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}

4.2.5. Topics

getOffsets

GET /topics/{topicname}/partitions/{partitionid}/offsets

Description

Retrieves a summary of the offsets for the topic partition.

Parameters
Table 55. Path Parameters
Name | Description | Required | Default | Pattern
topicname | Name of the topic containing the partition. | X | null |
partitionid | ID of the partition. | X | null |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 56. HTTP Response Codes
Code | Message | Datatype
200 | A summary of the offsets of the topic partition. | OffsetsSummary
404 | The specified topic partition was not found. | Error

Example HTTP response
Response 200
{
  "beginning_offset" : 10,
  "end_offset" : 50
}
Response 404
{
  "error_code" : 404,
  "message" : "The specified topic partition was not found."
}
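
An offsets summary can be used for simple arithmetic, such as estimating how far a consumer is behind. The following Python sketch works on a response like the one above. Both helpers are hypothetical, and the sketch assumes that end_offset is the offset that the next written record would receive, as is conventional in Kafka.

```python
def offset_span(summary):
    """Number of offsets between the beginning and the end of the partition."""
    return summary["end_offset"] - summary["beginning_offset"]

def lag(summary, position):
    """How many offsets a consumer at `position` still has to read."""
    return max(summary["end_offset"] - position, 0)

summary = {"beginning_offset": 10, "end_offset": 50}
span = offset_span(summary)       # 50 - 10
behind = lag(summary, 42)         # 50 - 42
```

On compacted topics the span can exceed the number of records actually retained, so treat it as an upper bound.
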
getPartition

GET /topics/{topicname}/partitions/{partitionid}

Description

Retrieves partition metadata for the topic partition.

Parameters
Table 57. Path Parameters
Name | Description | Required | Default | Pattern
topicname | Name of the topic to send records to or retrieve metadata from. | X | null |
partitionid | ID of the partition to send records to or retrieve metadata from. | X | null |

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 58. HTTP Response Codes
Code | Message | Datatype
200 | Partition metadata | PartitionMetadata
404 | The specified partition was not found. | Error

Example HTTP response
Response 200
{
  "partition" : 1,
  "leader" : 1,
  "replicas" : [ {
    "broker" : 1,
    "leader" : true,
    "in_sync" : true
  }, {
    "broker" : 2,
    "leader" : false,
    "in_sync" : true
  } ]
}
Response 404
{
  "error_code" : 404,
  "message" : "The specified topic partition was not found."
}
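One way a client might use PartitionMetadata is to check which replicas are in sync. The following is a minimal sketch that works on the parsed 200 response shown above; `in_sync_brokers` is a hypothetical helper name, not something the Kafka Bridge provides.

```python
def in_sync_brokers(metadata: dict) -> list:
    """Return the broker IDs of the replicas flagged as in sync."""
    return [r["broker"] for r in metadata["replicas"] if r["in_sync"]]


# The 200 response body from the example above, as parsed JSON.
metadata = {
    "partition": 1,
    "leader": 1,
    "replicas": [
        {"broker": 1, "leader": True, "in_sync": True},
        {"broker": 2, "leader": False, "in_sync": True},
    ],
}
in_sync = in_sync_brokers(metadata)
print(in_sync)
```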
getTopic

GET /topics/{topicname}

Description

Retrieves the metadata about a given topic.

Parameters
Table 59. Path Parameters
Name Description Required Default Pattern

topicname

Name of the topic to send records to or retrieve metadata from.

X

null

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 60. HTTP Response Codes
Code Message Datatype

200

Topic metadata

TopicMetadata

404

The specified topic was not found.

Error

Example HTTP response
Response 200
{
  "name" : "topic",
  "offset" : 2,
  "configs" : {
    "cleanup.policy" : "compact"
  },
  "partitions" : [ {
    "partition" : 1,
    "leader" : 1,
    "replicas" : [ {
      "broker" : 1,
      "leader" : true,
      "in_sync" : true
    }, {
      "broker" : 2,
      "leader" : false,
      "in_sync" : true
    } ]
  }, {
    "partition" : 2,
    "leader" : 2,
    "replicas" : [ {
      "broker" : 1,
      "leader" : false,
      "in_sync" : true
    }, {
      "broker" : 2,
      "leader" : true,
      "in_sync" : true
    } ]
  } ]
}
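The request for this operation could be assembled along the following lines. This is a sketch only: the bridge address `http://localhost:8080`, the topic name `my-topic`, and the helper name `topic_metadata_request` are assumptions for illustration, and the request is built but not sent.

```python
from urllib.parse import quote
from urllib.request import Request

BRIDGE_URL = "http://localhost:8080"  # assumed bridge address, adjust as needed


def topic_metadata_request(topic: str, base_url: str = BRIDGE_URL) -> Request:
    """Build (but do not send) a GET /topics/{topicname} request."""
    # Percent-encode the topic name so it is safe as a single path segment.
    url = f"{base_url}/topics/{quote(topic, safe='')}"
    headers = {"Accept": "application/vnd.kafka.v2+json"}
    return Request(url, headers=headers, method="GET")


req = topic_metadata_request("my-topic")
print(req.get_method(), req.full_url)
```

To send the request, pass `req` to `urllib.request.urlopen` and parse the body with `json.load`; a 404 response surfaces as `urllib.error.HTTPError`.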
listPartitions

GET /topics/{topicname}/partitions

Description

Retrieves a list of partitions for the topic.

Parameters
Table 61. Path Parameters
Name Description Required Default Pattern

topicname

Name of the topic to send records to or retrieve metadata from.

X

null

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 62. HTTP Response Codes
Code Message Datatype

200

List of partitions.

List[PartitionMetadata]

404

The specified topic was not found.

Error

Example HTTP response
Response 200
[ {
  "partition" : 1,
  "leader" : 1,
  "replicas" : [ {
    "broker" : 1,
    "leader" : true,
    "in_sync" : true
  }, {
    "broker" : 2,
    "leader" : false,
    "in_sync" : true
  } ]
}, {
  "partition" : 2,
  "leader" : 2,
  "replicas" : [ {
    "broker" : 1,
    "leader" : false,
    "in_sync" : true
  }, {
    "broker" : 2,
    "leader" : true,
    "in_sync" : true
  } ]
} ]
Response 404
{
  "error_code" : 404,
  "message" : "The specified topic was not found."
}
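Because the response is a plain list of PartitionMetadata objects, it is easy to summarize on the client side. The sketch below counts how many partitions each broker leads; `leaders_per_broker` is a hypothetical helper, shown only to illustrate the response shape.

```python
from collections import Counter


def leaders_per_broker(partitions: list) -> dict:
    """Map each broker ID to the number of partitions it currently leads."""
    return dict(Counter(p["leader"] for p in partitions))


# The 200 response body from the example above, as parsed JSON.
partitions = [
    {"partition": 1, "leader": 1, "replicas": [
        {"broker": 1, "leader": True, "in_sync": True},
        {"broker": 2, "leader": False, "in_sync": True}]},
    {"partition": 2, "leader": 2, "replicas": [
        {"broker": 1, "leader": False, "in_sync": True},
        {"broker": 2, "leader": True, "in_sync": True}]},
]
leaders = leaders_per_broker(partitions)
print(leaders)
```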
listTopics

GET /topics

Description

Retrieves a list of all topics.

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 63. HTTP Response Codes
Code Message Datatype

200

List of topics.

List[string]

Example HTTP response
Response 200
[ "topic1", "topic2" ]
send

POST /topics/{topicname}

Description

Sends one or more records to a given topic, optionally specifying a partition, key, or both.

Parameters
Table 64. Path Parameters
Name Description Required Default Pattern

topicname

Name of the topic to send records to or retrieve metadata from.

X

null

Table 65. Body Parameter
Name Description Required Default Pattern

ProducerRecordList

ProducerRecordList

X

Table 66. Query Parameters
Name Description Required Default Pattern

async

Whether to return immediately upon sending records, without waiting for the send metadata. If set to `true`, no offsets are returned to the client. Defaults to `false`, in which case the metadata is returned.

-

null

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 67. HTTP Response Codes
Code Message Datatype

200

Records sent successfully.

OffsetRecordSentList

404

The specified topic was not found.

Error

422

The record list is not valid.

Error

Example HTTP request
Request body
{
  "records" : [ {
    "key" : "key1",
    "value" : "value1"
  }, {
    "value" : "value2",
    "partition" : 1
  }, {
    "value" : "value3"
  } ]
}
Example HTTP response
Response 200
{
  "offsets" : [ {
    "partition" : 2,
    "offset" : 0
  }, {
    "partition" : 1,
    "offset" : 1
  }, {
    "partition" : 2,
    "offset" : 2
  } ]
}
Response 404
{
  "error_code" : 404,
  "message" : "The specified topic was not found."
}
Response 422
{
  "error_code" : 422,
  "message" : "The record list contains invalid records."
}
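A send request might be put together as in the sketch below, which wraps the example records in a ProducerRecordList body. Several details are assumptions rather than part of this section: the bridge address `http://localhost:8080`, the topic name `my-topic`, the helper name `send_request`, and the request `Content-Type` of `application/vnd.kafka.json.v2+json` for JSON-embedded records. The request is built but not sent.

```python
import json
from urllib.parse import quote
from urllib.request import Request

BRIDGE_URL = "http://localhost:8080"  # assumed bridge address
JSON_RECORDS = "application/vnd.kafka.json.v2+json"  # assumed request content type


def send_request(topic: str, records: list, base_url: str = BRIDGE_URL) -> Request:
    """Build (but do not send) a POST /topics/{topicname} request."""
    body = json.dumps({"records": records}).encode("utf-8")
    url = f"{base_url}/topics/{quote(topic, safe='')}"
    return Request(url, data=body, headers={"Content-Type": JSON_RECORDS}, method="POST")


# The records from the example request above.
records = [
    {"key": "key1", "value": "value1"},
    {"value": "value2", "partition": 1},
    {"value": "value3"},
]
req = send_request("my-topic", records)
print(req.get_method(), req.full_url)
```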
sendToPartition

POST /topics/{topicname}/partitions/{partitionid}

Description

Sends one or more records to a given topic partition, optionally specifying a key.

Parameters
Table 68. Path Parameters
Name Description Required Default Pattern

topicname

Name of the topic to send records to or retrieve metadata from.

X

null

partitionid

ID of the partition to send records to or retrieve metadata from.

X

null

Table 69. Body Parameter
Name Description Required Default Pattern

ProducerRecordToPartitionList

List of records to send to a given topic partition, including a value (required) and a key (optional). ProducerRecordToPartitionList

X

Table 70. Query Parameters
Name Description Required Default Pattern

async

Whether to return immediately upon sending records, instead of waiting for metadata. No offsets will be returned if specified. Defaults to false.

-

null

Content Type
  • application/vnd.kafka.v2+json

Responses
Table 71. HTTP Response Codes
Code Message Datatype

200

Records sent successfully.

OffsetRecordSentList

404

The specified topic partition was not found.

Error

422

The record is not valid.

Error

Example HTTP request
Request body
{
  "records" : [ {
    "key" : "key1",
    "value" : "value1"
  }, {
    "value" : "value2"
  } ]
}
Example HTTP response
Response 200
{
  "offsets" : [ {
    "partition" : 2,
    "offset" : 0
  }, {
    "partition" : 1,
    "offset" : 1
  }, {
    "partition" : 2,
    "offset" : 2
  } ]
}
Response 404
{
  "error_code" : 404,
  "message" : "The specified topic partition was not found."
}
Response 422
{
  "error_code" : 422,
  "message" : "The record is not valid."
}
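The path and the optional `async` query parameter could be combined as in this sketch. The helper name `partition_send_url`, the bridge address, and the topic name are hypothetical; only the path template and the parameter name come from the tables above.

```python
from urllib.parse import quote, urlencode


def partition_send_url(base_url: str, topic: str, partition: int,
                       async_send: bool = False) -> str:
    """Return the URL for POST /topics/{topicname}/partitions/{partitionid}."""
    url = f"{base_url}/topics/{quote(topic, safe='')}/partitions/{int(partition)}"
    if async_send:
        # Only add the parameter when requested; omitting it leaves async at false.
        url += "?" + urlencode({"async": "true"})
    return url


sync_url = partition_send_url("http://localhost:8080", "my-topic", 0)
async_url = partition_send_url("http://localhost:8080", "my-topic", 0, async_send=True)
print(sync_url)
print(async_url)
```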

4.3. Models

4.3.1. BridgeInfo

Information about the Kafka Bridge instance.

Field Name Required Nullable Type Description Format

bridge_version

String

4.3.2. Consumer

Field Name Required Nullable Type Description Format

name

String

The unique name for the consumer instance. The name is unique within the scope of the consumer group. The name is used in URLs. If a name is not specified, a randomly generated name is assigned.

format

String

The allowable message format for the consumer, which can be `binary` (default) or `json`. The messages are converted into a JSON format.

auto.offset.reset

String

Resets the offset position for the consumer. If set to `latest` (default), messages are read from the latest offset. If set to `earliest`, messages are read from the first offset.

fetch.min.bytes

Integer

Sets the minimum amount of data, in bytes, for the consumer to receive. The broker waits until the data to send exceeds this amount. Default is `1` byte.

consumer.request.timeout.ms

Integer

Sets the maximum amount of time, in milliseconds, for the consumer to wait for messages for a request. If the timeout period is reached without a response, an error is returned. Default is `30000` (30 seconds).

enable.auto.commit

Boolean

If set to `true` (default), message offsets are committed automatically for the consumer. If set to `false`, message offsets must be committed manually.

isolation.level

String

If set to `read_uncommitted` (default), all transaction records are retrieved, independent of any transaction outcome. If set to `read_committed`, the records from committed transactions are retrieved.
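A Consumer body can be assembled from the fields in the table above. The following is a minimal sketch: `consumer_config` is a hypothetical helper, the consumer name `my-consumer` is made up, and the chosen values (JSON format, reading from the earliest offset, manual commits) are illustrative rather than recommended settings.

```python
import json


def consumer_config(name: str, **overrides) -> dict:
    """Return a Consumer body; keyword overrides replace the illustrative values."""
    config = {
        "name": name,
        "format": "json",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    }
    config.update(overrides)
    return config


# Dotted field names are not valid Python keyword names, so unpack them from a dict.
body = json.dumps(consumer_config("my-consumer", **{"fetch.min.bytes": 512}))
print(body)
```

Fields left out of the body fall back to the defaults listed in the table.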

4.3.3. ConsumerRecord

Field Name Required Nullable Type Description Format

key

RecordKey

offset

Long

int64

partition

Integer

int32

topic

String

value

X

RecordValue

headers

List of KafkaHeader

timestamp

Long

int64

4.3.4. CreatedConsumer

Field Name Required Nullable Type Description Format

instance_id

String

Unique ID for the consumer instance in the group.

base_uri

String

Base URI used to construct URIs for subsequent requests against this consumer instance.

4.3.5. Error

Field Name Required Nullable Type Description Format

error_code

Integer

int32

message

String

4.3.6. KafkaHeader

Field Name Required Nullable Type Description Format

key

X

String

value

X

byte[]

The header value in binary format, base64-encoded.

byte
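Since the header value travels as base64 text, a client has to encode it before sending and decode it after receiving. A small sketch using Python's standard `base64` module follows; the helper names `make_header` and `header_value` and the sample header `trace-id` are hypothetical.

```python
import base64


def make_header(key: str, value: bytes) -> dict:
    """Build a KafkaHeader dict with the value base64-encoded as ASCII text."""
    return {"key": key, "value": base64.b64encode(value).decode("ascii")}


def header_value(header: dict) -> bytes:
    """Decode the base64 value of a KafkaHeader back into raw bytes."""
    return base64.b64decode(header["value"])


header = make_header("trace-id", b"abc123")
print(header)
print(header_value(header))
```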

4.3.7. OffsetCommitSeek

Field Name Required Nullable Type Description Format

partition

X

Integer

int32

offset

X

Long

int64

topic

X

String

4.3.8. OffsetCommitSeekList

Field Name Required Nullable Type Description Format

offsets

List of OffsetCommitSeek
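An OffsetCommitSeekList wraps OffsetCommitSeek entries, each of which requires `topic`, `partition`, and `offset`. The sketch below builds such a body from tuples; `offsets_body` is a hypothetical helper and the topic name and offsets are made-up sample values.

```python
import json


def offsets_body(positions) -> dict:
    """Build an OffsetCommitSeekList from (topic, partition, offset) tuples."""
    return {
        "offsets": [
            {"topic": topic, "partition": partition, "offset": offset}
            for topic, partition, offset in positions
        ]
    }


body = offsets_body([("topic", 0, 15), ("topic", 1, 42)])
print(json.dumps(body, indent=2))
```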

4.3.9. OffsetRecordSent

Field Name Required Nullable Type Description Format

partition

Integer

int32

offset

Long

int64

4.3.10. OffsetRecordSentList

Field Name Required Nullable Type Description Format

offsets

List of OffsetRecordSentListOffsetsInner

4.3.11. OffsetRecordSentListOffsetsInner

Field Name Required Nullable Type Description Format

partition

Integer

int32

offset

Long

int64

error_code

Integer

int32

message

String
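Each entry can carry either a position (`partition`, `offset`) or an error (`error_code`, `message`), so a client may want to separate the two. The sketch below assumes that a failed record is reported with `error_code` present in its entry, which this table suggests but does not state outright. `split_results` is a hypothetical helper, and the error values in the sample are invented for illustration.

```python
def split_results(response: dict) -> tuple:
    """Split an OffsetRecordSentList into (sent, failed) entry lists."""
    sent, failed = [], []
    for entry in response["offsets"]:
        # Assumption: the presence of error_code marks a record that was not sent.
        (failed if "error_code" in entry else sent).append(entry)
    return sent, failed


# Sample response; the error entry is hypothetical, not real bridge output.
response = {
    "offsets": [
        {"partition": 2, "offset": 0},
        {"error_code": 1, "message": "hypothetical failure text"},
        {"partition": 2, "offset": 1},
    ]
}
sent, failed = split_results(response)
print(len(sent), len(failed))
```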

4.3.12. OffsetsSummary

Field Name Required Nullable Type Description Format

beginning_offset

Long

int64

end_offset

Long

int64

4.3.13. Partition

Field Name Required Nullable Type Description Format

partition

Integer

int32

topic

String

4.3.14. PartitionMetadata

4.3.15. Partitions

Field Name Required Nullable Type Description Format

partitions

List of Partition

4.3.16. ProducerRecord

Field Name Required Nullable Type Description Format

partition

Integer

int32

timestamp

Long

int64

value

X

X

RecordValue

key

RecordKey

headers

List of KafkaHeader

4.3.17. ProducerRecordList

Field Name Required Nullable Type Description Format

records

List of ProducerRecord

4.3.18. ProducerRecordToPartition

Field Name Required Nullable Type Description Format

value

X

X

RecordValue

key

RecordKey

headers

List of KafkaHeader

4.3.20. RecordKey

Key representation for a record. It can be an array, a JSON object, or a string.

Field Name Required Nullable Type Description Format

4.3.21. RecordValue

Value representation for a record. It can be an array, a JSON object, or a string.

Field Name Required Nullable Type Description Format

4.3.22. Replica

Field Name Required Nullable Type Description Format

broker

Integer

int32

leader

Boolean

in_sync

Boolean

4.3.23. SubscribedTopicList

Field Name Required Nullable Type Description Format

topics

Topics

partitions

List of [map]

int32

4.3.24. TopicMetadata

4.3.25. Topics

Field Name Required Nullable Type Description Format

topics

List of [string]

topic_pattern

String

A regex topic pattern for matching multiple topics.
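Before subscribing with a `topic_pattern`, it can help to preview which known topic names the pattern would select. The sketch below does this locally with Python's `re` module, under two assumptions: the pattern must match the whole topic name, and the pattern uses only constructs that behave the same in Python and in the regex engine Kafka applies. `preview_matches` is a hypothetical helper, so treat its result as an approximation rather than a guarantee of what the subscription receives.

```python
import re


def preview_matches(topic_pattern: str, topic_names: list) -> list:
    """Return the topic names that the pattern matches in full."""
    compiled = re.compile(topic_pattern)
    return [name for name in topic_names if compiled.fullmatch(name)]


# Topic names as they might be returned by GET /topics; "orders" is made up.
names = ["topic1", "topic2", "orders"]
matched = preview_matches(r"topic\d+", names)
body = {"topic_pattern": r"topic\d+"}  # Topics body that uses the pattern field
print(matched)
```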

Revised on 2024-11-15 09:45:29 UTC