programing

asyncio는 실제로 어떻게 작동합니까?

projobs 2022. 10. 31. 23:12
반응형

asyncio는 실제로 어떻게 작동합니까?

이 질문은 저의 또 다른 질문에서 비롯되었습니다.CDef에서 어떻게 기다리죠?

웹에는 에 대한 수많은 기사와 블로그 게시물이 있습니다.asyncio이치노는 어떻게 해야 하는지에 대한 수 .asyncio실제로 구현되어 있습니다.I/O를 사용하다소스 코드를 읽으려고 했는데, 수천 줄의 C코드가 최고 등급은 아닙니다. 많은 것들이 보조 객체를 다루고 있습니다. 하지만 가장 중요한 것은 Python 구문과 C코드가 어떤 C코드로 변환되는지 연결하는 것이 어렵습니다.

Asycnio 자신의 문서는 더 도움이 되지 않습니다.작동 방법에 대한 정보는 없고, 사용 방법에 대한 지침만 있을 뿐이며, 때때로 오해의 소지가 있거나 매우 서투릅니다.

저는 Go의 Coroutine 구현에 익숙하고 Python도 같은 것을 하기를 바랬습니다.그런 경우라면, 상기 링크의 포스트에 기재되어 있던 코드가 유효했을 것입니다.그게 아니었으니까 이유를 알아보는 중이에요.지금까지의 추측은 다음과 같습니다.잘못된 부분을 정정해 주세요.

  1. async def foo(): ...는 실제로는 됩니다.coroutine.
  2. async def는 실제로 .await스테이트먼트(이러한 메서드를 호출하는 오브젝트가 지금까지의 실행을 통해 이루어진 진척을 추적할 수 있는 경우)
  3. 위의 내용이 참일 경우 기본적으로 Coroutine 실행은 글로벌 매니저(루프?)에 의한 Coroutine 객체의 호출 메서드로 요약됩니다.
  4. /O 를 어느 정도 인식하고 , 실행 후(「」 「IO」 「Python(?)」를 한 후 중인 할 수 . 코드에 의해 실행되는 시기를 어느 정도(어떻게) 인식하고 있으며 현재 실행 방법이 제어를 포기한 후(에 히트) 실행할 보류 중인 Coroutine 메서드 중 하나를 선택할 수 있습니다.await이치노

말해,가 '탈출하고 있는 것은 '입니다.asyncio이치

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

만약 내 추측이 맞다면, 나는 문제가 있다.이 시나리오에서는 실제로 어떻게 I/O가 이루어집니까?되어 있고 I통역사 ?통역사 전체가 정지되어 있고 I/O가 통역사 외부에서 발생하고 있습니까?I/O 확확 i i i i i i i i i? 내 가 C라고 불렸다면.open()프로시저는 커널에 인터럽트를 전송하고 제어권을 이양합니다.Python 인터프리터는 어떻게 이 사실을 알고 다른 코드를 계속 실행할 수 있는 반면 커널 코드는 실제 I/O를 수행하며 원래 인터럽트를 전송한 Python 프로시저를 웨이크업할 때까지 인터럽트를 실행합니다.Python 인터프리터?

asyncio는 어떻게 작동합니까?

이 질문에 답변하기 전에 몇 가지 기본 용어를 이해해야 합니다. 이미 알고 있는 용어가 있으면 생략하십시오.

제너레이터

제너레이터는 python 함수의 실행을 일시 중단할 수 있는 객체입니다.사용자 큐레이션된 생성기는 키워드를 사용하여 구현됩니다. 이 생성기는 다음을 포함하는 일반 함수를 생성함으로써yield키워드를 지정하면, 그 함수를 제너레이터로 변환합니다.

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

보시다시피 제너레이터를 호출하면 인터프리터가 테스트 프레임을 로드하고yielded value.부르기next()다시 프레임을 인터프리터 스택에 다시 로드하고 계속합니다.yield다른 가치를 부여합니다.

세 번째까지next()호출되고, 발전기가 완성되고, 버려집니다.

제너레이터와의 통신

제너레이터의 별로 알려지지 않은 기능은 및 두 가지 방법을 사용하여 제너레이터와 통신할 수 있다는 점입니다.

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

전화하면gen.send()이 값은 에서 반환값으로 전달됩니다.yield키워드를 지정합니다.

gen.throw()한편, 생성기 내부에 예외를 던질 수 있습니다.단, 같은 장소에서 예외가 발생합니다.yield가 호출되었습니다.

생성자에서 값 반환

제너레이터에서 값을 반환하면 값이 내부로 들어갑니다.StopIteration예외.나중에 예외에서 값을 회복하여 필요에 따라 사용할 수 있습니다.

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

새로운 키워드를 소개합니다.yield from

Python 3.4에는 새로운 키워드: 가 추가되었습니다.그 키워드로 할 수 있는 것은, 어느 것이든next(),send()그리고.throw()내부 가장 중첩된 발전기로 변환합니다.내부 제너레이터가 값을 반환하는 경우 이 값은 다음 값이기도 합니다.yield from:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

나는 이 주제에 대해 좀 더 자세히 설명하기 위해 기사를 썼다.

모든 것을 종합하면

새로운 키워드를 도입했을 때yield fromPython 3.4에서는 생성기 내부에 터널처럼 데이터를 가장 안쪽에서 가장 바깥쪽 생성기로 주고받는 생성기를 만들 수 있었습니다.이로 인해 생성자에 대한 새로운 의미인 코루틴이 생성되었습니다.

코루틴은 실행 중에 중지 및 재개할 수 있는 기능입니다.Python에서는 키워드를 사용하여 정의합니다.발전기처럼, 그들 또한 그들만의 형식을 사용합니다.yield from그 전에async그리고.awaitPython 3.5에서 도입되었으며, 생성기가 생성된 것과 동일한 방법으로 코루틴을 만들었습니다.yield from대신await).

async def inner():
    return 1

async def outer():
    await inner()

모든 반복기 및 생성기와 마찬가지로__iter__()메서드, 모든 Coroutins 구현__await__()매번 계속 할 수 있게 해주죠await coro호출됩니다.

Python 문서 안에 멋진 시퀀스 다이어그램이 있으니 확인해 보세요.

비동기에서는 코루틴 기능과는 별도로 태스크미래라는 두 가지 중요한 오브젝트가 있습니다.

선물

Future(선물)는 다음과 같은 기능을 가진 객체입니다.__await__()구현된 방식이며, 이들의 역할은 특정 상태와 결과를 유지하는 것입니다.상태는 다음 중 하나입니다.

  1. 보류 중 - 미래에는 결과 또는 예외가 설정되지 않았습니다.
  2. 취소됨 - 다음을 사용하여 미래 취소됨fut.cancel()
  3. FINished - 미래를 완료했습니다. 결과 집합 또는 예외 집합 중 하나를 사용하여 완료됨

결과는 당신이 추측한 대로 반환되는 Python 오브젝트 또는 발생할 수 있는 예외가 될 수 있습니다.

다른 중요한 기능future오브젝트에는 라는 메서드가 포함되어 있습니다.이 메서드를 사용하면 예외가 발생했든 종료했든 작업이 완료되자마자 함수를 호출할 수 있습니다.

임무들

작업 객체는 코루틴을 감싸고 가장 안쪽 및 가장 바깥쪽 코루틴과 통신하는 특별한 미래입니다.매번 코루틴이await미래입니다. 미래는 태스크로 돌아갑니다(예:yield from)가 수신됩니다.

다음으로, 태스크는 미래에 스스로를 구속합니다.전화함으로써 그렇게 한다.add_done_callback()미래에 대해서요.앞으로 취소, 예외 통과, Python 오브젝트 통과 중 하나를 통해 향후 작업이 수행될 경우 태스크의 콜백이 호출되어 존재 상태로 돌아갑니다.

