Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions internal/dhtcrawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,28 @@ type crawler struct {
// containing every hash it has already encountered.
// This avoids multiple attempts to crawl the same hash, and takes a lot of load off the database query
// that checks if a hash has already been indexed.
ignoreHashes *ignoreHashes
ignoreHashes *ignoreFilter
ignoreNodes *ignoreFilter
blockingManager blocking.Manager
// soughtNodeID is a random node ID used as the target for find_node and sample_infohashes requests.
// It is rotated every 10 seconds.
soughtNodeID *concurrency.AtomicValue[protocol.ID]
stopped chan struct{}
persistedTotal *prometheus.CounterVec
logger *zap.SugaredLogger
soughtNodeID *concurrency.AtomicValue[protocol.ID]
stopped chan struct{}
persistedTotal *prometheus.CounterVec
discoveredNodesTotal *prometheus.CounterVec
findNodesCount prometheus.Histogram
getPeersPeerCount prometheus.Histogram
getPeersNodeCount prometheus.Histogram
getPeersNodeTotal *prometheus.CounterVec
requestMetaInfoTotal *prometheus.CounterVec
infohashTriageTotal *prometheus.CounterVec
sampleInfohashesHashCount prometheus.Histogram
sampleInfohashesHashTotal *prometheus.CounterVec
sampleInfohashesNodeCount prometheus.Histogram
sampleInfohashesNodeTotal *prometheus.CounterVec
scrapeNodeCount prometheus.Histogram
scrapeNodeTotal *prometheus.CounterVec
logger *zap.SugaredLogger

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a LOT of properties to add here; I think much of this is already being collected here/elsewhere? https://github.com/bitmagnet-io/bitmagnet/blob/main/internal/protocol/dht/server/prometheus_collector.go

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the metrics I've added here are separate from the existing collectors. They serve two purposes not covered by the existing success/latency metrics.

  • The histograms gather statistics about the responses received from the DHT per query type (something that isn't recorded by current metrics as the recorded metric depends on the query type). This isn't immediately useful to us, it's more interesting from a general DHT analysis perspective.
  • The counters gather statistics about the flow of data through the DHT crawler itself.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am aware it's a lot of metrics to add; my opinion with metrics is that it's better to err on the side of too many rather than not enough, seeing as they are so cheap to add. If there are any metrics in particular that you think aren't useful, we can get rid of them.

}

func (c *crawler) start() {
Expand Down Expand Up @@ -101,18 +115,32 @@ type infoHashWithScrape struct {
bfpe bloom.Filter
}

type ignoreHashes struct {
type ignoreFilter struct {
mutex sync.Mutex
bloom *boom.StableBloomFilter
}

func (i *ignoreHashes) testAndAdd(id protocol.ID) bool {
func (i *ignoreFilter) testAndAdd(id protocol.ID) bool {
i.mutex.Lock()
defer i.mutex.Unlock()

return i.bloom.TestAndAdd(id[:])
}

// test returns the result of the bloom filter's Test call for id.
// Unlike testAndAdd, it calls Test rather than TestAndAdd, so it is the
// query-only counterpart used when the caller wants to decide separately
// whether to record the ID (see add).
func (i *ignoreFilter) test(id protocol.ID) bool {
// Hold the mutex for the whole call; the deferred unlock releases it on return.
i.mutex.Lock()
defer i.mutex.Unlock()

// id[:] passes the ID to the filter as a byte slice.
return i.bloom.Test(id[:])
}

// add records id in the bloom filter by calling its Add method.
// It returns nothing; the value returned by Add is discarded.
func (i *ignoreFilter) add(id protocol.ID) {
// Hold the mutex for the whole call; the deferred unlock releases it on return.
i.mutex.Lock()
defer i.mutex.Unlock()

// id[:] passes the ID to the filter as a byte slice.
i.bloom.Add(id[:])
}

func (c *crawler) rotateSoughtNodeID(ctx context.Context) {
for {
select {
Expand Down
34 changes: 24 additions & 10 deletions internal/dhtcrawler/discovered_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/bitmagnet-io/bitmagnet/internal/concurrency"
"github.com/bitmagnet-io/bitmagnet/internal/protocol/dht/ktable"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/fx"
)

Expand Down Expand Up @@ -45,18 +46,31 @@ func (c *crawler) runDiscoveredNodes(ctx context.Context) {
addrs = append(addrs, p.Addr().Addr())
}
}
// for any discovered node not already in the routing table,
// we will block until it can be sent to any one of the pipeline channels.
unknownAddrs := c.kTable.FilterKnownAddrs(addrs)
for _, addr := range unknownAddrs {

// For any newly discovered node, we will block until it can be
// sent to any one of the pipeline channels.
for _, addr := range addrs {
p := m[addr.String()]
select {
case <-ctx.Done():
return
case c.nodesForFindNode.In() <- p:
case c.nodesForSampleInfoHashes.In() <- p:
case c.nodesForPing.In() <- p:

var result string

if c.ignoreNodes.test(p.ID()) {
result = "ignored"
} else {
select {
case <-ctx.Done():
return
case c.nodesForFindNode.In() <- p:
result = "find_node"
case c.nodesForSampleInfoHashes.In() <- p:
c.ignoreNodes.add(p.ID())
result = "sample_infohashes"
case c.nodesForPing.In() <- p:
result = "ping"
}
}

c.discoveredNodesTotal.With(prometheus.Labels{"result": result}).Inc()
}
}
}
Expand Down
167 changes: 156 additions & 11 deletions internal/dhtcrawler/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,137 @@ type Result struct {

DhtCrawlerActive *concurrency.AtomicValue[bool] `name:"dht_crawler_active"`

PersistedTotal prometheus.Collector `group:"prometheus_collectors"`
PersistedTotal prometheus.Collector `group:"prometheus_collectors"`
DiscoveredNodesTotal prometheus.Collector `group:"prometheus_collectors"`
FindNodesCount prometheus.Collector `group:"prometheus_collectors"`
GetPeersPeerCount prometheus.Collector `group:"prometheus_collectors"`
GetPeersNodeCount prometheus.Collector `group:"prometheus_collectors"`
GetPeersNodeTotal prometheus.Collector `group:"prometheus_collectors"`
RequestMetaInfoTotal prometheus.Collector `group:"prometheus_collectors"`
InfohashTriageTotal prometheus.Collector `group:"prometheus_collectors"`
SampleInfohashesHashCount prometheus.Collector `group:"prometheus_collectors"`
SampleInfohashesHashTotal prometheus.Collector `group:"prometheus_collectors"`
SampleInfohashesNodeCount prometheus.Collector `group:"prometheus_collectors"`
SampleInfohashesNodeTotal prometheus.Collector `group:"prometheus_collectors"`
ScrapeNodeCount prometheus.Collector `group:"prometheus_collectors"`
ScrapeNodeTotal prometheus.Collector `group:"prometheus_collectors"`
}

// namespace and subsystem are passed as the Namespace and Subsystem options
// of the Prometheus counters and histograms constructed in New, replacing
// the string literals that were previously repeated inline.
const (
namespace = "bitmagnet"
subsystem = "dht_crawler"
)

func New(params Params) Result {
active := &concurrency.AtomicValue[bool]{}

var c crawler

persistedTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "bitmagnet",
Subsystem: "dht_crawler",
Namespace: namespace,
Subsystem: subsystem,
Name: "persisted_total",
Help: "A counter of persisted database entities.",
}, []string{"entity"})

discoveredNodesTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "discovered_nodes_total",
Help: "Total number of nodes the crawler discovers.",
}, []string{"result"})

findNodesCount := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "find_nodes_count",
Help: "Number of nodes found by find_node requests.",
// Spec compliant DHT nodes should never return more than 8 nodes per find_node request.
Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8},
})

