JavaScript

超轻量级php框架startmvc

Django+Vue实现WebSocket连接的示例代码

更新时间:2020-08-29 16:24:01 作者:startmvc
近期有一需求:前端页面点击执行任务,实时显示后端执行情况,思考一波;发现WebSocket最

近期有一需求:前端页面点击执行任务,实时显示后端执行情况,思考一波;发现 WebSocket 最适合做这件事。

效果

测试 ping www.baidu.com 效果

点击连接建立ws连接

后端实现

所需软件包

后端主要借助Django Channels 实现socket连接,官网文档链接

这里想实现每个连接进来加入组进行广播,所以还需要引入 channels-redis

pip


channels==2.2.0
channels-redis==2.4.0

引入

settings.py


INSTALLED_APPS = [
 'django.contrib.admin',
 'django.contrib.auth',
 'django.contrib.contenttypes',
 'django.contrib.sessions',
 'django.contrib.messages',
 'django.contrib.staticfiles',
 'rest_framework.authtoken',
 'rest_framework',
 ...
 'channels',
]

# Redis配置
REDIS_HOST = ENV_DICT.get('REDIS_HOST', '127.0.0.1')
REDIS_PORT = ENV_DICT.get('REDIS_PORT', 6379)
CHANNEL_LAYERS = {
 "default": {
 "BACKEND": "channels_redis.core.RedisChannelLayer",
 "CONFIG": {
 "hosts": [(REDIS_HOST, REDIS_PORT)],
 },
 },
}

代码

apps/consumers.py

新建一个消费处理

实现: 默认连接加入组,发送信息时的处理。


from channels.generic.websocket import WebsocketConsumer

class MyConsumer(WebsocketConsumer):
 def connect(self):
 """
 每个任务作为一个频道
 默认进入对应任务执行频道
 """
 self.job_name = self.scope['url_route']['kwargs']['job_name']
 self.job_group_name = 'job_%s' % self.job_name
 async_to_sync(self.channel_layer.group_add)(
 self.job_group_name,
 self.channel_name
 )
 self.accept()

 def disconnect(self, close_code):
 async_to_sync(self.channel_layer.group_discard)(
 self.job_group_name,
 self.channel_name
 )

 # job.message类型处理
 def job_message(self, event):

 # 默认发送收到信息
 self.send(text_data=event["text"])

apps/routing.py

ws类型路由

实现:ws/job/<job_name>由 MyConsumer 去处理。


from . import consumers
from django.urls import path
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.sessions import SessionMiddlewareStack

application = ProtocolTypeRouter({
 'websocket': SessionMiddlewareStack(
 URLRouter(
 [
 path('ws/job/<str:job_name>', consumers.MyConsumer)
 ]
 )
 ),
})

apps/views.py

在执行命令中获取 webSocket 消费通道,进行异步推送

  • 使用异步推送async_to_sync是因为在连接的时候采用的异步连接,所以推送必须采用异步推送。
  • 因为执行任务时间过长,启动触发运行时加入多线程,直接先返回ok,后端运行任务。

from subprocess import Popen,PIPE
import threading

def runPopen(job):
 """
 执行命令,返回popen
 """
 path = os.path
 Path = path.abspath(path.join(BASE_DIR, path.pardir))
 script_path = path.abspath(path.join(Path,'run.sh'))
 cmd = "sh %s %s" % (script_path, job)
 return Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)

def runScript(job):
 channel_layer = get_channel_layer()
 group_name = "job_%s" % job

 popen = runPopen(job)
 while True:
 output = popen.stdout.readline()
 if output == '' and popen.poll() is not None:
 break

 if output:
 output_text = str(output.strip())
 async_to_sync(
 channel_layer.group_send
 )(
 group_name, 
 {"type": "job.message", "text": output_text}
 )
 else:
 err = popen.stderr.readline()
 err_text = str(err.strip())
 async_to_sync(
 channel_layer.group_send
 )(
 group_name,
 {"type": "job.message", "text": err_text}
 )
 break

class StartJob(APIView): 
 def get(self, request, job=None):
 run = threading.Thread(target=runScript, args=(job,))
 run.start()
 return HttpResponse('ok')

apps/urls.py

get请求就能启动任务


urlpatterns = [
 ...
 path('start_job/<str:job>', StartJob.as_view())
]

前端实现

所需软件包


vue-native-websocket 

代码实现

plugins/vueNativeWebsocket.js


import Vue from 'vue'
import VueNativeSock from '../utils/socket/Main.js'

export default function ({ store }) {
 Vue.use(VueNativeSock, 'http://localhost:8000/ws/job', {connectManually: true,});
}

nuxt.config.js

配置文件引入, 这里我使用的是 nuxt 框架


 plugins: [ 
 { 
 src: '@/plugins/vueNativeWebsocket.js', 
 ***: false 
 },
 ],

封装 socket


export default (connection_url, option) => {
 // 事件
 let event = ['message', 'close', 'error', 'open'];

 // 拷贝选项字典
 let opts = Object.assign({}, option);

 // 定义实例字典
 let instance = {

 // socket实例
 socket: '',

 // 是否连接状态
 is_conncet: false,

 // 具体连接方法
 connect: function() {
 if(connection_url) {
 let scheme = window.location.protocol === 'https:' ? 'wss' : 'ws'
 connection_url = scheme + '://' + connection_url.split('://')[1];
 this.socket = new WebSocket(connection_url);
 this.initEvent();
 }else{
 console.log('wsurl為空');
 }
 },

 // 初始化事件
 initEvent: function() {
 for(let i = 0; i < event.length; i++){
 this.addListener(event[i]);
 }
 },

 // 判断事件
 addListener: function(event) {
 this.socket.addEventListener(event, (e) => {
 switch(event){
 case 'open':
 this.is_conncet = true;
 break;
 case 'close':
 this.is_conncet = false;
 break;
 }
 typeof opts[event] == 'function' && opts[event](e);
 });
 },

 // 发送方法,失败则回调
 send: function(data, closeCallback) {
 console.log('socket ---> ' + data)
 if(this.socket.readyState >= 2) {
 console.log('ws已经关闭');
 closeCallback && closeCallback();
 }else{
 this.socket.send(data);
 }
 }

 };

 // 调用连接方法
 instance.connect();
 return instance;
 }

index.vue

具体代码

x2Str 方法,因为后端返回的是bytes,格式 b'xxx' ,编写了方法对其进行转换。


<template>
 <div>

 <el-button type="primary" @click="runFunction" >执行</el-button>
 <el-button type="primary" @click="connectWebSock" >显示</el-button>

 <div class="socketView">
 <span v-for="i in socketMessage" :key="i">{{i}}</span>
 </div>
 </div>
</template>
<script>
 import R from '@/plugins/axios';
 import ws from '@/plugins/socket'
 export default {
 data() {
 return {
 webSocket: '',
 socketMessage: [],
 }
 },

 methods: {
 // 打开连接的处理
 openSocket(e) {
 if (e.isTrusted) {
 const h = this.$createElement;
 this.$notify({
 title: '提示',
 message: h('i', { style: 'color: teal'}, '已建立Socket连接')
 });
 }
 },

 // 连接时的处理
 listenSocket(e) {
 if (e.data){
 this.socketMessage.push(this.x2Str(e.data))
 }
 },

 // 连接webSocket
 connectWebSock() {
 let wsuri = process.env.BACKEND_URL + '/ws/job/' + this.selectFunctions
 this.webSocket = ws(wsuri, {
 open: e => this.openSocket(e),
 message: e => this.listenSocket(e),
 close: e => this.closeSocket(e)
 })
 },

 // 转码
 x2Str(str) {
 if (str) {
 let reg = new RegExp("(?<=^b').*(?='$)")
 let result = str.replace(/(?:\\x[\da-fA-F]{2})+/g, m =>
 decodeURIComponent(m.replace(/\\x/g, '%'))
 )
 return reg.exec(result)[0]
 }
 },

 // 执行方法
 runFunction() {
 R.myRequest('GET','api/start_job/' + this.selectFunctions, {}, {}).then((response) => {
 if (response.hasOwnProperty('response')){
 this.$message({
 type: 'error',
 message: '服务端返回错误,返回码:' + response.response.status 
 });
 }; 
 if (response.data == 'ok') {
 this.$message({
 type: 'success',
 message: '开始执行[' + this.selectFunctions + ']'
 });
 }
 });
 } 
 }
}
</script>

至此,实现前后端 websocket 通讯。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

Django Vue WebSocket连接