JavaScript

超轻量级php框架startmvc

django使用channels2.x实现实时通讯

更新时间:2020-08-06 12:24:01 作者:startmvc
一、背景在最近的项目中的一个需求是消息实时推送消息以及通知功能,项目使用django写的

一、背景

在最近的项目中的一个需求是消息实时推送消息以及通知功能,项目使用django写的所以决定采用django-channels来实现websocket进行实时通讯。目前官方已经更新到2.1版本,相对于老的channels 1.x版本有了很大变化,无论是使用方式还是功能,其中最大的变化莫过于2.x版本中带来的asyncio特性,可使用异步处理模式。本文内容将介绍channels2版本使用,由于项目django是1.11,其中也遇到了一些坑,比如在channels在处理一次请求后hang住然后报错,后面修改了下django1.11版本的一点源码得以解决,2.0版本应该不会有问题。

二、channels介绍

channels是以django插件的形式存在,它不仅能处理http请求,还提供对websocket、MQTT等长连接支持。不仅如此,channels在保留了原生django的同步和易用的特性上还带来了异步处理方式(channels2.X版本),并且将django自带的认证系统以及session集成到模块中,扩展性非常强。官方文档:https://channels.readthedocs.io/en/latest/index.html

三、安装以及安装需求

channels2.0最低django版本要求是1.11+,python3.5+。笔者的版本是django1.11,直接安装可能有问题,以下是测试通过的版本。

笔者的相关版本如下:


Django==1.11.10
channels==2.1.4
channels-redis==2.3.1
asgiref==2.1.6
asgi-redis==1.4.3

如果django版本比较高直接采用pip安装:


pip3 install channels
pip3 install channels-redis #可选的,官方推荐如果使用redis作为channel layer

redis安装可以参考博客:https://www.jb51.net/article/151522.htm

四、开始使用

一、配置settings.py

笔者采用的redis作为channel layer(关于其介绍请移步至https://channels.readthedocs.io/en/latest/topics/channel_layers.html),它是实现消息推送的核心,在项目的settings.py中:

注册channles app:


INSTALLED_APPS = [
 'django.contrib.admin',
 'django.contrib.auth',
 'django.contrib.contenttypes',
 'django.contrib.sessions',
 'django.contrib.messages',
 'django.contrib.staticfiles',
 'cmdb',
 'channels', #注册app
]

配置channels layer:


ASGI_APPLICATION = 'devops.routing.application'
CHANNEL_LAYERS = {
 'default': {
 'BACKEND': 'channels_redis.core.RedisChannelLayer',
 'CONFIG': {
 "hosts": [('10.1.210.33', 6379)], #需修改
 },
 },
}

二、路由配置

在项目settings文件同级目录中新增routing.py


#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import deploy.routing

application = ProtocolTypeRouter({
 'websocket': AuthMiddlewareStack(
 URLRouter(
 deploy.routing.websocket_urlpatterns# 指明路由文件是devops/routing.py
 )
 ),
})

最后在app里配置路由和对应的消费者,笔者这里是devops下的routing.py:


#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from django.conf.urls import url

from . import consumers

websocket_urlpatterns = [
 url(r'^ws/deploy/(?P<service_name>[^/]+)/$', consumers.DeployResult), #consumers.DeployResult 是该路由的消费者
]

项目目录结构如下:

三、编写webscoket消息处理方法(消费者)

首先说明,消费者是Channels代码的基本单元,当一个新的Socket进入的时候,Channels会根据路由表找到正确的消费者,以下代码中每个方法都可以看作一个消费者,他们消费不同的event,比如刚刚接受连接时候connect方法进行消费处理并接受连接,关闭websocket时候使用disconnect进行消费处理。

deploy/consumers.py:


#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from channels.generic.websocket import AsyncWebsocketConsumer
import json

class DeployResult(AsyncWebsocketConsumer):
 async def connect(self):
 self.service_uid = self.scope["url_route"]["kwargs"]["service_uid"]
 self.chat_group_name = 'chat_%s' % self.service_uid
 # 收到连接时候处理,
 await self.channel_layer.group_add(
 self.chat_group_name,
 self.channel_name
 )

 await self.accept()

 async def disconnect(self, close_code):
 # 关闭channel时候处理
 await self.channel_layer.group_discard(
 self.chat_group_name,
 self.channel_name
 )

 # 收到消息
 async def receive(self, text_data):
 text_data_json = json.loads(text_data)
 message = text_data_json['message']
 print("收到消息--》",message)
 # 发送消息到组
 await self.channel_layer.group_send(
 self.chat_group_name,
 {
 'type': 'client.message',
 'message': message
 }
 )

 # 处理客户端发来的消息
 async def client_message(self, event):
 message = event['message']
 print("发送消息。。。",message)
 # 发送消息到 WebSocket
 await self.send(text_data=json.dumps({
 'message': message
 }))

以上代码部分说明:

1.self.scope是单个连接传入的详细信息,其中包含了请求的session、以及django认证系统中的用户信息等;

2.async...await 是python3.5之后的新异步特性,基于asyncio模块;

四、发起webscoket请求

利用js发起websocket请求


function InitWebSocket() {
 var websocket = new WebSocket( 
 'ws://' + window.location.host + '/ws/deploy/tasks/' );

 websocket.onmessage = function (e) {
 var data = JSON.parse(e.data);
 var message = '\n' + data['message'];
 document.querySelector('#deploy-res').innerText += (message + '\n');
 };
 }

五、发送消息到channel 无论是消息的推送或者消息的接受,都是经过channel layer进行传输,以下是发送消息示例,


from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync


channel_layer = get_channel_layer()
def send_channel_msg(channel_name, msg):
 """
 send msg to channel
 :param channel_name: 
 :param msg: 
 :return: 
 """
 async_to_sync(channel_layer.group_send)(channel_name,
 {"type": "deploy.run", "text": msg})

六、生产部署

大多数django的应用部署方式都采用的是nginx+uwsgi进行部署,当django集成channels时候,由于uwsgi不能处理websocket请求,所以我们需要asgi服务器来处理websocket请求,官方推荐使用daphne。下一篇文章将介绍nginx+supervisor+daphne+uwsgi进行生产部署。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

django channels实时通讯 django channels django 实时通讯