-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbot_manager.go
More file actions
1291 lines (1158 loc) · 38.6 KB
/
bot_manager.go
File metadata and controls
1291 lines (1158 loc) · 38.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package main
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"quantmesh/config"
"quantmesh/event"
"quantmesh/feerate"
"quantmesh/lock"
"quantmesh/logger"
"quantmesh/position"
"quantmesh/storage"
)
// BotRuntime 代表單個 Bot 的運行時,封裝 SymbolRuntime 實現 Bot 級別的邏輯隔離
type BotRuntime struct {
Config config.BotConfig
BotID string
Inner *SymbolRuntime
EventBus *event.EventBus
configMu sync.RWMutex // 保護 Config 的並發訪問
}
// botStartFailure 記錄異步啟動失敗原因(供 Web API 與前端輪詢展示)
type botStartFailure struct {
Message string
At time.Time
}
// BotManager 管理多個 BotRuntime,按 BotID 進行生命週期管理
type BotManager struct {
cfg *config.Config
runtimes map[string]*BotRuntime
runtimesMu sync.RWMutex
groupLegAlerted map[string]bool
groupLegTimers map[string]*time.Timer
singleLegGraceSec int
eventBus *event.EventBus
storageService *storage.StorageService
distributedLock lock.DistributedLock
botStatesFileOverride string // 測試用,空時用默認 ./data/bot_states.json
startFailMu sync.RWMutex
startFail map[string]botStartFailure
primaryYAMLPath string // 命令行主配置路徑(非空時啟動前與主庫一併刷新內存配置)
}
// NewBotManager 創建 Bot 管理器。primaryYAMLPath 為啟動時傳入的主 YAML 路徑(無則傳空),用於與 app_config 一致的刷新順序。
func NewBotManager(cfg *config.Config, eventBus *event.EventBus, storageService *storage.StorageService, distributedLock lock.DistributedLock, primaryYAMLPath string) *BotManager {
return &BotManager{
cfg: cfg,
runtimes: make(map[string]*BotRuntime),
groupLegAlerted: make(map[string]bool),
groupLegTimers: make(map[string]*time.Timer),
singleLegGraceSec: 30,
eventBus: eventBus,
storageService: storageService,
distributedLock: distributedLock,
startFail: make(map[string]botStartFailure),
primaryYAMLPath: strings.TrimSpace(primaryYAMLPath),
}
}
func (bm *BotManager) refreshConfigBeforeBotStart() error {
if bm == nil || bm.cfg == nil {
return nil
}
var st storage.Storage
if bm.storageService != nil {
st = bm.storageService.GetStorage()
}
return storage.RefreshTradingConfigFromPrimarySource(bm.primaryYAMLPath, st, &bm.cfg)
}
// resolveLatestStartConfig 在真正啟動前重新對齊 Bot 配置:
// 1. 優先使用剛刷新的 bm.cfg.Bots;
// 2. 若主庫存在 bot_configs 快照,則再用該 Bot 專屬快照覆蓋。
// 避免 StartBot 調用入口傳入的是保存前/刷新前的舊副本。
func (bm *BotManager) resolveLatestStartConfig(botCfg config.BotConfig) config.BotConfig {
if bm == nil {
return botCfg
}
botID := config.BotIDOrGenerate(botCfg)
latest := botCfg
if bm.cfg != nil {
for i := range bm.cfg.Bots {
candidate := bm.cfg.Bots[i]
if config.BotIDOrGenerate(candidate) == botID {
latest = candidate
break
}
}
}
if bm.storageService == nil {
return latest
}
ss, ok := bm.storageService.GetStorage().(*storage.SQLStorage)
if !ok || ss == nil {
return latest
}
doc, err := ss.GetBotConfigDocument(context.Background(), botID)
if err != nil {
logger.Warn("⚠️ [%s] 啟動前讀取 bot_configs 失敗,回退主配置快照: %v", botID, err)
return latest
}
if doc == nil || strings.TrimSpace(doc.Content) == "" {
return latest
}
var bf config.BotConfigFile
if err := json.Unmarshal([]byte(doc.Content), &bf); err != nil {
logger.Warn("⚠️ [%s] 啟動前解析 bot_configs 失敗,回退主配置快照: %v", botID, err)
return latest
}
latest = config.ConvertToBotConfig(&bf)
if latest.ID == "" {
latest.ID = botID
}
return latest
}
func (bm *BotManager) applyExchangeFeeFromAPIForBot(botCfg config.BotConfig) {
if bm == nil || bm.cfg == nil || botCfg.Exchange == "" || botCfg.Symbol == "" {
return
}
if bm.cfg.Timing.SkipExchangeFeeOnBotStart {
return
}
maker, taker, err := feerate.FetchFromExchangeAPI(bm.cfg, botCfg.Exchange, botCfg.Symbol)
if err != nil {
logger.Info("ℹ️ 啟動前從交易所拉取手續費跳過: %v", err)
return
}
if taker <= 0 {
return
}
if exCfg, ok := bm.cfg.Exchanges[botCfg.Exchange]; ok {
exCfg.FeeRate = taker
bm.cfg.Exchanges[botCfg.Exchange] = exCfg
logger.Info("💳 啟動前已依交易所接口更新 %s Taker 手續費: %.4f%%(maker %.4f%%,用於持倉安全檢查)",
botCfg.Exchange, taker*100, maker*100)
}
}
// runPeriodicFeeRefresh 先從主庫/YAML 刷新內存配置,再按各交易所拉取 Taker 費率寫入內存(不強制寫回數據庫)。
func (bm *BotManager) runPeriodicFeeRefresh() {
if bm == nil || bm.cfg == nil {
return
}
var st storage.Storage
if bm.storageService != nil {
st = bm.storageService.GetStorage()
}
if err := storage.RefreshTradingConfigFromPrimarySource(bm.primaryYAMLPath, st, &bm.cfg); err != nil {
logger.Warn("⚠️ 定期刷新主配置失敗(仍嘗試拉取交易所費率): %v", err)
}
seen := make(map[string]bool)
type pair struct{ ex, sym string }
var pairs []pair
for _, b := range bm.cfg.Bots {
if b.Exchange == "" || b.Symbol == "" {
continue
}
k := strings.ToLower(b.Exchange)
if seen[k] {
continue
}
seen[k] = true
pairs = append(pairs, pair{b.Exchange, b.Symbol})
}
if len(pairs) == 0 {
seen = make(map[string]bool)
for _, s := range bm.cfg.Trading.Symbols {
if s.Exchange == "" || s.Symbol == "" {
continue
}
k := strings.ToLower(s.Exchange)
if seen[k] {
continue
}
seen[k] = true
pairs = append(pairs, pair{s.Exchange, s.Symbol})
}
}
for _, p := range pairs {
maker, taker, err := feerate.FetchFromExchangeAPI(bm.cfg, p.ex, p.sym)
if err != nil {
logger.Info("ℹ️ 定期拉取 %s 手續費失敗: %v", p.ex, err)
continue
}
if taker <= 0 {
continue
}
if exCfg, ok := bm.cfg.Exchanges[p.ex]; ok {
exCfg.FeeRate = taker
bm.cfg.Exchanges[p.ex] = exCfg
logger.Info("💳 定期同步 %s Taker 手續費: %.4f%%(maker %.4f%%)", p.ex, taker*100, maker*100)
}
}
}
// StartFeeRateRefreshLoop 按 timing.fee_rate_refresh_minutes 週期刷新主配置並拉取交易所費率;分鐘數 <= 0 時不啟動。
func (bm *BotManager) StartFeeRateRefreshLoop(ctx context.Context) {
if bm == nil || bm.cfg == nil {
return
}
min := bm.cfg.Timing.FeeRateRefreshMinutes
if min <= 0 {
return
}
d := time.Duration(min) * time.Minute
logger.Info("⏱️ 交易所手續費定期同步已啟用(間隔 %v)", d)
go func() {
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
bm.runPeriodicFeeRefresh()
}
}
}()
}
// symbolKey 返回交易所+交易對+市場類型的標準化 key,用於衝突檢測
func symbolKey(exchange, symbol, marketType string) string {
return strings.ToLower(fmt.Sprintf("%s:%s:%s", exchange, symbol, marketType))
}
// findConflictingRuntime 檢查是否已有與待啟動 Bot 衝突的運行中實例(期現套利規則 + 期貨腿重疊)
func (bm *BotManager) findConflictingRuntime(newBot *config.BotConfig) *BotRuntime {
bm.runtimesMu.RLock()
defer bm.runtimesMu.RUnlock()
return bm.findConflictingRuntimeUnlocked(newBot)
}
func (bm *BotManager) findConflictingRuntimeUnlocked(newBot *config.BotConfig) *BotRuntime {
for _, br := range bm.runtimes {
if br == nil {
continue
}
if config.BotsConflict(&br.Config, newBot) {
return br
}
}
return nil
}
func (bm *BotManager) recordStartFailure(botID string, err error) {
if bm == nil || err == nil || botID == "" {
return
}
bm.startFailMu.Lock()
defer bm.startFailMu.Unlock()
if bm.startFail == nil {
bm.startFail = make(map[string]botStartFailure)
}
bm.startFail[botID] = botStartFailure{Message: err.Error(), At: time.Now()}
}
func (bm *BotManager) clearStartFailure(botID string) {
if bm == nil || botID == "" {
return
}
bm.startFailMu.Lock()
defer bm.startFailMu.Unlock()
if bm.startFail != nil {
delete(bm.startFail, botID)
}
}
// GetLastStartFailure 返回最近一次啟動失敗信息(無則 ok=false)
func (bm *BotManager) GetLastStartFailure(botID string) (message string, failedAt time.Time, ok bool) {
if bm == nil || botID == "" {
return "", time.Time{}, false
}
bm.startFailMu.RLock()
defer bm.startFailMu.RUnlock()
if bm.startFail == nil {
return "", time.Time{}, false
}
rec, ok := bm.startFail[botID]
if !ok {
return "", time.Time{}, false
}
return rec.Message, rec.At, true
}
// StartBot 啟動指定 Bot
func (bm *BotManager) StartBot(ctx context.Context, botCfg config.BotConfig) (*BotRuntime, error) {
botID := config.BotIDOrGenerate(botCfg)
botCfg.ID = botID
bm.clearStartFailure(botID)
bm.runtimesMu.RLock()
_, exists := bm.runtimes[botID]
bm.runtimesMu.RUnlock()
if exists {
return nil, nil // 已在運行
}
// 🔒 衝突檢測:阻止同一交易所+交易對+市場類型啟動多個 Bot
// 原因:交易所倉位按 Symbol 隔離,多個 Bot 無法區分誰擁有哪部分倉位,
// 會導致倉位重複認領、訂單互撞、對賬覆蓋等問題。
if conflict := bm.findConflictingRuntime(&botCfg); conflict != nil {
logger.Warn("🚫 [%s] 跳過啟動:與運行中 Bot [%s] 衝突(期貨腿或資金費套利互斥規則)。"+
"如需多策略,請在同一 Bot 內配置多個 strategies。",
botID, conflict.BotID)
err := fmt.Errorf("symbol_conflict: 與 Bot [%s] 衝突", conflict.BotID)
bm.recordStartFailure(botID, err)
return nil, err
}
// 🔒 檢查數據庫中的啟停狀態(優先級高於配置文件)
// 如果數據庫中標記為已禁用,則跳過啟動
dbEnabled, reason := bm.isBotEnabledInDB(botID)
if !dbEnabled {
logger.Warn("🚫 [%s] 跳過啟動:數據庫中已禁用此 Bot(原因: %s)", botID, reason)
// 发布启动失败事件(因被禁用)
bm.eventBus.Publish(&event.Event{
Type: event.EventTypeTradingStartFailed,
Data: map[string]interface{}{
"bot_id": botID,
"exchange": botCfg.Exchange,
"symbol": botCfg.Symbol,
"error": fmt.Sprintf("Bot 在數據庫中被禁用: %s", reason),
},
})
err := fmt.Errorf("bot_disabled_in_database: %s", reason)
bm.recordStartFailure(botID, err)
return nil, err
}
if err := bm.refreshConfigBeforeBotStart(); err != nil {
logger.Warn("⚠️ 啟動前重新加載主配置失敗(繼續使用進程內存中的配置): %v", err)
}
botCfg = bm.resolveLatestStartConfig(botCfg)
bm.applyExchangeFeeFromAPIForBot(botCfg)
symCfg := config.BotConfigToSymbolConfig(botCfg)
onRequestStop := func(botID string) {
_ = bm.StopBotWithReason(botID, "close_condition", "關閉條件觸發")
}
rt, err := startSymbolRuntime(ctx, bm.cfg, symCfg, bm.eventBus, bm.storageService, bm.distributedLock, onRequestStop)
if err != nil {
// 发布启动失败事件
bm.eventBus.Publish(&event.Event{
Type: event.EventTypeTradingStartFailed,
Data: map[string]interface{}{
"bot_id": botID,
"exchange": botCfg.Exchange,
"symbol": botCfg.Symbol,
"error": err.Error(),
},
})
bm.recordStartFailure(botID, err)
return nil, err
}
br := &BotRuntime{
Config: botCfg,
BotID: botID,
Inner: rt,
EventBus: bm.eventBus,
}
bm.runtimesMu.Lock()
if _, ok := bm.runtimes[botID]; ok {
bm.runtimesMu.Unlock()
if rt != nil && rt.Stop != nil {
rt.Stop()
}
return nil, nil
}
if conflict := bm.findConflictingRuntimeUnlocked(&botCfg); conflict != nil {
bm.runtimesMu.Unlock()
if rt != nil && rt.Stop != nil {
rt.Stop()
}
err := fmt.Errorf("symbol_conflict: 與 Bot [%s] 衝突", conflict.BotID)
bm.recordStartFailure(botID, err)
return nil, err
}
bm.runtimes[botID] = br
bm.runtimesMu.Unlock()
bm.clearStartFailure(botID)
if bm.storageService != nil {
registerWebSymbolProvidersForRuntime(rt, &botCfg, bm.storageService)
}
// 发布启动成功事件
bm.eventBus.Publish(&event.Event{
Type: event.EventTypeTradingStarted,
Data: map[string]interface{}{
"bot_id": botID,
"exchange": botCfg.Exchange,
"symbol": botCfg.Symbol,
"strategy": botCfg.Strategies,
},
})
bm.checkGroupLegConsistencyForBot(botID)
return br, nil
}
// StopBot 停止指定 Bot
func (bm *BotManager) StopBot(botID string) error {
return bm.StopBotWithReason(botID, "web_ui", "用戶通過 Web UI 停止")
}
// StopBotWithReason 停止指定 Bot 並記錄原因(供關閉條件等自動停止場景使用)
func (bm *BotManager) StopBotWithReason(botID, updatedBy, reason string) error {
bm.runtimesMu.Lock()
br, ok := bm.runtimes[botID]
if !ok {
bm.runtimesMu.Unlock()
return nil
}
delete(bm.runtimes, botID)
bm.runtimesMu.Unlock()
unregisterWebSymbolProvidersForRuntime(&br.Config)
if br.Inner != nil && br.Inner.Stop != nil {
br.Inner.Stop()
}
// 🔥 保存停止狀態到數據庫(持久化,重啟後仍然有效)
bm.saveBotStateToDB(botID, false, updatedBy, reason)
// 发布停止事件
bm.eventBus.Publish(&event.Event{
Type: event.EventTypeTradingStopped,
Data: map[string]interface{}{
"bot_id": botID,
"exchange": br.Config.Exchange,
"symbol": br.Config.Symbol,
"strategy": br.Config.Strategies,
"reason": reason,
},
})
bm.checkGroupLegConsistencyForBot(botID)
return nil
}
// EnableBot 啟用 Bot(從數據庫移除禁用標記)
// 注意:這只是從數據庫移除禁用標記,不會立即啟動 Bot
// Bot 需要通過 StartBot 方法或在配置文件中 enabled=true 才會啟動
func (bm *BotManager) EnableBot(botID string) error {
// 🔥 從數據庫中刪除禁用記錄(或設置為 enabled=true)
bm.saveBotStateToDB(botID, true, "web_ui", "用戶通過 Web UI 啟用")
logger.Info("✅ [%s] Bot 已在數據庫中標記為啟用,可以通過 StartBot 方法啟動", botID)
return nil
}
// Get 按 BotID 獲取運行時
func (bm *BotManager) Get(botID string) (*BotRuntime, bool) {
bm.runtimesMu.RLock()
defer bm.runtimesMu.RUnlock()
br, ok := bm.runtimes[botID]
return br, ok
}
// GetByExchangeSymbol 按交易所和交易對獲取運行時(兼容舊接口)
func (bm *BotManager) GetByExchangeSymbol(exchangeName, symbol string, marketType ...string) (*BotRuntime, bool) {
mt := "futures"
if len(marketType) > 0 && marketType[0] != "" {
mt = marketType[0]
}
botID := config.GenerateBotID(exchangeName, symbol, mt)
return bm.Get(botID)
}
// List 列出所有 Bot 運行時
func (bm *BotManager) List() []*BotRuntime {
bm.runtimesMu.RLock()
defer bm.runtimesMu.RUnlock()
list := make([]*BotRuntime, 0, len(bm.runtimes))
for _, br := range bm.runtimes {
list = append(list, br)
}
return list
}
// Remove 從管理器中移除 Bot 運行時(會先停止 Bot)
func (bm *BotManager) Remove(botID string) {
_ = bm.StopBot(botID)
}
// AddRuntime 註冊已創建的 BotRuntime(用於啟動時已有 SymbolRuntime 的向後兼容場景)
func (bm *BotManager) AddRuntime(br *BotRuntime) {
if br == nil || br.BotID == "" {
return
}
bm.runtimesMu.Lock()
defer bm.runtimesMu.Unlock()
if bm.groupLegAlerted == nil {
bm.groupLegAlerted = make(map[string]bool)
}
bm.runtimes[br.BotID] = br
}
// StopAll 停止所有 Bot
func (bm *BotManager) StopAll() {
bm.runtimesMu.Lock()
runtimes := make([]*BotRuntime, 0, len(bm.runtimes))
for _, br := range bm.runtimes {
runtimes = append(runtimes, br)
}
bm.runtimes = make(map[string]*BotRuntime)
bm.groupLegAlerted = make(map[string]bool)
for _, timer := range bm.groupLegTimers {
if timer != nil {
timer.Stop()
}
}
bm.groupLegTimers = make(map[string]*time.Timer)
bm.runtimesMu.Unlock()
for _, br := range runtimes {
if br != nil && br.Inner != nil && br.Inner.Stop != nil {
br.Inner.Stop()
}
}
}
func (bm *BotManager) checkGroupLegConsistencyForBot(botID string) {
if bm == nil || bm.cfg == nil || botID == "" {
return
}
for _, group := range bm.cfg.BotGroups {
if len(group.BotIDs) < 2 || !containsBotID(group.BotIDs, botID) {
continue
}
var runningIDs []string
total := len(group.BotIDs)
publishAlert := false
publishRecovered := false
bm.runtimesMu.Lock()
if bm.groupLegAlerted == nil {
bm.groupLegAlerted = make(map[string]bool)
}
for _, id := range group.BotIDs {
if _, ok := bm.runtimes[id]; ok {
runningIDs = append(runningIDs, id)
}
}
running := len(runningIDs)
alreadyAlerted := bm.groupLegAlerted[group.ID]
if running > 0 && running < total {
if !alreadyAlerted {
bm.groupLegAlerted[group.ID] = true
publishAlert = true
}
bm.scheduleSingleLegEnforcementLocked(group.ID)
} else {
if running == total && alreadyAlerted {
publishRecovered = true
}
bm.groupLegAlerted[group.ID] = false
bm.cancelSingleLegEnforcementLocked(group.ID)
}
bm.runtimesMu.Unlock()
if publishAlert {
logger.Warn("⚠️ [BotGroup:%s] 对冲组出现单腿运行,running=%v total=%d", group.ID, runningIDs, total)
if bm.eventBus != nil {
bm.eventBus.Publish(&event.Event{
Type: event.EventTypeError,
Data: map[string]interface{}{
"group_id": group.ID,
"group_name": group.Name,
"issue": "single_leg_running",
"running_bot_ids": runningIDs,
"expected_bot_ids": group.BotIDs,
},
})
}
}
if publishRecovered {
logger.Info("✅ [BotGroup:%s] 对冲组双腿已恢复一致运行", group.ID)
if bm.eventBus != nil {
bm.eventBus.Publish(&event.Event{
Type: event.EventTypeRiskRecovered,
Data: map[string]interface{}{
"group_id": group.ID,
"group_name": group.Name,
"issue": "single_leg_running",
"running_bot_ids": runningIDs,
},
})
}
}
}
}
func (bm *BotManager) scheduleSingleLegEnforcementLocked(groupID string) {
if bm.singleLegGraceSec <= 0 {
return
}
if bm.groupLegTimers == nil {
bm.groupLegTimers = make(map[string]*time.Timer)
}
if _, exists := bm.groupLegTimers[groupID]; exists {
return
}
delay := time.Duration(bm.singleLegGraceSec) * time.Second
bm.groupLegTimers[groupID] = time.AfterFunc(delay, func() {
bm.enforceSingleLegPause(groupID)
})
}
func (bm *BotManager) cancelSingleLegEnforcementLocked(groupID string) {
if bm.groupLegTimers == nil {
return
}
if timer, exists := bm.groupLegTimers[groupID]; exists {
timer.Stop()
delete(bm.groupLegTimers, groupID)
}
}
func (bm *BotManager) enforceSingleLegPause(groupID string) {
if bm == nil || bm.cfg == nil || groupID == "" {
return
}
var targetGroup *config.BotGroup
for i := range bm.cfg.BotGroups {
if bm.cfg.BotGroups[i].ID == groupID {
targetGroup = &bm.cfg.BotGroups[i]
break
}
}
if targetGroup == nil {
bm.runtimesMu.Lock()
bm.cancelSingleLegEnforcementLocked(groupID)
bm.runtimesMu.Unlock()
return
}
runningIDs := make([]string, 0, len(targetGroup.BotIDs))
bm.runtimesMu.Lock()
for _, id := range targetGroup.BotIDs {
if _, ok := bm.runtimes[id]; ok {
runningIDs = append(runningIDs, id)
}
}
// 当前已经恢复(全运行)或全部停止,不执行自动暂停
if len(runningIDs) == 0 || len(runningIDs) == len(targetGroup.BotIDs) {
bm.cancelSingleLegEnforcementLocked(groupID)
bm.runtimesMu.Unlock()
return
}
bm.cancelSingleLegEnforcementLocked(groupID)
bm.runtimesMu.Unlock()
for _, id := range runningIDs {
if br, ok := bm.Get(id); ok && br != nil {
br.PauseOpening("single_leg_running")
}
}
logger.Warn("🛑 [BotGroup:%s] 单腿运行超过 %d 秒,自动暂停开仓。running=%v", groupID, bm.singleLegGraceSec, runningIDs)
if bm.eventBus != nil {
bm.eventBus.Publish(&event.Event{
Type: event.EventTypeRiskTriggered,
Data: map[string]interface{}{
"group_id": groupID,
"issue": "single_leg_running",
"action": "pause_opening",
"running_bot_ids": runningIDs,
},
})
}
}
func containsBotID(botIDs []string, target string) bool {
for _, id := range botIDs {
if id == target {
return true
}
}
return false
}
// ListSymbolRuntimes 返回底層 SymbolRuntime 列表(供需要兼容 SymbolManager 的調用方使用)
func (bm *BotManager) ListSymbolRuntimes() []*SymbolRuntime {
bm.runtimesMu.RLock()
defer bm.runtimesMu.RUnlock()
list := make([]*SymbolRuntime, 0, len(bm.runtimes))
for _, br := range bm.runtimes {
if br.Inner != nil {
list = append(list, br.Inner)
}
}
return list
}
// UpdateRuntimeTradingParams 更新運行中的 Bot 交易參數(熱更新)
// 始終同步 Config 到運行時,確保 smart_order 等非交易參數變更也能反映到 GetBot 返回的詳情中
func (bm *BotManager) UpdateRuntimeTradingParams(latestCfg *config.Config) (updatedBotIDs []string) {
for _, botCfg := range latestCfg.Bots {
botID := botCfg.ID
if botID == "" {
botID = config.GenerateBotID(botCfg.Exchange, botCfg.Symbol, botCfg.GetMarketType())
}
bm.runtimesMu.RLock()
br, ok := bm.runtimes[botID]
bm.runtimesMu.RUnlock()
if !ok || br.Inner == nil || br.Inner.SuperPositionManager == nil {
continue
}
symCfg := config.BotConfigToSymbolConfig(botCfg)
changed := br.Inner.SuperPositionManager.UpdateTradingParams(
symCfg.PriceInterval,
symCfg.ProfitSpread,
symCfg.OrderQuantity,
symCfg.BuyWindowSize,
symCfg.SellWindowSize,
)
br.Inner.SuperPositionManager.SetSpotInventoryPolicy(symCfg.SpotInventoryPolicy)
// 始終同步 Config,確保 smart_order、風控等配置變更在刷新頁面時正確顯示
br.Config = botCfg
br.Inner.Config = symCfg
if changed {
updatedBotIDs = append(updatedBotIDs, botID)
}
}
return
}
// ClosePositions 平倉(支持市價/限價)
func (br *BotRuntime) ClosePositions(ctx context.Context, cfg config.ClosePositionConfig) (*position.ClosePositionRecord, error) {
if br.Inner == nil || br.Inner.SuperPositionManager == nil {
return nil, fmt.Errorf("bot not initialized")
}
// 獲取交易所
exchange := br.Inner.Exchange
if exchange == nil {
return nil, fmt.Errorf("exchange not initialized")
}
// 創建平倉管理器
closeMgr := position.NewClosePositionManager(
position.NewExchangeAdapterWrapper(exchange),
br.BotID,
br.Config.Symbol,
)
// 獲取持倉
_, err := exchange.GetPositions(ctx, br.Config.Symbol)
if err != nil {
return nil, fmt.Errorf("failed to get positions: %w", err)
}
// 平倉
// 這裡需要根據實際的持倉情況來決定平倉方向
// 暫時簡化處理,假設做多平倉為賣單
record, err := closeMgr.ClosePositions(ctx, "SELL", 100, cfg) // 這裡需要實際數量
if err != nil {
return nil, err
}
return record, nil
}
// GetCloseRecords 獲取平倉記錄
func (br *BotRuntime) GetCloseRecords() []*position.ClosePositionRecord {
// 暫時返回空列表,實際需要從平倉管理器獲取
return []*position.ClosePositionRecord{}
}
// GetSlotFilter 獲取槽位過濾器
func (br *BotRuntime) GetSlotFilter() *config.SlotFilterConfig {
if br.Inner == nil || br.Inner.SuperPositionManager == nil {
return nil
}
return br.Inner.SuperPositionManager.GetSlotFilter()
}
// SetSlotFilter 設置槽位過濾器
func (br *BotRuntime) SetSlotFilter(filter *config.SlotFilterConfig) {
if br.Inner == nil || br.Inner.SuperPositionManager == nil {
return
}
br.Inner.SuperPositionManager.SetSlotFilter(filter)
}
// GetSlots 獲取所有槽位信息
func (br *BotRuntime) GetSlots() []map[string]interface{} {
if br.Inner == nil || br.Inner.SuperPositionManager == nil {
return []map[string]interface{}{}
}
slots := br.Inner.SuperPositionManager.GetAllSlotsDetailed()
result := make([]map[string]interface{}, len(slots))
for i, slot := range slots {
result[i] = map[string]interface{}{
"price": slot.Price,
"position_status": slot.PositionStatus,
"position_qty": slot.PositionQty,
"order_id": slot.OrderID,
"order_side": slot.OrderSide,
"order_status": slot.OrderStatus,
"order_price": slot.OrderPrice,
"slot_status": slot.SlotStatus,
}
}
return result
}
// GetBotRiskControl 获取 Bot 风控配置
func (br *BotRuntime) GetBotRiskControl() *config.BotRiskControl {
if br.Config.OpenPositionControl.BotRiskControl == nil {
return &config.BotRiskControl{}
}
return br.Config.OpenPositionControl.BotRiskControl
}
// SetBotRiskControl 设置 Bot 风控配置
func (br *BotRuntime) SetBotRiskControl(riskControl *config.BotRiskControl) error {
br.configMu.Lock()
defer br.configMu.Unlock()
if br.Config.OpenPositionControl.BotRiskControl == nil {
br.Config.OpenPositionControl.BotRiskControl = &config.BotRiskControl{}
}
br.Config.OpenPositionControl.BotRiskControl = riskControl
return nil
}
// GetGridRiskControl 獲取網格風控配置
func (br *BotRuntime) GetGridRiskControl() config.GridRiskControl {
br.configMu.RLock()
defer br.configMu.RUnlock()
return br.Config.GridRiskControl
}
// SetGridRiskControl 設置網格風控配置(運行時熱更新 + 同步到 SuperPositionManager)
func (br *BotRuntime) SetGridRiskControl(grc config.GridRiskControl) error {
br.configMu.Lock()
defer br.configMu.Unlock()
br.Config.GridRiskControl = grc
if br.Inner != nil && br.Inner.SuperPositionManager != nil {
br.Inner.SuperPositionManager.SetGridRiskControl(grc)
}
return nil
}
// PauseOpening 暂停开仓
func (br *BotRuntime) PauseOpening(reason string) {
br.configMu.Lock()
defer br.configMu.Unlock()
// 更新 OpenPositionControl 中的 PauseOpening 状态
br.Config.OpenPositionControl.PauseOpening = true
// 同时更新 BotRiskControl 中的状态
if br.Config.OpenPositionControl.BotRiskControl == nil {
br.Config.OpenPositionControl.BotRiskControl = &config.BotRiskControl{}
}
br.Config.OpenPositionControl.BotRiskControl.PauseOpening = true
br.Config.OpenPositionControl.BotRiskControl.PauseOpeningReason = reason
br.Config.OpenPositionControl.BotRiskControl.Enabled = true
storage.AppendBotRiskControlEvent(br.BotID, "paused", reason, "config")
// 🔥 如果设置了自动恢复时间,启动自动恢复 goroutine
if br.Config.OpenPositionControl.BotRiskControl.AutoResumeAfter > 0 {
autoResumeSec := br.Config.OpenPositionControl.BotRiskControl.AutoResumeAfter
go br.autoResumeAfter(autoResumeSec)
}
}
// ResumeOpening 恢复开仓
func (br *BotRuntime) ResumeOpening() {
br.resumeOpening("config")
}
func (br *BotRuntime) resumeOpening(source string) {
br.configMu.Lock()
defer br.configMu.Unlock()
// 更新 OpenPositionControl 中的 PauseOpening 状态
br.Config.OpenPositionControl.PauseOpening = false
// 同时更新 BotRiskControl 中的状态
if br.Config.OpenPositionControl.BotRiskControl != nil {
br.Config.OpenPositionControl.BotRiskControl.PauseOpening = false
br.Config.OpenPositionControl.BotRiskControl.PauseOpeningReason = ""
}
storage.AppendBotRiskControlEvent(br.BotID, "resumed", "", source)
}
// GetPositionStatus 获取仓位状态(包括是否达到限制)
func (br *BotRuntime) GetPositionStatus() map[string]interface{} {
if br.Inner == nil || br.Inner.SuperPositionManager == nil {
return map[string]interface{}{
"error": "bot not initialized",
}
}
spm := br.Inner.SuperPositionManager
// 使用公开方法获取仓位信息
positionLayers := spm.GetActiveLayers()
totalPositionValue := spm.GetTotalPositionValueUSDT()
// 获取当前价格
currentPrice := spm.GetLastMarketPrice()
totalPositionQty := totalPositionValue / currentPrice
// 获取杠杆并计算实际占用资金
leverage := spm.GetLeverage()
if leverage <= 0 {
leverage = 1
}
actualMargin := totalPositionValue / float64(leverage)
// 读取风控配置(使用读锁保护)
br.configMu.RLock()
riskControl := br.Config.OpenPositionControl.BotRiskControl
openControl := br.Config.OpenPositionControl
br.configMu.RUnlock()
// 计算暂停状态(安全处理 nil 情况)
paused := openControl.PauseOpening
if riskControl != nil && riskControl.Enabled && riskControl.PauseOpening {
paused = true
}
status := map[string]interface{}{
"total_position_qty": totalPositionQty,
"total_position_value": totalPositionValue,
"total_actual_margin": actualMargin,
"leverage": leverage,
"position_layers": positionLayers,
"current_price": currentPrice,
"paused": paused,
}
// 检查是否达到数量限制
reachedLimitQty := false
if riskControl != nil && riskControl.Enabled && riskControl.MaxPositionQuantity > 0 {
status["max_position_qty"] = riskControl.MaxPositionQuantity
reachedLimitQty = totalPositionQty >= riskControl.MaxPositionQuantity
} else if openControl.MaxPositionQuantity > 0 {
status["max_position_qty"] = openControl.MaxPositionQuantity
reachedLimitQty = totalPositionQty >= openControl.MaxPositionQuantity
}
status["reached_limit_qty"] = reachedLimitQty
// 检查是否达到价值限制(使用实际占用资金)
reachedLimitValue := false
if riskControl != nil && riskControl.Enabled && riskControl.MaxPositionValue > 0 {
status["max_position_value"] = riskControl.MaxPositionValue
reachedLimitValue = actualMargin >= riskControl.MaxPositionValue
} else if openControl.MaxPositionValue > 0 {
status["max_position_value"] = openControl.MaxPositionValue
reachedLimitValue = actualMargin >= openControl.MaxPositionValue
}
status["reached_limit_value"] = reachedLimitValue
// 检查是否达到层数限制
reachedLimitLayers := false
if riskControl != nil && riskControl.Enabled && riskControl.MaxPositionLayers > 0 {
status["max_position_layers"] = riskControl.MaxPositionLayers
reachedLimitLayers = positionLayers >= riskControl.MaxPositionLayers
} else if openControl.MaxPositionLayers > 0 {
status["max_position_layers"] = openControl.MaxPositionLayers
reachedLimitLayers = positionLayers >= openControl.MaxPositionLayers
}
status["reached_limit_layers"] = reachedLimitLayers
// 是否应该停止开仓
status["should_stop_opening"] = reachedLimitQty || reachedLimitValue || reachedLimitLayers || paused
return status
}
// CancelAllOpenOrders 取消所有开仓订单
func (br *BotRuntime) CancelAllOpenOrders() error {
if br.Inner == nil || br.Inner.SuperPositionManager == nil {
return fmt.Errorf("bot not initialized")
}