Gevent的hub剖析

Comments(1)


Posted on 2014-02-13 16:41:10 gevent


简介

  • hub是gevent中核心,依靠libev这个事件库,来调度所有的greenlet。

  • hub也是一个greenlet,就是所谓的’main’ greenlet。

  • hub会首先启动,然后马上启动事件循环。也就是libev的loop。

  • hub对应到实际的代码就是hub.py,其中最关键的是Hub类。

  • hub保存在线程的本地数据中,所以说每个线程中只存在一个’main’ hub。

  • 从main greenlet切换到普通greenlet之后,main greenlet是停止执行的。greenlet是安排合理的串行,从而看起来像是并行。

  • 切换到main greenlet是指首先切换到hub.wait,然后在greenlet.so中执行真正的greenlet切换。 即切换到需要运行的那个greenlet。

  • 所谓的’由hub决定运行哪个greenlet’,实际上是loop检测到greenlet感兴趣的事件后,首先切换到hub,再由hub.switch切换到发生事件的那个greenlet。主导整个过程的是libev的loop。

hub.py

hub.py中重要的函数有:

  • sleep
  • kill
  • get_hub
  • wait
  • iwait

重要的类有:

  • signal
  • Hub
  • Waiter

下面逐个介绍.

sleep函数

sleep函数的代码如下:

def sleep(seconds=0, ref=True):
    hub = get_hub()
    loop = hub.loop
    if seconds <= 0:
        waiter = Waiter()
        loop.run_callback(waiter.switch)
        waiter.get()
    else:
        hub.wait(loop.timer(seconds, ref=ref))

sleep函数的作用是,将当前greenlet挂起,并主动切换到’main’ greenlet。 默认情况下seconds=0, 会立即切换到’main’ greenlet,并将当前greenlet加入到greenlet链,等待下一次运行机会。 如果seconds>0,则在libev事件循环启动一个timer(是一个libev的watcher,详情参考libev),等待seconds之后,再切换到’main’ greenlet.

kill函数

kill函数的代码如下:

def kill(greenlet, exception=GreenletExit):
    if not greenlet.dead:
        get_hub().loop.run_callback(greenlet.throw, exception)

kill函数首先检查目标greenlet是否已经死去,不能kill已经死去的greenlet. 如果greenlet还活着,就让在libev的事件循环中运行greenlet.throw函数,参数是一个异常类。 一般是GreenletExit.

上面的加粗的greenlet是Greenlet类的实例,它的throw函数方法如下,其中最关键的代码是:

 # 这个greenlet是greenlet.so中的
greenlet.throw(self, *args)

上面的代码中,第一个参数self自然就是Greenlet类的实例,第二个参数就是传递的GreenletExit异常类。 上面的throw对应到greenlet.c中的throw_greenlet函数:

return single_result(g_switch(self, result, NULL));

g_switch的第一个参数self是要切换到的greenlet,第二个参数result是异常. g_switch切换到目标greenlet,引发异常并导致目标greenlet退出。 single_result函数是从g_switch函数的返回值中提取需要的内容并返回。

注意:如果传递的异常类不是GreenletExit,会导致’main’ greenlet的退出

get_hub函数

get_hub函数的功能比较单一,但运行此函数时,如果发现未实例化Hub类则实例化它,并将实例化的对象保存在线程本地数据中。

注意:这里虽然实例化了Hub类,但并没有运行’main’ hub,除非其他greenlet切换到’main’ greenlet.

wait函数

wait函数的代码如下:

def wait(objects=None, timeout=None, count=None):
    if objects is None:
        return get_hub().join(timeout=timeout)
    result = []
    if count is None:
        return list(iwait(objects, timeout))
    for obj in iwait(objects=objects, timeout=timeout):
        result.append(obj)
        count -= 1
        if count <= 0:
            break
    return result

wait函数使用的比较少,我搜索到在gevent中,只有一个地方调用wait函数,greenlet.py中的joinall函数:

