PRS04.rst

Laboratorium 4 - Multiprocessing. Anomalie związane z przetwarzaniem równoległym, zagłodzenie, race condition, zakleszczenie

multiprocessing

multiprocessing to pakiet obsługujący tworzenie procesów przy użyciu interfejsu API podobnie do modułu thread.

Pakiet multiprocessing oferuje zarówno lokalną, jak i zdalną współbieżność, skutecznie omijając globalną blokadę interpretera, używając podprocesów zamiast wątków. Dzięki temu moduł wieloprocesorowy pozwala programiście na pełne wykorzystanie wielu procesorów na danej maszynie. Działa zarówno na systemie Unix, jak i Windows.

W przetwarzaniu multiprocessing procesy są tworzone przez utworzenie obiektu Process, a następnie wywołanie jego metody start().

from time import sleep
from multiprocessing import Process

def task():
    sleep(1)
    print('This is from another process')

if __name__ == '__main__':
    process = Process(target=task)
    process.start()
    print('Waiting for the process...')
    process.join()
Waiting for the process...
This is from another process
from multiprocessing import Process

def f(name):
    print('Cześć', name)

if __name__ == '__main__':
    p = Process(target=f, args=('Marcin',))
    p.start()
    p.join()
Cześć Marcin

Podobnie możemy tworzyć proces jako podklase klasy Process:

from time import sleep
from multiprocessing import Process

class CustomProcess(Process):
    def run(self):
        sleep(1)
        print('This is coming from another process')

if __name__ == '__main__':
    process = CustomProcess()
    process.start()
    print('Waiting for the process to finish')
    process.join()
Waiting for the process to finish
This is coming from another process

Jak widać w poniższym przykładzie wykonywany kod uruchamia nowy process w systemie.

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('Cześć', name)

if __name__ == '__main__':
    info('main funkcja')
    p = Process(target=f, args=('Marcin',))
    p.start()
    p.join()
main funkcja
module name: __main__
parent process: 58469
process id: 59795
function f
module name: __main__
parent process: 59795
process id: 60127
Cześć Marcin

Wykorzystanie Lock w multiprocessing

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9

Dane między procesami być przechowywane na mapie pamięci współdzielone przy użyciu Value lub Array.

Zmienne współdzielone oznaczają, że zmiany dokonane w jednym procesie są zawsze propagowane i udostępniane innym procesom. Wystąpienie multiprocessing.Value można zdefiniować w konstruktorze klasy jako zmienną. Konstruktor klasy multiprocessing.Value wymaga od nas określenia typu danych oraz wartości początkowej. Definicje konstruktorów obiektów:

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

multiprocessing.Value(typecode_or_type, *args, lock=True)

Typy z moduly ctypes: ‘i’ - int, ‘d’ - double, c_char - char, c_bool - bool, c_wchar_p - String.

Jeśli blokada ma wartość True (wartość domyślna), tworzony jest nowy obiekt blokady w celu synchronizacji dostępu do wartości. Jeśli lock jest obiektem Lock lub RLock, zostanie użyty do synchronizacji dostępu do wartości. Jeśli blokada ma wartość False, dostęp do zwróconego obiektu nie będzie automatycznie chroniony blokadą, więc niekoniecznie będzie „bezpieczny dla procesu”.

Na przykład:

from multiprocessing import Process, Value, Array
from ctypes import Structure, c_bool

def f(n, a, m):
    n.value = 3.1415927
    m.value = True
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    bools = Value(c_bool, False)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr, bools))
    p.start()
    p.join()

    print(num.value)
    print(bools.value)
    print(arr[:])
3.1415927
True
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Tworzenie nowych procesów czy wątków? Jak zobaczymy niżej jako że Jupyter działa na Linuxie uruchamiane są nowe wątki poprzez fork. Uruchom program u siebie na systemie windows.

import multiprocessing

methods = multiprocessing.get_all_start_methods()
print(methods)
method = multiprocessing.get_start_method()
print(method)
['fork', 'spawn', 'forkserver']
fork

Aby sprawdzić czy warto wykorzystywać ten moduł sprawdź liczbę rdzeni na komputerze z któreo korzystasz (odpal lokalnie!)

import multiprocessing

