PRS10.rst

Przetwarzanie równoległe i strumieniowe

Uwaga! Kod z którego będziemy korzystać na zajęciach jest dostępny na branchu ApacheFlinkIntroStart w repozytorium https://github.com/WitMar/PRS2020 .

Jeżeli nie widzisz odpowiednich gałęzi na GitHubie wykonaj Ctr+T, a jak to nie pomoże to wybierz z menu Git->Fetch.

Uruchomienie

Uwaga! Na zajęciach musisz korzystać z systemu LINUX!

Flink działa na wszystkich środowiskach podobnych do UNIX, tj. Linux, Mac OS X i Cygwin (dla Windows). Aby uruchomić Flinka musisz mieć zainstalowaną Javę 11.

Krok 1: Pobierz najnowszą wersję binarną Flink, my skorzystamy z wersji 1.15

, następnie wypakuj do lokalnego folderu (na wydziale polecamy wypakować na dysku POLIGON).

Struktura katalogów Flinka:

bin/ zawiera plik binarny flink, a także kilka skryptów bash, które zarządzają różnymi zadaniami i zadaniami
conf/ zawiera pliki konfiguracyjne, w tym flink-conf.yaml
example/ zawiera przykładowe aplikacje napisane z wykorzystaniem Flink

Aby uruchomić lokalny klaster, uruchom skrypt bash dołączony do Flink:

./bin/start-cluster.sh

Powinieneś być w stanie przejść do interfejsu internetowego pod adresem localhost:8081, aby wyświetlić pulpit nawigacyjny Flink i sprawdzić, czy klaster jest uruchomiony i działa.

Aby szybko zatrzymać klaster i wszystkie działające komponenty, możesz użyć dostarczonego skryptu:

./bin/stop-cluster.sh

Flink dostarcza narzędzie CLI, bin/flink, które może uruchamiać programy spakowane jako archiwa Java (JAR) i kontrolować ich wykonywanie. Przesłanie zadania oznacza przesłanie pliku JAR zadania i powiązanych zależności do działającego klastra Flink i wykonanie go.

Aby wdrożyć przykładowe zadanie liczenia słów w działającym klastrze, wydaj następujące polecenie:

./bin/flink run examples/streaming/WordCount.jar

dla zadań napisanych w języku Python możesz wykonać:

bin/flink run -py examples/python/table/word_count.py

by przekierować wynik do katalogu wykonaj

bin/flink run -py examples/python/table/word_count.py --output wynik

Możesz dodatkowo zweryfikować dane wyjściowe, przeglądając logi:

tail log/flink-*-taskexecutor-*.out

Python intrefaces

PyFlink umożliwia użytkownikom tworzenie zadań Flink w Pythonie. Flink udostępnia trzy typy interfejsów API: API SQL, API tabel oraz API DataStream. Pierwsze dwa to stosunkowo podstawowe interfejsy API. Interfejs API table PyFlink umożliwia pisanie zapytań relacyjnych w sposób podobny do używania SQL lub pracy z danymi tabelarycznymi w Pythonie. Table API jest bardziej ogólnym interfejsem API niż SQL. W przypadku zadań opracowanych w oparciu o API tabel, logika jest wykonywana po serii optymalizacji po stronie serwera.

Interfejs API PyFlink DataStream zapewnia kontrolę na niższym poziomie nad podstawowymi blokami konstrukcyjnymi Flink, stanem i czasem, w celu tworzenia bardziej złożonych przypadków użycia przetwarzania strumieniowego.

SQL interface

Pozwala wykorzystywać standardowy SQL do wykonywania zapytań na danych.

Python Table API

Python Table API umożliwia użytkownikom procesowanie danych poprzez wykorzystanie metod funkcyjnych jak select(), group_by(), where() które są automatycznie tłumaczone na niskopoziomowe metody.

