PRS08.rst

Przetwarzanie równoległe i strumieniowe

Uwaga! Kod z którego będziemy korzystać na zajęciach jest dostępny na branchu VirtualThreadsAndGatherers w repozytorium https://github.com/WitMar/PRS2025 .

Jeżeli nie widzisz odpowiednich gałęzi na GitHubie wykonaj Ctr+T, a jak to nie pomoże to wybierz z menu Git->Fetch.

Przetwarzanie danych w Java, wirtualne wątki, okna, korutyny w Javie

Wprowadzenie

Nowoczesne programowanie współbieżne w języku Java korzysta z nowych możliwości takich jak:

  • Korutyny (przy pomocy CompletableFuture, Structured Concurrency)

  • Wirtualne wątki (Virtual Threads) — Java 21

  • Okna czasowe — Tumbling, Sliding, Session, Global

Korutyny w Javie

Korutyny są lekkimi procesami, które można zawiesić i wznowić bez blokowania zasobów systemowych.

W Javie korutyny pozwalają wykonywać wiele rzeczy na raz bez tworzenia ciężkich wątków systemowych. Chociaż w samej czystej Javie korutyn nie ma, to podobny pomysł wprowadza Project Loom, który pozwala tworzyć lekkie "wirtualne wątki". Dzięki temu programy mogą obsługiwać tysiące zadań na raz, bez zużywania dużej ilości pamięci i mocy procesora.

Korutyny działają asynchronicznie — to znaczy, że nie muszą czekać, aż jedno zadanie się skończy, zanim zaczną następne. Na przykład, gdy jedna operacja potrzebuje odpowiedzi z sieci, korutyna może ją "zamrozić" i w tym czasie robić coś innego. To pozwala pisać szybsze i bardziej wydajne aplikacje.

W Javie lub na platformie JVM korutyny pomagają w pracy z operacjami wejścia/wyjścia, siecią i serwerami, gdzie często trzeba długo czekać. Dzięki korutynom kod jest czytelniejszy, a programy są szybsze, bo nie marnują czasu na bezczynne czekanie.

Zasymulowanie podobnego działania w Javie realizujemy za pomocą:

  • CompletableFuture

  • StructuredTaskScope (Structured Concurrency)

  • Wirtualnych wątków (Project Loom)

Typowe zastosowania korutyn:

  • Asynchroniczne API

  • Przetwarzanie grafów i streamów

  • Serwery HTTP

  • Kolejki wiadomości

CompletableFuture w Javie

CompletableFuture to klasa w Javie, która pozwala uruchamiać zadania asynchronicznie, czyli w tle, bez blokowania głównego wątku programu. Jest częścią pakietu java.util.concurrent i pojawiła się w Javie 8.

CompletableFuture tworzy zadanie, które może się wykonać w przyszłości, kiedy będzie gotowe. - Zadanie może zwrócić wynik albo zakończyć się błędem. - Program może kontynuować inne działania w tym czasie, nie czekając na zakończenie zadania.

CompletableFuture domyślnie uruchamia swoje zadanie w osobnym wątku z puli wątków (tak zwany ForkJoinPool albo własny Executor jeśli podasz), nie jest więc korutyną!.

Główne cechy :

  • Można uruchomić zadanie metodami takimi jak supplyAsync() albo runAsync().

  • Po wykonaniu zadania można je połączyć z innymi zadaniami przy pomocy metod takich jak thenApply(), thenAccept() lub thenRun().

  • W przypadku błędów można ustalić reakcję za pomocą metod takich jak exceptionally() lub handle().

  • Można też łączyć kilka CompletableFuture razem metodami takimi jak thenCombine(), allOf() albo anyOf().

CompletableFuture pozwala na budowanie bardzo wydajnych aplikacji, które mogą robić kilka rzeczy jednocześnie, np. pobierać dane z sieci, zapisywać pliki lub wykonywać obliczenia, bez blokowania pracy programu.

Przykład podstawowy: CompletableFuture

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("Task running in: " + Thread.currentThread());
        });

        future.join();
    }
}

Przykład sekwencji z CompletableFuture

