Laboratoria8.rst

Laboratorium 8 - Korutyny i async w Python

Yield

We wprowadzeniu pojęć korutyny pomocne nam będzie przypomnienie sobie działania komendy yield.

Komenda yield definiuje strumień danych ewaluowanych leniwie w momencie pobierania kolejnej wartości. W praktyce zwykłą pętlę:

x = [1, 2, 3, 4, 5]
for i in x:
    print(i)
1
2
3
4
5

Możemy zapisać jako

x = [1, 2, 3, 4, 5]
y = iter(x)
try:
    while True:
        print(next(y))
except StopIteration as e:
    pass
1
2
3
4
5

Generatory w Pythonie to proste, ale potężne narzędzie do tworzenia iteratorów. Zapisuje się je jak zwykłe funkcje, ale do zwracania danych używa się instrukcji Yield. Mechanizm ten pozwala generatorom generować serię wartości w czasie, zamiast obliczać i przechowywać je wszystkie na raz.

Generatory są leniwe, co oznacza, że generują wartości na bieżąco i tylko wtedy, gdy jest to wymagane. Ta leniwa ocena prowadzi do optymalizacji wydajności, szczególnie w scenariuszach, w których nie wszystkie wygenerowane wartości są potrzebne. Pozwala na wydajne przetwarzanie dużych zbiorów danych bez konieczności ładowania całego zbioru danych do pamięci.

Generatory można również wykorzystać do tworzenia wydajnych potoków danych. Na przykład w potoku przetwarzania danych, który obejmuje odczytywanie danych, przekształcanie ich, a następnie zapisywanie, każdy krok może pełnić funkcję generatora przekazującego dane z jednego kroku do następnego bez konieczności ładowania wszystkich danych do pamięci.

Korutyny

Korutyny to w funkcje, których wykonywanie można wstrzymać/zawiesić w określonym momencie, a następnie wznowić wykonywanie od tego samego punktu później, kiedy tylko chcemy. Inaczej mówiąc korutyny to funkcje, które podczas wykonywania mogą wstrzymać się oczekując na przykład na wprowadzenie danych wejściowych. Są one używane do wielozadaniowości opartej na współpracy, gdy proces dobrowolnie oddaje kontrolę okresowo lub w stanie bezczynności, aby umożliwić jednoczesne uruchamianie wielu zadań. Kotutyna może zawiesić swoje wykonanie i przenieść kontrolę do innej korutyny, a także może wznowić wykonywanie od miejsca, w którym została przerwana. Zasadniczo korutyna jest funkcją podzieloną na wiele części które możemy wykonywać etapowo.

Korutyny zostały po raz pierwszy wprowadzone w latach pięćdziesiątych XX wieku jako sposób na uproszczenie tworzenia programów współbieżnych. Zostały one później spopularyzowane w języku programowania Python poprzez użycie instrukcji „yield”, która pozwala funkcji na tymczasowe zawieszenie wykonywania i przywrócenie kontroli wywołującemu, zachowując jej stan.

Korutyny współpracują, co oznacza, że łączą się ze sobą, tworząc potok. Jedna korutyna może czytać dane wejściowe i wysyłać je do drugiej, która je przetwarza. Na koniec może istnieć korutyna wyświetlający wynik.

Jak opisano wyżej potrzebujemy mechanizmu — a dokładniej słowa kluczowego — za pomocą którego możemy wstawić punkt kontrolny i powiedzieć programowi, że chcemy w tym miejscu wstrzymać wykonywanie funkcji i przywrócić kontrolę do punktu, z którego została wywołana. Egzekucję wznowimy, kiedy tylko będziemy tego chcieli.

W Pythonie możemy wstrzymać wykonywanie funkcji za pomocą słowa kluczowego yield.

Przykład:

def func():
    print('Function Starts')

    yield

    print('Function Ends')


try:
    y = func()
    print(type(y))
    next(y)  # First part of the function executed
    print('stop!')
    next(y) # Second part of the function executed
    next(y) # and now?

except StopIteration as e:
    print('error')
    pass
<class 'generator'>
Function Starts
stop!
Function Ends
error

Funkcja fun() ma typ generatora i będzie zachowywał się podobnie do iteratora, ale w przypadku iteratora przechodzimy przez iterację. Za pomocą generatora wykonujemy części korutyny.

Podobnie jak dla iteratora wyjątek StopIteration jest zgłaszany i przechwytywany w przypadku przekroczenia zakresu korutyny (generatora).

Załóżmy, że chcemy wznowić działanie korutyny poprzez wysłanie wartości (która może być stałą lub zmienną) w określonym punkcie kontrolnym (tj. w pewnym stanie funkcji). Gdy chcemy wysłać wartość, zamiast funkcji next użyjemy funkcji send. Korutyna wznawia działanie tylko w odpowiedzi na metody next() lub send().

def func():
    print('Function part 1')

    x = yield
    print(x)
    print('Function part 2')

    a = yield
    print(a)
    print('Function part 3')


try:

    y = func()

    next(y)         # Function part 1 executed, to reach the first yield we used next

    y.send(6)               # Function part 2 executed and value sent 6

    y.send(12)              # Function part 2 executed and value sent 12 and StopIteration raised

except StopIteration as e:
    pass
Function part 1
6
Function part 2
12
Function part 3

send możemy użyć tylko wtedy, gdy wykonanie znajduje się w punkcie kontrolnym yield, oraz yield znajduje się po prawej stronie wyrażenia.

Korutyny mają wiele zastosowań, takich jak implementacja paradygmatu producent-konsument lub nadawca-odbiorca w programowaniu sieciowym.

Korutyny są także elementami składowymi wielu frameworków, takich jak asyncio, twisted, aiohttp. Można je również łączyć ze sobą w celu tworzenia pipelinów.

Pojedyncza instancja korutyny nie daje nam przewagi w przetwarzaniu, zysk z przetwarzania możemy uzyskać dopiero, gdy skorzytamy z wielu korutyn jednocześnie!

Korutyny Vs Wątki

W przypadku wątków to system operacyjny (lub środowisko), przełącza sterowanie między wątkami. Natomiast w przypadku korutyn to programista i język programowania decydują, kiedy przełączać sterowanie. Korutyny współpracują, wielozadaniowo, zawieszając i wznawiając się w ustalonych przez programistę punktach.

Wątki umożliwiają programowi podzielenie się na dwa lub więcej jednocześnie (lub pseudojednocześnie) uruchomionych zadań. Są one implementowane przez system operacyjny i mają własny stos i przestrzeń pamięci. Mogą działać niezależnie od siebie i mogą wchodzić w interakcję ze współdzielonymi zasobami poprzez mechanizmy synchronizacji, takie jak blokady i semafory.

Z drugiej strony korutyny są konstrukcją na poziomie języka służącą współbieżności. Są podobne do podprogramów, ale w przeciwieństwie do podprogramów, korutyna może zapisać swój stan, a następnie wznowić od miejsca, w którym została przerwana.

Korutyna pozwala jednej „oddać” kontrolę innej korutynie i wznowić ją później od miejsca, w którym została przerwana, w przeciwieństwie do wątków, które zwykle są wykonywane w sposób ciągły aż do zakończenia lub zablokowania przez jakieś zdarzenie zewnętrzne.

Podsumowując, wątki są funkcją na poziomie systemu operacyjnego, a współprogramy są funkcją na poziomie języka proramowania służące do kontrolowanego współbieżnego wykonywania operacji. Korutyny to konstrukcja programistyczna, którą można traktować jako lekką wersję wątków.

Korutyna jako funkcja przetwarzająca dane (filtrująca napisy posiadajace określony prefix):

# Python3 program for demonstrating
# coroutine execution

def print_name(prefix):
    print("Searching prefix: {}".format(prefix))
    while True:
        name = (yield)
        if prefix in name:
            print(name)

# calling coroutine, nothing will happen
corou = print_name("Dear")

# This will start execution of coroutine and
# Prints first line "Searching prefix..."
# and advance execution to the first yield expression
corou.__next__()

# sending inputs
corou.send("")
corou.send("Student")
corou.send("Dear Student")
corou.send("Dear St")
Searching prefix: Dear
Dear Student
Dear St

Korutyna może działać w nieskończoność, do zamknięcia korutyny używana jest metoda Close(). Gdy korutyna jest zamknięta, generuje wyjątek GeneratorExit. Jeżeli po zamknięciu współprogramu spróbujemy wysłać wartości, zgłosi to wyjątek StopIteration. Poniżej znajduje się prosty przykład:

# Python3 program for demonstrating
# closing a coroutine
def print_name(prefix):
    print("Searching prefix:{}".format(prefix))
    try :
        while True:
                name = (yield)
                if prefix in name:
                    print(name)
    except GeneratorExit:
            print("Closing coroutine!!")

corou = print_name("Dear")
corou.__next__()
corou.send("Dear Student")
corou.close()
corou.send("Dear Student")
Searching prefix:Dear
Dear Student
Closing coroutine!!
---------------------------------------------------------------------------

StopIteration                             Traceback (most recent call last)

Cell In [117], line 17
     15 corou.send("Dear Student")
     16 corou.close()
---> 17 corou.send("Dear Student")


StopIteration:

Korutyna jako przykład przetwarzania producent-konsument:

# Python3 program for demonstrating
# coroutine chaining

