Skip to content

外置适配器开发

除了内置适配器,LinkZone 支持通过 Node.js 和 Python 开发自定义适配器,接入新的通信平台。

适配器的工作方式

外置适配器通过 Unix Socket / TCP 与框架核心通信,SDK 封装了底层协议。适配器需要实现两个核心能力:

  1. 接收消息:从外部平台接收消息,通过 pushEvent 推送到框架
  2. 发送消息:框架调用 send 方法,将回复发送到外部平台
外部平台 ←→ 你的适配器代码 ←→ SDK ←→ Unix Socket ←→ 框架核心

项目结构

Node.js

ecosystems/nodejs/adapters/
  my-adapter/
    index.js          # 适配器入口

Python

ecosystems/python/adapters/
  my-adapter/
    __init__.py       # 适配器入口

元信息完整字段

元信息是适配器注册时提供给框架的描述数据,框架据此识别和管理适配器。

字段类型必填默认值说明
namestring适配器唯一标识,用于框架内部引用,不可重复
platformstring"custom"平台标识,决定消息路由和平台特性识别
versionstring"1.0.0"版本号,语义化版本格式
typestring"adapter"组件类型,适配器固定为 "adapter"
descriptionstring""适配器描述,用于管理后台展示
categorystring""分类,如 "IM""IoT""API"
authorstring""作者
homepagestring""主页 URL
licensestring""许可证
iconstring""图标 URL 或名称
tagsstring[][]标签,如 ["qq", "im"]
dependenciesstring[][]依赖的其他组件名称列表
prioritynumber0优先级,数值越大越优先处理
config_schemaobject{}配置项定义,框架据此生成配置界面和校验
extraobject{}扩展字段,存储自定义元数据
is_publicboolfalse是否公开(市场可见)
is_encryptedbooltrue代码是否加密
file_pathstring""文件路径(框架自动填充)
marketboolfalse是否上架市场
enable_metricsboolfalse是否启用性能指标收集
enable_health_checkboolfalse是否启用健康检查
health_check_intervalstring"30s"健康检查间隔
enable_cacheboolfalse是否启用缓存扩展
enable_retryboolfalse是否启用重试扩展
lifecycle_modestring"persistent"生命周期模式:persistent(常驻)、transient(临时)、loaded(按需加载)
ownerstring"nodejs-runtime" / "python-runtime"所属运行时
stagenumber0启动阶段,数值越小越早启动
adaptersstring[][]依赖的适配器列表
event_typesstring[]["message"]订阅的事件类型
is_serviceboolfalse是否为服务型组件
cronstring""定时任务 Cron 表达式
listen_onlyboolfalse是否仅监听模式(不处理消息)
permission_levelnumber1所需权限等级(1-7,1=普通用户,6=管理员,7=超级管理员)
toolobject/nullnullAI 工具定义(适配器一般不用)
ai_triggerableboolfalse是否可被 AI 触发
ai_trigger_usagestring""AI 触发时的用途描述
ai_trigger_formatstring""AI 触发时的参数格式描述
ai_trigger_argsobject{}AI 触发时的参数定义

config_schema 详解

config_schema 定义适配器的可配置项,框架据此生成配置界面和校验规则:

javascript
config_schema: {
    api_token: {
        type: 'string',           // 类型:string / number / bool / select
        label: 'API Token',       // 显示名称
        default: '',              // 默认值
        description: '平台 API Token',  // 说明
        required: true            // 是否必填(可选)
    },
    max_retries: {
        type: 'number',
        label: '最大重试次数',
        default: 3,
        description: 'API 请求最大重试次数'
    },
    sandbox: {
        type: 'bool',
        label: '沙箱模式',
        default: false,
        description: '是否使用沙箱环境'
    },
    mode: {
        type: 'select',
        label: '运行模式',
        default: 'websocket',
        description: '连接方式',
        options: [
            { label: 'WebSocket', value: 'websocket' },
            { label: 'HTTP 长轮询', value: 'long_polling' }
        ]
    }
}

框架会自动为所有组件追加 log_level 配置项。

完整示例

Node.js

javascript
const { Adapter, LinkZone } = require('linkzone-sdk');

class TelegramAdapter extends Adapter {
    constructor() {
        super({
            name: 'telegram',
            platform: 'telegram',
            version: '1.0.0',
            description: 'Telegram Bot 适配器',
            category: 'IM',
            author: 'Your Name',
            license: 'MIT',
            tags: ['telegram', 'im'],
            priority: 80,
            enable_metrics: true,
            enable_health_check: true,
            health_check_interval: '30s',
            config_schema: {
                bot_token: {
                    type: 'string',
                    label: 'Bot Token',
                    default: '',
                    description: 'Telegram Bot Token(从 @BotFather 获取)'
                },
                api_base: {
                    type: 'string',
                    label: 'API 地址',
                    default: 'https://api.telegram.org',
                    description: 'Telegram API 地址'
                },
                max_connections: {
                    type: 'number',
                    label: '最大连接数',
                    default: 10,
                    description: 'Webhook 最大连接数'
                }
            }
        });
        this.pollingTimer = null;
    }

