Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions internal/dhtcrawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ type crawler struct {
// containing every hash it has already encountered.
// This avoids multiple attempts to crawl the same hash, and takes a lot of load off the database query
// that checks if a hash has already been indexed.
ignoreHashes *ignoreHashes
ignoreHashes *ignoreFilter
ignoreNodes *ignoreFilter
blockingManager blocking.Manager
// soughtNodeID is a random node ID used as the target for find_node and sample_infohashes requests.
// It is rotated every 10 seconds.
Expand Down Expand Up @@ -101,18 +102,32 @@ type infoHashWithScrape struct {
bfpe bloom.Filter
}

type ignoreHashes struct {
// ignoreFilter pairs a stable bloom filter with a mutex; the methods on this
// type take the mutex around every access to the filter.
type ignoreFilter struct {
// mutex is locked by each method before it touches bloom.
mutex sync.Mutex
// bloom is the underlying filter that IDs are tested against and added to.
bloom *boom.StableBloomFilter
}

func (i *ignoreHashes) testAndAdd(id protocol.ID) bool {
// testAndAdd checks id against the filter and records it within a single
// critical section, returning whatever the underlying TestAndAdd call reports.
// NOTE(review): presumably true means id was (probably) already present
// before this call; confirm against the bloom filter library.
func (f *ignoreFilter) testAndAdd(id protocol.ID) bool {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	key := id[:]
	seen := f.bloom.TestAndAdd(key)

	return seen
}

// test reports the result of the filter's Test call for id without adding it.
// The mutex is held for the duration of the lookup.
func (f *ignoreFilter) test(id protocol.ID) bool {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	key := id[:]
	present := f.bloom.Test(key)

	return present
}

// add records id in the filter while holding the mutex.
func (f *ignoreFilter) add(id protocol.ID) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	key := id[:]
	f.bloom.Add(key)
}

func (c *crawler) rotateSoughtNodeID(ctx context.Context) {
for {
select {
Expand Down
14 changes: 10 additions & 4 deletions internal/dhtcrawler/discovered_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,22 @@ func (c *crawler) runDiscoveredNodes(ctx context.Context) {
addrs = append(addrs, p.Addr().Addr())
}
}
// for any discovered node not already in the routing table,
// we will block until it can be sent to any one of the pipeline channels.
unknownAddrs := c.kTable.FilterKnownAddrs(addrs)
for _, addr := range unknownAddrs {

// For any newly discovered node, we will block until it can be
// sent to any one of the pipeline channels.
for _, addr := range addrs {
p := m[addr.String()]

if c.ignoreNodes.test(p.ID()) {
continue
}

select {
case <-ctx.Done():
return
case c.nodesForFindNode.In() <- p:
case c.nodesForSampleInfoHashes.In() <- p:
c.ignoreNodes.add(p.ID())
case c.nodesForPing.In() <- p:
}
}
Expand Down
5 changes: 4 additions & 1 deletion internal/dhtcrawler/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,12 @@ func New(params Params) Result {
savePieces: params.Config.SavePieces,
rescrapeThreshold: params.Config.RescrapeThreshold,
dao: query,
ignoreHashes: &ignoreHashes{
ignoreHashes: &ignoreFilter{
bloom: boom.NewStableBloomFilter(10_000_000, 2, 0.001),
},
ignoreNodes: &ignoreFilter{
bloom: boom.NewStableBloomFilter(200_000*uint(scalingFactor), 2, 0.001),
},

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the scaling factor should be used here...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering that the scaling factor pretty directly affects the number of discovered torrents, a fixed-size filter would lose effectiveness over time faster the higher the scaling factor is. Sizing it in proportion to the scaling factor, as done here, ensures consistent false-negative rates (over time, not per torrent count) no matter the scaling factor.

I can run some quick tests to see if this is actually the case, but I would expect it to be.

blockingManager: blockingManager,
soughtNodeID: &concurrency.AtomicValue[protocol.ID]{},
stopped: make(chan struct{}),
Expand Down
48 changes: 0 additions & 48 deletions internal/protocol/dht/ktable/mocks/Table.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 0 additions & 19 deletions internal/protocol/dht/ktable/query.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ktable

import (
"net/netip"
"sort"
"time"
)
Expand Down Expand Up @@ -40,24 +39,6 @@ func (c GetOldestPeers) execReturn(t *table) []Node {
return peers
}

var _ Query[[]netip.Addr] = FilterKnownAddrs{}

// FilterKnownAddrs is a table query that returns the subset of Addrs which
// are not present in the table's address index.
type FilterKnownAddrs struct {
// Addrs is the list of candidate addresses to check against the table.
Addrs []netip.Addr
}

// execReturn walks c.Addrs in order and collects every address whose string
// form is not a key in the table's address index. The result is nil when all
// of the addresses are already known (or when c.Addrs is empty).
func (c FilterKnownAddrs) execReturn(t *table) []netip.Addr {
	var missing []netip.Addr

	for _, candidate := range c.Addrs {
		if _, known := t.addrs.addrs[candidate.String()]; known {
			continue
		}

		missing = append(missing, candidate)
	}

	return missing
}

var _ Query[[]Node] = GetNodesForSampleInfoHashes{}

type GetNodesForSampleInfoHashes struct {
Expand Down
10 changes: 0 additions & 10 deletions internal/protocol/dht/ktable/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type TableQuery interface {
GetClosestNodes(id ID) []Node
GetOldestNodes(cutoff time.Time, n int) []Node
GetNodesForSampleInfoHashes(n int) []Node
FilterKnownAddrs(addrs []netip.Addr) []netip.Addr
GetHashOrClosestNodes(id ID) GetHashOrClosestNodesResult
// SampleHashesAndNodes returns a random sample of up to 8 hashes and nodes, and the total hashes count.
SampleHashesAndNodes() SampleHashesAndNodesResult
Expand Down Expand Up @@ -133,15 +132,6 @@ func (t *table) GetNodesForSampleInfoHashes(n int) []Node {
}.execReturn(t)
}

// FilterKnownAddrs runs the FilterKnownAddrs query against the table while
// holding the table's read lock, and returns the query's result.
func (t *table) FilterKnownAddrs(addrs []netip.Addr) []netip.Addr {
	t.mutex.RLock()
	defer t.mutex.RUnlock()

	query := FilterKnownAddrs{Addrs: addrs}

	return query.execReturn(t)
}

func (t *table) GetHashOrClosestNodes(id ID) GetHashOrClosestNodesResult {
t.mutex.RLock()
defer t.mutex.RUnlock()
Expand Down