PRS03.rst

Przetwarzanie równoległe i strumieniowe

Uwaga! Kod z którego będziemy korzystać na zajęciach jest dostępny na branchu ThreadClassesStart w repozytorium https://github.com/WitMar/PRS2020 . Kod końcowy można znaleźć na branchu ThreadClassesEnd.

Jeżeli nie widzisz odpowiednich gałęzi na GitHubie wykonaj Ctr+T, a jak to nie pomoże to wybierz z menu Git->Fetch.

Producent konsument

Klasycznym wzorcem projektowym programowania współbieżnego jest wzorzec producent-konsument, w którym procesy są wyznaczane jako producenci lub konsumenci. Producenci są odpowiedzialni za dodanie danych do jakiejś udostępnionej globalnie struktury danych, a konsumenci są odpowiedzialni za odczytywanie danych z tej struktury. Tylko jedna strona, albo pojedynczy producent, albo pojedynczy konsument, może mieć dostęp do struktury w danym momencie.

col

Thread class methods

Yield

Metoda yield() jest statyczną metodą klasy Thread i może zatrzymać aktualnie wykonywany wątek i dać szansę na wykonanie innym oczekującym wątkom o tym samym priorytecie. W przypadku braku oczekujących wątków lub jeśli wszystkie oczekujące wątki mają niski priorytet, ten sam wątek będzie kontynuował swoje wykonanie. Zaletą metody yield() jest uzyskanie szansy na wykonanie innych oczekujących wątków, więc jeśli nasz bieżący wątek zajmie więcej czasu na wykonanie i przydzielenie procesora do innych wątków.

Metoda ta jest głównie wykorzystywana do debugowania oraz do testów.

    for (int i = 0; i < 5; ++i) {
     Thread.yield(); // By calling this method, MyThread stop its execution and giving a chance to a main thread
     log.info("Thread started:" + Thread.currentThread().getName());
}

Zobacz przykład w klasie Yield.java.

Join

Metoda ta powoduje, że aktywny wątek czeka na zakończenie wykonywania innego wątku. Możemy zdefiniować dodatkowo timeout ograniczający czas oczekiwania na zakończenie innego wątku.

Zauważmy, że główny wątek Javy nie jest zatrzymywany przez JVM i nie czeka na nasze potomne wątki. W związku z tym jeżeli nie poczekamy na ich zakończenie, to przetwarzanie może zakończyć się szybciej niż byśmy tego oczekiwali. Warto więc na końcu programu głównego czekać na zakończenie wszystkich wątków.

thread.start();

thread.join();

thread2.start();
thread3.start();

Zobacz przykład w klasie Join.java.

Synchronized

Programy wielowątkowe często mogą dojść do sytuacji, w której wiele wątków próbuje uzyskać dostęp do tych samych zasobów i ostatecznie daje błędne i nieprzewidziane wyniki.

Należy więc upewnić się za pomocą jakiejś metody synchronizacji, że tylko jeden wątek może uzyskać dostęp do zasobu w danym momencie. Java umożliwia tworzenie wątków i synchronizowanie ich zadań za pomocą synchronizowanych bloków. Słowo kluczowe synchronized służy do zakładania blokady tak aby tylko jeden wątek mógł wykonać operacje.

// Only one thread can execute at a time.
// sync_object is a reference to an object
// whose lock associates with the monitor.
// The code is said to be synchronized on
// the monitor object
synchronized(sync_object)
{
        // Access shared variables and other
        // shared resources
}

Synchronized możemy używać na różnych poziomach:

  • Metody instancji
public synchronized void increment() {
        number++;
}
  • Metody statycznej
public static synchronized void increment() {
        number++;
}
  • Bloku kodu
public synchronizowana void increment() {
        synchronized (this) {
                number++;
        }
}

Kiedy używamy zsynchronizowanego bloku, Java wewnętrznie używa monitora, znanego również jako blokada lub blokada wewnętrzna, aby zapewnić synchronizację. Te monitory są powiązane z obiektem; dlatego wszystkie zsynchronizowane bloki (metody) tego samego obiektu mogą mieć tylko jeden wątek wykonujący je w tym samym czasie. Jest to największy minus tego podejścia, gdyż mogą istnieć niezależne metody w klasie, które mogą być wykonane razem a my nie możemy przypisać różnych blokad dla pozbioru metod. Stąd zmiana ich na sychronizowane może niepotrzebnie spowolnić przetwarzanie.

