Configuring a Django Channels WebSocket ASGI Server

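Environment:

At the time of writing, this setup uses close to the latest versions of the ASGI stack.

  • OS: macOS (Apple M-series)

  • Python version: 3.12

  • Django version: 5.1.7

  • daphne version: 4.2.0

  • channels version: 4.2.2

  • asgiref version: 3.8.1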

Project structure:

├── MyPorject
│   ├── asgi.py
│   ├── routing.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── apps
│   ├── __pycache__
│   │   └── consumers.cpython-312.pyc
│   ├── consumers.py (lives in the main app)
│   └── (other apps)
├── manage.py

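Important note:

Channels versions after 3.0.5 (that is, 4.0 and later) no longer bundle an ASGI server, so you must install and configure Daphne or another ASGI server such as Uvicorn.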

Step 1: Install Channels with the Daphne extra, plus the other dependencies

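# Quote the extra so shells such as zsh (the macOS default) do not expand the brackets
pip install "channels[daphne]"
# At the time of writing this installs daphne==4.2.0 and channels==4.2.2

# Install Django
pip install Django==5.1.7

# Install asgiref
pip install asgiref==3.8.1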
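After installing, it can help to confirm which versions actually ended up in the environment. The following is a minimal sketch using only the standard library; the `EXPECTED` table simply repeats the versions listed in this article, and the function names are illustrative, not part of any of these packages.

```python
from importlib.metadata import PackageNotFoundError, version

# Versions listed in this article; adjust them to match your own setup.
EXPECTED = {
    "Django": "5.1.7",
    "daphne": "4.2.0",
    "channels": "4.2.2",
    "asgiref": "3.8.1",
}


def installed_versions(packages):
    """Return {package: installed version, or None if it is not installed}."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


def report(expected=EXPECTED):
    """Build one status line per package comparing installed and expected versions."""
    lines = []
    for name, got in installed_versions(expected).items():
        want = expected[name]
        status = "ok" if got == want else "differs"
        lines.append(f"{name}: installed={got} expected={want} ({status})")
    return lines


print("\n".join(report()))
```

A package reported as `installed=None` is missing from the active environment, which usually means `pip` ran against a different interpreter or virtualenv.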

Step 2: Configure MyPorject/settings.py

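INSTALLED_APPS = [
    "daphne",  # list it first, before django.contrib.staticfiles
    # ... your other apps ...
]

# Keep the WSGI entry point for WSGI deployments
WSGI_APPLICATION = 'MyPorject.wsgi.application'
# Add the Channels configuration: point Django at the ASGI application
ASGI_APPLICATION = 'MyPorject.asgi.application'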
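Because Daphne takes over the `runserver` command, its position in `INSTALLED_APPS` matters: it should appear before `django.contrib.staticfiles`. The following is a small sketch of a check you could run against your own list; `daphne_order_problems` and the example app lists are hypothetical, not something Django or Channels provides.

```python
def daphne_order_problems(installed_apps):
    """Return a list of human-readable problems; an empty list means the order looks fine."""
    problems = []
    if "daphne" not in installed_apps:
        problems.append('"daphne" is missing from INSTALLED_APPS')
        return problems
    staticfiles = "django.contrib.staticfiles"
    if staticfiles in installed_apps and (
        installed_apps.index("daphne") > installed_apps.index(staticfiles)
    ):
        problems.append('"daphne" must be listed before "django.contrib.staticfiles"')
    return problems


# Example lists (hypothetical app names)
good = ["daphne", "django.contrib.admin", "django.contrib.staticfiles", "apps"]
bad = ["django.contrib.staticfiles", "daphne", "apps"]
print(daphne_order_problems(good))
print(daphne_order_problems(bad))
```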

Step 3: Configure MyPorject/asgi.py

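import os

from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MyPorject.settings')
# Initialize Django before importing code that may touch settings or models
django_asgi_app = get_asgi_application()

from . import routing  # noqa: E402  (imported after Django setup on purpose)

application = ProtocolTypeRouter({
    "http": django_asgi_app,  # regular HTTP requests go to Django
    "websocket": URLRouter(routing.websocket_urlpatterns),  # WebSocket connections
})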
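To make the role of the top-level router more concrete, here is a simplified, standard-library-only sketch of dispatching on `scope["type"]`. It is an illustration of the idea, not the Channels implementation; `simple_protocol_router`, the two fake apps, and `run_once` are all hypothetical names.

```python
import asyncio


def simple_protocol_router(routes):
    """Return an ASGI app that dispatches on scope["type"] (illustration only)."""

    async def app(scope, receive, send):
        handler = routes.get(scope["type"])
        if handler is None:
            raise ValueError(f"No application configured for scope type {scope['type']!r}")
        await handler(scope, receive, send)

    return app


async def fake_http_app(scope, receive, send):
    # Stand-in for the Django ASGI application
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"hello over http"})


async def fake_ws_app(scope, receive, send):
    # Stand-in for the WebSocket URL router and consumer
    await send({"type": "websocket.accept"})


async def run_once(app, scope):
    """Call an ASGI app with a stub receive and collect everything it sends."""
    sent = []

    async def receive():
        return {"type": "noop"}

    async def send(message):
        sent.append(message)

    await app(scope, receive, send)
    return sent


router = simple_protocol_router({"http": fake_http_app, "websocket": fake_ws_app})
http_events = asyncio.run(run_once(router, {"type": "http", "path": "/"}))
ws_events = asyncio.run(run_once(router, {"type": "websocket", "path": "/ws"}))
print([event["type"] for event in http_events])
print([event["type"] for event in ws_events])
```

The same server process therefore serves both protocols: the connection's scope type decides which application handles it.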

Step 4: Create MyPorject/routing.py

# MyPorject/routing.py
from django.urls import path
from apps import consumers

websocket_urlpatterns = [
    # Matches ws://<host>/ws
    path('ws', consumers.ChatConsumer.as_asgi()),
]

Step 5: Create apps/consumers.py

import json

from channels.generic.websocket import AsyncWebsocketConsumer
from channels.exceptions import StopConsumer


class ChatConsumer(AsyncWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.clientData = None        # last JSON payload received from the client
        self.SendDataJsonKeys = None  # keys of that payload

    async def connect(self):
        # Accept every incoming WebSocket connection
        await self.accept()

    async def disconnect(self, close_code=None):
        # Stop the consumer once the socket has closed
        raise StopConsumer()

    async def receive(self, text_data=None, bytes_data=None):
        try:
            # Parse the JSON message sent by the client
            self.clientData = json.loads(text_data)
            print("Received data:", self.clientData)
            self.SendDataJsonKeys = self.clientData.keys()  # keys sent by the client
        except Exception as e:
            # "断开" ("disconnect") is the plain-text message the client sends to close
            if text_data == "断开":
                await self.send(f"{e}")
                await self.send("Network connection closed!")
                await self.close()
                return
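The branching in `receive` is easier to reason about, and to unit test without a running server, when written as a plain function. The sketch below follows one reading of the handler: valid JSON objects yield their keys, and replies are only produced when the frame is the plain-text sentinel "断开". `handle_text` and `CLOSE_NOTICE` are hypothetical names introduced for illustration.

```python
import json

# Placeholder for the closing message the consumer sends before closing
CLOSE_NOTICE = "connection closed"


def handle_text(text_data):
    """Return (payload_keys, replies, should_close) for one incoming text frame."""
    try:
        payload = json.loads(text_data)
        return list(payload.keys()), [], False
    except Exception as exc:  # same broad catch as the consumer
        if text_data == "断开":  # "disconnect" sentinel sent by the client
            return None, [str(exc), CLOSE_NOTICE], True
        # Any other unparsable frame is ignored
        return None, [], False


print(handle_text('{"user": "a", "msg": "hi"}'))
print(handle_text("断开")[2])
```

Note that a frame containing valid JSON that is not an object (for example a list) also lands in the `except` branch, because it has no `keys()` method.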