WebSocket双向连接解析
By Vincent. @2025.6.20 • 4.4K

1核1G机器即可构建毫秒级延迟的行情接收系统。每分钟超百万笔委托如何实时接收处理?传统爬虫难以应对高频数据洪流,仅用标准WebSocket协议
与压缩算法解析。
完整Python代码
# -*- coding: utf-8 -*-
import time
import websocket
import zlib
def on_open(ws):
"""连接建立时订阅标的"""
# 订阅贵州茅台Lv2和上证指数Lv1
ws.send("all=lv2_600519,lv1_000001")
def on_message(ws, message, opcode, flag):
"""处理推送数据"""
timestamp = time.strftime('%H:%M:%S', time.localtime())
# 文本消息(如心跳包)
if opcode == websocket.ABNF.OPCODE_TEXT:
print(f"[{timestamp}] 指令响应: {message}")
# 二进制消息(行情数据需解压)
elif opcode == websocket.ABNF.OPCODE_BINARY:
try:
# 使用zlib解压 (-MAX_WBITS处理头部校验问题)
decompressed = zlib.decompress(message, -zlib.MAX_WBITS)
print(f"[{timestamp}] 行情推送: {decompressed.decode('utf-8')}")
except Exception as e:
print(f"解压异常: {str(e)}")
def on_error(ws, error):
"""错误处理"""
print(f"连接异常: {str(error)}")
def on_close(ws, close_status_code, close_msg):
"""连接关闭"""
print(f"连接关闭: {close_msg}({close_status_code})")
if __name__ == "__main__":
# 替换实际服务器地址和token
ws_url = "ws://your_quant_server/?token=your_auth_token"
# 创建WebSocket连接
ws = websocket.WebSocketApp(ws_url,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close)
# 启动事件循环
ws.run_forever()
生产环境注意事项
- 网络稳定性
关键技术解析
1. WebSocket连接生命周期
- on_open:建立连接后立即发送订阅请求
- on_message:核心数据处理逻辑
- on_error/on_close:异常处理和重连入口(建议扩展重试机制)
2. 数据压缩处理
zlib.decompress(message, -zlib.MAX_WBITS)
- 参数说明:
-zlib.MAX_WBITS
用于处理无zlib头部的压缩数据 - 常见错误:使用错误参数会导致
Error -3: invalid code lengths set
3. 订阅协议格式
lv1_
前缀:普通行情(委托五档)lv2_
前缀:Level2行情
- (委托五十档+逐笔成交)
- 多标的分隔:使用英文逗号分隔多个代码
生产环境注意事项
- 网络稳定性
添加心跳机制防止断连:
ws.run_forever(ping_interval=30, ping_timeout=10)
- 数据持久化
建议将数据写入队列或数据库:
from queue import Queue
data_queue = Queue()
data_queue.put(decompressed.decode('utf-8'))
- 性能优化
- 使用异步IO(如websockets库)
- 分离数据处理线程
- 批量写入降低IO压力
扩展应用场景
- 实时价量分析
结合TA-Lib - 计算动态指标:
from talib import RSI
rsi = RSI(prices, timeperiod=14)
- 订单簿重建
通过累计买卖盘数据构建市场深度模型 - 事件驱动策略
基于逐笔成交检测大单异动