Blokada stojąca za zsynchronizowanymi metodami i blokami jest możliwa do wielokrotneo przejęcia. Oznacza to, że bieżący wątek może wielokrotnie uruchamiać synchronizowaną blokadę, przechwytując blokadę.

Zadanie 1

Uruchom klasę Counter.java w pakiecie synchronized i zamień metody increment i decrement na synchronizowane. Zobacz jak zmieni się zachowanie programu. Dodaj trzecią metodę logującą numer na ekran, też czyniąc ją synchronizowaną. Czy może ona być wykonana niezależnie z inkrementacją i dekrementacją? Zamień synchronizowane metody na synchronizowane bloki kodu.

Wait

Służy do komunikacji między wątkami. Mówi wątkowi wywołującemu, aby zrezygnował z blokady i poszedł "spać", dopóki jakiś inny wątek nie wejdzie do tego samego monitora i nie wywoła funkcji notify() lub notifyAll(). Metoda wait() zwalnia blokadę przed oczekiwaniem i ponownie uzyskuje blokadę przed powrotem z metody wait(). Wait może służyć do zatrzymania wątku w celu oczekiwania na inny wątek aż on wykona swoje zadanie.

Uwaga! Jeżeli żaden wątek nie wykona notify nasz wątek utknie w stanie oczekiwania i nigdy się nie zakończy. Musimy więc świadomie korzystać z tej metody w naszym kodzie.

Zazwyczaj wait() jest używany, gdy czekasz na zmianę jakiegoś warunku, który jest pod kontrolą procesów spoza bieżącej metody (zazwyczaj ten warunek zostanie zmieniony przez inny wątek). Nie chcesz bezczynnie czekać podczas testowania warunku wewnątrz wątku. Wait() pozwala uśpić wątek podczas oczekiwania na zmianę zewnętrznych warunków i tylko wtedy, gdy wystąpi notify() lub notifyAll(), wątek budzi się i sprawdza zmiany. W ten sposób wait() zapewnia sposób synchronizowania działań między wątkami.

Jako przykład rozważ restaurację, która ma jednego szefa kuchni i jednego kelnera. Kelner musi poczekać, aż szef kuchni przygotuje posiłek. Gdy szef kuchni ma gotowy posiłek, szef kuchni powiadamia kelnera, który otrzymuje posiłek i wraca do czekania. To doskonały przykład współpracy wątków: szef kuchni reprezentuje producenta, a kelner reprezentuje konsumenta.

Uwaga! Gdy konsument się budzi, nie może zakładać, że stan, na który czekał, jest nadal aktualny. Stan mógł zostać zmieniony po wywołaniu metody notify() i przed przebudzeniem wątku konsumenta - np. przez to, że inny konkurencyjny wątek przejął zasób. Oczekujące wątki muszą sprawdzać stan po wybudzenu. Dlatego zwykle wywołujemy metodę wait() w pętli sprawdzającej warunek wznowienia działania (w przypadku przykładu producer - consumer jest to np. istnienie zasobu do pobrania).

W przypadku, gdy synchronizujemy się na obiekcie, a nie na klasie, musimy na obiekcie wywołać metodę wait.

Object lock = new Object();

synchronized(lock) {
        lock.wait();
}

Korzystając z metody wait(long timeout), możemy określić limit czasu, po którym wątek zostanie automatycznie wybudzony. Wątek może zostać wybudzony przed osiągnięciem limitu czasu za pomocą notify() lub notifyAll().

Zauważ, że wywołanie wait(0) jest tym samym, co wywołanie wait().

Notify

W przypadku wszystkich wątków oczekujących na monitorze tego obiektu (przy użyciu jednej z metod wait()) metoda notify() powiadamia dowolny z nich o przebudzeniu. Wybór dokładnie, który wątek ma się obudzić, jest niedeterministyczny i zależy od implementacji. Ponieważ notify() wybudza pojedynczy, losowy wątek, możemy go użyć do zaimplementowania wzajemnie wykluczającego się blokowania w przypadku, gdy wątki wykonują podobne zadania. Należy zauważyć, że wywołanie notify() w rzeczywistości nie zwalnia blokady zasobu. Mówi oczekującemu wątkowi, że ten wątek może się obudzić. Jednak blokada nie jest faktycznie zwalniana, dopóki zsynchronizowany blok zgłaszającego nie zostanie zakończony. Służy więc do informowania wątków w stanie wait o tym by się obudziły i kontynuowały przetwarzanie.