def joinall(greenlets, timeout=None, raise_error=False, count=None):
    if not raise_error:
        wait(greenlets, timeout=timeout)
    else:
        for obj in iwait(greenlets, timeout=timeout):
            if getattr(obj, 'exception', None) is not None:
                raise obj.exception
            if count is not None:
                count -= 1
                if count <= 0:
                    break

joinall的作用是等待所有greenlet执行完毕。

wait(greenlets, timeout=timeout),这句代码调用了wait函数,所以在wait函数中实际执行的代码是:

return list(iwait(objects, timeout))

流程到了iwait函数这里,我们再详细的看一下iwait函数:

def iwait(objects, timeout=None):
    waiter = Waiter()
    switch = waiter.switch
    if timeout is not None:
        timer = get_hub().loop.timer(timeout, priority=-1)
        timer.start(waiter.switch, _NONE)
    try:
        count = len(objects)
        for obj in objects:
            obj.rawlink(switch)
        for _ in xrange(count):
            item = waiter.get()
            waiter.clear()
            if item is _NONE:
                return
            yield item
    finally:
        if timeout is not None:
            timer.stop()
        for obj in objects:
            unlink = getattr(obj, 'unlink', None)
            if unlink:
                try:
                    unlink(switch)
                except:
                    traceback.print_exc()

obj.rawlink会将Waiter类的switch函数添加到Greenlet实例属性中,当Greenlet实例执行完毕后,会调用它。 接下来在循环中,会根据objects的数量,执行很多次get方法。作用是等待greenlet执行完毕,并切换到’main’ hub中。

Hub类

hub是gevent中的核心,而Hub类是hub.py模块中的最关键的一个类。它是实现hub主要功能的类。 其次是Waiter类,它和Hub一起提供了gevent中绝大多数的功能。

Hub类中关键的方法有:

  • _init_
  • wait
  • switch
  • join
  • run
  • destroy

典型代码

为了方便介绍,下面给出Hub类的一个典型使用场景,下面的代码只摘取了关键部分:

# fileobject.py中:

# 创建一个io类型的watcher
# 用户监视fileno这个文件描述符

self._read_event = io(fileno, 1)

##############################################

# fileobject.py中:
# recv函数:
# recv函数里发生IO阻塞,将当前greenlet切换到main hub
# 让出控制权
# 等到感兴趣的IO事件发生后
# main hub会切换到这个点,继续执行

self.hub.wait(self._read_event)

##############################################

#Hub:
#hub.wait函数:
        waiter = Waiter()
        unique = object()
        # 这个watcher就是_read_event
        # 下面将waiter.switch这个回调函数,绑定到watcher上(io类型的)
        watcher.start(waiter.switch, unique) 
        try:
            # 切换到main hub, 等待io事件被触发
            # 然后main hub切换到被挂起的greenlet继续运行
            result = waiter.get()
        finally:
            watcher.stop()
            
##############################################

#Wait:
#wait.get函数:
            self.greenlet = getcurrent()
            try:
                # 切换到main hub
                return self.hub.switch()
            finally:
                self.greenlet = None
                
##############################################

Hub:
hub.switch函数:
        # 切换到main hub
        return greenlet.switch(self)

上面代码的功能概括为:

在普通的greenlet中(非main greenlet)中,创建一个socket。 将socket风转到libev的io类型的watcher中,再将这个watcher和当前greenlet关联起来。 最后从当前greenlet切换到main greenlet。

最后由libev的loop检测socket上发生的IO事件,当事件发生后,将从main greenlet切换到刚才被挂起的greenlet继续执行。

这样就体现了libev的异步和greenlet的同步完美的结合。

下面做简要分析:

  • 当从socket接收数据并发生阻塞事件时,为了能异步的得到事件通知,将watcher加入到libev的loop循环中。

  • self.hub.wait(self._read_event)就是将watcher加入到libev的loop循环中。

  • 调用hub.wait方法后,会从当前greenlet切换到main greenlet,也就是hub。过程如下所述。

  • 由于hub管理着所有的greenlet,并将这些greenlet和libev的loop关联起来。这是通过libev的watcher来关联的。

  • 在hub.wait中,启动一个Waiter: waiter = Waiter(), 并将water.switch这个回调函数和watcher关联起来: watcher.start(waiter.switch, unique)

  • 最后执行wait.get将当前greenlet切换到main greenlet。这时libev如果检测到socket上发生了greenlet感兴趣的事件,则从main greenlet切换到刚才被挂起的greenlet,并从挂起处继续执行。

