Kafka Streams Joins

Joins are used to combine streams into one new stream.

Co-Partitioning

When joining streams, the data must be co-partitioned:

  • Same number of partitions for input topics
  • Same partitioning strategies for producers.

You can avoid the need for co-partitioning by using a GlobalKTable. With a GlobalKTable, every instance of your streams application populates its local table with data from all partitions.

Inner Join

The new stream will contain only records that have a match in both joined streams.

Left Join

The new stream will contain all records from the first stream, but only matching records from the joined stream.

Outer Join

The new stream will contain all records from both streams.

Joins.java

package com.ccdak.jay;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

/**
 * Example Kafka Streams application that joins two input topics three different ways.
 *
 * <p>Records from {@code joins-input-topic-left} and {@code joins-input-topic-right} are joined
 * by key within a five-minute window using an inner, a left and an outer join. Each join result
 * is written to its own output topic.
 */
public class Joins {

    /**
     * Builds the join topology, starts it, and blocks until the JVM is asked to shut down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Set up the configuration.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "joins-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A cache size of 0 disables record caching in Kafka Streams.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Both input topics use Strings for key and value, so String is the default Serde.
        final String stringSerde = Serdes.String().getClass().getName();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde);

        // Declare the two source streams.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> left = builder.stream("joins-input-topic-left");
        final KStream<String, String> right = builder.stream("joins-input-topic-right");

        // All three joins pair records whose timestamps are at most five minutes apart.
        final JoinWindows window = JoinWindows.of(Duration.ofMinutes(5));

        // Perform an inner join.
        final KStream<String, String> innerJoined = left.join(right, Joins::describePair, window);
        innerJoined.to("inner-join-output-topic");

        // Perform a left join.
        final KStream<String, String> leftJoined = left.leftJoin(right, Joins::describePair, window);
        leftJoined.to("left-join-output-topic");

        // Perform an outer join.
        final KStream<String, String> outerJoined = left.outerJoin(right, Joins::describePair, window);
        outerJoined.to("outer-join-output-topic");

        // Build the topology and the streams client that will run it.
        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Print the topology to the console.
        System.out.println(topology.describe());
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application
        // gracefully. Counting down the latch releases the main thread below.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            latch.countDown();
        }, "streams-shutdown-hook"));

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            // Report on stderr and include the exception itself: getMessage() alone loses the
            // exception type and prints "null" for exceptions created without a message.
            System.err.println("Streams application failed: " + e);
            System.exit(1);
        }
        System.exit(0);
    }

    /**
     * Formats a joined pair of values for the output topics.
     *
     * @param leftValue value from the left stream; rendered as {@code null} when absent
     * @param rightValue value from the right stream; rendered as {@code null} when absent
     * @return a string of the form {@code left=<leftValue>, right=<rightValue>}
     */
    private static String describePair(String leftValue, String rightValue) {
        return "left=" + leftValue + ", right=" + rightValue;
    }
}
build.gradle

/*
 * This file was generated by the Gradle 'init' task.
 *
 * This generated file contains a sample Java application project to get you started.
 * For more details take a look at the 'Building Java & JVM projects' chapter in the Gradle
 * User Manual available at https://docs.gradle.org/6.8.3/userguide/building_java_projects.html
 */

plugins {
    // The application plugin adds support for building a command-line Java application.
    id('application')
}

repositories {
    // Resolve dependencies from Maven Central. JCenter has been sunset and the
    // jcenter() shortcut is deprecated in Gradle, so it is no longer used here.
    mavenCentral()
}

dependencies {
    // Kafka Streams and the Kafka client library.
    implementation('org.apache.kafka:kafka-streams:2.2.1')
    implementation('org.apache.kafka:kafka-clients:2.2.1')

    // Guava, available to application code.
    implementation('com.google.guava:guava:29.0-jre')

    // JUnit test framework.
    testImplementation('junit:junit:4.13')
}

application {
    // Main class for the application plugin. The Joins example has its own
    // entry point and is launched through the runJoins task instead.
    mainClass.set('com.ccdak.jay.CreateTopic')
}

// Runs the Joins example with the compiled main classes on the classpath: `gradle runJoins`.
// Uses `mainClass` (the non-deprecated replacement for JavaExec's `main`) to match the
// application block, and registers the task lazily so it is only configured when requested.
tasks.register('runJoins', JavaExec) {
    dependsOn 'classes'
    mainClass = 'com.ccdak.jay.Joins'
    classpath = sourceSets.main.runtimeClasspath
}

Case 1

Two input Topics

kafka-console-producer --broker-list localhost:9092 --topic joins-input-topic-left --property parse.key=true --property key.separator=:
>b:anote
 kafka-console-producer --broker-list localhost:9092 --topic joins-input-topic-right --property parse.key=true --property key.separator=:

Three output topics

kafka-console-consumer --bootstrap-server localhost:9092 --topic inner-join-output-topic --property print.key=true
kafka-console-consumer --bootstrap-server localhost:9092 --topic left-join-output-topic --property print.key=true
b	left=anote, right=null
kafka-console-consumer --bootstrap-server localhost:9092 --topic outer-join-output-topic --property print.key=true
b	left=anote, right=null
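  • innerJoined contains only records whose key has a match in both streams, so it is empty here.
  • leftJoined contains every record from the left stream; the right value is null when there is no match.
  • outerJoined contains every record from both streams; the missing side is null when there is no match.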

Case 2

Two input Topics

kafka-console-producer --broker-list localhost:9092 --topic joins-input-topic-left --property parse.key=true --property key.separator=:
>b:anote
kafka-console-producer --broker-list localhost:9092 --topic joins-input-topic-right --property parse.key=true --property key.separator=:
>b:dev
>c:kafka

Three output topics

kafka-console-consumer --bootstrap-server localhost:9092 --topic inner-join-output-topic --property print.key=true
b	left=anote, right=dev
kafka-console-consumer --bootstrap-server localhost:9092 --topic left-join-output-topic --property print.key=true
b	left=anote, right=null
b	left=anote, right=dev
kafka-console-consumer --bootstrap-server localhost:9092 --topic outer-join-output-topic --property print.key=true
b	left=anote, right=null
b	left=anote, right=dev
c	left=null, right=kafka

ANOTE.DEV