Jeżeli metoda notify() jest wywoływana, gdy żaden inny wątek nie czeka, notify() nic nie robi i powiadomienie zostaje utracone.

W przypadku, gdy synchronizujemy się na obiekcie a nie na klasie musimy na obiekcie wywołać metodę notify.

Object lock = new Object();

synchronized(lock) {
        lock.notify();
}

NotifyAll() prostu budzi wszystkie wątki, które czekają na monitorze tego obiektu.

Zobacz przykłady w klasach WaitNotify.java oraz WaitNotifyOnObject.java.

Uwaga! W przypadku gdy mamy X wątków czekających na synchronizacji i jeden wątek w środku czekający na wait, to przy wykonaniu notify nie możemy zakładać że wątek z wait będzie wykonany (wznowiony) szybciej niż te X czekających na wejście do synchronized !!

Zadanie 2

Dodaj trzeci wątek będący konsumerem, zobacz jak zachowuje się program, czy się kończy? Zamień wait na wait z parametrem liczbowym zobacz różnice.

Sleep

Sleep wstrzymuje działanie wątku na określony czas. Nie zwalnia on blokad, ani nie może być przerwany.

Różnice między wait a sleep :

wait wywołujesz na obiekcie, podczas gdy z drugiej strony wywołujesz sleep na samym wątku
wait (i notify) musi nastąpić w zsynchronizowanym bloku na obiekcie monitora, a sleep nie
operacja sleep nie zwalnia blokad, które trzyma, podczas gdy z drugiej strony wait zwalnia blokadę na obiekcie, dla którego wywoływana jest funkcja wait()

Locks

Zamiast używać niejawnego blokowania za pomocą synchronized, Java oferuje obiekty blokad określone przez interfejs Lock. Blokady obsługują różne metody dokładniejszej kontroli dostępu, dzięki czemu są bardziej elastyczne niż blokady na klasach lub obiektach.

W standardowym JDK dostępnych jest wiele implementacji blokad, które zostaną zademonstrowane w poniższych sekcjach.

ReentrantLock

Klasa ReentrantLock jest blokadą wzajemnego wykluczania o tym samym podstawowym zachowaniu, co synchronized, ale z rozszerzonymi możliwościami.

ReentrantLock lock = new ReentrantLock();
int count = 0;

void increment() {
        lock.lock();
        try {
                count++;
        } finally {
                lock.unlock();
        }
}

Blokada jest uzyskiwana przez lock() i zwalniana przez unlock(). Ważne jest, aby otoczyć kod w blok try/finally, aby zapewnić odblokowanie w przypadku wyjątków. Ta metoda jest bezpieczna wątkowo, podobnie jak zsynchronizowany odpowiednik. Jeśli inny wątek już nabył blokadę, kolejne wywołania funkcji lock() wstrzymują bieżący wątek, aż blokada zostanie odblokowana. Tylko jeden wątek może trzymać blokadę w danym momencie.

Zadanie 3

Dodaj Locki w klasie CounterLocks.java synchronizujący metody. Przepisz klase WaitNotifyLocks.java tak, by działała na lockach.

ReadWriteLock

Interfejs ReadWriteLock określa inny typ blokady utrzymujący parę blokad dostępu do odczytu i zapisu. Ideą blokad odczytu i zapisu jest to, że zwykle jest bezpiecznie czytać zmienne mutowalne, o ile nikt nie zapisuje do tej zmiennej. Tak więc blokada odczytu może być utrzymywana jednocześnie przez wiele wątków, o ile żadne wątki nie trzymają blokady zapisu. Może to poprawić wydajność i przepustowość w przypadku, gdy odczyty są częstsze niż zapisy.

ReadWriteLock lock = new ReentrantReadWriteLock();

void increment() {
        lock.writeLock().lock();
        try {
                count++;
        } finally {
                lock.writelock().unlock();
        }
}

Zadanie 4

