Realtime Word Count — Kafka Streams Series — 1

Shazin Sadakath
3 min read · Jan 14, 2025


As part of the Kafka Streams series I started, let’s look at how to develop a real-time, scalable word-counting application. This is the equivalent of the Hello World demo in other programming languages. If you haven’t read the previous post, which introduces the series, please go through it before continuing to get a better understanding.

Problem Statement

Input: A stream of sentences whose words are separated by spaces. Example: “Today is a good day”.

Output: A key/value list where the key is a unique word and the value is the number of occurrences of that word. Example: Today 1, is 1, a 1, good 1, day 1.
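To pin down the expected behaviour before touching Kafka, here is a minimal plain-Java sketch of the input/output contract for a single sentence. It is not Kafka Streams code; the class WordCountSpec and the method countWords are hypothetical names, and the sketch assumes words are split on single spaces (as in the implementation later in this post) and counted case-sensitively.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical plain-Java reference for the problem statement (NOT Kafka Streams):
// it only shows the expected input -> output mapping for one sentence.
class WordCountSpec {

    // Splits on single spaces (as the article does) and counts each word,
    // keeping words in order of first appearance.
    static Map<String, Long> countWords(String sentence) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String word : sentence.split(" ")) {
            counts.merge(word, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints {Today=1, is=1, a=1, good=1, day=1}
        System.out.println(countWords("Today is a good day"));
    }
}
```

Because the split is case-sensitive, “Today” and “today” would be counted as two different words; lower-casing each word first is one possible tweak if that matters for your use case.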

Implementation

Before implementing this, we need to identify the processing flow and draw it as a Directed Acyclic Graph (DAG) that runs from the Source (the input topic) to the Sink (the final output). The DAG also shows the operations we need to perform to get the results we want.

Figure: DAG for the Word Count Kafka Streams application

Now let’s go through the DAG and see how we can implement this.

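  1. Create a KStream from the input Kafka topic, in this case “sentences”, where each record value is one sentence held in a single String with its words separated by spaces. Example: “Today is a good day”
  2. Next, create another KStream from the source that contains each individual word of the sentences. For this we use flatMapValues, which is similar to flatMap in Java 8 Streams: one input value can produce zero or more output values, and the record key is kept unchanged.
  3. To count each word, we first need to group the words. The key for each group is the word itself, so groupBy re-keys every record by its value. Because the key changes, Kafka Streams repartitions the data through an internal topic so that all occurrences of the same word are processed together.
  4. After that, count the number of occurrences of each word. The result is a KTable that holds the latest count for every word.
  5. Finally, convert the KTable into a KStream of updates and print each element to standard output.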
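Since step 2 is compared to Java 8 Stream flatMap, it may help to see steps 2 to 4 written with plain java.util.stream over a finite list of sentences. This is only a batch analogue, not Kafka Streams: flatMap plays the role of flatMapValues, and Collectors.groupingBy with Collectors.counting plays the role of groupBy followed by count. The class StepsWithJavaStreams and the method countAll are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical batch analogue of steps 2-4 using java.util.stream (NOT Kafka Streams).
class StepsWithJavaStreams {

    static Map<String, Long> countAll(List<String> sentences) {
        return sentences.stream()
                // Step 2: one sentence becomes many words (like flatMapValues)
                .flatMap(sentence -> Arrays.stream(sentence.split(" ")))
                // Steps 3 and 4: group by the word itself, then count each group;
                // a TreeMap keeps the result sorted for readable output
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList("Today is a good day", "a good day");
        // Prints {Today=1, a=2, day=2, good=2, is=1}
        System.out.println(countAll(sentences));
    }
}
```

The key difference is that this code runs once over a fixed list, whereas the Kafka Streams version keeps the counts in a state store and keeps updating them as new sentences arrive on the topic.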

The code that implements this logic is as follows:

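StreamsBuilder streamsBuilder = new StreamsBuilder();
// Step 1: source stream reading sentences from the "sentences" topic
KStream<String, String> KS0 = streamsBuilder.stream("sentences");

// Step 2: split each sentence into words; the record key is left unchanged
KStream<String, String> KS1 = KS0.flatMapValues((k, v) ->
        Arrays.asList(v.split(" ")));
// Step 3: re-key each record by the word itself so equal words end up in the same group
KGroupedStream<String, String> KGS0 = KS1.groupBy((k, v) -> v);
// Step 4: count occurrences per word; the result is a continuously updated table
KTable<String, Long> KT0 = KGS0.count();

// Step 5: turn the table's updates into a stream and print them to standard output
KT0.toStream().print(Printed.<String, Long>toSysOut().withLabel("KT0"));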
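A KTable produced by count() is continuously updated, so KT0.toStream() emits a new record every time a word's count changes rather than a single final list. The hypothetical ChangelogSketch below models that behaviour in plain Java: a HashMap stands in for the state store, and each update is formatted to resemble the `[KT0]: key, value` lines written by Printed.toSysOut().withLabel("KT0") (treat the exact print layout as an assumption).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of steps 2-5 (NOT Kafka Streams): a HashMap stands
// in for the KTable's state store, and every count change is "emitted" like toStream().
class ChangelogSketch {

    // Stand-in for the state store behind KT0 (word -> current count)
    private final Map<String, Long> store = new HashMap<>();

    // Processes one sentence record and returns the updates it would emit.
    // The "[KT0]: key, value" layout is an assumed imitation of Printed's output.
    List<String> process(String sentence) {
        List<String> updates = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            long newCount = store.merge(word, 1L, Long::sum);
            updates.add("[KT0]: " + word + ", " + newCount);
        }
        return updates;
    }

    public static void main(String[] args) {
        ChangelogSketch app = new ChangelogSketch();
        app.process("Today is a good day").forEach(System.out::println);
        // The second record updates the existing counts instead of starting over
        app.process("a good day").forEach(System.out::println);
    }
}
```

In a real application, the Kafka Streams record cache may combine several consecutive updates to the same key before forwarding them downstream (it is flushed on commit), so the console can show fewer intermediate counts than this sketch does.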

Finally, the complete code required to run the application is as follows:

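import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Printed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountApp {
    private static final Logger logger = LoggerFactory.getLogger(WordCountApp.class);

    public static void main(String[] args) {
        // Connection, default Serde and local state directory settings
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcountapp");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.STATE_DIR_CONFIG, "tmp/wordcountstore");

        // Same topology as above: sentences -> words -> group by word -> count -> print
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, String> KS0 = streamsBuilder.stream("sentences");
        KStream<String, String> KS1 = KS0.flatMapValues((k, v) -> Arrays.asList(v.split(" ")));
        KGroupedStream<String, String> KGS0 = KS1.groupBy((k, v) -> v);
        KTable<String, Long> KT0 = KGS0.count();
        KT0.toStream().print(Printed.<String, Long>toSysOut().withLabel("KT0"));

        // Build the Topology and start processing
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        kafkaStreams.start();

        // Close the application cleanly when the JVM shuts down (e.g. Ctrl+C)
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Shutting down...");
            kafkaStreams.close();
        }));
    }
}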

Here we need to include some boilerplate code: properties to connect to the Kafka brokers, default Serialization/Deserialization (Serde) settings, the local state directory, etc., along with the code to start the KafkaStreams application. streamsBuilder.build() creates an instance of Topology, which is passed to KafkaStreams together with the properties before the actual Kafka Streams application is started. The shutdown hook closes the application cleanly when the JVM exits.
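A Serde is simply a serializer/deserializer pair that turns keys and values into bytes for Kafka and back. The sketch below is a plain-Java illustration of the byte layouts that Kafka's built-in String and Long serdes use by default (UTF-8 text, and a long as 8 big-endian bytes); SerdeSketch and its methods are hypothetical and are not the Kafka classes themselves.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only (NOT the Kafka classes): mimics the byte layouts
// of Kafka's built-in String and Long serializers.
class SerdeSketch {

    // String <-> bytes as UTF-8 text
    static byte[] serializeString(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static String deserializeString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // long <-> 8 bytes; ByteBuffer uses big-endian order by default
    static byte[] serializeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long deserializeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] word = serializeString("Today");
        byte[] count = serializeLong(2L);
        // Prints "Today -> 2" and "count is stored in 8 bytes"
        System.out.println(deserializeString(word) + " -> " + deserializeLong(count));
        System.out.println("count is stored in " + count.length + " bytes");
    }
}
```

This is also why the counts are typed as Long: count() produces Long values even though the default value serde configured above is String. If you later write KT0.toStream() to an output topic instead of printing it, you would typically override the serdes at that point, for example with Produced.with(Serdes.String(), Serdes.Long()).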

Conclusion

Real-time word counting is a requirement for many businesses, and this logic can be adapted to support game leaderboards, weather pattern calculations, etc. I am planning to write about more advanced Kafka Streams topics in upcoming articles, so please follow me to get the latest articles on this topic.