print("Number of cpu : ", multiprocessing.cpu_count())
Number of cpu :  6

Race condition

Przykład:

from threading import Thread

def adder(amount, repeats):
    global value
    for _ in range(repeats):
        value += amount*2

def subtractor(amount, repeats):
    global value
    for _ in range(repeats):
        value -= amount*2

value = 0
adder_thread = Thread(target=adder, args=(10, 500000))
adder_thread.start()
subtractor_thread = Thread(target=subtractor, args=(10, 500000))
subtractor_thread.start()
print('Waiting for threads to finish...')
adder_thread.join()
subtractor_thread.join()

print(f'Value: {value}')
Waiting for threads to finish...
Value: 4731260

Zakleszczenie

Przykład watki czekają na siebie (złe użycie join()):

from threading import current_thread
from threading import Thread

def task(other):
    print(f'[{current_thread().name}] waiting on [{other.name}]...')
    other.join()

main_thread = current_thread()
new_thread = Thread(target=task, args=(main_thread,))
new_thread.start()
task(new_thread)

Przykład złe blokowanie locków:

from time import sleep
from threading import Thread
from threading import Lock

def task(number, lock1, lock2):
    print(f'Thread {number} acquiring lock 1...')
    with lock1:
        sleep(1)
        print(f'Thread {number} acquiring lock 2...')
        with lock2:
            pass

lock1 = Lock()
lock2 = Lock()
thread1 = Thread(target=task, args=(1, lock1, lock2))
thread2 = Thread(target=task, args=(2, lock2, lock1))
thread1.start()
thread2.start()
thread1.join()
thread2.join()

Zagłodzenie

from time import sleep
from threading import Thread
from threading import Lock

# task
def task(lock, identifier):
    with lock:
        for i in range(5):
            print(f'Thread {identifier} working')
            sleep(1)

