Kafka Streams
In this tutorial you will:
- Discover the Kafka Streams library
- Learn when it should be chosen
- Create an application for stream processing
- Launch this application with a Kafka producer and a Kafka consumer
Prerequisites
Basic knowledge of Kafka, its architecture and its usage is required. Please refer to Kafka Basics for thorough information.
A working cluster must be at your disposal. Using Adaltas Cloud, you can log in to the edge node over SSH.
Environment
The Adaltas Cloud platform contains a Kafka cluster consisting of 3 Kafka brokers (kfk-brk-1.au.adaltas.cloud, kfk-brk-2.au.adaltas.cloud and kfk-brk-3.au.adaltas.cloud) listening on port 6667 and a ZooKeeper cluster (quorum zoo-1.au.adaltas.cloud:2181,zoo-2.au.adaltas.cloud:2181,kfk-brk-3.au.adaltas.cloud:2181/kafka). Please read Kafka Basics first.
We will need Kerberos authentication, so make sure you have a valid Kerberos ticket before continuing (see here for details).
When to choose Kafka Streams?
Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology.
Alternatives to Kafka Streams include Spark Streaming, Apache Flink, Apache Storm, Apache Samza, Apache Flume, Amazon Kinesis Streams and others.
Kafka Streams provides high reliability, resiliency and throughput, and it is widely used in heavily loaded systems. You should consider using it when:
- Latency is extremely important and you need to process data with less than 1 ms delay
- Manipulations with data are relatively simple (see the sketch after this list)
- Input and output data are stored in Kafka
- Applications are written in Scala or Java
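To give an idea of what these simple manipulations look like, here is a minimal word-count sketch written with the Streams DSL. It is only an illustration: the broker address and topic names are placeholders, and the project generated in the tutorial below also ships a comparable WordCount example.
package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class WordCountSketch {
    public static void main(String[] args) {
        // Minimal configuration; the broker address is a placeholder
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Topology: split each line into words, group by word, count occurrences
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> lines = builder.stream("lines-topic");
        KTable<String, Long> counts = lines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count();
        counts.toStream().to("counts-topic", Produced.with(Serdes.String(), Serdes.Long()));

        // Start the application and close it cleanly on shutdown
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
A few lines of the DSL express a stateful aggregation that would require noticeably more code with a plain Kafka consumer and producer.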
Tutorial
To learn Kafka Streams, we will create an application written in Java and managed by Maven. The steps are:
- Creating the Maven project structure
- Preparing the configuration file
- Writing the code of the streaming application in Java
- Launching the application
Maven Project Structure
First of all, a new Streams project structure is initialized with:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.kafka \
-DarchetypeArtifactId=streams-quickstart-java \
-DarchetypeVersion=2.5.0 \
-DgroupId=streams.examples \
-DartifactId=streams.examples \
-Dversion=0.1 \
-Dpackage=myapps
We use the same set of values for groupId, artifactId and package as in the Kafka Streams Tutorial, but you could choose your own. Assuming the above parameter values are used, this command will create a project structure that looks like this:
> tree streams.examples
streams.examples
|-- pom.xml
|-- src
    |-- main
        |-- java
        |   |-- myapps
        |       |-- LineSplit.java
        |       |-- Pipe.java
        |       |-- WordCount.java
        |-- resources
            |-- log4j.properties
Maven’s Configuration
The generated pom.xml file contains information about the project and configuration details used by Maven to build the project. It already has the Streams dependency defined. You only need to add the slf4j support:
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
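The slf4j-simple binding writes log messages to the console, which is enough for this tutorial. If you want to emit your own log statements from the application, the usual slf4j pattern looks like this (the class name is only an illustration):
package myapps;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingExample {
    // One static logger per class is the common slf4j convention
    private static final Logger LOG = LoggerFactory.getLogger(LoggingExample.class);

    public static void main(String[] args) {
        LOG.info("Streams application starting");
        LOG.warn("Warnings and errors also go to the console through slf4j-simple");
    }
}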
Note: The pom.xml file targets Java 8 and does not work with higher Java versions.
We already have several programs in the src/main/java directory. In order to learn how to write a new one, we will delete them.
cd streams.examples
rm src/main/java/myapps/*.java
Writing a Simple Application: Pipe
Our application takes messages from one topic and sends them to another one without changing them. We need to define the Streams execution configuration values:
- StreamsConfig.BOOTSTRAP_SERVERS_CONFIG lists the brokers in our Kafka cluster
- StreamsConfig.APPLICATION_ID_CONFIG is the unique identifier of this Streams application
- The value of the security.protocol parameter in our case is SASL_PLAINTEXT
- Kerberos authentication is configured with sasl.kerberos.service.name and sasl.jaas.config
- Incoming messages are read from the topic incoming_topic, fed by a Kafka producer
- Outgoing messages are sent to the topic outcoming_topic
The topics incoming_topic and outcoming_topic must be created before the application starts. Have a look at Kafka Basics to find out how to create them.
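Alternatively, the topics can be created programmatically. The sketch below uses Kafka's AdminClient with the same connection settings as the Streams application; the partition count and replication factor are only illustrative values, adjust them to your needs:
package myapps;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        // Same Kerberos and broker settings as the Streams application
        Properties props = new Properties();
        props.put("bootstrap.servers", "kfk-brk-1.au.adaltas.cloud:6667,kfk-brk-2.au.adaltas.cloud:6667,kfk-brk-3.au.adaltas.cloud:6667");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;");

        // Create both topics; 1 partition and replication factor 3 are illustrative values
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Arrays.asList(
                new NewTopic("incoming_topic", 1, (short) 3),
                new NewTopic("outcoming_topic", 1, (short) 3)
            )).all().get();
        }
    }
}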
The complete code of the application is:
package myapps;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class Pipe {
public static void main(String[] args) throws Exception {
// Set up properties for connection
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kfk-brk-1.au.adaltas.cloud:6667,kfk-brk-2.au.adaltas.cloud:6667,kfk-brk-3.au.adaltas.cloud:6667");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.kerberos.service.name", "kafka");
props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Define the computational logic
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("incoming_topic").to("outcoming_topic");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
// Attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
// Start streaming app
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
Application Launch
Refer to Kafka Basics to learn the details on how to create topics, start producers, listen to events with consumers and obtain the value of the KAFKA_BROKERS environment variable.
Here are the main steps of the program. For the sake of simplicity, you can open 3 SSH connections in your terminal and run the following commands.
- Clean and package the project, then launch the Pipe application:
mvn clean package
mvn exec:java -Dexec.mainClass=myapps.Pipe
- Start a producer on the topic incoming_topic with the kafka-console-producer tool:
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
--broker-list $KAFKA_BROKERS \
--producer-property security.protocol=SASL_PLAINTEXT \
--topic 'incoming_topic'
- Start a consumer on the topic outcoming_topic with the kafka-console-consumer tool:
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
--bootstrap-server $KAFKA_BROKERS \
--consumer-property security.protocol=SASL_PLAINTEXT \
--topic 'outcoming_topic' \
--from-beginning
All messages created by the Kafka producer are sent to the Kafka consumer. We do not change the messages in the given example, but you could add some manipulations if needed, as sketched below. Have a look at the Kafka Streams Tutorial to find more information.
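As an illustration of such a manipulation, here is a variant of Pipe (an addition of ours, not part of the original example) that drops empty messages and upper-cases the remaining ones before writing them to the output topic. It reuses the same connection settings and only changes the topology; note the distinct application id:
package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class UppercasePipe {
    public static void main(String[] args) {
        // Same connection settings as Pipe, with its own application id
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-uppercase-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kfk-brk-1.au.adaltas.cloud:6667,kfk-brk-2.au.adaltas.cloud:6667,kfk-brk-3.au.adaltas.cloud:6667");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Same source and sink as Pipe, with a filter and a mapValues step in between
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("incoming_topic");
        source
            .filter((key, value) -> value != null && !value.isEmpty()) // drop empty messages
            .mapValues(value -> value.toUpperCase())                   // upper-case the payload
            .to("outcoming_topic");

        // Start the application and close it cleanly on shutdown
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
It can be launched the same way as Pipe, with mvn exec:java -Dexec.mainClass=myapps.UppercasePipe.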