Uwaga! Kod z którego będziemy korzystać na zajęciach jest dostępny na branchu VirtualThreadsAndGatherers w repozytorium https://github.com/WitMar/PRS2025 .
Jeżeli nie widzisz odpowiednich gałęzi na GitHubie wykonaj Ctr+T, a jak to nie pomoże to wybierz z menu Git->Fetch.
Nowoczesne programowanie współbieżne w języku Java korzysta z nowych możliwości takich jak:
Korutyny (przy pomocy CompletableFuture, Structured Concurrency)
Wirtualne wątki (Virtual Threads) — Java 21
Okna czasowe — Tumbling, Sliding, Session, Global
Korutyny są lekkimi procesami, które można zawiesić i wznowić bez blokowania zasobów systemowych.
W Javie korutyny pozwalają wykonywać wiele rzeczy na raz bez tworzenia ciężkich wątków systemowych. Chociaż w samej czystej Javie korutyn nie ma, to podobny pomysł wprowadza Project Loom, który pozwala tworzyć lekkie "wirtualne wątki". Dzięki temu programy mogą obsługiwać tysiące zadań na raz, bez zużywania dużej ilości pamięci i mocy procesora.
Korutyny działają asynchronicznie — to znaczy, że nie muszą czekać, aż jedno zadanie się skończy, zanim zaczną następne. Na przykład, gdy jedna operacja potrzebuje odpowiedzi z sieci, korutyna może ją "zamrozić" i w tym czasie robić coś innego. To pozwala pisać szybsze i bardziej wydajne aplikacje.
W Javie lub na platformie JVM korutyny pomagają w pracy z operacjami wejścia/wyjścia, siecią i serwerami, gdzie często trzeba długo czekać. Dzięki korutynom kod jest czytelniejszy, a programy są szybsze, bo nie marnują czasu na bezczynne czekanie.
Zasymulowanie podobnego działania w Javie realizujemy za pomocą:
CompletableFuture
StructuredTaskScope (Structured Concurrency)
Wirtualnych wątków (Project Loom)
Typowe zastosowania korutyn:
Asynchroniczne API
Przetwarzanie grafów i streamów
Serwery HTTP
Kolejki wiadomości
CompletableFuture to klasa w Javie, która pozwala uruchamiać zadania asynchronicznie, czyli w tle, bez blokowania głównego wątku programu. Jest częścią pakietu java.util.concurrent i pojawiła się w Javie 8.
CompletableFuture tworzy zadanie, które może się wykonać w przyszłości, kiedy będzie gotowe. - Zadanie może zwrócić wynik albo zakończyć się błędem. - Program może kontynuować inne działania w tym czasie, nie czekając na zakończenie zadania.
CompletableFuture domyślnie uruchamia swoje zadanie w osobnym wątku z puli wątków (tak zwany ForkJoinPool albo własny Executor jeśli podasz), nie jest więc korutyną!.
Główne cechy :
Można uruchomić zadanie metodami takimi jak supplyAsync() albo runAsync().
Po wykonaniu zadania można je połączyć z innymi zadaniami przy pomocy metod takich jak thenApply(), thenAccept() lub thenRun().
W przypadku błędów można ustalić reakcję za pomocą metod takich jak exceptionally() lub handle().
Można też łączyć kilka CompletableFuture razem metodami takimi jak thenCombine(), allOf() albo anyOf().
CompletableFuture pozwala na budowanie bardzo wydajnych aplikacji, które mogą robić kilka rzeczy jednocześnie, np. pobierać dane z sieci, zapisywać pliki lub wykonywać obliczenia, bez blokowania pracy programu.
Przykład podstawowy: CompletableFuture
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Task running in: " + Thread.currentThread());
});
future.join();
}
}
Przykład sekwencji z CompletableFuture
CompletableFuture.supplyAsync(() -> "Step 1")
.thenApply(result -> {
System.out.println(result);
return "Step 2";
})
.thenAccept(System.out::println)
.join();
Zadanie
Uruchom kod w klasie ShopSimulation symulujący działanie sklepu za pomocą CompletableFuture.
Structured Concurrency to nowy sposób zarządzania współbieżnością w Javie, w którym wszystkie zadania potomne są powiązane ze swoim zadaniem nadrzędnym. Pojawił się jako część projektu Project Loom. Celem jest lepsze kontrolowanie życia wszystkich wątków i procesów asynchronicznych.
Gdy główny wątek (rodzic) uruchamia inne wątki (dzieci), tworzy strukturę przypominającą drzewo.
Wątki potomne są automatycznie zamykane lub przerywane, kiedy wątek rodzic kończy pracę.
Dzięki temu unika się "porzuconych" albo "uciekających" wątków, które mogą żyć za długo i powodować błędy.
Dlaczego jest to przydatne ?
Zapewnia jasne granice dla współbieżnych zadań.
Ułatwia obsługę błędów, ponieważ wyjątki z wątków potomnych mogą być zebrane w jednym miejscu.
Pomaga w automatycznym sprzątaniu zasobów (np. zamykanie połączeń, przerywanie operacji).
Pozwala pisać kod współbieżny, który jest łatwiejszy do czytania i testowania.
Bez Structured Concurrency łatwo zgubić kontrolę nad tym, ile wątków działa, kiedy się kończą i co się stanie, jeśli któryś zawiedzie. Structured Concurrency wprowadza porządek — wszystkie wątki potomne muszą zakończyć się razem z rodzicem, co czyni programy bezpieczniejszymi i bardziej przewidywalnymi.
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> user = scope.fork(() -> findUser());
Future<Integer> order = scope.fork(() -> fetchOrder());
scope.join(); // Czekaj na wszystkie zadania
scope.throwIfFailed(); // Rzuć wyjątek, jeśli coś poszło źle
System.out.println("User: " + user.resultNow());
System.out.println("Order: " + order.resultNow());
}
Structured concurrency (Java 21) grupuje zadania w bloki.
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
scope.fork(() -> {
System.out.println("Task 1 running");
return null;
});
scope.fork(() -> {
System.out.println("Task 2 running");
return null;
});
scope.join();
}
Zadanie
Uruchom kod w klasie StructuredTournamentMax symulujący wyszukiwanie maximum w tablicy korzystając z wielu wątków.
Wirtualne wątki (ang. Virtual Threads) to nowy rodzaj lekkich wątków w Javie, wprowadzony w ramach Project Loom. Wirtualne wątki są zarządzane przez maszynę wirtualną (JVM) i nie odpowiadają bezpośrednio jednemu wątkowi systemowemu.
Wirtualne wątki są bardzo tanie do utworzenia — można ich mieć tysiące, a nawet miliony w jednej aplikacji.
Gdy wątek czeka (np. na sieć lub plik), JVM zawiesza jego wykonanie i zwalnia zasoby systemowe.
Gdy dane będą gotowe, JVM wznawia wykonanie wirtualnego wątku.
Główne cechy: - Każdy wirtualny wątek działa jak zwykły wątek, ale jest dużo lżejszy. - Programista używa ich tak samo jak zwykłych wątków (Thread), ale JVM zarządza nimi bardziej inteligentnie. - Wirtualne wątki są automatycznie przypisywane do puli małej liczby wątków systemowych. - Wirtualne wątki współpracują świetnie z istniejącymi bibliotekami Java.
Wirtualne wątki pozwalają budować bardzo skalowalne aplikacje, np. serwery obsługujące miliony połączeń jednocześnie. Dzięki nim pisanie współbieżnego kodu jest prostsze, bo nie trzeba tworzyć skomplikowanych systemów obsługi asynchroniczności.
Runnable task = () -> {
System.out.println("Hello from virtual thread: " + Thread.currentThread());
};
Thread.startVirtualThread(task);
Thread.startVirtualThread(() -> {
System.out.println("Hello from virtual thread!");
});
for (int i = 0; i < 10000; i++) {
Thread.startVirtualThread(() -> {
try {
Thread.sleep(1000);
System.out.println("Done: " + Thread.currentThread());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 100; i++) {
executor.submit(() -> {
System.out.println("Executing task in: " + Thread.currentThread());
});
}
}
Zadanie
Uruchom kod w klasie VirtualThreadApiGateway symulujący zachowanie serwera HTTP (dodatkowo w katalogu test jest przykłąd symulujący zapytania do serwera).
Korutyny |
Wirtualne Wątki |
Wspólne cechy |
---|---|---|
Fine-grained async control |
Tradycyjny model kodu |
Ultra lekkie zarządzanie taskami |
Integracja ze strumieniami |
Możliwość blokowania (sleep, wait) |
Skalowalność do milionów tasków |
Trudniejsza składnia |
Znany API Thread |
Przystępność dla Java developerów |
W Javie 24 wprowadzono nową funkcjonalność o nazwie Stream Gatherers (JEP 485), która rozszerza możliwości API strumieni, umożliwiając tworzenie niestandardowych operacji pośrednich. Dzięki Gatherers programiści mogą definiować własne operacje transformujące dane w strumieniach, co wcześniej było trudne lub niemożliwe do osiągnięcia za pomocą standardowych metod takich jak map(), filter() czy flatMap().
Gatherer to interfejs reprezentujący niestandardową operację pośrednią w strumieniu.
Gatherers to klasa dostarczająca gotowe implementacje Gathererów, takie jak windowFixed(), windowSliding(), fold(), scan() i mapConcurrent().
import java.util.stream.Gatherers;
import java.util.List;
import java.util.stream.Stream;
public class GathererExample {
public static void main(String[] args) {
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
// Przykład użycia windowFixed
List<List<Integer>> fixedWindows = numbers.stream()
.gather(Gatherers.windowFixed(3))
.toList();
System.out.println("Fixed windows: " + fixedWindows);
}
}
Wynik działania powyższego kodu:
Fixed windows: [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
windowFixed(int windowSize): Dzieli strumień na niepokrywające się okna o stałym rozmiarze.
windowSliding(int windowSize): Tworzy przesuwające się okna o określonym rozmiarze, gdzie każde kolejne okno przesuwa się o jeden element.
fold(Supplier<R> initial, BiFunction<R, T, R> folder): Wykonuje operację redukcji, podobną do reduce(), ale jako operację pośrednią.
scan(Supplier<R> initial, BiFunction<R, T, R> scanner): Wykonuje akumulację prefiksową, emitując pośrednie wyniki po każdej iteracji.
mapConcurrent(int maxConcurrency, Function<T, R> mapper): Przetwarza elementy strumienia równolegle z wykorzystaniem wirtualnych wątków, zachowując kolejność elementów.
Umożliwiają tworzenie bardziej złożonych i niestandardowych operacji pośrednich w strumieniach.
Poprawiają czytelność i modularność kodu.
Wspierają przetwarzanie równoległe i asynchroniczne.
Ułatwiają operacje na strumieniach nieskończonych.
Stream Gatherers w Javie 24 to potężne narzędzie rozszerzające możliwości API strumieni, pozwalające na bardziej elastyczne i wydajne przetwarzanie danych. Dzięki nim programiści mogą tworzyć własne operacje pośrednie, dostosowane do specyficznych potrzeb aplikacji.
Zadanie
Uruchom przyklady pokazujące wykorzystanie Gatherers.
Oficjalna dokumentacja Oracle: https://docs.oracle.com/en/java/javase/24/docs/api/java.base/java/util/stream/Gatherers.html
JEP 485: https://openjdk.org/jeps/485
Artykuł na blogu: https://www.danvega.dev/blog/stream-gatherers