Advanced Spark Structured Streaming - Aggregations, Joins, Checkpointing

Dorian Beganovic Spark

In this post we are going to build a system that ingests real-time data from Twitter, packages it as JSON objects, and sends it through a Kafka producer to a Kafka cluster. A Spark Streaming application will then parse those JSON tweets and perform various transformations on them, including filtering, aggregations and joins. A table in a Snowflake database is then updated with each result.

Delivering real-time data using Kafka

To simplify the data ingestion pipeline we will re-use the setup from a previous post that showed you how to stream tweets into a Snowflake data warehouse using Spark Structured Streaming and Apache Kafka.

This is the Python code which produces the data Spark will ingest:

If you want a more detailed explanation of exactly how this works, please check out this related blog post, in which we explained this setup in more detail.

Spark Structured Streaming

Introduction

Structured Streaming is the new streaming model of the Apache Spark framework. It was inspired by Google open-sourcing its Cloud Dataflow SDK as the Apache Beam project. The Dataflow Model, invented by Google, says that you should not have to reason about streaming separately, but rather use a single API for both streaming and batch operations. This allows you to write batch-style queries on your streaming data.

You can read a more detailed introduction to Spark Streaming Architecture in this post.

Goal

The goal of the streaming architecture is to present a real-time view of how many tweets about iPhones were made in each language region during the day.

Spark configuration

In the following code snippets we will be using Scala with Apache Spark 2.2.0.

To start the application we are going to define the usual spark entry point that gives us both Spark SQL and Spark Structured Streaming functionality:
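A minimal sketch of that entry point is shown here. The application name and the `local[*]` master are placeholder assumptions for a local run, not values taken from the original setup.

```scala
import org.apache.spark.sql.SparkSession

object StreamingApp {
  // Assumption: app name and master are placeholders for illustration.
  def buildSession(): SparkSession =
    SparkSession.builder()
      .appName("tweet-aggregations")
      .master("local[*]")
      .getOrCreate()
}
```

The same `SparkSession` is used for both the streaming reads (`spark.readStream`) and the batch reads (`spark.read`) later in the pipeline.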

Defining the data source

Next we are going to define the data input for our streaming application. We will ingest data by reading the latest messages from the “data-tweets” topic on the Kafka broker we set up previously.
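A sketch of such a reader could look as follows. The broker address is a placeholder assumption, and the code assumes the `spark-sql-kafka-0-10` connector is on the classpath; only the topic name comes from the setup described above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object TweetSource {
  // Assumption: "localhost:9092" is a placeholder broker address.
  def readTweets(spark: SparkSession, brokers: String = "localhost:9092"): DataFrame =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", "data-tweets")   // topic name from the text
      .option("startingOffsets", "latest")  // only read messages that arrive from now on
      .load()
}
```

The returned DataFrame is a streaming one, so nothing is read from Kafka until a query on it is started.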

This is the schema of incoming data from Kafka:

Watermarking

First we will introduce a watermark in order to handle late-arriving data. Spark’s engine automatically tracks the current event time and can drop incoming messages whose event time is older than a threshold T. In our use case we want to filter out messages that just arrived but are more than 1 day old. We can do that with the following code:
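As a sketch, the watermark can be declared with `withWatermark` on the event-time column. Here we assume the Kafka message `timestamp` column is used as event time, which matches the execution plan discussed later in this post.

```scala
import org.apache.spark.sql.DataFrame

object Watermarking {
  // "timestamp" is the event-time column provided by the Kafka source;
  // "1 day" is the lateness threshold T.
  def withOneDayWatermark(messages: DataFrame): DataFrame =
    messages.withWatermark("timestamp", "1 day")
}
```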

Parsing JSON data

Since the incoming tweets are packaged inside of the “value” column (of the schema shown above), we have to first extract them out of that column and then parse the JSON content into a table with a schema containing the following columns that describe a tweet:

The following code will first cast the “value” column (originally of binary type) into a string.

Then the from_json() function parses the JSON string into a table with columns based on the schema that we just defined above.

Lastly we use selectExpr() function to do a few datatype casts.
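These three steps can be sketched as below. The field names in `tweetSchema` (`id`, `lang`, `retweet_count`, `favorite_count`) are assumptions made for illustration; use whatever fields your producer actually emits.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object TweetParsing {
  // Assumption: hypothetical tweet fields, all read as strings and cast afterwards.
  val tweetSchema: StructType = StructType(Seq(
    StructField("id", StringType),
    StructField("lang", StringType),
    StructField("retweet_count", StringType),
    StructField("favorite_count", StringType)
  ))

  def parseTweets(messages: DataFrame): DataFrame =
    messages
      // 1. cast the binary Kafka payload to a string
      .selectExpr("CAST(value AS STRING) AS json", "timestamp")
      // 2. parse the JSON string into a struct column
      .select(from_json(col("json"), tweetSchema).as("tweet"), col("timestamp"))
      // 3. flatten the struct and cast the numeric fields
      .selectExpr(
        "CAST(tweet.id AS BIGINT) AS id",
        "tweet.lang AS lang",
        "CAST(tweet.retweet_count AS INT) AS retweet_count",
        "CAST(tweet.favorite_count AS INT) AS favorite_count",
        "timestamp")
}
```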

Now our JSON tweets are nicely formatted as rows in a dataframe that we can query.

Transforming incoming data

We will first filter out tweets that have five or fewer retweets, keeping only rows where retweet_count > 5. We are doing this in order to remove less influential or bot accounts from our consideration.

Next we are going to group the tweets by their language code (this is the best proxy for the location of a tweet) and then calculate the count of unique ids and the total number of times the tweets were favorited.
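A sketch of the filter and the aggregation, assuming hypothetical column names `id`, `lang`, `retweet_count` and `favorite_count` on the parsed tweets. Exact distinct counts are not supported on streaming DataFrames, so this sketch uses `approx_count_distinct` for the unique ids.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{approx_count_distinct, col, sum}

object TweetAggregation {
  // Assumption: input column names are illustrative.
  def aggregateByLanguage(tweets: DataFrame): DataFrame =
    tweets
      .filter(col("retweet_count") > 5)  // keep tweets with more than five retweets
      .groupBy(col("lang"))
      .agg(
        approx_count_distinct("id").as("tweet_count"),   // approximate number of unique ids
        sum("favorite_count").as("favorite_total"))      // total favorites per language
}
```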

The resulting data now looks as follows:

Unfortunately, language codes are not very informative, so we will join this table with a table that maps ISO language codes to a human-readable description. That table sits on the local machine and is read using the batch-oriented Spark SQL commands.
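A sketch of this stream-static join follows. It assumes the lookup table is a CSV file with a header row and two hypothetical columns, `lang` and `language`; the file format, path and column names are illustrative only.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object LanguageLookup {
  // Assumption: a CSV file with header "lang,language" at a path you provide.
  def loadLanguages(spark: SparkSession, path: String): DataFrame =
    spark.read.option("header", "true").csv(path)

  // Inner join of the (streaming) aggregates with the static lookup table.
  def withLanguageNames(aggregated: DataFrame, languages: DataFrame): DataFrame =
    aggregated.join(languages, Seq("lang"))
}
```

Because this is an inner join, language codes that are missing from the lookup table drop out of the result.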

To make the transformation logic clearer, I will rewrite all of the transformations we just applied as a single code block:

Analyzing the execution plan

Now it would be interesting to analyze the execution plan of this query to gain a better understanding of how the query is executed and how the computations are optimized.

In the execution plan we can observe that the query starts by scanning for new data (in the form of RDDs).

It then filters out data older than 1 day based on the Kafka message timestamp column, which captures when the message was created.

The data is then filtered based on the “retweet_count > 5” condition we set.

After that Spark will materialize the JSON data as a new dataframe.

The sum and count aggregates are then performed on partial data - only the new data.

The Spark Streaming engine stores the state of aggregates (in this case the last sum/count value) after each query in memory or on disk when checkpointing is enabled.

This allows it to merge the value of aggregate functions computed on the partial (new) data with the value of the same aggregate functions computed on previous (old) data.
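The idea of merging partial aggregates into stored state can be illustrated with plain Scala collections. This is only a conceptual model with hypothetical names (`LangStats`, `merge`), not Spark’s internal state store implementation.

```scala
object IncrementalAggregation {
  // Running aggregates kept per language code.
  final case class LangStats(tweetCount: Long, favoriteTotal: Long)

  // Combine the stored state with the aggregates computed on the new micro-batch only.
  def merge(state: Map[String, LangStats],
            batch: Map[String, LangStats]): Map[String, LangStats] =
    batch.foldLeft(state) { case (acc, (lang, partial)) =>
      val previous = acc.getOrElse(lang, LangStats(0L, 0L))
      acc.updated(lang, LangStats(
        previous.tweetCount + partial.tweetCount,
        previous.favoriteTotal + partial.favoriteTotal))
    }
}
```

Only the keys that appear in the new batch change, which is also why Update mode only needs to emit those rows.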

After the new states are computed, they are checkpointed, and only then is the join performed with the table containing language codes.

We can see that overall this is a very efficient execution plan as the engine tries to operate on as little data as possible.

Exporting data into Snowflake

In the next steps we will define the output of our streaming pipeline and export it into a Snowflake database.

Spark allows us to use three output modes:

  • Append mode - Only the new rows added to the Result Table since the last trigger are written to the sink
  • Complete mode - The whole Result Table is written to the sink after every trigger (only supported for aggregate queries)
  • Update mode - Only the rows in the Result Table that were updated since the last trigger are written to the sink

We are going to use the Update mode to export only the rows that changed in the result of aggregations.

It is also important to define a trigger, which determines how often the streaming pipeline runs. For this use case we will use a 10-second trigger, so the pipeline runs every 10 seconds.

The following code defines what we mentioned:
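A minimal sketch of such a query definition, assuming the aggregated result is passed in as `result`, the custom sink as `writer`, and the checkpoint directory as a path of your choosing (all three parameter names are hypothetical):

```scala
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object TweetSinkQuery {
  def start(result: DataFrame,
            writer: ForeachWriter[Row],
            checkpointDir: String): StreamingQuery =
    result.writeStream
      .outputMode("update")                           // emit only rows that changed
      .trigger(Trigger.ProcessingTime("10 seconds"))  // run a micro-batch every 10 seconds
      .option("checkpointLocation", checkpointDir)    // where offsets and state are persisted
      .foreach(writer)                                // hand each row to the custom sink
      .start()
}
```

Calling `awaitTermination()` on the returned `StreamingQuery` keeps the application running until the query is stopped or fails.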

Upserting data to a JDBC sink

The writer variable that we passed to the foreach() method represents a custom-made JDBC sink. In a previous post we covered in more detail how to create custom JDBC sinks. Essentially, the foreach() function processes every row in the Result Table with the logic we define in the process() function of the JDBC sink.

The process function (located in the JDBC sink class) is in our case applied to every updated row in the Result Table.

The code below shows the exact implementation:

You can observe that we are essentially performing an upsert. If the row already exists in the target table, we update it (Spark’s engine guarantees that it has changed) by deleting and then re-inserting it. If it was not already present, we simply insert it.
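A hedged sketch of what such a delete-then-insert sink might look like with plain JDBC. The table name `language_stats`, its columns, and the row field names are all assumptions for illustration, and a Snowflake JDBC driver is assumed to be on the classpath.

```scala
import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.{ForeachWriter, Row}

// Assumption: table and column names below are hypothetical.
class UpsertSink(url: String, user: String, password: String) extends ForeachWriter[Row] {
  private var connection: Connection = _

  // Called once per partition and trigger; returning true means "process these rows".
  override def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(url, user, password)
    true
  }

  // Called for every updated row: delete any existing row for the key, then insert.
  override def process(row: Row): Unit = {
    val lang = row.getAs[String]("lang")
    val delete = connection.prepareStatement("DELETE FROM language_stats WHERE lang = ?")
    try { delete.setString(1, lang); delete.executeUpdate() } finally delete.close()

    val insert = connection.prepareStatement(
      "INSERT INTO language_stats (lang, language, tweet_count, favorite_total) VALUES (?, ?, ?, ?)")
    try {
      insert.setString(1, lang)
      insert.setString(2, row.getAs[String]("language"))
      insert.setLong(3, row.getAs[Long]("tweet_count"))
      insert.setLong(4, row.getAs[Long]("favorite_total"))
      insert.executeUpdate()
    } finally insert.close()
  }

  override def close(errorOrNull: Throwable): Unit =
    if (connection != null) connection.close()
}
```

The delete and the insert are two separate statements here; if readers of the table must never see the gap between them, wrapping both in one transaction would be a reasonable refinement.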

In this way we can capture the same state in Snowflake as is kept in Spark’s Streaming model.

Checkpointing

The checkpoint option we defined for our streaming application makes sure that the application can resume its operations without any loss of state while still maintaining the proper offsets for Kafka messages. This is done using a write-ahead log.

We can actually explore the contents of the checkpoint folder:

The checkpoint directory contains folders which are populated, after every query run, with the data required to restore the current state.

The offsets folder records the last offsets of the messages we read from Kafka. The state folder captures the latest state of the aggregate counters. Information about the source of the data stream can be found in the sources folder, and the commits we made are noted in the commits folder.
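If you prefer to inspect the folder from code rather than from a shell, a small helper like the one below lists its top-level entries. The helper name and the checkpoint path you pass in are assumptions.

```scala
import java.io.File

object CheckpointInspector {
  // Returns the sorted names of the entries directly under the checkpoint directory,
  // or an empty sequence if the directory does not exist.
  def topLevelEntries(checkpointDir: String): Seq[String] =
    Option(new File(checkpointDir).listFiles())
      .map(_.toSeq.map(_.getName).sorted)
      .getOrElse(Seq.empty)
}
```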

In case we turn off our streaming application for 8 hours (while new tweets are being produced and stored in the Kafka broker) our application could easily be restarted and no information would be lost.

Querying the data in Snowflake

The resulting data can now be easily queried in Snowflake using a simple query:

You can observe the result in Snowflake’s SQL Worksheet: