菜单 学习猿地 - LMONKEY

VIP

开通学习猿地VIP

尊享10项VIP特权 持续新增

知识通关挑战

打卡带练!告别无效练习

接私单赚外块

VIP优先接,累计金额超百万

学习猿地私房课免费学

大厂实战课仅对VIP开放

你的一对一导师

每月可免费咨询大牛30次

领取更多软件工程师实用特权

入驻
264
0

事件驱动-协程实现爬虫

原创
05/13 14:22
阅读数 68464

实验三:事件驱动-协程实现爬虫

什么是协程?

协程其实是比起一般的子例程而言更宽泛的存在,子例程是协程的一种特例。

子例程的起始处是惟一的入口点,一旦退出即完成了子例程的执行,子例程的一个实例只会返回一次。

协程可以通过yield来调用其它协程。通过yield方式转移执行权的协程之间不是调用者与被调用者的关系,而是彼此对称、平等的。

协程的起始处是第一个入口点,在协程里,返回点之后是接下来的入口点。子例程的生命期遵循后进先出(最后一个被调用的子例程最先返回);相反,协程的生命期完全由他们的使用的需要决定。

还记得我们什么时候会用到yield吗,就是在生成器(generator)里,在迭代的时候每次执行next(generator)生成器都会执行到下一次yield的位置并返回,可以说生成器就是例程。

一个生成器的例子:

#定义生成器函数
def fib():
    a, b = 0, 1
    while(True):
            yield a
            a, b = b, a + b
#获得生成器
fib = fib()

next(fib)     # >> 0
next(fib)     # >> 1


生成器是如何工作的?

在考察生成器前,我们需要先了解一般的python函数是如何运行的。当一个函数调用子函数时,控制就会交给子函数,直到子函数返回或者抛出异常时才会将控制交还给调用函数。

自定义两个函数:

>>> def foo():
...     bar()
...
>>> def bar():
...     pass


标准Python解释器CPython中的PyEval_EvalFrameEx方法会取得栈帧和待运行的字节码,并在得到的栈帧的上下文环境下计算字节码的结果。以下是foo函数的字节码:

从字节码可以看出foo函数加载bar到栈上之后通过CALL_FUNCTION调用,bar返回后弹出bar的返回值,加载None到栈上并将其作为foo的返回值返回。

PyEval_EvalFrameEx遇到CALL_FUNCTION字节码时,它创建一个新的Python栈帧。

需要了解的一点是Python的栈帧是存在于堆上的。CPython作为一个普通的C程序,它的栈帧就在栈上,但是CPython所控制的Python的栈帧却是在堆上的,所以Python的栈帧在函数调用结束后是仍能够保持存在。我们设置一个全局变量frame,将bar的栈帧赋给frame

>>> import inspect
>>> frame = None
>>> def foo():
...     bar()
...
>>> def bar():
...     global frame
...     frame = inspect.currentframe()
...
>>> foo()
>>> #得到'bar'的栈帧
>>> frame.f_code.co_name
'bar'
>>> # 它的返回指针指向foo的栈
>>> caller_frame = frame.f_back
>>> caller_frame.f_code.co_name
'foo'


此处输入图片的描述

现在让我们考察一下生成器的结构,先定义一个生成器函数:

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result of 2nd yield: {}'.format(result2))
...     return 'done'


当 Python 将 gen_fn 编译为字节码时,它会检查有没有yield,有的话那就是生成器函数了,编译器会在该函数的flag上打上标识:

>>> # 生成器的标识位是第5位.
>>> generator_bit = 1 << 5
>>> bool(gen_fn.__code__.co_flags & generator_bit)
True


调用生成器函数时会生成一个生成器对象:

>>> gen = gen_fn()
>>> type(gen)
<class 'generator'>


生成器对象会封装一个栈帧和一个对代码的引用:

>>> gen.gi_code.co_name
'gen_fn'


所有来自同一个生成器函数的生成器对象都会引用同一份代码,但是却会拥有各自不同的栈帧,生成器对象结构图如下:

此处输入图片的描述

帧拥有f_lasti指针,它指向之前最新一次运行的指令。它初始化的时候为-1,说明生成器还没有开始运行。

>>> gen.gi_frame.f_lasti
-1


当我们对生成器执行send方法时,生成器会运行到第一次yield的位置并停止。在这里它返回1,正如我们编写的代码预期的那样。

>>> gen.send(None)
1


现在f_lasti指向3了,比最开始前进了4个字节码,以及可以发现这个函数一共生成了56个字节码:

>>> gen.gi_frame.f_lasti
3
>>> len(gen.gi_code.co_code)
56


生成器能够停止,也能够在任意时刻通过任意函数恢复,这也是因为栈帧是在堆上而不是栈上,所以不用遵守函数调用的先进后出原则。

我们可以send"hello"字符串给生成器,它会在之前停止的yield那里得到并赋值给result,之后生成器继续运行直到下一个yield位置停止并返回。

>>> gen.send('hello')
result of yield: hello
2


查看生成器的局部变量:

>>> gen.gi_frame.f_locals
{'result': 'hello'}


当我们再次调用send的时候,生成器从它第二次yield的地方继续运行,到最后已经没有yield了,所以出现了StopIteration异常:

>>> gen.send('goodbye')
result of 2nd yield: goodbye
Traceback (most recent call last):
  File "<input>", line 1, in <module>
StopIteration: done


可以看到,该异常的值是生成器最后返回的值,在这里就是字符串"done"

生成器实现协程模型

虽然生成器拥有一个协程该有的特性,但光这样是不够的,做异步编程仍是困难的,我们需要先用生成器实现一个协程异步编程的简单模型,它同时也是Python标准库asyncio的简化版,正如asyncio的实现,我们会用到生成器,Future类,以及yield from语句。

首先实现Future类, Future类可以认为是专门用来存储将要发送给协程的信息的类。

class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)


Future对象最开始处在挂起状态,当调用set_result时被激活,并运行注册的回调函数,该回调函数多半是对协程发送信息让协程继续运行下去的函数。

我们改造一下之前从fetchconnected的函数,加入Futureyield

这是之前回调实现的fetch

class Fetcher:
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('localhost', 3000))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

    def connected(self, key, mask):
        print('connected!')
        # ...后面省略...


改造后,我们将连接建立后的部分也放到了fetch中。

class Fetcher:
    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('localhost', 3000))
        except BlockingIOError:
            pass

        f = Future()

        def on_connected():
            #连接建立后通过set_result协程继续从yield的地方往下运行
            f.set_result(None)

        selector.register(sock.fileno(),
                          EVENT_WRITE,
                          on_connected)
        yield f
        selector.unregister(sock.fileno())
        print('connected!')


fetcher是一个生成器函数,我们创建一个Future实例,yield它来暂停fetch的运行直到连接建立f.set_result(None)的时候,生成器才继续运行。那set_result时运行的回调函数是哪来的呢?这里引入Task类:

class Task:
    def __init__(self, coro):
        #协程
        self.coro = coro
        #创建并初始化一个为None的Future对象
        f = Future()
        f.set_result(None)
        #步进一次(发送一次信息)
        #在初始化的时候发送是为了协程到达第一个yield的位置,也是为了注册下一次的步进
        self.step(f)

    def step(self, future):
        try:
            #向协程发送消息并得到下一个从协程那yield到的Future对象
            next_future = self.coro.send(future.result)
        except StopIteration:
            return

        next_future.add_done_callback(self.step)

fetcher = Fetcher('/')
Task(fetcher.fetch())

loop()


流程大致是这样的,首先Task初始化,向fetch生成器发送None信息(也可以想象成step调用了fetch,参数是None),fetch得以从开头运行到第一个yield的地方并返回了一个Future对象给stepnext_future,然后step就在这个得到的Future对象注册了step。当连接建立时on_connected就会被调用,再一次向协程发送信息,协程就会继续往下执行了。

使用yield from分解协程

一旦socket连接建立成功,我们发送HTTP GET请求到服务器并在之后读取服务器响应。现在这些步骤不用再分散在不同的回调函数里了,我们可以将其放在同一个生成器函数中:

def fetch(self):
    # ... 省略连接的代码
    sock.send(request.encode('ascii'))

    while True:
        f = Future()

        def on_readable():
            f.set_result(sock.recv(4096))

        selector.register(sock.fileno(),
                          EVENT_READ,
                          on_readable)
        chunk = yield f
        selector.unregister(sock.fileno())
        if chunk:
            self.response += chunk
        else:
            # 完成读取
            break


但是这样代码也会越积越多,可不可以分解生成器函数的代码呢,从协程中提取出子协程?Python 3yield from能帮助我们完成这部分工作。:

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result of 2nd yield: {}'.format(result2))
...     return 'done'
...     


使用yield from在一个生成器中调用另一个生成器:

>>> # Generator function:
>>> def caller_fn():
...     gen = gen_fn()
...     rv = yield from gen
...     print('return value of yield-from: {}'
...           .format(rv))
...
>>> # Make a generator from the
>>> # generator function.
>>> caller = caller_fn()


caller生成器发送消息,消息送到了gen生成器那里,在gen还没返回前caller就停在rv = yield from gen这条语句上了。

>>> caller.send(None)
1
>>> caller.gi_frame.f_lasti
15
>>> caller.send('hello')
result of yield: hello
2
>>> caller.gi_frame.f_lasti  # 可以发现指令没有前进
15
>>> caller.send('goodbye')
result of 2nd yield: goodbye
return value of yield-from: done
Traceback (most recent call last):
  File "<input>", line 1, in <module>
StopIteration


对于我们来说,于外,我们无法判断发送消息时yield的值是来自caller还是caller内的子协程(比如gen),于内,我们也不用关心gen所得到的消息是从哪里传送来的,gen只用负责在上一次yield时得到消息输入,运行到下一个yield时返回输出,重复这个模式到最后return就可以了。

yield from得到的子协程最后return的返回值:

rv = yield from gen


想一想之前我们抱怨回调函数抛出异常时看不到上下文,这回我们看看协程是怎么样的:

>>> def gen_fn():
...     raise Exception('my error')
>>> caller = caller_fn()
>>> caller.send(None)
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "<input>", line 3, in caller_fn
  File "<input>", line 2, in gen_fn
Exception: my error


清楚多了,栈跟踪显示在gen_fn抛出异常时消息是从caller_fn委派到gen_fn的。

协程处理异常的手段跟普通函数也是一样的:

>>> def gen_fn():
...     yield 1
...     raise Exception('uh oh')
...
>>> def caller_fn():
...     try:
...         yield from gen_fn()
...     except Exception as exc:
...         print('caught {}'.format(exc))
...
>>> caller = caller_fn()
>>> caller.send(None)
1
>>> caller.send('hello')
caught uh oh


现在让我们从fetch协程上分解出一些子协程。(注意分离出的子协程并不是Fetcher的成员协程)

实现read协程接收一个数据块:

def read(sock):
    f = Future()

    def on_readable():
        #在socket可读时读取消息并向协程发送一个数据快
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    #yield f停止协程,等到可读时,从f那得到数据块。
    chunk = yield f  
    selector.unregister(sock.fileno())
    return chunk


实现read_all协程接收整个消息:

def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)

    return b''.join(response)


如果将yield from去掉,看上去就跟之前实现阻塞式I/O读取差不多呢。

现在在fetch中调用read_all

class Fetcher:
    def fetch(self):
         # ... 省略连接的代码:
        sock.send(request.encode('ascii'))
        self.response = yield from read_all(sock)


嗯,现在看上去代码短多了,但是可以做的更好。 从之前的代码可以看到当我们等一个Future返回时使用的是yield,而等一个子协程返回却是使用yield from,我们可以让两者统一起来。得益于生成器与迭代器在Python中的一致性,我们实现Future方法同时让它成为一个生成器:

def __iter__(self):
    yield self
    return self.result


这样yield fyield from f的效果就同样是输入Future返回Future的结果了。 统一的好处是什么呢?统一之后无论你是调用一个返回Future的协程还是返回值的协程都可以统一用yield from应对了,当你想改变协程的实现时也不用担心对调用函数(or协程)产生影响。

完成后续工作

我们将连接的逻辑也从fetch中分离出来:

def connect(sock, address):
    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())


fetch现在长这个样子:

def fetch(self):
    global stopped

    sock = socket.socket()
    yield from connect(sock, ('xkcd.com', 80))
    get = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)
    sock.send(get.encode('ascii'))
    self.response = yield from read_all(sock)

    self._process_response()
    urls_todo.remove(self.url)
    if not urls_todo:
        stopped = True
    print(self.url)


将上一节课中的parse_links 改名 _process_response 并稍作修改:

