玩转 ASGI:从零到一实现一个实时博客

利用 ASGI,可以同时处理 HTTP、HTTP2 和 WebSocket。这里将示范如何实现一个实时博客的应用。

最终效果

ASGI 、Django Channels 简介


ASGI 的完整说明我在去年做了一个翻译
ASGI 由 Django 团队提出,为了解决在一个网络框架里(如 Django)同时处理 HTTP、HTTP2、WebSocket 协议。为此,Django 团队开发了 Django Channels 插件,为 Django 带来了 ASGI 能力。
在 ASGI 中,将一个网络请求划分成三个处理层面,最前面的一层,interface server(协议处理服务器),负责对请求协议进行解析,并将不同的协议分发到不同的 Channel(频道);频道属于第二层,通常可以是一个队列系统。频道绑定了第三层的 Consumer(消费者)。
比如说,HTTP 协议的频道绑定了 HTTP 的消费者,当有新的 HTTP 请求过来时,interface server 将该请求分发到 HTTP 频道,HTTP 频道绑定的 HTTP 消费者对该请求进行处理,将处理结果返回给 HTTP 频道,最终传回给客户端。
下面,我们用一个实例演示下这种能力。完整源码在 Github-heshiyou/livelog: A Django Channels example project to demonstrate the ASGI use case.

实时博客的实现

第一步:跑起来

例基于 Python 3.5.1、macOS 10.12.4 beta 5,理论上 Python 2.7、Linux 可以跑起来(测试工作请自行认领)。
安装依赖:pip install -r requirements.txt
requirements.txt 如下:

1
2
3
4
django
channels
asgi_redis
feedparser

新建 Django 项目:django-admin startproject livelog
settings.py 里面,将 channels 添加到 INSTALLED_APPS,再加上基础的 channels 相关配置:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
...
INSTALLED_APPS = (
    ...
    'channels',
)
...
# Redis
REDIS_OPTIONS = {
    'HOST': '127.0.0.1',
    'PORT': 6379,
    'DB': 0
}
USE_REDIS = True
# Channel settings
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "asgi_redis.RedisChannelLayer",
        "CONFIG": {
            "hosts": ['redis://{}:{}'.format(REDIS_OPTIONS['HOST'],
                                             REDIS_OPTIONS['PORT'])]
        },
        "ROUTING": "livelog.routing.channel_routing"
    }
}

注意到使用了 Redis 来做为 Channels 的 Backend。使用 Redis 是为了跨进程的消息处理,为了简便(不需要跨进程),也可以使用 In Memory Channel Layer。
接着,我们在 livelog app 目录下添加一个 routing.py 文件,用以配置 channel 路由,现在可以留空:

1
channel_routing = []

现在,通过以下命令即可跑起来我们的第一个 ASGI Server。

1
2
3
# run following commands in command line separately
redis-server /usr/local/etc/redis.conf
python manage.py runserver

可以看到和普通的 Django app server 的启动方式没什么两样,这归功于 Django Channels 封装。实际工作中,要拆解为三个单独的启动步骤:

1
2
3
daphne livelog.asgi:channel_layer --port 8080  //daphne 是 asgi interface sever 的一种实现
python manage.py runworker // 启动 ASGI consumer worker
python manage.py runserver --noasgi --noworker // 启动原始的 WSGI server,不运行 ASGI interface server、worker

第二步:处理 WebSocket

Channels 将 WebSocket 连接映射到三个不同到频道:

  • 当一个新的客户端通过 WebSocket 连接时,websocket.connect 频道将收到一条消息,在本例中,将在此频道将新用户添加到实时博客订阅组里。
  • 当一个客户端断开连接,websocket.disconnect 频道将收到一条消息。
  • 每一条消息都将被发往 websocket.receive 频道,在这个频道里。
    首先,我们需要将我们的处理逻辑绑定到这三个频道。
    在 livelog app 目录中,添加 comsumers.py 文件:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
...
def ws_connect(message):
    message.reply_channel.send({'accept': True})
    Group(const.GROUP_NAME).add(message.reply_channel)

def ws_disconnect(message):
    Group(const.GROUP_NAME).discard(message.reply_channel)

def ws_receive(message):
    pass

在本例中,使用 WSGI 也即原生 Django 提供 http 服务。
routing.py 文件中,将逻辑绑定到路由:

1
2
3
4
5
6
...
channel_routing = [
    route('websocket.connect', ws_connect),
    route('websocket.disconnect', ws_disconnect),
    route('websocket.receive', ws_receive)
]