def producer(sentence, next_coroutine):
    '''
    Dzielimy zdanie i wysyłamy dalej
    '''
    tokens = sentence.split(" ")
    for token in tokens:
        next_coroutine.send(token)
    next_coroutine.close()

def pattern_filter(next_coroutine=None):
    '''
    Filtrujemy szukając słow na a
    '''
    print("Fitrowanie")
    try:
        while True:
            token = (yield)
            if token.startswith('a') or token.startswith('A'):
                next_coroutine.send(token)
    except GeneratorExit:
        print("Done with filtering!!")
        next_coroutine.close()

def consumer():
    '''
    Wypisz filtrowane słowa
    '''
    print("I'm sink, i'll print tokens")
    try:
        while True:
            token = (yield)
            print(token)
    except GeneratorExit:
        print("Done with printing!")

pt = consumer()
pt.__next__()
pf = pattern_filter(next_coroutine = pt)
pf.__next__()

sentence = "Ala ma kota Afika, ale to fajne."
producer(sentence, pf)
I'm sink, i'll print tokens
Fitrowanie
Ala
Afika,
ale
Done with filtering!!
Done with printing!

Async

Najpopularniejszym sposobem pisania programów jest programowanie synchronicznie, gdzie po prostu kolejne linijki kodu są wykonywane w konkretnej kolejności, zgodnie z ustalonym porządkiem instrukcji.

W alternatywie do tego podejścia stoi programowanie asynchroniczne, która powoduje, że wspomniana wcześniej kolejność nie zostaje zachowana.

W programowaniu asynchornicznym zadania czekają w kolejce do wykonania, wybieraniem z kolejki zajmuje się pętla zwana pętlą zdarzeń czyli eventloop.

Zadania które czekają w pętli zdarzeń nazywamy korutynami (ang. coroutines) lub koprocedurami. Termin ten ma mniej używane polskie tłumaczenie „współprogram”.

Asyncio i korutyny, które pojawiły się od wersji Pythona 3.4.

Asyncio umożliwia wykonywanie kodu asynchronicznie. Jest to sytuacja, w której wiele tasków wykonuje się “jednocześnie”, bez konieczności czekania, aż zakończą się pozostałe. Stanowi to niezwykle istotne i znaczące rozwiązanie w dobie współczesnych programów, gdzie przykładowo potrzebujemy obsłużyć wiele tysięcy zapytań i przy tym nie zawieszać całej aplikacji.

Asyncio wprowadza koncept event loop. Jest to “główna pętla”, która umożliwia:

Korutyny i async

Korutyny zapewniają nam łatwy sposób programowania synchronicznego i asynchronicznego, ponieważ umożliwiają zawieszenie i wznowienie wykonywania później w pewnym momencie w przyszłości, co najlepiej nadaje się do wykonywania operacji nieblokujących w przypadku wielowątkowości.

Korutyny mogą być implementowane za pomocą async def statement. Preferowanym sposobem pisania aplikacji asyncio są współprogramy zadeklarowane ze składnią async/await. Mówimy, że obiekt jest obiektem awaitable, jeśli można go użyć w wyrażeniu await. Istnieją trzy główne typy obiektów odpowiadających na await: korutyny, zadania (tasks) i future.

Na przykład następujący fragment kodu (wymaga Pythona 3.7 lub nowszego) wypisuje „hello”, czeka 1 sekundę, a następnie wypisuje „world”:

import asyncio

async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')

await main()
hello
world
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

await main()
started at 14:22:44
hello
world
finished at 14:22:47

Korutyna bez await nie zostanie wykonana

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

await main()
42
/tmp/ipykernel_52663/1163459027.py:10: RuntimeWarning: coroutine 'nested' was never awaited
  nested()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Funkcja asyncio.create_task() pozwala nam jednocześnie uruchamianiać korutyny jako zadania asyncio.

Zmodyfikujmy powyższy przykład i uruchommy jednocześnie dwie współprogramy say_after:

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

await main()
started at 14:44:48
hello
world
finished at 14:44:50

Jak widzimy wykonanie zadań jest w tym wypadku o wiele szybsze.

To nieblokujące zachowanie jest niezbędne w programowaniu asynchronicznym i jest zarządzane przez pętlę zdarzeń, taką jak ta udostępniana przez moduł asyncio. Pętla zdarzeń uruchamia zadania asynchroniczne i wywołania zwrotne, zarządza komunikacją między nimi i obsługuje zdarzenia we/wy.

import asyncio

async def task(name, delay):
    print(f"Task {name} starting with delay {delay}")
    await asyncio.sleep(delay)
    print(f"Task {name} completed after {delay} seconds")

async def main():
    # Schedule multiple tasks concurrently
    await asyncio.gather(
        task("A", 2),  # Task A will take 2 seconds to complete
        task("B", 3),  # Task B will take 3 seconds to complete
        task("C", 1)   # Task C will take 1 second to complete
    )
    print("All tasks completed")

