Kafka Streams Aggregations

Stateless transformations such as groupByKey and groupBy can be used to group records that share the same key.

Aggregations are stateful transformations that always operate on these groups of records sharing the same key.

Aggregate

Generates a new record from a calculation involving the grouped records.

Count

Counts the number of records for each grouped key.

Reduce

Combines the grouped records into a single record.

Aggregations.java

package com.ccdak.jay;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

/**
 * Kafka Streams example that groups the records of {@code aggregations-input-topic} by key
 * and publishes three aggregations of each group to separate output topics:
 * <ul>
 *   <li>{@code aggregate} - running total of the value lengths (in characters),</li>
 *   <li>{@code count} - running number of records,</li>
 *   <li>{@code reduce} - all values concatenated, separated by a single space.</li>
 * </ul>
 */
public class Aggregations {

    /**
     * Builds the topology, starts the streams application and blocks until the JVM is asked
     * to shut down (for example via control-c).
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        final Topology topology = buildTopology();
        final KafkaStreams streams = new KafkaStreams(topology, buildConfig());
        // Print the topology to the console.
        System.out.println(topology.describe());
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application
        // gracefully.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final InterruptedException e) {
            // Restore the interrupt flag so the interruption is not silently lost.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while waiting for shutdown: " + e);
            System.exit(1);
        } catch (final Throwable e) {
            // Top-level boundary: report the throwable itself (type and message) on stderr;
            // getMessage() alone may be null and hides the exception type.
            System.err.println("Streams application failed: " + e);
            System.exit(1);
        }
        System.exit(0);
    }

    /** Creates the Kafka Streams configuration used by this example. */
    private static Properties buildConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregations-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Cache size of 0 bytes; per the Kafka Streams documentation this turns record
        // caching off so that every aggregation update is forwarded downstream.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Since the input topic uses Strings for both key and value, set the default
        // Serdes to String.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return props;
    }

    /** Wires the input topic to the three aggregations and their output topics. */
    private static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Get the source stream.
        final KStream<String, String> source = builder.stream("aggregations-input-topic");

        // Group the source stream by the existing key.
        final KGroupedStream<String, String> groupedStream = source.groupByKey();

        // Create an aggregation that totals the length in characters of the value for
        // all records sharing the same key. The result type is Integer, so the serdes
        // must be given explicitly (the defaults are String).
        final KTable<String, Integer> aggregatedTable = groupedStream.aggregate(() -> 0,
                (aggKey, newValue, aggValue) -> aggValue + newValue.length(),
                Materialized.with(Serdes.String(), Serdes.Integer()));
        aggregatedTable.toStream().to("aggregations-output-charactercount-topic",
                Produced.with(Serdes.String(), Serdes.Integer()));

        // Count the number of records for each key.
        final KTable<String, Long> countedTable =
                groupedStream.count(Materialized.with(Serdes.String(), Serdes.Long()));
        countedTable.toStream().to("aggregations-output-count-topic",
                Produced.with(Serdes.String(), Serdes.Long()));

        // Combine the values of all records with the same key into a string separated
        // by spaces. Key and value stay Strings, so the default serdes apply.
        final KTable<String, String> reducedTable =
                groupedStream.reduce((aggValue, newValue) -> aggValue + " " + newValue);
        reducedTable.toStream().to("aggregations-output-reduce-topic");

        return builder.build();
    }
}

plugins {
    // Apply the application plugin to add support for building a CLI application in Java.
    id 'application'
}

repositories {
    // Resolve dependencies from Maven Central. JCenter has been sunset and
    // jcenter() is deprecated in Gradle.
    mavenCentral()
}

dependencies {
    // Use JUnit test framework.
    testImplementation 'junit:junit:4.13'

    // Kafka Streams and the Kafka client library it builds on.
    implementation 'org.apache.kafka:kafka-streams:2.2.1'
    implementation 'org.apache.kafka:kafka-clients:2.2.1'
    // This dependency is used by the application.
    implementation 'com.google.guava:guava:29.0-jre'
}

application {
    // Define the main class for the application (used by `gradlew run`).
    mainClass = 'com.ccdak.jay.CreateTopic'
}

// Runs the Aggregations example: ./gradlew runAggregations
task(runAggregations, dependsOn: 'classes', type: JavaExec) {
    // Use the mainClass property, matching the application block above;
    // the older JavaExec `main` property is deprecated.
    mainClass = 'com.ccdak.jay.Aggregations'
    classpath = sourceSets.main.runtimeClasspath
}
./gradlew runAggregations

kafka-console-producer

kafka-console-producer --broker-list localhost:9092 --topic aggregations-input-topic --property parse.key=true --property key.separator=:
>a:a
>b:hello
>b:hello
>c:hello

kafka-console-consumer

  • aggregations-output-charactercount-topic
  • aggregations-output-count-topic
  • aggregations-output-reduce-topic

aggregations-output-charactercount-topic

kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-charactercount-topic --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
b	5
b	10
c	5

aggregations-output-count-topic

kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-count-topic --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
b	1
b	2
c	1

aggregations-output-reduce-topic

kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-reduce-topic --property print.key=true                               
b	hello
b	hello hello
c	hello

Leave a Reply

Your email address will not be published.

ANOTE.DEV