    async onStart() {
        const config = await this.getConfig();
        this.botToken = config.bot_token;
        this.apiBase = config.api_base || 'https://api.telegram.org';

        if (!this.botToken) {
            throw new Error('未配置 bot_token');
        }

        // 启动长轮询
        this._startPolling();
        LinkZone.logger.info('telegram', '适配器已启动');
    }

    async onStop() {
        if (this.pollingTimer) {
            clearInterval(this.pollingTimer);
            this.pollingTimer = null;
        }
        LinkZone.logger.info('telegram', '适配器已停止');
    }

    async send(msg) {
        const { receiver_id, content, segments } = msg;
        let text = content;

        // 将消息段转换为 Telegram 格式
        if (segments && segments.length > 0) {
            text = this._convertSegments(segments);
        }

        const url = `${this.apiBase}/bot${this.botToken}/sendMessage`;
        const resp = await fetch(url, {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({
                chat_id: receiver_id,
                text: text,
                parse_mode: 'HTML'
            })
        });

        const data = await resp.json();
        return String(data.result?.message_id || '');
    }

    async deleteMessage(messageId) {
        const config = await this.getConfig();
        const url = `${this.apiBase}/bot${config.bot_token}/deleteMessage`;
        // 需要从存储中获取 chat_id
        const chatId = await this.getData(`chat:${messageId}`);
        if (!chatId) return;

        await fetch(url, {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({ chat_id: chatId, message_id: parseInt(messageId) })
        });
    }

    async doAction(action, params) {
        switch (action) {
            case 'kick':
                return this._kickMember(params);
            case 'ban':
                return this._banMember(params);
            default:
                throw new Error(`不支持的动作: ${action}`);
        }
    }

    // 长轮询获取更新
    async _startPolling() {
        let offset = 0;
        this.pollingTimer = setInterval(async () => {
            try {
                const url = `${this.apiBase}/bot${this.botToken}/getUpdates`;
                const resp = await fetch(url, {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: JSON.stringify({ offset, timeout: 10 })
                });
                const data = await resp.json();

                for (const update of data.result || []) {
                    offset = update.update_id + 1;
                    if (update.message) {
                        this._handleTelegramMessage(update.message);
                    }
                }
            } catch (err) {
                LinkZone.logger.error('telegram', '轮询失败: ' + err.message);
            }
        }, 3000);
    }

    // 将 Telegram 消息推送到框架
    _handleTelegramMessage(msg) {
        this.pushEvent({
            type: 'message',
            platform: 'telegram',
            botId: this.botToken.split(':')[0],
            senderId: String(msg.from.id),
            senderName: msg.from.first_name,
            receiverId: String(msg.chat.id),
            receiverType: msg.chat.type === 'group' ? 'group' : 'private',
            message: msg.text || '',
            segments: [],
            messageId: String(msg.message_id),
            extra: {
                telegram: {
                    chat_type: msg.chat.type,
                    is_bot: msg.from.is_bot
                }
            }
        });
    }

    _convertSegments(segments) {
        return segments.map(seg => {
            switch (seg.type) {
                case 'text': return seg.data.text || '';
                case 'image': return `[图片](${seg.data.url})`;
                case 'at': return `@${seg.data.qq}`;
                default: return '';
            }
        }).join('');
    }

    async _kickMember(params) { /* ... */ }
    async _banMember(params) { /* ... */ }
}

module.exports = TelegramAdapter;

Python

python
from linkzone import Adapter, LinkZone, create_adapter
import threading
import time
import json
import urllib.request