getPeersPeerCount := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "get_peers_peer_count",
Help: "Number of peers found by get_peers requests.",
Buckets: []float64{0, 1, 2, 5, 10, 20, 50, 100},
})

getPeersNodeCount := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "get_peers_node_count",
Help: "Number of nodes found by get_peers requests.",
Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8},
})

getPeersNodeTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "get_peers_node_total",
Help: "Total number of nodes found by get_peers requests.",
}, []string{"result"})

requestMetaInfoTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_metainfo_total",
Help: "Total number of metainfo requests.",
}, []string{"result"})

infohashTriageTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "infohash_triage_total",
Help: "Total number of triaged infohashes.",
}, []string{"result"})

sampleInfohashesHashCount := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sample_infohashes_hash_count",
Help: "Number of infohashes found by sample_infohashes requests.",
Buckets: []float64{0, 1, 2, 5, 10, 15, 20},
})

sampleInfohashesHashTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sample_infohashes_hash_total",
Help: "Total number of infohashes found by sample_infohashes requests.",
}, []string{"result"})

sampleInfohashesNodeCount := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sample_infohashes_node_count",
Help: "Number of nodes found by sample_infohashes requests.",
Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8},
})

sampleInfohashesNodeTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sample_infohashes_node_total",
Help: "Total number of nodes found by sample_infohashes requests.",
}, []string{"result"})

scrapeNodeCount := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "scrape_node_count",
Help: "Number of nodes found by scrape requests.",
Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8},
})

scrapeNodeTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "scrape_node_total",
Help: "Total number of nodes found by scrape requests.",
}, []string{"result"})

