Uwaga! Kod z przykładami znajduje się na gałęzi KafkaStreams w repozytorium https://github.com/WitMar/PRS2025 .
Kafka Streams to biblioteka Java opracowana przez Apache, służąca do budowy skalowalnych, odpornych na błędy aplikacji do przetwarzania danych strumieniowych w czasie rzeczywistym. Działa bezpośrednio na danych zapisanych w Apache Kafka, dzięki czemu nie wymaga zewnętrznych klastrów przetwarzających (jak Apache Spark czy Flink).
Cechy charakterystyczne Kafka Streams:
Przetwarzanie lokalne (bez konieczności klastrów obliczeniowych).
Wsparcie dla stanowego przetwarzania (tabele, agregacje).
Prosta integracja z aplikacjami Java.
Wysoka dostępność i tolerancja błędów.
Typowe zastosowania:
Agregacja i analiza logów w czasie rzeczywistym.
Wzbogacanie danych (data enrichment).
Reaktywne przetwarzanie zdarzeń (event-driven applications).
Budowa pipeline'ów danych (data pipelines).
Aby uruchomić aplikację opartą na Kafka Streams w projekcie Maven, musisz dodać przynajmniej jedną główną zależność do kafka-streams. W zależności od potrzeb możesz też dodać inne, jak np. slf4j dla logowania, czy kafka-clients. Minimalne zależności Maven dla Kafka Streams (np. Kafka 3.6.0):
<dependencies>
<!-- Główna biblioteka Kafka Streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.7.0</version>
</dependency>
<!-- SLF4J dla logowania (jeśli używasz logback/log4j/slf4j) -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId> <!-- lub slf4j-log4j12 / logback-classic -->
<version>2.0.9</version>
</dependency>
<!-- (opcjonalnie) Kafka clients - zwykle już zawarte w kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
<!-- (opcjonalnie) JSON/Serde biblioteki, jeśli przetwarzasz dane w JSON -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.0</version>
</dependency>
</dependencies>
Kompatybilność
Wersja kafka-streams musi być zgodna z wersją Twojego brokera Kafka, chociaż klienci są zazwyczaj kompatybilni wstecznie w ograniczonym zakresie.
Podstawowym krokiem jest utworzenie topologii przetwarzania z wykorzystaniem klasy StreamsBuilder. Strumień danych reprezentowany jest przez klasę KStream, natomiast tabelę stanu (np. ostatnie znane wartości dla danego klucza) przez KTable.
Aby stworzyć strumień danych, należy:
Utworzyć obiekt StreamsBuilder.
Użyć metody stream(...) do pobrania danych z określonego topicu Kafka.
Przykład utworzenia prostego strumienia:
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
StreamsBuilder builder = new StreamsBuilder();
KStream\<String, String> strumienWejsciowy = builder.stream("nazwa-topicu");
W powyższym przykładzie:
"nazwa-topicu" to topic Kafka, z którego strumień będzie pobierał dane.
Klucze i wartości są typu String, ale mogą być dowolnego typu, o ile są serializowalne.
Strumień danych z pliku lub listy obiektów
Kafka Streams bezpośrednio współpracuje tylko z tematami Kafka. Jeśli chcesz utworzyć strumień z lokalnego pliku lub z listy obiektów Java, musisz najpierw przesłać dane do topicu Kafka lub użyć alternatywnych mechanizmów, takich jak Kafka Connect czy Producer API.
Przykład utworzenia strumienia z lokalnej listy obiektów Java poprzez tymczasowy zapis do topicu Kafka:
List dane = Arrays.asList("obiekt1", "obiekt2", "obiekt3");
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
for (String rekord : dane) {
producer.send(new ProducerRecord<>("nazwa-topicu", rekord));
}
}
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("nazwa-topicu");
Analogicznie, aby utworzyć strumień z pliku, należy:
Odczytać plik lokalnie. Wysłać dane do odpowiedniego topicu Kafka za pomocą KafkaProducer. Następnie użyć Kafka Streams do pobrania danych z tego topicu.
Operacje przetwarzania danych
Kafka Streams oferuje różnorodne operacje do przetwarzania danych:
filter() – filtrowanie rekordów na podstawie określonego warunku.
KStream<String, String> przefiltrowanyStrumien = strumienWejsciowy.filter((key, value) -> value.contains("kryterium"));
map() / mapValues() – transformacja rekordów lub wartości.
KStream<String, Integer> zmapowanyStrumien = strumienWejsciowy.mapValues(value -> value.length());
flatMap() / flatMapValues() – transformacja jednego rekordu w zero, jeden lub więcej rekordów.
KStream<String, String> flatMapStrumien = strumienWejsciowy.flatMapValues(value -> Arrays.asList(value.split("\s+")));
groupBy() / groupByKey() – grupowanie rekordów.
KGroupedStream<String, String> zgrupowanyStrumien = strumienWejsciowy.groupByKey();
aggregate() / reduce() / count() – operacje stanowe na zgrupowanych rekordach.
KTable<String, Long> liczbaRekordow = zgrupowanyStrumien.count();
join() – łączenie strumieni lub tabel.
KStream<String, String> joinedStream = strumienWejsciowy.join(innyStrumien,
(value1, value2) -> value1 + "-" + value2,
JoinWindows.of(Duration.ofMinutes(5)));
Stateful operations like:
.aggregate(...)
.reduce(...)
.count()
.join(...)
.windowedBy(...) with grouping
Tworzą tymczasowe topicy do przechowywania wiadomości!
Kafka Streams obsługuje różne typy okien czasowych do przetwarzania danych:
Tumbling Window (okno skokowe) – okna o stałej długości, bez nakładania się.
stream.groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(5))).count();
Hopping Window (okno przesuwne) – okna o stałej długości, które mogą się nakładać.
stream.groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1))).count();
Sliding Window (okno przesuwne ze zdarzeniami) – dynamiczne okno definiowane przez różnicę czasową między zdarzeniami.
stream.groupByKey().windowedBy(SlidingWindows.withTimeDifference(Duration.ofMinutes(5))).count();
Session Window (okno sesyjne) – okno definiowane przez brak aktywności w strumieniu.
stream.groupByKey().windowedBy(SessionWindows.with(Duration.ofMinutes(10))).count();
🔥 UWAGA: Kafka Streams nie wspiera bezpośrednio okien liczonych na ilość elementów (typu "sliding count window", np. ostatnie 20 wartości). Są tylko czasowe okna (time-based windows).
Kafka Streams pozwala zapisywać wyniki przetwarzania danych do różnych ujść:
Topic Kafka – zapis danych do innego tematu Kafka.
strumienWejsciowy.to("wyjsciowy-topic");
Konsola lub logi – przydatne do debugowania.
strumienWejsciowy.foreach((key, value) -> System.out.println(key + ": " + value));
KTable – zapis wyników jako tabela stanu, np. po agregacji.
liczbaRekordow.toStream().to("agregacje-topic");
Uwaga KafkaStreams mają pare cech które powodują, że uruchamianie przykładowych kodów nie jest proste.
Po pierwsze w przypadku dodawania wartości odbywa się to asynchronicznie i musimy zrobić sleep w kodzie producera inaczej z sukcesem dane nie będą dodane do topica.
Po drugie każdy program oznaczany jest przez ApplicationId i nie da się tak jak we Flinku uruchamiać programu zawsze od początku topica tylko kazde applicationId ma swój własny licznik offsetu i nowe uruchomienie będzie przetwarzać tylko dane od tego offsetu i nie możemy nic z tym zrobić.
Dlatego w przypadku przykładów z pakietu simpleReader proszę o zmianę nazwy ApplicationId na jakiś unikalny string jak np państwa numer indeksu.
Po trzecie w przypadku okien i aggregacji operacje te tworzą topici pośrednie do przetwarzania. Nasi użytkownicy na cloudclusters nie mają praw tworzenia topiców bo jest to ograniczony tani serwer. W związku z tym proszę nie zmieniać nazw aplikacji w pakiecie KafkaStream! Z powodu wyżej aby zobaczyć wynik przetwarzania trzeba wygenerować jakieś dane czyli równolele wykonać program Producer.java razem z konsumerem.