Kafka Streams Transformations

Kafka Streams provides a robust set of tools for processing and transforming data. The Kafka cluster itself serves as the backend for data management and storage.

There are two types of data transformations in Kafka Streams:

  • Stateless transformations do not require any additional storage to manage the state.
  • Stateful transformations require a state store to manage the state.

Stateless Transformations

Branch

Splits a stream into multiple streams based on a predicate.

Filter

Removes messages from the stream based on a condition.

FlatMap

Takes each input record and turns it into zero or more output records, potentially with different keys and values.

Foreach

Performs an arbitrary stateless operation on each record. This is a terminal operation: it returns nothing, so no further stream processing can be chained after it.

StatelessTransformations.java

package com.ccdak.jay;

import java.util.List;
import java.util.LinkedList;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.KeyValue;

/**
 * Demonstrates stateless Kafka Streams transformations (branch, filter,
 * flatMap, map, merge, peek) on a String-keyed/String-valued input topic.
 *
 * Reads from "stateless-transformations-input-topic" and writes the
 * transformed records to "stateless-transformations-output-topic".
 */
public class StatelessTransformations {

    public static void main(String[] args) {
        // Set up the configuration.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateless-transformations-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Disable record caching so every transformation result is forwarded
        // immediately (useful for demos; not recommended for production).
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Since the input topic uses Strings for both key and value, set the default
        // Serdes to String.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Get the source stream.
        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, String> source = builder.stream("stateless-transformations-input-topic");

        // Split the stream into two streams: one containing all records whose key
        // begins with "a", and the other containing everything else. The second
        // predicate is a catch-all so no record is dropped at this step.
        KStream<String, String>[] branches = source.branch((key, value) -> key.startsWith("a"), (key, value) -> true);
        KStream<String, String> aKeysStream = branches[0];
        KStream<String, String> othersStream = branches[1];

        // Remove any records from the "a" stream where the value does not also start
        // with "a".
        aKeysStream = aKeysStream.filter((key, value) -> value.startsWith("a"));

        // For the "a" stream, convert each record into two records: one with an
        // uppercased value and one with a lowercased value.
        aKeysStream = aKeysStream.flatMap((key, value) -> {
            List<KeyValue<String, String>> result = new LinkedList<>();
            result.add(KeyValue.pair(key, value.toUpperCase()));
            result.add(KeyValue.pair(key, value.toLowerCase()));
            return result;
        });

        // Example of foreach — a terminal operation that returns void, so nothing
        // further could be chained after it (which is why it is commented out here):
        // aKeysStream.foreach((key, value) -> System.out.println("key=" + key + ", value=" + value));

        // Uppercase the key of every record in the "a" stream.
        aKeysStream = aKeysStream.map((key, value) -> KeyValue.pair(key.toUpperCase(), value));

        // Merge the two streams back together.
        KStream<String, String> mergedStream = aKeysStream.merge(othersStream);

        // Print each record to the console. peek is a non-terminal side-effecting
        // operation, so the stream can still be written to the output topic below.
        mergedStream = mergedStream.peek((key, value) -> System.out.println("key=" + key + ", value=" + value));

        // Output the transformed data to a topic.
        mergedStream.to("stateless-transformations-output-topic");

        // Build and start the streams topology.
        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Print the topology to the console.
        System.out.println(topology.describe());
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application
        // gracefully.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.out.println(e.getMessage());
            System.exit(1);
        }
        System.exit(0);
    }

}

plugins {
    // Apply the application plugin to add support for building a CLI application in Java.
    id 'application'
}

repositories {
    // Use Maven Central for resolving dependencies.
    // (JCenter was sunset in 2021 and is read-only; mavenCentral hosts
    // all of the artifacts declared below.)
    mavenCentral()
}

dependencies {
    // Use JUnit test framework.
    testImplementation 'junit:junit:4.13'

    // Kafka Streams and Kafka clients.
    implementation 'org.apache.kafka:kafka-streams:2.2.1'
    implementation 'org.apache.kafka:kafka-clients:2.2.1'

    // This dependency is used by the application.
    implementation 'com.google.guava:guava:29.0-jre'
}

application {
    // Define the main class for the application.
    mainClass = 'com.ccdak.jay.CreateTopic'
}

// Convenience task: ./gradlew runStatelessTransformations
task(runStatelessTransformations, dependsOn: 'classes', type: JavaExec) {
    // Use the mainClass property for consistency with the application block
    // above ('main =' is the deprecated spelling).
    mainClass = 'com.ccdak.jay.StatelessTransformations'
    classpath = sourceSets.main.runtimeClasspath
}

Open Producer

kafka-console-producer --broker-list localhost:9092 --topic stateless-transformations-input-topic --property parse.key=true --property key.separator=:
>akey:avalue
>bkey:bvalue

Open Consumer

kafka-console-consumer --bootstrap-server localhost:9092 --topic stateless-transformations-output-topic --property print.key=true
AKEY	AVALUE
AKEY	avalue
ANOTE.DEV