Configure Confluent Cloud Clients¶
You can write Kafka client applications to connect to Confluent Cloud in any supported language. You just need to configure the clients using the Confluent Cloud cluster credentials. The supported clients are:
- .NET
- C/C++
- Golang
- Python
- Java and JMS
Refer to the client examples for supported languages in Code Examples. The “Hello, World!” examples produce to and consume from any Kafka cluster, including Confluent Cloud. There are additional examples using Confluent Cloud Schema Registry and Avro for the subset of languages that support it.
Note
All clients that connect to Confluent Cloud must support SASL/PLAIN authentication and TLS 1.2 encryption.
Use Client Code Examples¶
The easiest way to get started connecting your client apps to Confluent Cloud is to copy-paste from the examples on the Confluent Cloud UI.
Log on to Confluent Cloud, navigate to the tools and client configuration examples, and grab the example code for your client as follows:
- Select an environment.
- Select a cluster.
- Select Configure a Client on the Overview page, or choose Clients from the navigation menu.
- Select the language you are using for your client application.
- Copy and paste the displayed example code into your application source code.
The examples on the Confluent Cloud UI also provide links out to full demos on GitHub for each language, and the commands to clone the GitHub repository.
Java Client¶
The following is an end-to-end walkthrough of how to connect a client application to Confluent Cloud, using a Java client as an example.
Log in to your cluster using the ccloud login command with the cluster URL specified.
ccloud login
Enter your Confluent Cloud credentials:
Email: susan@myemail.com
Password:
Set the Confluent Cloud environment.
Get the environment ID.
ccloud environment list
Your output should resemble:
      Id      |        Name
+-------------+---------------------+
* t2703       | default
  env-m2561   | demo-env-102893
  env-vnywz   | ccloud-demo
  env-qzrg2   | data-lineage-demo
  env-250o2   | my-new-environment
Set the environment using the ID (<env-id>).

ccloud environment use <env-id>
Your output should resemble:
Now using "env-vnywz" as the default (active) environment.
Set the cluster to use.
Get the cluster ID.
ccloud kafka cluster list
Your output should resemble:
      Id      |   Name    | Type  | Provider |  Region  | Availability | Status
+-------------+-----------+-------+----------+----------+--------------+--------+
  lkc-oymmj   | cluster_1 | BASIC | gcp      | us-east4 | single-zone  | UP
* lkc-7k6kj   | cluster_0 | BASIC | gcp      | us-east1 | single-zone  | UP
Set the cluster using the ID (<cluster-id>). This is the cluster where the commands are run.

ccloud kafka cluster use <cluster-id>
To verify the selected cluster after setting it, type ccloud kafka cluster list again. The selected cluster will have an asterisk (*) next to it.
Create an API key and secret, and save them. You need these credentials to produce to or consume from your topics.
You can generate the API key from the Confluent Cloud Console or with the Confluent Cloud CLI. Be sure to save the API key and secret.
On the web UI, click the Kafka API keys tab and click Create key. Save the key and secret, then click the checkbox next to I have saved my API key and secret and am ready to continue.
Or, from the Confluent Cloud CLI, type the following command:
ccloud api-key create --resource <resource-id>
Your output should resemble:
Save the API key and secret. The secret is not retrievable later.

+---------+------------------------------------------------------------------+
| API Key | ABC123xyz                                                        |
| Secret  | 123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABCx |
+---------+------------------------------------------------------------------+
Optional: Add the API secret with ccloud api-key store <key> <secret>. When you create an API key with the CLI, it is automatically stored locally. However, when you create an API key using the UI, API, or with the CLI on another machine, the secret is not available for CLI use until you store it. This is required because secrets cannot be retrieved after creation.

ccloud api-key store <api-key> <api-secret> --resource <resource-id>
Set the API key to use for Confluent Cloud CLI commands with the command ccloud api-key use <key> --resource <resource-id>.

ccloud api-key use <api-key> --resource <resource-id>
Get the communication endpoint for the selected cluster.
Make sure you have the cluster ID from the previous steps, or retype ccloud kafka cluster list to show the active cluster (with the asterisk by it), and copy the cluster ID.

Run the following command to view details on the cluster, including security and API endpoints.
ccloud kafka cluster describe <cluster-id>
Your output should resemble:
+--------------+-----------------------------------------------------------+
| Id           | lkc-7k6kj                                                 |
| Name         | cluster_0                                                 |
| Type         | BASIC                                                     |
| Ingress      | 100                                                       |
| Egress       | 100                                                       |
| Storage      | 5000                                                      |
| Provider     | gcp                                                       |
| Availability | single-zone                                               |
| Region       | us-east1                                                  |
| Status       | UP                                                        |
| Endpoint     | SASL_SSL://pkc-4yyd6.us-east1.gcp.confluent.cloud:9092    |
| ApiEndpoint  | https://pkac-ew1dj.us-east1.gcp.confluent.cloud           |
| RestEndpoint | https://pkac-ew1dj.us-east1.gcp.confluent.cloud:443       |
+--------------+-----------------------------------------------------------+
Copy and save the value shown for the Endpoint, as you will need it in the next steps to specify the bootstrap server URL that client applications use to communicate with this cluster.
Tip
You can also get the cluster ID and bootstrap server values from Cluster settings on the Confluent Cloud UI.
In the Confluent Cloud UI, enable Confluent Cloud Schema Registry and get the Schema Registry endpoint URL, the API key, and the API secret. For more information, see Quick Start for Schema Management on Confluent Cloud.
In the Environment Overview page, click Clusters and select your cluster from the list.
From the navigation menu, click Data In/Out -> Clients. Insert the following configuration settings into your client code.
sasl.mechanism=PLAIN
bootstrap.servers=<bootstrap-server-url>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" password="<api-secret>";
security.protocol=SASL_SSL
client.dns.lookup=use_all_dns_ips

# Set to 10 seconds, so brokers are not overwhelmed in cases of incorrectly configured or expired credentials
reconnect.backoff.max.ms=10000

# Keep this default for cloud environments
request.timeout.ms=30000

# Producer specific settings
acks=all
linger.ms=5

# Admin specific settings
# Set to 5 minutes to avoid unnecessary timeouts in cloud environments
default.api.timeout.ms=300000

# Consumer specific settings
# Set to 45 seconds to avoid unnecessary timeouts in cloud environments
session.timeout.ms=45000

# Schema Registry specific settings
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<sr-api-key>:<sr-api-secret>
schema.registry.url=<schema-registry-url>

# Enable Avro serializer with Schema Registry (optional)
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
Tip
For <bootstrap-server-url>, use the host and port from the Endpoint value shown by ccloud kafka cluster describe <cluster-id> in a previous step (without the SASL_SSL:// prefix), or navigate to Cluster settings on the Confluent Cloud UI to retrieve the same information. This specifies the endpoint by which your client application will communicate with the cluster.

Insert this Java code to configure JVM security.
// JVM security configuration to cache successful name lookups
java.security.Security.setProperty("networkaddress.cache.ttl", "30");
java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0");
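If you save the configuration settings above to a file, a minimal producer can load them and send a record. The following is a sketch, not Confluent's official example: the file name client.properties and the topic name test-topic are assumptions, and it overrides the optional Avro serializers with string serializers to stay self-contained.

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CloudProducerSketch {
    public static void main(String[] args) throws IOException {
        // Load the Confluent Cloud settings shown above (file name is an assumption)
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("client.properties")) {
            props.load(in);
        }
        // Use string serializers in this sketch instead of the optional Avro serializers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Topic name is an assumption; create the topic in Confluent Cloud first
            producer.send(new ProducerRecord<>("test-topic", "key-1", "Hello, Confluent Cloud!"));
            producer.flush();
        }
    }
}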
If a consumer commits offsets manually, commit offsets at reasonable intervals (for example, every 30 seconds), instead of on every record. A consumer that follows this advice is sketched below, under the same assumptions as the producer sketch.
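This sketch assumes the same client.properties file and test-topic as above, plus a hypothetical consumer group ID and string deserializers:

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CloudConsumerSketch {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("client.properties")) {
            props.load(in);
        }
        props.put("group.id", "demo-group"); // hypothetical consumer group
        props.put("enable.auto.commit", "false"); // commit manually below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            long lastCommit = System.currentTimeMillis();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed %s=%s%n", record.key(), record.value());
                }
                // Commit on an interval (every 30 seconds), not once per record
                if (System.currentTimeMillis() - lastCommit >= 30_000) {
                    consumer.commitSync();
                    lastCommit = System.currentTimeMillis();
                }
            }
        }
    }
}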
librdkafka-based C Clients¶
Confluent’s official Python, Golang, and .NET clients for Apache Kafka® are all based on librdkafka, as are other community-supported clients such as node-rdkafka.
Log in to your cluster using the ccloud login command with the cluster URL specified.
ccloud login
Enter your Confluent Cloud credentials:
Email: susan@myemail.com
Password:
Set the Confluent Cloud environment.
Get the environment ID.
ccloud environment list
Your output should resemble:
      Id      |        Name
+-------------+---------------------+
* t2703       | default
  env-m2561   | demo-env-102893
  env-vnywz   | ccloud-demo
  env-qzrg2   | data-lineage-demo
  env-250o2   | my-new-environment
Set the environment using the ID (<env-id>).

ccloud environment use <env-id>
Your output should resemble:
Now using "env-vnywz" as the default (active) environment.
Set the cluster to use.
Get the cluster ID.
ccloud kafka cluster list
Your output should resemble:
      Id      |   Name    | Type  | Provider |  Region  | Availability | Status
+-------------+-----------+-------+----------+----------+--------------+--------+
  lkc-oymmj   | cluster_1 | BASIC | gcp      | us-east4 | single-zone  | UP
* lkc-7k6kj   | cluster_0 | BASIC | gcp      | us-east1 | single-zone  | UP
Set the cluster using the ID (<cluster-id>). This is the cluster where the commands are run.

ccloud kafka cluster use <cluster-id>
To verify the selected cluster after setting it, type ccloud kafka cluster list again. The selected cluster will have an asterisk (*) next to it.
Create an API key and secret, and save them. You need these credentials to produce to or consume from your topics.
You can generate the API key from the Confluent Cloud Console or with the Confluent Cloud CLI. Be sure to save the API key and secret.
On the web UI, click the Kafka API keys tab and click Create key. Save the key and secret, then click the checkbox next to I have saved my API key and secret and am ready to continue.
Or, from the Confluent Cloud CLI, type the following command:
ccloud api-key create --resource <resource-id>
Your output should resemble:
Save the API key and secret. The secret is not retrievable later.

+---------+------------------------------------------------------------------+
| API Key | ABC123xyz                                                        |
| Secret  | 123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABCx |
+---------+------------------------------------------------------------------+
Optional: Add the API secret with ccloud api-key store <key> <secret>. When you create an API key with the CLI, it is automatically stored locally. However, when you create an API key using the UI, API, or with the CLI on another machine, the secret is not available for CLI use until you store it. This is required because secrets cannot be retrieved after creation.

ccloud api-key store <api-key> <api-secret> --resource <resource-id>
Set the API key to use for Confluent Cloud CLI commands with the command ccloud api-key use <key> --resource <resource-id>.

ccloud api-key use <api-key> --resource <resource-id>
Get the communication endpoint for the selected cluster.
Make sure you have the cluster ID from the previous steps, or retype ccloud kafka cluster list to show the active cluster (with the asterisk by it), and copy the cluster ID.

Run the following command to view details on the cluster, including security and API endpoints.
ccloud kafka cluster describe <cluster-id>
Your output should resemble:
+--------------+-----------------------------------------------------------+
| Id           | lkc-7k6kj                                                 |
| Name         | cluster_0                                                 |
| Type         | BASIC                                                     |
| Ingress      | 100                                                       |
| Egress       | 100                                                       |
| Storage      | 5000                                                      |
| Provider     | gcp                                                       |
| Availability | single-zone                                               |
| Region       | us-east1                                                  |
| Status       | UP                                                        |
| Endpoint     | SASL_SSL://pkc-4yyd6.us-east1.gcp.confluent.cloud:9092    |
| ApiEndpoint  | https://pkac-ew1dj.us-east1.gcp.confluent.cloud           |
| RestEndpoint | https://pkac-ew1dj.us-east1.gcp.confluent.cloud:443       |
+--------------+-----------------------------------------------------------+
Copy and save the value shown for the Endpoint, as you will need it in the next steps to specify the bootstrap server URL that client applications use to communicate with this cluster.
Tip
You can also get the cluster ID and bootstrap server values from Cluster settings on the Confluent Cloud UI.
In the Confluent Cloud UI, on the Environment Overview page, click Clusters and select your cluster from the list.
From the navigation menu, click Data In/Out -> Clients. Click C/C++ and insert the following configuration settings into your client code.
bootstrap.servers=<broker-list>
broker.address.ttl=30000
api.version.request=true
api.version.fallback.ms=0
broker.version.fallback=0.10.0.0
security.protocol=SASL_SSL
ssl.ca.location=/usr/local/etc/openssl/cert.pem
sasl.mechanisms=PLAIN
sasl.username=<api-key>
sasl.password=<api-secret>
session.timeout.ms=45000
Tip
The api.version.request, broker.version.fallback, and api.version.fallback.ms options instruct librdkafka to use the latest protocol version and not fall back to an older version. For more information about librdkafka and Kafka version compatibility, see the documentation. For a complete list of the librdkafka configuration options, see the configuration documentation.
Configuring clients for cluster rolls¶
Confluent Cloud regularly rolls all clusters for upgrades and maintenance. Rolling a cluster means updating all the brokers that make up that cluster one at a time, so that the cluster remains fully available and performant throughout the update. The Kafka protocol and architecture are designed for exactly this type of highly available, fault-tolerant operation, so correctly configured clients gracefully handle the broker changes that happen during a roll.
During a cluster roll, clients may encounter the following retriable exceptions, which generate warnings on correctly configured clients:
UNKNOWN_TOPIC_OR_PARTITION: "This server does not host this topic-partition."
LEADER_NOT_AVAILABLE: "There is no leader for this topic-partition as we are in the middle of a leadership election."
NOT_LEADER_FOR_PARTITION: "This server is not the leader for that topic-partition."
NOT_ENOUGH_REPLICAS: "Messages are rejected since there are fewer in-sync replicas than required."
NOT_ENOUGH_REPLICAS_AFTER_APPEND: "Messages are written to the log, but to fewer in-sync replicas than required."
By default, Kafka producer clients will retry for 2 minutes, print these warnings to logs, and recover without any intervention. Consumer and admin clients default to retrying for 1 minute.
If clients are configured with insufficient retries or retry-time, the exceptions above will be logged as errors.
If a client exhausts its memory buffer space while retrying, and then runs out of time while blocked waiting for memory, timeout exceptions occur.
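These retry windows correspond to client configuration. The following is a minimal sketch for the Java client, showing the relevant settings with their documented defaults; you normally keep the defaults, and the explicit values here only make the timeouts visible:

import java.util.Properties;

public class RetryDefaultsSketch {
    public static Properties retryDefaults() {
        Properties props = new Properties();
        // Producer: delivery.timeout.ms bounds the total time to deliver a record,
        // including retries and backoff (Java client default: 120000 ms, the 2 minutes above)
        props.put("delivery.timeout.ms", "120000");
        // Producer: retry sends until delivery.timeout.ms expires
        props.put("retries", String.valueOf(Integer.MAX_VALUE));
        // Consumer and admin: default.api.timeout.ms bounds blocking client calls
        // (Java client default: 60000 ms, the 1 minute above)
        props.put("default.api.timeout.ms", "60000");
        return props;
    }
}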
Recommendations¶
We do not recommend triggering internal alerts on the retriable warnings listed above, because they occur regularly as part of normal operations and are handled gracefully by correctly configured clients without disruption to your streaming applications. Instead, we recommend limiting alerts to client errors that cannot be automatically retried.
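With the Java producer, one way to follow this recommendation is to alert from the send callback rather than from log lines: the warnings logged while the client retries during a roll never reach the callback, so any exception that does arrive represents a send that ultimately failed. This is a sketch, and alertOnError is a hypothetical hook for your monitoring system; pass an instance as the second argument to producer.send(record, callback).

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AlertingCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // An exception here means the send ultimately failed (a non-retriable error,
        // or the retry window was exhausted) and is worth alerting on.
        if (exception != null) {
            alertOnError(exception);
        }
    }

    private void alertOnError(Exception e) {
        // Hypothetical integration point for your monitoring/alerting system
        System.err.println("ALERT: producer send failed: " + e);
    }
}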
For additional recommendations on how to architect, monitor, and optimize your Kafka applications on Confluent Cloud, refer to Developing Client Applications on Confluent Cloud.