diff --git a/src/rx/src/Shared/Rx.lua b/src/rx/src/Shared/Rx.lua index 80236f04af..e98b3deab7 100644 --- a/src/rx/src/Shared/Rx.lua +++ b/src/rx/src/Shared/Rx.lua @@ -2183,4 +2183,97 @@ function Rx.mergeScan(accumulator, seed) }) end +--[=[ + Collects values as an array, emitting the result when another Observable emits. + + https://rxjs.dev/api/index/function/buffer + + @param closingNotifier Observable + @return (source: Observable) -> Observable +]=] + +function Rx.buffer(closingNotifier) + assert(Observable.isObservable(closingNotifier), "Bad observable") + + return function(source) + assert(Observable.isObservable(source), "Bad observable") + + return Observable.new(function(sub) + local maid = Maid.new() + local latestBuffer = {} + + maid:GiveTask(closingNotifier:Subscribe(function() + local latest = table.clone(latestBuffer) + + table.clear(latestBuffer) + + sub:Fire(latest) + end)) + + maid:GiveTask(source:Subscribe( + function(value) + table.insert(latestBuffer, value) + end, + nil, + function() + sub:Fire(latestBuffer) + sub:Complete() + end + )) + + maid:GiveTask(function() + table.clear(latestBuffer) + end) + + return maid + end) + end +end + +--[=[ + Collects values, then, when another Observable fires, emits the collected values as an Observable. + + https://rxjs.dev/api/index/function/window + + @param windowBoundaries Observable + @return (source: Observable) -> Observable +]=] + +function Rx.window(windowBoundaries) + assert(Observable.isObservable(windowBoundaries), "Bad observable") + + return Rx.pipe({ + Rx.buffer(windowBoundaries) :: any, + Rx.map(function(data) + return Rx.of(unpack(data)) :: any + end) :: any, + }) +end + +--[=[ + Groups pairs of emissions together, emitting them as an array. + + https://rxjs.dev/api/index/function/pairwise + + @return (source: Observable) -> Observable +]=] + +function Rx.pairwise() + return function(source) + assert(Observable.isObservable(source), "Bad observable") + + return Observable.new(function(sub) + local previous = UNSET_VALUE + + return source:Subscribe(function(value) + if previous ~= UNSET_VALUE then + sub:Fire({ previous, value }) + end + + previous = value + end, sub:GetFailComplete()) + end) + end +end + return Rx