From 8c8fdcde9a6b6f40a83870981aefee65f9521f31 Mon Sep 17 00:00:00 2001 From: Mylo Fawcett Date: Sun, 29 Jun 2025 20:52:59 +0200 Subject: [PATCH 1/2] Skip recently processed nodes in crawler --- internal/dhtcrawler/crawler.go | 7 +-- internal/dhtcrawler/discovered_nodes.go | 13 ++++-- internal/dhtcrawler/factory.go | 5 ++- internal/protocol/dht/ktable/mocks/Table.go | 48 --------------------- internal/protocol/dht/ktable/query.go | 19 -------- internal/protocol/dht/ktable/table.go | 10 ----- 6 files changed, 17 insertions(+), 85 deletions(-) diff --git a/internal/dhtcrawler/crawler.go b/internal/dhtcrawler/crawler.go index 988085724..2c74026e5 100644 --- a/internal/dhtcrawler/crawler.go +++ b/internal/dhtcrawler/crawler.go @@ -48,7 +48,8 @@ type crawler struct { // containing every hash it has already encountered. // This avoids multiple attempts to crawl the same hash, and takes a lot of load off the database query // that checks if a hash has already been indexed. - ignoreHashes *ignoreHashes + ignoreHashes *ignoreFilter + ignoreNodes *ignoreFilter blockingManager blocking.Manager // soughtNodeID is a random node ID used as the target for find_node and sample_infohashes requests. // It is rotated every 10 seconds. @@ -101,12 +102,12 @@ type infoHashWithScrape struct { bfpe bloom.Filter } -type ignoreHashes struct { +type ignoreFilter struct { mutex sync.Mutex bloom *boom.StableBloomFilter } -func (i *ignoreHashes) testAndAdd(id protocol.ID) bool { +func (i *ignoreFilter) testAndAdd(id protocol.ID) bool { i.mutex.Lock() defer i.mutex.Unlock() diff --git a/internal/dhtcrawler/discovered_nodes.go b/internal/dhtcrawler/discovered_nodes.go index 7154fd37b..4b0de5683 100644 --- a/internal/dhtcrawler/discovered_nodes.go +++ b/internal/dhtcrawler/discovered_nodes.go @@ -45,11 +45,16 @@ func (c *crawler) runDiscoveredNodes(ctx context.Context) { addrs = append(addrs, p.Addr().Addr()) } } - // for any discovered node not already in the routing table, - // we will block until it can be sent to any one of the pipeline channels. - unknownAddrs := c.kTable.FilterKnownAddrs(addrs) - for _, addr := range unknownAddrs { + + // For any newly discovered node, we will block until it can be + // sent to any one of the pipeline channels. + for _, addr := range addrs { p := m[addr.String()] + + if c.ignoreNodes.testAndAdd(p.ID()) { + continue + } + select { case <-ctx.Done(): return diff --git a/internal/dhtcrawler/factory.go b/internal/dhtcrawler/factory.go index 412fc0413..18794702d 100644 --- a/internal/dhtcrawler/factory.go +++ b/internal/dhtcrawler/factory.go @@ -117,9 +117,12 @@ func New(params Params) Result { savePieces: params.Config.SavePieces, rescrapeThreshold: params.Config.RescrapeThreshold, dao: query, - ignoreHashes: &ignoreHashes{ + ignoreHashes: &ignoreFilter{ bloom: boom.NewStableBloomFilter(10_000_000, 2, 0.001), }, + ignoreNodes: &ignoreFilter{ + bloom: boom.NewStableBloomFilter(200_000*uint(scalingFactor), 2, 0.001), + }, blockingManager: blockingManager, soughtNodeID: &concurrency.AtomicValue[protocol.ID]{}, stopped: make(chan struct{}), diff --git a/internal/protocol/dht/ktable/mocks/Table.go b/internal/protocol/dht/ktable/mocks/Table.go index be0fd559d..10373284d 100644 --- a/internal/protocol/dht/ktable/mocks/Table.go +++ b/internal/protocol/dht/ktable/mocks/Table.go @@ -121,54 +121,6 @@ func (_c *Table_DropNode_Call) RunAndReturn(run func(protocol.ID, error) bool) * return _c } -// FilterKnownAddrs provides a mock function with given fields: addrs -func (_m *Table) FilterKnownAddrs(addrs []netip.Addr) []netip.Addr { - ret := _m.Called(addrs) - - if len(ret) == 0 { - panic("no return value specified for FilterKnownAddrs") - } - - var r0 []netip.Addr - if rf, ok := ret.Get(0).(func([]netip.Addr) []netip.Addr); ok { - r0 = rf(addrs) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]netip.Addr) - } - } - - return r0 -} - -// Table_FilterKnownAddrs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FilterKnownAddrs' -type Table_FilterKnownAddrs_Call struct { - *mock.Call -} - -// FilterKnownAddrs is a helper method to define mock.On call -// - addrs []netip.Addr -func (_e *Table_Expecter) FilterKnownAddrs(addrs interface{}) *Table_FilterKnownAddrs_Call { - return &Table_FilterKnownAddrs_Call{Call: _e.mock.On("FilterKnownAddrs", addrs)} -} - -func (_c *Table_FilterKnownAddrs_Call) Run(run func(addrs []netip.Addr)) *Table_FilterKnownAddrs_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].([]netip.Addr)) - }) - return _c -} - -func (_c *Table_FilterKnownAddrs_Call) Return(_a0 []netip.Addr) *Table_FilterKnownAddrs_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *Table_FilterKnownAddrs_Call) RunAndReturn(run func([]netip.Addr) []netip.Addr) *Table_FilterKnownAddrs_Call { - _c.Call.Return(run) - return _c -} - // GetClosestNodes provides a mock function with given fields: id func (_m *Table) GetClosestNodes(id protocol.ID) []ktable.Node { ret := _m.Called(id) diff --git a/internal/protocol/dht/ktable/query.go b/internal/protocol/dht/ktable/query.go index a24b27dda..270158948 100644 --- a/internal/protocol/dht/ktable/query.go +++ b/internal/protocol/dht/ktable/query.go @@ -1,7 +1,6 @@ package ktable import ( - "net/netip" "sort" "time" ) @@ -40,24 +39,6 @@ func (c GetOldestPeers) execReturn(t *table) []Node { return peers } -var _ Query[[]netip.Addr] = FilterKnownAddrs{} - -type FilterKnownAddrs struct { - Addrs []netip.Addr -} - -func (c FilterKnownAddrs) execReturn(t *table) []netip.Addr { - var unknown []netip.Addr - - for _, addr := range c.Addrs { - if _, ok := t.addrs.addrs[addr.String()]; !ok { - unknown = append(unknown, addr) - } - } - - return unknown -} - var _ Query[[]Node] = GetNodesForSampleInfoHashes{} type GetNodesForSampleInfoHashes struct { diff --git a/internal/protocol/dht/ktable/table.go b/internal/protocol/dht/ktable/table.go index 9e7c9d926..7befd1059 100644 --- a/internal/protocol/dht/ktable/table.go +++ b/internal/protocol/dht/ktable/table.go @@ -25,7 +25,6 @@ type TableQuery interface { GetClosestNodes(id ID) []Node GetOldestNodes(cutoff time.Time, n int) []Node GetNodesForSampleInfoHashes(n int) []Node - FilterKnownAddrs(addrs []netip.Addr) []netip.Addr GetHashOrClosestNodes(id ID) GetHashOrClosestNodesResult // SampleHashesAndNodes returns a random sample of up to 8 hashes and nodes, and the total hashes count. SampleHashesAndNodes() SampleHashesAndNodesResult @@ -133,15 +132,6 @@ func (t *table) GetNodesForSampleInfoHashes(n int) []Node { }.execReturn(t) } -func (t *table) FilterKnownAddrs(addrs []netip.Addr) []netip.Addr { - t.mutex.RLock() - defer t.mutex.RUnlock() - - return FilterKnownAddrs{ - Addrs: addrs, - }.execReturn(t) -} - func (t *table) GetHashOrClosestNodes(id ID) GetHashOrClosestNodesResult { t.mutex.RLock() defer t.mutex.RUnlock() From 61e92b7edc6549d0c12956a02828abb62438ca1f Mon Sep 17 00:00:00 2001 From: Mylo Fawcett Date: Sun, 29 Jun 2025 21:09:17 +0200 Subject: [PATCH 2/2] Only ignore nodes once they have had their infohashes sampled --- internal/dhtcrawler/crawler.go | 14 ++++++++++++++ internal/dhtcrawler/discovered_nodes.go | 3 ++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/internal/dhtcrawler/crawler.go b/internal/dhtcrawler/crawler.go index 2c74026e5..397658dd9 100644 --- a/internal/dhtcrawler/crawler.go +++ b/internal/dhtcrawler/crawler.go @@ -114,6 +114,20 @@ func (i *ignoreFilter) testAndAdd(id protocol.ID) bool { return i.bloom.TestAndAdd(id[:]) } +func (i *ignoreFilter) test(id protocol.ID) bool { + i.mutex.Lock() + defer i.mutex.Unlock() + + return i.bloom.Test(id[:]) +} + +func (i *ignoreFilter) add(id protocol.ID) { + i.mutex.Lock() + defer i.mutex.Unlock() + + i.bloom.Add(id[:]) +} + func (c *crawler) rotateSoughtNodeID(ctx context.Context) { for { select { diff --git a/internal/dhtcrawler/discovered_nodes.go b/internal/dhtcrawler/discovered_nodes.go index 4b0de5683..c4d27c0bf 100644 --- a/internal/dhtcrawler/discovered_nodes.go +++ b/internal/dhtcrawler/discovered_nodes.go @@ -51,7 +51,7 @@ func (c *crawler) runDiscoveredNodes(ctx context.Context) { for _, addr := range addrs { p := m[addr.String()] - if c.ignoreNodes.testAndAdd(p.ID()) { + if c.ignoreNodes.test(p.ID()) { continue } @@ -60,6 +60,7 @@ func (c *crawler) runDiscoveredNodes(ctx context.Context) { return case c.nodesForFindNode.In() <- p: case c.nodesForSampleInfoHashes.In() <- p: + c.ignoreNodes.add(p.ID()) case c.nodesForPing.In() <- p: } }