Life, half is memory, half is to continue.
WebSocket双向连接解析
By Vincent. @2025.6.20 • 4.4K
WebSocket双向连接解析

1核1G机器即可构建毫秒级延迟的行情接收系统。每分钟超百万笔委托如何实时接收处理?传统爬虫难以应对高频数据洪流,仅用标准WebSocket协议

与压缩算法解析。


完整Python代码

# -*- coding: utf-8 -*-
import time
import websocket
import zlib

def on_open(ws):
    """连接建立时订阅标的"""
    # 订阅贵州茅台Lv2和上证指数Lv1
    ws.send("all=lv2_600519,lv1_000001")  

def on_message(ws, message, opcode, flag):
    """处理推送数据"""
    timestamp = time.strftime('%H:%M:%S', time.localtime())
    
    # 文本消息(如心跳包)
    if opcode == websocket.ABNF.OPCODE_TEXT:
        print(f"[{timestamp}] 指令响应: {message}")
        
    # 二进制消息(行情数据需解压)
    elif opcode == websocket.ABNF.OPCODE_BINARY:
        try:
            # 使用zlib解压 (-MAX_WBITS处理头部校验问题)
            decompressed = zlib.decompress(message, -zlib.MAX_WBITS)  
            print(f"[{timestamp}] 行情推送: {decompressed.decode('utf-8')}")
        except Exception as e:
            print(f"解压异常: {str(e)}")

def on_error(ws, error):
    """错误处理"""
    print(f"连接异常: {str(error)}")

def on_close(ws, close_status_code, close_msg):
    """连接关闭"""
    print(f"连接关闭: {close_msg}({close_status_code})")

if __name__ == "__main__":
    # 替换实际服务器地址和token
    ws_url = "ws://your_quant_server/?token=your_auth_token"
    
    # 创建WebSocket连接
    ws = websocket.WebSocketApp(ws_url,
                              on_open=on_open,
                              on_message=on_message,
                              on_error=on_error,
                              on_close=on_close)
    
    # 启动事件循环
    ws.run_forever()

生产环境注意事项

  1. 网络稳定性

关键技术解析

1. WebSocket连接生命周期

2. 数据压缩处理

zlib.decompress(message, -zlib.MAX_WBITS)

3. 订阅协议格式


生产环境注意事项

  1. 网络稳定性
    添加心跳机制防止断连:
ws.run_forever(ping_interval=30, ping_timeout=10)
  1. 数据持久化
    建议将数据写入队列或数据库:
from queue import Queue
data_queue = Queue()
data_queue.put(decompressed.decode('utf-8'))
  1. 性能优化

扩展应用场景

  1. 实时价量分析
    结合TA-Lib
  2. 计算动态指标:
from talib import RSI

rsi = RSI(prices, timeperiod=14)
  1. 订单簿重建
    通过累计买卖盘数据构建市场深度模型
  2. 事件驱动策略
    基于逐笔成交检测大单异动