class TelegramAdapter(Adapter):
    def __init__(self):
        super().__init__(
            {
                "name": "telegram",
                "platform": "telegram",
                "version": "1.0.0",
                "description": "Telegram Bot 适配器",
                "category": "IM",
                "author": "Your Name",
                "license": "MIT",
                "tags": ["telegram", "im"],
                "priority": 80,
                "enable_metrics": True,
                "enable_health_check": True,
                "health_check_interval": "30s",
                "config_schema": {
                    "bot_token": {
                        "type": "string",
                        "label": "Bot Token",
                        "default": "",
                        "description": "Telegram Bot Token",
                    },
                    "api_base": {
                        "type": "string",
                        "label": "API 地址",
                        "default": "https://api.telegram.org",
                        "description": "Telegram API 地址",
                    },
                },
            }
        )
        self._polling_thread = None
        self._running = False

    def on_start(self):
        config = self.get_config()
        self.bot_token = config.get("bot_token", "")
        self.api_base = config.get("api_base", "https://api.telegram.org")

        if not self.bot_token:
            raise Exception("未配置 bot_token")

        self._running = True
        self._polling_thread = threading.Thread(target=self._poll, daemon=True)
        self._polling_thread.start()
        LinkZone.logger.info("telegram", "适配器已启动")

    def on_stop(self):
        self._running = False
        LinkZone.logger.info("telegram", "适配器已停止")

    def send(self, msg):
        receiver_id = msg.get("receiver_id", "")
        content = msg.get("content", "")

        url = f"{self.api_base}/bot{self.bot_token}/sendMessage"
        data = json.dumps(
            {"chat_id": receiver_id, "text": content, "parse_mode": "HTML"}
        ).encode()

        req = urllib.request.Request(
            url, data=data, headers={"Content-Type": "application/json"}
        )
        with urllib.request.urlopen(req) as resp:
            result = json.loads(resp.read())

        return str(result.get("result", {}).get("message_id", ""))

    def delete_message(self, message_id):
        pass

    def do_action(self, action, params):
        if action == "kick":
            return self._kick_member(params)
        raise Exception(f"不支持的动作: {action}")

    def _poll(self):
        offset = 0
        while self._running:
            try:
                url = f"{self.api_base}/bot{self.bot_token}/getUpdates"
                data = json.dumps({"offset": offset, "timeout": 10}).encode()
                req = urllib.request.Request(
                    url, data=data, headers={"Content-Type": "application/json"}
                )
                with urllib.request.urlopen(req, timeout=15) as resp:
                    result = json.loads(resp.read())

                for update in result.get("result", []):
                    offset = update["update_id"] + 1
                    if "message" in update:
                        self._handle_message(update["message"])
            except Exception as e:
                LinkZone.logger.error("telegram", f"轮询失败: {e}")
            time.sleep(3)

    def _handle_message(self, msg):
        self.push_event(
            {
                "type": "message",
                "platform": "telegram",
                "bot_id": self.bot_token.split(":")[0],
                "sender_id": str(msg["from"]["id"]),
                "sender_name": msg["from"].get("first_name", ""),
                "receiver_id": str(msg["chat"]["id"]),
                "receiver_type": "group" if msg["chat"]["type"] == "group" else "private",
                "message": msg.get("text", ""),
                "segments": [],
                "message_id": str(msg["message_id"]),
                "extra": {"telegram": {"chat_type": msg["chat"]["type"]}},
            }
        )

    def _kick_member(self, params):
        pass


create_adapter(TelegramAdapter)

核心 API

必须实现的方法

send(msg)

框架调用此方法将回复消息发送到外部平台。

msg 参数:

字段类型说明
receiver_idstring接收者 ID(用户 ID 或群 ID)
receiver_typestringprivate(私聊)或 group(群聊)
contentstring消息文本内容
segmentsarray消息段(富文本内容)
bot_idstring机器人 ID
extraobject附加数据

返回值: 发送成功后的消息 ID(string)

onStart()

适配器启动时调用,用于初始化连接、读取配置等。

onStop()

适配器停止时调用,用于清理连接、释放资源等。

可选实现的方法

deleteMessage(messageId)

撤回指定消息。如果不实现,框架会返回默认错误。

doAction(action, params)

执行平台特定动作(踢人、禁言等)。如果不实现,框架会返回默认错误。

推送事件

pushEvent(event)

将外部平台收到的消息推送到框架:

event 参数:

字段类型必填说明
typestring事件类型,默认 "message"
platformstring平台标识,默认使用元信息中的 platform
bot_idstring机器人 ID
timestampnumber时间戳,默认当前时间
sender_idstring发送者 ID
sender_namestring发送者名称
receiver_idstring接收者 ID
receiver_typestringprivategroup
group_idstring群 ID(群聊时设置)
group_namestring群名称
messagestring消息文本
segmentsarray消息段
message_idstring消息 ID
sub_typestring子类型
extraobject附加数据

消息段格式

消息段用于表示富文本内容,支持以下类型:

javascript
// 文本
{ type: 'text', data: { text: '你好' } }

// 图片
{ type: 'image', data: { url: 'https://example.com/img.png', file: 'https://example.com/img.png' } }

// @某人
{ type: 'at', data: { qq: '123456' } }

