channels使用

摘要:
Channels还支持编写异步consumers以提高性能。AuthMiddlewareStack将使用对当前经过身份验证的用户的引用来填充连接的scope,类似于Django的AuthenticationMiddleware用当前经过身份验证的用户填充视图函数的请求对象。channellayerchannellayer是一种通信系统。任何有名称的channel都可以向channel发送消息。group是一组相关的channels。无法列举特定group中的channel。每个consumer实例都有一个自动生成的唯一的channel名称,因此可以通过channellayer进行通信。安装channels_redis,以便Channels知道如何调用redis。

安装django及channels

pip install channels

创建channels库根路由配置文件,根路由配置文件类似Django URLconf,它会告诉Channels当收到由Channes服务器发过来的Http请求时,应该执行什么代码:

# wssite/routing.py
from channels.routing import ProtocolTypeRouter

application = ProtocolTypeRouter({
    # (http->django views is added by default)
})

将 Channels 库添加到已安装的应用程序列表中。编辑 wssite/settings.py 文件并将 'channels' 添加到 INSTALLED_APPS 设置:

# mysite/settings.py
INSTALLED_APPS = [
    'channels',
    'chat',
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
]

配置根路由指向 Channels:

# wssite/settings.py
# Channels
ASGI_APPLICATION = 'wssite.routing.application'

现在已安装的应用程序中有 Channels, 它将控制 runserver 命令, 用 Channels 开发服务器替换标准的 Django 开发服务器。

consumer配置

在app中创建新文件consumer,并添加如下代码:

# app/consumers.py
from channels.generic.websocket import WebsocketConsumer
import json

class ChatConsumer(WebsocketConsumer):
    def connect(self):
        self.accept()

    def disconnect(self, close_code):
        pass

    def receive(self, text_data=None, bytes_data=None):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        self.send(text_data=json.dumps({
            'message': message

这是一个同步 WebSocket consumer, 它接受所有连接, 接收来自其客户端的消息, 并将这些消息回送到同一客户端。现在, 它不向同一个房间的其他客户端广播消息。

Channels 还支持编写异步 consumers 以提高性能。但是, 任何异步 consumers 都必须小心, 避免直接执行阻塞操作。

我们需要为 app 创建一个路由配置, 它有一个通往 consumer 的路由。创建新文件 app/routing.py。并写入以下代码:

# app/routing.py
from django.conf.urls import url

from . import consumers

websocket_urlpatterns = [
    url(r'^ws/chat/(?P<room_name>[^/]+)/$', consumers.ChatConsumer),
]

下一步是将根路由指向 chat.routing 模块。在 mysite/routing.py 中, 导入 AuthMiddlewareStack、URLRouter 和 chat.routing ;并在 ProtocolTypeRouter 列表中插入一个 "websocket" 键, 格式如下:

# app/routing.py
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import chat.routing

application = ProtocolTypeRouter({
    # (http->django views is added by default)
    'websocket': AuthMiddlewareStack(
        URLRouter(
            chat.routing.websocket_urlpatterns
        )
    ),
})

这个根路由配置指定,当与 Channels 开发服务器建立连接的时候, ProtocolTypeRouter 将首先检查连接的类型。如果是 WebSocket 连接 (ws://或 wss://), 则连接会交给 AuthMiddlewareStack。

AuthMiddlewareStack 将使用对当前经过身份验证的用户的引用来填充连接的 scope, 类似于 Django 的 AuthenticationMiddleware 用当前经过身份验证的用户填充视图函数的请求对象。然后连接将被给到 URLRouter。

根据提供的 url 模式, URLRouter 将检查连接的 HTTP 路径, 以将其路由指定到到特定的 consumer。

channel layer

channel layer 是一种通信系统。它允许多个 consumer 实例互相交谈, 以及与 Django 的其他部分进行通信。

channel layer 提供以下抽象:

channel 是可以发送消息的邮箱。每个 channel 都有一个名称。任何有名称的 channel 都可以向 channel 发送消息。

group 是一组相关的 channels。group 具有名称。任何具有名字的 group 都可以按名称向 group 中添加/删除 channel, 也可以向 group 中的所有 channel 发送消息。无法列举特定 group 中的 channel。

每个 consumer 实例都有一个自动生成的唯一的 channel 名称, 因此可以通过 channel layer 进行通信。

使用redis作为channel layer的后备存储。

安装 channels_redis, 以便 Channels 知道如何调用 redis。运行以下命令:

pip install channels_redis

在使用 channel layer 之前, 必须对其进行配置。编辑 wssite/settings.py 文件并将 CHANNEL_LAYERS 设置添加到底部:

# wssite/settings.py
# Channels
ASGI_APPLICATION = 'mysite.routing.application'
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

现在我们有了一个 channel layer, 让我们在 ChatConsumer 中使用它。将以下代码放在 chat/consumers.py 中, 替换旧代码:

# app/consumers.py
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
import json

class ChatConsumer(WebsocketConsumer):
    def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']  # scope中包含有关其连接的信息且在app/routes.py中的url路由中获取'room_name'参数
        self.room_group_name = 'chat_%s' % self.room_name # 构建channels_group名称

        # Join room group
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name,
            self.channel_name
        )

        self.accept() # 用于接受websocket连接,不调用则表示拒绝接收连接

    def disconnect(self, close_code):
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    def receive(self, text_data=None, bytes_data=None):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # Send message to room group
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message
            }
        )

    # Receive message from room group
    def chat_message(self, event):
        message = event['message']

        # Send message to WebSocket
        self.send(text_data=json.dumps({
            'message': message
        }))

异步模式

重写 ChatConsumer 使其变为异步的。在 app/consumers.py 中输入以下代码:

# app/consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
import json
class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = 'chat_%s' % self.room_name

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # Send message to room group
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message
            }
        )

    # Receive message from room group
    async def chat_message(self, event):
        message = event['message']

        # Send message to WebSocket
        await self.send(text_data=json.dumps({
            'message': message
        }))

