diff --git a/internal/dhtcrawler/crawler.go b/internal/dhtcrawler/crawler.go index 988085724..628731d82 100644 --- a/internal/dhtcrawler/crawler.go +++ b/internal/dhtcrawler/crawler.go @@ -48,14 +48,28 @@ type crawler struct { // containing every hash it has already encountered. // This avoids multiple attempts to crawl the same hash, and takes a lot of load off the database query // that checks if a hash has already been indexed. - ignoreHashes *ignoreHashes + ignoreHashes *ignoreFilter + ignoreNodes *ignoreFilter blockingManager blocking.Manager // soughtNodeID is a random node ID used as the target for find_node and sample_infohashes requests. // It is rotated every 10 seconds. - soughtNodeID *concurrency.AtomicValue[protocol.ID] - stopped chan struct{} - persistedTotal *prometheus.CounterVec - logger *zap.SugaredLogger + soughtNodeID *concurrency.AtomicValue[protocol.ID] + stopped chan struct{} + persistedTotal *prometheus.CounterVec + discoveredNodesTotal *prometheus.CounterVec + findNodesCount prometheus.Histogram + getPeersPeerCount prometheus.Histogram + getPeersNodeCount prometheus.Histogram + getPeersNodeTotal *prometheus.CounterVec + requestMetaInfoTotal *prometheus.CounterVec + infohashTriageTotal *prometheus.CounterVec + sampleInfohashesHashCount prometheus.Histogram + sampleInfohashesHashTotal *prometheus.CounterVec + sampleInfohashesNodeCount prometheus.Histogram + sampleInfohashesNodeTotal *prometheus.CounterVec + scrapeNodeCount prometheus.Histogram + scrapeNodeTotal *prometheus.CounterVec + logger *zap.SugaredLogger } func (c *crawler) start() { @@ -101,18 +115,32 @@ type infoHashWithScrape struct { bfpe bloom.Filter } -type ignoreHashes struct { +type ignoreFilter struct { mutex sync.Mutex bloom *boom.StableBloomFilter } -func (i *ignoreHashes) testAndAdd(id protocol.ID) bool { +func (i *ignoreFilter) testAndAdd(id protocol.ID) bool { i.mutex.Lock() defer i.mutex.Unlock() return i.bloom.TestAndAdd(id[:]) } +func (i *ignoreFilter) test(id protocol.ID) bool { + i.mutex.Lock() + defer i.mutex.Unlock() + + return i.bloom.Test(id[:]) +} + +func (i *ignoreFilter) add(id protocol.ID) { + i.mutex.Lock() + defer i.mutex.Unlock() + + i.bloom.Add(id[:]) +} + func (c *crawler) rotateSoughtNodeID(ctx context.Context) { for { select { diff --git a/internal/dhtcrawler/discovered_nodes.go b/internal/dhtcrawler/discovered_nodes.go index 7154fd37b..dd2904601 100644 --- a/internal/dhtcrawler/discovered_nodes.go +++ b/internal/dhtcrawler/discovered_nodes.go @@ -7,6 +7,7 @@ import ( "github.com/bitmagnet-io/bitmagnet/internal/concurrency" "github.com/bitmagnet-io/bitmagnet/internal/protocol/dht/ktable" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/fx" ) @@ -45,18 +46,31 @@ func (c *crawler) runDiscoveredNodes(ctx context.Context) { addrs = append(addrs, p.Addr().Addr()) } } - // for any discovered node not already in the routing table, - // we will block until it can be sent to any one of the pipeline channels. - unknownAddrs := c.kTable.FilterKnownAddrs(addrs) - for _, addr := range unknownAddrs { + + // For any newly discovered node, we will block until it can be + // sent to any one of the pipeline channels. + for _, addr := range addrs { p := m[addr.String()] - select { - case <-ctx.Done(): - return - case c.nodesForFindNode.In() <- p: - case c.nodesForSampleInfoHashes.In() <- p: - case c.nodesForPing.In() <- p: + + var result string + + if c.ignoreNodes.test(p.ID()) { + result = "ignored" + } else { + select { + case <-ctx.Done(): + return + case c.nodesForFindNode.In() <- p: + result = "find_node" + case c.nodesForSampleInfoHashes.In() <- p: + c.ignoreNodes.add(p.ID()) + result = "sample_infohashes" + case c.nodesForPing.In() <- p: + result = "ping" + } } + + c.discoveredNodesTotal.With(prometheus.Labels{"result": result}).Inc() } } } diff --git a/internal/dhtcrawler/factory.go b/internal/dhtcrawler/factory.go index 412fc0413..20a3e3b74 100644 --- a/internal/dhtcrawler/factory.go +++ b/internal/dhtcrawler/factory.go @@ -41,21 +41,137 @@ type Result struct { DhtCrawlerActive *concurrency.AtomicValue[bool] `name:"dht_crawler_active"` - PersistedTotal prometheus.Collector `group:"prometheus_collectors"` + PersistedTotal prometheus.Collector `group:"prometheus_collectors"` + DiscoveredNodesTotal prometheus.Collector `group:"prometheus_collectors"` + FindNodesCount prometheus.Collector `group:"prometheus_collectors"` + GetPeersPeerCount prometheus.Collector `group:"prometheus_collectors"` + GetPeersNodeCount prometheus.Collector `group:"prometheus_collectors"` + GetPeersNodeTotal prometheus.Collector `group:"prometheus_collectors"` + RequestMetaInfoTotal prometheus.Collector `group:"prometheus_collectors"` + InfohashTriageTotal prometheus.Collector `group:"prometheus_collectors"` + SampleInfohashesHashCount prometheus.Collector `group:"prometheus_collectors"` + SampleInfohashesHashTotal prometheus.Collector `group:"prometheus_collectors"` + SampleInfohashesNodeCount prometheus.Collector `group:"prometheus_collectors"` + SampleInfohashesNodeTotal prometheus.Collector `group:"prometheus_collectors"` + ScrapeNodeCount prometheus.Collector `group:"prometheus_collectors"` + ScrapeNodeTotal prometheus.Collector `group:"prometheus_collectors"` } +const ( + namespace = "bitmagnet" + subsystem = "dht_crawler" +) + func New(params Params) Result { active := &concurrency.AtomicValue[bool]{} var c crawler persistedTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "bitmagnet", - Subsystem: "dht_crawler", + Namespace: namespace, + Subsystem: subsystem, Name: "persisted_total", Help: "A counter of persisted database entities.", }, []string{"entity"}) + discoveredNodesTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "discovered_nodes_total", + Help: "Total number of nodes the crawler discovers.", + }, []string{"result"}) + + findNodesCount := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "find_nodes_count", + Help: "Number of nodes found by find_node requests.", + // Spec compliant DHT nodes should never return more than 8 nodes per find_node request. + Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8}, + }) + + getPeersPeerCount := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "get_peers_peer_count", + Help: "Number of peers found by get_peers requests.", + Buckets: []float64{0, 1, 2, 5, 10, 20, 50, 100}, + }) + + getPeersNodeCount := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "get_peers_node_count", + Help: "Number of nodes found by get_peers requests.", + Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8}, + }) + + getPeersNodeTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "get_peers_node_total", + Help: "Total number of nodes found by get_peers requests.", + }, []string{"result"}) + + requestMetaInfoTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "request_metainfo_total", + Help: "Total number of metainfo requests.", + }, []string{"result"}) + + infohashTriageTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "infohash_triage_total", + Help: "Total number of triaged infohashes.", + }, []string{"result"}) + + sampleInfohashesHashCount := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sample_infohashes_hash_count", + Help: "Number of infohashes found by sample_infohashes requests.", + Buckets: []float64{0, 1, 2, 5, 10, 15, 20}, + }) + + sampleInfohashesHashTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sample_infohashes_hash_total", + Help: "Total number of infohashes found by sample_infohashes requests.", + }, []string{"result"}) + + sampleInfohashesNodeCount := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sample_infohashes_node_count", + Help: "Number of nodes found by sample_infohashes requests.", + Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8}, + }) + + sampleInfohashesNodeTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sample_infohashes_node_total", + Help: "Total number of nodes found by sample_infohashes requests.", + }, []string{"result"}) + + scrapeNodeCount := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "scrape_node_count", + Help: "Number of nodes found by scrape requests.", + Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8}, + }) + + scrapeNodeTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "scrape_node_total", + Help: "Total number of nodes found by scrape requests.", + }, []string{"result"}) + return Result{ Worker: worker.NewWorker( "dht_crawler", @@ -117,14 +233,30 @@ func New(params Params) Result { savePieces: params.Config.SavePieces, rescrapeThreshold: params.Config.RescrapeThreshold, dao: query, - ignoreHashes: &ignoreHashes{ + ignoreHashes: &ignoreFilter{ bloom: boom.NewStableBloomFilter(10_000_000, 2, 0.001), }, - blockingManager: blockingManager, - soughtNodeID: &concurrency.AtomicValue[protocol.ID]{}, - stopped: make(chan struct{}), - persistedTotal: persistedTotal, - logger: params.Logger.Named("dht_crawler"), + ignoreNodes: &ignoreFilter{ + bloom: boom.NewStableBloomFilter(200_000*uint(scalingFactor), 2, 0.001), + }, + blockingManager: blockingManager, + soughtNodeID: &concurrency.AtomicValue[protocol.ID]{}, + stopped: make(chan struct{}), + persistedTotal: persistedTotal, + discoveredNodesTotal: discoveredNodesTotal, + findNodesCount: findNodesCount, + getPeersPeerCount: getPeersPeerCount, + getPeersNodeCount: getPeersNodeCount, + getPeersNodeTotal: getPeersNodeTotal, + requestMetaInfoTotal: requestMetaInfoTotal, + infohashTriageTotal: infohashTriageTotal, + sampleInfohashesHashCount: sampleInfohashesHashCount, + sampleInfohashesHashTotal: sampleInfohashesHashTotal, + sampleInfohashesNodeCount: sampleInfohashesNodeCount, + sampleInfohashesNodeTotal: sampleInfohashesNodeTotal, + scrapeNodeCount: scrapeNodeCount, + scrapeNodeTotal: scrapeNodeTotal, + logger: params.Logger.Named("dht_crawler"), } c.soughtNodeID.Set(protocol.RandomNodeID()) @@ -142,7 +274,20 @@ func New(params Params) Result { }, }, ), - PersistedTotal: persistedTotal, - DhtCrawlerActive: active, + PersistedTotal: persistedTotal, + DiscoveredNodesTotal: discoveredNodesTotal, + FindNodesCount: findNodesCount, + GetPeersPeerCount: getPeersPeerCount, + GetPeersNodeCount: getPeersNodeCount, + GetPeersNodeTotal: getPeersNodeTotal, + RequestMetaInfoTotal: requestMetaInfoTotal, + InfohashTriageTotal: infohashTriageTotal, + SampleInfohashesHashCount: sampleInfohashesHashCount, + SampleInfohashesHashTotal: sampleInfohashesHashTotal, + SampleInfohashesNodeCount: sampleInfohashesNodeCount, + SampleInfohashesNodeTotal: sampleInfohashesNodeTotal, + ScrapeNodeCount: scrapeNodeCount, + ScrapeNodeTotal: scrapeNodeTotal, + DhtCrawlerActive: active, } } diff --git a/internal/dhtcrawler/find_node.go b/internal/dhtcrawler/find_node.go index e41a43a87..7743be3d9 100644 --- a/internal/dhtcrawler/find_node.go +++ b/internal/dhtcrawler/find_node.go @@ -33,6 +33,8 @@ func (c *crawler) runFindNode(ctx context.Context) { Reason: fmt.Errorf("find_node failed: %w", err), }) } else { + c.findNodesCount.Observe(float64(len(res.Nodes))) + c.kTable.BatchCommand(ktable.PutNode{ ID: p.ID(), Addr: p.Addr(), diff --git a/internal/dhtcrawler/get_peers.go b/internal/dhtcrawler/get_peers.go index 97e190843..67710cffe 100644 --- a/internal/dhtcrawler/get_peers.go +++ b/internal/dhtcrawler/get_peers.go @@ -8,6 +8,7 @@ import ( "time" "github.com/bitmagnet-io/bitmagnet/internal/protocol/dht/ktable" + "github.com/prometheus/client_golang/prometheus" ) func (c *crawler) runGetPeers(ctx context.Context) { @@ -62,19 +63,28 @@ func (c *crawler) requestPeersForHash( Options: []ktable.NodeOption{ktable.NodeResponded()}, }) + c.getPeersPeerCount.Observe(float64(len(res.Values))) + c.getPeersNodeCount.Observe(float64(len(res.Nodes))) + if len(res.Nodes) > 0 { // block the channel for up to a second in an attempt to add the nodes to the discoveredNodes channel cancelCtx, cancel := context.WithTimeout(ctx, time.Second) + processed := 0 + + nodes: for _, n := range res.Nodes { select { case <-cancelCtx.Done(): - break + break nodes case c.discoveredNodes.In() <- ktable.NewNode(n.ID, n.Addr): - continue + processed++ } } + c.getPeersNodeTotal.With(prometheus.Labels{"result": "discovered_nodes"}).Add(float64(processed)) + c.getPeersNodeTotal.With(prometheus.Labels{"result": "skipped"}).Add(float64(len(res.Nodes) - processed)) + cancel() } diff --git a/internal/dhtcrawler/infohash_triage.go b/internal/dhtcrawler/infohash_triage.go index cc142e4a0..2555da747 100644 --- a/internal/dhtcrawler/infohash_triage.go +++ b/internal/dhtcrawler/infohash_triage.go @@ -7,6 +7,7 @@ import ( "github.com/bitmagnet-io/bitmagnet/internal/model" "github.com/bitmagnet-io/bitmagnet/internal/protocol" + "github.com/prometheus/client_golang/prometheus" ) // runInfoHashTriage receives discovered hashes on the infoHashTriage channel, determines if they should be crawled, @@ -30,6 +31,7 @@ func (c *crawler) runInfoHashTriage(ctx context.Context) { reqMap := make(map[protocol.ID]nodeHasPeersForHash, len(reqs)) for _, r := range reqs { if _, ok := reqMap[r.infoHash]; ok { + c.infohashTriageTotal.With(prometheus.Labels{"result": "duplicate"}).Inc() continue } @@ -43,16 +45,15 @@ func (c *crawler) runInfoHashTriage(ctx context.Context) { break } + c.infohashTriageTotal.With(prometheus.Labels{"result": "blocked"}).Add(float64(len(allHashes) - len(filteredHashes))) + if len(filteredHashes) == 0 { break } - filteredHashMap := make(map[protocol.ID]struct{}, len(filteredHashes)) valuers := make([]driver.Valuer, 0, len(filteredHashes)) for _, h := range filteredHashes { - filteredHashMap[h] = struct{}{} - valuers = append(valuers, h) } @@ -80,7 +81,7 @@ func (c *crawler) runInfoHashTriage(ctx context.Context) { foundTorrents[t.InfoHash] = *t } - for h := range filteredHashMap { + for _, h := range filteredHashes { r := reqMap[h] if t, ok := foundTorrents[r.infoHash]; !ok || t.FilesStatus == model.FilesStatusNoInfo || @@ -90,7 +91,7 @@ func (c *crawler) runInfoHashTriage(ctx context.Context) { case <-ctx.Done(): return case c.getPeers.In() <- r: - continue + c.infohashTriageTotal.With(prometheus.Labels{"result": "get_peers"}).Inc() } } else if (!t.Seeders.Valid || !t.Leechers.Valid) || t.UpdatedAt.Before(time.Now().Add(-c.rescrapeThreshold)) { @@ -98,7 +99,7 @@ func (c *crawler) runInfoHashTriage(ctx context.Context) { case <-ctx.Done(): return case c.scrape.In() <- r: - continue + c.infohashTriageTotal.With(prometheus.Labels{"result": "scrape"}).Inc() } } } diff --git a/internal/dhtcrawler/request_meta_info.go b/internal/dhtcrawler/request_meta_info.go index 492964279..78934fbb1 100644 --- a/internal/dhtcrawler/request_meta_info.go +++ b/internal/dhtcrawler/request_meta_info.go @@ -8,6 +8,7 @@ import ( "github.com/bitmagnet-io/bitmagnet/internal/protocol" "github.com/bitmagnet-io/bitmagnet/internal/protocol/metainfo/metainforequester" + "github.com/prometheus/client_golang/prometheus" ) func (c *crawler) runRequestMetaInfo(ctx context.Context) { @@ -49,11 +50,14 @@ func (c *crawler) doRequestMetaInfo( if banErr := c.banningChecker.Check(res.Info); banErr != nil { _ = c.blockingManager.Block(ctx, []protocol.ID{hash}, false) + c.requestMetaInfoTotal.With(prometheus.Labels{"result": "blocked"}).Inc() return metainforequester.Response{}, banErr } + c.requestMetaInfoTotal.With(prometheus.Labels{"result": "persist_torrents"}).Inc() return res, nil } + c.requestMetaInfoTotal.With(prometheus.Labels{"result": "error"}).Inc() return metainforequester.Response{}, errors.Join(errs...) } diff --git a/internal/dhtcrawler/sample_infohashes.go b/internal/dhtcrawler/sample_infohashes.go index a965cf159..f34a1b890 100644 --- a/internal/dhtcrawler/sample_infohashes.go +++ b/internal/dhtcrawler/sample_infohashes.go @@ -6,6 +6,7 @@ import ( "time" "github.com/bitmagnet-io/bitmagnet/internal/protocol/dht/ktable" + "github.com/prometheus/client_golang/prometheus" ) func (c *crawler) getNodesForSampleInfoHashes(ctx context.Context) { @@ -39,6 +40,9 @@ func (c *crawler) runSampleInfoHashes(ctx context.Context) { return } + c.sampleInfohashesHashCount.Observe(float64(len(res.Samples))) + c.sampleInfohashesNodeCount.Observe(float64(len(res.Nodes))) + var discoveredHashes []nodeHasPeersForHash for _, s := range res.Samples { @@ -47,6 +51,9 @@ func (c *crawler) runSampleInfoHashes(ctx context.Context) { infoHash: s, node: n.Addr(), }) + c.sampleInfohashesHashTotal.With(prometheus.Labels{"result": "infohash_triage"}).Inc() + } else { + c.sampleInfohashesHashTotal.With(prometheus.Labels{"result": "skipped"}).Inc() } } @@ -83,14 +90,20 @@ func (c *crawler) runSampleInfoHashes(ctx context.Context) { timeoutCtx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() + processed := 0 + + nodes: for _, n := range res.Nodes { select { case <-timeoutCtx.Done(): - return + break nodes case c.discoveredNodes.In() <- ktable.NewNode(n.ID, n.Addr): - continue + processed++ } } + + c.sampleInfohashesNodeTotal.With(prometheus.Labels{"result": "discovered_nodes"}).Add(float64(processed)) + c.sampleInfohashesNodeTotal.With(prometheus.Labels{"result": "skipped"}).Add(float64(len(res.Nodes) - processed)) }() } }) diff --git a/internal/dhtcrawler/scrape.go b/internal/dhtcrawler/scrape.go index 0094eb624..e08ccc177 100644 --- a/internal/dhtcrawler/scrape.go +++ b/internal/dhtcrawler/scrape.go @@ -6,6 +6,7 @@ import ( "time" "github.com/bitmagnet-io/bitmagnet/internal/protocol/dht/ktable" + "github.com/prometheus/client_golang/prometheus" ) func (c *crawler) runScrape(ctx context.Context) { @@ -47,18 +48,26 @@ func (c *crawler) requestScrape( Options: []ktable.NodeOption{ktable.NodeResponded()}, }) + c.scrapeNodeCount.Observe(float64(len(res.Nodes))) + if len(res.Nodes) > 0 { cancelCtx, cancel := context.WithTimeout(ctx, time.Second) + processed := 0 + + nodes: for _, n := range res.Nodes { select { case <-cancelCtx.Done(): - break + break nodes case c.discoveredNodes.In() <- ktable.NewNode(n.ID, n.Addr): - continue + processed++ } } + c.scrapeNodeTotal.With(prometheus.Labels{"result": "discovered_nodes"}).Add(float64(processed)) + c.scrapeNodeTotal.With(prometheus.Labels{"result": "skipped"}).Add(float64(len(res.Nodes) - processed)) + cancel() } diff --git a/internal/protocol/dht/ktable/mocks/Table.go b/internal/protocol/dht/ktable/mocks/Table.go index be0fd559d..10373284d 100644 --- a/internal/protocol/dht/ktable/mocks/Table.go +++ b/internal/protocol/dht/ktable/mocks/Table.go @@ -121,54 +121,6 @@ func (_c *Table_DropNode_Call) RunAndReturn(run func(protocol.ID, error) bool) * return _c } -// FilterKnownAddrs provides a mock function with given fields: addrs -func (_m *Table) FilterKnownAddrs(addrs []netip.Addr) []netip.Addr { - ret := _m.Called(addrs) - - if len(ret) == 0 { - panic("no return value specified for FilterKnownAddrs") - } - - var r0 []netip.Addr - if rf, ok := ret.Get(0).(func([]netip.Addr) []netip.Addr); ok { - r0 = rf(addrs) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]netip.Addr) - } - } - - return r0 -} - -// Table_FilterKnownAddrs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FilterKnownAddrs' -type Table_FilterKnownAddrs_Call struct { - *mock.Call -} - -// FilterKnownAddrs is a helper method to define mock.On call -// - addrs []netip.Addr -func (_e *Table_Expecter) FilterKnownAddrs(addrs interface{}) *Table_FilterKnownAddrs_Call { - return &Table_FilterKnownAddrs_Call{Call: _e.mock.On("FilterKnownAddrs", addrs)} -} - -func (_c *Table_FilterKnownAddrs_Call) Run(run func(addrs []netip.Addr)) *Table_FilterKnownAddrs_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].([]netip.Addr)) - }) - return _c -} - -func (_c *Table_FilterKnownAddrs_Call) Return(_a0 []netip.Addr) *Table_FilterKnownAddrs_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *Table_FilterKnownAddrs_Call) RunAndReturn(run func([]netip.Addr) []netip.Addr) *Table_FilterKnownAddrs_Call { - _c.Call.Return(run) - return _c -} - // GetClosestNodes provides a mock function with given fields: id func (_m *Table) GetClosestNodes(id protocol.ID) []ktable.Node { ret := _m.Called(id) diff --git a/internal/protocol/dht/ktable/query.go b/internal/protocol/dht/ktable/query.go index a24b27dda..270158948 100644 --- a/internal/protocol/dht/ktable/query.go +++ b/internal/protocol/dht/ktable/query.go @@ -1,7 +1,6 @@ package ktable import ( - "net/netip" "sort" "time" ) @@ -40,24 +39,6 @@ func (c GetOldestPeers) execReturn(t *table) []Node { return peers } -var _ Query[[]netip.Addr] = FilterKnownAddrs{} - -type FilterKnownAddrs struct { - Addrs []netip.Addr -} - -func (c FilterKnownAddrs) execReturn(t *table) []netip.Addr { - var unknown []netip.Addr - - for _, addr := range c.Addrs { - if _, ok := t.addrs.addrs[addr.String()]; !ok { - unknown = append(unknown, addr) - } - } - - return unknown -} - var _ Query[[]Node] = GetNodesForSampleInfoHashes{} type GetNodesForSampleInfoHashes struct { diff --git a/internal/protocol/dht/ktable/table.go b/internal/protocol/dht/ktable/table.go index 9e7c9d926..7befd1059 100644 --- a/internal/protocol/dht/ktable/table.go +++ b/internal/protocol/dht/ktable/table.go @@ -25,7 +25,6 @@ type TableQuery interface { GetClosestNodes(id ID) []Node GetOldestNodes(cutoff time.Time, n int) []Node GetNodesForSampleInfoHashes(n int) []Node - FilterKnownAddrs(addrs []netip.Addr) []netip.Addr GetHashOrClosestNodes(id ID) GetHashOrClosestNodesResult // SampleHashesAndNodes returns a random sample of up to 8 hashes and nodes, and the total hashes count. SampleHashesAndNodes() SampleHashesAndNodesResult @@ -133,15 +132,6 @@ func (t *table) GetNodesForSampleInfoHashes(n int) []Node { }.execReturn(t) } -func (t *table) FilterKnownAddrs(addrs []netip.Addr) []netip.Addr { - t.mutex.RLock() - defer t.mutex.RUnlock() - - return FilterKnownAddrs{ - Addrs: addrs, - }.execReturn(t) -} - func (t *table) GetHashOrClosestNodes(id ID) GetHashOrClosestNodesResult { t.mutex.RLock() defer t.mutex.RUnlock()