PRS13.rst

Przetwarzanie równoległe i strumieniowe

Kafka Streams

Uwaga! Kod z przykładami znajduje się na gałęzi KafkaStreams w repozytorium https://github.com/WitMar/PRS2025 .

Wykorzystanie

Kafka Streams to biblioteka Java opracowana przez Apache, służąca do budowy skalowalnych, odpornych na błędy aplikacji do przetwarzania danych strumieniowych w czasie rzeczywistym. Działa bezpośrednio na danych zapisanych w Apache Kafka, dzięki czemu nie wymaga zewnętrznych klastrów przetwarzających (jak Apache Spark czy Flink).

Cechy charakterystyczne Kafka Streams:

  • Przetwarzanie lokalne (bez konieczności klastrów obliczeniowych).

  • Wsparcie dla stanowego przetwarzania (tabele, agregacje).

  • Prosta integracja z aplikacjami Java.

  • Wysoka dostępność i tolerancja błędów.

Typowe zastosowania:

  • Agregacja i analiza logów w czasie rzeczywistym.

  • Wzbogacanie danych (data enrichment).

  • Reaktywne przetwarzanie zdarzeń (event-driven applications).

  • Budowa pipeline'ów danych (data pipelines).

Uruchomienie

Aby uruchomić aplikację opartą na Kafka Streams w projekcie Maven, musisz dodać przynajmniej jedną główną zależność do kafka-streams. W zależności od potrzeb możesz też dodać inne, jak np. slf4j dla logowania, czy kafka-clients. Minimalne zależności Maven dla Kafka Streams (np. Kafka 3.6.0):

<dependencies>
        <!-- Główna biblioteka Kafka Streams -->
        <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>2.7.0</version>
        </dependency>

<!-- SLF4J dla logowania (jeśli używasz logback/log4j/slf4j) -->
        <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId> <!-- lub slf4j-log4j12 / logback-classic -->
                <version>2.0.9</version>
        </dependency>

<!-- (opcjonalnie) Kafka clients - zwykle już zawarte w kafka-streams -->
        <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.7.0</version>
        </dependency>

<!-- (opcjonalnie) JSON/Serde biblioteki, jeśli przetwarzasz dane w JSON -->
        <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.15.0</version>
        </dependency>
</dependencies>

Kompatybilność

Wersja kafka-streams musi być zgodna z wersją Twojego brokera Kafka, chociaż klienci są zazwyczaj kompatybilni wstecznie w ograniczonym zakresie.
Tworzenie strumieni w Kafka Streams

Podstawowym krokiem jest utworzenie topologii przetwarzania z wykorzystaniem klasy StreamsBuilder. Strumień danych reprezentowany jest przez klasę KStream, natomiast tabelę stanu (np. ostatnie znane wartości dla danego klucza) przez KTable.

Aby stworzyć strumień danych, należy:

  1. Utworzyć obiekt StreamsBuilder.

  2. Użyć metody stream(...) do pobrania danych z określonego topicu Kafka.

Przykład utworzenia prostego strumienia:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream\<String, String> strumienWejsciowy = builder.stream("nazwa-topicu");

W powyższym przykładzie:

  • "nazwa-topicu" to topic Kafka, z którego strumień będzie pobierał dane.

  • Klucze i wartości są typu String, ale mogą być dowolnego typu, o ile są serializowalne.

Strumień danych z pliku lub listy obiektów

Kafka Streams bezpośrednio współpracuje tylko z tematami Kafka. Jeśli chcesz utworzyć strumień z lokalnego pliku lub z listy obiektów Java, musisz najpierw przesłać dane do topicu Kafka lub użyć alternatywnych mechanizmów, takich jak Kafka Connect czy Producer API.

Przykład utworzenia strumienia z lokalnej listy obiektów Java poprzez tymczasowy zapis do topicu Kafka:

List dane = Arrays.asList("obiekt1", "obiekt2", "obiekt3");

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        for (String rekord : dane) {
                producer.send(new ProducerRecord<>("nazwa-topicu", rekord));
        }
}

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("nazwa-topicu");

Analogicznie, aby utworzyć strumień z pliku, należy:

Odczytać plik lokalnie. Wysłać dane do odpowiedniego topicu Kafka za pomocą KafkaProducer. Następnie użyć Kafka Streams do pobrania danych z tego topicu.

Przetwarzanie strumienia danych

