Skip to content

Commit 1fd0d27

Browse files
committed
Remove all allocations in metrics system
1 parent df46034 commit 1fd0d27

File tree

3 files changed

+23
-80
lines changed

3 files changed

+23
-80
lines changed

datafusion/physical-plan/src/metrics/builder.rs

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
//! Builder for creating arbitrary metrics
1919
20-
use std::{borrow::Cow, sync::Arc};
20+
use std::{borrow::Cow};
2121

2222
use super::{
23-
Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
23+
Count, ExecutionPlanMetricsSet, Gauge, Label, MetricValue, Time, Timestamp,
2424
};
2525

2626
/// Structure for constructing metrics, counters, timers, etc.
@@ -45,7 +45,7 @@ use super::{
4545
/// ```
4646
pub struct MetricBuilder<'a> {
4747
/// Location that the metric created by this builder will be added do
48-
metrics: &'a ExecutionPlanMetricsSet,
48+
_metrics: &'a ExecutionPlanMetricsSet,
4949

5050
/// optional partition number
5151
partition: Option<usize>,
@@ -58,7 +58,7 @@ impl<'a> MetricBuilder<'a> {
5858
/// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
5959
pub fn new(metrics: &'a ExecutionPlanMetricsSet) -> Self {
6060
Self {
61-
metrics,
61+
_metrics: metrics,
6262
partition: None,
6363
labels: vec![],
6464
}
@@ -87,14 +87,7 @@ impl<'a> MetricBuilder<'a> {
8787

8888
/// Consume self and create a metric of the specified value
8989
/// registered with the MetricsSet
90-
pub fn build(self, value: MetricValue) {
91-
let Self {
92-
labels,
93-
partition,
94-
metrics,
95-
} = self;
96-
let metric = Arc::new(Metric::new_with_labels(value, partition, labels));
97-
metrics.register(metric);
90+
pub fn build(self, _value: MetricValue) {
9891
}
9992

10093
/// Consume self and create a new counter for recording output rows

datafusion/physical-plan/src/metrics/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,8 +369,8 @@ impl ExecutionPlanMetricsSet {
369369
}
370370

371371
/// Add the specified metric to the underlying metric set
372-
pub fn register(&self, metric: Arc<Metric>) {
373-
self.inner.lock().push(metric)
372+
pub fn register(&self, _metric: Arc<Metric>) {
373+
//self.inner.lock().push(metric)
374374
}
375375

376376
/// Return a clone of the inner [`MetricsSet`]

datafusion/physical-plan/src/metrics/value.rs

Lines changed: 16 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,18 @@
2020
use std::{
2121
borrow::{Borrow, Cow},
2222
fmt::Display,
23-
sync::{
24-
atomic::{AtomicUsize, Ordering},
25-
Arc,
26-
},
2723
time::Duration,
2824
};
2925

3026
use chrono::{DateTime, Utc};
3127
use datafusion_common::instant::Instant;
3228
use datafusion_execution::memory_pool::human_readable_size;
33-
use parking_lot::Mutex;
3429

3530
/// A counter to record things such as number of input or output rows
3631
///
3732
/// Note `clone`ing counters update the same underlying metrics
3833
#[derive(Debug, Clone)]
3934
pub struct Count {
40-
/// value of the metric counter
41-
value: Arc<AtomicUsize>,
4235
}
4336

4437
impl PartialEq for Count {
@@ -63,20 +56,16 @@ impl Count {
6356
/// create a new counter
6457
pub fn new() -> Self {
6558
Self {
66-
value: Arc::new(AtomicUsize::new(0)),
6759
}
6860
}
6961

7062
/// Add `n` to the metric's value
71-
pub fn add(&self, n: usize) {
72-
// relaxed ordering for operations on `value` poses no issues
73-
// we're purely using atomic ops with no associated memory ops
74-
self.value.fetch_add(n, Ordering::Relaxed);
63+
pub fn add(&self, _n: usize) {
7564
}
7665

7766
/// Get the current value
7867
pub fn value(&self) -> usize {
79-
self.value.load(Ordering::Relaxed)
68+
0
8069
}
8170
}
8271

@@ -86,8 +75,6 @@ impl Count {
8675
/// Note `clone`ing gauge update the same underlying metrics
8776
#[derive(Debug, Clone)]
8877
pub struct Gauge {
89-
/// value of the metric gauge
90-
value: Arc<AtomicUsize>,
9178
}
9279

9380
impl PartialEq for Gauge {
@@ -112,47 +99,35 @@ impl Gauge {
11299
/// create a new gauge
113100
pub fn new() -> Self {
114101
Self {
115-
value: Arc::new(AtomicUsize::new(0)),
116102
}
117103
}
118104

119105
/// Add `n` to the metric's value
120-
pub fn add(&self, n: usize) {
121-
// relaxed ordering for operations on `value` poses no issues
122-
// we're purely using atomic ops with no associated memory ops
123-
self.value.fetch_add(n, Ordering::Relaxed);
106+
pub fn add(&self, _n: usize) {
124107
}
125108

126109
/// Sub `n` from the metric's value
127-
pub fn sub(&self, n: usize) {
128-
// relaxed ordering for operations on `value` poses no issues
129-
// we're purely using atomic ops with no associated memory ops
130-
self.value.fetch_sub(n, Ordering::Relaxed);
110+
pub fn sub(&self, _n: usize) {
131111
}
132112

133113
/// Set metric's value to maximum of `n` and current value
134-
pub fn set_max(&self, n: usize) {
135-
self.value.fetch_max(n, Ordering::Relaxed);
114+
pub fn set_max(&self, _n: usize) {
136115
}
137116

138117
/// Set the metric's value to `n` and return the previous value
139-
pub fn set(&self, n: usize) -> usize {
140-
// relaxed ordering for operations on `value` poses no issues
141-
// we're purely using atomic ops with no associated memory ops
142-
self.value.swap(n, Ordering::Relaxed)
118+
pub fn set(&self, _n: usize) -> usize {
119+
0
143120
}
144121

145122
/// Get the current value
146123
pub fn value(&self) -> usize {
147-
self.value.load(Ordering::Relaxed)
124+
0
148125
}
149126
}
150127

151128
/// Measure a potentially non contiguous duration of time
152129
#[derive(Debug, Clone)]
153130
pub struct Time {
154-
/// elapsed time, in nanoseconds
155-
nanos: Arc<AtomicUsize>,
156131
}
157132

158133
impl Default for Time {
@@ -179,13 +154,11 @@ impl Time {
179154
/// times for operations.
180155
pub fn new() -> Self {
181156
Self {
182-
nanos: Arc::new(AtomicUsize::new(0)),
183157
}
184158
}
185159

186160
/// Add elapsed nanoseconds since `start`to self
187-
pub fn add_elapsed(&self, start: Instant) {
188-
self.add_duration(start.elapsed());
161+
pub fn add_elapsed(&self, _start: Instant) {
189162
}
190163

191164
/// Add duration of time to self
@@ -198,14 +171,11 @@ impl Time {
198171
/// This is based on the assumption that the timing logic in most cases is likely
199172
/// to take at least a nanosecond, and so this is reasonable mechanism to avoid
200173
/// ambiguity, especially on systems with low-resolution monotonic clocks
201-
pub fn add_duration(&self, duration: Duration) {
202-
let more_nanos = duration.as_nanos() as usize;
203-
self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
174+
pub fn add_duration(&self, _duration: Duration) {
204175
}
205176

206177
/// Add the number of nanoseconds of other `Time` to self
207-
pub fn add(&self, other: &Time) {
208-
self.add_duration(Duration::from_nanos(other.value() as u64))
178+
pub fn add(&self, _other: &Time) {
209179
}
210180

211181
/// return a scoped guard that adds the amount of time elapsed
@@ -220,16 +190,14 @@ impl Time {
220190

221191
/// Get the number of nanoseconds record by this Time metric
222192
pub fn value(&self) -> usize {
223-
self.nanos.load(Ordering::Relaxed)
193+
0
224194
}
225195
}
226196

227197
/// Stores a single timestamp, stored as the number of nanoseconds
228198
/// elapsed from Jan 1, 1970 UTC
229199
#[derive(Debug, Clone)]
230200
pub struct Timestamp {
231-
/// Time thing started
232-
timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
233201
}
234202

235203
impl Default for Timestamp {
@@ -242,7 +210,6 @@ impl Timestamp {
242210
/// Create a new timestamp and sets its value to 0
243211
pub fn new() -> Self {
244212
Self {
245-
timestamp: Arc::new(Mutex::new(None)),
246213
}
247214
}
248215

@@ -252,40 +219,23 @@ impl Timestamp {
252219
}
253220

254221
/// Sets the timestamps value to a specified time
255-
pub fn set(&self, now: DateTime<Utc>) {
256-
*self.timestamp.lock() = Some(now);
222+
pub fn set(&self, _now: DateTime<Utc>) {
257223
}
258224

259225
/// return the timestamps value at the last time `record()` was
260226
/// called.
261227
///
262228
/// Returns `None` if `record()` has not been called
263229
pub fn value(&self) -> Option<DateTime<Utc>> {
264-
*self.timestamp.lock()
230+
None
265231
}
266232

267233
/// sets the value of this timestamp to the minimum of this and other
268-
pub fn update_to_min(&self, other: &Timestamp) {
269-
let min = match (self.value(), other.value()) {
270-
(None, None) => None,
271-
(Some(v), None) => Some(v),
272-
(None, Some(v)) => Some(v),
273-
(Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
274-
};
275-
276-
*self.timestamp.lock() = min;
234+
pub fn update_to_min(&self, _other: &Timestamp) {
277235
}
278236

279237
/// sets the value of this timestamp to the maximum of this and other
280-
pub fn update_to_max(&self, other: &Timestamp) {
281-
let max = match (self.value(), other.value()) {
282-
(None, None) => None,
283-
(Some(v), None) => Some(v),
284-
(None, Some(v)) => Some(v),
285-
(Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
286-
};
287-
288-
*self.timestamp.lock() = max;
238+
pub fn update_to_max(&self, _other: &Timestamp) {
289239
}
290240
}
291241

0 commit comments

Comments
 (0)