Skip to content

Commit 6886b3f

Browse files
Observable grouping functions -- Rx.buffer, Rx.window, Rx.pairwise
1 parent 275fcfd commit 6886b3f

File tree

1 file changed

+89
-0
lines changed

1 file changed

+89
-0
lines changed

src/rx/src/Shared/Rx.lua

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2183,4 +2183,93 @@ function Rx.mergeScan(accumulator, seed)
21832183
})
21842184
end
21852185

2186+
--[=[
2187+
Collects values as an array, emitting the result when another Observable emits.
2188+
2189+
https://rxjs.dev/api/index/function/buffer
2190+
2191+
@param closingNotifier Observable
2192+
@return (source: Observable) -> Observable
2193+
]=]
2194+
2195+
function Rx.buffer(closingNotifier)
2196+
assert(Observable.isObservable(closingNotifier), "Bad observable")
2197+
2198+
return function(source)
2199+
assert(Observable.isObservable(source), "Bad observable")
2200+
2201+
return Observable.new(function(sub)
2202+
local maid = Maid.new()
2203+
local buffer = {}
2204+
2205+
maid:GiveTask(closingNotifier:Subscribe(function()
2206+
local latest = table.clone(buffer)
2207+
2208+
table.clear(buffer)
2209+
2210+
sub:Fire(latest)
2211+
end)
2212+
2213+
maid:GiveTask(source:Subscribe(function(value)
2214+
table.insert(buffer, value)
2215+
end, nil, function(value)
2216+
sub:Fire(buffer)
2217+
sub:Complete()
2218+
end))
2219+
2220+
maid:GiveTask(function()
2221+
table.clear(buffer)
2222+
end)
2223+
2224+
return maid
2225+
end)
2226+
end
2227+
end
2228+
2229+
--[=[
2230+
Collects values, then, when another Observable fires, emits the collected values as an Observable.
2231+
2232+
https://rxjs.dev/api/index/function/window
2233+
2234+
@param windowBoundaries Observable
2235+
@return (source: Observable) -> Observable
2236+
]=]
2237+
2238+
function Rx.window(windowBoundaries)
2239+
assert(Observable.isObservable(windowBoundaries), "Bad observable")
2240+
2241+
return Rx.pipe({
2242+
Rx.buffer(windowBoundaries),
2243+
Rx.map(function(data)
2244+
return Rx.of(unpack(data))
2245+
end)
2246+
})
2247+
end
2248+
2249+
--[=[
2250+
Groups pairs of emissions together, emitting them as an array.
2251+
2252+
https://rxjs.dev/api/index/function/pairwise
2253+
2254+
@return (source: Observable) -> Observable
2255+
]=]
2256+
2257+
function Rx.pairwise()
2258+
return function(source)
2259+
assert(Observable.isObservable(source), "Bad observable")
2260+
2261+
return Observable.new(function(sub)
2262+
local previous = UNSET_VALUE
2263+
2264+
return source:Subscribe(function(value)
2265+
if previous ~= UNSET_VALUE then
2266+
sub:Fire({previous, value})
2267+
end
2268+
2269+
previous = value
2270+
end, sub:GetFailComplete())
2271+
end)
2272+
end
2273+
end
2274+
21862275
return Rx

0 commit comments

Comments
 (0)