// 回复
{ type: 'reply', data: { id: 'msg-123' } }

// 表情
{ type: 'face', data: { id: '178' } }

// 语音
{ type: 'voice', data: { url: 'https://example.com/voice.silk', file: 'https://example.com/voice.silk' } }

// 视频
{ type: 'video', data: { url: 'https://example.com/video.mp4', file: 'https://example.com/video.mp4' } }

// JSON 卡片
{ type: 'json', data: { data: { ... } } }

// 音乐
{ type: 'music', data: { type: 'qq', url: '...', audio: '...', title: '...', content: '...', image: '...' } }

数据存储

适配器可以使用框架提供的键值存储:

javascript
// 写入
await this.setData('key', { foo: 'bar' });

// 读取
const value = await this.getData('key', defaultValue);

// 删除
await this.deleteData('key');

// 列出所有 key
const keys = await this.listData();

配置管理

javascript
// 获取当前配置
const config = await this.getConfig();

// 更新配置
await this.setConfig({ api_token: 'new-token' });

HTTP 路由注册

适配器可以注册 HTTP 路由,用于接收 Webhook 回调:

javascript
// Node.js
await this.registerRoute('/webhook/my-platform', async (req) => {
    this.onExternalMessage(req.body);
    return { status: 200, body: { ok: true } };
}, 'POST');

// 取消注册
await this.unregisterRoute('/webhook/my-platform', 'POST');
python
# Python
self.register_route("/webhook/my-platform", self._handle_webhook, "POST")

def _handle_webhook(self, req):
    self.on_external_message(req["body"])
    return {"status": 200, "body": {"ok": True}}

WebSocket 注册

javascript
// Node.js
await this.registerWebSocket('/ws/my-platform', {
    onConnect: (connId) => { /* 连接建立 */ },
    onMessage: (connId, data) => { /* 收到消息 */ return responseData; },
    onDisconnect: (connId) => { /* 连接断开 */ }
});

// 向客户端发送消息
await this.sendToWebSocket(connId, data);

定时任务

javascript
// 注册定时任务
await this.registerCron('health-check', '*/5 * * * *', () => {
    // 每 5 分钟执行一次
    this.checkConnection();
});

// 取消定时任务
await this.unregisterCron('health-check');

日志

javascript
// Node.js
LinkZone.logger.debug('telegram', '调试信息');
LinkZone.logger.info('telegram', '普通信息');
LinkZone.logger.warn('telegram', '警告信息');
LinkZone.logger.error('telegram', '错误信息');
python
# Python
LinkZone.logger.debug("telegram", "调试信息")
LinkZone.logger.info("telegram", "普通信息")
LinkZone.logger.warn("telegram", "警告信息")
LinkZone.logger.error("telegram", "错误信息")

性能指标

javascript
// 手动上报指标(启用 enable_metrics 后自动上报消息处理指标)
await this.reportMetrics(durationMs, errorOrNull);

快捷创建

SDK 提供了快捷创建函数:

Node.js

javascript
const { createAdapter } = require('linkzone-sdk');

module.exports = createAdapter(
    {
        name: 'simple-adapter',
        platform: 'simple',
        version: '1.0.0'
    },
    async (msg) => {
        // send 方法实现
        console.log('发送消息:', msg.content);
        return 'msg-id';
    }
);

Python

python
from linkzone import create_adapter

class SimpleAdapter(Adapter):
    def __init__(self):
        super().__init__({"name": "simple-adapter", "platform": "simple", "version": "1.0.0"})

    def send(self, msg):
        print(f"发送消息: {msg.get('content')}")
        return "msg-id"

create_adapter(SimpleAdapter)

与 Go 核心 Adapter 接口的对应关系

Go 核心定义了 Adapter 接口,外置 SDK 通过 IPC 协议实现了等价功能:

Go 接口方法SDK 方法说明
Send(msg)send(msg)发送消息
DeleteMessage(id)deleteMessage(id)撤回消息
DoAction(action, params)doAction(action, params)执行动作
OnStart()onStart()启动
OnStop()onStop()停止
OnReload()自动处理重载(SDK 自动重读配置)
SetConfig(config)setConfig(config)更新配置
GetConfigMap()getConfig()获取配置
SetData/GetDatasetData/getData数据存储
IsConnected()心跳机制连接状态(SDK 通过心跳维护)
AddFilter/Match过滤器(外置适配器暂不支持)

Go 核心还定义了扩展接口:

Go 接口SDK 支持说明
RoutableAdapterregisterRouteHTTP 路由注册
StreamAdapterregisterWebSocketWebSocket 支持

基于 MIT 许可发布 | QQ 群:581485581 点击加入