
Kafka Streams

In this tutorial you will:

  • Discover the Kafka Streams library
  • Learn when it should be chosen
  • Create an application for stream processing
  • Launch this application with a Kafka producer and a Kafka consumer

Prerequisites

Basic knowledge of Kafka, its architecture and its usage is required. Please refer to Kafka Basics for more detailed information.

A working cluster must be at your disposal. Using Adaltas Cloud, you can log in to the edge node over SSH.

Environment

The Adaltas Cloud platform provides a Kafka cluster consisting of 3 Kafka brokers (kfk-brk-1.au.adaltas.cloud, kfk-brk-2.au.adaltas.cloud and kfk-brk-3.au.adaltas.cloud) listening on port 6667 and a ZooKeeper cluster (quorum zoo-1.au.adaltas.cloud:2181,zoo-2.au.adaltas.cloud:2181,kfk-brk-3.au.adaltas.cloud:2181/kafka). Please read Kafka Basics first.

We will need Kerberos authentication, so make sure you have a valid Kerberos ticket before continuing (see here for details).
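
For example, assuming your principal is already configured on the edge node, you can list your current tickets and, if needed, obtain a new one with the standard Kerberos tools:

klist   # list current tickets
kinit   # request a new ticket for your default principal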

When to choose Kafka Streams?

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology.

Alternatives to Kafka Streams include Spark Streaming, Apache Flink, Apache Storm, Apache Samza, Apache Flume, Amazon Kinesis Streams and others.

Kafka Streams provides high reliability, resiliency and throughput, and it is widely used for heavily loaded systems. Consider using it when:

  • Latency is extremely important and you need to process data with less than 1 ms delay
  • Manipulations with data are relatively simple
  • Input and output data is stored in Kafka
  • Applications are written in Scala or Java

Tutorial

To learn Kafka Streams, we will create an application written in Java and managed by Maven. The steps are:

  1. Create the Maven project structure
  2. Prepare the configuration file
  3. Write the streaming application code in Java
  4. Launch the application

Maven Project Structure

First of all, a new Streams project structure is initialized with:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.kafka \
    -DarchetypeArtifactId=streams-quickstart-java \
    -DarchetypeVersion=2.5.0 \
    -DgroupId=streams.examples \
    -DartifactId=streams.examples \
    -Dversion=0.1 \
    -Dpackage=myapps

We use the same set of values for groupId, artifactId and package as in the Kafka Streams Tutorial, but you could choose your own. Assuming the above parameter values are used, this command will create a project structure that looks like this:

> tree streams.examples
streams.examples
|-- pom.xml
|-- src
    |-- main
        |-- java
        |   |-- myapps
        |       |-- LineSplit.java
        |       |-- Pipe.java
        |       |-- WordCount.java
        |-- resources
            |-- log4j.properties

Maven’s Configuration

The generated pom.xml file contains information about the project and configuration details used by Maven to build it. It already has the Kafka Streams dependency defined. You only need to add slf4j support:

<dependencies>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>${slf4j.version}</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>${slf4j.version}</version>
  </dependency>
</dependencies>

Note: The pom.xml targets Java 8, and does not work with higher Java versions.
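
If the build later fails with compiler errors, it is worth checking which JDK Maven picks up; a quick sanity check from the shell:

java -version
mvn -version   # the "Java version" line shows the JDK used by Maven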

There are already several example programs in the src/main/java directory. Since we want to learn how to write one from scratch, we delete them:

cd streams.examples
rm src/main/java/myapps/*.java

Writing a Simple Application: Pipe

Our application takes messages from one topic and sends them to another without changing them. We need to define the Streams execution configuration values:

  • StreamsConfig.BOOTSTRAP_SERVERS_CONFIG lists the brokers of our Kafka cluster
  • StreamsConfig.APPLICATION_ID_CONFIG is the unique identifier of this Streams application
  • The security.protocol property is set to SASL_PLAINTEXT in our case
  • Kerberos authentication is configured with sasl.kerberos.service.name and sasl.jaas.config
  • Input messages are read from the incoming_topic topic, fed by a Kafka producer
  • Output messages are written to the outcoming_topic topic

The topics incoming_topic and outcoming_topic must be created before the application starts. Have a look at Kafka Basics to find out how to create them; a possible command is sketched below.
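
As an illustration, on an HDP-style installation like the one used later for the console producer and consumer, the topics could be created with the kafka-topics tool. The ZooKeeper quorum comes from the Environment section; the partition and replication-factor values below are arbitrary examples:

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
  --zookeeper zoo-1.au.adaltas.cloud:2181,zoo-2.au.adaltas.cloud:2181,kfk-brk-3.au.adaltas.cloud:2181/kafka \
  --replication-factor 1 --partitions 1 \
  --topic incoming_topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
  --zookeeper zoo-1.au.adaltas.cloud:2181,zoo-2.au.adaltas.cloud:2181,kfk-brk-3.au.adaltas.cloud:2181/kafka \
  --replication-factor 1 --partitions 1 \
  --topic outcoming_topic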

The complete code of the application is:

package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {
    public static void main(String[] args) throws Exception {
        // Set up properties for connection
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kfk-brk-1.au.adaltas.cloud:6667,kfk-brk-2.au.adaltas.cloud:6667,kfk-brk-3.au.adaltas.cloud:6667");
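        // Kerberos (SASL/GSSAPI) authentication over an unencrypted connection, reusing the current user's ticket cache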
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Define the computational logic
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("incoming_topic").to("outcoming_topic");
        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);
        // Attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        // Start streaming app
        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

Application Launch

Refer to Kafka Basics to learn how to create topics, start producers, listen to events with consumers, and obtain the value of the KAFKA_BROKERS environment variable.
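
As an illustration, based on the Environment section above, the variable could look like this (check Kafka Basics for the exact value to use on the platform):

export KAFKA_BROKERS=kfk-brk-1.au.adaltas.cloud:6667,kfk-brk-2.au.adaltas.cloud:6667,kfk-brk-3.au.adaltas.cloud:6667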

Here are the main steps to run the program. For the sake of simplicity, you can open 3 SSH connections and run the following commands, one step per terminal.

  1. Build the package and launch the Pipe application:
mvn clean package
mvn exec:java -Dexec.mainClass=myapps.Pipe
  2. Start a producer on the topic incoming_topic with the kafka-console-producer tool:
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
  --broker-list $KAFKA_BROKERS \
  --producer-property security.protocol=SASL_PLAINTEXT \
  --topic 'incoming_topic'
  3. Start a consumer on the topic outcoming_topic with the kafka-console-consumer tool:
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
  --bootstrap-server $KAFKA_BROKERS \
  --consumer-property security.protocol=SASL_PLAINTEXT \
  --topic 'outcoming_topic' \
  --from-beginning

All messages created by the Kafka producer are sent to the Kafka consumer. We do not change the messages in this example, but you could add some manipulations if needed, as sketched below. Have a look at the Kafka Streams Tutorial for more information.
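
For instance, a hypothetical variation of the topology could upper-case each message value before forwarding it; it only requires replacing the single builder line in Pipe.java and needs no additional imports:

// Hypothetical variation of the Pipe topology: transform values before writing them out
builder.<String, String>stream("incoming_topic")
       .mapValues(value -> value.toUpperCase())
       .to("outcoming_topic");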

What to learn next?