Skip to content

Commit ef14f0d

Browse files
davidv1992rnijveld
authored andcommitted
Split measurements into outgoing and incoming.
1 parent ea1c50c commit ef14f0d

File tree

10 files changed

+298
-168
lines changed

10 files changed

+298
-168
lines changed

ntp-proto/src/algorithm/mod.rs

Lines changed: 201 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{fmt::Debug, time::Duration};
33
use serde::{Deserialize, Serialize, de::DeserializeOwned};
44

55
use crate::{
6-
ClockId, NtpLeapIndicator, NtpPacket, PollInterval,
6+
ClockId, NtpLeapIndicator, PollInterval,
77
clock::NtpClock,
88
config::{SourceConfig, SynchronizationConfig},
99
system::TimeSnapshot,
@@ -139,26 +139,12 @@ pub struct InternalMeasurement<D: Debug + Copy + Clone> {
139139
pub precision: i8,
140140
}
141141

142-
impl<D: Debug + Copy + Clone> From<Measurement<D>> for InternalMeasurement<D> {
143-
fn from(value: Measurement<D>) -> Self {
144-
Self {
145-
delay: value.delay,
146-
offset: value.offset,
147-
localtime: value.localtime,
148-
stratum: value.stratum,
149-
root_delay: value.root_delay,
150-
root_dispersion: value.root_dispersion,
151-
leap: value.leap,
152-
precision: value.precision,
153-
}
154-
}
155-
}
156-
157142
#[derive(Debug, Copy, Clone)]
158-
pub struct Measurement<D: Debug + Copy + Clone> {
159-
pub delay: D,
160-
pub offset: NtpDuration,
161-
pub localtime: NtpTimestamp,
143+
pub struct Measurement {
144+
pub sender_id: ClockId,
145+
pub receiver_id: ClockId,
146+
pub sender_ts: NtpTimestamp,
147+
pub receiver_ts: NtpTimestamp,
162148

163149
pub stratum: u8,
164150
pub root_delay: NtpDuration,
@@ -167,29 +153,6 @@ pub struct Measurement<D: Debug + Copy + Clone> {
167153
pub precision: i8,
168154
}
169155

170-
impl Measurement<NtpDuration> {
171-
pub(crate) fn from_packet(
172-
packet: &NtpPacket,
173-
send_timestamp: NtpTimestamp,
174-
recv_timestamp: NtpTimestamp,
175-
) -> Self {
176-
Self {
177-
delay: (recv_timestamp - send_timestamp)
178-
- (packet.transmit_timestamp() - packet.receive_timestamp()),
179-
offset: ((packet.receive_timestamp() - send_timestamp)
180-
+ (packet.transmit_timestamp() - recv_timestamp))
181-
/ 2,
182-
localtime: send_timestamp + (recv_timestamp - send_timestamp) / 2,
183-
184-
stratum: packet.stratum(),
185-
root_delay: packet.root_delay(),
186-
root_dispersion: packet.root_dispersion(),
187-
leap: packet.leap(),
188-
precision: packet.precision(),
189-
}
190-
}
191-
}
192-
193156
pub trait TimeSyncController: Sized + Send + 'static {
194157
type Clock: NtpClock;
195158
type AlgorithmConfig: Debug + Copy + DeserializeOwned + Send;
@@ -198,12 +161,10 @@ pub trait TimeSyncController: Sized + Send + 'static {
198161
type NtpSourceController: SourceController<
199162
ControllerMessage = Self::ControllerMessage,
200163
SourceMessage = Self::SourceMessage,
201-
MeasurementDelay = NtpDuration,
202164
>;
203165
type OneWaySourceController: SourceController<
204166
ControllerMessage = Self::ControllerMessage,
205167
SourceMessage = Self::SourceMessage,
206-
MeasurementDelay = (),
207168
>;
208169

209170
/// Create a new clock controller controlling the given clock
@@ -249,7 +210,7 @@ impl<T: InternalTimeSyncController> TimeSyncController for T {
249210
type AlgorithmConfig = T::AlgorithmConfig;
250211
type ControllerMessage = T::ControllerMessage;
251212
type SourceMessage = T::SourceMessage;
252-
type NtpSourceController = T::NtpSourceController;
213+
type NtpSourceController = TwoWaySourceControllerWrapper<T::NtpSourceController>;
253214
type OneWaySourceController = T::OneWaySourceController;
254215

255216
fn new(
@@ -269,7 +230,10 @@ impl<T: InternalTimeSyncController> TimeSyncController for T {
269230
id: ClockId,
270231
source_config: SourceConfig,
271232
) -> Self::NtpSourceController {
272-
T::add_source(self, id, source_config)
233+
TwoWaySourceControllerWrapper {
234+
inner: T::add_source(self, id, source_config),
235+
last_outgoing_measurement: None,
236+
}
273237
}
274238

275239
fn add_one_way_source(
@@ -306,34 +270,38 @@ impl<T: InternalTimeSyncController> TimeSyncController for T {
306270
pub trait SourceController: Sized + Send + 'static {
307271
type ControllerMessage: Debug + Clone + Send + 'static;
308272
type SourceMessage: Debug + Clone + Send + 'static;
309-
type MeasurementDelay: Debug + Copy + Clone;
310273

311274
fn handle_message(&mut self, message: Self::ControllerMessage);
312275

313-
fn handle_measurement(
314-
&mut self,
315-
measurement: Measurement<Self::MeasurementDelay>,
316-
) -> Option<Self::SourceMessage>;
276+
fn handle_measurement(&mut self, measurement: Measurement) -> Option<Self::SourceMessage>;
317277

318278
fn desired_poll_interval(&self) -> PollInterval;
319279

320280
fn observe(&self) -> ObservableSourceTimedata;
321281
}
322282

323-
impl<T: InternalSourceController> SourceController for T {
283+
impl<T: InternalSourceController<MeasurementDelay = ()>> SourceController for T {
324284
type ControllerMessage = T::ControllerMessage;
325285
type SourceMessage = T::SourceMessage;
326-
type MeasurementDelay = T::MeasurementDelay;
327286

328287
fn handle_message(&mut self, message: Self::ControllerMessage) {
329288
T::handle_message(self, message);
330289
}
331290

332-
fn handle_measurement(
333-
&mut self,
334-
measurement: Measurement<Self::MeasurementDelay>,
335-
) -> Option<Self::SourceMessage> {
336-
T::handle_measurement(self, measurement.into())
291+
fn handle_measurement(&mut self, measurement: Measurement) -> Option<Self::SourceMessage> {
292+
T::handle_measurement(
293+
self,
294+
InternalMeasurement {
295+
delay: (),
296+
offset: measurement.sender_ts - measurement.receiver_ts,
297+
localtime: measurement.receiver_ts,
298+
stratum: measurement.stratum,
299+
root_delay: measurement.root_delay,
300+
root_dispersion: measurement.root_dispersion,
301+
leap: measurement.leap,
302+
precision: measurement.precision,
303+
},
304+
)
337305
}
338306

339307
fn desired_poll_interval(&self) -> PollInterval {
@@ -344,3 +312,177 @@ impl<T: InternalSourceController> SourceController for T {
344312
T::observe(self)
345313
}
346314
}
315+
316+
pub struct TwoWaySourceControllerWrapper<
317+
T: InternalSourceController<MeasurementDelay = NtpDuration>,
318+
> {
319+
inner: T,
320+
last_outgoing_measurement: Option<Measurement>,
321+
}
322+
323+
impl<T: InternalSourceController<MeasurementDelay = NtpDuration>> SourceController
324+
for TwoWaySourceControllerWrapper<T>
325+
{
326+
type ControllerMessage = T::ControllerMessage;
327+
type SourceMessage = T::SourceMessage;
328+
329+
fn handle_message(&mut self, message: Self::ControllerMessage) {
330+
self.inner.handle_message(message);
331+
}
332+
333+
fn handle_measurement(&mut self, measurement: Measurement) -> Option<Self::SourceMessage> {
334+
if measurement.sender_id == ClockId::SYSTEM {
335+
// This is an outgoing measurement, store it for later
336+
self.last_outgoing_measurement = Some(measurement);
337+
None
338+
} else {
339+
// This is an incoming measurement, we need to have an outgoing one to compute the delay
340+
let last_outgoing = self.last_outgoing_measurement.take()?;
341+
self.inner.handle_measurement(InternalMeasurement {
342+
delay: (measurement.receiver_ts - last_outgoing.sender_ts)
343+
- (measurement.sender_ts - last_outgoing.receiver_ts),
344+
offset: ((last_outgoing.receiver_ts - last_outgoing.sender_ts)
345+
+ (measurement.sender_ts - measurement.receiver_ts))
346+
/ 2,
347+
localtime: measurement.receiver_ts,
348+
stratum: measurement.stratum,
349+
root_delay: measurement.root_delay,
350+
root_dispersion: measurement.root_dispersion,
351+
leap: measurement.leap,
352+
precision: measurement.precision,
353+
})
354+
}
355+
}
356+
357+
fn desired_poll_interval(&self) -> PollInterval {
358+
self.inner.desired_poll_interval()
359+
}
360+
361+
fn observe(&self) -> ObservableSourceTimedata {
362+
self.inner.observe()
363+
}
364+
}
365+
366+
#[cfg(test)]
367+
mod tests {
368+
use super::*;
369+
370+
struct TestInternalSourceController {
371+
last_measurement: Option<InternalMeasurement<NtpDuration>>,
372+
}
373+
374+
impl InternalSourceController for TestInternalSourceController {
375+
type ControllerMessage = ();
376+
type SourceMessage = ();
377+
type MeasurementDelay = NtpDuration;
378+
379+
fn handle_message(&mut self, _message: Self::ControllerMessage) {
380+
unimplemented!()
381+
}
382+
383+
fn handle_measurement(
384+
&mut self,
385+
measurement: InternalMeasurement<Self::MeasurementDelay>,
386+
) -> Option<Self::SourceMessage> {
387+
self.last_measurement = Some(measurement);
388+
None
389+
}
390+
391+
fn desired_poll_interval(&self) -> PollInterval {
392+
unimplemented!()
393+
}
394+
395+
fn observe(&self) -> ObservableSourceTimedata {
396+
unimplemented!()
397+
}
398+
}
399+
400+
#[test]
401+
fn test_measurements_from_packet() {
402+
let mut measurement_outgoing = Measurement {
403+
sender_id: ClockId::SYSTEM,
404+
receiver_id: ClockId(1),
405+
sender_ts: NtpTimestamp::from_fixed_int(0),
406+
receiver_ts: NtpTimestamp::from_fixed_int(1),
407+
stratum: 0,
408+
root_delay: NtpDuration::from_fixed_int(0),
409+
root_dispersion: NtpDuration::from_fixed_int(0),
410+
leap: NtpLeapIndicator::NoWarning,
411+
precision: 0,
412+
};
413+
let mut measurement_incoming = Measurement {
414+
sender_id: ClockId(1),
415+
receiver_id: ClockId::SYSTEM,
416+
sender_ts: NtpTimestamp::from_fixed_int(2),
417+
receiver_ts: NtpTimestamp::from_fixed_int(3),
418+
stratum: 0,
419+
root_delay: NtpDuration::from_fixed_int(0),
420+
root_dispersion: NtpDuration::from_fixed_int(0),
421+
leap: NtpLeapIndicator::NoWarning,
422+
precision: 0,
423+
};
424+
425+
let mut controller = TwoWaySourceControllerWrapper {
426+
inner: TestInternalSourceController {
427+
last_measurement: None,
428+
},
429+
last_outgoing_measurement: None,
430+
};
431+
measurement_outgoing.sender_ts = NtpTimestamp::from_fixed_int(0);
432+
measurement_outgoing.receiver_ts = NtpTimestamp::from_fixed_int(1);
433+
measurement_incoming.sender_ts = NtpTimestamp::from_fixed_int(2);
434+
measurement_incoming.receiver_ts = NtpTimestamp::from_fixed_int(3);
435+
controller.handle_measurement(measurement_outgoing);
436+
controller.handle_measurement(measurement_incoming);
437+
assert_eq!(
438+
controller.inner.last_measurement.unwrap().offset,
439+
NtpDuration::from_fixed_int(0)
440+
);
441+
assert_eq!(
442+
controller.inner.last_measurement.unwrap().delay,
443+
NtpDuration::from_fixed_int(2)
444+
);
445+
446+
let mut controller = TwoWaySourceControllerWrapper {
447+
inner: TestInternalSourceController {
448+
last_measurement: None,
449+
},
450+
last_outgoing_measurement: None,
451+
};
452+
measurement_outgoing.sender_ts = NtpTimestamp::from_fixed_int(0);
453+
measurement_outgoing.receiver_ts = NtpTimestamp::from_fixed_int(2);
454+
measurement_incoming.sender_ts = NtpTimestamp::from_fixed_int(3);
455+
measurement_incoming.receiver_ts = NtpTimestamp::from_fixed_int(3);
456+
controller.handle_measurement(measurement_outgoing);
457+
controller.handle_measurement(measurement_incoming);
458+
assert_eq!(
459+
controller.inner.last_measurement.unwrap().offset,
460+
NtpDuration::from_fixed_int(1)
461+
);
462+
assert_eq!(
463+
controller.inner.last_measurement.unwrap().delay,
464+
NtpDuration::from_fixed_int(2)
465+
);
466+
467+
let mut controller = TwoWaySourceControllerWrapper {
468+
inner: TestInternalSourceController {
469+
last_measurement: None,
470+
},
471+
last_outgoing_measurement: None,
472+
};
473+
measurement_outgoing.sender_ts = NtpTimestamp::from_fixed_int(0);
474+
measurement_outgoing.receiver_ts = NtpTimestamp::from_fixed_int(0);
475+
measurement_incoming.sender_ts = NtpTimestamp::from_fixed_int(5);
476+
measurement_incoming.receiver_ts = NtpTimestamp::from_fixed_int(3);
477+
controller.handle_measurement(measurement_outgoing);
478+
controller.handle_measurement(measurement_incoming);
479+
assert_eq!(
480+
controller.inner.last_measurement.unwrap().offset,
481+
NtpDuration::from_fixed_int(1)
482+
);
483+
assert_eq!(
484+
controller.inner.last_measurement.unwrap().delay,
485+
NtpDuration::from_fixed_int(-2)
486+
);
487+
}
488+
}

ntp-proto/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,8 @@ mod generic {
226226
pub struct ClockId(u64);
227227

228228
impl ClockId {
229+
pub const SYSTEM: ClockId = ClockId(0);
230+
229231
pub fn new() -> ClockId {
230232
static COUNTER: AtomicU64 = AtomicU64::new(1);
231233
ClockId(COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
@@ -248,7 +250,7 @@ mod exports {
248250
pub use super::algorithm::{
249251
AlgorithmConfig, KalmanClockController, KalmanControllerMessage, KalmanSourceController,
250252
KalmanSourceMessage, Measurement, ObservableSourceTimedata, SourceController, StateUpdate,
251-
TimeSyncController, TwoWayKalmanSourceController,
253+
TimeSyncController, TwoWayKalmanSourceController, TwoWaySourceControllerWrapper,
252254
};
253255
pub use super::clock::NtpClock;
254256
pub use super::config::{SourceConfig, StepThreshold, SynchronizationConfig};

0 commit comments

Comments
 (0)