版本: v3.3.1
文档创建日期: 2025-12-24
目的: 为系统重构改造提供全面的架构参考
QuantMesh 是一个毫秒级高频加密货币做市商系统,专注于永续合约市场的做多网格交易策略。
- ✅ 多交易所支持(Binance、Bitget、Gate.io)
- ✅ 基于网格的自动做市策略
- ✅ WebSocket 实时价格和订单流
- ✅ 智能仓位管理(超级槽位系统)
- ✅ 主动风控监控(成交量异常检测)
- ✅ 订单清理与对账机制
- ✅ 持仓安全性检查
- 语言: Go 1.21+
- 配置管理: YAML
- WebSocket: gorilla/websocket
- 限流: golang.org/x/time/rate
- 并发模型: goroutine + channel + sync.Map
✅ 全局唯一的价格流(PriceMonitor)
✅ WebSocket 是唯一的价格来源(不使用 REST API 轮询)
✅ 所有组件通过 priceMonitor.GetLastPrice() 获取价格
❌ 禁止在其他地方独立启动价格流
架构意义:
- 避免价格不一致
- 减少 API 调用,防止触发限流
- 毫秒级系统无法容忍 REST API 延迟
启动顺序:
1️⃣ 启动订单流(StartOrderStream)
2️⃣ 下单(PlaceOrder)
3️⃣ 避免错过成交推送
反模式:
❌ 先下单,后启动订单流 → 可能错过成交推送
✅ 先启动订单流,再下单 → 确保成交推送不丢失传统网格: 固定数量买入(如每次0.01 BTC)
QuantMesh: 固定金额买入(如每次30 USDT)
优势:
- 资金利用率更可控
- 适配不同价格区间
- 方便资金管理和风控
槽位状态:
- FREE: 空闲,可操作
- PENDING: 等待下单确认
- LOCKED: 已锁定,有活跃订单
作用:
- 防止并发重复下单
- 避免同一槽位重复买入/卖出
- 确保订单与持仓的一致性
quantmesh_platform/
├── main.go # 主程序入口,组件编排
│
├── config/ # 配置管理
│ └── config.go # YAML配置加载与验证
│
├── exchange/ # 交易所抽象层(核心)
│ ├── interface.go # IExchange 统一接口
│ ├── factory.go # 工厂模式创建交易所实例
│ ├── types.go # 通用数据结构
│ ├── wrapper_*.go # 适配器(包装各交易所)
│ ├── binance/ # 币安实现
│ ├── bitget/ # Bitget实现
│ └── gate/ # Gate.io实现
│
├── logger/ # 日志系统
│ └── logger.go # 文件日志 + 控制台日志
│
├── monitor/ # 价格监控
│ └── price_monitor.go # 全局唯一价格流
│
├── order/ # 订单执行层
│ └── executor_adapter.go # 订单执行器(限流+重试)
│
├── position/ # 仓位管理(核心)
│ └── super_position_manager.go # 超级槽位管理器
│
├── safety/ # 安全与风控
│ ├── safety.go # 启动前安全检查
│ ├── risk_monitor.go # 主动风控(K线监控)
│ ├── reconciler.go # 持仓对账
│ └── order_cleaner.go # 订单清理
│
└── utils/ # 工具函数
└── orderid.go # 自定义订单ID生成
1. 加载主配置(**`app_config`** / 可选首参 YAML)
↓
2. 创建交易所实例 (factory.go)
↓
3. 启动价格监控 (PriceMonitor.Start)
├── WebSocket 连接
└── 等待首次价格推送
↓
4. 持仓安全性检查 (safety.CheckAccountSafety)
├── 验证账户余额
├── 验证杠杆倍数
└── 计算最大可持仓数
↓
5. 启动订单流 (exchange.StartOrderStream)
├── 监听订单成交
└── 回调 → SuperPositionManager.OnOrderUpdate
↓
6. 初始化仓位管理器 (SuperPositionManager.Initialize)
├── 设置价格锚点
├── 创建初始买单槽位
└── 批量下单
↓
7. 启动对账器 (Reconciler.Start)
↓
8. 启动订单清理器 (OrderCleaner.Start)
↓
9. 启动风控监控 (RiskMonitor.Start)
├── 加载历史K线
├── 启动K线流
└── 实时检测成交量异常
↓
10. 价格驱动交易循环
├── 监听价格变化
├── 风控检查
└── 调整订单窗口 (AdjustOrders)
Exchange WebSocket
↓
PriceMonitor.updatePrice()
↓
latestPriceChange (atomic.Value)
↓
periodicPriceSender (定期推送)
↓
priceChangeCh (channel)
↓
main.go 监听协程
↓
风控检查 (RiskMonitor.IsTriggered)
├── ❌ 触发 → 撤销所有买单,暂停交易
└── ✅ 正常 → SuperPositionManager.AdjustOrders()
Exchange WebSocket (订单更新)
↓
main.go 回调函数
↓
反射提取字段 (解决匿名结构体问题)
↓
position.OrderUpdate
↓
SuperPositionManager.OnOrderUpdate()
├── 匹配槽位 (通过 ClientOrderID 或 OrderID)
├── 更新槽位状态
├── FILLED → 创建卖单
└── CANCELED → 重置槽位
价格变化
↓
AdjustOrders(newPrice)
↓
遍历所有槽位
↓
┌─────────────────────────────────┐
│ 槽位类型判断 │
├─────────────────────────────────┤
│ 1. 空槽位 (无订单,无持仓) │
│ → 检查是否在买入窗口 │
│ → 下买单 │
│ │
│ 2. 有买单 (等待成交) │
│ → 检查是否超出窗口 │
│ → 撤单 │
│ │
│ 3. 有持仓 (等待卖出) │
│ → 检查是否有卖单 │
│ → 无卖单 → 下卖单 │
│ → 有卖单 → 检查价格 │
│ │
│ 4. 有卖单 (等待成交) │
│ → 检查是否需要调价 │
│ → 撤单并重新下单 │
└─────────────────────────────────┘
- 接口:
IExchange统一所有交易所操作 - 工厂:
NewExchange()根据配置创建实例 - 适配器:
wrapper_*.go包装各交易所实现
type IExchange interface {
// 订单操作
PlaceOrder(ctx, req) (*Order, error)
BatchPlaceOrders(ctx, orders) ([]*Order, bool)
CancelOrder(ctx, symbol, orderID) error
BatchCancelOrders(ctx, symbol, orderIDs) error
CancelAllOrders(ctx, symbol) error
// 账户查询
GetAccount(ctx) (*Account, error)
GetPositions(ctx, symbol) ([]*Position, error)
GetOpenOrders(ctx, symbol) ([]*Order, error)
// WebSocket
StartPriceStream(ctx, symbol, callback)
StartOrderStream(ctx, callback)
StartKlineStream(ctx, symbols, interval, callback)
// 精度信息
GetPriceDecimals() int
GetQuantityDecimals() int
GetBaseAsset() string
GetQuoteAsset() string
}IExchange (接口)
↓
wrapper_binance.go (适配器)
↓
binance/adapter.go (交易所SDK)
↓
binance/websocket.go (WebSocket)
-
API差异:
- Binance: listenKey 订单流
- Bitget: 私有订单WebSocket
- Gate.io: 用户订单 WebSocket
-
精度处理:
- Binance: 通过 exchangeInfo 获取
- Bitget: 通过 contractInfo 获取
- Gate.io: 通过 contracts 获取
-
批量操作:
- Bitget: 原生支持批量下单/撤单
- Binance/Gate: 循环调用单个API
type InventorySlot struct {
Price float64 // 槽位价格(精确到小数点后n位)
// 持仓状态
PositionStatus string // EMPTY/FILLED
PositionQty float64
// 订单状态
OrderID int64
ClientOID string
OrderSide string // BUY/SELL
OrderStatus string // NOT_PLACED/PLACED/FILLED/CANCELED
OrderPrice float64
OrderFilledQty float64
// 锁定机制
SlotStatus string // FREE/PENDING/LOCKED
// PostOnly降级
PostOnlyFailCount int
mu sync.RWMutex // 槽位锁
}1. 初始化 (FREE, EMPTY, 无订单)
↓
2. 下买单 (PENDING → LOCKED, 等待成交)
↓
3. 买单成交 (FILLED, 有持仓)
↓
4. 下卖单 (LOCKED, 等待卖出)
↓
5. 卖单成交 (FREE, EMPTY, 回到初始状态)
// 初始化(设置锚点价格,创建初始槽位)
Initialize(currentPrice, currentPriceStr) error
// 订单窗口调整(价格变化时调用)
AdjustOrders(newPrice) error
// 订单更新回调(WebSocket推送)
OnOrderUpdate(update OrderUpdate)
// 批量操作
CreateBuyOrders(prices []float64)
CreateSellOrders(prices []float64)
CancelAllBuyOrders()- 全局锁:
mu sync.RWMutex(保护 slots Map) - 槽位锁:
slot.mu sync.RWMutex(保护单个槽位) - 槽位状态:
SlotStatus防止重复操作
典型操作流程:
// 下单前:
slot.mu.Lock()
if slot.SlotStatus != "FREE" {
slot.mu.Unlock()
return // 槽位已被占用
}
slot.SlotStatus = "PENDING"
slot.mu.Unlock()
// 下单后:
slot.mu.Lock()
slot.OrderID = orderID
slot.SlotStatus = "LOCKED"
slot.mu.Unlock()- 全局唯一: 整个系统只有一个实例
- WebSocket Only: 不使用 REST API 轮询
- 原子操作: 使用
atomic.Value存储价格
type PriceMonitor struct {
exchange exchange.IExchange
lastPrice atomic.Value // float64
lastPriceStr atomic.Value // string(用于检测精度)
priceChangeCh chan PriceChange
latestPriceChange atomic.Value // *PriceChange
isRunning atomic.Bool
priceSendInterval time.Duration
}1. StartPriceStream (启动 WebSocket)
↓
2. updatePrice (收到价格推送)
↓
3. latestPriceChange.Store (原子存储)
↓
4. periodicPriceSender (定期发送到 channel)
↓
5. main.go 监听 priceChangeCh
// 通过价格字符串检测小数位数
priceStr := "123.4567"
parts := strings.Split(priceStr, ".")
if len(parts) == 2 {
decimals := len(parts[1]) // 4位小数
}CheckAccountSafety(
ex, symbol, currentPrice,
orderAmount, priceInterval, feeRate,
requiredPositions, priceDecimals
)检查内容:
- 账户余额充足性
- 杠杆倍数限制(最高10倍)
- 最大可持仓数计算
- 手续费率验证
- 盈利率 vs 手续费率
公式:
最大可用保证金 = 账户余额 × 杠杆倍数
每仓成本 = 订单金额(固定)
最大持仓数 = 最大可用保证金 / 每仓成本
type RiskMonitor struct {
cfg *config.Config
exchange exchange.IExchange
symbolDataMap map[string]*SymbolData // K线缓存
triggered bool // 是否触发风控
}监控逻辑:
- 实时监听多个币种的K线(如BTC、ETH)
- 计算成交量移动平均
- 检测当前成交量是否超过阈值(默认3倍)
- 触发风控 → 撤销所有买单,暂停交易
- 恢复条件:多数币种恢复正常(默认3/5)
配置示例:
risk_control:
enabled: true
monitor_symbols: ["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT", "ADAUSDT"]
interval: "1m"
volume_multiplier: 3.0
average_window: 20
recovery_threshold: 3type Reconciler struct {
cfg *config.Config
exchange IExchange
positionManager *SuperPositionManager
pauseChecker func() bool // 风控暂停检查
}对账内容:
- 交易所持仓 vs 本地持仓
- 交易所未完成订单 vs 本地订单
- 槽位状态修复
对账周期:
- 默认每5分钟(可配置)
- 风控触发时暂停对账日志
type OrderCleaner struct {
cfg *config.Config
executor *order.ExchangeOrderExecutor
manager *SuperPositionManager
}清理策略:
- 检查未完成订单数量
- 超过阈值(默认100)时触发清理
- 批量撤销最旧的订单(默认10个/批)
- 重置对应槽位状态
- 限流: 25单/秒,突发30(可配置)
- 重试: 自动重试失败订单
- PostOnly降级: 连续失败3次后降级为普通单
PlaceOrder(req *OrderRequest) (*Order, error) {
// 1. 限流等待
rateLimiter.Wait()
// 2. 重试循环(最多5次)
for i := 0; i <= 5; i++ {
order, err := exchange.PlaceOrder(ctx, req)
// 3. PostOnly错误检测
if isPostOnlyError(err) {
postOnlyFailCount++
if postOnlyFailCount >= 3 {
degraded = true // 降级为普通单
}
continue
}
// 4. 其他错误重试
if err != nil {
time.Sleep(orderRetryDelay)
continue
}
return order, nil
}
}BatchPlaceOrders(orders []*OrderRequest) ([]*Order, bool) {
// 调用交易所批量API(Bitget原生支持)
// 或循环调用单个API(Binance/Gate)
results, marginError := exchange.BatchPlaceOrders(ctx, orders)
// marginError: 是否有保证金不足错误
return results, marginError
}main.go
├── config (配置)
├── logger (日志)
├── exchange (交易所)
│ └── binance/bitget/gate (实现)
├── monitor (价格监控)
│ └── exchange.IExchange
├── order (订单执行)
│ └── exchange.IExchange
├── position (仓位管理)
│ ├── order.OrderExecutor (接口适配)
│ └── IExchange (子集接口)
└── safety (安全风控)
├── exchange.IExchange
└── position.SuperPositionManager
问题: position 需要调用 order 执行器,order 需要 position 的数据结构
解决方案: 在 position 包内定义接口
// position/super_position_manager.go
type OrderExecutorInterface interface {
PlaceOrder(req *OrderRequest) (*Order, error)
BatchPlaceOrders(orders []*OrderRequest) ([]*Order, bool)
BatchCancelOrders(orderIDs []int64) error
}
// main.go 中创建适配器
type exchangeExecutorAdapter struct {
executor *order.ExchangeOrderExecutor
}
func (a *exchangeExecutorAdapter) PlaceOrder(req *position.OrderRequest) (*position.Order, error) {
// 转换类型并调用
}问题: position 需要查询交易所,但不能依赖 exchange 包(循环)
解决方案: 定义子集接口
// position/super_position_manager.go
type IExchange interface {
GetName() string
GetPositions(ctx, symbol) (interface{}, error)
GetOpenOrders(ctx, symbol) (interface{}, error)
GetOrder(ctx, symbol, orderID) (interface{}, error)
GetBaseAsset() string
CancelAllOrders(ctx, symbol) error
}问题: exchange 订单流回调需要传递 position.OrderUpdate,但会循环依赖
解决方案: 使用 interface{} + 反射
// exchange/interface.go
StartOrderStream(ctx, callback func(interface{})) error
// main.go
ex.StartOrderStream(ctx, func(updateInterface interface{}) {
v := reflect.ValueOf(updateInterface)
// 反射提取字段
posUpdate := position.OrderUpdate{
OrderID: getInt64Field("OrderID"),
ClientOrderID: getStringField("ClientOrderID"),
...
}
superPositionManager.OnOrderUpdate(posUpdate)
})main.go 启动的协程:
1. priceMonitor.Start() # 价格 WebSocket
2. ex.StartOrderStream() # 订单 WebSocket
3. riskMonitor.Start() # 风控 K线 WebSocket
4. reconciler.Start() # 定期对账(每5分钟)
5. orderCleaner.Start() # 定期清理(每60秒)
6. 价格变化监听 (main goroutine) # 监听 priceChangeCh
7. 定期打印状态 # 每1分钟
1. priceChangeCh (monitor)
类型: chan PriceChange
容量: 10
作用: 价格变化推送
2. priceCh (订阅者)
类型: chan PriceChange
容量: 10
作用: 价格订阅(多个订阅者)
3. sigChan (main)
类型: chan os.Signal
容量: 1
作用: 退出信号
1. sync.Map (position/slots)
作用: 槽位存储(支持并发读写)
2. sync.RWMutex (position/mu)
作用: 全局槽位锁(保护 Map 操作)
3. sync.RWMutex (InventorySlot/mu)
作用: 槽位级别锁(细粒度锁)
4. atomic.Value (price/lastPrice)
作用: 无锁原子操作(价格读取)
5. atomic.Bool (price/isRunning)
作用: 运行状态标志
-
槽位并发修改
- 风险: 价格变化协程 vs 订单更新回调
- 保护: 槽位锁 + SlotStatus 状态机
-
订单重复下单
- 风险: AdjustOrders 快速调用
- 保护: SlotStatus = PENDING 锁定
-
价格读取
- 风险: 多个协程同时读取
- 保护: atomic.Value(无锁)
❌ 反模式:
全局锁持有时 → 调用交易所API → 网络延迟 → 阻塞其他协程
✅ 正确做法:
释放锁 → 调用API → 重新获取锁 → 更新状态
第1层: 启动前检查 (safety.CheckAccountSafety)
├── 余额充足性
├── 杠杆倍数限制
└── 手续费率验证
第2层: 主动风控 (RiskMonitor)
├── K线成交量异常检测
├── 多币种联动监控
└── 自动撤销买单
第3层: 订单清理 (OrderCleaner)
├── 未完成订单数量限制
└── 定期清理旧订单
第4层: 持仓对账 (Reconciler)
├── 本地 vs 交易所对账
└── 槽位状态修复
第5层: 人工干预
├── SIGINT/SIGTERM 优雅退出
└── cancel_on_exit 配置
成交量异常检测
↓
RiskMonitor.IsTriggered() = true
↓
main.go 价格监听协程检测
↓
superPositionManager.CancelAllBuyOrders()
↓
暂停交易(跳过 AdjustOrders)
↓
等待恢复条件满足
↓
RiskMonitor.IsTriggered() = false
↓
恢复自动交易
// SuperPositionManager
insufficientMargin bool # 标志位
marginLockUntil time.Time # 锁定时间
marginLockDuration time.Duration # 锁定时长(默认10秒)
// 批量下单失败处理
if marginError {
manager.insufficientMargin = true
manager.marginLockUntil = time.Now().Add(marginLockDuration)
}
// 后续下单检查
if manager.insufficientMargin && time.Now().Before(manager.marginLockUntil) {
return // 保证金锁定中,跳过下单
}app:
current_exchange: "binance"
exchanges:
binance:
api_key: "your_api_key"
secret_key: "your_secret_key"
fee_rate: 0.0002
bitget:
api_key: "your_api_key"
secret_key: "your_secret_key"
passphrase: "your_passphrase"
fee_rate: 0.0002
trading:
symbol: "BTCUSDT"
price_interval: 1.0
order_quantity: 30.0
min_order_value: 6.0
buy_window_size: 100
sell_window_size: 100
reconcile_interval: 5
order_cleanup_threshold: 100
cleanup_batch_size: 10
margin_lock_duration_seconds: 10
position_safety_check: 100
system:
log_level: "INFO"
cancel_on_exit: true
risk_control:
enabled: true
monitor_symbols: ["BTCUSDT", "ETHUSDT", "BNBUSDT"]
interval: "1m"
volume_multiplier: 3.0
average_window: 20
recovery_threshold: 3
timing:
websocket_reconnect_delay: 5
websocket_write_wait: 10
websocket_pong_wait: 60
websocket_ping_interval: 20
listen_key_keepalive_interval: 30
price_send_interval: 50
rate_limit_retry_delay: 1
order_retry_delay: 500
price_poll_interval: 500
status_print_interval: 1
order_cleanup_interval: 60| 术语 | 英文 | 说明 |
|---|---|---|
| 槽位 | Slot | 每个价格点的仓位和订单管理单元 |
| 锚点价格 | Anchor Price | 系统初始化时的市场价格,作为网格基准 |
| 固定金额模式 | Fixed Amount Mode | 每笔交易投入固定金额(而非固定数量) |
| 价格精度 | Price Decimals | 价格小数位数(如BTC为2,ETH为2) |
| 数量精度 | Quantity Decimals | 数量小数位数(如BTC为3,ETH为3) |
| PostOnly | Post Only Order | 只做Maker的订单(不立即成交) |
| ReduceOnly | Reduce Only Order | 只减仓订单(平仓单) |
| 保证金锁定 | Margin Lock | 批量下单失败后的冷却时间 |
| 对账 | Reconciliation | 本地状态与交易所状态的一致性检查 |
REST API: 1200次/分钟
WebSocket: 10连接/IP
订单: 10单/秒(单交易对)
REST API: 600次/分钟
批量下单: 20单/次
批量撤单: 20单/次
WebSocket: 无特殊限制
REST API: 900次/分钟
订单: 100单/秒
WebSocket: 无特殊限制
2025-12-24 10:00:00 [INFO] 🚀 www.QuantMesh.com 做市商系统启动...
2025-12-24 10:00:00 [INFO] 📦 版本号: v3.3.1
2025-12-24 10:00:00 [INFO] ✅ 配置加载成功: 交易对=BTCUSDT, 窗口大小=100
2025-12-24 10:00:01 [INFO] ✅ 使用交易所: Binance
2025-12-24 10:00:02 [INFO] 🔗 启动 WebSocket 价格流...
2025-12-24 10:00:03 [INFO] 📊 当前价格: 42156.78
2025-12-24 10:00:04 [INFO] 🔒 ===== 开始持仓安全性检查 =====
2025-12-24 10:00:04 [INFO] 💰 账户余额: 3000.00 USDT
2025-12-24 10:00:04 [INFO] 📈 当前币价: 42156.78, 每笔金额: 30.00 USDT
2025-12-24 10:00:04 [INFO] ✅ 持仓安全性检查通过:可以安全持有至少 100 仓
2025-12-24 10:00:05 [INFO] ✅ [Binance] 订单流已启动
2025-12-24 10:00:06 [INFO] 📊 [SuperPositionManager] 初始化成功,锚点价格: 42156.78
2025-12-24 10:00:07 [INFO] 🛡️ 启动主动安全风控监控 (周期: 1m, 倍数: 3.0)
2025-12-24 10:00:08 [INFO] ✅ 系统启动完成,开始自动交易
QuantMesh是一个设计合理但有改进空间的做市商系统。核心架构采用:
- 接口抽象 + 工厂模式(多交易所)
- WebSocket驱动 + 事件回调(实时性)
- 细粒度锁 + 原子操作(并发安全)
- 多层风控 + 状态机(安全性)
官网:
- Website: www.QuantMesh.com
- Version: v3.3.1
- Last Updated: 2025-12-24