Stream Lineage¶
To move forward with updates to mission-critical applications, or to answer questions on important subjects like data regulation and compliance, teams need an easy way to comprehend the big-picture journey of data in motion.
Stream lineage provides a graphical UI of event streams and data relationships with both a bird’s eye view and drill-down magnification for answering questions like:
- Where did data come from?
- Where is it going?
- Where, when, and how was it transformed?
Answers to questions like these allow developers to trust the data they’ve found, and gain the visibility needed to make sure their changes won’t cause any negative or unexpected downstream impact. Developers can learn and make decisions quickly with live metrics and metadata inspection embedded directly within lineage graphs.
First Look¶
What stream lineage shows¶
Stream lineage in Confluent Cloud is represented visually to show the movement of data from source to destination, and how it is transformed as it moves. The lineage graph always shows the activity of producers and consumers of data for the last 10 minutes.
How to access stream lineage views¶
There are multiple ways to get into the stream lineage view, as described in Summary of navigation paths. This example shows one path.
To view the stream lineage UIs:
- Log on to Confluent Cloud.
- Select an environment.
- Select a cluster.
- Select a topic.
- Click See in Stream Lineage on the top right of the topic page.
- The stream lineage for that topic is shown.

Tip
The stream lineage shown in this example is the result of setting up a data pipeline based on several ksqlDB query streams. If you haven’t set up a data pipeline yet, your lineage view may only show a single, lonely event node.
To get an interesting lineage like the one shown above, take a spin through the tutorial in the next section!
Tutorial¶
In order to really see stream lineage in action, you need to configure topics, producers, and consumers to create a data pipeline. Once you have events flowing into your pipeline, you can use stream lineage to inspect where data is coming from, what transformations are applied to it, and where it’s going.
Select an environment, cluster, and Schema Registry¶
Add an environment or select an existing one.
Add a cluster or select an existing one on which to run the demo.
If you create a new cluster:
- You must select a cluster type. You can choose any cluster type.
- Choose a cloud provider and region.
- Click Continue to review the configuration and costs, usage limits, and uptime service level agreement (SLA), then click Launch Cluster.
Enable a Schema Registry (if not already enabled) by navigating to the schemas page for your cluster and following the prompts to choose a cloud provider and region.
The Schema Registry settings and information will be available on the Schema Registry tab for the environment.
Generate and save the Schema Registry API key and secret for this Schema Registry. (Save the key and secret; you will need them later in this procedure when you run the CLI consumer.)
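If you prefer the command line, the following is a minimal sketch of the same step using the ccloud CLI that appears later in this tutorial. It assumes your CLI version supports the schema-registry subcommand; the cluster ID is a placeholder you take from the describe output.

# Show the Schema Registry cluster for the current environment (the ID looks like lsrc-xxxxx)
ccloud schema-registry cluster describe

# Create an API key and secret scoped to that Schema Registry cluster; save both
ccloud api-key create --resource <SCHEMA_REGISTRY_CLUSTER_ID>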
Tip
If you need help with these initial steps, see Quick Start for Apache Kafka using Confluent Cloud.
Create the “stocks” topic and generate data¶
(Optional) Create a topic named stocks.

Tip

- This step is optional because adding the Datagen connector (as described in the next steps) will automatically create the stocks topic if it does not exist.
- To learn more about manually creating topics and working with them, see Managing Topics in Confluent Cloud.
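If you do create the topic manually and prefer the CLI over the Console, a minimal sketch (the partition count here is an arbitrary example, not a requirement):

# Create the stocks topic in the currently selected cluster
ccloud kafka topic create stocks --partitions 6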
Choose Connectors from the menu and select the Datagen source connector.

Add the Connect Datagen source connector to generate sample data to the stocks topic, using these settings:

- Name: StockSourceConnector
- Which topic do you want to send data to?: stocks
- Output message format: AVRO
- Quickstart: STOCK_TRADES
- Max interval between messages: 1000
- Number of tasks for this connector: 1

You’ll also need to generate and save an API key and secret for this cluster, if you have not done so already.
Click Next, review the settings for the connector, and click Launch to start sending data to the target topic.
The connector first shows as Provisioning, then Running when it is fully initiated.
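For reference, the UI settings above map roughly to a JSON connector configuration. The sketch below is hypothetical: it assumes the ccloud connector create command and the managed Datagen connector’s property names, both of which can vary by version, and the API key placeholders must be replaced with real credentials.

# Hypothetical config file for the managed Datagen source connector
cat > datagen-stocks.json <<'EOF'
{
  "name": "StockSourceConnector",
  "connector.class": "DatagenSource",
  "kafka.api.key": "<KAFKA_API_KEY>",
  "kafka.api.secret": "<KAFKA_API_SECRET>",
  "kafka.topic": "stocks",
  "output.data.format": "AVRO",
  "quickstart": "STOCK_TRADES",
  "max.interval": "1000",
  "tasks.max": "1"
}
EOF

# Create the connector from the config file in the currently selected cluster
ccloud connector create --config datagen-stocks.json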
Create a ksqlDB app¶
- Navigate to ksqlDB.
- Click Create application myself.
- Select Global access and click Continue.
- Provide an application name, such as ksqlDB_stocks_app, and accept the defaults for the number of streaming units.
- Click Launch application.
Tip
- Provisioning will take some time. In some cases, it can take up to an hour.
- By creating the ksqlDB app with global access, you avoid having to create specific ACLs for the app itself. With global access, the ksqlDB cluster is running with the same level of access to Kafka as the user who provisions ksqlDB. If you are interested in learning how to manage ACLs on a ksqlDB cluster with granular access, see Appendix A: Creating a ksqlDB app with granular access and assigning ACLs.
Verify your ksqlDB app is running¶
Return to the list of ksqlDB apps on the Confluent Cloud UI.
Your ksqlDB app should have completed Provisioning and show a status of Up.
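You can also check the status from the command line with the same ccloud command used later in this tutorial; a quick sketch:

# Lists the ksqlDB apps in the current environment, including their status
ccloud ksql app list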

Create persistent streams in ksqlDB to filter on stock prices¶
Navigate to the ksqlDB Editor and click into your ksqlDB app, ksqlDB_stocks_app (ksqlDB_stocks_app > Editor), to create the following persistent streams.
Specify each query statement in the Editor and click Run query to start the query. You can click the Streams tab to view a list of running queries.
Create a stream for the stocks topic, then create a persistent stream that filters on stocks with price <= 100. This feeds the results to the stocks_under_100 topic.

You’ll need to specify and run three separate queries for this step. Start by creating the stocks stream, then add the filters to find and list stocks under $100. After each of these, click Run query, then clear the editor to specify the next statement.

CREATE STREAM stocks WITH (KAFKA_TOPIC = 'stocks', VALUE_FORMAT = 'AVRO');
CREATE STREAM stocks_under_100 WITH (KAFKA_TOPIC='stocks_under_100', PARTITIONS=10, REPLICAS=3) AS SELECT * FROM stocks WHERE (price <= 100);
SELECT * FROM stocks_under_100 EMIT CHANGES;
When you have these running, click the Streams tab. You should have two new streams, STOCKS and STOCKS_UNDER_100. (The last statement is a transient query on the STOCKS_UNDER_100 stream, to get some data onto the UI.)

Create a persistent stream that filters on stocks to BUY, and feed the results to the stocks_buy topic.

You’ll need to specify and run two separate queries for this step. After each of these, click Run query, then clear the editor to specify the next statement.
CREATE STREAM stocks_buy WITH (KAFKA_TOPIC='stocks_buy', PARTITIONS=10, REPLICAS=3) AS SELECT * FROM stocks WHERE side='BUY';
SELECT * FROM stocks_buy EMIT CHANGES;
Create a persistent stream that filters on stocks to SELL.
You’ll need to specify and run two separate queries for this step. After each of these, click Run query, then clear the editor to specify the next statement.
CREATE STREAM stocks_sell WITH (KAFKA_TOPIC='stocks_sell', PARTITIONS=10, REPLICAS=3) AS SELECT * FROM stocks WHERE side='SELL';
SELECT * FROM stocks_sell EMIT CHANGES;
When you have completed these steps, click the ksqlDB > Streams tab. You should have four persistent ksqlDB query streams:
STOCKS
STOCKS_BUY
STOCKS_SELL
STOCKS_UNDER_100

These streams will have associated topics and schemas listed on those pages, respectively.
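If you prefer to verify from the Editor rather than the Streams tab, the standard ksqlDB statements below list the streams and describe one of them (a quick sketch; run each statement separately):

-- List all streams registered in the ksqlDB app
SHOW STREAMS;

-- Show the columns of the STOCKS_BUY stream
DESCRIBE STOCKS_BUY;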

Consume events from the “stocks” topic¶
Now, set up a consumer using the Confluent Cloud CLI to consume events from your stocks topic.
Log on to the Confluent Cloud CLI. (Provide username and password at prompts.)
ccloud login --url https://confluent.cloud
List the environments to verify you are on the right environment.
ccloud environment list
If needed, re-select the environment you’ve been using for this demo.
ccloud environment use <ENVIRONMENT_ID>
List the clusters to verify you are on the right cluster.
ccloud kafka cluster list
If needed, re-select the cluster you’ve been using for this demo.
ccloud kafka cluster use <KAFKA_CLUSTER_ID>
Create Kafka API credentials for the consumer.
Create an API key.
ccloud api-key create --resource <KAFKA_CLUSTER_ID>
Use the API key.
ccloud api-key use <API_KEY> --resource <KAFKA_CLUSTER_ID>
Alternatively, you can store the key.
ccloud api-key store --resource <KAFKA_CLUSTER_ID>
Run a CLI consumer.
ccloud kafka topic consume stocks_buy --value-format avro --group buy_group
When prompted, provide the Schema Registry API key you generated in the first steps.
You should see the consumer data being generated to the consumer at the command line, for example:
Vickys-MacBook-Pro:~ vicky$ ccloud kafka topic consume stocks_buy --value-format avro --group buy_group
Enter your Schema Registry API key: *****************
Enter your Schema Registry API secret: ****************************************************************
Starting Kafka Consumer. ^C or ^D to exit
{"SIDE":{"string":"BUY"},"QUANTITY":{"int":959},"SYMBOL":{"string":"ZVZZT"},"PRICE":{"int":704},"ACCOUNT":{"string":"XYZ789"},"USERID":{"string":"User_8"}}
{"ACCOUNT":{"string":"ABC123"},"USERID":{"string":"User_1"},"SIDE":{"string":"BUY"},"QUANTITY":{"int":1838},"SYMBOL":{"string":"ZWZZT"},"PRICE":{"int":405}}
{"QUANTITY":{"int":2163},"SYMBOL":{"string":"ZTEST"},"PRICE":{"int":78},"ACCOUNT":{"string":"ABC123"},"USERID":{"string":"User_8"},"SIDE":{"string":"BUY"}}
{"PRICE":{"int":165},"ACCOUNT":{"string":"LMN456"},"USERID":{"string":"User_2"},"SIDE":{"string":"BUY"},"QUANTITY":{"int":4675},"SYMBOL":{"string":"ZJZZT"}}
{"QUANTITY":{"int":1702},"SYMBOL":{"string":"ZJZZT"},"PRICE":{"int":82},"ACCOUNT":{"string":"XYZ789"},"USERID":{"string":"User_7"},"SIDE":{"string":"BUY"}}
{"ACCOUNT":{"string":"LMN456"},"USERID":{"string":"User_9"},"SIDE":{"string":"BUY"},"QUANTITY":{"int":2982},"SYMBOL":{"string":"ZVV"},"PRICE":{"int":643}}
{"SIDE":{"string":"BUY"},"QUANTITY":{"int":3687},"SYMBOL":{"string":"ZJZZT"},"PRICE":{"int":514},"ACCOUNT":{"string":"ABC123"},"USERID":{"string":"User_5"}}
{"USERID":{"string":"User_5"},"SIDE":{"string":"BUY"},"QUANTITY":{"int":289},"SYMBOL":{"string":"ZJZZT"},"PRICE":{"int":465},"ACCOUNT":{"string":"XYZ789"}}
...
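The same pattern works for the other derived topics. For example, a hedged variant that reads the SELL side into its own consumer group (the group name sell_group is just an example):

ccloud kafka topic consume stocks_sell --value-format avro --group sell_group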
Explore the data pipeline in stream lineage¶
Stream data quick tour¶
With the producers and consumers up and running, you can use stream lineage to visualize and explore the flow of data from the source connector to the STOCKS topic, where queries filter the data on the specified criteria and feed the results to your three topics:
- STOCKS_BUY
- STOCKS_SELL
- STOCKS_UNDER_100
Search for the stocks topic in the search box.
Click See in Stream Lineage on the top right of the stocks topic page.
The stream lineage for the stocks topic is shown.

Hover on a node for a high-level description of the data source and throughput.
This example shows a ksqlDB query node:
The thumbnail in this case shows:
- Mode and type: persistent stream
- Total number of bytes in and out of the flow for the last 10 minutes
- Total number of messages in and out of the flow for the last 10 minutes
This example shows a topic node:
The thumbnail in this case shows:
- Topic name
- Schema format (can be Avro, Protobuf, or JSON schema)
- Number of partitions for the topic
- Total number of bytes into the topic during the last 10 minutes
- Total number of messages received by the topic in the last 10 minutes
Click a node to inspect.
Return to the diagram, and hover on an edge to get a description of the flow between the given nodes.
Click the edge to inspect.
Tabs on node drilldown to inspect queries¶
The stream lineage inspect panel surfaces details and metrics about the queries based on the nodes you select. The tabs available and details shown will vary, depending on the query. For example:
Overview tab - Shows per topic throughput, along with bytes consumed and produced.
Messages tab - Shows the list of messages the topic received.
Schema tab - Shows a view-only copy of the schema for the topic. An editable version is available directly from the topic (see Manage Schemas in Confluent Cloud).
Query tab - Shows a view-only copy of the persistent query that is sending results to the topic. (For details on stream processing, see the Confluent Cloud ksqlDB Quick Start and ksqlDB Stream Processing.)
Try this¶
- Click the stocks topic node, and scroll through the message throughput timelines on the Overview tab, then click Edit topic to go directly to the topic.
- Click the stocks_buy topic node, then click the Schema tab to view its associated schema.
- Click a query, such as stocks_buy query, and click the Schema tab. This shows you a menu style view of the same schema because the schema associated with the stocks_buy topic is coming from the stocks_buy query.
- To verify this, click View query to link to the ksqlDB_stocks_app, then click the Flow tab under that app, and click stocks_buy on that diagram. (Note that you also can visualize a data flow particular to that query from directly within the ksqlDB app, but not the combined flows of all queries to all topics, as is shown on stream lineage.)
Hide or Show Internal Topics¶
From any stream lineage graph view, you have the option to hide or show internal (system) topics. System topics are those that manage and track Confluent Cloud metadata, such as replication factors, partition counts, and so forth. Typically, this system metadata is of less interest than data related to your own topics, and you’ll want to hide it.

Use Consumer Groups¶
From any stream lineage graph view, you have the option to visually group consumers by consumer group. Grouping by consumer group has a negative impact on performance, so you may want to toggle this on just to get a snapshot visual of groupings, and then toggle it off.

Browsing the Diagram View¶
Export a Lineage Diagram¶
To export the current diagram, click the Export icon on the lower right tool panel.
Reset the View¶
To reset the view to center on the entity that is the original focus of the diagram,
click the Reset icon on the lower right tool panel.
Tip
Reset view is only applicable when you launch the lineage diagram from within an entity, such as a topic, ksqlDB table or stream, producer, consumer, and so forth. It is not applicable if you launch the lineage diagram from the left menu or dashboard because that is a global view, not centered on any specific node to begin with.
Zoom In or Out¶
Use the + and - buttons on the lower right tool panel to zoom in or zoom out on the lineage diagram.
Traverse the Diagram¶
To explore the diagram, click, hold, and drag the cursor, or use analogous actions such as three-finger drag on a Mac trackpad.
All Streams¶
Click All Streams on the lower right of a diagram to view cards representing the data flows.
The default view shows Stream 1.

Click another card to focus in on a particular stream, for example Stream 2. The diagram updates to show only the selected stream.

Understanding Data Nodes¶
Understanding Edges¶
Edge thumbnails and drilldowns describe the flow between the given nodes. They show:
- The node where the data came from
- The node where the data is going to
- Bytes transferred
- Number of messages transferred
Hovering on an edge gives you the thumbnail.
Drilling down on an edge provides the tab view.
Appendix A: Creating a ksqlDB app with granular access and assigning ACLs¶
As an alternative to creating the ksqlDB app with global access, you can create the app with granular access, assign a service account to it, and then create ACLs limited specifically to your ksqlDB app. There may be cases where you want to limit access to the ksqlDB cluster to specific topics or actions.
Navigate to ksqlDB
Click Create application myself.
Select Granular access and click Continue.
Under Create a service account:
- Select Create a new one (unless you already have an account you want to use).
- Provide a new service account name and description, such as stocks_trader for the name and ksqlDB_stocks_app for the description.
- Check the box to add required ACLs when the ksqlDB app is created.

Provide access to the stocks topic (this should already be selected), and click Continue.

Create the ACLs for your ksqlDB app as follows (skip this step if you have done this previously for this app).
Log on to the Confluent Cloud CLI. (Provide username and password at prompts.)
ccloud login --url https://confluent.cloud
List the environments to get the environment ID.
ccloud environment list
Select the environment you’ve been using for this demo.
ccloud environment use <ENVIRONMENT_ID>
List the clusters to get the right cluster ID.
ccloud kafka cluster list
Select the cluster you’ve been using for this demo.
ccloud kafka cluster use <KAFKA_CLUSTER_ID>
List the ksqlDB apps to get the ID for your app.
ccloud ksql app list
Run this command to get the service account ID.
ccloud ksql app configure-acls <KSQL_APP_ID> * --cluster <KAFKA_CLUSTER_ID> --dry-run
Copy the service account ID (after User:<SERVICE_ACCOUNT_ID> in the output).

Allow READ access to all topics on the ksql app for your service account ID.
ccloud kafka acl create --allow --service-account <SERVICE_ACCOUNT_ID> --operation READ --topic '*'
Allow WRITE access to all topics on the ksql app for your service account ID.
ccloud kafka acl create --allow --service-account <SERVICE_ACCOUNT_ID> --operation WRITE --topic '*'
Allow CREATE access for all topics on the ksql app for your service account ID.
ccloud kafka acl create --allow --service-account <SERVICE_ACCOUNT_ID> --operation CREATE --topic '*'
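To confirm the ACLs were created, you can list the ACLs on the currently selected cluster and look for entries tied to your service account ID; a minimal sketch:

# Lists ACLs for the current cluster; verify READ, WRITE, and CREATE entries exist for your service account
ccloud kafka acl list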