@@ -2197,28 +2197,32 @@ function Rx.buffer(closingNotifier)
21972197
21982198 return function (source )
21992199 assert (Observable .isObservable (source ), " Bad observable" )
2200-
2200+
22012201 return Observable .new (function (sub )
22022202 local maid = Maid .new ()
2203- local buffer = {}
2203+ local latestBuffer = {}
22042204
22052205 maid :GiveTask (closingNotifier :Subscribe (function ()
2206- local latest = table .clone (buffer )
2206+ local latest = table .clone (latestBuffer )
22072207
2208- table .clear (buffer )
2208+ table .clear (latestBuffer )
22092209
22102210 sub :Fire (latest )
22112211 end ))
22122212
2213- maid :GiveTask (source :Subscribe (function (value )
2214- table.insert (buffer , value )
2215- end , nil , function (value )
2216- sub :Fire (buffer )
2217- sub :Complete ()
2218- end ))
2213+ maid :GiveTask (source :Subscribe (
2214+ function (value )
2215+ table.insert (latestBuffer , value )
2216+ end ,
2217+ nil ,
2218+ function ()
2219+ sub :Fire (latestBuffer )
2220+ sub :Complete ()
2221+ end
2222+ ))
22192223
22202224 maid :GiveTask (function ()
2221- table .clear (buffer )
2225+ table .clear (latestBuffer )
22222226 end )
22232227
22242228 return maid
@@ -2242,7 +2246,7 @@ function Rx.window(windowBoundaries)
22422246 Rx .buffer (windowBoundaries ),
22432247 Rx .map (function (data )
22442248 return Rx .of (unpack (data ))
2245- end )
2249+ end ),
22462250 })
22472251end
22482252
@@ -2263,7 +2267,7 @@ function Rx.pairwise()
22632267
22642268 return source :Subscribe (function (value )
22652269 if previous ~= UNSET_VALUE then
2266- sub :Fire ({previous , value })
2270+ sub :Fire ({ previous , value })
22672271 end
22682272
22692273 previous = value
0 commit comments