From 8c8fdcde9a6b6f40a83870981aefee65f9521f31 Mon Sep 17 00:00:00 2001 From: Mylo Fawcett Date: Sun, 29 Jun 2025 20:52:59 +0200 Subject: [PATCH 1/4] Skip recently processed nodes in crawler --- internal/dhtcrawler/crawler.go | 7 +-- internal/dhtcrawler/discovered_nodes.go | 13 ++++-- internal/dhtcrawler/factory.go | 5 ++- internal/protocol/dht/ktable/mocks/Table.go | 48 --------------------- internal/protocol/dht/ktable/query.go | 19 -------- internal/protocol/dht/ktable/table.go | 10 ----- 6 files changed, 17 insertions(+), 85 deletions(-) diff --git a/internal/dhtcrawler/crawler.go b/internal/dhtcrawler/crawler.go index 988085724..2c74026e5 100644 --- a/internal/dhtcrawler/crawler.go +++ b/internal/dhtcrawler/crawler.go @@ -48,7 +48,8 @@ type crawler struct { // containing every hash it has already encountered. // This avoids multiple attempts to crawl the same hash, and takes a lot of load off the database query // that checks if a hash has already been indexed. - ignoreHashes *ignoreHashes + ignoreHashes *ignoreFilter + ignoreNodes *ignoreFilter blockingManager blocking.Manager // soughtNodeID is a random node ID used as the target for find_node and sample_infohashes requests. // It is rotated every 10 seconds. @@ -101,12 +102,12 @@ type infoHashWithScrape struct { bfpe bloom.Filter } -type ignoreHashes struct { +type ignoreFilter struct { mutex sync.Mutex bloom *boom.StableBloomFilter } -func (i *ignoreHashes) testAndAdd(id protocol.ID) bool { +func (i *ignoreFilter) testAndAdd(id protocol.ID) bool { i.mutex.Lock() defer i.mutex.Unlock() diff --git a/internal/dhtcrawler/discovered_nodes.go b/internal/dhtcrawler/discovered_nodes.go index 7154fd37b..4b0de5683 100644 --- a/internal/dhtcrawler/discovered_nodes.go +++ b/internal/dhtcrawler/discovered_nodes.go @@ -45,11 +45,16 @@ func (c *crawler) runDiscoveredNodes(ctx context.Context) { addrs = append(addrs, p.Addr().Addr()) } } - // for any discovered node not already in the routing table, - // we will block until it can be sent to any one of the pipeline channels. - unknownAddrs := c.kTable.FilterKnownAddrs(addrs) - for _, addr := range unknownAddrs { + + // For any newly discovered node, we will block until it can be + // sent to any one of the pipeline channels. + for _, addr := range addrs { p := m[addr.String()] + + if c.ignoreNodes.testAndAdd(p.ID()) { + continue + } + select { case <-ctx.Done(): return diff --git a/internal/dhtcrawler/factory.go b/internal/dhtcrawler/factory.go index 412fc0413..18794702d 100644 --- a/internal/dhtcrawler/factory.go +++ b/internal/dhtcrawler/factory.go @@ -117,9 +117,12 @@ func New(params Params) Result { savePieces: params.Config.SavePieces, rescrapeThreshold: params.Config.RescrapeThreshold, dao: query, - ignoreHashes: &ignoreHashes{ + ignoreHashes: &ignoreFilter{ bloom: boom.NewStableBloomFilter(10_000_000, 2, 0.001), }, + ignoreNodes: &ignoreFilter{ + bloom: boom.NewStableBloomFilter(200_000*uint(scalingFactor), 2, 0.001), + }, blockingManager: blockingManager, soughtNodeID: &concurrency.AtomicValue[protocol.ID]{}, stopped: make(chan struct{}), diff --git a/internal/protocol/dht/ktable/mocks/Table.go b/internal/protocol/dht/ktable/mocks/Table.go index be0fd559d..10373284d 100644 --- a/internal/protocol/dht/ktable/mocks/Table.go +++ b/internal/protocol/dht/ktable/mocks/Table.go @@ -121,54 +121,6 @@ func (_c *Table_DropNode_Call) RunAndReturn(run func(protocol.ID, error) bool) * return _c } -// FilterKnownAddrs provides a mock function with given fields: addrs -func (_m *Table) FilterKnownAddrs(addrs []netip.Addr) []netip.Addr { - ret := _m.Called(addrs) - - if len(ret) == 0 { - panic("no return value specified for FilterKnownAddrs") - } - - var r0 []netip.Addr - if rf, ok := ret.Get(0).(func([]netip.Addr) []netip.Addr); ok { - r0 = rf(addrs) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]netip.Addr) - } - } - - return r0 -} - -// Table_FilterKnownAddrs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FilterKnownAddrs' -type Table_FilterKnownAddrs_Call struct { - *mock.Call -} - -// FilterKnownAddrs is a helper method to define mock.On call -// - addrs []netip.Addr -func (_e *Table_Expecter) FilterKnownAddrs(addrs interface{}) *Table_FilterKnownAddrs_Call { - return &Table_FilterKnownAddrs_Call{Call: _e.mock.On("FilterKnownAddrs", addrs)} -} - -func (_c *Table_FilterKnownAddrs_Call) Run(run func(addrs []netip.Addr)) *Table_FilterKnownAddrs_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].([]netip.Addr)) - }) - return _c -} - -func (_c *Table_FilterKnownAddrs_Call) Return(_a0 []netip.Addr) *Table_FilterKnownAddrs_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *Table_FilterKnownAddrs_Call) RunAndReturn(run func([]netip.Addr) []netip.Addr) *Table_FilterKnownAddrs_Call { - _c.Call.Return(run) - return _c -} - // GetClosestNodes provides a mock function with given fields: id func (_m *Table) GetClosestNodes(id protocol.ID) []ktable.Node { ret := _m.Called(id) diff --git a/internal/protocol/dht/ktable/query.go b/internal/protocol/dht/ktable/query.go index a24b27dda..270158948 100644 --- a/internal/protocol/dht/ktable/query.go +++ b/internal/protocol/dht/ktable/query.go @@ -1,7 +1,6 @@ package ktable import ( - "net/netip" "sort" "time" ) @@ -40,24 +39,6 @@ func (c GetOldestPeers) execReturn(t *table) []Node { return peers } -var _ Query[[]netip.Addr] = FilterKnownAddrs{} - -type FilterKnownAddrs struct { - Addrs []netip.Addr -} - -func (c FilterKnownAddrs) execReturn(t *table) []netip.Addr { - var unknown []netip.Addr - - for _, addr := range c.Addrs { - if _, ok := t.addrs.addrs[addr.String()]; !ok { - unknown = append(unknown, addr) - } - } - - return unknown -} - var _ Query[[]Node] = GetNodesForSampleInfoHashes{} type GetNodesForSampleInfoHashes struct { diff --git a/internal/protocol/dht/ktable/table.go b/internal/protocol/dht/ktable/table.go index 9e7c9d926..7befd1059 100644 --- a/internal/protocol/dht/ktable/table.go +++ b/internal/protocol/dht/ktable/table.go @@ -25,7 +25,6 @@ type TableQuery interface { GetClosestNodes(id ID) []Node GetOldestNodes(cutoff time.Time, n int) []Node GetNodesForSampleInfoHashes(n int) []Node - FilterKnownAddrs(addrs []netip.Addr) []netip.Addr GetHashOrClosestNodes(id ID) GetHashOrClosestNodesResult // SampleHashesAndNodes returns a random sample of up to 8 hashes and nodes, and the total hashes count. SampleHashesAndNodes() SampleHashesAndNodesResult @@ -133,15 +132,6 @@ func (t *table) GetNodesForSampleInfoHashes(n int) []Node { }.execReturn(t) } -func (t *table) FilterKnownAddrs(addrs []netip.Addr) []netip.Addr { - t.mutex.RLock() - defer t.mutex.RUnlock() - - return FilterKnownAddrs{ - Addrs: addrs, - }.execReturn(t) -} - func (t *table) GetHashOrClosestNodes(id ID) GetHashOrClosestNodesResult { t.mutex.RLock() defer t.mutex.RUnlock() From 61e92b7edc6549d0c12956a02828abb62438ca1f Mon Sep 17 00:00:00 2001 From: Mylo Fawcett Date: Sun, 29 Jun 2025 21:09:17 +0200 Subject: [PATCH 2/4] Only ignore nodes once they have had their infohashes sampled --- internal/dhtcrawler/crawler.go | 14 ++++++++++++++ internal/dhtcrawler/discovered_nodes.go | 3 ++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/internal/dhtcrawler/crawler.go b/internal/dhtcrawler/crawler.go index 2c74026e5..397658dd9 100644 --- a/internal/dhtcrawler/crawler.go +++ b/internal/dhtcrawler/crawler.go @@ -114,6 +114,20 @@ func (i *ignoreFilter) testAndAdd(id protocol.ID) bool { return i.bloom.TestAndAdd(id[:]) } +func (i *ignoreFilter) test(id protocol.ID) bool { + i.mutex.Lock() + defer i.mutex.Unlock() + + return i.bloom.Test(id[:]) +} + +func (i *ignoreFilter) add(id protocol.ID) { + i.mutex.Lock() + defer i.mutex.Unlock() + + i.bloom.Add(id[:]) +} + func (c *crawler) rotateSoughtNodeID(ctx context.Context) { for { select { diff --git a/internal/dhtcrawler/discovered_nodes.go b/internal/dhtcrawler/discovered_nodes.go index 4b0de5683..c4d27c0bf 100644 --- a/internal/dhtcrawler/discovered_nodes.go +++ b/internal/dhtcrawler/discovered_nodes.go @@ -51,7 +51,7 @@ func (c *crawler) runDiscoveredNodes(ctx context.Context) { for _, addr := range addrs { p := m[addr.String()] - if c.ignoreNodes.testAndAdd(p.ID()) { + if c.ignoreNodes.test(p.ID()) { continue } @@ -60,6 +60,7 @@ func (c *crawler) runDiscoveredNodes(ctx context.Context) { return case c.nodesForFindNode.In() <- p: case c.nodesForSampleInfoHashes.In() <- p: + c.ignoreNodes.add(p.ID()) case c.nodesForPing.In() <- p: } } From 60cb2f38f3ed5b7837554be19e365c7043d6f8bf Mon Sep 17 00:00:00 2001 From: Mylo Fawcett Date: Tue, 1 Jul 2025 00:14:21 +0200 Subject: [PATCH 3/4] Add more metrics to the DHT crawler --- internal/dhtcrawler/crawler.go | 22 ++- internal/dhtcrawler/discovered_nodes.go | 26 ++-- internal/dhtcrawler/factory.go | 173 +++++++++++++++++++++-- internal/dhtcrawler/find_node.go | 2 + internal/dhtcrawler/get_peers.go | 14 +- internal/dhtcrawler/infohash_triage.go | 13 +- internal/dhtcrawler/request_meta_info.go | 4 + internal/dhtcrawler/sample_infohashes.go | 17 ++- internal/dhtcrawler/scrape.go | 14 +- 9 files changed, 250 insertions(+), 35 deletions(-) diff --git a/internal/dhtcrawler/crawler.go b/internal/dhtcrawler/crawler.go index 397658dd9..283aefd9f 100644 --- a/internal/dhtcrawler/crawler.go +++ b/internal/dhtcrawler/crawler.go @@ -53,10 +53,24 @@ type crawler struct { blockingManager blocking.Manager // soughtNodeID is a random node ID used as the target for find_node and sample_infohashes requests. // It is rotated every 10 seconds. - soughtNodeID *concurrency.AtomicValue[protocol.ID] - stopped chan struct{} - persistedTotal *prometheus.CounterVec - logger *zap.SugaredLogger + soughtNodeID *concurrency.AtomicValue[protocol.ID] + stopped chan struct{} + persistedTotal *prometheus.CounterVec + discoveredNodesTotal *prometheus.CounterVec + findNodesCount prometheus.Histogram + getPeersPeerCount prometheus.Histogram + getPeersNodeCount prometheus.Histogram + getPeersNodeTotal *prometheus.CounterVec + requestMetaInfoTotal *prometheus.CounterVec + infohashTriageTotal *prometheus.CounterVec + sampleInfohashesHashCount prometheus.Histogram + sampleInfohashesHashTotal *prometheus.CounterVec + sampleInfohashesNodeCount prometheus.Histogram + sampleInfohashesNodeTotal *prometheus.CounterVec + scrapePeerCount prometheus.Histogram + scrapeNodeCount prometheus.Histogram + scrapeNodeTotal *prometheus.CounterVec + logger *zap.SugaredLogger } func (c *crawler) start() { diff --git a/internal/dhtcrawler/discovered_nodes.go b/internal/dhtcrawler/discovered_nodes.go index c4d27c0bf..dd2904601 100644 --- a/internal/dhtcrawler/discovered_nodes.go +++ b/internal/dhtcrawler/discovered_nodes.go @@ -7,6 +7,7 @@ import ( "github.com/bitmagnet-io/bitmagnet/internal/concurrency" "github.com/bitmagnet-io/bitmagnet/internal/protocol/dht/ktable" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/fx" ) @@ -51,18 +52,25 @@ func (c *crawler) runDiscoveredNodes(ctx context.Context) { for _, addr := range addrs { p := m[addr.String()] + var result string + if c.ignoreNodes.test(p.ID()) { - continue + result = "ignored" + } else { + select { + case <-ctx.Done(): + return + case c.nodesForFindNode.In() <- p: + result = "find_node" + case c.nodesForSampleInfoHashes.In() <- p: + c.ignoreNodes.add(p.ID()) + result = "sample_infohashes" + case c.nodesForPing.In() <- p: + result = "ping" + } } - select { - case <-ctx.Done(): - return - case c.nodesForFindNode.In() <- p: - case c.nodesForSampleInfoHashes.In() <- p: - c.ignoreNodes.add(p.ID()) - case c.nodesForPing.In() <- p: - } + c.discoveredNodesTotal.With(prometheus.Labels{"result": result}).Inc() } } } diff --git a/internal/dhtcrawler/factory.go b/internal/dhtcrawler/factory.go index 18794702d..a6c2f28f8 100644 --- a/internal/dhtcrawler/factory.go +++ b/internal/dhtcrawler/factory.go @@ -41,21 +41,146 @@ type Result struct { DhtCrawlerActive *concurrency.AtomicValue[bool] `name:"dht_crawler_active"` - PersistedTotal prometheus.Collector `group:"prometheus_collectors"` + PersistedTotal prometheus.Collector `group:"prometheus_collectors"` + DiscoveredNodesTotal prometheus.Collector `group:"prometheus_collectors"` + FindNodesCount prometheus.Collector `group:"prometheus_collectors"` + GetPeersPeerCount prometheus.Collector `group:"prometheus_collectors"` + GetPeersNodeCount prometheus.Collector `group:"prometheus_collectors"` + GetPeersNodeTotal prometheus.Collector `group:"prometheus_collectors"` + RequestMetaInfoTotal prometheus.Collector `group:"prometheus_collectors"` + InfohashTriageTotal prometheus.Collector `group:"prometheus_collectors"` + SampleInfohashesHashCount prometheus.Collector `group:"prometheus_collectors"` + SampleInfohashesHashTotal prometheus.Collector `group:"prometheus_collectors"` + SampleInfohashesNodeCount prometheus.Collector `group:"prometheus_collectors"` + SampleInfohashesNodeTotal prometheus.Collector `group:"prometheus_collectors"` + ScrapePeerCount prometheus.Collector `group:"prometheus_collectors"` + ScrapeNodeCount prometheus.Collector `group:"prometheus_collectors"` + ScrapeNodeTotal prometheus.Collector `group:"prometheus_collectors"` } +const ( + namespace = "bitmagnet" + subsystem = "dht_crawler" +) + func New(params Params) Result { active := &concurrency.AtomicValue[bool]{} var c crawler persistedTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "bitmagnet", - Subsystem: "dht_crawler", + Namespace: namespace, + Subsystem: subsystem, Name: "persisted_total", Help: "A counter of persisted database entities.", }, []string{"entity"}) + discoveredNodesTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "discovered_nodes_total", + Help: "Total number of nodes the crawler discovers.", + }, []string{"result"}) + + findNodesCount := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "find_nodes_count", + Help: "Number of nodes found by find_node requests.", + // Spec compliant DHT nodes should never return more than 8 nodes per find_node request. + Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8}, + }) + + getPeersPeerCount := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "get_peers_peer_count", + Help: "Number of peers found by get_peers requests.", + Buckets: []float64{0, 1, 2, 5, 10, 20, 50, 100}, + }) + + getPeersNodeCount := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "get_peers_node_count", + Help: "Number of nodes found by get_peers requests.", + Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8}, + }) + + getPeersNodeTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "get_peers_node_total", + Help: "Total number of nodes found by get_peers requests.", + }, []string{"result"}) + + requestMetaInfoTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "request_metainfo_total", + Help: "Total number of metainfo requests.", + }, []string{"result"}) + + infohashTriageTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "infohash_triage_total", + Help: "Total number of triaged infohashes.", + }, []string{"result"}) + + sampleInfohashesHashCount := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sample_infohashes_hash_count", + Help: "Number of infohashes found by sample_infohashes requests.", + Buckets: []float64{0, 1, 2, 5, 10, 15, 20}, + }) + + sampleInfohashesHashTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sample_infohashes_hash_total", + Help: "Total number of infohashes found by sample_infohashes requests.", + }, []string{"result"}) + + sampleInfohashesNodeCount := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sample_infohashes_node_count", + Help: "Number of nodes found by sample_infohashes requests.", + Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8}, + }) + + sampleInfohashesNodeTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sample_infohashes_node_total", + Help: "Total number of nodes found by sample_infohashes requests.", + }, []string{"result"}) + + scrapePeerCount := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "scrape_peer_count", + Help: "Number of peers found by scrape requests.", + Buckets: []float64{0, 1, 2, 5, 10, 20, 50, 100}, + }) + + scrapeNodeCount := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "scrape_node_count", + Help: "Number of nodes found by scrape requests.", + Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8}, + }) + + scrapeNodeTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "scrape_node_total", + Help: "Total number of nodes found by scrape requests.", + }, []string{"result"}) + return Result{ Worker: worker.NewWorker( "dht_crawler", @@ -123,11 +248,25 @@ func New(params Params) Result { ignoreNodes: &ignoreFilter{ bloom: boom.NewStableBloomFilter(200_000*uint(scalingFactor), 2, 0.001), }, - blockingManager: blockingManager, - soughtNodeID: &concurrency.AtomicValue[protocol.ID]{}, - stopped: make(chan struct{}), - persistedTotal: persistedTotal, - logger: params.Logger.Named("dht_crawler"), + blockingManager: blockingManager, + soughtNodeID: &concurrency.AtomicValue[protocol.ID]{}, + stopped: make(chan struct{}), + persistedTotal: persistedTotal, + discoveredNodesTotal: discoveredNodesTotal, + findNodesCount: findNodesCount, + getPeersPeerCount: getPeersPeerCount, + getPeersNodeCount: getPeersNodeCount, + getPeersNodeTotal: getPeersNodeTotal, + requestMetaInfoTotal: requestMetaInfoTotal, + infohashTriageTotal: infohashTriageTotal, + sampleInfohashesHashCount: sampleInfohashesHashCount, + sampleInfohashesHashTotal: sampleInfohashesHashTotal, + sampleInfohashesNodeCount: sampleInfohashesNodeCount, + sampleInfohashesNodeTotal: sampleInfohashesNodeTotal, + scrapePeerCount: scrapePeerCount, + scrapeNodeCount: scrapeNodeCount, + scrapeNodeTotal: scrapeNodeTotal, + logger: params.Logger.Named("dht_crawler"), } c.soughtNodeID.Set(protocol.RandomNodeID()) @@ -145,7 +284,21 @@ func New(params Params) Result { }, }, ), - PersistedTotal: persistedTotal, - DhtCrawlerActive: active, + PersistedTotal: persistedTotal, + DiscoveredNodesTotal: discoveredNodesTotal, + FindNodesCount: findNodesCount, + GetPeersPeerCount: getPeersPeerCount, + GetPeersNodeCount: getPeersNodeCount, + GetPeersNodeTotal: getPeersNodeTotal, + RequestMetaInfoTotal: requestMetaInfoTotal, + InfohashTriageTotal: infohashTriageTotal, + SampleInfohashesHashCount: sampleInfohashesHashCount, + SampleInfohashesHashTotal: sampleInfohashesHashTotal, + SampleInfohashesNodeCount: sampleInfohashesNodeCount, + SampleInfohashesNodeTotal: sampleInfohashesNodeTotal, + ScrapePeerCount: scrapePeerCount, + ScrapeNodeCount: scrapeNodeCount, + ScrapeNodeTotal: scrapeNodeTotal, + DhtCrawlerActive: active, } } diff --git a/internal/dhtcrawler/find_node.go b/internal/dhtcrawler/find_node.go index e41a43a87..7743be3d9 100644 --- a/internal/dhtcrawler/find_node.go +++ b/internal/dhtcrawler/find_node.go @@ -33,6 +33,8 @@ func (c *crawler) runFindNode(ctx context.Context) { Reason: fmt.Errorf("find_node failed: %w", err), }) } else { + c.findNodesCount.Observe(float64(len(res.Nodes))) + c.kTable.BatchCommand(ktable.PutNode{ ID: p.ID(), Addr: p.Addr(), diff --git a/internal/dhtcrawler/get_peers.go b/internal/dhtcrawler/get_peers.go index 97e190843..67710cffe 100644 --- a/internal/dhtcrawler/get_peers.go +++ b/internal/dhtcrawler/get_peers.go @@ -8,6 +8,7 @@ import ( "time" "github.com/bitmagnet-io/bitmagnet/internal/protocol/dht/ktable" + "github.com/prometheus/client_golang/prometheus" ) func (c *crawler) runGetPeers(ctx context.Context) { @@ -62,19 +63,28 @@ func (c *crawler) requestPeersForHash( Options: []ktable.NodeOption{ktable.NodeResponded()}, }) + c.getPeersPeerCount.Observe(float64(len(res.Values))) + c.getPeersNodeCount.Observe(float64(len(res.Nodes))) + if len(res.Nodes) > 0 { // block the channel for up to a second in an attempt to add the nodes to the discoveredNodes channel cancelCtx, cancel := context.WithTimeout(ctx, time.Second) + processed := 0 + + nodes: for _, n := range res.Nodes { select { case <-cancelCtx.Done(): - break + break nodes case c.discoveredNodes.In() <- ktable.NewNode(n.ID, n.Addr): - continue + processed++ } } + c.getPeersNodeTotal.With(prometheus.Labels{"result": "discovered_nodes"}).Add(float64(processed)) + c.getPeersNodeTotal.With(prometheus.Labels{"result": "skipped"}).Add(float64(len(res.Nodes) - processed)) + cancel() } diff --git a/internal/dhtcrawler/infohash_triage.go b/internal/dhtcrawler/infohash_triage.go index cc142e4a0..2555da747 100644 --- a/internal/dhtcrawler/infohash_triage.go +++ b/internal/dhtcrawler/infohash_triage.go @@ -7,6 +7,7 @@ import ( "github.com/bitmagnet-io/bitmagnet/internal/model" "github.com/bitmagnet-io/bitmagnet/internal/protocol" + "github.com/prometheus/client_golang/prometheus" ) // runInfoHashTriage receives discovered hashes on the infoHashTriage channel, determines if they should be crawled, @@ -30,6 +31,7 @@ func (c *crawler) runInfoHashTriage(ctx context.Context) { reqMap := make(map[protocol.ID]nodeHasPeersForHash, len(reqs)) for _, r := range reqs { if _, ok := reqMap[r.infoHash]; ok { + c.infohashTriageTotal.With(prometheus.Labels{"result": "duplicate"}).Inc() continue } @@ -43,16 +45,15 @@ func (c *crawler) runInfoHashTriage(ctx context.Context) { break } + c.infohashTriageTotal.With(prometheus.Labels{"result": "blocked"}).Add(float64(len(allHashes) - len(filteredHashes))) + if len(filteredHashes) == 0 { break } - filteredHashMap := make(map[protocol.ID]struct{}, len(filteredHashes)) valuers := make([]driver.Valuer, 0, len(filteredHashes)) for _, h := range filteredHashes { - filteredHashMap[h] = struct{}{} - valuers = append(valuers, h) } @@ -80,7 +81,7 @@ func (c *crawler) runInfoHashTriage(ctx context.Context) { foundTorrents[t.InfoHash] = *t } - for h := range filteredHashMap { + for _, h := range filteredHashes { r := reqMap[h] if t, ok := foundTorrents[r.infoHash]; !ok || t.FilesStatus == model.FilesStatusNoInfo || @@ -90,7 +91,7 @@ func (c *crawler) runInfoHashTriage(ctx context.Context) { case <-ctx.Done(): return case c.getPeers.In() <- r: - continue + c.infohashTriageTotal.With(prometheus.Labels{"result": "get_peers"}).Inc() } } else if (!t.Seeders.Valid || !t.Leechers.Valid) || t.UpdatedAt.Before(time.Now().Add(-c.rescrapeThreshold)) { @@ -98,7 +99,7 @@ func (c *crawler) runInfoHashTriage(ctx context.Context) { case <-ctx.Done(): return case c.scrape.In() <- r: - continue + c.infohashTriageTotal.With(prometheus.Labels{"result": "scrape"}).Inc() } } } diff --git a/internal/dhtcrawler/request_meta_info.go b/internal/dhtcrawler/request_meta_info.go index 492964279..78934fbb1 100644 --- a/internal/dhtcrawler/request_meta_info.go +++ b/internal/dhtcrawler/request_meta_info.go @@ -8,6 +8,7 @@ import ( "github.com/bitmagnet-io/bitmagnet/internal/protocol" "github.com/bitmagnet-io/bitmagnet/internal/protocol/metainfo/metainforequester" + "github.com/prometheus/client_golang/prometheus" ) func (c *crawler) runRequestMetaInfo(ctx context.Context) { @@ -49,11 +50,14 @@ func (c *crawler) doRequestMetaInfo( if banErr := c.banningChecker.Check(res.Info); banErr != nil { _ = c.blockingManager.Block(ctx, []protocol.ID{hash}, false) + c.requestMetaInfoTotal.With(prometheus.Labels{"result": "blocked"}).Inc() return metainforequester.Response{}, banErr } + c.requestMetaInfoTotal.With(prometheus.Labels{"result": "persist_torrents"}).Inc() return res, nil } + c.requestMetaInfoTotal.With(prometheus.Labels{"result": "error"}).Inc() return metainforequester.Response{}, errors.Join(errs...) } diff --git a/internal/dhtcrawler/sample_infohashes.go b/internal/dhtcrawler/sample_infohashes.go index a965cf159..f34a1b890 100644 --- a/internal/dhtcrawler/sample_infohashes.go +++ b/internal/dhtcrawler/sample_infohashes.go @@ -6,6 +6,7 @@ import ( "time" "github.com/bitmagnet-io/bitmagnet/internal/protocol/dht/ktable" + "github.com/prometheus/client_golang/prometheus" ) func (c *crawler) getNodesForSampleInfoHashes(ctx context.Context) { @@ -39,6 +40,9 @@ func (c *crawler) runSampleInfoHashes(ctx context.Context) { return } + c.sampleInfohashesHashCount.Observe(float64(len(res.Samples))) + c.sampleInfohashesNodeCount.Observe(float64(len(res.Nodes))) + var discoveredHashes []nodeHasPeersForHash for _, s := range res.Samples { @@ -47,6 +51,9 @@ func (c *crawler) runSampleInfoHashes(ctx context.Context) { infoHash: s, node: n.Addr(), }) + c.sampleInfohashesHashTotal.With(prometheus.Labels{"result": "infohash_triage"}).Inc() + } else { + c.sampleInfohashesHashTotal.With(prometheus.Labels{"result": "skipped"}).Inc() } } @@ -83,14 +90,20 @@ func (c *crawler) runSampleInfoHashes(ctx context.Context) { timeoutCtx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() + processed := 0 + + nodes: for _, n := range res.Nodes { select { case <-timeoutCtx.Done(): - return + break nodes case c.discoveredNodes.In() <- ktable.NewNode(n.ID, n.Addr): - continue + processed++ } } + + c.sampleInfohashesNodeTotal.With(prometheus.Labels{"result": "discovered_nodes"}).Add(float64(processed)) + c.sampleInfohashesNodeTotal.With(prometheus.Labels{"result": "skipped"}).Add(float64(len(res.Nodes) - processed)) }() } }) diff --git a/internal/dhtcrawler/scrape.go b/internal/dhtcrawler/scrape.go index 0094eb624..8e8885506 100644 --- a/internal/dhtcrawler/scrape.go +++ b/internal/dhtcrawler/scrape.go @@ -6,6 +6,7 @@ import ( "time" "github.com/bitmagnet-io/bitmagnet/internal/protocol/dht/ktable" + "github.com/prometheus/client_golang/prometheus" ) func (c *crawler) runScrape(ctx context.Context) { @@ -47,18 +48,27 @@ func (c *crawler) requestScrape( Options: []ktable.NodeOption{ktable.NodeResponded()}, }) + c.scrapePeerCount.Observe(float64(len(res.Values))) + c.scrapeNodeCount.Observe(float64(len(res.Nodes))) + if len(res.Nodes) > 0 { cancelCtx, cancel := context.WithTimeout(ctx, time.Second) + processed := 0 + + nodes: for _, n := range res.Nodes { select { case <-cancelCtx.Done(): - break + break nodes case c.discoveredNodes.In() <- ktable.NewNode(n.ID, n.Addr): - continue + processed++ } } + c.scrapeNodeTotal.With(prometheus.Labels{"result": "discovered_nodes"}).Add(float64(processed)) + c.scrapeNodeTotal.With(prometheus.Labels{"result": "skipped"}).Add(float64(len(res.Nodes) - processed)) + cancel() } From ac2831faa7e2a48b8ecf8c9dc56f5d99f2f5e0e9 Mon Sep 17 00:00:00 2001 From: Mylo Fawcett Date: Tue, 1 Jul 2025 13:16:37 +0200 Subject: [PATCH 4/4] Remove scrape peer count --- internal/dhtcrawler/crawler.go | 1 - internal/dhtcrawler/factory.go | 11 ----------- internal/dhtcrawler/scrape.go | 1 - 3 files changed, 13 deletions(-) diff --git a/internal/dhtcrawler/crawler.go b/internal/dhtcrawler/crawler.go index 283aefd9f..628731d82 100644 --- a/internal/dhtcrawler/crawler.go +++ b/internal/dhtcrawler/crawler.go @@ -67,7 +67,6 @@ type crawler struct { sampleInfohashesHashTotal *prometheus.CounterVec sampleInfohashesNodeCount prometheus.Histogram sampleInfohashesNodeTotal *prometheus.CounterVec - scrapePeerCount prometheus.Histogram scrapeNodeCount prometheus.Histogram scrapeNodeTotal *prometheus.CounterVec logger *zap.SugaredLogger diff --git a/internal/dhtcrawler/factory.go b/internal/dhtcrawler/factory.go index a6c2f28f8..20a3e3b74 100644 --- a/internal/dhtcrawler/factory.go +++ b/internal/dhtcrawler/factory.go @@ -53,7 +53,6 @@ type Result struct { SampleInfohashesHashTotal prometheus.Collector `group:"prometheus_collectors"` SampleInfohashesNodeCount prometheus.Collector `group:"prometheus_collectors"` SampleInfohashesNodeTotal prometheus.Collector `group:"prometheus_collectors"` - ScrapePeerCount prometheus.Collector `group:"prometheus_collectors"` ScrapeNodeCount prometheus.Collector `group:"prometheus_collectors"` ScrapeNodeTotal prometheus.Collector `group:"prometheus_collectors"` } @@ -158,14 +157,6 @@ func New(params Params) Result { Help: "Total number of nodes found by sample_infohashes requests.", }, []string{"result"}) - scrapePeerCount := prometheus.NewHistogram(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "scrape_peer_count", - Help: "Number of peers found by scrape requests.", - Buckets: []float64{0, 1, 2, 5, 10, 20, 50, 100}, - }) - scrapeNodeCount := prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, @@ -263,7 +254,6 @@ func New(params Params) Result { sampleInfohashesHashTotal: sampleInfohashesHashTotal, sampleInfohashesNodeCount: sampleInfohashesNodeCount, sampleInfohashesNodeTotal: sampleInfohashesNodeTotal, - scrapePeerCount: scrapePeerCount, scrapeNodeCount: scrapeNodeCount, scrapeNodeTotal: scrapeNodeTotal, logger: params.Logger.Named("dht_crawler"), @@ -296,7 +286,6 @@ func New(params Params) Result { SampleInfohashesHashTotal: sampleInfohashesHashTotal, SampleInfohashesNodeCount: sampleInfohashesNodeCount, SampleInfohashesNodeTotal: sampleInfohashesNodeTotal, - ScrapePeerCount: scrapePeerCount, ScrapeNodeCount: scrapeNodeCount, ScrapeNodeTotal: scrapeNodeTotal, DhtCrawlerActive: active, diff --git a/internal/dhtcrawler/scrape.go b/internal/dhtcrawler/scrape.go index 8e8885506..e08ccc177 100644 --- a/internal/dhtcrawler/scrape.go +++ b/internal/dhtcrawler/scrape.go @@ -48,7 +48,6 @@ func (c *crawler) requestScrape( Options: []ktable.NodeOption{ktable.NodeResponded()}, }) - c.scrapePeerCount.Observe(float64(len(res.Values))) c.scrapeNodeCount.Observe(float64(len(res.Nodes))) if len(res.Nodes) > 0 {