Uwaga! Kod z przykładami znajduje się na gałęzi Flink w repozytorium https://github.com/WitMar/PRS2025 .
Apache Flink umożliwia tworzenie skalowalnych transformacji wsadowych i strumieniowych, takich jak potoki przetwarzania danych w czasie rzeczywistym, eksploracyjna analiza danych na dużą skalę, potoki uczenia maszynowego (ML) i procesy ETL.
Instalacja na własnym komputerze - zależność w Maven:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>1.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.20.0</version>
</dependency>
</dependencies>
Strumienie danych w bibliotece Flink nazywamy DataStream - strumieniami danych. Służą one do opisu zarówno danych do przetwarzania strumieniowego jak i batchowego, które w bibliotece Flink posiadają ten sam interfejs do wykonywania operacji na danych.
Przetwarzanie strumieni danych DataStream we Flink to zwykłe programy, które realizują transformacje na strumieniach danych (np. filtrowanie, aktualizowanie stanu, definiowanie okien, agregowanie). Strumienie danych są początkowo tworzone z różnych źródeł (np. kolejki komunikatów, strumienie gniazd, pliki). Wyniki są zwracane przez ujścia (sink), które mogą na przykład zapisywać dane do plików lub na standardowe wyjście (na przykład terminal wiersza poleceń). Programy Flink działają w różnych kontekstach, samodzielnie lub osadzone w innych programach. Wykonanie może odbywać się w lokalnej maszynie JVM lub w klastrach wielu maszyn.
Programy Flink wyglądają jak zwykłe programy, które przekształcają strumienie danych. Każdy program składa się z tych samych podstawowych części:
Uzyskaj środowisko wykonawcze,Załaduj/utwórz dane początkowe / źródło danych,Określ przekształcenia na tych danych,Określ, gdzie umieścić wyniki swoich obliczeń,Uruchom wykonanie programu
StreamExecutionEnvironment jest podstawą wszystkich programów Flink. W naszym przypadku będziemy korzystać z getExecutionEnvironment(), ponieważ wywołuje on potrzebną funkcję w zależności od kontekstu: jeśli wykonujesz swój program w IDE lub jako zwykły program Java, utworzy lokalne środowisko, które wykona twój program na twojej lokalnej maszynie. Środowisko zapewnia metody kontrolowania wykonywania zadań (takie jak ustawienie parametrów równoległości lub odporności na błędy/punkty kontrolne) oraz interakcji ze światem zewnętrznym (dostęp do danych).
Przykład tworzenia środowiska i ustawiania / odczytu paramertów można znaleźć poniżej:
// Tworzenie środowiska wykonawczego DataStream API
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Przełączenie trybu przetwarzania na wsadowy (Batch)
import org.apache.flink.api.common.RuntimeExecutionMode;
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// Przykład prostego DataStream - batch
DataStream<Integer> data = env.fromElements(1, 2, 3, 4, 5);
data.print();
env.execute();
Exercise
Uruchom przykłady w klasach BatchEnvironment.java i StreamEnvironment.java.
Tryb wykonywania (runtime_mode) STREAMING to sposób wykonywania programów w zależności czy dane są skończone czy też nie. Streaming powinno być używane w przypadku zadań bez ograniczeń, które wymagają ciągłego przetwarzania przyrostowego i oczekuje się, że pozostaną w trybie online przez nieokreślony czas.
Ponadto istnieje tryb wykonywania w stylu wsadowym, który nazywamy trybem wykonywania BATCH. Wykonuje on zadania w sposób bardziej przypominający ramy przetwarzania wsadowego, takie jak MapReduce. Batch powinno być używane w przypadku zadań ograniczonych, dla których dane wejściowe są znane i ograniczone.
Trzecia opcja to AUTOMATIC pozwalająca wybrać tryb przetwarzania bazując na typie źródła danych.
Zunifikowane podejście Apache Flink do przetwarzania strumieniowego i wsadowego oznacza, że aplikacja DataStream wykonywana na ograniczonych danych wejściowych będzie generować takie same wyniki końcowe niezależnie od skonfigurowanego trybu wykonywania.
Ważne jest, aby zauważyć, co oznacza tutaj final: zadanie wykonywane w trybie STREAMING może generować przyrostowe aktualizacje (pomyśl o upsertach w bazie danych), podczas gdy zadanie BATCH dałoby tylko jeden końcowy wynik na końcu.
Najprostszym źródłem danych jest kolekcja. Tworzy ono źródło strumienia. Najprostsze przetwarzanie wypisze ten strumień na ekran. Uwaga! Aby wywołać uruchomić przetwarzanie na środowisku musimy wykonać metodę execute().
env.fromElements(1, 2, 3, 4, 5);
env.fromData(List.of("lower", "UPPER", "MiDdLe"));
Jak widać różnica w tm wypadku to sposób wypisywania danych w przypadku strumieniowego podejścia widzimy też na ekranie identyfikator jednostki przetwarzającej zadanie.
Generycznym typem źródła danych jest obiekt Source: Przykładem Source mogą być pliki, inne źródła danych jak Kafka itp. Poniżej plik cities.csv : Program czytający plik jako wejście dla przetwarzania. Definiujemy w obiekcie schemat danych które są odczytywane jako wiersze do strumienia wejściowego. Uwaga program czyta plik więc może być uruchomiony tylko lokalnie!
FileSource
.forRecordStreamFormat(csvFormat, new Path("cities.csv"))
.build();
TableEnvironment to główny interfejs do pracy z Table API oraz SQL API w Apache Flink. Pozwala on na definiowanie, przekształcanie i wykonywanie operacji na tabelach oraz zarządzanie środowiskiem wykonawczym dla zapytań SQL i tabel.
Tworzenie tabel z danych (fromValues, fromDataStream)
Wykonywanie zapytań SQL (sqlQuery)
Wykonywanie poleceń DDL i DML (executeSql)
Wstawianie danych do tabel (executeInsert)
Rejestrowanie widoków tymczasowych (createTemporaryView)
Konfiguracja środowiska (getConfig)
Istnieją dwa podstawowe tryby tworzenia TableEnvironment:
1. Tryb niezależny (tylko Table API lub SQL API):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode() // lub inBatchMode()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
2. Zintegrowany z DataStream API (`StreamTableEnvironment`):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Uwaga: StreamTableEnvironment używamy, gdy chcemy łączyć Table API z DataStream API.
Przykład użycia table API:
Table result = tableEnv.sqlQuery("SELECT id, name FROM people WHERE id > 10");
result.execute().print();
TableEnvironment pełni kluczową rolę w pracy z danymi tabelarycznymi w Apache Flink. Jest to podstawowe narzędzie do pracy z zapytaniami SQL, definicjami tabel oraz integracją z różnymi źródłami danych (np. pliki, bazy danych, Kafka).
TableEnvironment utrzymuje mapę katalogów tabel, które są tworzone z identyfikatorem. Każdy identyfikator składa się z 3 części: nazwy katalogu, nazwy bazy danych oraz nazwy obiektu.
Tabele mogą być wirtualne (WIDOK) lub zwykłe (TABELA). WIDOKI można tworzyć z istniejącego obiektu tabeli, zazwyczaj w wyniku działania interfejsu API tabeli lub zapytania SQL. TABELE opisują dane zewnętrzne, takie jak plik, tabela bazy danych lub kolejka komunikatów.
Możemy swobodnie przechodzić między tabelami a strumieniami i korzystać na zmianę z obi API.
Exercise
Uruchom przykłady z pakietu tableAPI.
Apache Flink udostępnia szereg operatorów przekształceń dla strumieni danych (DataStream), które pozwalają budować złożone potoki przetwarzania danych. Poniżej znajduje się opis najważniejszych operatorów dostępnych w API Flinka.
dataStream.map(value -> value * 2)
Przekształca każdy element wejściowy na dokładnie jeden element wyjściowy. Jest to najprostsza forma transformacji.
dataStream.flatMap((value, out) -> {
for (String word : value.split(" ")) {
out.collect(word);
}
})
Przekształca każdy element wejściowy na zero, jeden lub wiele elementów wyjściowych.
dataStream.filter(value -> value > 0)
Filtruje elementy strumienia na podstawie warunku logicznego.
dataStream.keyBy(value -> value.getKey())
Grupuje elementy na podstawie wartości klucza. Operacje takie jak reduce, window, aggregate mogą być wykonywane tylko na pogrupowanych danych.
dataStream
.keyBy(value -> value.f1)
.reduce((a, b) -> Tuple2.of(a.f0 + b.f0, a.f1))
Agreguje dane ze wspólnym kluczem w sposób inkrementalny.
stream1.union(stream2)
Łączy wiele strumieni tego samego typu w jeden strumień.
dataStream.partitionCustom((key, numPartitions) -> key % numPartitions, value -> value.getKey())
Pozwala na ręczne określenie przypisania elementów do partycji.
dataStream.print();
dataStream.addSink(new MySinkFunction());
Operator końcowy, który zapisuje przetworzone dane do zewnętrznego systemu (konsola, plik, Kafka, itp.).
Operatory DataStream w Flinku pozwalają tworzyć elastyczne i złożone potoki przetwarzania danych czasu rzeczywistego. Umożliwiają zarówno przekształcenia stateless (np. map, filter), jak i stateful (np. keyBy, reduce, window).
Więcej: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/operators/
Operatory możemy także definiować sami implementując odpowiednie clasy implementujące interfejsy, jako funkcje lub lambdy.
Stan pomaga nam zapamiętać dodatkowe informacje związane z przychodzącymi danymi. Jest to niezbędne gdy chcemy np. wyliczyć średnią z przychodzących wartości.
Jeśli chcesz użyć stanu, musisz najpierw określić klucz dla każdej danej w DataStream, który powinien być użyty do partycjonowania stanu (a także samych rekordów w strumieniu). Możesz określić klucz za pomocą keyBy(KeySelector) w Java/Scala API lub key_by(KeySelector) w Python API na DataStream. Spowoduje to wygenerowanie KeyedStream, który następnie zezwoli na operacje używające stanu klucza.
Funkcja selektora klucza pobiera pojedynczy rekord jako dane wejściowe i zwraca klucz dla tego rekordu. Klucz może być dowolnego typu i musi pochodzić z obliczeń deterministycznych.
Aby uzyskać uchwyt stanu, musisz utworzyć StateDescriptor. Zawiera on nazwę stanu (jak zobaczymy później, możesz utworzyć kilka stanów i muszą one mieć unikalne nazwy, aby można było się do nich odwoływać), typ wartości, które stan przechowuje i czasem funkcje użytkownika- określona funkcja, taka jak ReduceFunction. W zależności od typu stanu, który chcesz pobrać, tworzysz ValueStateDescriptor, ListStateDescriptor, AggregatingStateDescriptor, ReducingStateDescriptor lub MapStateDescriptor.
Dostęp do stanu uzyskuje się za pomocą RuntimeContext, poprzez implementacje tzw. rich functions. Rich functions zapewniają, oprócz funkcji zdefiniowanych przez użytkownika (map, reduce itp.), cztery metody: open, close, getRuntimeContext, and setRuntimeContext.
Exercise
Uruchom przykłady z pakietu operators.
Ujścia danych zużywają strumienie danych i przesyłają je do plików, gniazd, systemów zewnętrznych lub drukują je. Flink ma wiele wbudowanych formatów wyjściowych, które są zamknięte w operacjach na strumieniach danych:
writeAsText() / TextOutputFormat — Zapisuje elementy liniowo jako łańcuchy. Łańcuchy są uzyskiwane przez wywołanie metody toString() każdego elementu.
writeAsCsv(...) / CsvOutputFormat — zapisuje krotki jako pliki wartości oddzielonych przecinkami. Ograniczniki wierszy i pól można konfigurować. Wartość dla każdego pola pochodzi z metody toString() obiektów.
print() / printToErr() — Drukuje wartość toString() każdego elementu w standardowym wyjściu / standardowym strumieniu błędów. Opcjonalnie można podać prefiks (msg), który jest dołączany do danych wyjściowych. Może to pomóc w rozróżnieniu różnych wezwań do drukowania. Jeśli równoległość jest większa niż 1, dane wyjściowe zostaną również poprzedzone identyfikatorem zadania, które wygenerowało dane wyjściowe.
writeUsingOutputFormat() / FileOutputFormat — metoda i klasa bazowa dla niestandardowych plików wyjściowych. Obsługuje niestandardową konwersję obiektów na bajty.
writeToSocket — zapisuje elementy do gniazda zgodnie ze schematem SerializationSchema
addSink — Wywołuje niestandardową funkcję ujścia. Flink jest dostarczany w pakiecie ze złączami do innych systemów (takich jak Apache Kafka), które są zaimplementowane jako funkcje ujścia.
Exercise
Uruchom przykłady z pakietu sink.
Uwaga! Program zapisuje wyniki w częściowych plikach w katalogu output na każdy procesor, nie wiem jak przekonać go do wypisania jednego pliku z danymi wyjściowymi.