비동기

마지막으로 대답해야 할 중요한 질문은 IO를 어떻게 구현하느냐입니다.

비동기식 내부에는 이벤트 루프가 있습니다.태스크 이벤트루프이벤트 루프의 역할은 작업이 준비될 때마다 작업을 호출하고 모든 작업을 하나의 작업 기계로 조정하는 것입니다.

이벤트 루프의 IO 부분은 라고 하는 하나의 중요한 기능을 기반으로 합니다.Select는, 이하의 operating system에 의해서 실장되는 블로킹 기능이며, 소켓으로 착신 또는 발신 데이터를 대기할 수 있습니다.데이터를 수신하면 웨이크업 상태가 되어 데이터를 수신한 소켓 또는 쓰기 준비가 된 소켓이 반환됩니다.

asyncio를 통해 소켓을 통해 데이터를 송수신하려고 하면 소켓에 즉시 읽거나 전송할 수 있는 데이터가 있는지 먼저 확인됩니다.만약 그렇다면.send()버퍼가 꽉 찼거나.recv()버퍼가 비어 있고 소켓이 에 등록되어 있습니다.select기능(목록 중 하나에 추가하는 것만으로)rlist위해서recv그리고.wlist위해서send) 및 적절한 기능await신규 작성future그 소켓에 묶여있는 물체.

사용 가능한 모든 태스크가 미래를 기다리고 있을 때 이벤트루프가 호출됩니다select기다리고 있습니다.소켓 중 하나에 착신 데이터가 있는 경우 또는send버퍼가 고갈되면 asyncio는 해당 소켓에 연결된 미래 객체를 체크하고 done으로 설정합니다.

이제 모든 마법이 일어난다.장래는 완료로 설정되어 있습니다.이 작업은 이전에 로 추가되었던 것입니다.add_done_callback()다시 살아나면 전화를 걸죠.send()가장 안쪽의 코루틴을 재개하는 코루틴에서(때문에)awaitchain)에서 새로 수신한 데이터를 인근 버퍼에서 읽습니다.

다음과 같은 경우 메서드 체인을 다시 사용합니다.recv():

  1. select.select기다리다
  2. 데이터가 포함된 준비 소켓이 반환됩니다.
  3. 소켓으로부터의 데이터는 버퍼로 이동한다.
  4. future.set_result()호출됩니다.
  5. 에 의해 추가된 태스크add_done_callback()이제 깨어난다.
  6. 태스크 콜.send()가장 안쪽 코루틴으로 들어가서 깨우는 코루틴을요
  7. 버퍼에서 데이터를 읽어내어 겸손한 사용자에게 반환하고 있습니다.

요약하면, 비동기 기능은 일시정지 및 재개 기능을 가능하게 하는 제너레이터 기능을 사용합니다.제너레이터 기능이 사용됩니다.사용하다yield from가장 안쪽 제너레이터에서 가장 바깥쪽 제너레이터로 데이터를 주고받을 수 있는 기능입니다.I/O가 완료되기를 기다리는 동안(OS를 사용하여) 기능 실행을 중지하기 위해 이러한 모든 기능을 사용합니다.select기능).

가장 좋은 건?1개의 기능이 일시 정지되어 있는 동안 다른 기능이 실행되어 섬세한 패브릭(비동기)과 인터리브 할 수 있습니다.

에 대해 이야기하다async/await그리고.asyncio같은 것이 아닙니다.첫 번째는 기본적이고 낮은 수준의 구성(코루틴)이고, 후자는 이러한 구성을 사용하는 라이브러리입니다.반대로, 하나의 궁극적인 답은 없습니다.

다음은 일반적인 설명입니다.async/await그리고.asyncio라이브러리가 기능하는 것처럼.즉, 위에 다른 트릭이 있을 수 있지만(...) 직접 구축하지 않으면 중요하지 않습니다.이러한 질문을 하지 않아도 될 만큼 충분히 알고 있지 않다면 그 차이는 무시할 수 있을 것입니다.

1. 너트 셸에서의 코루틴 대 서브루틴

서브루틴(함수, 프로시저 등)과 마찬가지로 코루틴(생성기 등)은 콜스택과 명령 포인터의 추상화입니다.실행 코드 조각의 스택이 있으며 각각이 특정 명령에 있습니다.

의 차이점defasync def단지 명확성을 위해서야실제 차이는returnyield여기서부터await아니면yield from개별 콜의 차이를 스택 전체로 가져옵니다.

1.1 서브루틴

서브루틴은 로컬 변수를 유지하기 위한 새로운 스택레벨과 끝에 도달하기 위한 명령의 단일 트래버스를 나타냅니다.다음과 같은 서브루틴을 생각해 보겠습니다.

def subfoo(bar):
     qux = 3
     return qux * bar

당신이 그것을 실행한다면, 그것은

  1. 스택 공간 할당bar그리고.qux
  2. 첫 번째 문장을 반복적으로 실행하고 다음 문장으로 건너뜁니다.
  3. 한 번에 한 번씩return, 그 값을 발신측 스택에 푸시 합니다.
  4. 스택(1.)과 명령 포인터(2)를 지웁니다.

특히 4.는 서브루틴이 항상 같은 상태에서 시작됨을 의미합니다.기능 자체의 모든 것은 완료 시 손실됩니다.다음 명령이 있더라도 기능을 재개할 수 없습니다.return.

root -\
  :    \- subfoo --\
  :/--<---return --/
  |
  V

1.2 영속 서브루틴으로서의 코루틴

코루틴은 서브루틴과 비슷하지만 상태를 파괴하지 않고 종료할 수 있습니다.다음과 같은 코루틴을 생각해 봅시다.

 def cofoo(bar):
      qux = yield bar  # yield marks a break point
      return qux

당신이 그것을 실행한다면, 그것은

  1. 스택 공간 할당bar그리고.qux
  2. 첫 번째 문장을 반복적으로 실행하고 다음 문장으로 건너뜁니다.
    1. 한 번에 한 번씩yield, 그 값을 콜링 스택에 푸시하지만 스택과 명령 포인터를 저장합니다.
    2. 한번 불러서yield, 스택 및 명령 포인터와 push 인수를 복원합니다.qux
  3. 한 번에 한 번씩return, 그 값을 발신측 스택에 푸시 합니다.
  4. 스택(1.)과 명령 포인터(2)를 지웁니다.

2.1과 2.2의 추가에 유의하십시오. 코루틴은 사전 정의된 지점에서 일시 중단했다가 다시 시작할 수 있습니다.이것은 다른 서브루틴을 호출할 때 서브루틴이 일시 정지되는 것과 비슷합니다.차이점은 액티브한 coroutine이 콜링 스택에 엄밀하게 바인드 되어 있지 않다는 것입니다.대신 부유 코루틴은 분리된 개별 스택의 일부입니다.

root -\
  :    \- cofoo --\
  :/--<+--yield --/
  |    :
  V    :

즉, 부유 코루틴을 자유롭게 저장하거나 스택 간에 이동할 수 있습니다.Coroutine에 액세스 할 수 있는 콜스택은, 그 재개를 결정할 수 있습니다.

1.3. 콜 스택 통과

지금까지 우리의 코루틴은 콜스택에서yield서브루틴은 콜스택을 다운 및 업 할 수 있습니다.return그리고.()coroutine은 완전성을 유지하기 위해 콜스택을 올라가기 위한 메커니즘도 필요합니다.다음과 같은 코루틴을 생각해 봅시다.

def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'

실행 시 스택과 명령 포인터가 서브루틴처럼 할당됩니다.일시정지 시에도 서브루틴을 저장하는 것과 같습니다.

하지만,yield from 다 할 수 있어요.스택 및 명령 포인터를 일시 정지합니다.wrap 실행cofoo주의해 주세요.wrap까지 보류된 채로 있다.cofoo완전히 끝납니다.언제든지cofoo일시정지나 뭔가 보내진다면cofoo는 콜링 스택에 직접 접속되어 있습니다.

1.4 코루틴이 완전히 내려갑니다.

확립된 대로yield from를 사용하면 2개의 스코프를 다른 중간 스코프에 접속할 수 있습니다.재귀적으로 적용되면 스택의 상단을 스택의 하부에 연결할 수 있습니다.

root -\
  :    \-> coro_a -yield-from-> coro_b --\
  :/ <-+------------------------yield ---/
  |    :
  :\ --+-- coro_a.send----------yield ---\
  :                             coro_b <-/

주의:root그리고.coro_b서로에 대해 모르다이것에 의해, 콜백보다 훨씬 깨끗한 코루틴이 됩니다.코루틴은 서브루틴과 같은 1:1 관계에 구축되어 있습니다.코루틴은 일반 콜포인트까지 기존 실행 스택 전체를 일시정지하고 재개합니다.

특히,root재개할 코루틴의 개수가 임의일 수 있습니다.그러나 동시에 둘 이상 재개할 수 없습니다.같은 루트의 코루틴은 동시에 존재하지만 병렬은 아닙니다!

1.5 Python의async그리고.await

지금까지의 설명에서는,yield그리고.yield from제너레이터의 어휘 - 기본 기능은 동일합니다.새로운 Python 3.5 구문async그리고.await주로 명확성을 위해 존재합니다.

def foo():  # subroutine?
     return None

def foo():  # coroutine?
     yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
     await foofoo()  # coroutine!
     return None

async for그리고.async with스테이트먼트가 필요하기 때문에yield from/await맨몸으로 묶다for그리고.with진술들.

2. 단순 이벤트 루프의 구조

코루틴은 그 자체로는 다른 코루틴에 제어를 양보하는 개념이 없다.coroutine 스택의 하부에 있는 발신자에게만 제어 권한을 부여할 수 있습니다.다음으로 이 발신자는 다른 코루틴으로 전환하여 실행할 수 있습니다.

여러 코루틴의 이 루트노드는 보통 이벤트루프입니다일시정지시 코루틴은 재개할 이벤트를 생성합니다.또한 이벤트 루프는 이러한 이벤트가 발생할 때까지 효율적으로 대기할 수 있습니다.이것에 의해, 다음에 실행할 코루틴이나 재개하기 전에 대기하는 방법을 결정할 수 있습니다.

이러한 설계는 루프가 인식하는 일련의 사전 정의된 이벤트가 있음을 의미합니다.여러 코루틴await결국 사건이 터질 때까지 서로서로awaited. 이 이벤트는 이벤트 루프와 직접 통신할 수 있습니다.yield제어하고 있습니다.

loop -\
  :    \-> coroutine --await--> event --\
  :/ <-+----------------------- yield --/
  |    :
  |    :  # loop waits for event to happen
  |    :
  :\ --+-- send(reply) -------- yield --\
  :        coroutine <--yield-- event <-/

핵심은 코루틴 서스펜션이 이벤트 루프와 이벤트를 직접 통신할 수 있도록 한다는 것입니다.중간 Coroutine 스택은 어떤 루프가 실행 중인지도 이벤트 동작에 대한 지식도 필요하지 않습니다.

2.1.1 시간 내 이벤트

가장 간단하게 처리할 수 있는 이벤트는 특정 시점에 도달하는 것입니다.이 또한 스레드 코드의 기본 블록입니다. 스레드 반복sleeps는 조건이 true가 될 때까지 계속됩니다.단, 단골은sleep실행 자체를 차단합니다.다른 코루틴은 차단되지 않습니다.대신 이벤트 루프가 언제 현재의 Coroutine 스택을 재개해야 하는지 알립니다.

2.1.2 이벤트의 정의

이벤트는 단순히 열거형, 유형 또는 기타 ID를 통해 식별할 수 있는 값입니다.목표 시간을 저장하는 간단한 클래스로 정의할 수 있습니다.이벤트 정보를 저장하는 것 외에await직접 수업

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self
    
    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

이 클래스는 이벤트만 저장합니다. 실제로 이벤트를 처리하는 방법은 설명하지 않습니다.

유일한 특장점은__await__- 이게 바로await키워드를 지정합니다.실제로는 반복기이지만 일반 반복 기계에서는 사용할 수 없습니다.

2.2.1 이벤트 대기 중

이벤트도 있는데 코루틴은 어떻게 반응할까요?우리는 다음과 같은 것을 표현할 수 있을 것이다.sleep타고await우리 이벤트를 위해서.상황을 더 잘 파악하기 위해 두 번 대기합니다.

import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

이 코루틴을 직접 인스턴스화하고 실행할 수 있습니다.발전기와 유사, 사용coroutine.send코루틴을 가동하여yield결과입니다.

coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)

이거면 두 개야AsyncSleep이벤트 및 그 후 aStopIteration코루틴이 완성되면요유일한 지연은 다음 날짜부터입니다.time.sleep루프를 타고!각각AsyncSleep현재 시간으로부터의 오프셋만 저장합니다.

2.2.2. 이벤트 + 수면

현시점에서는, 다음의 2개의 메카니즘이 준비되어 있습니다.

  • AsyncSleep코루틴 내부에서 얻을 수 있는 이벤트
  • time.sleep코루틴에 영향을 주지 않고 대기할 수 있는

특히, 이 두 가지는 직교이며, 한쪽이 다른 쪽에 영향을 미치거나 트리거하지 않습니다.그 결과, 우리는 우리만의 전략을 생각해 낼 수 있다.sleep지연에 대처하다AsyncSleep.

2.3. 순진한 이벤트 루프

만약 우리가 여러 개의 코루틴을 가지고 있다면, 각각의 코루틴이 언제 깨어나고 싶은지 말해 줄 수 있다.그 후 첫 번째가 재개될 때까지 기다렸다가 그 후에 다시 재개될 때까지 기다릴 수 있습니다.특히, 각 시점에서는, 어느 쪽이 다음인지에만 관심이 있습니다.

이것에 의해, 스케줄링이 간단하게 됩니다.

  1. 원하는 기상 시간에 따라 코루틴을 정렬하다
  2. 가장 먼저 깨어나고 싶은 사람을 고르다
  3. 이 시점까지 기다리다
  4. 이 코루틴을 투여하다
  5. 1부터 반복합니다.

간단한 구현에는 고급 개념이 필요하지 않습니다.alist를 사용하여 날짜별로 코루틴을 정렬할 수 있습니다.기다림은 단골이다.time.sleep. Coroutines 실행은 이전과 동일하게 동작합니다.coroutine.send.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])

물론, 이것은 개선의 여지가 충분히 있습니다.대기 큐에는 힙을 사용하거나 이벤트에는 디스패치테이블을 사용할 수 있습니다.또, Return values(반환치)를, RetStopIteration코루틴에 할당하는 거죠그러나 근본원칙은 그대로다.

2.4. 협조 대기

AsyncSleep이벤트 및run이벤트 루프는 타이밍이 설정된 이벤트의 완전한 실장입니다.

async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))

이것에 의해, 5개의 코루틴 각각이 협동적으로 전환되어 각각이 0.1초간 일시정지됩니다.이벤트 루프가 동기화되어 있어도 2.5초가 아닌 0.5초 만에 작업을 수행합니다.각 코루틴은 상태를 유지하고 독립적으로 작동합니다.

3. I/O 이벤트 루프

가 지원하는 이벤트루프sleep폴링에 적합합니다.그러나 파일 핸들의 I/O를 보다 효율적으로 대기할 수 있습니다. 즉, 운영 체제에서 I/O를 구현하므로 어떤 핸들이 준비되었는지 알 수 있습니다.이벤트 루프는 명시적인 "Ready for I/O" 이벤트를 지원하는 것이 이상적입니다.

3.1.select불러

Python은 이미 OS에서 읽기 I/O 핸들을 쿼리하는 인터페이스를 가지고 있습니다.읽거나 쓸 핸들을 사용하여 호출하면 읽거나 쓸 수 있는 핸들이 반환됩니다.

readable, writable, _ = select.select(rlist, wlist, xlist, timeout)

예를 들어, 우리는open쓰기용 파일을 작성하고 준비될 때까지 기다립니다.

write_target = open('/tmp/foo')
readable, writable, _ = select.select([], [write_target], [])

일단 반품 선택 후,writable에는 열려 있는 파일이 포함되어 있습니다.

3.2. 기본 I/O 이벤트

와 비슷합니다.AsyncSleepI/O 이벤트를 정의해야 합니다.기반이 되는 것select논리, 이벤트는 읽을 수 있는 객체를 참조해야 합니다.open파일. 또한 읽을 데이터의 양을 저장합니다.

class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = b'' if 'b' in file.mode else ''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )

와 마찬가지로AsyncSleep대부분의 경우 기본 시스템 호출에 필요한 데이터만 저장합니다.이번에,__await__여러 번 재개할 수 있습니다.원할 때까지amount가 읽혔습니다.또, 우리는returnI/O 결과를 얻을 수 있습니다.

3.3. 읽기 I/O를 통한 이벤트 루프 확대

이벤트 루프의 기초는 여전히run를 참조해 주세요.먼저 읽기 요청을 추적해야 합니다.이것은 더 이상 정렬된 스케줄이 아니며 읽기 요청을 코루틴에만 매핑합니다.

# new
waiting_read = {}  # type: Dict[file, coroutine]

부터select.select타임아웃 파라미터를 사용합니다.time.sleep.

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(waiting_read), [], [])

그러면 모든 파일을 읽을 수 있습니다.파일이 있으면 대응하는 루틴을 실행합니다.없는 경우 현재 코루틴이 실행되기를 충분히 기다린 것입니다.

# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]

마지막으로 읽기 요청을 실제로 수신해야 합니다.

# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...

3.4. 조립

위의 내용은 조금 단순화되었습니다.항상 책을 읽을 수 있다면 잠자는 코루틴을 굶기지 않기 위해 전환이 필요하다.우리는 읽을 것도 없고 기다릴 것도 없는 것을 처리해야 한다.그러나 최종 결과는 여전히 30 LOC에 들어맞습니다.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine

3.5 공동 I/O

AsyncSleep,AsyncRead그리고.runsleeve 및/또는 판독이 완전하게 기능하게 되었습니다.동일sleepy, 다음의 판독치를 테스트하는 도우미를 정의할 수 있습니다.

async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))

이를 실행하면 I/O가 대기 태스크와 인터리빙되어 있음을 알 수 있습니다.

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4. 논블로킹 I/O

파일의 I/O는 개념을 이해하지만 다음과 같은 라이브러리에는 적합하지 않습니다.asyncio: 그select은 항상 파일에 대해 반환되며 둘 다open그리고.read무기한 차단될 수 있습니다.그러면 이벤트 루프의 모든 코루틴이 차단됩니다.이것은 불량입니다.라이브러리와 같은aiofiles스레드 및 동기화를 사용하여 파일에서 차단되지 않는 I/O 및 이벤트를 위조합니다.

그러나 소켓에서는 논블로킹 I/O가 가능하기 때문에 대기 시간이 매우 중요합니다.이벤트 루프에서 사용하면 데이터 대기 및 재시도를 차단하지 않고 래핑할 수 있습니다.

4.1. 논블로킹 I/O 이벤트

델과 같은AsyncRead, 소켓의 일시정지 및 읽기 이벤트를 정의할 수 있습니다.파일을 가져오는 대신 소켓을 사용합니다.소켓은 논블로킹이어야 합니다.그리고 또 저희__await__사용하다socket.recv대신file.read.

class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )

와는 대조적으로AsyncRead,__await__진정한 논블로킹 I/O를 수행합니다.사용 가능한 데이터는 항상 읽힙니다.사용 가능한 데이터가 없으면 항상 일시 중단됩니다.즉, 이벤트 루프가 차단되는 것은 유용한 작업을 수행하는 동안뿐입니다.

4.2. 이벤트 루프의 차단 해제

이벤트 루프에 관한 한 큰 변화는 없습니다.수신할 이벤트는 파일에 대한 이벤트와 동일합니다. 파일 설명자에 ready 마크가 붙어 있습니다.select.

# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine

이 시점에서, 는 것은 명백할 것이다.AsyncRead그리고.AsyncRecv같은 종류의 사건입니다.교환 가능한 I/O 컴포넌트를 가진 하나의 이벤트로 쉽게 재구성할 수 있습니다.실제로 이벤트 루프, 코루틴 및 이벤트는 스케줄러, 임의의 중간 코드 및 실제 I/O를 완전히 분리합니다.

4.3. 논블로킹 I/O의 단점

원칙적으로 이 시점에서 해야 할 일은 다음과 같은 논리를 복제하는 것입니다.read로서recv위해서AsyncRecv하지만 지금은 훨씬 더 보기 흉합니다. 커널 내부에서 함수가 차단되었을 때 조기 반환을 처리하지만 제어권은 사용자에게 양보해야 합니다.예를 들어, 접속을 여는 경우와 파일을 여는 경우의 시간이 훨씬 길어집니다.

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass

요컨대, 남은 것은 수십 줄의 예외 처리입니다.이벤트와 이벤트루프는 이 시점에서 이미 동작하고 있습니다.

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

부록

github의 예제 코드

asyncio가 뭐죠?

Asyncio는 비동기 입력 출력을 나타내며 단일 스레드 또는 이벤트 루프를 사용하여 높은 동시성을 실현하는 프로그래밍 패러다임을 나타냅니다.비동기 프로그래밍은 작업 단위가 기본 응용 프로그램 스레드와 별도로 실행될 수 있는 병렬 프로그래밍의 한 유형입니다.작업이 완료되면 작업자 스레드의 완료 또는 실패를 메인 스레드에 알립니다.

다음 이미지를 참조해 주십시오.

asynchronous_flow

예를 들어 asyncio에 대해 설명하겠습니다.

비동기화의 개념을 이해하기 위해 웨이터가 1명 있는 레스토랑을 생각해 봅시다.갑자기 손님 A, B, C 세 명이 나타난다.웨이터로부터 메뉴를 받은 후, 세 사람은 무엇을 먹을지 결정하는 데 다양한 시간이 걸린다.

A가 5분, B가 10분, C가 1분이 걸린다고 가정해 봅시다.싱글 웨이터가 먼저 B부터 시작해서 10분 후에 B의 주문을 받으면, 그는 A를 대접하고 5분 동안 자신의 주문을 받아 적으며 마지막으로 C가 무엇을 먹고 싶은지 알기 위해 1분을 소비한다.따라서 웨이터는 주문을 받아 적는 데 총 10 + 5 + 1 = 16분을 소비합니다.단, 이 일련의 이벤트에서는 C는 웨이터가 도착하기 15분 전에, A는 10분, B는 0분 기다린다는 것을 알 수 있습니다.

이제 웨이터가 각 고객이 결정하는 데 걸리는 시간을 알고 있는지 생각해 보십시오.그는 먼저 C로 시작해서 A로, 마지막으로 B로 갈 수 있다.이렇게 하면 각 고객이 0분 동안 대기하게 됩니다.웨이터가 1명밖에 없는데도 1명씩, 3명의 웨이터가 있는 듯한 착각을 일으킨다.

마지막으로 웨이터가 세 가지 주문을 모두 받는 데 걸리는 시간은 10분으로 다른 시나리오의 16분보다 훨씬 짧습니다.

다음 예시를 살펴보겠습니다.

체스의 거장 매그너스 칼슨이 여러 아마추어 선수들과 함께 체스 전시회를 개최한다고 가정해 봅시다.그의 전시회 진행 방식은 동기식과 비동기식의 두 가지다.

전제 조건:

  • 24명의 상대
  • 매그너스 칼슨은 각각의 체스를 5초 만에 움직인다.
  • 상대는 각각 55초씩 움직인다.
  • 게임 평균 30페어 무브(총 60무브)

동기: Magnus Carlsen은 게임이 완료될 때까지 한 번에 한 게임씩 플레이하며 동시에 두 게임을 플레이하지 않습니다.각 게임은 (55 + 5) * 30 == 1800초 또는 30분이 소요됩니다.전체 전시 시간은 24 * 30 == 720분, 즉 12시간입니다.

비동기: Magnus Carlsen은 테이블에서 테이블로 이동하며 테이블마다 하나씩 이동합니다.그녀는 테이블에서 나와 대기 시간 동안 상대방이 다음 동작을 하도록 한다.24게임 모두 한 번의 이동에 저드 24 x 5 == 120초 또는 2분이 소요됩니다.전체 전시가 120 * 30 == 3600초로 단축되었습니다. 즉, 단 1시간입니다.

매그너스 칼슨은 두 손만 가지고 한 번에 한 동작만 하는 유일한 사람이다.하지만 비동기식으로 연주하면 전시 시간이 12시간에서 1시간으로 단축된다.

코딩 예시:

코드 스니펫을 사용하여 동기 및 비동기 실행 시간을 시연해 보겠습니다.

비동기 - async_count.화이

import asyncio  
import time  
  
  
async def count():  
    print("One", end=" ")  
    await asyncio.sleep(1)  
    print("Two", end=" ")  
    await asyncio.sleep(2)  
    print("Three", end=" ")  
  
  
async def main():  
    await asyncio.gather(count(), count(), count(), count(), count())  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    asyncio.run(main())  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")

비동기 - 출력:

One One One One One Two Two Two Two Two Three Three Three Three Three 
Executing - async_count.py
Execution Starts: 18453.442160108
Executions Ends: 18456.444719712
Totals Execution Time:3.00 seconds.

Synchronous - sync_count.화이

import time  
  
  
def count():  
    print("One", end=" ")  
    time.sleep(1)  
    print("Two", end=" ")  
    time.sleep(2)  
    print("Three", end=" ")  
  
  
def main():  
    for _ in range(5):  
        count()  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    main()  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")

동기 - 출력:

One Two Three One Two Three One Two Three One Two Three One Two Three 
Executing - sync_count.py
Execution Starts: 18875.175965998
Executions Ends: 18890.189930292
Totals Execution Time:15.01 seconds.

Python에서 멀티스레딩 대신 비동기식을 사용하는 이유는 무엇입니까?

  • 스레드 세이프 코드를 작성하는 것은 매우 어렵습니다.비동기 코드를 사용하면 코드가 어떤 태스크에서 다음 태스크로 전환될지 정확히 알 수 있으며, 레이스 조건은 매우 어려워집니다.
  • 스레드는 각 스레드에 자체 스택이 필요하기 때문에 상당한 양의 데이터를 소비합니다.비동기 코드를 사용하면 모든 코드가 동일한 스택을 공유하며 작업 간에 스택을 지속적으로 풀 수 있기 때문에 스택은 작게 유지됩니다.
  • 스레드는 OS 구조이기 때문에 플랫폼에서 지원하려면 더 많은 메모리가 필요합니다.비동기 태스크에는 이러한 문제가 없습니다.

asyncio는 어떻게 동작

자세히 알아보기 전에 Python Generator에 대해 알아보겠습니다.

Python 제너레이터:

를 포함하는 함수yield문은 생성기로 컴파일됩니다.함수의 본문에서 수율식을 사용하면 해당 함수가 생성자가 됩니다.이러한 함수는 반복 프로토콜 메서드를 지원하는 개체를 반환합니다.생성된 생성기 개체는 자동으로 다음을 수신합니다.__next()__방법.이전 섹션의 예시로 돌아가서 호출할 수 있습니다.__next__를 사용하는 대신 생성기 개체에서 직접next():

def asynchronous():
    yield "Educative"


if __name__ == "__main__":
    gen = asynchronous()

    str = gen.__next__()
    print(str)

제너레이터에 대해 다음 사항에 주의하십시오.

  • 생성 함수를 사용하면 비싼 값 계산을 지연할 수 있습니다.필요한 경우에만 다음 값을 계산합니다.이것에 의해, 제너레이터의 메모리와 계산의 효율이 높아집니다.이것에 의해, 긴 시퀀스를 메모리에 보존하거나, 고가의 계산을 사전에 실시하는 것을 피할 수 있습니다.
  • 제너레이터는 일시정지 시 마지막으로 실행된 항복문인 코드 위치 및 전체 로컬 범위를 유지합니다.이것에 의해, 종료한 장소로부터 실행을 재개할 수 있습니다.
  • 생성기 개체는 반복기에 지나지 않습니다.
  • 제너레이터 함수와 관련 제너레이터 개체를 구분해야 합니다. 이러한 개체는 종종 서로 교환할 수 있습니다.제너레이터 함수가 호출되면 제너레이터 개체가 반환되고next()제너레이터 개체에서 호출되어 제너레이터 기능 내에서 코드를 실행합니다.

제너레이터 상태:

제너레이터는 다음 상태를 거칩니다.

  • GEN_CREATED생성기 함수에서 생성기 개체가 처음으로 반환되고 반복이 시작되지 않은 경우.
  • GEN_RUNNINGgenerator 객체에서 next가 호출되어 python 인터프리터에 의해 실행되고 있는 경우.
  • GEN_SUSPENDED발전기가 수율로 매달려 있을 때
  • GEN_CLOSED제너레이터 실행이 완료되었거나 종료된 경우.

generator_cycle

제너레이터 객체에 대한 메서드:

제너레이터 개체는 제너레이터를 조작하기 위해 호출할 수 있는 다양한 메서드를 노출합니다.다음과 같습니다.

  • throw()
  • send()
  • close()

자세한 설명을 살펴보겠습니다.

비동기화 규칙:

  • 구문async def에는 네이티브 Coroutine 또는 비동기 제너레이터가 도입되어 있습니다.표현들async with그리고.async for유효합니다.
  • 키워드await는 함수 제어를 이벤트루프로 되돌립니다(주변 Coroutine의 실행을 정지합니다).Python이 검출된 경우await f()범위에 있어서의 발현g(), 이렇게 합니다.await이벤트 루프에 대해 "Suspend of of the Event Loop"을 알립니다.g()내가 뭘 기다리고 있든 간에--의 결과f():가 반환됩니다.그 사이에, 다른 것을 가동시키도록 해."

코드에서 두 번째 글머리 기호 포인트는 대략 다음과 같습니다.

async def g():
    # Pause here and come back to g() when f() is ready
    r = await f()
    return r

또한 언제, 어떻게 사용할 수 있고 사용할 수 없는지에 대한 엄격한 규칙도 있습니다.async/await구문을 아직 선택 중이거나 이미 구문을 사용하고 있는 경우에도 편리합니다.async/await:

  • 와 함께 도입하는 기능async def코루틴입니다.사용할 수 있습니다.await,return, 또는yield단, 이 모든 것은 옵션입니다.선언하다async def noop(): pass유효:
    • 사용.await및/또는returnCoroutine 함수를 만듭니다.코루틴 함수를 호출하려면await결과를 얻기 위해서요
    • 사용 빈도가 낮다yield에 있어서async def그러면 비동기 생성기가 생성되며, 이 생성기를 사용하여 반복할 수 있습니다.async for당분간 비동기 생성기는 잊고 coroutine 함수의 구문을 이해하는 데 집중합니다.await및/또는return.
    • 정의되어 있는 모든 것async def사용할 수 없음yield from그 결과,SyntaxError.
  • 마치...SyntaxError사용하다yield의 바깥에def기능, 이것은SyntaxError사용하다await밖에서async def코루틴사용할 수 있는 것은await코루틴의 몸속에 있습니다.

위의 몇 가지 규칙을 요약하는 간단한 예를 다음에 나타냅니다.

async def f(x):
    y = await z(x)     # OK - `await` and `return` allowed in coroutines
    return y

async def g(x):
    yield x            # OK - this is an async generator

async def m(x):
    yield from gen(x)  # NO - SyntaxError

def m(x):
    y = await z(x)     # NO - SyntaxError (no `async def` here)
    return y

제너레이터 기반 Coroutine

Python은 Python 생성기와 Coroutine으로 사용되도록 의도된 생성기를 구분했습니다.이러한 코루틴은 제너레이터 기반 코루틴이라고 불리며 데코레이터가 필요합니다.@asynio.coroutine함수의 정의에 추가할 수 있습니다.단, 이는 엄격히 강제되는 것은 아닙니다.

제너레이터 기반 코루틴 사용yield from대신 구문yield코루틴은 다음을 수행할 수 있습니다.

  • 다른 코루틴에서 산출하다
  • 미래에서 산출하다
  • 반문하다.
  • 예외를 제기하다

Python의 Coroutine은 공동 멀티태스킹을 가능하게 합니다.공동 멀티태스킹이란 실행 중인 프로세스가 CPU를 자발적으로 다른 프로세스에 넘기는 방법입니다.프로세스가 논리적으로 차단되어 있는 경우, 예를 들어 사용자 입력을 기다리는 동안 또는 네트워크 요구를 시작하여 잠시 아이돌 상태가 되는 경우 등이 있습니다.코루틴은 상태를 잃지 않고 발신자에게 제어를 양보할 수 있는 특수한 함수라고 정의할 수 있습니다.

코루틴과 발전기의 차이점은 무엇일까요?

제너레이터는 기능처럼 보이지만 본질적으로는 반복기입니다.일반적으로 발전기와 코루틴의 차이점은 다음과 같습니다.

  • 생성기는 호출자에게 값을 반환하는 반면, 코루틴은 다른 코루틴에 제어를 양보하고 제어를 포기한 시점부터 실행을 재개할 수 있습니다.
  • 생성자는 일단 시작하면 인수를 받아들일 수 없지만 Coroutine은 받을 수 있습니다.
  • 생성기는 주로 반복기 쓰기를 단순화하는 데 사용됩니다.그것들은 코루틴의 한 종류이며 때때로 세미코루틴이라고도 불린다.

제너레이터 기반 Coroutine 예제

우리가 쓸 수 있는 가장 간단한 제너레이터 기반 코루틴은 다음과 같습니다.

@asyncio.coroutine
def do_something_important():
    yield from asyncio.sleep(1)

코루틴은 1초 동안 잠을 잔다.데코레이터와 사용법에 주의해 주세요.yield from.

네이티브 기반 Coroutine 예시

네이티브는 코루틴을 정의하는 구문을 도입하여 코루틴을 언어의 일등 시민으로 만들었다는 것을 의미합니다.네이티브 코루틴은 다음 명령을 사용하여 정의할 수 있습니다.async/await구문을 사용합니다.우리가 쓸 수 있는 가장 간단한 네이티브 기반 코루틴은 다음과 같습니다.

async def do_something_important():
    await asyncio.sleep(1)

비동기 IO 설계 패턴

AsyncIO에는 독자적인 스크립트 설계 세트가 포함되어 있습니다.이 섹션에서는 이에 대해 설명합니다.

1. 이벤트 루프

이벤트 루프는 이벤트가 발생할 때까지 기다렸다가 이벤트핸들러에 디스패치 하는 프로그래밍 구조입니다이벤트는 UI 버튼을 클릭하는 사용자 또는 파일 다운로드를 시작하는 프로세스일 수 있습니다.비동기 프로그래밍의 핵심에 이벤트 루프가 있습니다.

코드 예:

import asyncio  
import random  
import time  
from threading import Thread  
from threading import current_thread  
  
# ANSI colors  
colors = (  
    "\033[0m",   # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[34m",  # Blue  
)  
  
  
async def do_something_important(sleep_for):  
    print(colors[1] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])  
    await asyncio.sleep(sleep_for)  
  
  
def launch_event_loops():  
    # get a new event loop  
  loop = asyncio.new_event_loop()  
  
    # set the event loop for the current thread  
  asyncio.set_event_loop(loop)  
  
    # run a coroutine on the event loop  
  loop.run_until_complete(do_something_important(random.randint(1, 5)))  
  
    # remember to close the loop  
  loop.close()  
  
  
if __name__ == "__main__":  
    thread_1 = Thread(target=launch_event_loops)  
    thread_2 = Thread(target=launch_event_loops)  
  
    start_time = time.perf_counter()  
    thread_1.start()  
    thread_2.start()  
  
    print(colors[2] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])  
  
    thread_1.join()  
    thread_2.join()  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[3] + f"Event Loop Start Time: {start_time}\nEvent Loop End Time: {end_time}\nEvent Loop Execution Time: {execution_time:0.2f} seconds." + colors[0])

실행 명령: python async_event_loop.py

출력:

async_event_loop

직접 테스트하여 출력을 조사하면 생성된 각 스레드가 자체 이벤트 루프를 실행하고 있음을 알 수 있습니다.

이벤트 루프의 유형

이벤트 루프에는 다음 두 가지 유형이 있습니다.

  • SelectorEventLoop: SelectorEventLoop은 셀렉터 모듈을 기반으로 하며 모든 플랫폼에서 기본 루프입니다.
  • ProactorEventLoop: ProactorEventLoop은 Windows의 I/O 완료 포트를 기반으로 하며 Windows에서만 지원됩니다.

2. 선물

Future는 현재 진행 중이거나 미래에 예약될 계산을 나타냅니다.비동기 조작의 최종 결과를 나타내는 특수한 저레벨 대기 가능 객체입니다.헷갈리지 마세요threading.Future그리고.asyncio.Future.

코드 예:

import time  
import asyncio  
from asyncio import Future  
  
# ANSI colors  
colors = (  
    "\033[0m",   # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[34m",  # Blue  
)  
  
  
async def bar(future):  
    print(colors[1] + "bar will sleep for 3 seconds" + colors[0])  
    await asyncio.sleep(3)  
    print(colors[1] + "bar resolving the future" + colors[0])  
    future.done()  
    future.set_result("future is resolved")  
  
  
async def foo(future):  
    print(colors[2] + "foo will await the future" + colors[0])  
    await future  
  print(colors[2] + "foo finds the future resolved" + colors[0])  
  
  
async def main():  
    future = Future()  
    await asyncio.gather(foo(future), bar(future))  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    asyncio.run(main())  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])

실행 명령: python async_futures.py

출력:

async_futures

두 코루틴 모두 미래로 넘어갔다.foo()coroutine은 미래가 해결되기를 기다리는 반면,bar()코루틴은 3초 후에 미래를 해결합니다.

3. 태스크

태스크는 미래와 같습니다.실제로 태스크는 Future의 하위 클래스이며 다음 방법을 사용하여 생성할 수 있습니다.

  • asyncio.create_task()는 코루틴을 받아들여 태스크로 랩합니다.
  • loop.create_task()는 코루틴만 받습니다.
  • asyncio.ensure_future()선물, 코루틴 및 대기할 수 있는 모든 객체를 허용합니다.

태스크는 코루틴을 랩하여 이벤트 루프에서 실행합니다.코루틴이 미래에서 대기하는 경우 태스크는 코루틴의 실행을 일시 중지하고 퓨처가 완료될 때까지 기다립니다.Future가 완료되면 랩된 Coroutine의 실행이 재개됩니다.

코드 예:

import time  
import asyncio  
from asyncio import Future  
  
# ANSI colors  
colors = (  
    "\033[0m",   # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[34m",  # Blue  
)  
  
  
async def bar(future):  
    print(colors[1] + "bar will sleep for 3 seconds" + colors[0])  
    await asyncio.sleep(3)  
    print(colors[1] + "bar resolving the future" + colors[0])  
    future.done()  
    future.set_result("future is resolved")  
  
  
async def foo(future):  
    print(colors[2] + "foo will await the future" + colors[0])  
    await future  
  print(colors[2] + "foo finds the future resolved" + colors[0])  
  
  
async def main():  
    future = Future()  
  
    loop = asyncio.get_event_loop()  
    t1 = loop.create_task(bar(future))  
    t2 = loop.create_task(foo(future))  
  
    await t2, t1  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    loop = asyncio.get_event_loop()  
    loop.run_until_complete(main())  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])

실행 명령: python async_tasks.py

출력:

async_tasks

4. 코루틴 체인:

코루틴의 주요 특징은 그들이 함께 사슬에 묶일 수 있다는 것이다.코루틴 오브젝트는 대기 가능하므로 다른 코루틴은await이를 통해 프로그램을 보다 작고 관리하기 쉬운 재사용 가능한 코루틴으로 분할할 수 있습니다.

코드 예:

import sys  
import asyncio  
import random  
import time  
  
# ANSI colors  
colors = (  
    "\033[0m",  # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[36m",  # Cyan  
  "\033[34m",  # Blue  
)  
  
  
async def function1(n: int) -> str:  
    i = random.randint(0, 10)  
    print(colors[1] + f"function1({n}) is sleeping for {i} seconds." + colors[0])  
    await asyncio.sleep(i)  
    result = f"result{n}-1"  
  print(colors[1] + f"Returning function1({n}) == {result}." + colors[0])  
    return result  
  
  
async def function2(n: int, arg: str) -> str:  
    i = random.randint(0, 10)  
    print(colors[2] + f"function2{n, arg} is sleeping for {i} seconds." + colors[0])  
    await asyncio.sleep(i)  
    result = f"result{n}-2 derived from {arg}"  
  print(colors[2] + f"Returning function2{n, arg} == {result}." + colors[0])  
    return result  
  
  
async def chain(n: int) -> None:  
    start = time.perf_counter()  
    p1 = await function1(n)  
    p2 = await function2(n, p1)  
    end = time.perf_counter() - start  
    print(colors[3] + f"--> Chained result{n} => {p2} (took {end:0.2f} seconds)." + colors[0])  
  
  
async def main(*args):  
    await asyncio.gather(*(chain(n) for n in args))  
  
  
if __name__ == "__main__":  
    random.seed(444)  
    args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])  
    start_time = time.perf_counter()  
    asyncio.run(main(*args))  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

출력에 주의해 주세요.function1()다양한 시간 동안 수면하고 있습니다.function2()는, 결과가 사용 가능하게 되면, 다음의 조작을 개시합니다.

실행 명령: python async_chained.py 11 8 5

출력:

async_chained

5. 큐 사용:

이 설계에서는 개별 소비자와 생산자가 연결되어 있지 않습니다.소비자들은 생산자 수, 심지어 대기열에 추가될 누적 품목 수조차 미리 알지 못한다.

각 생산자 또는 소비자가 큐에 항목을 넣고 큐에서 항목을 추출하는 데 걸리는 시간은 각각 다릅니다.큐는 생산자 및 소비자가 서로 직접 대화하지 않고 통신할 수 있는 throughput 역할을 합니다.

코드 예:

import asyncio  
import argparse  
import itertools as it  
import os  
import random  
import time  
  
# ANSI colors  
colors = (  
    "\033[0m",  # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[36m",  # Cyan  
  "\033[34m",  # Blue  
)  
  
  
async def generate_item(size: int = 5) -> str:  
    return os.urandom(size).hex()  
  
  
async def random_sleep(caller=None) -> None:  
    i = random.randint(0, 10)  
    if caller:  
        print(colors[1] + f"{caller} sleeping for {i} seconds." + colors[0])  
    await asyncio.sleep(i)  
  
  
async def produce(name: int, producer_queue: asyncio.Queue) -> None:  
    n = random.randint(0, 10)  
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer  
  await random_sleep(caller=f"Producer {name}")  
        i = await generate_item()  
        t = time.perf_counter()  
        await producer_queue.put((i, t))  
        print(colors[2] + f"Producer {name} added <{i}> to queue." + colors[0])  
  
  
async def consume(name: int, consumer_queue: asyncio.Queue) -> None:  
    while True:  
        await random_sleep(caller=f"Consumer {name}")  
        i, t = await consumer_queue.get()  
        now = time.perf_counter()  
        print(colors[3] + f"Consumer {name} got element <{i}>" f" in {now - t:0.5f} seconds." + colors[0])  
        consumer_queue.task_done()  
  
  
async def main(no_producer: int, no_consumer: int):  
    q = asyncio.Queue()  
    producers = [asyncio.create_task(produce(n, q)) for n in range(no_producer)]  
    consumers = [asyncio.create_task(consume(n, q)) for n in range(no_consumer)]  
    await asyncio.gather(*producers)  
    await q.join()  # Implicitly awaits consumers, too  
  for consumer in consumers:  
        consumer.cancel()  
  
  
if __name__ == "__main__":  
    random.seed(444)  
    parser = argparse.ArgumentParser()  
    parser.add_argument("-p", "--no_producer", type=int, default=10)  
    parser.add_argument("-c", "--no_consumer", type=int, default=15)  
    ns = parser.parse_args()  
    start_time = time.perf_counter()  
    asyncio.run(main(**ns.__dict__))  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

실행 명령: python async_queue.py -p 2 -c 4

출력:

async_queue

마지막으로 asyncio가 대기시간을 단축하는 예를 들어보겠습니다.coroutine을 지정하면generate_random_int()는 임의의 정수 [0, 10]를 계속 생성합니다.이들 중 하나가 임계값을 초과할 때까지 이 Coroutine의 여러 콜이 연속적으로 완료될 때까지 기다릴 필요가 없습니다.

코드 예:

import time  
import asyncio  
import random  
  
# ANSI colors  
colors = (  
    "\033[0m",   # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[36m",  # Cyan  
  "\033[35m",  # Magenta  
  "\033[34m",  # Blue  
)  
  
  
async def generate_random_int(indx: int, threshold: int = 5) -> int:  
    print(colors[indx + 1] + f"Initiated generate_random_int({indx}).")  
    i = random.randint(0, 10)  
    while i <= threshold:  
        print(colors[indx + 1] + f"generate_random_int({indx}) == {i} too low; retrying.")  
        await asyncio.sleep(indx + 1)  
        i = random.randint(0, 10)  
    print(colors[indx + 1] + f"---> Finished: generate_random_int({indx}) == {i}" + colors[0])  
    return i  
  
  
async def main():  
    res = await asyncio.gather(*(generate_random_int(i, 10 - i - 1) for i in range(3)))  
    return res  
  
  
if __name__ == "__main__":  
    random.seed(444)  
    start_time = time.perf_counter()  
    r1, r2, r3 = asyncio.run(main())  
    print(colors[4] + f"\nRandom INT 1: {r1}, Random INT 2: {r2}, Random INT 3: {r3}\n" + colors[0])  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[5] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

실행 명령: python async_random.py

출력:

async_random

주의: 사용자가 직접 코드를 작성하는 경우 암묵적인 것보다 명시적인 것을 위해 네이티브 코루틴을 선호합니다.생성기 기반 코루틴은 Python 3.10에서 제거됩니다.

GitHub Repo : https://github.com/tssovi/asynchronous-in-python

당신의.coro디스거링은 개념적으로는 맞지만 약간 불완전합니다.

await는 무조건 일시정지되는 것이 아니라 블로킹콜이 발생했을 경우에만 정지됩니다.콜이 차단되어 있는 것을 어떻게 알 수 있습니까?이는 대기 중인 코드에 따라 결정됩니다.예를 들어, 소켓 판독의 대기 가능한 실장은 다음 사항에 적합하지 않을 수 있습니다.

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

실제 비동기에서는 등가 코드에 의해 스테이트가 변경됩니다.Future마법의 값을 반환하는 대신 컨셉은 동일합니다.발전기 같은 물체에 적절히 적응하면 위의 코드는 다음과 같이 될 수 있습니다.awaited.

발신자측에서는, Coroutine 에 다음의 항목이 포함되어 있는 경우.

data = await read(sock, 1024)

다음과 같은 것으로 탈피합니다.

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

발전기에 익숙한 사람들은 위의 사항을 다음과 같은 관점에서 설명하는 경향이 있다.yield from서스펜션이 자동으로 작동합니다.

서스펜션 체인은 이벤트루프까지 계속됩니다.이 루프는 Coroutine이 서스펜드된 것을 인식하고 실행 가능한 세트에서 Coroutine을 삭제하고 실행 가능한 Coroutine을 실행합니다(있는 경우).실행할 수 있는 코루틴이 없는 경우 루프가 대기합니다.select()코루틴이 관심 있는 파일 기술자 중 하나가 IO 준비가 되거나 타임아웃이 만료될 때까지 계속됩니다.(이벤트 루프에서는 파일 설명자와 코루틴의 매핑이 유지됩니다).

위의 예에서는 1회select()이벤트 루프에 대해서sock읽을 수 있습니다. 다시 추가됩니다.coro정지 시점부터 계속됩니다.

즉, 다음과 같습니다.

  1. 기본적으로는 모든 것이 같은 스레드에서 발생합니다.

  2. 이벤트 루프는 코루틴을 스케줄링하고 코루틴이 대기하고 있는 모든 것(일반적으로 차단되는 IO 콜 또는 타임아웃)이 준비되었을 때 코루틴을 깨우는 역할을 합니다.

코루틴 구동 이벤트 루프에 대한 통찰력을 얻으려면 Dave Beazley가 라이브 청중 앞에서 이벤트 루프를 처음부터 코딩하는 것을 시연하는 이 강연을 추천합니다.

이 모든 것은 비동기화가 해결해야 하는 두 가지 주요 과제로 요약됩니다.

  • 단일 스레드에서 여러 I/O를 수행하는 방법
  • 공동 멀티태스킹 구현 방법

첫 번째 포인트에 대한 답은 오랫동안 존재해 왔으며 선택 루프라고 불립니다.python에서는 selectors 모듈에 구현됩니다.

두 번째 질문은 코루틴의 개념, 즉 실행을 중지하고 나중에 복원할 수 있는 함수와 관련된 것입니다.python에서 coroutine은 생성자와 yield from 문을 사용하여 구현됩니다.이것이 비동기/대기 구문 뒤에 숨겨져 있는 것입니다.

답변에 더 많은 리소스가 있습니다.


편집: goroutine에 대한 코멘트에 대한 대응:

asyncio에서 goroutine에 가장 가까운 것은 실제로는 coroutine이 아니라 작업입니다(매뉴얼의 차이 참조).python에서 coroutine(또는 생성기)은 이벤트 루프 또는 I/O의 개념에 대해 아무것도 모릅니다.이것은 단순히 다음을 사용하여 실행을 중지할 수 있는 기능입니다.yield현재 상태를 유지하면서 나중에 복원할 수 있습니다.yield from syntax allows for chaining them in a transparent way.

Now, within an asyncio task, the coroutine at the very bottom of the chain always ends up yielding a future. This future then bubbles up to the event loop, and gets integrated into the inner machinery. When the future is set to done by some other inner callback, the event loop can restore the task by sending the future back into the coroutine chain.


EDIT: Addressing some of the questions in your post:

How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter?

No, nothing happens in a thread. I/O is always managed by the event loop, mostly through file descriptors. However the registration of those file descriptors is usually hidden by high-level coroutines, making the dirty work for you.

What exactly is meant by I/O? If my python procedure called C open() procedure, and it in turn sent interrupt to kernel, relinquishing control to it, how does Python interpreter know about this and is able to continue running some other code, while kernel code does the actual I/O and until it wakes up the Python procedure which sent the interrupt originally? How can Python interpreter in principle, be aware of this happening?

An I/O is any blocking call. In asyncio, all the I/O operations should go through the event loop, because as you said, the event loop has no way to be aware that a blocking call is being performed in some synchronous code. That means you're not supposed to use a synchronous open within the context of a coroutine. Instead, use a dedicated library such aiofiles which provides an asynchronous version of open.

It allows you to write single-threaded asynchronous code and implement concurrency in Python. Basically, asyncio provides an event loop for asynchronous programming. For example, if we need to make requests without blocking the main thread, we can use the asyncio library.

The asyncio module allows for the implementation of asynchronous programming using a combination of the following elements:

  • 이벤트 루프:비동기 모듈에서는 프로세스별로 이벤트루프가 허용됩니다

  • 코루틴:코루틴은 특정 규칙을 따르는 발전기입니다.가장 흥미로운 기능은 실행 중에 일시정지되어 외부 처리(I/O의 일부 루틴)를 기다렸다가 외부 처리가 완료된 시점에서 복귀할 수 있다는 것입니다.

  • 선물:선물은 아직 끝나지 않은 과정을 나타낸다.미래는 미래에 결과가 있어야 할 대상이며 완료되지 않은 작업을 나타냅니다.

  • 작업: 이것은 의 서브클래스입니다.asyncio코루틴을 캡슐화하고 관리하는 미래.비동기식도 쓸 수 있고Coroutine을 캡슐화하는 작업 개체입니다.

가장 중요한 컨셉은asyncio이벤트 루프입니다.이벤트 루프를 사용하면 콜백 또는 코루틴을 사용하여 비동기 코드를 쓸 수 있습니다.이해의 열쇠asyncio코루틴과 이벤트루프의 용어입니다코루틴은 다른 I/O 작업이 실행되는 동안 실행을 중지할 수 있는 상태 저장 함수입니다.이벤트 루프는 코루틴 실행을 조정하기 위해 사용됩니다.

코루틴 함수를 실행하려면 이벤트 루프가 필요합니다.이렇게 할 수 있습니다.

    loop = asyncio.get_event_loop()

이것은 우리에게BaseEventLoop물건.이거는run_until_complete코루틴을 가져와 완료될 때까지 실행하는 메서드입니다.그런 다음 Coroutine은 결과를 반환합니다.로우 레벨에서는 이벤트루프에 의해BaseEventLoop.rununtilcomplete(future)방법.

언급URL : https://stackoverflow.com/questions/49005651/how-does-asyncio-actually-work

반응형