Kafka Streams lets us easily build applications that process Kafka
data in real time.
A Kafka Streams application is an application where both the input and the output are stored in Kafka topics.
Kafka Streams is a client library (API) that makes it easy to build these applications.
gradle init
Select type of project to generate:
1: basic
2: application
3: library
4: Gradle plugin
Enter selection (default: basic) [1..4] 2
Select implementation language:
1: C++
2: Groovy
3: Java
4: Kotlin
5: Scala
6: Swift
Enter selection (default: Java) [1..6] 3
Split functionality across multiple subprojects?:
1: no - only one application project
2: yes - application and library projects
Enter selection (default: no - only one application project) [1..2] 1
Select build script DSL:
1: Groovy
2: Kotlin
Enter selection (default: Groovy) [1..2] 1
Select test framework:
1: JUnit 4
2: TestNG
3: Spock
4: JUnit Jupiter
Enter selection (default: JUnit 4) [1..4] 1
Project name (default: kafka): com.ccdak.jay
Source package (default: com.ccdak.jay):
> Task :init
Get more help with your project: https://docs.gradle.org/6.8.3/samples/sample_building_java_applications.html
BUILD SUCCESSFUL in 44s
2 actionable tasks: 2 executed
Add dependencies
/*
* This file was generated by the Gradle 'init' task.
*
* This generated file contains a sample Java application project to get you started.
* For more details take a look at the 'Building Java & JVM projects' chapter in the Gradle
* User Manual available at https://docs.gradle.org/6.8.3/userguide/building_java_projects.html
*/
plugins {
    // The 'application' plugin adds support for building and running a Java CLI application.
    id('application')
}
repositories {
    // Resolve dependencies from Maven Central. JCenter has been sunset and
    // jcenter() is deprecated in later Gradle releases; every artifact
    // declared in this build script is published on Maven Central.
    mavenCentral()
}
dependencies {
    // Test-only: JUnit 4 test framework.
    testImplementation('junit:junit:4.13')

    // Kafka Streams library.
    implementation('org.apache.kafka:kafka-streams:2.2.1')
    // Kafka client library.
    implementation('org.apache.kafka:kafka-clients:2.2.1')

    // This dependency is used by the application.
    implementation('com.google.guava:guava:29.0-jre')
}
application {
    // Entry point used by the application plugin (e.g. for `gradle run`).
    mainClass.set('com.ccdak.jay.CreateTopic')
}
// Runs the Kafka Streams sample after compiling the main classes:
//   ./gradlew runStreams
task(runStreams, dependsOn: 'classes', type: JavaExec) {
    // Must be the fully qualified class name, i.e. the package declared in
    // StreamsMain.java plus the class name; otherwise the JVM cannot find it.
    mainClass = 'com.linuxacademy.ccdak.streams.StreamsMain'
    classpath = sourceSets.main.runtimeClasspath
}
package com.linuxacademy.ccdak.streams;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
/**
 * Minimal Kafka Streams application: forwards every record read from
 * {@code streams-input-topic} to {@code streams-output-topic} without modifying it.
 */
public class StreamsMain {

    /**
     * Configures the application, builds the pass-through topology, starts it, and
     * then blocks until the JVM shutdown hook fires (for example on Ctrl-C).
     *
     * <p>The process exits with status 0 after a normal shutdown and with status 1
     * if starting the streams instance or waiting for shutdown fails.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Set up the configuration.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-data");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A buffering cache size of 0 bytes.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Since the input topic uses Strings for both key and value, set the default
        // Serdes to String.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Build the topology: read the input topic and write it straight to the output topic.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> source = builder.stream("streams-input-topic");
        source.to("streams-output-topic");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);

        // Print the topology to the console.
        System.out.println(topology.describe());

        // Released by the shutdown hook so that main() can return after close().
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application
        // gracefully.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final InterruptedException e) {
            // Restore the interrupt flag before exiting with a failure status.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while waiting for shutdown: " + e);
            System.exit(1);
        } catch (final Throwable e) {
            // Report the full failure on stderr: getMessage() alone may be null and
            // hides the exception type and stack trace. No logger is set up in this
            // sample, so the stack trace is printed directly.
            System.err.println("Kafka Streams application failed: " + e);
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }
}
kafka-console-producer --broker-list localhost:9092 --topic streams-input-topic --property parse.key=true --property key.separator=:
>hi:?
./gradlew runStreams
kafka-console-consumer --bootstrap-server localhost:9092 --topic streams-output-topic --property print.key=true
hi ?