HTTP Sink Connector for Confluent Cloud

The Kafka Connect HTTP Sink Connector for Confluent Cloud integrates Apache Kafka® with an API via HTTP or HTTPS.

The connector consumes records from Kafka topic(s) and converts each record value to a String or a JSON representation (see request.body.format) before sending it in the request body to the configured http.api.url, which can optionally reference the record key and/or topic name. The targeted API must support either a POST or PUT request.

The connector batches records up to the configured Batch max size (batch.max.size) before sending the batched request to the API. Each record is converted to its String representation, or to its JSON representation when Request Body Format (request.body.format) is set to json, and the records are then separated with the Batch separator (batch.separator). See Configuration Properties for configuration property descriptions.

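As an illustration of the batching behavior (the property values and record contents below are hypothetical, not defaults), consider a configuration fragment such as:

"batch.max.size": "3",
"batch.prefix": "[",
"batch.suffix": "]",
"batch.separator": ","

With the default request.body.format of string, three records whose String representations are {"id":1}, {"id":2}, and {"id":3} would be combined into a single request body of [{"id":1},{"id":2},{"id":3}] and sent to the configured http.api.url.
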
The HTTP Sink Connector supports connecting to APIs using SSL along with Basic Authentication, OAuth2, or a Proxy Authentication Server.

Important

If you are still on Confluent Cloud Enterprise, please contact your Confluent Account Executive for more information about using this connector.

Features

The HTTP Sink connector supports the following features:

  • At least once delivery: This connector guarantees that records from the Kafka topic are delivered at least once.

  • Supports multiple tasks: The connector supports running one or more tasks. More tasks may improve performance (that is, consumer lag is reduced with multiple tasks running).

  • Automatically creates topics: The following three topics are automatically created when the connector starts:

    The suffix for each topic name is the connector’s logical ID. The example below shows the three connector topics and one pre-existing Kafka topic named pageviews.

    HTTP Sink Connector Topics

    If the records sent to the topic are not in the correct format, or if important fields are missing in the record, the errors are recorded in the error topic, and the connector continues to run.

  • Supported data formats: The connector supports Avro, JSON Schema (JSON-SR), Protobuf, JSON (schemaless), and Bytes formats. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf).

  • Regex Replacements: The connector can take a number of regex patterns and replacement strings that are applied to a record before it is submitted to the destination API. To do this, the connector uses the configuration options regex.patterns, regex.replacements, and regex.separator (see the example after this list).

  • Supports Batching: The connector batches requests submitted to HTTP APIs for efficiency. Batches can be built with the configuration options batch.prefix, batch.suffix and batch.separator. All regex options apply when batching and are applied to individual records before being submitted to the batch.

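As an illustration of the regex options (the patterns and replacements below are hypothetical, not defaults), the following configuration fragment replaces every occurrence of foo with bar and every digit with #, so a record value of foo123 is submitted as bar###:

"regex.patterns": "foo~[0-9]",
"regex.replacements": "bar~#",
"regex.separator": "~"
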
For configuration property values and descriptions, see Configuration Properties. For additional information, refer to Cloud connector limitations.

Quick Start

Use this quick start to get up and running with the Confluent Cloud HTTP Sink connector. The quick start provides the basics of selecting the connector and configuring it to stream events to an HTTP endpoint.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud Platform (GCP).
  • The Confluent Cloud CLI installed and configured for the cluster. See Install and Configure the Confluent Cloud CLI.
  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • At least one source Kafka topic must exist in your Confluent Cloud cluster before creating the sink connector.

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.

Step 2: Add a connector.

Click Connectors. If you already have connectors in your cluster, click Add connector.

Step 3: Select your connector.

Click the HTTP Sink connector icon.

HTTP Sink Connector Icon

Step 4: Set up the connection.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.
  1. Select one or more topics.
  2. Enter a connector Name.
  3. Select an Input message format (data coming from the Kafka topic): AVRO, PROTOBUF, JSON_SR (JSON Schema), JSON (schemaless), or BYTES. A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  4. Enter your Kafka Cluster credentials. The credentials are either the cluster API key and secret or the service account API key and secret.
  5. Enter your HTTP URL. Use an HTTP or HTTPS connection URL. For example, http://eshost1:9200 or https://eshost3:9200. If the connection URL is HTTPS, HTTPS is used for all connections. A URL with no protocol is considered HTTP.
  6. Select either PUT or POST for the HTTP API Request Method.
  7. Enter the number of tasks to use with the connector. More tasks may improve performance (that is, consumer lag is reduced with multiple tasks running).

See Configuration Properties for configuration property values and descriptions.

Step 5: Launch the connector.

Verify the connection details and click Launch.

Launch the connector

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running.

Connector status

Step 7: Check for records.

Verify that records are being produced at the endpoint.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

Using the Confluent Cloud CLI

To set up and run the connector using the Confluent Cloud Console, complete the following steps.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors.

Enter the following command to list available connectors:

ccloud connector-catalog list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

ccloud connector-catalog describe <connector-catalog-name>

For example:

ccloud connector-catalog describe HttpSink

Example output:

Following are the required configs:
connector.class: HttpSink
input.data.format
name
kafka.api.key
kafka.api.secret
http.api.url
tasks.max
topics

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "connector.class": "HttpSink",
  "input.data.format": "JSON",
  "name": "HttpSinkConnector_0",
  "kafka.api.key": "****************",
  "kafka.api.secret": "************************************************",
  "http.api.url": "http:://eshost1:9200/",
  "request.method": "POST",
  "tasks.max": "1",
  "topics": "orders",
}

Note the following property definitions:

  • "connector.class": Identifies the connector plugin name.
  • "input.data.format": Sets the input message format (data coming from the Kafka topic). Valid entries are AVRO, JSON_SR, PROTOBUF, JSON, or BYTES. You must have Confluent Cloud Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • "name": Sets a name for your new connector.
  • "kafka.api.key" and ""kafka.api.secret": These credentials are either the cluster API key and secret or the service account API key and secret.
  • "http.api.url": Enter an HTTP or HTTPS connection URL. For example, http://eshost1:9200 or https://eshost3:9200. If the connection URL is HTTPS, HTTPS is used for all connections. A URL with no protocol is considered HTTP.
  • "request.method": Enter an HTTP API Request Method: PUT or POST.
  • "tasks.max": Enter the maximum number of tasks for the connector to use. More tasks may improve performance (that is, consumer lag is reduced with multiple tasks running).
  • "topics": Enter the topic name or a comma-separated list of topic names.

For configuration property values and descriptions, see Configuration Properties.

Step 4: Load the properties file and create the connector.

Enter the following command to load the configuration and start the connector:

ccloud connector create --config <file-name>.json

For example:

ccloud connector create --config http-sink-config.json

Example output:

Created connector HttpSinkConnector_0 lcc-do6vzd

Step 5: Check the connector status.

Enter the following command to check the connector status:

ccloud connector list

Example output:

ID           |             Name              | Status  | Type | Trace
+------------+-------------------------------+---------+------+-------+
lcc-do6vzd   | HttpSinkConnector_0           | RUNNING | sink |       |

Step 6: Check for records.

Verify that records are populating the endpoint.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

Configuration Properties

The following connector configuration properties can be used with the HTTP Sink connector for Confluent Cloud.

HTTP server details

http.api.url

HTTP API URL.

  • Type: string
  • Importance: high
request.method

HTTP Request Method. Valid options are PUT or POST.

  • Type: string
  • Default: post
  • Valid Values: one of [POST, PUT]
  • Importance: high
headers

HTTP headers to be included in all requests. Individual headers should be separated by the header.separator. Forbidden header names and X-Forwarded headers are not allowed.

  • Type: string
  • Default: “”
  • Importance: high
header.separator

Separator character used in headers property.

  • Type: string
  • Default: |
  • Importance: high
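
As an example of the two preceding properties, the following hypothetical fragment adds two headers to every request. It assumes each header is written as a Name:Value pair, with individual headers separated by the configured header.separator (| by default):

"headers": "Content-Type:application/json|Accept:application/json",
"header.separator": "|"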

Server batches

request.body.format

Used to produce request body in either JSON or String format.

  • Type: string
  • Default: string
  • Valid Values: one of [string, json]
  • Importance: medium
batch.key.pattern

Pattern used to build the key for a given batch. ${key} and ${topic} can be used to include message attributes here.

  • Type: string
  • Default: someKey
  • Importance: high
batch.max.size

The number of records accumulated in a batch before the HTTP API is invoked.

  • Type: int
  • Default: 1
  • Importance: high
batch.prefix

Prefix added to record batches. This is applied once at the beginning of the batch of records.

  • Type: string
  • Default: “”
  • Importance: high
batch.suffix

Suffix added to record batches. This is applied once at the end of the batch of records.

  • Type: string
  • Default: “”
  • Importance: high
batch.separator

Separator for records in a batch.

  • Type: string
  • Default: ,
  • Importance: high
batch.json.as.array

Whether or not to use an array to bundle json records. Only used when request.body.format is set to json. This can be disabled only when batch.max.size is set to 1.

  • Type: boolean
  • Default: true
  • Importance: high
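
As a sketch of how these batching properties interact (the record values are hypothetical), with request.body.format set to json and the default batch.json.as.array of true, a batch containing the two JSON records {"name":"a"} and {"name":"b"} would be sent as a single JSON array:

[{"name":"a"},{"name":"b"}]

With batch.max.size set to 1, batch.json.as.array can be set to false so that each request body contains a single JSON object instead.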

Server authentication

auth.type

Authentication type of the endpoint. Valid values are NONE, BASIC, OAUTH2 (Client Credentials grant type only).

  • Type: string
  • Default: NONE
  • Importance: high
connection.user

The username to be used with an endpoint requiring authentication.

  • Type: string
  • Default: “”
  • Importance: high
connection.password

The password to be used with an endpoint requiring authentication.

  • Type: password
  • Default: [hidden]
  • Importance: high
oauth2.token.url

The URL to be used for fetching OAuth2 token. Client Credentials is the only supported grant type.

  • Type: string
  • Default: “”
  • Importance: high
oauth2.client.id

The client id used when fetching OAuth2 token.

  • Type: string
  • Default: “”
  • Importance: high
oauth2.client.secret

The secret used when fetching OAuth2 token.

  • Type: password
  • Default: [hidden]
  • Importance: high
oauth2.token.property

The name of the property containing the OAuth2 token returned by the http proxy. Default value is access_token.

  • Type: string
  • Default: access_token
  • Importance: high
oauth2.client.auth.mode

Specifies how to encode client_id and client_secret in the OAuth2 authorization request. If set to ‘header’, the credentials are encoded as an 'Authorization: Basic <base-64 encoded client_id:client_secret>' HTTP header. If set to ‘url’, then client_id and client_secret are sent as URL encoded parameters.

  • Type: string
  • Default: header
  • Valid Values: one of [header, url]
  • Importance: low
oauth2.client.scope

The scope used when fetching OAuth2 token.

  • Type: string
  • Default: any
  • Importance: low
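
For reference, a hypothetical configuration fragment for an endpoint protected by OAuth2 (Client Credentials grant type) might look like the following; the token URL, client ID, and secret are placeholders:

"auth.type": "OAUTH2",
"oauth2.token.url": "https://auth.example.com/oauth/token",
"oauth2.client.id": "my-client-id",
"oauth2.client.secret": "****************",
"oauth2.client.auth.mode": "header",
"oauth2.token.property": "access_token"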

Server retries

retry.on.status.codes

The HTTP error codes to retry on. Comma-separated list of codes or range of codes to retry on. Ranges are specified with start and optional end code. Range boundaries are inclusive. For instance, 400- includes all codes greater than or equal to 400. 400-500 includes codes from 400 to 500, including 500. Multiple ranges and single codes can be specified together to achieve fine-grained control over retry behavior. For example, 404,408,500- will retry on 404 NOT FOUND, 408 REQUEST TIMEOUT, and all 5xx error codes.

  • Type: string
  • Default: 400-
  • Importance: medium
max.retries

The maximum number of times to retry on errors before failing the task.

  • Type: int
  • Default: 10
  • Valid Values: [1,…]
  • Importance: medium
retry.backoff.ms

The initial duration in milliseconds to wait following an error before a retry attempt is made. Subsequent backoffs will be exponentially larger than the first duration.

  • Type: int
  • Default: 3000
  • Valid Values: [100,…]
  • Importance: medium
http.connect.timeout.ms

Time to wait for a connection to be established.

  • Type: int
  • Default: 30000
  • Importance: medium
http.request.timeout.ms

Time to wait for a request response to arrive.

  • Type: int
  • Default: 30000
  • Importance: medium
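
Putting the retry and timeout properties together, a hypothetical fragment that retries on 404, 408, and all 5xx codes (mirroring the example given for retry.on.status.codes above), with up to 5 retries and the default backoff and timeouts, might look like this:

"retry.on.status.codes": "404,408,500-",
"max.retries": "5",
"retry.backoff.ms": "3000",
"http.connect.timeout.ms": "30000",
"http.request.timeout.ms": "30000"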

Server regular expressions (regex)

regex.patterns

Regular expression patterns used for replacements in the message sent to the HTTP service. Multiple regular expression patterns can be specified, but they must be separated by regex.separator.

  • Type: string
  • Default: “”
  • Importance: low
regex.replacements

Regex replacements to use with the patterns in regex.patterns. Multiple replacements can be specified, but they must be separated by regex.separator. ${key} and ${topic} can be used here.

  • Type: string
  • Default: “”
  • Importance: low
regex.separator

Separator character used in regex.patterns and regex.replacements property.

  • Type: string
  • Default: ~
  • Importance: high

SSL, Key Store, and Trust Store

https.ssl.key.password

The password of the private key in the key store file. This is optional for the client.

  • Type: password
  • Default: null
  • Importance: high
https.ssl.keystore.location

The location of the key store file. This is optional for the client and can be used for two-way authentication for the client.

  • Type: string
  • Default: null
  • Importance: high
https.ssl.keystore.password

The store password for the key store file. This is optional for a client and is only needed if https.ssl.keystore.location is configured.

  • Type: password
  • Default: null
  • Importance: high
https.ssl.truststore.location

The location of the trust store file.

  • Type: string
  • Default: null
  • Importance: high
https.ssl.truststore.password

The password for the trust store file. If a password is not set, access to the truststore is still available, but integrity checking is disabled.

  • Type: password
  • Default: null
  • Importance: high
https.ssl.enabled.protocols

The list of protocols enabled for SSL connections.

  • Type: list
  • Default: TLSv1.2,TLSv1.1,TLSv1
  • Importance: medium
https.host.verifier.enabled

Enables or disables host verification using an endpoint identification algorithm. When set to false the algorithm is set to empty, which disables host verification.

  • Type: boolean
  • Default: true
  • Importance: high
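
As a sketch, a hypothetical fragment for an HTTPS endpoint that uses a trust store, plus a key store for two-way authentication, might look like the following; the file locations and passwords are placeholders:

"https.ssl.truststore.location": "/path/to/truststore.jks",
"https.ssl.truststore.password": "****************",
"https.ssl.keystore.location": "/path/to/keystore.jks",
"https.ssl.keystore.password": "****************",
"https.ssl.key.password": "****************",
"https.host.verifier.enabled": "true"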

Next Steps

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent Cloud CLI to manage your resources in Confluent Cloud.
