Stateless transformations, such as groupByKey
and groupBy,
can be used to group records that share the same key.
Aggregations
are stateful transformations
that always operate on these groups of records sharing the same key.
Aggregate
Generates a new record from a calculation involving the grouped records.
Count
Counts the number of records for each grouped key.
Reduce
Combines the grouped records into a single record.
Aggregations.java
package com.ccdak.jay;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
/**
 * Kafka Streams example that groups the records of {@code aggregations-input-topic} by their
 * existing key and runs three aggregations over each group:
 * <ul>
 *   <li>aggregate: running total of the character length of the values, written to
 *       {@code aggregations-output-charactercount-topic};</li>
 *   <li>count: number of records per key, written to
 *       {@code aggregations-output-count-topic};</li>
 *   <li>reduce: the values joined with single spaces, written to
 *       {@code aggregations-output-reduce-topic}.</li>
 * </ul>
 */
public class Aggregations {

    /**
     * Configures and starts the Streams application, then blocks until the JVM shutdown hook
     * has closed it. Exits with status 0 after a clean shutdown and status 1 if starting or
     * waiting fails.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Set up the configuration.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregations-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Record cache size of 0 bytes.
        // NOTE(review): presumably so every intermediate aggregation result is forwarded
        // downstream immediately instead of being buffered -- confirm against the Streams docs.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Since the input topic uses Strings for both key and value, set the default
        // Serdes to String.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Get the source stream.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> source = builder.stream("aggregations-input-topic");

        // Group the source stream by the existing key.
        final KGroupedStream<String, String> groupedStream = source.groupByKey();

        // Aggregate: total the length in characters of the values of all records sharing the
        // same key. The result type (Integer) differs from the input value type (String), so
        // explicit serdes are supplied for the state store and for the output topic.
        final KTable<String, Integer> aggregatedTable = groupedStream.aggregate(
                () -> 0,
                (aggkey, newValue, aggValue) -> aggValue + newValue.length(),
                Materialized.with(Serdes.String(), Serdes.Integer()));
        aggregatedTable.toStream().to("aggregations-output-charactercount-topic",
                Produced.with(Serdes.String(), Serdes.Integer()));

        // Count: number of records for each key (Long values, so explicit serdes again).
        final KTable<String, Long> countedTable =
                groupedStream.count(Materialized.with(Serdes.String(), Serdes.Long()));
        countedTable.toStream().to("aggregations-output-count-topic",
                Produced.with(Serdes.String(), Serdes.Long()));

        // Reduce: combine the values of all records with the same key into a string separated
        // by spaces. Input and output are both Strings, so the default serdes apply.
        final KTable<String, String> reducedTable =
                groupedStream.reduce((aggValue, newValue) -> aggValue + " " + newValue);
        reducedTable.toStream().to("aggregations-output-reduce-topic");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);

        // Print the topology to the console.
        System.out.println(topology.describe());

        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application
        // gracefully: close the streams instance first, then release the main thread.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            // Report the complete failure on stderr. Printing only getMessage() loses the
            // exception type and stack trace, and prints "null" when there is no message.
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }
}
plugins {
    // Enable the application plugin, which adds support for building a command-line
    // application in Java.
    id('application')
}
repositories {
    // Resolve dependencies from Maven Central. JCenter has been sunset and Gradle has
    // deprecated the jcenter() shortcut; every artifact declared in this build is
    // published on Maven Central.
    mavenCentral()
}
dependencies {
    // Kafka Streams and the Kafka client library.
    implementation('org.apache.kafka:kafka-streams:2.2.1')
    implementation('org.apache.kafka:kafka-clients:2.2.1')

    // This dependency is used by the application.
    implementation('com.google.guava:guava:29.0-jre')

    // JUnit test framework.
    testImplementation('junit:junit:4.13')
}
application {
    // Main class for the application.
    mainClass.set('com.ccdak.jay.CreateTopic')
}
// Runs the Aggregations example class on the main runtime classpath.
// Usage: ./gradlew runAggregations
tasks.register('runAggregations', JavaExec) {
    dependsOn 'classes'
    // JavaExec's `main` property is deprecated in favour of `mainClass`, which the
    // `application` block in this build already uses.
    mainClass = 'com.ccdak.jay.Aggregations'
    classpath = sourceSets.main.runtimeClasspath
}
./gradlew runAggregations
kafka-console-producer
kafka-console-producer --broker-list localhost:9092 --topic aggregations-input-topic --property parse.key=true --property key.separator=:
>a:a
>b:hello
>b:hello
>c:hello
kafka-console-consumer
- aggregations-output-charactercount-topic
- aggregations-output-count-topic
- aggregations-output-reduce-topic
aggregations-output-charactercount-topic
kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-charactercount-topic --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
b 5
b 10
c 5
aggregations-output-count-topic
kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-count-topic --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
b 1
b 2
c 1
aggregations-output-reduce-topic
kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-reduce-topic --property print.key=true
b hello
b hello hello
c hello