Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 32 additions & 20 deletions Datadog/Datadog.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import DatadogInternal
/// The message-bus sends messages to a set of registered receivers.
///
/// The bus dispatches messages on a serial queue.
internal final class MessageBus {
internal final class CoreMessageBus: @unchecked Sendable {
/// The message bus GDC queue.
let queue = DispatchQueue(
label: "com.datadoghq.ios-sdk-message-bus",
Expand All @@ -27,6 +27,25 @@ internal final class MessageBus {
/// The bus **must** be accessed within the queue.
private var bus: [String: FeatureMessageReceiver] = [:]

/// A closure that delivers a `BusMessage` to a single subscriber.
///
/// Captures the receiver strongly and performs the runtime cast from `Any`
/// to the receiver's expected `Message` type.
private typealias Dispatch = (Any, DatadogCoreProtocol) -> Void

/// Typed `BusMessageReceiver` subscribers grouped by `BusMessage.key`.
///
/// The outer key is the `BusMessage.key` of the message kind a subscriber
/// is registered for; the inner key is the receiver's object identity. The
/// keyed layout means dispatch only iterates receivers for the matching
/// message kind instead of the full subscriber set.
///
/// Each `Dispatch` captures its receiver strongly, so the bus retains
/// subscribers until they are explicitly removed via `unsubscribe(receiver:)`.
///
/// Must be accessed within the queue.
private var receivers: [String: [ObjectIdentifier: Dispatch]] = [:]

/// The current configuration.
///
/// The message-bus wil accumulate configuration by merge. A message
Expand All @@ -42,11 +61,18 @@ internal final class MessageBus {
/// - Parameter configurationDispatchTime: The delay to dispatch the
/// configuration telemetry
init(configurationDispatchTime: DispatchTimeInterval = .seconds(5)) {
queue.asyncAfter(deadline: .now() + configurationDispatchTime) {
guard let core = self.core, let configuration = self.configuration else {
queue.asyncAfter(deadline: .now() + configurationDispatchTime) { [weak self] in
guard let self = self, let core = self.core, let configuration = self.configuration else {
return
}

// Dispatch via typed bus to TelemetryMessage subscribers.
if let bucket = self.receivers[TelemetryMessage.key] {
let message = TelemetryMessage.configuration(configuration)
bucket.values.forEach { dispatch in dispatch(message, core) }
}

// Dispatch via legacy bus for receivers still on the legacy bus.
self.bus.values.forEach {
$0.receive(message: .telemetry(.configuration(configuration)), from: core)
}
Expand Down Expand Up @@ -125,7 +151,69 @@ internal final class MessageBus {
}
}

extension MessageBus: Flushable {
extension CoreMessageBus: MessageBus {
/// Adds `receiver` to the bucket for `Receiver.Message.key`.
///
/// The receiver is retained by the bus (via the captured closure) until
/// `unsubscribe(receiver:)` is called. Re-subscribing the same instance for
/// the same message kind replaces the previous subscription — entries are
/// keyed by object identity.
func subscribe<Receiver>(receiver: Receiver) where Receiver: BusMessageReceiver {
queue.async {
let id = ObjectIdentifier(receiver)
self.receivers[Receiver.Message.key, default: [:]][id] = { message, core in
guard let typed = message as? Receiver.Message else {
return
}
receiver.receive(message: typed, from: core)
}
}
}

/// Removes `receiver` from the bucket for `Receiver.Message.key`.
///
/// No-op if `receiver` is not currently subscribed. Empty buckets are
/// pruned to keep the registry tidy.
func unsubscribe<Receiver>(receiver: Receiver) where Receiver: BusMessageReceiver {
queue.async {
let key = Receiver.Message.key
let id = ObjectIdentifier(receiver)
self.receivers[key]?.removeValue(forKey: id)
if self.receivers[key]?.isEmpty == true {
self.receivers.removeValue(forKey: key)
}
}
}

/// Publishes `message` to every receiver in the bucket for `Message.key`.
///
/// Configuration telemetry messages are intercepted and accumulated for deferred
/// batch dispatch; they are never routed to subscribers immediately.
///
/// `fallback` is invoked when the bus has no core, or when the bucket is
/// empty. Delivery is dispatched on the bus's serial queue.
func send<Message>(message: Message, else fallback: @escaping () -> Void) where Message: BusMessage {
// Intercept configuration telemetry for deferred accumulated dispatch.
if let telemetry = message as? TelemetryMessage, case .configuration(let config) = telemetry {
save(configuration: config)
return
}

queue.async {
guard let core = self.core else {
return fallback()
}
guard let bucket = self.receivers[Message.key], !bucket.isEmpty else {
return fallback()
}
bucket.values.forEach { dispatch in
dispatch(message, core)
}
}
}
}

extension CoreMessageBus: Flushable {
/// Awaits completion of all asynchronous operations.
///
/// **blocks the caller thread**
Expand All @@ -134,7 +222,7 @@ extension MessageBus: Flushable {
}
}

extension MessageBus: Telemetry {
extension CoreMessageBus: Telemetry {
func send(telemetry: TelemetryMessage) {
send(message: .telemetry(telemetry))
}
Expand Down
7 changes: 5 additions & 2 deletions DatadogCore/Sources/Core/DatadogCore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ internal final class DatadogCore {
let applicationVersionPublisher: ApplicationVersionPublisher

/// The message-bus instance.
let bus = MessageBus()
let bus = CoreMessageBus()

/// Registry for Features.
@ReadWriteLock
Expand Down Expand Up @@ -121,7 +121,8 @@ internal final class DatadogCore {

// forward any context change on the message-bus
self.contextProvider.publish { [weak self] context in
self?.send(message: .context(context))
self?.bus.send(message: context) // typed bus (new receivers)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Replay initial context on the typed bus

This forwards only future context writes to typed subscribers, but registration still replays the current context only through add(messageReceiver:), which sends .context to the feature's legacy messageReceiver. The two migrated subscribers are now registered with NOPFeatureMessageReceiver() in CrossPlatformExtension and register(urlSessionHandler:), so if shared-context or network instrumentation is registered after user/RUM/trace context already exists, their typed receivers start with nil/stale context until another context mutation occurs; early shared-context callbacks and intercepted requests will miss that context.

Useful? React with 👍 / 👎.

self?.send(message: .context(context)) // legacy bus (receivers pending migration)
}
}

Expand Down Expand Up @@ -323,6 +324,8 @@ internal final class DatadogCore {
}

extension DatadogCore: DatadogCoreProtocol {
var messageBus: MessageBus { bus }

/// Registers a Feature instance.
///
/// A Feature collects and transfers data to a Datadog Product (e.g. Logs, RUM, ...). A registered Feature can
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,16 @@

import DatadogInternal

internal final class ContextSharingTransformer: FeatureMessageReceiver, ContextValuePublisher {
internal final class ContextSharingTransformer: BusMessageReceiver, ContextValuePublisher {
@ReadWriteLock
private var sharedContext: SharedContext? = nil
@ReadWriteLock
private var receiver: ContextValueReceiver<SharedContext?>? = nil

// MARK: - FeatureMessageReceiver

func receive(message: FeatureMessage, from core: DatadogCoreProtocol) -> Bool {
switch message {
case .context(let context):
let newContext = SharedContext(datadogContext: context)
sharedContext = newContext
receiver?(newContext)
return true
default:
return false
}
func receive(message context: DatadogContext, from core: DatadogCoreProtocol) {
let newContext = SharedContext(datadogContext: context)
sharedContext = newContext
receiver?(newContext)
}

// MARK: - ContextValuePublisher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ public final class CrossPlatformExtension: NSObject {
if Self.contextSharingTransformer == nil {
let core = CoreRegistry.default
let contextSharingTransformer = ContextSharingTransformer()
try? core.register(feature: ContextSharingFeature(messageReceiver: contextSharingTransformer))
// Subscribe before registration so initial context push is received:
core.messageBus.subscribe(receiver: contextSharingTransformer)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Unsubscribe shared context from the bus

Because MessageBus.subscribe(receiver:) retains the receiver until unsubscribe(receiver:), this added subscription survives unsubscribeFromSharedContext(): that method only cancels the callback and clears the static reference, while the registered feature now holds only a NOPFeatureMessageReceiver. In apps that unsubscribe and later resubscribe the cross-platform bridge, each cycle leaves another ContextSharingTransformer retained and processing every context update, so the documented drop of the subscription leaks work and memory unless the receiver is removed from the bus.

Useful? React with 👍 / 👎.

try? core.register(feature: ContextSharingFeature(messageReceiver: NOPFeatureMessageReceiver()))
Self.contextSharingTransformer = contextSharingTransformer
}
contextSharingTransformer?.publish(to: toSharedContext)
Expand Down
Loading