lock = Lock()
threads = [Thread(target=task, args=(lock,i)) for i in range(2)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
Thread 0 working
Thread 0 working
Thread 0 working
Thread 0 working
Thread 0 working
Thread 1 working
Thread 1 working
Thread 1 working
Thread 1 working
Thread 1 working

Fix nr 1

from time import sleep
from threading import Thread
from threading import Lock

def task(lock, identifier):
    for i in range(5):
        lock.acquire()
        print(f'Thread {identifier} working')
        sleep(1)
        lock.release()

lock = Lock()
threads = [Thread(target=task, args=(lock,i)) for i in range(2)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
Thread 0 working
Thread 0 working
Thread 0 working
Thread 0 working
Thread 0 working
Thread 1 working
Thread 1 working
Thread 1 working
Thread 1 working
Thread 1 working

Nadal nie jest sprawiedliwie, fix nr 2:

from time import sleep
from threading import Thread
from threading import Semaphore

def task(lock, identifier):
    for i in range(5):
        with lock:
            print(f'Thread {identifier} working')
            sleep(1)

lock = Semaphore(1)
threads = [Thread(target=task, args=(lock,i)) for i in range(2)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
Thread 0 working
Thread 0 working
Thread 0 working
Thread 0 working
Thread 0 working
Thread 1 working
Thread 1 working
Thread 1 working
Thread 1 working
Thread 1 working

Żeby wymusić przełączenie najlepiej użyć sleep()

from time import sleep
from threading import Thread
from threading import Semaphore

def task(lock, identifier):
    for i in range(5):
        sleep(0.001)
        with lock:
            print(f'Thread {identifier} working')
            sleep(1)

lock = Semaphore(1)
threads = [Thread(target=task, args=(lock,i)) for i in range(2)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
Thread 0 working
Thread 1 working
Thread 0 working
Thread 1 working
Thread 0 working
Thread 1 working
Thread 0 working
Thread 1 working
Thread 0 working
Thread 1 working

Livelock

from time import sleep
from threading import Thread
from threading import Lock

def task(number, lock1, lock2):
    for i in range(10):
        with lock1:
            sleep(0.1)
            if lock2.locked():
                print(f'Task {number} cannot get the second lock, giving up...')
            else:
                with lock2:
                    print(f'Task {number} made it, all done.')
                    break

lock1 = Lock()
lock2 = Lock()
thread1 = Thread(target=task, args=(0, lock1, lock2))
thread2 = Thread(target=task, args=(1, lock2, lock1))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
Task 0 cannot get the second lock, giving up...
Task 1 cannot get the second lock, giving up...
Task 0 cannot get the second lock, giving up...
Task 1 cannot get the second lock, giving up...
Task 0 cannot get the second lock, giving up...
Task 1 cannot get the second lock, giving up...
Task 0 cannot get the second lock, giving up...
Task 1 cannot get the second lock, giving up...
Task 0 cannot get the second lock, giving up...
Task 1 cannot get the second lock, giving up...
Task 0 cannot get the second lock, giving up...
Task 1 cannot get the second lock, giving up...
Task 0 cannot get the second lock, giving up...
Task 1 cannot get the second lock, giving up...
Task 0 cannot get the second lock, giving up...
Task 1 cannot get the second lock, giving up...
Task 0 cannot get the second lock, giving up...
Task 1 cannot get the second lock, giving up...
Task 0 cannot get the second lock, giving up...
Task 1 made it, all done.

Zadanie

Przeanalizuj poniższy kod sklepu sprzedającego trzy elementy następnie wykonaj zadania na moodle (debbuger z Pycharm może ci pomóc w analizie).

from enum import Enum
import random
from threading import Thread, Lock, current_thread

class Items(Enum):
    GARNEK = 0
    SZKLANKA = 1
    TALERZ = 2

class Shop:
    listLock = []
    warehouse = {}

    def __init__(self):
        self.listLock.append(Lock())
        self.listLock.append(Lock())
        self.listLock.append(Lock())
        self.warehouse[Items.GARNEK.name] = 10000
        self.warehouse[Items.TALERZ.name] = 200
        self.warehouse[Items.SZKLANKA.name] = 800

    def purchase(self, item, wynik):
        #print("Kupuje %s %s" % (item, current_thread().name))
        stockSize = self.warehouse[item.name]
        if (stockSize > 0):
            #print("Zakup zaakceptowany - Na stanie %s %s %s" % (item.name, stockSize, current_thread().name))
            self.warehouse[item.name] = self.warehouse[item.name] - 1
        else:
            #print("Zakup odrzucony - nie ma dostępnych więcej elementów %s %s" % (item.name, current_thread().name))
            wynik[item.name] = wynik[item.name] + 1

def shopper(shop, shoppingList, wynik):
    for el in shoppingList:
        shop.purchase(el, wynik)

if __name__ == '__main__':
    shop = Shop()
    shoppingList_1 = []
    shoppingList_2 = []
    shoppingList_3 = []
    shoppingList_4 = []
    ile = {Items.GARNEK.name:  10000, Items.TALERZ.name:  200, Items.SZKLANKA.name: 800}
    wynik = {Items.GARNEK.name:  0, Items.TALERZ.name:  0, Items.SZKLANKA.name: 0}
    for _ in range(0, 50000):
        r = random.randrange(0, 3)
        item = list(Items)[r]
        shoppingList_1.append(item)
        ile[item.name] = ile[item.name] - 1

    for _ in range(0, 50000):
        r = random.randrange(0, 3)
        item = list(Items)[r]
        shoppingList_2.append(item)
        ile[item.name] = ile[item.name] - 1

    for _ in range(0, 50000):
        r = random.randrange(0, 3)
        item = list(Items)[r]
        shoppingList_3.append(item)
        ile[item.name] = ile[item.name] - 1

    for _ in range(0, 5000):
        r = random.randrange(0, 3)
        item = list(Items)[r]
        shoppingList_4.append(item)
        ile[item.name] = ile[item.name] - 1

    thread1 = Thread(target=shopper, args=(shop,shoppingList_1, wynik,))
    thread2 = Thread(target=shopper, args=(shop,shoppingList_2, wynik,))
    thread3 = Thread(target=shopper, args=(shop,shoppingList_3, wynik,))
    thread4 = Thread(target=shopper, args=(shop,shoppingList_4, wynik,))

    thread1.start()
    thread2.start()
    thread3.start()
    thread4.start()

    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()

    for i in Items:
        if ile[i.name] > 0:
            print("Ok")
        else:
            if ile[i.name] == -wynik[i.name]:
                print("Ok")
            else:
                print("Źle")
Źle
Źle
Źle