Oba interfejsy Table API oraz SQL korzystają ze wspólnego modelu danych, którym są tabele. Wyrażenia na tablicach są "lazy evaluated" (czyli operacje w strumieniu wykonywane są dopiero w momencie, gdy dane są nam potrzebne na wyjściu).

Wejściowym obiektem na którym będziemy pracować będzie tzw. "TableEnvironment". W ramach tego środowiska będziemy rejestrować tabele i źródła danych, wykonywać zapytania oraz uruchamiać dedykowane funkcje.

EnvironmentSettings oraz TableEnvironment z modułu pyflink.table będą wykorzystywane do tworzenia TableEnvironment dla zadań batchowych.

TableEnvironment

Żeby utworzyć zadanie we Flink musimy utworzyć "środowisko" tabel. Do wyboru mamy dwie opcje przetwarzanie batchowe i strumieniowe. Na tych zajęciach skupimy się na przetwarzaniu batchowym (wsadowym).

    env_settings = EnvironmentSettings.in_batch_mode()
table = TableEnvironment.create(env_settings)

Zadanie 1

Zobacz przykłady tworzenia środowiska tabel w pliku TableEnv.py.

Model danych

Tabele służą za źródła danych jak i jako obiekty pośrednie przy transformacji danych. Tabele możemy tworzyć zapisując obiekty na sztywno w kodzie jako kolekcje w Pythonie lub odczytując je ze źródeł takich jak pliki, tabele czy też zewnętrzne systemy (jak Kafka).

Tworzenie tabel na postawie zapisanych na sztywno danych :