Zamień Locki w klasie CounterLocks.java na ReadWriteLock metody.

Klasa Condition umożliwia wątkowi oczekiwanie na wystąpienie pewnego warunku podczas wykonywania sekcji krytycznej. Aby móc użyć metod wait i notify na obiekcie Lock potrzebujemy obiektu typu Condition. Analogiczne metody będą nazywać się await() i signal().

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
...
conditin.await();
condition.signal();

Semaphores

W przypadku gdy chcemy stworzyć blokadę (lock) którą może przejąć więcej niż jeden wątek możemy użyć semafora. Semafor to zmienna całkowita, współdzielona przez wiele procesów. Głównym celem korzystania z semafora jest synchronizacja procesów i kontrola dostępu do wspólnego zasobu w środowisku współbieżnym.

Podczas gdy blokady zwykle zapewniają wyłączny dostęp do zmiennych lub zasobów, semafor jest w stanie przepuścić przez blokadę N wątków i blokować każdy kolejny który będzie chciał przez niego przejść. Jest to przydatne w różnych scenariuszach, w których musisz ograniczyć ilość równoczesnego dostępu do niektórych części aplikacji.

public LoginQueueUsingSemaphore(int slotLimit) {
     semaphore = new Semaphore(slotLimit);
 }

 boolean tryLogin() {
     return semaphore.tryAcquire();
 }

 void logout() {
     semaphore.release();
 }

 int availableSlots() {
     return semaphore.availablePermits();
 }

Początkowa wartość semafora zależy od problemu. Zwykle używamy liczby dostępnych zasobów jako wartości początkowej. W kolejnych sekcjach podamy więcej przykładów inicjowania semaforów w różnych przypadkach użycia. Lock jest przykładem semafora binarnego o wartościach 0 lub 1.

Zobacz klase CounterSemaphores.java.

Zadanie 6

Zamień semafor w klasie CounterSemaphores tak by przepuszczał więcej niż jeden wątek. Czy w takim wypadku przetarzanie dobrze działa?

Atomic variables

W javie nie możemy zadeklarować zmiennej jako obiektu synchronizowanego. Istnieje jednak sposób by osiągnąć taki sam efekt jak w przypadku synchronizacji. Mianowicie definicja zmiennej jako atomowej.

Operacją atomową nazwiemy operację która wykonywana jest w jednym kroku. Innymi słowy albo wykona się zawsze w całości albo nie wykona się w ogóle. Ten sam koncept przyświeca transakcjom w bazach danych, gdzie wykonujemy albo wszystkie operacje w obrębie transakcji albo żadnej z nich.

Standardowo w Javie operacje na typach prostych jak integer nie są atomowe, to znaczy, przypisanie

int a = a + 1;

Łączy się z kilkoma operacjami, pobranie wartości zmiennej a, inkrementacja wartości, zapis z powrotem do zmiennej a. Wszystkie z tych operacji są niezależne i wątek je wykonujący może zostać zatrzymany pomiędzy nimi.

W celu poradzenia sobie z tymi problemamy utworzono specjalne typy danych tzw. zmienne atomowe, które oferują programiście dostęp do metod zmiany wasrtości zmiennych które realizowane są w sposób atomowy.

    AtomicInteger atomicInt = new AtomicInteger(0);

atomicInt.updateAndGet(n -> n + 2);

Zadanie 6

Zamień semafor w klasie CounterSynchronized int na AtomicInteger, sprawdź czy wszystko działa.

Zadanie typu projektowego

Zadanie 7

Zaimplementu w klasie WaitNotifyMultiprocess.java funkcje produce i consume tak aby trzy wątki mogły na raz produkowac dane do kolejki a dwa wątki odczytywać z kolejki tak, aby przetwarzanie było możliwie jak najbardziej wydajne.

Zwróć uwagę na to, że consumer musi czekać gdy kolejka jest pusta, gdy się wzbudzi nie ma pewności czy drugi wątek nie pobrał zadania z kolejki i czy jest coś do pobrania. Jak zrezalizowałbyś synchronizację względem zakończenia działania wątków konsumerów bazując na tym, że wiemy, że możemy zakończyć przetwarzanie jeżeli po 2 sekundach nie przyjdzie nowa wiadomości do kolejki?

*

Wykorzystano materiały z: