Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/AsyncObservable.fs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,20 @@ module AsyncRx =
/// A cold subject that only supports a single subscriber. Will await the caller if no-one is subscribing.
let singleSubject<'a> () : IAsyncObserver<'a> * IAsyncObservable<'a> = Subjects.singleSubject<'a> ()

/// <summary>
/// A variant of a subject that "replays" old values to new subscribers by emitting them when they first subscribe.
/// </summary>
/// <remarks>
/// A replay subject has an internal buffer that will store a specified number of values that it has observed. Like subject,
/// a replay subject "observes" values by having them passed to its next method. When it observes a value, it will store
/// that value for a time determined by the configuration of the replay subject, as passed to its constructor.
///
/// When a new subscriber subscribes to the replay subject, it will emit all values in its buffer in a First-In-First-Out
/// (FIFO) manner. The replay subject will also complete, if it has observed completion; and it will error if it has observed
/// an error.
/// </remarks>
let replaySubject<'a> (bufferSize : int) : IAsyncObserver<'a> * IAsyncObservable<'a> = Subjects.replaySubject<'a> bufferSize

// Tap Region

/// Tap asynchronously into the stream performing side effects by the given async actions.
Expand Down
109 changes: 109 additions & 0 deletions src/Subject.fs
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,112 @@ module internal Subjects =
member this.OnCompletedAsync() = async { OnCompleted |> mb.Post } }

obv, obs

type private SafeQueue<'a> () =
let agent =
MailboxProcessor<QueueMessage<'a>>.Start
<| fun inbox ->
let rec loop (queue: Queue<'a>) =
async {
let! message = inbox.Receive()
match message with
| Enqueue item ->
queue.Enqueue item
return! loop queue
| Dequeue channel ->
let item = queue.Dequeue()
channel.Reply item
return! loop queue
| IterAsync f ->
for msg in queue do
do! f msg
return! loop queue
| Count channel->
channel.Reply (queue.Count)
return! loop queue
}
loop <| new Queue<'a>()

member _.Count () = agent.PostAndReply Count
member _.Enqueue item = agent.Post <| Enqueue item
member _.Dequeue () = agent.PostAndReply Dequeue
member _.IterAsync f = agent.Post(IterAsync f)

and private QueueMessage<'a> =
| Count of AsyncReplyChannel<int>
| Enqueue of 'a
| Dequeue of AsyncReplyChannel<'a>
| IterAsync of ('a -> Async<unit>)

let mbReplaySubject<'TSource> (bufferSize : int) : MailboxProcessor<Notification<'TSource>> * IAsyncObservable<'TSource> =
let obvs = new List<IAsyncObserver<'TSource>>()
let cts = new CancellationTokenSource()
let queue = new SafeQueue<Notification<'TSource>> ()
let mb =
MailboxProcessor.Start(
fun inbox ->
let rec messageLoop _ =
async {
let! n = inbox.Receive()

match n with
| OnNext x ->
if queue.Count () < bufferSize then ()
else queue.Dequeue() |> ignore
queue.Enqueue (Notification.OnNext x)

for aobv in obvs do
do! aobv.OnNextAsync x

| OnError err ->
for aobv in obvs do
queue.Enqueue (Notification.OnError err)
do! aobv.OnErrorAsync err

cts.Cancel()

| OnCompleted ->
for aobv in obvs do
do! aobv.OnCompletedAsync()
queue.Enqueue Notification.OnCompleted

cts.Cancel()

return! messageLoop ()
}

messageLoop ()
, cts.Token
)

let subscribeAsync (aobv: IAsyncObserver<'TSource>) : Async<IAsyncRxDisposable> =
async {
let sobv = safeObserver aobv AsyncDisposable.Empty
queue.IterAsync (function
| Notification.OnNext n -> sobv.OnNextAsync n
| Notification.OnError exn -> sobv.OnErrorAsync exn
| Notification.OnCompleted -> sobv.OnCompletedAsync ())

obvs.Add sobv

let cancel () = async { obvs.Remove sobv |> ignore }
return AsyncDisposable.Create cancel
}

mb,
{ new IAsyncObservable<'TSource> with
member __.SubscribeAsync o = subscribeAsync o }

let replaySubject<'TSource> (bufferSize : int) : IAsyncObserver<'TSource> * IAsyncObservable<'TSource> =
if bufferSize < 1 then
invalidArg (nameof(bufferSize)) $"Buffer size should be one or greater but it was {bufferSize}."

let mb, obs = mbReplaySubject<'TSource> bufferSize

let obv =
{ new IAsyncObserver<'TSource> with
member this.OnNextAsync x = async { OnNext x |> mb.Post }
member this.OnErrorAsync err = async { OnError err |> mb.Post }
member this.OnCompletedAsync() = async { OnCompleted |> mb.Post } }

obv, obs
57 changes: 57 additions & 0 deletions test/SubjectTest.fs
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,61 @@ let tests = testList "Subject Tests" [
Expect.equal actual1 expected1 "Should be equal"
Expect.equal actual2 expected2 "Should be equal"
}

testAsync "Test replay subject broadcasts last 2 values when second observer subscribes late" {
let (dispatch, stream) = AsyncRx.replaySubject 2
let obv1 = TestObserver<_>()
let obv2 = TestObserver<_>()

let! _ = stream.SubscribeAsync(obv1)

do! dispatch.OnNextAsync 'a'
do! dispatch.OnNextAsync 'b'
do! dispatch.OnNextAsync 'c'

// TODO: How should I be yielding here so that obv1.Notifications is populated?
// (I specifically don't want to emit a completion.)
do! Async.Sleep 1_000
Comment on lines +128 to +130
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dbrattli is there a better way to ensure the value is yielded without using Async.Sleep?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll have a look, but currently on vacation so might take a few days 😬


let! _ = stream.SubscribeAsync(obv2)

do! Async.Sleep 1_000

let actual1 = obv1.Notifications |> Seq.toList
let expected1 = [ OnNext 'a'; OnNext 'b'; OnNext 'c' ]
let actual2 = obv2.Notifications |> Seq.toList
let expected2 = [ OnNext 'b'; OnNext 'c' ]

Expect.equal actual1 expected1 "Should be equal"
Expect.equal actual2 expected2 "Should be equal"
}

// Behaviour like RxJS.
// https://github.com/ReactiveX/rxjs/blob/9aa16a9e1dfe73fd6c6ed4084e96d22847b63f9b/spec/subjects/ReplaySubject-spec.ts#L149-L171
testAsync "Test replay subject broadcasts last 2 values when second observer subscribes late after completed" {
let (dispatch, stream) = AsyncRx.replaySubject 2
let obv1 = TestObserver<_>()
let obv2 = TestObserver<_>()

let! _ = stream.SubscribeAsync(obv1)

do! dispatch.OnNextAsync 'a'
do! dispatch.OnNextAsync 'b'
do! dispatch.OnNextAsync 'c'
do! dispatch.OnCompletedAsync()

do! obv1.AwaitIgnore()

let! _ = stream.SubscribeAsync(obv2)

do! obv2.AwaitIgnore()

let actual1 = obv1.Notifications |> Seq.toList
let expected1 = [ OnNext 'a'; OnNext 'b'; OnNext 'c'; OnCompleted ]
let actual2 = obv2.Notifications |> Seq.toList
let expected2 = [ OnNext 'b'; OnNext 'c'; OnCompleted ]

Expect.equal actual1 expected1 "Should be equal"
Expect.equal actual2 expected2 "Should be equal"
}
]