Operacje przetwarzania danych

Kafka Streams oferuje różnorodne operacje do przetwarzania danych:

filter() – filtrowanie rekordów na podstawie określonego warunku.

KStream<String, String> przefiltrowanyStrumien = strumienWejsciowy.filter((key, value) -> value.contains("kryterium"));

map() / mapValues() – transformacja rekordów lub wartości.

KStream<String, Integer> zmapowanyStrumien = strumienWejsciowy.mapValues(value -> value.length());

flatMap() / flatMapValues() – transformacja jednego rekordu w zero, jeden lub więcej rekordów.

KStream<String, String> flatMapStrumien = strumienWejsciowy.flatMapValues(value -> Arrays.asList(value.split("\s+")));

groupBy() / groupByKey() – grupowanie rekordów.

KGroupedStream<String, String> zgrupowanyStrumien = strumienWejsciowy.groupByKey();

aggregate() / reduce() / count() – operacje stanowe na zgrupowanych rekordach.

KTable<String, Long> liczbaRekordow = zgrupowanyStrumien.count();

join() – łączenie strumieni lub tabel.

KStream<String, String> joinedStream = strumienWejsciowy.join(innyStrumien,
(value1, value2) -> value1 + "-" + value2,
JoinWindows.of(Duration.ofMinutes(5)));

Stateful operations like:

.aggregate(...)

.reduce(...)

.count()

.join(...)

.windowedBy(...) with grouping

Tworzą tymczasowe topicy do przechowywania wiadomości!

Okna przetwarzające dane

Kafka Streams obsługuje różne typy okien czasowych do przetwarzania danych:

Tumbling Window (okno skokowe) – okna o stałej długości, bez nakładania się.

stream.groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(5))).count();

Hopping Window (okno przesuwne) – okna o stałej długości, które mogą się nakładać.

stream.groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1))).count();

Sliding Window (okno przesuwne ze zdarzeniami) – dynamiczne okno definiowane przez różnicę czasową między zdarzeniami.

stream.groupByKey().windowedBy(SlidingWindows.withTimeDifference(Duration.ofMinutes(5))).count();

Session Window (okno sesyjne) – okno definiowane przez brak aktywności w strumieniu.

stream.groupByKey().windowedBy(SessionWindows.with(Duration.ofMinutes(10))).count();

🔥 UWAGA: Kafka Streams nie wspiera bezpośrednio okien liczonych na ilość elementów (typu "sliding count window", np. ostatnie 20 wartości). Są tylko czasowe okna (time-based windows).

Definicje ujścia strumienia (sink)

Kafka Streams pozwala zapisywać wyniki przetwarzania danych do różnych ujść:

Topic Kafka – zapis danych do innego tematu Kafka.

strumienWejsciowy.to("wyjsciowy-topic");

Konsola lub logi – przydatne do debugowania.

strumienWejsciowy.foreach((key, value) -> System.out.println(key + ": " + value));

KTable – zapis wyników jako tabela stanu, np. po agregacji.

liczbaRekordow.toStream().to("agregacje-topic");

Przykłady

Uwaga KafkaStreams mają pare cech które powodują, że uruchamianie przykładowych kodów nie jest proste.

Po pierwsze w przypadku dodawania wartości odbywa się to asynchronicznie i musimy zrobić sleep w kodzie producera inaczej z sukcesem dane nie będą dodane do topica.

Po drugie każdy program oznaczany jest przez ApplicationId i nie da się tak jak we Flinku uruchamiać programu zawsze od początku topica tylko kazde applicationId ma swój własny licznik offsetu i nowe uruchomienie będzie przetwarzać tylko dane od tego offsetu i nie możemy nic z tym zrobić.

Dlatego w przypadku przykładów z pakietu simpleReader proszę o zmianę nazwy ApplicationId na jakiś unikalny string jak np państwa numer indeksu.

Po trzecie w przypadku okien i aggregacji operacje te tworzą topici pośrednie do przetwarzania. Nasi użytkownicy na cloudclusters nie mają praw tworzenia topiców bo jest to ograniczony tani serwer. W związku z tym proszę nie zmieniać nazw aplikacji w pakiecie KafkaStream! Z powodu wyżej aby zobaczyć wynik przetwarzania trzeba wygenerować jakieś dane czyli równolele wykonać program Producer.java razem z konsumerem.