CompletableFuture.supplyAsync(() -> "Step 1")
    .thenApply(result -> {
        System.out.println(result);
        return "Step 2";
    })
    .thenAccept(System.out::println)
    .join();

Zadanie

Uruchom kod w klasie ShopSimulation symulujący działanie sklepu za pomocą CompletableFuture.

Structured Concurrency w Javie

Structured Concurrency to nowy sposób zarządzania współbieżnością w Javie, w którym wszystkie zadania potomne są powiązane ze swoim zadaniem nadrzędnym. Pojawił się jako część projektu Project Loom. Celem jest lepsze kontrolowanie życia wszystkich wątków i procesów asynchronicznych.

  • Gdy główny wątek (rodzic) uruchamia inne wątki (dzieci), tworzy strukturę przypominającą drzewo.

  • Wątki potomne są automatycznie zamykane lub przerywane, kiedy wątek rodzic kończy pracę.

  • Dzięki temu unika się "porzuconych" albo "uciekających" wątków, które mogą żyć za długo i powodować błędy.

Dlaczego jest to przydatne ?

  • Zapewnia jasne granice dla współbieżnych zadań.

  • Ułatwia obsługę błędów, ponieważ wyjątki z wątków potomnych mogą być zebrane w jednym miejscu.

  • Pomaga w automatycznym sprzątaniu zasobów (np. zamykanie połączeń, przerywanie operacji).

  • Pozwala pisać kod współbieżny, który jest łatwiejszy do czytania i testowania.

Bez Structured Concurrency łatwo zgubić kontrolę nad tym, ile wątków działa, kiedy się kończą i co się stanie, jeśli któryś zawiedzie. Structured Concurrency wprowadza porządek — wszystkie wątki potomne muszą zakończyć się razem z rodzicem, co czyni programy bezpieczniejszymi i bardziej przewidywalnymi.

Przykład

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<String> user = scope.fork(() -> findUser());
    Future<Integer> order = scope.fork(() -> fetchOrder());

    scope.join();           // Czekaj na wszystkie zadania
    scope.throwIfFailed();  // Rzuć wyjątek, jeśli coś poszło źle

    System.out.println("User: " + user.resultNow());
    System.out.println("Order: " + order.resultNow());
}

Structured Concurrency

Structured concurrency (Java 21) grupuje zadania w bloki.

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    scope.fork(() -> {
        System.out.println("Task 1 running");
        return null;
    });

    scope.fork(() -> {
        System.out.println("Task 2 running");
        return null;
    });

    scope.join();
}

Zadanie

Uruchom kod w klasie StructuredTournamentMax symulujący wyszukiwanie maximum w tablicy korzystając z wielu wątków.

Wirtualne wątki w Javie

Wirtualne wątki (ang. Virtual Threads) to nowy rodzaj lekkich wątków w Javie, wprowadzony w ramach Project Loom. Wirtualne wątki są zarządzane przez maszynę wirtualną (JVM) i nie odpowiadają bezpośrednio jednemu wątkowi systemowemu.

  • Wirtualne wątki są bardzo tanie do utworzenia — można ich mieć tysiące, a nawet miliony w jednej aplikacji.

  • Gdy wątek czeka (np. na sieć lub plik), JVM zawiesza jego wykonanie i zwalnia zasoby systemowe.

  • Gdy dane będą gotowe, JVM wznawia wykonanie wirtualnego wątku.

Główne cechy: - Każdy wirtualny wątek działa jak zwykły wątek, ale jest dużo lżejszy. - Programista używa ich tak samo jak zwykłych wątków (Thread), ale JVM zarządza nimi bardziej inteligentnie. - Wirtualne wątki są automatycznie przypisywane do puli małej liczby wątków systemowych. - Wirtualne wątki współpracują świetnie z istniejącymi bibliotekami Java.

Wirtualne wątki pozwalają budować bardzo skalowalne aplikacje, np. serwery obsługujące miliony połączeń jednocześnie. Dzięki nim pisanie współbieżnego kodu jest prostsze, bo nie trzeba tworzyć skomplikowanych systemów obsługi asynchroniczności.

Przykład

Runnable task = () -> {
    System.out.println("Hello from virtual thread: " + Thread.currentThread());
};