# Run the main coroutine
await main()
Task A starting with delay 2
Task B starting with delay 3
Task C starting with delay 1
Task C completed after 1 seconds
Task A completed after 2 seconds
Task B completed after 3 seconds
All tasks completed

W głównej korutynie asyncio.gather służy do jednoczesnego uruchamiania wielu instancji zadania. To pokazuje wielozadaniowość kooperacyjną, w której Zadanie C zakończy się przed Zadaniem A i Zadaniem B, mimo że zostanie rozpoczęte po nich.

Zastosowanie

Programowanie asynchroniczne za pomocą korutyn jest szczególnie skuteczne w przypadku działań związanych z we/wy i charakteryzujących się dużymi opóźnieniami. W przeciwieństwie do tradycyjnego wątków, programowanie asynchroniczne za pomocą korutyn pozwala na jednoczesne zarządzanie wieloma zadaniami w jednym wątku, zmniejszając obciążenie i złożoność. Jest to szczególnie przydatne w scenariuszach takich jak obsługa wielu połączeń sieciowych lub wykonywanie operacji wymagających oczekiwania na zasoby zewnętrzne.

Żeby uruchomić poniższy przykład potrzebujesz modułu aiohttp spoza Jupytera, uruchom więc go lokalnie na komputerze

import asyncio
import aiohttp

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "http://google.com",
        "http://yahoo.com",
        "http://facebook.com",
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch(url, session) for url in urls]
        responses = await asyncio.gather(*tasks)

        for response in responses:
            print(response[:100])  # Print first 100 characters of each response

def run_asyncio_coroutine():
    try:
        # Get the current event loop, but don't close it afterwards
        loop = asyncio.get_event_loop()
    except RuntimeError as e:
        # If no current event loop, create a new one
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    # Run the coroutine
    loop.run_until_complete(main())

# Execute the function
await main()
/usr/lib/python3.9/genericpath.py:77: RuntimeWarning: coroutine 'main' was never awaited
  m = tuple(map(os.fspath, m))
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
---------------------------------------------------------------------------

NameError                                 Traceback (most recent call last)

Cell In [43], line 34
     31     loop.run_until_complete(main())
     33 # Execute the function
---> 34 await main()


Cell In [43], line 14, in main()
      7 async def main():
      8     urls = [
      9         "http://google.com",
     10         "http://yahoo.com",
     11         "http://facebook.com",
     12     ]
---> 14     async with aiohttp.ClientSession() as session:
     15         tasks = [fetch(url, session) for url in urls]
     16         responses = await asyncio.gather(*tasks)


NameError: name 'aiohttp' is not defined

Korutyny i dekoratory

Podczas pracy nad większymi projektami ręczne inicjowanie każdej pojedynczej korutyny może być ogromną przeszkodą! Dzięki wykorzystaniu dekoratorów, nie musimy używać metody next():

def coroutine(func):
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    return start

@coroutine
def bare_bones():
    while True:
        value = (yield)
        print(value)

cor = bare_bones()
cor.send("Using a decorator!")
Using a decorator!

Pipeline z wykorzystaniem dekoratora:

def coroutine(func):
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    return start

def producer(cor):
    n = 1
    while n < 100:
        cor.send(n)
        n = n * 2

@coroutine
def my_filter(num, cor):
    while True:
        n = (yield)
        if n < num:
            cor.send(n)

@coroutine
def printer():
    while True:
        n = (yield)
        print(n)

prnt = printer()
filt = my_filter(50, prnt)
producer(filt)
1
2
4
8
16
32

Uwaga! Korutyn nie możemy zapętlać to znaczy że jedna wykonuje drugą a ta druga pierwszą.

def coroutine_1(value):
    while True:
        next_cor = (yield)
        print(value)
        value = value - 1
        if next_cor != None:
            next_cor.send(value)

def coroutine_2(next_cor):
    while True:
        value = (yield)
        print(value)
        value = value - 2
        if next != None:
            next_cor.send(value)

cor1 = coroutine_1(20)
next(cor1)
cor2 = coroutine_2(cor1)
next(cor2)
cor1.send(cor2)
20
19
---------------------------------------------------------------------------

ValueError                                Traceback (most recent call last)

Cell In [47], line 21
     19 cor2 = coroutine_2(cor1)
     20 next(cor2)
---> 21 cor1.send(cor2)


Cell In [47], line 7, in coroutine_1(value)
      5 value = value - 1
      6 if next_cor != None:
----> 7     next_cor.send(value)


Cell In [47], line 15, in coroutine_2(next_cor)
     13 value = value - 2
     14 if next != None:
---> 15     next_cor.send(value)


ValueError: generator already executing

Bibliografia