__init__方法

__init__函数的功能是初始化设置loop类,并初始化:

loop_class = config('gevent.core.loop', 'GEVENT_LOOP')
...
self.loop = loop_class(flags=loop, default=default)

gevent.core.loop是gevent的C模块core.so中的loop类,也就是libev的loop做了一层包装。它是hub的核心。

wait方法

wait方法的代码如下:

def wait(self, watcher):
    waiter = Waiter()
    unique = object()
    watcher.start(waiter.switch, unique)
    try:
        result = waiter.get()
        assert result is unique, 'Invalid...'
    finally:
        watcher.stop()
  • 创建Waiter类实例

  • object()是一个唯一的对象,作为从main greenlet返回时的跟踪对象

  • waiter.switch这个callback附加到watcher上,参数为object()

  • 执行waiter.get从当前greenlet切换到main greenlet

  • watcher感兴趣的事件发生后,触发调度,从main greelet切换回来,代码继续执行

switch方法

switch方法的代码如下:

def switch(self):
    switch_out = getattr(getcurrent(), 'switch_out', None)
    if switch_out is not None:
        switch_out()
    return greenlet.switch(self)
  • 在当前greenlet对象中寻找switch_out属性,如果找到就调用。可以在继承Geenlet类时时重载switch_out方法,作为用户自定义的接口。

  • 调用greenlet.switch(self),切换到main greenlet。

  • 这里的greenlet是C模块的greenlet.so中greenlet类的方法,参数self为当前的greenlet对象。

    join方法

join方法的功能是,等待Event loop执行完毕,当没有活动的greenlet、正在运行的server、超时器、watcher以后,join方法会退出。

当timeout参数不为None时,启动一个超时器,等待timeout秒,再切换到main greenlet。 否则直接切换到main greenlet。

其实join方法的核心只是切换到main greenlet,但为什么切换到main greenlet就能等到Event loop退出呢?

因为切换到main greenlet之后,才能由main greenlet管理所有的greenlet,当watcher上事件发生时,除了main greenlet之外的其他greenlet才能有机会运行。当其他所有greenlet都运行完毕,Event loop自然就退出了。

看起来有点故弄玄虚的意思,所以其实只需要switch到main greenlet即可。

join方法的代码如下:

def join(self, timeout=None):
    assert getcurrent() is self.parent, "only possible from the MAIN greenlet"
    if self.dead:
        return True

    waiter = Waiter()

    if timeout is not None:
        timeout = self.loop.timer(timeout, ref=False)
        timeout.start(waiter.switch)

    try:
        try:
            waiter.get()
        except LoopExit:
            return True
    finally:
        if timeout is not None:
            timeout.stop()
    return False
  • 初始化一个Waiter类的实例

  • 如果提供了timeout,会启动超时器,等待timeout秒后,在切换到main greenlet

  • 如果未提供timeout参数,则直接切换到main greenlet

run方法

run方法的代码如下:

def run(self):
    assert self is getcurrent(), 'Do not call Hub.run() directly'
    while True:
        loop = self.loop
        loop.error_handler = self
        try:
            loop.run()
        finally:
            loop.error_handler = None  # break the refcount cycle
        self.parent.throw(LoopExit('This operation would block forever'))

run方法是hub的核心,由run方法来启动libev的loop run方法启动了libev的loop,并且会一直阻塞,直到收到了信号

前一篇: Greenlet简介 后一篇: 浅谈TCP/IP网络编程中socket的行为

Captcha:
验证码

Email:

Content: (Support Markdown Syntax)


0N7CZwR5mOJ  2015-08-07 23:57:56 From 188.143.234.155

Ab fab my godloy man.