return Result{
Worker: worker.NewWorker(
"dht_crawler",
Expand Down Expand Up @@ -117,14 +233,30 @@ func New(params Params) Result {
savePieces: params.Config.SavePieces,
rescrapeThreshold: params.Config.RescrapeThreshold,
dao: query,
ignoreHashes: &ignoreHashes{
ignoreHashes: &ignoreFilter{
bloom: boom.NewStableBloomFilter(10_000_000, 2, 0.001),
},
blockingManager: blockingManager,
soughtNodeID: &concurrency.AtomicValue[protocol.ID]{},
stopped: make(chan struct{}),
persistedTotal: persistedTotal,
logger: params.Logger.Named("dht_crawler"),
ignoreNodes: &ignoreFilter{
bloom: boom.NewStableBloomFilter(200_000*uint(scalingFactor), 2, 0.001),
},
blockingManager: blockingManager,
soughtNodeID: &concurrency.AtomicValue[protocol.ID]{},
stopped: make(chan struct{}),
persistedTotal: persistedTotal,
discoveredNodesTotal: discoveredNodesTotal,
findNodesCount: findNodesCount,
getPeersPeerCount: getPeersPeerCount,
getPeersNodeCount: getPeersNodeCount,
getPeersNodeTotal: getPeersNodeTotal,
requestMetaInfoTotal: requestMetaInfoTotal,
infohashTriageTotal: infohashTriageTotal,
sampleInfohashesHashCount: sampleInfohashesHashCount,
sampleInfohashesHashTotal: sampleInfohashesHashTotal,
sampleInfohashesNodeCount: sampleInfohashesNodeCount,
sampleInfohashesNodeTotal: sampleInfohashesNodeTotal,
scrapeNodeCount: scrapeNodeCount,
scrapeNodeTotal: scrapeNodeTotal,
logger: params.Logger.Named("dht_crawler"),
}
c.soughtNodeID.Set(protocol.RandomNodeID())

Expand All @@ -142,7 +274,20 @@ func New(params Params) Result {
},
},
),
PersistedTotal: persistedTotal,
DhtCrawlerActive: active,
PersistedTotal: persistedTotal,
DiscoveredNodesTotal: discoveredNodesTotal,
FindNodesCount: findNodesCount,
GetPeersPeerCount: getPeersPeerCount,
GetPeersNodeCount: getPeersNodeCount,
GetPeersNodeTotal: getPeersNodeTotal,
RequestMetaInfoTotal: requestMetaInfoTotal,
InfohashTriageTotal: infohashTriageTotal,
SampleInfohashesHashCount: sampleInfohashesHashCount,
SampleInfohashesHashTotal: sampleInfohashesHashTotal,
SampleInfohashesNodeCount: sampleInfohashesNodeCount,
SampleInfohashesNodeTotal: sampleInfohashesNodeTotal,
ScrapeNodeCount: scrapeNodeCount,
ScrapeNodeTotal: scrapeNodeTotal,
DhtCrawlerActive: active,
}
}
2 changes: 2 additions & 0 deletions internal/dhtcrawler/find_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func (c *crawler) runFindNode(ctx context.Context) {
Reason: fmt.Errorf("find_node failed: %w", err),
})
} else {
c.findNodesCount.Observe(float64(len(res.Nodes)))

c.kTable.BatchCommand(ktable.PutNode{
ID: p.ID(),
Addr: p.Addr(),
Expand Down
14 changes: 12 additions & 2 deletions internal/dhtcrawler/get_peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/bitmagnet-io/bitmagnet/internal/protocol/dht/ktable"
"github.com/prometheus/client_golang/prometheus"
)

func (c *crawler) runGetPeers(ctx context.Context) {
Expand Down Expand Up @@ -62,19 +63,28 @@ func (c *crawler) requestPeersForHash(
Options: []ktable.NodeOption{ktable.NodeResponded()},
})

c.getPeersPeerCount.Observe(float64(len(res.Values)))
c.getPeersNodeCount.Observe(float64(len(res.Nodes)))

if len(res.Nodes) > 0 {
// block the channel for up to a second in an attempt to add the nodes to the discoveredNodes channel
cancelCtx, cancel := context.WithTimeout(ctx, time.Second)

processed := 0

nodes:
for _, n := range res.Nodes {
select {
case <-cancelCtx.Done():
break
break nodes
case c.discoveredNodes.In() <- ktable.NewNode(n.ID, n.Addr):
continue
processed++
}
}

c.getPeersNodeTotal.With(prometheus.Labels{"result": "discovered_nodes"}).Add(float64(processed))
c.getPeersNodeTotal.With(prometheus.Labels{"result": "skipped"}).Add(float64(len(res.Nodes) - processed))

cancel()
}

Expand Down
Loading