Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 24 additions & 20 deletions Datadog/Datadog.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import DatadogInternal
/// The message-bus sends messages to a set of registered receivers.
///
/// The bus dispatches messages on a serial queue.
internal final class MessageBus {
internal final class CoreMessageBus: @unchecked Sendable {
/// The message bus GDC queue.
let queue = DispatchQueue(
label: "com.datadoghq.ios-sdk-message-bus",
Expand All @@ -27,6 +27,25 @@ internal final class MessageBus {
/// The bus **must** be accessed within the queue.
private var bus: [String: FeatureMessageReceiver] = [:]

/// A closure that delivers a `BusMessage` to a single subscriber.
///
/// Captures the receiver strongly and performs the runtime cast from `Any`
/// to the receiver's expected `Message` type.
private typealias Dispatch = (Any, DatadogCoreProtocol) -> Void

/// Typed `BusMessageReceiver` subscribers grouped by `BusMessage.key`.
///
/// The outer key is the `BusMessage.key` of the message kind a subscriber
/// is registered for; the inner key is the receiver's object identity. The
/// keyed layout means dispatch only iterates receivers for the matching
/// message kind instead of the full subscriber set.
///
/// Each `Dispatch` captures its receiver strongly, so the bus retains
/// subscribers until they are explicitly removed via `unsubscribe(receiver:)`.
///
/// Must be accessed within the queue.
private var receivers: [String: [ObjectIdentifier: Dispatch]] = [:]

/// The current configuration.
///
/// The message-bus wil accumulate configuration by merge. A message
Expand All @@ -42,11 +61,18 @@ internal final class MessageBus {
/// - Parameter configurationDispatchTime: The delay to dispatch the
/// configuration telemetry
init(configurationDispatchTime: DispatchTimeInterval = .seconds(5)) {
queue.asyncAfter(deadline: .now() + configurationDispatchTime) {
guard let core = self.core, let configuration = self.configuration else {
queue.asyncAfter(deadline: .now() + configurationDispatchTime) { [weak self] in
guard let self = self, let core = self.core, let configuration = self.configuration else {
return
}

// Dispatch via typed bus to TelemetryMessage subscribers.
if let bucket = self.receivers[TelemetryMessage.key] {
let message = TelemetryMessage.configuration(configuration)
bucket.values.forEach { dispatch in dispatch(message, core) }
}

// Dispatch via legacy bus for receivers still on the legacy bus.
self.bus.values.forEach {
$0.receive(message: .telemetry(.configuration(configuration)), from: core)
}
Expand Down Expand Up @@ -125,7 +151,69 @@ internal final class MessageBus {
}
}

extension MessageBus: Flushable {
extension CoreMessageBus: MessageBus {
/// Adds `receiver` to the bucket for `Receiver.Message.key`.
///
/// The receiver is retained by the bus (via the captured closure) until
/// `unsubscribe(receiver:)` is called. Re-subscribing the same instance for
/// the same message kind replaces the previous subscription — entries are
/// keyed by object identity.
func subscribe<Receiver>(receiver: Receiver) where Receiver: BusMessageReceiver {
queue.async {
let id = ObjectIdentifier(receiver)
self.receivers[Receiver.Message.key, default: [:]][id] = { message, core in
guard let typed = message as? Receiver.Message else {
return
}
receiver.receive(message: typed, from: core)
}
}
}

/// Removes `receiver` from the bucket for `Receiver.Message.key`.
///
/// No-op if `receiver` is not currently subscribed. Empty buckets are
/// pruned to keep the registry tidy.
func unsubscribe<Receiver>(receiver: Receiver) where Receiver: BusMessageReceiver {
queue.async {
let key = Receiver.Message.key
let id = ObjectIdentifier(receiver)
self.receivers[key]?.removeValue(forKey: id)
if self.receivers[key]?.isEmpty == true {
self.receivers.removeValue(forKey: key)
}
}
}

/// Publishes `message` to every receiver in the bucket for `Message.key`.
///
/// Configuration telemetry messages are intercepted and accumulated for deferred
/// batch dispatch; they are never routed to subscribers immediately.
///
/// `fallback` is invoked when the bus has no core, or when the bucket is
/// empty. Delivery is dispatched on the bus's serial queue.
func send<Message>(message: Message, else fallback: @escaping () -> Void) where Message: BusMessage {
// Intercept configuration telemetry for deferred accumulated dispatch.
if let telemetry = message as? TelemetryMessage, case .configuration(let config) = telemetry {
save(configuration: config)
return
}

queue.async {
guard let core = self.core else {
return fallback()
}
guard let bucket = self.receivers[Message.key], !bucket.isEmpty else {
return fallback()
}
bucket.values.forEach { dispatch in
dispatch(message, core)
}
}
}
}

extension CoreMessageBus: Flushable {
/// Awaits completion of all asynchronous operations.
///
/// **blocks the caller thread**
Expand Down
11 changes: 7 additions & 4 deletions DatadogCore/Sources/Core/DatadogCore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ internal final class DatadogCore {
let applicationVersionPublisher: ApplicationVersionPublisher

/// The message-bus instance.
let bus = MessageBus()
let bus = CoreMessageBus()

/// Registry for Features.
@ReadWriteLock
Expand Down Expand Up @@ -119,9 +119,9 @@ internal final class DatadogCore {
// the bus will keep a weak ref to the core.
bus.connect(core: self)

// forward any context change on the message-bus
// forward any context change on the typed message-bus
self.contextProvider.publish { [weak self] context in
self?.send(message: .context(context))
self?.bus.send(message: context)
}
}

Expand Down Expand Up @@ -264,8 +264,9 @@ internal final class DatadogCore {
/// - key: The key associated with the receiver.
private func add(messageReceiver: FeatureMessageReceiver, forKey key: String) {
bus.connect(messageReceiver, forKey: key)
// Push current context to typed-bus DatadogContext subscribers.
contextProvider.read { context in
self.bus.queue.async { messageReceiver.receive(message: .context(context), from: self) }
self.bus.send(message: context)
}
}

Expand Down Expand Up @@ -323,6 +324,8 @@ internal final class DatadogCore {
}

extension DatadogCore: DatadogCoreProtocol {
var messageBus: MessageBus { bus }

/// Registers a Feature instance.
///
/// A Feature collects and transfers data to a Datadog Product (e.g. Logs, RUM, ...). A registered Feature can
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,16 @@

import DatadogInternal

internal final class ContextSharingTransformer: FeatureMessageReceiver, ContextValuePublisher {
internal final class ContextSharingTransformer: BusMessageReceiver, ContextValuePublisher {
@ReadWriteLock
private var sharedContext: SharedContext? = nil
@ReadWriteLock
private var receiver: ContextValueReceiver<SharedContext?>? = nil

// MARK: - FeatureMessageReceiver

func receive(message: FeatureMessage, from core: DatadogCoreProtocol) -> Bool {
switch message {
case .context(let context):
let newContext = SharedContext(datadogContext: context)
sharedContext = newContext
receiver?(newContext)
return true
default:
return false
}
func receive(message context: DatadogContext, from core: DatadogCoreProtocol) {
let newContext = SharedContext(datadogContext: context)
sharedContext = newContext
receiver?(newContext)
}

// MARK: - ContextValuePublisher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ public final class CrossPlatformExtension: NSObject {
if Self.contextSharingTransformer == nil {
let core = CoreRegistry.default
let contextSharingTransformer = ContextSharingTransformer()
try? core.register(feature: ContextSharingFeature(messageReceiver: contextSharingTransformer))
// Subscribe before registration so initial context push is received:
core.messageBus.subscribe(receiver: contextSharingTransformer)
try? core.register(feature: ContextSharingFeature(messageReceiver: NOPFeatureMessageReceiver()))
Self.contextSharingTransformer = contextSharingTransformer
}
contextSharingTransformer?.publish(to: toSharedContext)
Expand Down
Loading