Supersonic subatomic Java meets event streams

Balakrishnan
9 min read · May 8, 2020
Photo by Pablo Heimplatz on Unsplash

Recently, I participated in a hackfest where I had the privilege of building a real-time event stream processing application with supersonic subatomic Java, Quarkus. The idea was to build a solution that analyses the hashtags in user tweets on Twitter over a 10-second time window (for a given search term) and finds the popularity of those hashtags in real time based on the Twitter stream.

It was fun building this application with some of the awesome Quarkus extensions, and this article aims to provide a hands-on view of the key elements involved in building such an application. As always, I'm happy to share the details so that others can try it and potentially expand upon it. Here's the full source code for the application; you can give it a spin by following the details outlined below.

Solution Overview

Before diving deep, it's important to visualize what's required to render the solution for this use-case. At a high level, three core components are required: a tweet poller, a tweet aggregator and a dashboard.

  1. tweet-poller - An application that connects to Twitter and gets the latest set of tweets for a given search term. It should also introspect the tweets, extract any hashtags from them and push the hashtags to a data streaming destination such as a Kafka topic.
  2. tweet-aggregator - A component that pulls the hashtags provided by the tweet-poller (from a Kafka topic) and does the following: applies a tumbling window of 10 seconds on the tweet stream, calculates the popularity of the hashtags based on their number of occurrences within a given time window (10 s), derives a weight for each hashtag available within that window and feeds the hashtag vs weight pairs to another data streaming destination, say another Kafka topic.
  3. tag-dashboard - A web application that picks up the hashtag vs popularity map for every tumbling window from a Kafka topic and renders the output as a dynamic word tag cloud.

Putting the pieces together, the overall solution component model looks like this:

Quarkus microservices interplay with Kafka cluster

Looking at the set of extensions available in the Quarkus application template generator, it's evident that the following key extensions can be leveraged to build this.

  • Quarkus Camel Twitter extension for the tweet-poller component
  • Quarkus Reactive Messaging and Kafka Streams extensions for tweet-aggregator
  • Quarkus RESTEasy extension for pushing the aggregated hashtag vs popularity values to browser clients / web pages automatically using server-sent events

Key elements of the solution

First things first. In order to get a better understanding of the use-case, it's important to understand what constitutes a tumbling window.

Tumbling windows are time intervals defined to group events for aggregation. Typically they are contiguous and of equal duration; here's an illustration that shows tumbling windows of size 15 seconds overlaid on an event stream.

Image credits: Tumbling window of size 15 seconds

Effectively, the brown dots in the above image represent the tweets posted between 12:00 and 12:01 across 15-second tumbling windows. Using the tweets available in the tumbling windows, the next step is to

  • get all the tweets from Twitter (that match the given search term) every 10 s
  • introspect each of the tweets in the 10 s window and extract the hashtags
  • do a count of the hashtags in each 10 s window
  • show the hashtag along with its weight in the form of a tag cloud

The following sections detail how this idea has been implemented.

Twitter Feed Reader

The tweet-poller application relies on the Quarkus Camel Twitter extension. This extension provides Apache Camel-based template abstractions for reading and posting to Twitter timelines. It also provides features for searching tweets based on a given search term. However, in order to use this extension, a Twitter API application must first be created at developer.twitter.com.

Once created, an access token + secret and a consumer key + secret pair are generated for the Twitter application. These key + secret pairs are to be set in application.properties (as shown below) for the extension to work.

Configuration for Quarkus Camel Twitter extension in application.properties
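For reference, here's a minimal sketch of what that configuration might look like, assuming the camel-twitter search component is used; the property keys follow the Camel component options and the values are placeholders:

# Twitter API credentials for the Camel Twitter component (placeholder values)
camel.component.twitter-search.consumer-key=<your-consumer-key>
camel.component.twitter-search.consumer-secret=<your-consumer-secret>
camel.component.twitter-search.access-token=<your-access-token>
camel.component.twitter-search.access-token-secret=<your-access-token-secret>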

Now, using the Camel Twitter template, the Twitter feeds for a given search term can be obtained. CamelResource.java contains an implementation of this. It exposes a REST API through which the required search query and sample size can be specified when initiating the tweet polling. Once the tweets have been pulled from Twitter, the hashtags can be extracted with a simple regex and sent directly to the destination Kafka topic twitter-feeds using the Quarkus implementation of the Channel and Emitter APIs defined by Eclipse MicroProfile.

Tweet poller implementation using Quarkus Camel Twitter Extension
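The snippet below is a minimal sketch of the idea, not the exact repository code: a REST resource that asks the camel-twitter search endpoint for recent tweets, extracts hashtags with a regex and emits them as Feed objects to the twitter-feeds channel. The /timeline path and the Feed constructor follow the article; everything else is illustrative.

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;

import org.apache.camel.ProducerTemplate;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import twitter4j.Status;

@Path("/twitter")
public class CamelResource {

    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    @Inject
    ProducerTemplate producerTemplate; // Camel template used to invoke the twitter-search endpoint

    @Inject
    @Channel("twitter-feeds")
    Emitter<Feed> emitter; // MicroProfile Reactive Messaging emitter backed by the Kafka topic

    @GET
    @Path("/timeline")
    public String poll(@QueryParam("topic") String topic, @QueryParam("sample") int sample) {
        // Fetch the latest tweets matching the search term from the camel-twitter endpoint
        @SuppressWarnings("unchecked")
        List<Status> tweets = producerTemplate.requestBody(
                "twitter-search:" + topic + "?count=" + sample, null, List.class);
        for (Status status : tweets) {
            Matcher matcher = HASHTAG.matcher(status.getText());
            while (matcher.find()) {
                // Send each hashtag along with the tweet timestamp to the twitter-feeds channel
                emitter.send(new Feed(matcher.group(1), status.getCreatedAt().getTime()));
            }
        }
        return "polled " + tweets.size() + " tweets for term: " + topic;
    }
}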

The connection details for the destination Kafka topic to which the hashtags are sent are configured in the same application.properties.

Connection configuration for Kafka topic in application.properties
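A minimal sketch of this configuration, assuming the SmallRye Kafka connector and an illustrative package name for the serializer, might look like:

# Kafka broker and outgoing channel wiring for the twitter-feeds topic (values illustrative)
kafka.bootstrap.servers=localhost:9092
mp.messaging.outgoing.twitter-feeds.connector=smallrye-kafka
mp.messaging.outgoing.twitter-feeds.topic=twitter-feeds
mp.messaging.outgoing.twitter-feeds.value.serializer=org.acme.poller.FeedSerializer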

Additionally, a custom serializer (FeedSerializer.java) has been implemented, since the extracted hashtags are sent to the Kafka topic as an object (defined by a class called Feed.java) containing the hashtag along with the timestamp at which it was mentioned in the tweet. The tweet timestamp plays an important role in determining the tumbling window to which a hashtag belongs.

FeedSerializer class
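In essence, the serializer just turns a Feed into JSON bytes. A minimal sketch, assuming Jackson is used for the JSON mapping (the repository's implementation may differ in detail):

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FeedSerializer implements Serializer<Feed> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Feed feed) {
        try {
            // The Feed object (hashtag + tweet timestamp) is written as JSON bytes
            return feed == null ? null : mapper.writeValueAsBytes(feed);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize Feed", e);
        }
    }
}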

More details on how the timestamp is used for classifying the tweets into a tumbling window are covered in the next sections.

A Custom SerDes

With the hashtag/timestamp pairs available on the twitter-feeds Kafka topic, the next step is to use the Quarkus Kafka Streams API.

FeedReader class

The tweet-aggregator application contains the fun bit of the whole implementation, so let's take a closer look at the class FeedReader.java (a sketch of the resulting topology follows the list below).

  • First, the buildTopology method builds the Kafka Streams topology by obtaining a KStream<String, Feed> from the Kafka topic twitter-feeds (using a custom SerDes, FeedSeder.java) on an instance of StreamsBuilder.
  • Then, a new KeyValue instance is created for each message using map(), with the hashtag as the new key. This is done to ensure that each message has a valid, well-known key to operate upon.
  • Next, the events are grouped by the groupByKey() method and a tumbling window of 10 s is defined by calling windowedBy(TimeWindows.of(Duration.ofSeconds(10))).
  • Then, the count() method is called to direct the topology to count the grouped events that occur within each tumbling window. This defines the weight or popularity of each hashtag in the window.
  • At this point in the topology, the result is a KTable<Windowed<String>, Long>.
  • On top of the KTable, the map() method is used to turn it into two easy-to-read strings (for showing it in the tag cloud in a web application).
  • Then, the result is emitted to the output topic agg-feeds using the to() method.
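Putting those steps together, a minimal sketch of the topology could look like the following. The topic names, the FeedSeder SerDes (assumed to implement Serde<Feed>), the getHashTag() accessor and the 10 s window follow the article; the exact string formatting of the output is illustrative.

import java.time.Duration;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

@ApplicationScoped
public class FeedReader {

    private final FeedSeder feedSerDe = new FeedSeder(); // custom SerDes for Feed objects

    @Produces
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("twitter-feeds", Consumed.with(Serdes.String(), feedSerDe))
                // Re-key each record by its hashtag so grouping operates on a well-known key
                .map((key, feed) -> KeyValue.pair(feed.getHashTag(), feed))
                .groupByKey(Grouped.with(Serdes.String(), feedSerDe))
                // 10 s tumbling windows based on the tweet (event) timestamp
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                // Count the hashtag occurrences per window: this count is the weight
                .count()
                .toStream()
                // Flatten the windowed key and the count into two plain strings for the dashboard
                .map((windowedKey, count) -> KeyValue.pair(
                        windowedKey.key(), String.valueOf(count)))
                .to("agg-feeds", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}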

One might wonder how each 10 s tumbling window in Kafka Streams knows the tweet timestamp (the event occurrence time) for each incoming message in the stream. Recall that in the tweet-poller application, each extracted hashtag was sent to the Kafka topic as a Feed object carrying the tweet timestamp. So, by providing an implementation of a timestamp extractor, Kafka Streams can be told to use this event time when assigning messages to windows. Here's the implementation of the timestamp extractor used to extract the timestamp from each message in the stream.

FeedTimeStamp extractor
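A minimal sketch of such an extractor, assuming Feed exposes the tweet creation time in epoch milliseconds via a getCreated() accessor:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class FeedTimeStampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        if (record.value() instanceof Feed) {
            // Use the tweet's own timestamp so that windowing is based on event time
            return ((Feed) record.value()).getCreated();
        }
        // Fall back to the highest timestamp seen so far on this partition
        return partitionTime;
    }
}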

This timestamp extractor and the aforementioned SerDes implementation can be hooked into Kafka Streams by mapping the relevant property keys in application.properties.

Connection configuration for Kafka Streams in application.properties
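A minimal sketch of that wiring, assuming illustrative package names for the extractor:

# Quarkus Kafka Streams configuration (values illustrative)
quarkus.kafka-streams.bootstrap-servers=localhost:9092
quarkus.kafka-streams.application-id=tweet-aggregator
quarkus.kafka-streams.topics=twitter-feeds
# Pass-through Kafka Streams properties: event-time extraction and default key SerDes
kafka-streams.default.timestamp.extractor=org.acme.aggregator.FeedTimeStampExtractor
kafka-streams.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde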

Server-Sent Events

With the hashtag vs popularity pairs readily available on the agg-feeds topic, the next step is to push these pairs to a web page from the tag-dashboard application. Server-sent events can be used for this: a standard that allows browser clients to receive a stream of updates from the server side over an HTTP connection without resorting to polling from the web UI side. It is worth noting that, unlike WebSockets, server-sent events are a one-way communications channel in which events flow from server to client only.

The Quarkus RESTEasy extension supports server-sent events and the implementation in the class WeightedHashTag.java relies on that.

WeightedHashTag class
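A minimal sketch of the endpoint, assuming the in-memory hash-stream channel described below (the @SseElementType annotation comes from RESTEasy and marks each stream element as one SSE event):

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.annotations.SseElementType;
import org.reactivestreams.Publisher;

@Path("/hashtags")
public class WeightedHashTag {

    @Inject
    @Channel("hash-stream")
    Publisher<String> hashtags; // in-memory stream fed from the agg-feeds Kafka topic

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.TEXT_PLAIN)
    public Publisher<String> stream() {
        // Each element published on hash-stream is pushed to the browser as one SSE event
        return hashtags;
    }
}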

In order to allow multiple web clients to receive the server-sent events, the implementation in WeightedHashTag.java exposes a REST API and uses an in-memory event stream named hash-stream instead of directly using the Kafka stream agg-feeds. The service class that makes this possible is HashTagReaderService.java.

HashTagReaderService class
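A minimal sketch of that bridge, assuming SmallRye's @Broadcast annotation so that more than one SSE subscriber can attach to the in-memory stream:

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class HashTagReaderService {

    @Incoming("agg-feeds")   // Kafka topic carrying the hashtag vs weight pairs
    @Outgoing("hash-stream") // in-memory channel consumed by the SSE endpoint
    @Broadcast               // allow multiple downstream subscribers (browser clients)
    public String relay(String weightedHashTag) {
        // Pass each aggregated record through unchanged
        return weightedHashTag;
    }
}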

Dynamic Tag Cloud

A simple web application to show the tag cloud has been built by leveraging the JQWCloud jQuery API, and it is included in results.html of the tag-dashboard application. Shown below is the JavaScript function that receives the server-sent events provided by the aforementioned REST API, de-duplicates the incoming hashtags (to avoid duplicate tags being shown in the tag cloud) and renders the window-specific tag cloud.

JavaScript function to invoke server sent events and render the output in tag cloud

Setting it up

To better understand the implementation details of these Quarkus microservices or to further extend them, it is recommended to have the required tooling (a JDK with Maven, Docker with docker-compose, and optionally VSCode) set up on your machine.

Getting the applications

First, clone the git repository to a suitable location on your machine.

$ git clone https://github.com/bbalakriz/twitter-popular-hashtag-analysis.git

You may open up these applications in VSCode to do a code deep dive by running:

$ code ./tweet-poller
$ code ./tweet-aggregator
$ code ./tag-dashboard

Starting the required containers

These three applications depend on Kafka and ZooKeeper, both of which are required if you'd like to run the applications locally on your machine. To set up and run these dependencies quickly, I've included a docker-compose configuration in the application repository. Run the command below in a terminal to get the containerized components up and running on your machine.

$ docker-compose -f twitter-stream-app.yaml up -d

To verify that the Kafka and ZooKeeper containers are up and running, run docker ps -a in a terminal; you should see the status of the containers as Up.

Running the applications

To run these applications locally on your machine, execute the following commands from the root of the cloned repository, each in its own VSCode or bash terminal session (quarkus:dev keeps running in the foreground):

$ cd tweet-aggregator && mvn compile quarkus:dev
$ cd tweet-poller && mvn compile quarkus:dev
$ cd tag-dashboard && mvn compile quarkus:dev

Once done, the application can be kick-started by providing a search term; the relevant tweets will be obtained from Twitter, processed and then shown in the tag cloud for every 10 s tumbling window. The easiest way to do that is to run the following in a bash terminal.

$ export URL="http://localhost:8090/twitter/timeline?topic=covid&sample=20" 
$ while sleep 5; do curl $URL ; done
  • Note: In the URL above, I’ve given the search term as “covid” and the sample size as “20”. You can change these values as you wish.

Then, the tag cloud can be accessed at http://localhost:8080/results.html in your browser; it will bring up a dynamic tag cloud that automatically gets updated with the latest hashtags for the given search term for every 10 s tumbling window.

Quarkus & event streams processing in action

This application can also be run on OpenShift (Red Hat's distribution of Kubernetes) very easily by following the steps listed below. However, for this application to work on OpenShift, a Kafka cluster should be running on it, and its connection configuration needs to be updated accordingly in each of the application.properties files contained in the components tweet-poller, tweet-aggregator and tag-dashboard.

In each of the three applications' project locations, run the following commands, replacing <<project-name>> with suitable names:

$ mvn clean compile package -DuberJar=true
$ oc new-build registry.access.redhat.com/redhat-openjdk-18/openjdk18-openshift:1.6 --binary --name=<<project-name>>
$ oc start-build <<project-name>> --from-file ./target/*-runner.jar --follow
$ oc new-app <<project-name>>
$ oc expose svc/<<project-name>>

Hope you enjoyed reading this.

Credits to Jon and Craig for contributing certain parts of this solution.