ws_connect 频道中,做了两件事:

  1. 建立 WebSocket 连接;
  2. 将新用户添加到群组里,后面我们将该群组作为实时博客的订阅组,对它发送新的消息。
    ws_disconnect 频道中,将断开连接的用户移出群组。
    ws_receive 频道中,暂时什么也不做,因为本例中,我们不通过被动接受新消息的方式来更新博客。
    为了较为全面地展示 ASGI 的能力,本例中,我们使用 Background worker(后台进程)来主动更新博客,并给群组用户推送新消息。

第三步:推送消息

更新实时博客的方式有很多种,在这里我们使用一个 RSS Feed 作为内容源,对该内容源定时抓取,将新内容更新到博客中,以此来做一个演示。在此之前,介绍一个 Django 的小特性,这个特性将允许我们这么运行我们自己的后台进程:

1
python manage.py blogging_worker // blogging_worker 是我们自定义的命令

这个特新就是 Django 的自定义命令
具体做法在此不赘述,看文档或看本例完整源码即可得知。下面我们回到正事儿上:
blogging_worker.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
# -*- coding: utf-8 -*-
...
class Command(BaseCommand):
    """
    Command to start blogging worker from command line.
    """
    help = 'Fetch and parse RSS feed and send over channel'

    def handle(self, *args, **options):
        rc = redis.Redis(host=settings.REDIS_OPTIONS['HOST'],
                         port=settings.REDIS_OPTIONS['PORT'],
                         db=settings.REDIS_OPTIONS['DB'])
        rc.delete(const.GROUP_NAME)  # flush live blogs
        while True:
            feed = feedparser.parse(const.IFANR_FEED_URL)
            for entry in feed.get('entries')[::-1]:
                if not rc.hexists(const.GROUP_NAME, entry.get('id')):
                    Group(const.GROUP_NAME).send({'text': json.dumps(entry)})
                    rc.hset(const.GROUP_NAME, entry.get('id'), json.dumps(entry))
                    logger.debug('send a message %s ' % entry.get('title'))
                    time.sleep(5)

这个文件里的大量写法都是 Django 约束的,所以不必过分追究,我们看 handle 方法,在这里,我们做这么几件事:

  1. 从爱范儿的 RSS Feed 获取最新的文章,对每一文章,检查这篇文章是否已经发送过,如果没有发送过,则将该文章发送给群组;
  2. 在 redis 中,新建一个名为 liveblog(这个常量存储在 const.GROUP_NAME) 的哈希表,将每篇未被发送过的新文章添加到该哈希表中,这样就可以判断一篇文章是否曾经被发送过;
  3. Group(const.GROUP_NAME).send({'text': json.dumps(entry)}) 对群组进行新消息的发送;
  4. 每次发送之后,休息 5 秒钟。

第四步:接收新消息并渲染

我们已经有了一个永不停歇的博客更新服务在运行着,一旦有新的内容,这个服务将为用户推送。现在,我们需要一个页面来渲染呈现新消息。
在 templates 目录下,新建 blog.html 文件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<!DOCTYPE html>
<html lang="en">
...
        <div id="container">
        <h2>Live Blog</h2>
        <ul id="log">

        </ul>
    </div>

    <script type="application/javascript">
        var ws_scheme = window.location.protocol == "https:" ? "wss" : "ws";
        var ws = new WebSocket(ws_scheme + '://' + window.location.host + window.location.pathname);
        console.log(ws);
        ws.onmessage = function (message) {
            var data = JSON.parse(message.data);
            var logList = document.querySelector('#log');
            var logItem = document.createElement('li');

            var itemTmp = `
                <h3>${data.title}</h3>
                <p>
                    <date>${new Date(data.published).toLocaleString()}</date>
                    <span>${data.source.title}</span>
                </p>
            `;
            logItem.innerHTML = itemTmp;
            logList.insertBefore(logItem, logList.firstChild);
        }

    </script>
</body>
</html>

在这个页面里,有一个 WebSocket Client 将被新建,并向服务器发起建立连接的请求。当有新消息,将在 #log ul 中插入新的消息。
接下来就是传统 Django 的工作:

  1. urls.py 中配置 django view 的路由: url(r'blog/$', livelog.views.livelog, name='livelog')
  2. views.py 中新增一个名为 livelog 的 view: def livelog(request): return render(request, 'blog.html'),渲染模板,响应 http 请求。

到此,一个主动更新博客内容并将新消息推送给每位在线的用户的服务就完成了。

References

加载评论