Spark Structured Streaming
In this tutorial you will:
- Discover the Spark Structured Streaming API
- Cover its advantages
- Compare its usage with Spark Streaming
- Create an application for stream processing
- Consume a Kafka topic in Spark and produce messages into another Kafka topic
A prerequisite to execute the following tutorial is to be connected to the edge-1
node via SSH.
We will use a Kafka cluster consisting of 3 broker nodes (kfk-brk-1.au.adaltas.cloud, kfk-brk-2.au.adaltas.cloud and kfk-brk-3.au.adaltas.cloud), all listening on port 6667, and a ZooKeeper quorum (zoo-1.au.adaltas.cloud:2181,zoo-2.au.adaltas.cloud:2181,kfk-brk-3.au.adaltas.cloud:2181/kafka). Please read Kafka basics first.
Note: We will need Kerberos authentication, so make sure you have a valid Kerberos ticket before continuing (see here for details).
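For reference, a ticket is typically obtained with kinit and verified with klist; the principal below is a placeholder, adapt it to your account and realm:
kinit {username}@REALM
klist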
When to choose Spark Structured Streaming
Spark Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL Engine. Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
Note: Structured Streaming should not be confused with Spark Streaming, its predecessor.
Alternatives to Spark Structured Streaming include Kafka Streams, Apache Flink, Apache Storm, Apache Samza, Apache Flume and Amazon Kinesis Streams.
Spark Structured Streaming provides high reliability, resiliency and throughput, and it is widely used for highly loaded systems. It should be chosen if:
- Latency is not the most critical requirement for your project
- The data manipulations can be quite sophisticated. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
- Input and/or output data can live in different sources. Data can be ingested from many sources like Kafka, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards.
- You need to use Python or R, besides Scala and Java, as the programming language
Structured Streaming vs. Spark Streaming
Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches. Each batch represents a Resilient Distributed Dataset (RDD).
The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to the batch processing model. You express your streaming computation as a standard batch-like query, as if it ran on a static table, and Spark runs it as an incremental query on the unbounded input table. Thus Structured Streaming uses DataFrames, which are more optimized in terms of processing and provide more options for aggregations and other operations, with a variety of functions available (Spark 2.4 added new functionality).
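To illustrate the idea, here is a minimal, self-contained sketch (not part of the tutorial pipeline) using Spark's built-in rate source and console sink: the filter is written exactly as it would be on a static DataFrame, and Spark executes it as an incremental query on the unbounded stream.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("Unbounded table demo").getOrCreate()

# The built-in "rate" source continuously generates rows (timestamp, value)
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# The computation is expressed like a batch query on a static table
evens = stream_df.filter(col("value") % 2 == 0)

# Spark runs it incrementally and prints each micro-batch to the console
query = evens.writeStream.format("console").outputMode("append").start()
query.awaitTermination()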
You can find a more detailed comparison of Spark Streaming and Structured Streaming here.
Tutorial
Writing a Structured Streaming Application
Our application will take messages from one topic and send them to another one without changing them. The Python code to create and execute the pipeline is written in a style called a fluent interface (or fluent API), which chains multiple methods.
We need to determine the Streams execution configuration values:
- kafka.bootstrap.servers lists the brokers in our Kafka cluster
- The value of the kafka.security.protocol parameter, SASL_PLAINTEXT in our case, is the protocol used to communicate with the brokers
- The use of Kerberos authentication is determined by kafka.sasl.kerberos.service.name
- The incoming sink is received from a Kafka producer via incoming_topic
- The outgoing sink is sent to outcoming_topic
- In order to send the data to Kafka, we also need to specify a checkpoint directory as checkpointLocation. In our case it is an HDFS directory.
Note: The topics for the incoming and outgoing sinks should be created before the application starts. Have a look at Kafka basics to find out how to create them.
Note: The checkpoint directory should be empty before the start. You can clean it with the command:
hdfs dfs -rm -r checkpointLocation/*
If the directory does not exist, it is not a problem: it will be created by Spark. Have a look at the HDFS tutorial to find out how to work with directories and files in HDFS.
Reading from Kafka: message schema
The messages read from Kafka have a predefined schema:
Column | Type |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | timestamp |
timestampType | int |
headers (optional) | array |
In this tutorial we work with the ‘key’, ‘value’, ‘topic’ and ‘timestamp’ fields. Since ‘key’ and ‘value’ are binary, think about using the CAST function on them.
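For example, assuming df is the streaming DataFrame returned by load(), the cast can be expressed either as a SQL expression or with the DataFrame API:
from pyspark.sql.functions import col

# SQL expression style
df.selectExpr("CAST(key AS string) AS key", "CAST(value AS string) AS value")

# Equivalent DataFrame API style
df.select(col("key").cast("string").alias("key"),
          col("value").cast("string").alias("value"))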
Writing data to Kafka
Writing messages to Kafka does not require setting all message fields manually. In fact, the ‘value’ field is enough if ‘topic’ is defined in the configuration.
Column | Type |
---|---|
key (optional) | string or binary |
value (required) | string or binary |
headers (optional) | array |
topic (*optional) | string |
partition (optional) | int |
Note: The topic column is not required if the "topic" configuration option is specified, as in the code below.
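As a sketch of this alternative, assuming df is the streaming DataFrame to write and brokers and checkpointPath are defined as in the code below, the destination can also be provided per row as a 'topic' column instead of the option:
from pyspark.sql.functions import lit

(df
    # Each row carries its destination topic in a dedicated column
    .withColumn("topic", lit("outcoming_topic"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("kafka.security.protocol", "SASL_PLAINTEXT")
    .option("kafka.sasl.kerberos.service.name", "kafka")
    .option("checkpointLocation", checkpointPath)
    .start())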
On the edge node or any machine with the Spark client installed and configured, create a file structured_stream_example.py
with the content:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col

checkpointPath = "hdfs://hdfs-nn-1.au.adaltas.cloud/user/{username}/checkpointLocation"
brokers = "kfk-brk-1.au.adaltas.cloud:6667,kfk-brk-2.au.adaltas.cloud:6667,kfk-brk-3.au.adaltas.cloud:6667"

query = (
    # Initialize a new Spark session
    SparkSession
    .builder
    .appName("Structured Streaming from Kafka")
    .getOrCreate()
    # Define the input stream
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("kafka.security.protocol", "SASL_PLAINTEXT")
    .option("kafka.sasl.kerberos.service.name", "kafka")
    .option("subscribe", "incoming_topic")
    .load()
    ### Start of the block
    # Consume the data stream
    .selectExpr(
        "CAST(key AS string) AS key",
        # Data transformation in SQL
        "UPPER(CAST(value AS string)) AS value")
    # Filter the data
    .filter(col("value").rlike('([A-Z])+'))
    # Split messages into words
    .withColumn("value", explode(split(col("value"), " ")))
    ### End of the block
    # Define the output stream
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("kafka.security.protocol", "SASL_PLAINTEXT")
    .option("kafka.sasl.kerberos.service.name", "kafka")
    .option("topic", "outcoming_topic")
    .option("checkpointLocation", checkpointPath)
    .start()
)

# Launch the query
query.awaitTermination()
The code above receives the messages from a Kafka topic, transforms them to uppercase, filters out the messages without letters, splits the messages on the space character and sends the result to another Kafka topic.
For instance, a group of 3 messages like this:
France italy Allemagne
,,,
russie, UKRAINE
will give us the result:
FRANCE
ITALY
ALLEMAGNE
RUSSIE,
UKRAINE
Before launching, we need to set up the authentication of our application.
JAAS Configuration
Since the cluster is protected with Kerberos, all consumers need to have tickets. We will configure the Java Authentication and Authorization Service (JAAS), a Java security framework used by Kafka. We will put the configuration for Kerberos authentication in a jaas.conf file:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true;
};
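If you prefer not to depend on the ticket cache, a keytab-based variant of the same configuration is also possible; the keytab path and principal below are placeholders to adapt:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/home/{username}/{username}.keytab"
  principal="{username}@REALM";
};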
The file with the JAAS configuration can be placed in your home directory. We will pass it to the Spark engine.
Application Launch with Spark-submit
Now that we have prepared the code and the security configuration, we can launch our application with the Spark engine.
spark-submit \
--master local \
--driver-memory 1g \
--num-executors 2 \
--executor-memory 1g \
--driver-java-options "-Djava.security.auth.login.config=/home/{username}/jaas.conf" \
--conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/{username}/jaas.conf \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 \
structured_stream_example.py
You can find information about the first 4 parameters in Spark Basics; we do not discuss them here.
The parameters which help us to provide the secure connection between Kafka and Spark are:
- driver-java-options passes the JAAS configuration file to the Spark driver
- spark.executor.extraJavaOptions passes the JAAS file to the Spark executors
- the package spark-sql-kafka allows Spark to work with Kafka
After executing the command above, we have a program ready to consume and produce streaming data.
How to check?
Now, let’s have a look at how it works. You can find out how to create topics and start a producer and a consumer in Kafka basics.
1. Launch the Python program with the spark-submit mechanism as described in the previous section.
2. Start a Kafka producer on the topic incoming_topic with the kafka-console-producer tool:
brokers="kfk-brk-1.au.adaltas.cloud:6667,kfk-brk-2.au.adaltas.cloud:6667,kfk-brk-3.au.adaltas.cloud:6667"
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
--broker-list $brokers \
--producer-property security.protocol=SASL_PLAINTEXT \
--topic incoming_topic
3. Start a Kafka consumer on the topic outcoming_topic:
brokers="kfk-brk-1.au.adaltas.cloud:6667,kfk-brk-2.au.adaltas.cloud:6667,kfk-brk-3.au.adaltas.cloud:6667"
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
--bootstrap-server $brokers \
--consumer-property security.protocol=SASL_PLAINTEXT \
--topic outcoming_topic \
--from-beginning
If everything is well configured, you will see the messages you send in the producer appearing in the consumer, filtered, split and in uppercase.
Working with groups of messages
We use the same code and only change it in two places:
- Add the import of the functions window, count, concat and lit from pyspark.sql.functions
- Replace the block of code between the two comments ### Start of the block and ### End of the block with the following code:
### Start of the block
# Define the watermark: how long late data is kept
.withWatermark("timestamp", "2 minutes")
# Group messages by time intervals
.groupBy(window("timestamp", "2 minutes", "1 minutes"))
# Count the received messages per time window
.agg(count("value").cast("string").alias("count_messages"))
# Prepare a readable message for the output sink
.withColumn(
    "value",
    concat(
        lit("From "),
        col("window.start"),
        lit(" to "),
        col("window.end"),
        lit(" received "),
        col("count_messages"),
        lit(" messages")))
### End of the block
Once launched, the new application will count how many messages were received during a time window of 2 minutes and send a message with the result to outcoming_topic. The withWatermark function says how long we keep data: in our example, the engine keeps all messages for at least 2 minutes before dropping them. The window function is configured to use the timestamp field of the incoming message and to start a new window every minute. The window size is 2 minutes.
Imagine we received something like this from incoming_topic:
France Italy
1 1
,,,
russie
In the Kafka consumer it will be displayed like:
From 2021-05-07 12:31:00 to 2021-05-07 12:33:00 received 4 messages
From 2021-05-07 12:32:00 to 2021-05-07 12:34:00 received 2 messages
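The overlap of the sliding windows explains why the same message can be counted in two windows. The following standalone sketch (with made-up timestamps, independent of the streaming pipeline) reproduces the window assignment on a static DataFrame:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count, col

spark = SparkSession.builder.appName("Sliding window demo").getOrCreate()

# Two made-up messages with explicit timestamps
data = [("2021-05-07 12:32:10", "France"), ("2021-05-07 12:32:40", "Italy")]
df = (spark.createDataFrame(data, ["timestamp", "value"])
      .withColumn("timestamp", col("timestamp").cast("timestamp")))

# Each row falls into two overlapping 2-minute windows (slide of 1 minute)
(df.groupBy(window("timestamp", "2 minutes", "1 minutes"))
   .agg(count("value").alias("count_messages"))
   .orderBy("window")
   .show(truncate=False))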
The capabilities of Spark Structured Streaming are not limited to the examples listed above. It is a powerful instrument which allows you to use SQL, Python, R, and Scala and to make complex transformations on streaming data.