Adaltas Cloud Academy
Sign out >

Spark Structured Streaming

In this tutorial you will:

  • Discover the Spark Structured Streaming API
  • Cover its advantages
  • Compare its usage with Spark Streaming
  • Create an application for stream processing
  • Consume a Kafka topic in spark and produce message into another Kafka topic

A prerequisite to execute the following tutorial is to be connected to the edge-1 node via SSH.

We will use Kafka cluster consisting of 3 nodes (kfk-brk-1.au.adaltas.cloud, kfk-brk-2.au.adaltas.cloud and kfk-brk-3.au.adaltas.cloud) all listening on port 6667 and ZooKeeper quorum (zoo-1.au.adaltas.cloud:2181,zoo-2.au.adaltas.cloud:2181,kfk-brk-3.au.adaltas.cloud:2181/kafka). Please, read Kafka basics before.

Note: We will need Kerberos authentication, so make sure you have a valid Kerberos ticket before continuing (see here for details).

When to choose Spark Structured Streaming

Spark Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL Engine. Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

Note: Structured Streaming should not be confused with Spark Streaming, it’s predecessor.

Alternatives for Spark Structured Streaming are Kafka Streams, Apache Flink, Apache Storm, Apache Samza, Apache Flume, Amazon Kinesis Streams.

Spark Structured Streaming provides high reliability, resiliency and throughput, it is widely used for highly load systems. It should be chosen if:

  • Latency is not the most critical for your project
  • Manipulations with data could be quite sophisticated. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
  • Input and/or output data could be in different sources. Data can be ingested from many sources like Kafka, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards.
  • You need use Python or R, besides Scala and Java as programming language

Structured Streaming vs. Spark Streaming

Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches. Each batch represents an Resilient Distributed Dataset (RDD).

The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to a batch processing model. You will express your streaming computation as standard batch-like query as on a static table, and Spark runs it as an incremental query on the unbounded input table. Thus Structured Streaming uses DataFrames which are more optimized in terms of processing and provide more options for aggregations and other operations with a variety of functions available (in Spark 2.4 new functionality was added).

You could find more detail comparison of Spark Streaming and Structured Streaming here.

Tutorial

Writing a Structured Streaming Application

Our application will take messages from one topic and send them to another one without changing. The Python code to create and exectute the pipeline is written in a style called a fluent interface or API which chains multiple methods.

We need to determine Streams execution configuration values:

  • kafka.bootstrap.servers shows the brokers in our Kafka cluster
  • The value of kafka.security.protocol parameter, in our case is SASL_PLAINTEXT, is the protocol used to communicate with brokers
  • Using Kerberos authentication is determined by kafka.sasl.kerberos.service.name
  • Incoming sink should be received from Kafka producer with incoming_topic
  • Outcoming sink will be send to outcoming_topic
  • In order to send the data to Kafka we need also to specify a directory for checkpoint as checkpointLocation. In our case it is a HDFS directory.

Note: The topics for incoming and outcoming sinks should be created before the application starts. Have a look on Kafka basics to find out how to create them.

Note: Checkpoint directory should be empty before the start. You could clean it by command:

hdfs dfs -rm -r `checkpointLocation/*` 

If the directory does not exist, it is not a problem, it will be created by Spark. Have a look at the HDFS tutorial to find out how to work with directories and file in HDFS.

Reading from Kafka: message schema

The messages from Kafka have predefined schema:

Column Type
key binary
value binary
topic string
partition int
offset long
timestamp timestamp
timestampType int
headers (optional) array

In this tutorial we work with ‘key’, ‘value’, ‘topic’ and ‘timestamp’ fields. Since ‘key’ and ‘value’ are binary think about using CAST function on them.

Writing data to Kafka

Writing messages to Kafka does not require determing all message fields manually. In fact ‘value’ field is enough if ‘topic’ is defined in the configuration.

Column Type
key (optional) string or binary
value (required) string or binary
headers (optional) array
topic (*optional) string
partition (optional) int

Note: Topic column is not required if “topic” configuration option is specified, as in the code below.

On the edge node or any machine with the Spark client installed and configured, create a file structured_stream_example.py with the content:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col

checkpointPath = "hdfs://hdfs-nn-1.au.adaltas.cloud/user/{username}/checkpointLocation"
brokers = "kfk-brk-1.au.adaltas.cloud:6667,kfk-brk-2.au.adaltas.cloud:6667,kfk-brk-3.au.adaltas.cloud:6667"
query = (
  # Initialize a new Spark session
  SparkSession
  .builder
  .appName("Structured Streaming from Kafka")
  .getOrCreate()
  # Define the input stream 
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("subscribe", "incoming_topic")
  .load()
  ### Start of the block
  # Consume the data stream
  .selectExpr(
    "CAST(key AS string) AS key",
    # Data stransformation in SQL
    "UPPER(CAST(value AS string)) AS value")
  # Filtering data 
  .filter(col("value").rlike('([A-Z])+'))
  # Spliting messages by word 
  .withColumn("value", explode(split(col("value"), " ")))
  ### End of the block
  # Define the output stream
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("topic", "outcoming_topic")
  .option("checkpointLocation", checkpointPath)
  .start()
)
# Launch query
query.awaitTermination()

The code above allows to receive the messages from Kafka topic, transform them to uppercase, filter out the messages without letters, split the messages by space symbol and send the result to another Kafka topic.

For instance, the group of 3 messages like this:

France italy Allemange
,,,
russie, UKRAINE

will give us the result:

FRANCE
ITALY
ALLEMAGNE
RUSSIE,
UKRAINE

Before launching we need to provide the authentication of our application.

JAAS Configuration

Since the cluster is protected with Kerberos all consumers need to have tickets. We will configure the Java Authentication and Authorization Service (JAAS), special Java package which is used by Kafka. We will fix the configuration for Kerberos authentication in jaas.conf file:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true;
};

The file with JAAS configuration could be placed in you home directory. We will send it to Spark engine.

Application Launch with Spark-submit

Now, when we prepared the code and security configuration we could launch our application with the Spark engine.

spark-submit \
  --master local \
  --driver-memory 1g \
  --num-executors 2 \
  --executor-memory 1g \
  --driver-java-options "-Djava.security.auth.login.config=/home/{username}/jaas.conf" \
  --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/{username}/jaas.conf \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 \
  structured_stream_example.py

You could find information about first 4 parameters in the Spark Basics, we do not discuss them here.

The parameters which help us to provide the safe connection between Kafka and Spark are:

  • driver-java-options shows the JAAS configuration file to Spark driver
  • spark.executor.extraJavaOptions shows the JAAS file to Spark executors
  • the package spark-sql-kafka allows to Spark work with Kafka

After execution of the command above we will have the program which is ready to consume and produce streaming data.

How to check?

Now, we could have a look how it works. You could find out how to create topics, start producer and consumer in Kafka basics.

  1. Launch the python program with spark-submit mechanism how it is described

in the previous section 2. Start a Kafka producer on the topic incoming_topic with the kafka-console-producer tool

brokers="kfk-brk-1.au.adaltas.cloud:6667,kfk-brk-2.au.adaltas.cloud:6667,kfk-brk-3.au.adaltas.cloud:6667"
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
  --broker-list $brokers \
  --producer-property security.protocol=SASL_PLAINTEXT \
  --topic incoming_topic
  1. Start a Kafka consumer on the topic outcoming_topic
brokers="kfk-brk-1.au.adaltas.cloud:6667,kfk-brk-2.au.adaltas.cloud:6667,kfk-brk-3.au.adaltas.cloud:6667"
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
  --bootstrap-server $brokers \
  --consumer-property security.protocol=SASL_PLAINTEXT \
  --topic outcoming_topic \
  --from-beginning

If everything is well configured you will see the messages you send in producer appearing in the consumer filtered, splitted and in uppercase.

Work with group of messages

We use use the same code and only change it in two places:

  1. Add the import of the functions window, count, concat and lit from pyspark.sql.functions
  2. Replace the block of code between two comments ### Start of the block and ### End of the block with the next code:
  ### Start of the block
  # Determine the period of the actuality
  .withWatermark("timestamp", "2 minutes")
  # Group messages by time intervals
  .groupBy(window("timestamp", "2 minutes", "1 minutes"))
  # Count the received messages by time window
  .agg(count("value").cast("string").alias("count_messages"))
  # Prepare nice message for output sink
  .withColumn(
    "value",
    concat(
      lit("From "),
      col("window.start"),
      lit(" to "),
      col("window.end"),
      lit(" received "),
      col("count_messages"),
      lit(" messages")))
  ### End of the block

Being launched new application will count how many messages were received during the time window of 2 minutes and send a message with the result to the outcoming_topic. Function withWatermark says how long we keep data. In our example the engine keeps all messages as minimum 2 minutes before to drop them. Function window is configurated to use the field timestamp from the incoming message and to start a new window every minute. Window’s size is 2 minutes.

Imagine we received sometthing like this from incoming_topic:

France Italy
1 1
,,,
russie

In the Kafka consumer it will be displayed like:

From 2021-05-07 12:31:00 to 2021-05-07 12:33:00 received 4 messages
From 2021-05-07 12:32:00 to 2021-05-07 12:34:00 received 2 messages

The capabilities of Spark Structured Streaming are not limited to the examples listed above. It is the powerful instrument which allows to use SQL, Python, R, and Scala and to make complex transformations on streaming data.

What to learn next?