Pyflink wspiera użytkowników w tworzeniu tabel źródłowych z podanej listy. Poniższy przykład definiuje tabelę zawierającą trzy wiersze danych: [(Hello, 1), ((world, 2), ((Flink, 3)]), która ma dwie kolumny z nazwami kolumn a i B oraz typami varchar i bigint.

tab = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a', 'b'])

Ta metoda jest zwykle używana w fazie testowej, gdyż może szybko utworzyć tabelę źródła danych w celu weryfikacji logiki zadania. Pierwszy parametr służy do określenia listy danych. Każdy element na liście musi być typu krotki; Drugi parametr określa schemat tabeli

Tworzenie tabel za pomocą SQL API.

DDL jest najbardziej zalecanym sposobem definiowania tabel źródeł i ujść danych, a wszystkie konektory obsługiwane w Java Table API & SQL mogą być używane w zadaniach pyflink Table API poprzez DDL. Wśród nich są: Kafka, Upsert Kafka, Firehose, Kinesis, JDBC, Elasticsearch, FileSystem, HBase, DataGen, Print, BlackHole, Hive. Podobnie jak w przypadku tworzenia tabeli źródła danych, można również utworzyć tabelę wynikową za pomocą języka DDL. Wynik przetwarzania możemy zapisywać w plikach, albo przesyłać do zewnętrznych systemów, czy kolejek.

Zadanie 2

Zobacz przykłady tworzenia tabeli z kolekcji w pliku DataSourceExample.py.

Zadanie 3

Zobacz przykłady tworzenia środowiska tabel z pliku CSV w pliku FileSourceCSV.py.

Zadanie 4

Zobacz przykłady tworzenia środowiska tabel źródła danych i ujścia danych z pliku CSV w pliku FileSink.py. Zobacz co stanie się, gdy nie wykonamy wait() po operacji insert.

Wait if necessary for at most the given time (milliseconds) for the data to be ready. For a select operation, this method will wait until the first row can be accessed locally. For an insert operation, this method will wait for the job to finish

Operacje

Operacje na tabelach wykonaywane są funkcyjnie, tabele są niemutowalne, więc wszystkie operacje tworzą, jako wynik działania, nowe kopie tabel, które są następnie dalej przetwarzane w kolejnych krokach.

Projekcje

select - działa jak select w SQL, pozwala nam wybierać kolumny z tabel, dodawać, usuwać kolumny, czy też nadawać im aliasy, albo tworzyć nowe na podstawie istniejących danych.

Dodatkowo możemy na wyniku wykonywać znane operacje z SQL takie jak limit(), czy distinct().

Lista funckji SQL z których możemy korzystać jest dostępna tutaj (w postaci API SQL i Table API) :

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/systemfunctions/

Zadanie 5

Zobacz przykłady selectów w pliku Select.py.

Filtry

where - umożliwia filtrowanie wierzy za pomocą logicznego warunku, podobną rolę spełnia element DataStream api o nazwie filter

Zadanie 6

Zobacz przykłady selectów w pliku Filtering.py.

Join

join - służy do łączenia dwóch tabel w jedną tabelę. Uwaga! Flink nie radzi sobie z konfliktami nazw i w związku z tym nie możemy mieć tej samej nazwy kolumny w obu tabelach!

Zadanie 7

Zobacz przykłady selectów w pliku Join.py.

Aggregacje

group_by - służy do agregacji wierszy i tworzenia podsumowań takich jak średnie, zliczenie (count) czy sumowanie

Zadanie 8

Zobacz przykłady selectów w pliku Aggregations.py.

User defined functions (UDF)

Python Table API to relacyjne API, które pod względem funkcji jest podobne do SQL. Niestandardowe funkcje w SQL są bardzo ważne, ponieważ mogą rozszerzyć zakres, w jakim można użyć SQL. Podobnie funkcje UDF są przeznaczone do opracowywania funkcji niestandardowych w celu rozszerzenia scenariuszy, w których używany jest interfejs API tabel w języku Python. Oprócz zadań Python Table API, Python UDF mogą być również używane w zadaniach Java Table API i SQL.

Istnieje wiele sposobów definiowania Python UDF w PyFlink. Użytkownicy mogą zdefiniować klasę Pythona, która dziedziczy funkcję ScalarFunction lub zdefiniować zwykłą funkcję Pythona lub funkcję Lambda, aby zaimplementować logikę UDF.

cols

Zadanie 9

Zobacz przykład w pliku UserDefined.py.

Plan wykonania zadania

Zadanie 10

Prześlij na serwer zadanie Aggreations_server.py zobacz w GUI jak dzielone jest zadanie i jak wygląda jego wykonanie.

Uwaga! Flink nie obsługuje nagłówków w CSV, zarówno odczytu nagłówka jako osobnej wartości i pomijaniu go przy tworzeniu tabeli jak i możliwości zapisu w ujściu jako pierwszy wiersz. Dlatego też wszystkie dane wejściowe powinny mieć usunięty nagłówek z danych csv, inaczej podczas uruchomienia powstaną błędy.

Przykłady

W pliku word_count.py znajduje się pełne zadanie Flink opracowane w oparciu o Python Table API. Logika zadania polega na odczytaniu pliku, obliczeniu liczby słów i zapisaniu wyników obliczeń do pliku. Wydaje się proste, ale zawiera wszystkie podstawowe procesy tworzenia zadania Python Table API.

Pierwszym krokiem jest zdefiniowanie trybu wykonywania zadania, np. w trybie wsadowym lub strumieniowym, współbieżności i konfiguracji zadania.

Następnym krokiem jest zdefiniowanie tabeli źródłowej i tabeli ujścia. Tabela źródłowa określa, skąd pochodzi źródło danych zadania i jaki jest format danych. Tabela ujścia definiuje, gdzie zapisać wynik wykonania zadania i jaki jest format danych.

Ostatnim krokiem jest zdefiniowanie logiki wykonania zadania, jak obliczanie zapisanej liczby w tym przykładzie.

Dodatkowo w pliku basic_operations.py można znaleźć przykłady podstawowych operacji na prostym modelu danych.

Zadanie 11

Napisz program, który wypisze liczbę państw w podziale na kontynenty

Zadanie 11

Napisz program, który wypisze statystykę liczby państw zaczynających się na dane litery w podziale na kontynenty. Czyli np. A 1 EU A 2 AF .. B 1 EU itp.

*

Wykorzystano materiały z: