Streaming Tweets to Snowflake Data Warehouse with Spark Structured Streaming and Kafka

November 20, 2017

Streaming architecture

In this post we will build a system that ingests real-time data from Twitter, packages it as JSON objects and sends it through a Kafka Producer to a Kafka Cluster. A Spark Structured Streaming application will then consume those tweets in JSON format and stream them as mini-batches into the Snowflake data warehouse. We will then show how easy it is to query JSON data in Snowflake. If you have XML data, make sure to look at our other post where we load and query XML data in Snowflake with Flexter, our enterprise XML converter.

About Snowflake

Snowflake is an analytic database provided as Software-as-a-Service (SaaS) that runs entirely on Amazon Web Services (AWS). It is built for analytical queries of any size, combining a shared central storage layer with columnar storage. Its architecture separates compute from storage, allowing you to scale resources elastically.
In Snowflake, storage and compute are billed separately: stored data is charged at a low per-terabyte rate, while compute is only billed while a virtual warehouse is running. This means that you can suspend the warehouse during the hours you are not using your Snowflake data warehouse and resume it when needed. You can also easily scale the compute resources used by Snowflake, for example by using a large warehouse for your once-a-month intensive queries and a smaller one for your daily ETL processes.
Snowflake also provides an elegant interface for running queries on semi-structured data such as JSON data which we will utilize in this post.

Preparing the Snowflake Database

The goal of this system is to stream real-time tweets in JSON format into our Snowflake data warehouse. We will start off by creating a simple table in Snowflake that will store our tweets in JSON format. Snowflake supports various semi-structured data formats (JSON, Avro, ORC, Parquet and XML). We are going to store each tweet as a JSON document in a column of Snowflake's special VARIANT data type.
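A minimal example of such a table could look like this (the database and table names are placeholders; all we need is a single VARIANT column):

-- create a database for the demo (names are arbitrary)
CREATE DATABASE IF NOT EXISTS twitter_db;
USE DATABASE twitter_db;

-- a single VARIANT column holds one tweet as a JSON document per row
CREATE OR REPLACE TABLE tweets (
    tweet VARIANT
);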

In the following parts we will build a data pipeline to populate this table with real-time tweets.

Configuring Kafka

To use Kafka for message transmission we need to start a Kafka Broker. A single Kafka Cluster can consist of multiple Kafka Brokers, with each broker taking a part of the message transmission load. Before starting a Kafka Broker we first have to start ZooKeeper (which Kafka uses for coordination) and then the broker itself. We can do that with the following two command line statements:
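Assuming Kafka has been downloaded and unpacked with its default directory layout, the two statements look roughly like this (run from the Kafka installation directory):

# start ZooKeeper, which Kafka uses for coordination
bin/zookeeper-server-start.sh config/zookeeper.properties

# in a second terminal, start the Kafka Broker itself
bin/kafka-server-start.sh config/server.properties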

Now that the Kafka Broker is running on our machine, it can be reached at localhost:9092.

Ingesting real-time data from Twitter

We will use Twitter’s Developer API to search for patterns in the text of all public tweets. Specifically, we are going to search for tweets containing the keyword “ESPN”, but you can search for any pattern.
In Python we can easily access the Twitter Developer API using the tweepy library:
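A minimal sketch of the authentication step, assuming you have registered a Twitter application and have its consumer key/secret and access token/secret at hand (the credential variables below are placeholders):

import tweepy

# placeholder credentials of your Twitter application
consumer_key = "..."
consumer_secret = "..."
access_token = "..."
access_token_secret = "..."

# authenticate against the Twitter API and create an API client
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)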

Using the kafka-python library we are going to define a Kafka Producer that sends messages to the appropriate topic of our Kafka Broker:
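For example, a basic producer that points at the broker we started earlier:

from kafka import KafkaProducer

# connect to the local Kafka Broker listening on localhost:9092
producer = KafkaProducer(bootstrap_servers="localhost:9092")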

Since we did not define the topic “data-tweets” beforehand, the broker will create it automatically the first time the producer sends a message to it (topic auto-creation is enabled by default).
We can get the result of a search on tweets using:
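With tweepy this is a single call; here “ESPN” is the keyword mentioned above and the result count is an arbitrary choice:

# search public tweets containing the keyword "ESPN"
results = api.search(q="ESPN", count=100)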

The result contains almost 50 fields in JSON format, but we only want to focus on a few important fields of a tweet. Here is a Python list containing only the relevant attributes:
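The exact selection is a matter of taste; one plausible choice (the field names follow the Twitter API tweet object) could be:

# attributes of a tweet that we want to keep
tweet_fields = [
    "created_at",
    "id_str",
    "text",
    "source",
    "retweet_count",
    "favorite_count",
    "lang",
]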

In the next step we will define a function that:

  • Queries the Twitter API for our search pattern
  • Extracts the important fields from the original tweet
  • “Cleans” the text attribute of a tweet, because it can contain characters such as ‘, “ and \n which Snowflake struggles with when parsing JSON tweets (this is currently a small limitation)
  • Transforms the cleaned tweet into a JSON object and sends it to the Kafka Broker in binary format
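A sketch of such a function, reusing the api, producer and tweet_fields objects defined above (the cleaning step simply strips out the problematic characters):

import json

def get_twitter_data():
    # query the Twitter API for our search pattern
    results = api.search(q="ESPN", count=100)
    for status in results:
        tweet = status._json  # the raw tweet as a Python dict

        # keep only the fields we are interested in
        slim_tweet = {field: tweet.get(field) for field in tweet_fields}

        # "clean" the text attribute of characters Snowflake struggles with
        if slim_tweet.get("text"):
            slim_tweet["text"] = (slim_tweet["text"]
                                  .replace("'", "")
                                  .replace('"', "")
                                  .replace("\n", " "))

        # serialize the cleaned tweet to JSON and send it to Kafka in binary format
        producer.send("data-tweets", json.dumps(slim_tweet).encode("utf-8"))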

Lastly, we will define a function that repeatedly calls get_twitter_data() so that tweets are ingested indefinitely:
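A minimal runner, with an arbitrary pause between calls to stay within Twitter's rate limits:

import time

def run_forever(interval_seconds=60):
    # keep polling Twitter and pushing tweets to Kafka indefinitely
    while True:
        get_twitter_data()
        time.sleep(interval_seconds)

run_forever()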

Intro to Spark Structured Streaming

Structured Streaming is the new streaming model of the Apache Spark framework. The main idea is that you should not have to reason about streaming at all, but instead use a single API for both streaming and batch operations. It therefore allows you to write batch-style queries on your streaming data.
It was inspired by the Dataflow Model developed at Google, whose Cloud Dataflow SDK Google later open-sourced as the Apache Beam project. Structured Streaming also finally adds support for end-to-end exactly-once fault-tolerance guarantees through checkpointing and write-ahead logs, an important feature previously offered by streaming engines such as Apache Flink and Apache Beam.
In Spark 2.2.0 (which we are using for this blog post) Structured Streaming was marked as generally available and will eventually replace the old DStream-based streaming engine built on Resilient Distributed Datasets (RDDs).
The new model treats streaming data as an unbounded DataFrame to which new data is continuously appended. The model is made up of several components:

  1. Input – the ever-growing DataFrame that a data source populates
  2. Trigger – how often to check the Input for new data and run the Query on it
  3. Query – the operations applied to the Input, such as select, group by, filter, etc.
  4. Result – the result of running the Query on the Input
  5. Output – defines which part of the Result is written to the sink

The following picture by Databricks graphically summarizes the components of the Structured Streaming architecture described above:

The picture shows a non-aggregation query that uses the Append mode as the definition of Output. Please consult the Spark Structured Streaming guide for a better overview of the features and an in-depth explanation of how the Query and Output interact.

Using Spark Structured Streaming to deliver data to Snowflake

We are going to use Scala for the following Apache Spark examples. We have to start off by defining the usual Spark SQL entry point:
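A minimal version for this example (the application name is arbitrary):

import org.apache.spark.sql.SparkSession

// the usual Spark SQL entry point
val spark = SparkSession
  .builder()
  .appName("TweetsToSnowflake")
  .getOrCreate()

import spark.implicits._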

Then we will define a stream that reads data from the “data-tweets” topic of our Kafka Broker:
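A sketch of the Kafka source definition, assuming the spark-sql-kafka-0-10 connector is on the classpath and the broker is still running on localhost:9092:

// subscribe to the "data-tweets" topic of the local Kafka Broker
val data_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "data-tweets")
  .load()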

With Spark we can easily explore the schema of incoming data:
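For instance, by printing the schema of the stream we just defined:

data_stream.printSchema()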

And we get the following result:
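For Spark's built-in Kafka source the schema is fixed, so the output should be along these lines:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)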

Our tweets are stored only in the “value” field; the other fields belong to the Kafka messaging protocol. Our goal is to extract only the “value” column, cast it to a string and export it to Snowflake over a JDBC connection.
Using the Spark Structured Streaming API we can implement our Structured Streaming model by:

  1. Using the (above defined) “data_stream” as source of data
  2. Running a simple select query every minute
  3. Implementing a select query that casts the binary message as a string
  4. Using the append output mode so that only the newest rows in the Result are written to Snowflake.

The variable “writer” mentioned above will represent the Snowflake database, i.e. a JDBC sink. Since the JSON data first has to be parsed by Snowflake’s engine, we will have to write a custom JDBC sink that uses Snowflake’s JDBC connector and its parse_json() function to parse JSON strings into the VARIANT data type. Snowflake also supports other, better-performing loading methods, but they are only suitable for batch data loading.
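In Spark 2.2 a row-by-row custom sink can be implemented as a ForeachWriter. The class below is one possible sketch of such a sink (the connection URL, user, password and the tweets table name are placeholders): it opens a Snowflake JDBC connection per partition and wraps each incoming JSON string in parse_json() via an INSERT ... SELECT statement.

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}

class SnowflakeJDBCSink(url: String, user: String, password: String)
    extends ForeachWriter[Row] {

  var connection: Connection = _
  var statement: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // load Snowflake's JDBC driver and connect (connection details are placeholders)
    Class.forName("net.snowflake.client.jdbc.SnowflakeDriver")
    connection = DriverManager.getConnection(url, user, password)
    // parse_json() turns the raw JSON string into a VARIANT value
    statement = connection.prepareStatement(
      "INSERT INTO tweets SELECT parse_json(?)")
    true
  }

  override def process(row: Row): Unit = {
    statement.setString(1, row.getString(0)) // the "value" column cast to string
    statement.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}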

As you can see, Spark allows simple creation of custom output sinks. The only complex part was using the parse_json() function to ingest the JSON data as the VARIANT type.
In our code we will instantiate the previously mentioned “writer” JDBC sink by simply using the class we just created:
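For example (the account URL and credentials are placeholders):

val writer = new SnowflakeJDBCSink(
  "jdbc:snowflake://<account>.snowflakecomputing.com/?db=twitter_db&schema=public&warehouse=demo_wh",
  "<user>",
  "<password>")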

The whole Spark Streaming pipeline then looks as follows:
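Putting the pieces together, a sketch of the complete streaming query (one-minute trigger, append output mode, the custom sink defined above):

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val query = data_stream
  .selectExpr("CAST(value AS STRING)")         // extract the tweet JSON as a string
  .writeStream
  .foreach(writer)                             // write each row to Snowflake via JDBC
  .outputMode(OutputMode.Append())             // only deliver newly arrived rows
  .trigger(Trigger.ProcessingTime("1 minute")) // check for new data every minute
  .start()

query.awaitTermination()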

Querying the data in Snowflake

Once our data is in Snowflake we can easily query it in the web interface after provisioning a warehouse (compute resources).
First we will run a select query to check the contents of our “Tweets” table:
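For example:

SELECT * FROM tweets LIMIT 10;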

Lastly, we will run a query that extracts only specific JSON attributes of a tweet and filters the tweets based on the number of retweets:
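Snowflake's colon notation lets us reach into the VARIANT column directly; a sketch of such a query (the retweet threshold is arbitrary and the attribute names assume the fields we selected earlier):

SELECT
    tweet:created_at::string    AS created_at,
    tweet:text::string          AS tweet_text,
    tweet:retweet_count::number AS retweet_count
FROM tweets
WHERE tweet:retweet_count::number > 100
ORDER BY retweet_count DESC;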

In this post we have shown you how easy it is to load and query JSON data in Snowflake. Also make sure to look at our other post where we load and query XML data in Snowflake with Flexter, our enterprise XML converter.
Enjoyed this post? Have a look at the other posts on our blog.
Contact us for Snowflake professional services.
We created the content in partnership with Snowflake.