From 7e64d8989c83ac939f6fdf6e49dd98480a3f5b79 Mon Sep 17 00:00:00 2001 From: Tri Date: Wed, 6 Sep 2023 20:51:00 +1000 Subject: [PATCH 1/3] added replay subject --- src/AsyncObservable.fs | 14 +++++ src/Subject.fs | 114 +++++++++++++++++++++++++++++++++++++++++ test/SubjectTest.fs | 57 +++++++++++++++++++++ 3 files changed, 185 insertions(+) diff --git a/src/AsyncObservable.fs b/src/AsyncObservable.fs index 5e6c4f8..37ea522 100644 --- a/src/AsyncObservable.fs +++ b/src/AsyncObservable.fs @@ -319,6 +319,20 @@ module AsyncRx = /// A cold subject that only supports a single subscriber. Will await the caller if no-one is subscribing. let singleSubject<'a> () : IAsyncObserver<'a> * IAsyncObservable<'a> = Subjects.singleSubject<'a> () + /// + /// A variant of a subject that "replays" old values to new subscribers by emitting them when they first subscribe. + /// + /// + /// A replay subject has an internal buffer that will store a specified number of values that it has observed. Like subject, + /// a replay subject "observes" values by having them passed to its next method. When it observes a value, it will store + /// that value for a time determined by the configuration of the replay subject, as passed to its constructor. + /// + /// When a new subscriber subscribes to the replay subject, it will emit all values in its buffer in a First-In-First-Out + /// (FIFO) manner. The replay subject will also complete, if it has observed completion; and it will error if it has observed + /// an error. + /// + let replaySubject<'a> (bufferSize : int) : IAsyncObserver<'a> * IAsyncObservable<'a> = Subjects.replaySubject<'a> bufferSize + // Tap Region /// Tap asynchronously into the stream performing side effects by the given async actions. diff --git a/src/Subject.fs b/src/Subject.fs index bd02c50..c721dea 100644 --- a/src/Subject.fs +++ b/src/Subject.fs @@ -113,3 +113,117 @@ module internal Subjects = member this.OnCompletedAsync() = async { OnCompleted |> mb.Post } } obv, obs + + type private SafeQueue<'a> () = + let agent = + MailboxProcessor>.Start + <| fun inbox -> + let rec loop (queue: Queue<'a>) = + async { + let! message = inbox.Receive() + match message with + | Enqueue item -> + queue.Enqueue item + return! loop queue + | Dequeue channel -> + let item = queue.Dequeue() + channel.Reply item + return! loop queue + | IterAsync f -> + for msg in queue do + do! f msg + return! loop queue + | Count channel-> + channel.Reply (queue.Count) + return! loop queue + } + loop <| new Queue<'a>() + + member _.Count () = agent.PostAndReply Count + member _.Enqueue item = agent.Post <| Enqueue item + member _.Dequeue () = agent.PostAndReply Dequeue + member _.IterAsync f = agent.Post(IterAsync f) + + and private QueueMessage<'a> = + | Count of AsyncReplyChannel + | Enqueue of 'a + | Dequeue of AsyncReplyChannel<'a> + | IterAsync of ('a -> Async) + + type private BufferedMessage<'Value> = + | Next of 'Value + | Complete + | Error of exn + + let mbReplaySubject<'TSource> (bufferSize : int) : MailboxProcessor> * IAsyncObservable<'TSource> = + let obvs = new List>() + let cts = new CancellationTokenSource() + let queue = new SafeQueue> () + let mb = + MailboxProcessor.Start( + fun inbox -> + let rec messageLoop _ = + async { + let! n = inbox.Receive() + + match n with + | OnNext x -> + if queue.Count () < bufferSize then () + else queue.Dequeue() |> ignore + queue.Enqueue (BufferedMessage.Next x) + + for aobv in obvs do + do! aobv.OnNextAsync x + + | OnError err -> + for aobv in obvs do + queue.Enqueue (BufferedMessage.Error err) + do! aobv.OnErrorAsync err + + cts.Cancel() + + | OnCompleted -> + for aobv in obvs do + do! aobv.OnCompletedAsync() + queue.Enqueue BufferedMessage.Complete + + cts.Cancel() + + return! messageLoop () + } + + messageLoop () + , cts.Token + ) + + let subscribeAsync (aobv: IAsyncObserver<'TSource>) : Async = + async { + let sobv = safeObserver aobv AsyncDisposable.Empty + queue.IterAsync (function + | BufferedMessage.Next n -> sobv.OnNextAsync n + | BufferedMessage.Error exn -> sobv.OnErrorAsync exn + | BufferedMessage.Complete -> sobv.OnCompletedAsync ()) + + obvs.Add sobv + + let cancel () = async { obvs.Remove sobv |> ignore } + return AsyncDisposable.Create cancel + } + + mb, + { new IAsyncObservable<'TSource> with + member __.SubscribeAsync o = subscribeAsync o } + + let replaySubject<'TSource> (bufferSize : int) : IAsyncObserver<'TSource> * IAsyncObservable<'TSource> = + if bufferSize < 1 then + invalidArg (nameof(bufferSize)) $"Buffer size should be one or greater but it was {bufferSize}." + + let mb, obs = mbReplaySubject<'TSource> bufferSize + + let obv = + { new IAsyncObserver<'TSource> with + member this.OnNextAsync x = async { OnNext x |> mb.Post } + member this.OnErrorAsync err = async { OnError err |> mb.Post } + member this.OnCompletedAsync() = async { OnCompleted |> mb.Post } } + + obv, obs diff --git a/test/SubjectTest.fs b/test/SubjectTest.fs index 10eabfc..dda00b5 100644 --- a/test/SubjectTest.fs +++ b/test/SubjectTest.fs @@ -113,4 +113,61 @@ let tests = testList "Subject Tests" [ Expect.equal actual1 expected1 "Should be equal" Expect.equal actual2 expected2 "Should be equal" } + + testAsync "Test replay subject broadcasts last 2 values when second observer subscribes late" { + let (dispatch, stream) = AsyncRx.replaySubject 2 + let obv1 = TestObserver<_>() + let obv2 = TestObserver<_>() + + let! _ = stream.SubscribeAsync(obv1) + + do! dispatch.OnNextAsync 'a' + do! dispatch.OnNextAsync 'b' + do! dispatch.OnNextAsync 'c' + + // TODO: How should I be yielding here so that obv1.Notifications is populated? + // (I specifically don't want to emit a completion.) + do! Async.Sleep 1_000 + + let! _ = stream.SubscribeAsync(obv2) + + do! Async.Sleep 1_000 + + let actual1 = obv1.Notifications |> Seq.toList + let expected1 = [ OnNext 'a'; OnNext 'b'; OnNext 'c' ] + let actual2 = obv2.Notifications |> Seq.toList + let expected2 = [ OnNext 'b'; OnNext 'c' ] + + Expect.equal actual1 expected1 "Should be equal" + Expect.equal actual2 expected2 "Should be equal" + } + + // Behaviour like RxJS. + // https://github.com/ReactiveX/rxjs/blob/9aa16a9e1dfe73fd6c6ed4084e96d22847b63f9b/spec/subjects/ReplaySubject-spec.ts#L149-L171 + testAsync "Test replay subject broadcasts last 2 values when second observer subscribes late after completed" { + let (dispatch, stream) = AsyncRx.replaySubject 2 + let obv1 = TestObserver<_>() + let obv2 = TestObserver<_>() + + let! _ = stream.SubscribeAsync(obv1) + + do! dispatch.OnNextAsync 'a' + do! dispatch.OnNextAsync 'b' + do! dispatch.OnNextAsync 'c' + do! dispatch.OnCompletedAsync() + + do! obv1.AwaitIgnore() + + let! _ = stream.SubscribeAsync(obv2) + + do! obv2.AwaitIgnore() + + let actual1 = obv1.Notifications |> Seq.toList + let expected1 = [ OnNext 'a'; OnNext 'b'; OnNext 'c'; OnCompleted ] + let actual2 = obv2.Notifications |> Seq.toList + let expected2 = [ OnNext 'b'; OnNext 'c'; OnCompleted ] + + Expect.equal actual1 expected1 "Should be equal" + Expect.equal actual2 expected2 "Should be equal" + } ] From e8d47dc7b094ed702a5951bad4dede20713b3aaf Mon Sep 17 00:00:00 2001 From: Tri Date: Fri, 8 Sep 2023 08:38:41 +1000 Subject: [PATCH 2/3] Use existing notification type --- src/Subject.fs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/src/Subject.fs b/src/Subject.fs index c721dea..8fb5aff 100644 --- a/src/Subject.fs +++ b/src/Subject.fs @@ -150,15 +150,10 @@ module internal Subjects = | Dequeue of AsyncReplyChannel<'a> | IterAsync of ('a -> Async) - type private BufferedMessage<'Value> = - | Next of 'Value - | Complete - | Error of exn - let mbReplaySubject<'TSource> (bufferSize : int) : MailboxProcessor> * IAsyncObservable<'TSource> = let obvs = new List>() let cts = new CancellationTokenSource() - let queue = new SafeQueue> () + let queue = new SafeQueue> () let mb = MailboxProcessor.Start( fun inbox -> @@ -170,14 +165,14 @@ module internal Subjects = | OnNext x -> if queue.Count () < bufferSize then () else queue.Dequeue() |> ignore - queue.Enqueue (BufferedMessage.Next x) + queue.Enqueue (Notification.OnNext x) for aobv in obvs do do! aobv.OnNextAsync x | OnError err -> for aobv in obvs do - queue.Enqueue (BufferedMessage.Error err) + queue.Enqueue (Notification.OnError err) do! aobv.OnErrorAsync err cts.Cancel() @@ -185,7 +180,7 @@ module internal Subjects = | OnCompleted -> for aobv in obvs do do! aobv.OnCompletedAsync() - queue.Enqueue BufferedMessage.Complete + queue.Enqueue Notification.OnCompleted cts.Cancel() @@ -200,9 +195,9 @@ module internal Subjects = async { let sobv = safeObserver aobv AsyncDisposable.Empty queue.IterAsync (function - | BufferedMessage.Next n -> sobv.OnNextAsync n - | BufferedMessage.Error exn -> sobv.OnErrorAsync exn - | BufferedMessage.Complete -> sobv.OnCompletedAsync ()) + | Notification.OnNext n -> sobv.OnNextAsync n + | Notification.OnError exn -> sobv.OnErrorAsync exn + | Notification.OnCompleted -> sobv.OnCompletedAsync ()) obvs.Add sobv From 337e39fd883373eb9454e262c6e24ff3c6528357 Mon Sep 17 00:00:00 2001 From: Tri Date: Mon, 11 Sep 2023 11:05:07 +1000 Subject: [PATCH 3/3] chore:bumping packet version to 7.2.1 --- .config/dotnet-tools.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.config/dotnet-tools.json b/.config/dotnet-tools.json index ebd33b9..e519fdc 100644 --- a/.config/dotnet-tools.json +++ b/.config/dotnet-tools.json @@ -3,7 +3,7 @@ "isRoot": true, "tools": { "paket": { - "version": "7.1.5", + "version": "7.2.1 ", "commands": [ "paket" ]