这些用于 ChatConsumer 的新代码与原始代码非常相似, 它们具有以下差异:

  • 现在 ChatConsumer 继承自 AsyncWebsocketConsumer 而不是 WebsocketConsumer。
  • 所有方法都是 async def, 而不仅仅是 def。
  • await 被用于调用执行 I/O 的异步函数。
  • 在 channel layer 上调用方法时, 不再需要 async_to_sync。

数据库访问

Django ORM是一段同步代码,因此如果想从异步代码访问它,需要进行特殊处理以确保其连接正确关闭。

如果你正在使用SyncConsumer或者基于它的任何东西 - 比如 JsonWebsocketConsumer- 你不需要做任何特别的事情,因为所有代码都已经在同步模式下运行,并且Channels将作为SyncConsumer代码的一部分为你做清理工作。

但是,如果要编写异步代码,则需要使用安全的同步上下文调用数据库方法database_sync_to_async

数据库连接

如果使用线程使用者(同步的),通道可能会比可能使用的通道打开更多的数据库连接 - 每个线程最多可以打开一个连接。

默认情况下,线程数设置为“CPU数* 5”,因此可以看到最多这个线程数。如果要更改它,请将ASGI_THREADS环境变量设置为希望允许的最大数量。

为了避免在连接中有太多线程空闲,可以改写代码以使用异步使用者,并且只在需要使用Django的ORM(使用database_sync_to_async)时才进入线程。

database_sync_to_async

要使用它,请在单独的函数或方法中编写ORM查询,然后database_sync_to_async像这样调用它:

from channels.db import database_sync_to_async

async def connect(self):
    self.username = await database_sync_to_async(self.get_name)()

def get_name(self):
    return User.objects.all()[0].name

还可以将它用作装饰器:

from channels.db import database_sync_to_async

async def connect(self):
    self.username = await self.get_name()

@database_sync_to_async
def get_name(self):
    return User.objects.all()[0].name

免责声明:文章转载自《channels使用》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇常用MySQL操作Spring Boot (三): ORM 框架 JPA 与连接池 Hikari下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

微信小程序开发四:接口

4.1 网络 发起请求wx.request接口,用于发起 HTTPS 请求。一个微信小程序,同时只能有5个网络请求连接。上传下载wx.uploadFile接口,用于将本地资源上传到开发者服务器。如页面通过 wx.chooseImage 等接口获取到一个本地资源的临时文件路径后,可通过此接口将本地资源上传到指定服务器。客户端发起一个 HTTPS POST...

C#异步编程之(三):深入 Async 和 Await 的实现及其成本

From: http://msdn.microsoft.com/zh-cn/magazine/hh456402.aspx异步性能:了解 Async 和 Await 的成本Stephen Toub 异步编程长时间以来一直都是那些技能高超、喜欢挑战自我的开发人员涉足的领域 — 这些人愿意花费时间,充满热情并拥有心理承受能力,能够在非线性的控制流程中不断地...

websocket实时监控画面

  Ajax轮询是通过特定的的时间间隔(如每1秒),由浏览器对服务器发出HTTP请求,然后由服务器返回最新的数据给客户端的浏览器。这种简单粗暴模式有一个明显的缺点,就是浏览器需要不断的向服务器发出请求,HTTP请求可能包含较长的头部,其中真正有效的数据可能只是很小的一部分,显然这样会浪费很多的带宽等资源(对于很多局域网内的企业应用,这个简单粗暴模式确实解决...

C# 实现WebSocket通信

  本实例可通过web网页端进行测试,下面直接上代码。   首先要在NuGet导入“Fleck”包,.net framework4以上版本都可以选择。 using System; using System.Collections.Generic; using System.Linq; using System.Threading; namespace...

Nginx反向代理WebSocket链接失败问题

问题记录:本地socket测试无误后部署发现 WebSocket connection to "xxx/xxx" failed  解决方案: 在nginx.conf的http模块添加如下内容 map $http_upgrade $connection_upgrade { default upgrade; '' close; } 其...

F5 关于websocket

版本问题:  https://support.f5.com/csp/article/K14754  12.1.0 以后能够处理 websocket 协议,之前的版本可以使用两个vs 来区分, http 协议一个  ws 协议 一个  .。如果需要  ssl 协议 那么就配置一个 wss 的vs 。   11.4.0   版本到 12.1.0 版本可以 配置...