Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 56 additions & 56 deletions Datadog/Datadog.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import DatadogInternal
/// The message-bus sends messages to a set of registered receivers.
///
/// The bus dispatches messages on a serial queue.
internal final class MessageBus {
internal final class CoreMessageBus: @unchecked Sendable {
/// The message bus GDC queue.
let queue = DispatchQueue(
label: "com.datadoghq.ios-sdk-message-bus",
Expand All @@ -27,6 +27,25 @@ internal final class MessageBus {
/// The bus **must** be accessed within the queue.
private var bus: [String: FeatureMessageReceiver] = [:]

/// A closure that delivers a `BusMessage` to a single subscriber.
///
/// Captures the receiver strongly and performs the runtime cast from `Any`
/// to the receiver's expected `Message` type.
private typealias Dispatch = (Any, DatadogCoreProtocol) -> Void

/// Typed `BusMessageReceiver` subscribers grouped by `BusMessage.key`.
///
/// The outer key is the `BusMessage.key` of the message kind a subscriber
/// is registered for; the inner key is the receiver's object identity. The
/// keyed layout means dispatch only iterates receivers for the matching
/// message kind instead of the full subscriber set.
///
/// Each `Dispatch` captures its receiver strongly, so the bus retains
/// subscribers until they are explicitly removed via `unsubscribe(receiver:)`.
///
/// Must be accessed within the queue.
private var receivers: [String: [ObjectIdentifier: Dispatch]] = [:]

/// The current configuration.
///
/// The message-bus wil accumulate configuration by merge. A message
Expand All @@ -42,11 +61,18 @@ internal final class MessageBus {
/// - Parameter configurationDispatchTime: The delay to dispatch the
/// configuration telemetry
init(configurationDispatchTime: DispatchTimeInterval = .seconds(5)) {
queue.asyncAfter(deadline: .now() + configurationDispatchTime) {
guard let core = self.core, let configuration = self.configuration else {
queue.asyncAfter(deadline: .now() + configurationDispatchTime) { [weak self] in
guard let self = self, let core = self.core, let configuration = self.configuration else {
return
}

// Dispatch via typed bus to TelemetryMessage subscribers.
if let bucket = self.receivers[TelemetryMessage.key] {
let message = TelemetryMessage.configuration(configuration)
bucket.values.forEach { dispatch in dispatch(message, core) }
}

// Dispatch via legacy bus for receivers still on the legacy bus.
self.bus.values.forEach {
$0.receive(message: .telemetry(.configuration(configuration)), from: core)
}
Expand Down Expand Up @@ -125,7 +151,69 @@ internal final class MessageBus {
}
}

extension MessageBus: Flushable {
extension CoreMessageBus: MessageBus {
/// Adds `receiver` to the bucket for `Receiver.Message.key`.
///
/// The receiver is retained by the bus (via the captured closure) until
/// `unsubscribe(receiver:)` is called. Re-subscribing the same instance for
/// the same message kind replaces the previous subscription — entries are
/// keyed by object identity.
func subscribe<Receiver>(receiver: Receiver) where Receiver: BusMessageReceiver {
queue.async {
let id = ObjectIdentifier(receiver)
self.receivers[Receiver.Message.key, default: [:]][id] = { message, core in
guard let typed = message as? Receiver.Message else {
return
}
receiver.receive(message: typed, from: core)
}
}
}

/// Removes `receiver` from the bucket for `Receiver.Message.key`.
///
/// No-op if `receiver` is not currently subscribed. Empty buckets are
/// pruned to keep the registry tidy.
func unsubscribe<Receiver>(receiver: Receiver) where Receiver: BusMessageReceiver {
queue.async {
let key = Receiver.Message.key
let id = ObjectIdentifier(receiver)
self.receivers[key]?.removeValue(forKey: id)
if self.receivers[key]?.isEmpty == true {
self.receivers.removeValue(forKey: key)
}
}
}

/// Publishes `message` to every receiver in the bucket for `Message.key`.
///
/// Configuration telemetry messages are intercepted and accumulated for deferred
/// batch dispatch; they are never routed to subscribers immediately.
///
/// `fallback` is invoked when the bus has no core, or when the bucket is
/// empty. Delivery is dispatched on the bus's serial queue.
func send<Message>(message: Message, else fallback: @escaping () -> Void) where Message: BusMessage {
// Intercept configuration telemetry for deferred accumulated dispatch.
if let telemetry = message as? TelemetryMessage, case .configuration(let config) = telemetry {
save(configuration: config)
return
}

queue.async {
guard let core = self.core else {
return fallback()
}
guard let bucket = self.receivers[Message.key], !bucket.isEmpty else {
return fallback()
}
bucket.values.forEach { dispatch in
dispatch(message, core)
}
}
}
}

extension CoreMessageBus: Flushable {
/// Awaits completion of all asynchronous operations.
///
/// **blocks the caller thread**
Expand All @@ -134,7 +222,7 @@ extension MessageBus: Flushable {
}
}

extension MessageBus: Telemetry {
extension CoreMessageBus: Telemetry {
func send(telemetry: TelemetryMessage) {
send(message: .telemetry(telemetry))
}
Expand Down
7 changes: 5 additions & 2 deletions DatadogCore/Sources/Core/DatadogCore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ internal final class DatadogCore {
let applicationVersionPublisher: ApplicationVersionPublisher

/// The message-bus instance.
let bus = MessageBus()
let bus = CoreMessageBus()

/// Registry for Features.
@ReadWriteLock
Expand Down Expand Up @@ -121,7 +121,8 @@ internal final class DatadogCore {

// forward any context change on the message-bus
self.contextProvider.publish { [weak self] context in
self?.send(message: .context(context))
self?.bus.send(message: context) // typed bus (new receivers)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Replay initial context on the typed bus

This forwards only future context writes to typed subscribers, but registration still replays the current context only through add(messageReceiver:), which sends .context to the feature's legacy messageReceiver. The two migrated subscribers are now registered with NOPFeatureMessageReceiver() in CrossPlatformExtension and register(urlSessionHandler:), so if shared-context or network instrumentation is registered after user/RUM/trace context already exists, their typed receivers start with nil/stale context until another context mutation occurs; early shared-context callbacks and intercepted requests will miss that context.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial context replay for typed bus subscribers will be tackled in a follow-up PR.

self?.send(message: .context(context)) // legacy bus (receivers pending migration)
}
}

Expand Down Expand Up @@ -323,6 +324,8 @@ internal final class DatadogCore {
}

extension DatadogCore: DatadogCoreProtocol {
var messageBus: MessageBus { bus }

/// Registers a Feature instance.
///
/// A Feature collects and transfers data to a Datadog Product (e.g. Logs, RUM, ...). A registered Feature can
Expand Down

This file was deleted.

This file was deleted.

19 changes: 9 additions & 10 deletions DatadogCore/Sources/Extensions/CrossPlatformExtension.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import DatadogInternal
@objcMembers
@_spi(Internal)
public final class CrossPlatformExtension: NSObject {
private static var contextSharingTransformer: ContextSharingTransformer?
private static var contextSubscription: MessageBusSubscription?

/// Subscribes to shared context updates.
///
Expand All @@ -26,21 +26,20 @@ public final class CrossPlatformExtension: NSObject {
/// - Parameter toSharedContext: A closure that receives `SharedContext` updates. Called on context changes.
@objc
public static func subscribe(toSharedContext: @escaping (SharedContext?) -> Void) {
if Self.contextSharingTransformer == nil {
let core = CoreRegistry.default
let contextSharingTransformer = ContextSharingTransformer()
try? core.register(feature: ContextSharingFeature(messageReceiver: contextSharingTransformer))
Self.contextSharingTransformer = contextSharingTransformer
guard contextSubscription == nil else {
return
Comment on lines +29 to +30

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Allow resubscribe to replace the shared-context callback

When the bridge calls subscribe(toSharedContext:) again without first calling unsubscribeFromSharedContext() (for example, recreating a cross-platform bridge while the SDK stays initialized), this guard returns and keeps the first closure registered. Before this change publish(to:) ran on every subscribe call, so a later bridge could replace the callback; now the new bridge never receives context updates and the stale callback remains retained until an explicit unsubscribe.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the intended contract here that callers must always unsubscribe before subscribing again? If so, it might be worth documenting that, since the previous implementation allowed a later subscribe call to replace the callback.

}
contextSubscription = CoreRegistry.default.messageBus.subscribe { (context: DatadogContext, _) in
toSharedContext(SharedContext(datadogContext: context))
}
contextSharingTransformer?.publish(to: toSharedContext)
}

/// Drops the subscription to `SharedContext`, and removes the static refrence.
/// Drops the subscription to `SharedContext`.
///
/// Note: that it doesn't remove the registered feature.
@objc
public static func unsubscribeFromSharedContext() {
contextSharingTransformer?.cancel()
contextSharingTransformer = nil
contextSubscription.map { CoreRegistry.default.messageBus.unsubscribe($0) }
contextSubscription = nil
}
}
Loading