Thread.startVirtualThread(task);

Tworzenie pojedynczego wirtualnego wątku

Thread.startVirtualThread(() -> {
    System.out.println("Hello from virtual thread!");
});

Tworzenie wielu wirtualnych wątków

for (int i = 0; i < 10000; i++) {
    Thread.startVirtualThread(() -> {
        try {
            Thread.sleep(1000);
            System.out.println("Done: " + Thread.currentThread());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}

Wirtualne wątki z ExecutorService

try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    for (int i = 0; i < 100; i++) {
        executor.submit(() -> {
            System.out.println("Executing task in: " + Thread.currentThread());
        });
    }
}

Zadanie

Uruchom kod w klasie VirtualThreadApiGateway symulujący zachowanie serwera HTTP (dodatkowo w katalogu test jest przykłąd symulujący zapytania do serwera).

Porównanie Korutyn i Wirtualnych Wątków

Korutyny

Wirtualne Wątki

Wspólne cechy

Fine-grained async control

Tradycyjny model kodu

Ultra lekkie zarządzanie taskami

Integracja ze strumieniami

Możliwość blokowania (sleep, wait)

Skalowalność do milionów tasków

Trudniejsza składnia

Znany API Thread

Przystępność dla Java developerów

Stream Gatherers w Javie 24

Wprowadzenie

W Javie 24 wprowadzono nową funkcjonalność o nazwie Stream Gatherers (JEP 485), która rozszerza możliwości API strumieni, umożliwiając tworzenie niestandardowych operacji pośrednich. Dzięki Gatherers programiści mogą definiować własne operacje transformujące dane w strumieniach, co wcześniej było trudne lub niemożliwe do osiągnięcia za pomocą standardowych metod takich jak map(), filter() czy flatMap().

Podstawowe informacje

  • Gatherer to interfejs reprezentujący niestandardową operację pośrednią w strumieniu.

  • Gatherers to klasa dostarczająca gotowe implementacje Gathererów, takie jak windowFixed(), windowSliding(), fold(), scan() i mapConcurrent().

Przykładowe użycie

import java.util.stream.Gatherers;
import java.util.List;
import java.util.stream.Stream;

public class GathererExample {
    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);

        // Przykład użycia windowFixed
        List<List<Integer>> fixedWindows = numbers.stream()
            .gather(Gatherers.windowFixed(3))
            .toList();

        System.out.println("Fixed windows: " + fixedWindows);
    }
}

Wynik działania powyższego kodu:

Fixed windows: [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

Dostępne metody w klasie Gatherers

  • windowFixed(int windowSize): Dzieli strumień na niepokrywające się okna o stałym rozmiarze.

  • windowSliding(int windowSize): Tworzy przesuwające się okna o określonym rozmiarze, gdzie każde kolejne okno przesuwa się o jeden element.

  • fold(Supplier<R> initial, BiFunction<R, T, R> folder): Wykonuje operację redukcji, podobną do reduce(), ale jako operację pośrednią.

  • scan(Supplier<R> initial, BiFunction<R, T, R> scanner): Wykonuje akumulację prefiksową, emitując pośrednie wyniki po każdej iteracji.

  • mapConcurrent(int maxConcurrency, Function<T, R> mapper): Przetwarza elementy strumienia równolegle z wykorzystaniem wirtualnych wątków, zachowując kolejność elementów.

Zalety korzystania z Gatherers

  • Umożliwiają tworzenie bardziej złożonych i niestandardowych operacji pośrednich w strumieniach.

  • Poprawiają czytelność i modularność kodu.

  • Wspierają przetwarzanie równoległe i asynchroniczne.

  • Ułatwiają operacje na strumieniach nieskończonych.

Podsumowanie

Stream Gatherers w Javie 24 to potężne narzędzie rozszerzające możliwości API strumieni, pozwalające na bardziej elastyczne i wydajne przetwarzanie danych. Dzięki nim programiści mogą tworzyć własne operacje pośrednie, dostosowane do specyficznych potrzeb aplikacji.

Zadanie

Uruchom przyklady pokazujące wykorzystanie Gatherers.

Dodatkowe źródła