def _process_response(self):
    if not self.response:
        print('error: {}'.format(self.url))
        return
    if not self._is_html():
        return
    urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''',
                          self.body()))

    for url in urls:
        normalized = urllib.parse.urljoin(self.url, url)
        parts = urllib.parse.urlparse(normalized)
        if parts.scheme not in ('', 'http', 'https'):
            continue
        host, port = urllib.parse.splitport(parts.netloc)
        if host and host.lower() not in ('xkcd.com', 'www.xkcd.com'):
            continue
        defragmented, frag = urllib.parse.urldefrag(parts.path)
        if defragmented not in urls_seen:
            urls_todo.add(defragmented)
            urls_seen.add(defragmented)
            Task(Fetcher(defragmented).fetch())


主循环部分:

start = time.time()
fetcher = Fetcher('/')
Task(fetcher.fetch())

while not stopped:
    events = selector.select()
    for event_key, event_mask in events:
        callback = event_key.data
        callback()

print('{} URLs fetched in {:.1f} seconds'.format(
    len(urls_seen), time.time() - start))


运行效果

这里先奉上完整代码:

from selectors import *
import socket
import re
import urllib.parse
import time


class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def result(self):
        return self.result

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self 
        return self.result


class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return

        next_future.add_done_callback(self.step)


urls_seen = set(['/'])
urls_todo = set(['/'])
#追加了一个可以看最高并发数的变量
concurrency_achieved = 0
selector = DefaultSelector()
stopped = False


def connect(sock, address):
    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())


def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))  # Read 4k at a time.

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from f
    selector.unregister(sock.fileno())
    return chunk


def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)

    return b''.join(response)


class Fetcher:
    def __init__(self, url):
        self.response = b''
        self.url = url

    def fetch(self):
        global concurrency_achieved, stopped
        concurrency_achieved = max(concurrency_achieved, len(urls_todo))

        sock = socket.socket()
        yield from connect(sock, ('localhost', 3000))
        get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)

        self._process_response()
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True
        print(self.url)

    def body(self):
        body = self.response.split(b'\r\n\r\n', 1)[1]
        return body.decode('utf-8')



    def _process_response(self):
        if not self.response:
            print('error: {}'.format(self.url))
            return
        if not self._is_html():
            return
        urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''',
                              self.body()))

        for url in urls:
            normalized = urllib.parse.urljoin(self.url, url)
            parts = urllib.parse.urlparse(normalized)
            if parts.scheme not in ('', 'http', 'https'):
                continue
            host, port = urllib.parse.splitport(parts.netloc)
            if host and host.lower() not in ('localhost'):
                continue
            defragmented, frag = urllib.parse.urldefrag(parts.path)
            if defragmented not in urls_seen:
                urls_todo.add(defragmented)
                urls_seen.add(defragmented)
                Task(Fetcher(defragmented).fetch())

    def _is_html(self):
        head, body = self.response.split(b'\r\n\r\n', 1)
        headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:])
        return headers.get('Content-Type', '').startswith('text/html')


start = time.time()
fetcher = Fetcher('/')
Task(fetcher.fetch())

while not stopped:
    events = selector.select()
    for event_key, event_mask in events:
        callback = event_key.data
        callback()

print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format(
    len(urls_seen), time.time() - start, concurrency_achieved))


运行python3 coroutine.py命令查看效果:

此处输入图片的描述

七、总结

至此,我们在学习的过程中掌握了:

  1. 线程池实现并发爬虫
  2. 回调方法实现异步爬虫
  3. 协程技术的介绍
  4. 一个基于协程的异步编程模型
  5. 协程实现异步爬虫

三种爬虫的实现方式中线程池是最坏的选择,因为它既占用内存,又有线程竞争的危险需要程序员自己编程解决,而且产生的I/O阻塞也浪费了CPU占用时间。再来看看回调方式,它是一种异步方法,所以I/O阻塞的问题解决了,而且它是单线程的不会产生竞争,问题好像都解决了。然而它引入了新的问题,它的问题在于以这种方式编写的代码不好维护,也不容易debug。看来协程才是最好的选择,我们实现的协程异步编程模型使得一个单线程能够很容易地改写为协程。那是不是每一次做异步编程都要实现TaskFuture呢?不是的,你可以直接使用asyncio官方标准协程库,它已经帮你把TaskFuture封装好了,你根本不会感受到它们的存在,是不是很棒呢?如果你使用Python 3.5那更好,已经可以用原生的协程了,Python 3.5追加了async defawait等协程相关的关键词。这些会在今后的课程中再一一展开,下一个发布的课程就是asyncio库实现网络爬虫啦。

八、参考资料

发表评论

0/200
264 点赞
0 评论
收藏