mqttasgi: An MQTT ASGI Protocol Server for Django - Openclaw Skills

Author: Internet

2026-04-13


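What is mqttasgi?

mqttasgi is a dedicated ASGI protocol server that bridges the gap between MQTT (via paho-mqtt) and Django Channels. Just as Daphne serves HTTP and WebSockets, mqttasgi lets your Django application treat MQTT as a native protocol. Developers can write Django consumers that subscribe and publish to MQTT topics while keeping full access to the Django ORM, Channel Layers, and the test suite.

The skill is especially valuable when building complex IoT backbones or home automation systems that need data persistence and real-time coordination. Integrated into an Openclaw Skills workflow, it lets you create intelligent agents that interact directly with hardware devices, sensors, and actuators through a standard Django-based architecture.

Download: https://github.com/openclaw/skills/tree/main/skills/sivulich/mqttasgi

Installation and download

1. ClawHub CLI

The fastest way to install the skill directly from the source.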

npx clawhub@latest install mqttasgi

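2. Manual installation

Copy the skill folder to one of the following locations:

Global: ~/.openclaw/skills/
Workspace: /skills/

Priority: workspace > local > built-in

3. Prompt installation

Copy this prompt into OpenClaw to install the skill automatically.

Please help me install mqttasgi using Clawhub. If Clawhub is not installed yet, install it first (npm i -g clawhub).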

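mqttasgi use cases

  • Create a home automation hub that triggers Django ORM events from motion sensor data.
  • Build AI-driven IoT responders that route sensor data through a large language model (LLM) to decide on automation actions.
  • Monitor energy consumption with real-time threshold alerts and database logging.
  • Coordinate multi-device environments with a separate worker for each physical zone or room.
  • Manage schedule-based industrial automation (such as garden irrigation), synchronized with database-backed logic.

How mqttasgi works

  1. The mqttasgi server initializes and establishes a persistent connection to an MQTT broker (such as Mosquitto) using the configured host and port.
  2. When a message arrives on a subscribed topic, the server converts the raw packet into an ASGI-compliant event.
  3. The event is routed through the Channels ProtocolTypeRouter to the designated MqttConsumer.
  4. Inside the consumer, developers can use async methods to interact with the database, send messages to other channel groups, or publish new MQTT messages.
  5. The server manages the connection lifecycle, supporting clean sessions, SSL/TLS security, and automatic reconnection attempts.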
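
Step 2 can be pictured as a small translation function. The sketch below is illustrative only: to_asgi_event is a hypothetical helper, not part of mqttasgi, and the event layout is an assumption modelled on the consumer response shape shown later in this document (a type key plus a nested mqtt dict).

```python
from typing import Any, Dict


def to_asgi_event(topic: str, payload: bytes, qos: int) -> Dict[str, Any]:
    """Wrap a raw MQTT message in an ASGI-style event dict (assumed layout)."""
    return {
        "type": "mqtt.msg",  # server -> consumer event type
        "mqtt": {"topic": topic, "payload": payload, "qos": qos},
    }


# Hypothetical incoming message from a motion sensor.
event = to_asgi_event("home/sensor/motion", b"living", qos=1)
print(event["type"], event["mqtt"]["topic"])
```

The payload is left as raw bytes; decoding is the consumer's job.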

mqttasgi configuration guide

Install the package via pip:

pip install mqttasgi

Configure your asgi.py to route the mqtt protocol:

from channels.routing import ProtocolTypeRouter
from my_application.consumers import MyMqttConsumer

application = ProtocolTypeRouter({
    'mqtt': MyMqttConsumer.as_asgi(),
})

Start the protocol server from the command line:

mqttasgi -H localhost -p 1883 my_application.asgi:application

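mqttasgi data architecture and taxonomy

The skill handles data as structured dictionary events within the ASGI scope. The internal schema of an MQTT message is:

| Attribute | Type | Description |
| --- | --- | --- |
| topic | string | MQTT topic the message was received on or will be published to. |
| payload | bytes | Raw message content. |
| qos | integer | Quality of Service level (0, 1, or 2). |
| retain | boolean | Whether the broker should retain the message (publish only). |
| type | string | Internal event type: mqtt.connect, mqtt.msg, mqtt.pub, mqtt.sub. |
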
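The fields in this schema can be mirrored in a small typed container that rejects malformed values early. This is a minimal sketch under stated assumptions: MqttMessage is a hypothetical class introduced here for illustration, not a type exported by mqttasgi.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MqttMessage:
    """Hypothetical typed view of the message fields (not part of mqttasgi)."""
    topic: str
    payload: bytes
    qos: int = 0
    retain: bool = False

    def __post_init__(self) -> None:
        # Validate against the constraints listed in the schema.
        if not self.topic:
            raise ValueError("topic must be a non-empty string")
        if not isinstance(self.payload, (bytes, bytearray)):
            raise TypeError("payload must be bytes")
        if self.qos not in (0, 1, 2):
            raise ValueError("qos must be 0, 1 or 2")


msg = MqttMessage(topic="home/energy/consumption", payload=b"1250.5", qos=1)
print(msg.topic, float(msg.payload))
```
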
name: mqttasgi
description: MQTT ASGI protocol server for Django — bridge MQTT messages to Django Channels consumers with full ORM, Channel Layers, and testing support. The perfect backbone for your home automation projects, IoT pipelines, and real-time device integrations.
version: 1.0.0
metadata:
  openclaw:
    homepage: https://github.com/sivulich/mqttasgi

mqttasgi

mqttasgi is an ASGI protocol server that bridges MQTT (via paho-mqtt) and Django Channels, inspired by Daphne. It lets Django consumers subscribe/publish to MQTT topics with full ORM and Channel Layers support.

Supports: Django 3.2–5.x · Channels 3.x–4.x · paho-mqtt 1.x and 2.x · Python 3.9–3.13

Installation

pip install mqttasgi

Running the server

mqttasgi -H localhost -p 1883 my_application.asgi:application

| Parameter | Env variable | Default | Purpose |
| --- | --- | --- | --- |
| -H / --host | MQTT_HOSTNAME | localhost | MQTT broker host |
| -p / --port | MQTT_PORT | 1883 | MQTT broker port |
| -U / --username | MQTT_USERNAME | | Broker username |
| -P / --password | MQTT_PASSWORD | | Broker password |
| -c / --cleansession | MQTT_CLEAN | True | MQTT clean session |
| -v / --verbosity | VERBOSITY | 0 | Logging level (0–2) |
| -i / --id | MQTT_CLIENT_ID | | MQTT client ID |
| -C / --cert | TLS_CERT | | TLS certificate |
| -K / --key | TLS_KEY | | TLS key |
| -S / --cacert | TLS_CA | | TLS CA certificate |
| -SSL / --use-ssl | MQTT_USE_SSL | False | SSL without cert auth |
| -T / --transport | MQTT_TRANSPORT | tcp | Transport: tcp or websockets |
| -r / --retries | MQTT_RETRIES | 3 | Reconnect retries (0 = unlimited) |

All parameters can also be set via a .env file at the project root. CLI args take precedence over env vars.
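
The precedence rule (CLI argument, then environment variable, then default) can be sketched as follows. This is not mqttasgi's actual implementation; resolve_setting and the broker.internal hostname are hypothetical names used only to illustrate the lookup order.

```python
import os
from typing import Mapping, Optional


def resolve_setting(cli_value: Optional[str], env_name: str, default: str,
                    env: Mapping[str, str] = os.environ) -> str:
    """Return the CLI value if given, else the env var, else the default."""
    if cli_value is not None:
        return cli_value
    return env.get(env_name, default)


# A fake environment keeps the example independent of the real process env.
fake_env = {"MQTT_HOSTNAME": "broker.internal"}
from_env = resolve_setting(None, "MQTT_HOSTNAME", "localhost", env=fake_env)
from_cli = resolve_setting("10.0.0.5", "MQTT_HOSTNAME", "localhost", env=fake_env)
from_default = resolve_setting(None, "MQTT_PORT", "1883", env=fake_env)
print(from_env, from_cli, from_default)
```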

asgi.py setup

import os
import django
from channels.routing import ProtocolTypeRouter
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_application.settings')
django.setup()

# Import consumers only after django.setup(), so that any models they
# import are loaded once the app registry is ready.
from my_application.consumers import MyMqttConsumer  # noqa: E402

application = ProtocolTypeRouter({
    'http': get_asgi_application(),
    'mqtt': MyMqttConsumer.as_asgi(),
})

Writing a consumer

from mqttasgi.consumers import MqttConsumer

class MyMqttConsumer(MqttConsumer):

    async def connect(self):
        """Called when connected to the broker. Subscribe here."""
        await self.subscribe('my/topic', qos=2)

    async def receive(self, mqtt_message):
        """Called for each incoming MQTT message."""
        topic   = mqtt_message['topic']
        payload = mqtt_message['payload']   # bytes
        qos     = mqtt_message['qos']
        await self.publish('response/topic', payload, qos=1, retain=False)

    async def disconnect(self):
        """Called on broker disconnect. Clean up here."""
        await self.unsubscribe('my/topic')
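
Because payload arrives as raw bytes, a receive() handler usually has to decode it before use. The helper below is a minimal sketch that assumes devices publish UTF-8 JSON; decode_json_payload is a hypothetical name, not part of mqttasgi.

```python
import json
from typing import Any, Optional


def decode_json_payload(payload: bytes) -> Optional[Any]:
    """Decode a UTF-8 JSON payload; return None if it cannot be parsed.

    Note: a literal JSON `null` also yields None, so callers that need to
    tell the two cases apart should raise instead of returning None.
    """
    try:
        return json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


reading = decode_json_payload(b'{"room": "living", "motion": true}')
broken = decode_json_payload(b"\xff\xfe not json")
print(reading, broken)
```

Returning None instead of raising lets the consumer skip a malformed message without dropping the broker connection.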

Consumer API

| Method | Description |
| --- | --- |
| await self.subscribe(topic, qos) | Subscribe to an MQTT topic |
| await self.unsubscribe(topic) | Unsubscribe from an MQTT topic |
| await self.publish(topic, payload, qos=1, retain=False) | Publish an MQTT message |
| self.scope | ASGI scope dict (includes app_id, instance_type, and any consumer_parameters) |

Channel Layers

# Outside the consumer (e.g. Django view or management command)
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
    "my.group",
    {"type": "my.custom.message", "text": "Hello from outside"}
)

# Inside the consumer
from mqttasgi.consumers import MqttConsumer

class MyMqttConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('my/topic', qos=2)
        await self.channel_layer.group_add("my.group", self.channel_name)

    async def my_custom_message(self, event):
        # Handler name must match the `type` field (dots become underscores)
        print('Channel layer message:', event)

    async def receive(self, mqtt_message): ...
    async def disconnect(self): ...
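
The rule that links a message's type to a handler method can be written out explicitly. The function below is a simplified stand-in for the lookup Channels performs, written from the behaviour described in the comment above (dots become underscores); handler_name_for is a hypothetical name.

```python
def handler_name_for(message: dict) -> str:
    """Map a channel layer message to the consumer method that handles it."""
    if "type" not in message:
        raise ValueError("message has no 'type' key")
    name = message["type"].replace(".", "_")
    if name.startswith("_"):
        # Guard against types that would resolve to private methods.
        raise ValueError("malformed type (leading underscore)")
    return name


name = handler_name_for({"type": "my.custom.message", "text": "Hello"})
print(name)
```

A group_send with type "my.custom.message" therefore only reaches consumers that define my_custom_message.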

Multiple workers (experimental)

Only the master consumer (instance_type='master', app_id=0) may spawn or kill workers.

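from mqttasgi.consumers import MqttConsumer

class MasterConsumer(MqttConsumer):

    async def connect(self):
        # Spawn a worker with a unique app_id
        await self.spawn_worker(
            app_id=1,
            consumer_path='my_application.consumers.WorkerConsumer',
            consumer_params={'device_id': 'sensor-01'},
        )

    async def receive(self, mqtt_message):
        # Placeholder condition (an assumption for illustration): stop the
        # worker when a b'stop' payload arrives. Replace with your own logic.
        if mqtt_message['payload'] == b'stop':
            await self.kill_worker(app_id=1)

    async def disconnect(self): ...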

Testing (no broker required)

MqttComunicator drives consumers directly through the ASGI interface — no running broker needed.

pytest.ini

[pytest]
asyncio_mode = auto

tests/conftest.py

import django
from django.conf import settings

def pytest_configure(config):
    if not settings.configured:
        settings.configure(
            SECRET_KEY='test-secret-key',
            INSTALLED_APPS=['channels'],
            DATABASES={},
            CHANNEL_LAYERS={
                'default': {
                    'BACKEND': 'channels.layers.InMemoryChannelLayer',
                }
            },
        )
        django.setup()

Writing tests

import pytest
from mqttasgi.testing import MqttComunicator  # note: one 'm' in Comunicator
from my_application.consumers import MyMqttConsumer

async def test_subscribe_on_connect():
    comm = MqttComunicator(MyMqttConsumer.as_asgi(), app_id=1)
    response = await comm.connect()          # returns first message from consumer
    assert response['type'] == 'mqtt.sub'
    assert response['mqtt']['topic'] == 'my/topic'
    await comm.disconnect()

async def test_publish_on_message():
    comm = MqttComunicator(MyMqttConsumer.as_asgi(), app_id=1)
    await comm.connect()
    await comm.publish('my/topic', b'hello', qos=1)
    response = await comm.receive_from()    # next message from consumer
    assert response['type'] == 'mqtt.pub'
    assert response['mqtt']['payload'] == b'hello'
    await comm.disconnect()

MqttComunicator API

| Method | Description |
| --- | --- |
| MqttComunicator(app, app_id, instance_type='worker', consumer_parameters=None) | Create communicator |
| await comm.connect(timeout=1) | Send mqtt.connect; returns first consumer response |
| await comm.publish(topic, payload, qos) | Send mqtt.msg event to the consumer |
| await comm.receive_from(timeout=1) | Receive next message the consumer sent |
| await comm.disconnect(timeout=1) | Send mqtt.disconnect and wait for shutdown |

Consumer responses have this shape:

{
    'type': 'mqtt.sub',   # or mqtt.pub / mqtt.usub
    'mqtt': {
        'topic': 'my/topic',
        'payload': b'...',   # only for mqtt.pub
        'qos': 1,
    }
}

Internal message types (for advanced use)

Server → Consumer: mqtt.connect, mqtt.msg, mqtt.disconnect

Consumer → Server: mqtt.pub, mqtt.sub, mqtt.usub, mqttasgi.worker.spawn, mqttasgi.worker.kill

Project ideas and examples

Home automation — motion-triggered lights

A motion sensor publishes to home/sensor/motion. A consumer listens and publishes a command to the light controller, logging every event to the Django ORM.

from mqttasgi.consumers import MqttConsumer
from myapp.models import MotionEvent

class LightAutomationConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('home/sensor/motion', qos=1)

    async def receive(self, mqtt_message):
        room = mqtt_message['payload'].decode()
        await MotionEvent.objects.acreate(room=room)
        await self.publish(f'home/lights/{room}/set', b'on', qos=1)

    async def disconnect(self):
        await self.unsubscribe('home/sensor/motion')

AI-powered automation — ask Claude before acting

Route sensor data through Claude to decide what action to take. The consumer calls the Anthropic API and publishes the result back onto the MQTT bus.

import anthropic
from mqttasgi.consumers import MqttConsumer

# Use the async client so the API call does not block the event loop.
client = anthropic.AsyncAnthropic()

class AIAutomationConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('home/sensor/#', qos=1)

    async def receive(self, mqtt_message):
        topic   = mqtt_message['topic']
        payload = mqtt_message['payload'].decode()

        message = await client.messages.create(
            model='claude-opus-4-6',
            max_tokens=64,
            messages=[{
                'role': 'user',
                'content': (
                    f'Sensor reading — topic: {topic}, value: {payload}. '
                    'Reply with only the MQTT topic and payload to publish, '
                    'separated by a space. Example: home/lights/living on'
                ),
            }],
        )
        response = message.content[0].text.strip().split(' ', 1)
        if len(response) == 2:
            out_topic, out_payload = response
            await self.publish(out_topic, out_payload.encode(), qos=1)

    async def disconnect(self):
        await self.unsubscribe('home/sensor/#')
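
Since the model's reply is published straight onto the MQTT bus, it may be worth validating it first. The sketch below is one possible guard, not something the library provides: parse_action and the ALLOWED_PREFIXES allow-list are hypothetical, and the prefixes are placeholders to adapt to your own topics.

```python
from typing import Optional, Sequence, Tuple

# Hypothetical allow-list of topic prefixes the model may publish to.
ALLOWED_PREFIXES = ("home/lights/", "home/blinds/")


def parse_action(reply: str,
                 allowed: Sequence[str] = ALLOWED_PREFIXES
                 ) -> Optional[Tuple[str, bytes]]:
    """Split '<topic> <payload>' and reject anything outside the allow-list."""
    parts = reply.strip().split(" ", 1)
    if len(parts) != 2:
        return None
    topic, payload = parts
    # Wildcards are only valid in subscriptions, never in a publish topic.
    if "#" in topic or "+" in topic:
        return None
    if not any(topic.startswith(prefix) for prefix in allowed):
        return None
    return topic, payload.encode()


print(parse_action("home/lights/living on"))
print(parse_action("home/door/front unlock"))
```

In the consumer, publish only when parse_action returns a tuple and ignore the reply otherwise.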

Energy monitoring — store readings in Django, alert on threshold

Electricity sensors publish consumption data every 30 seconds. The consumer persists each reading and fires an alert if usage spikes.

from mqttasgi.consumers import MqttConsumer
from myapp.models import EnergyReading

ALERT_THRESHOLD_WATTS = 3000

class EnergyMonitorConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('home/energy/consumption', qos=1)

    async def receive(self, mqtt_message):
        watts = float(mqtt_message['payload'])
        await EnergyReading.objects.acreate(watts=watts)
        if watts > ALERT_THRESHOLD_WATTS:
            await self.publish('home/alerts/energy', b'high_consumption', qos=2)

    async def disconnect(self):
        await self.unsubscribe('home/energy/consumption')

Multi-device coordination — workers per room

Spawn a dedicated worker for each room so subscriptions and logic stay isolated. The master consumer manages the worker lifecycle.

from mqttasgi.consumers import MqttConsumer

class MasterConsumer(MqttConsumer):

    ROOMS = ['living', 'bedroom', 'kitchen']

    async def connect(self):
        for i, room in enumerate(self.ROOMS, start=1):
            await self.spawn_worker(
                app_id=i,
                consumer_path='myapp.consumers.RoomConsumer',
                consumer_params={'room': room},
            )

    async def receive(self, mqtt_message): pass
    async def disconnect(self): pass


class RoomConsumer(MqttConsumer):

    async def connect(self):
        room = self.scope['room']
        await self.subscribe(f'home/{room}/#', qos=1)

    async def receive(self, mqtt_message):
        # Handle all topics for this room
        ...

    async def disconnect(self):
        room = self.scope['room']
        await self.unsubscribe(f'home/{room}/#')
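
Each room worker subscribes with a wildcard filter such as home/living/#, so its receive() method typically needs to tell sub-topics apart. The function below is a simplified sketch of MQTT filter matching for + and # (it ignores the special rules for topics starting with $); topic_matches is a hypothetical helper, and paho-mqtt ships its own topic_matches_sub for production use.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches an MQTT filter using '+' and '#'."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' must be the last level; it matches the parent and all children.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without a trailing '#', both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("home/living/#", "home/living/temperature"))
print(topic_matches("home/+/temperature", "home/kitchen/humidity"))
```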

Garden irrigation — schedule-aware automation

Combine Django's ORM with MQTT to only water the garden when the schedule says so and soil moisture is below a threshold.

from django.utils import timezone
from mqttasgi.consumers import MqttConsumer
from myapp.models import IrrigationSchedule

class IrrigationConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('garden/sensor/moisture', qos=1)

    async def receive(self, mqtt_message):
        moisture = float(mqtt_message['payload'])
        now = timezone.now()
        scheduled = await IrrigationSchedule.objects.filter(
            active=True,
            start_hour=now.hour,
        ).aexists()

        if scheduled and moisture < 30.0:
            await self.publish('garden/valve/main', b'open', qos=2)

    async def disconnect(self):
        await self.unsubscribe('garden/sensor/moisture')
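
The watering decision can also be pulled out into a pure function, which makes it testable without a database or broker. This is a design sketch under stated assumptions: should_water is a hypothetical helper, and scheduled_hours stands in for the hours returned by the IrrigationSchedule query.

```python
from typing import Iterable


def should_water(moisture: float, hour: int, scheduled_hours: Iterable[int],
                 threshold: float = 30.0) -> bool:
    """Water only during a scheduled hour and when the soil is dry enough."""
    return hour in set(scheduled_hours) and moisture < threshold


dry_and_scheduled = should_water(22.5, hour=6, scheduled_hours=[6, 18])
wet_and_scheduled = should_water(45.0, hour=6, scheduled_hours=[6, 18])
dry_off_schedule = should_water(22.5, hour=12, scheduled_hours=[6, 18])
print(dry_and_scheduled, wet_and_scheduled, dry_off_schedule)
```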

Common pitfalls

  • MqttComunicator.connect() returns the first message the consumer sends. If connect() does nothing (no subscribe, no publish), the call will time out — always subscribe or send something in connect().
  • The class is spelled MqttComunicator (one m) — this is an intentional (legacy) typo in the library.
  • Worker spawn/kill is only allowed from the master consumer (app_id=0). Calling it from a worker raises an error.
  • With mosquitto 2.x you need allow_anonymous true and an explicit listener line in mosquitto.conf for integration tests.
  • connect_max_retries=0 means retry forever with exponential back-off (capped at 30 s).
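
The retry behaviour in the last point can be illustrated with a small generator. Only the 30 s cap and the meaning of 0 come from the text above; the 1 s starting delay and the doubling factor are assumptions, and backoff_delays is a hypothetical name rather than the library's code.

```python
import itertools
from typing import Iterator


def backoff_delays(max_retries: int, base: float = 1.0,
                   cap: float = 30.0) -> Iterator[float]:
    """Yield reconnect delays in seconds; max_retries=0 means retry forever."""
    attempt = 0
    delay = base
    while max_retries == 0 or attempt < max_retries:
        yield min(delay, cap)
        delay = min(delay * 2, cap)  # double each time, never above the cap
        attempt += 1


bounded = list(backoff_delays(3))
unbounded_head = list(itertools.islice(backoff_delays(0), 7))
print(bounded)         # [1.0, 2.0, 4.0]
print(unbounded_head)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```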
