Configuring Django with Channels: an ASGI server for WebSockets
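Environment:
These are close to the latest versions of the ASGI stack at the time of writing.
OS: macOS (M-series)
Python version: 3.12
Django version: 5.1.7
daphne version: 4.2.0
channels version: 4.2.2
asgiref version: 3.8.1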

Project structure:
├── MyPorject
│   ├── asgi.py
│   ├── routing.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── apps
│   ├── __pycache__
│   │   └── consumers.cpython-312.pyc
│   ├── consumers.py (lives in the main app)
│   └── (other apps)
└── manage.py
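Important note:
Channels releases after 3.0.5 (that is, 4.0 and later) no longer bundle an ASGI server, so you must install and configure daphne or uvicorn yourself.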
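Step 1: Install Channels with the daphne extra, plus the other dependencies
# The quotes stop zsh (the default macOS shell) from treating the brackets as a glob pattern.
pip install "channels[daphne]"
pip install Django==5.1.7
pip install asgiref==3.8.1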
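Once the packages are installed, it can help to confirm that the installed versions match the environment listed at the top. The following is a minimal sketch using only the standard library; the helper name `installed_versions` is hypothetical, and a package that is not installed is reported as `None`.

```python
from importlib.metadata import PackageNotFoundError, version

# Versions listed in the environment section of this article.
EXPECTED = {
    "Django": "5.1.7",
    "daphne": "4.2.0",
    "channels": "4.2.2",
    "asgiref": "3.8.1",
}


def installed_versions(names):
    """Return {package name: installed version, or None if it is missing}."""
    found = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, actual in installed_versions(EXPECTED).items():
        print(f"{name}: installed={actual}, expected={EXPECTED[name]}")
```

Run it with the same interpreter (or virtual environment) that will run the Django project, otherwise it reports on a different set of packages.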
Step 2: Configure MyPorject/settings.py
INSTALLED_APPS = [
    "daphne",  # keep this first so daphne's runserver command takes over
    # ... other apps
]
WSGI_APPLICATION = 'MyPorject.wsgi.application'
ASGI_APPLICATION = 'MyPorject.asgi.application'
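Step 3: Configure MyPorject/asgi.py
import os

from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MyPorject.settings')
# Initialize Django before importing routing, because routing imports the
# consumers, which may in turn import models or other app code.
django_asgi_app = get_asgi_application()

from . import routing  # noqa: E402

application = ProtocolTypeRouter({
    "http": django_asgi_app,  # ordinary HTTP requests go to Django
    "websocket": URLRouter(routing.websocket_urlpatterns),  # WebSocket connections go to the consumers
})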
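`ProtocolTypeRouter` picks an application by looking at the connection's ASGI `scope["type"]`. The idea can be sketched in plain Python without Channels installed; `SimpleProtocolRouter` and the two stub apps are hypothetical stand-ins for illustration, not the library's implementation.

```python
import asyncio


class SimpleProtocolRouter:
    """Toy stand-in for a protocol router: dispatch on scope["type"]."""

    def __init__(self, application_mapping):
        self.application_mapping = application_mapping

    async def __call__(self, scope, receive, send):
        try:
            app = self.application_mapping[scope["type"]]
        except KeyError:
            raise ValueError(
                f"No application configured for scope type {scope['type']!r}"
            ) from None
        return await app(scope, receive, send)


# Real ASGI apps talk through send(); returning a string here only keeps
# the demo observable.
async def http_app(scope, receive, send):
    # Stub playing the role of django_asgi_app.
    return "handled by http app"


async def websocket_app(scope, receive, send):
    # Stub playing the role of URLRouter(routing.websocket_urlpatterns).
    return "handled by websocket app"


router = SimpleProtocolRouter({"http": http_app, "websocket": websocket_app})


async def demo():
    http_result = await router({"type": "http"}, None, None)
    ws_result = await router({"type": "websocket"}, None, None)
    return http_result, ws_result


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A scope type with no entry in the mapping raises an error in this sketch, which is why both `"http"` and `"websocket"` are listed in asgi.py.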
Step 4: Create MyPorject/routing.py
from django.urls import path

from apps import consumers

# WebSocket URL patterns; clients connect to ws://<host>/ws
websocket_urlpatterns = [
    path('ws', consumers.ChatConsumer.as_asgi()),
]
Step 5: Create MyPorject/apps/consumers.py
import json

from channels.exceptions import StopConsumer
from channels.generic.websocket import AsyncWebsocketConsumer


class ChatConsumer(AsyncWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.clientData = None        # last JSON payload received from the client
        self.SendDataJsonKeys = None  # keys of that payload

    async def connect(self):
        # Accept every incoming WebSocket connection.
        await self.accept()

    async def disconnect(self, close_code=None):
        # Stop the consumer once the client has disconnected.
        raise StopConsumer()

    async def receive(self, text_data=None, bytes_data=None):
        try:
            self.clientData = json.loads(text_data)
            print("Received parameters:", self.clientData)
            self.SendDataJsonKeys = self.clientData.keys()
        except Exception as e:
            # Not a JSON object: close only if the client sent the sentinel
            # text ("断开" in the original, meaning "disconnect").
            if text_data == "disconnect":
                await self.send(f"{e}")
                await self.send("Network connection closed!")
                await self.close()
                return
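The branching inside `receive` can be exercised without a running server by factoring it into a plain function. This is a sketch under assumptions: `classify_message` and its return shape are hypothetical, and the sentinel text is passed in as a parameter so the function does not depend on the exact string the client sends.

```python
import json


def classify_message(text_data, sentinel):
    """Mirror the consumer's receive() branches as a pure function.

    Returns ("json", keys) for a JSON object, ("close", error text) when
    parsing fails and the raw text equals the sentinel, and
    ("ignore", None) for anything else.
    """
    try:
        data = json.loads(text_data)
        return "json", list(data.keys())
    except Exception as exc:
        if text_data == sentinel:
            return "close", str(exc)
        return "ignore", None


if __name__ == "__main__":
    print(classify_message('{"user": "a", "msg": "hi"}', "bye"))
    print(classify_message("bye", "bye"))
    print(classify_message("not json", "bye"))
```

The `"ignore"` result makes one property of the consumer visible: input that is neither a JSON object nor the sentinel is dropped silently, with nothing sent back to the client.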