@@ -18,6 +18,7 @@ import (
1818 "github.com/synnaxlabs/aspen/internal/node"
1919 "github.com/synnaxlabs/freighter/mock"
2020 "github.com/synnaxlabs/x/address"
21+ "github.com/synnaxlabs/x/encoding/gob"
2122 "github.com/synnaxlabs/x/encoding/msgpack"
2223 "github.com/synnaxlabs/x/kv/memkv"
2324 "github.com/synnaxlabs/x/signal"
@@ -151,6 +152,157 @@ var _ = Describe("Open", func() {
151152 Expect (kvDB .Close ()).To (Succeed ())
152153 })
153154
155+ It ("Should recover state written by msgpack (v0.39 to v0.53 upgrade)" , func (ctx SpecContext ) {
156+ gossipT1 := gossipNet .UnaryServer ("" )
157+ pledgeT1 := pledgeNet .UnaryServer (gossipT1 .Address )
158+ clusterOne := MustSucceed (cluster .Open (
159+ ctx ,
160+ cluster.Config {
161+ HostAddress : gossipT1 .Address ,
162+ Pledge : pledge.Config {
163+ Peers : []address.Address {},
164+ TransportClient : pledgeNet .UnaryClient (),
165+ TransportServer : pledgeT1 ,
166+ },
167+ Gossip : gossip.Config {
168+ TransportClient : gossipNet .UnaryClient (),
169+ TransportServer : gossipT1 ,
170+ Interval : 100 * time .Millisecond ,
171+ },
172+ },
173+ ))
174+
175+ kvDB := memkv .New ()
176+ gossipT2 := gossipNet .UnaryServer ("" )
177+ pledgeT2 := pledgeNet .UnaryServer (gossipT2 .Address )
178+ storageKey := []byte ("msgpack-upgrade-test" )
179+
180+ // Simulate a v0.39-v0.53 server that wrote state as msgpack.
181+ oldConfig := cluster.Config {
182+ HostAddress : gossipT2 .Address ,
183+ Pledge : pledge.Config {
184+ Peers : []address.Address {gossipT1 .Address },
185+ TransportClient : pledgeNet .UnaryClient (),
186+ TransportServer : pledgeT2 ,
187+ },
188+ Gossip : gossip.Config {
189+ TransportClient : gossipNet .UnaryClient (),
190+ TransportServer : gossipT2 ,
191+ Interval : 100 * time .Millisecond ,
192+ },
193+ StorageKey : storageKey ,
194+ Storage : kvDB ,
195+ StorageFlushInterval : cluster .FlushOnEvery ,
196+ Codec : msgpack .Codec ,
197+ }
198+ clusterTwo := MustSucceed (cluster .Open (ctx , oldConfig ))
199+ Expect (clusterTwo .Host ().Key ).To (Equal (node .Key (2 )))
200+ Expect (clusterTwo .Close ()).To (Succeed ())
201+
202+ // Reopen with the default codec (JSON primary, msgpack+gob fallback),
203+ // simulating an upgrade to v0.54+.
204+ gossipT3 := gossipNet .UnaryServer (gossipT2 .Address )
205+ pledgeT3 := pledgeNet .UnaryServer (gossipT3 .Address )
206+ upgradedConfig := cluster.Config {
207+ HostAddress : gossipT3 .Address ,
208+ Pledge : pledge.Config {
209+ Peers : []address.Address {gossipT1 .Address },
210+ TransportClient : pledgeNet .UnaryClient (),
211+ TransportServer : pledgeT3 ,
212+ },
213+ Gossip : gossip.Config {
214+ TransportClient : gossipNet .UnaryClient (),
215+ TransportServer : gossipT3 ,
216+ Interval : 100 * time .Millisecond ,
217+ },
218+ StorageKey : storageKey ,
219+ Storage : kvDB ,
220+ StorageFlushInterval : cluster .FlushOnEvery ,
221+ }
222+ clusterTwoAgain := MustSucceed (cluster .Open (ctx , upgradedConfig ))
223+ Expect (clusterTwoAgain .Host ().Key ).To (Equal (node .Key (2 )))
224+ Expect (clusterTwoAgain .Nodes ()).To (HaveLen (2 ))
225+
226+ Expect (clusterOne .Close ()).To (Succeed ())
227+ Expect (clusterTwoAgain .Close ()).To (Succeed ())
228+ Expect (kvDB .Close ()).To (Succeed ())
229+ })
230+
231+ It ("Should recover state written by gob (pre-v0.39 upgrade)" , func (ctx SpecContext ) {
232+ gossipT1 := gossipNet .UnaryServer ("" )
233+ pledgeT1 := pledgeNet .UnaryServer (gossipT1 .Address )
234+ clusterOne := MustSucceed (cluster .Open (
235+ ctx ,
236+ cluster.Config {
237+ HostAddress : gossipT1 .Address ,
238+ Pledge : pledge.Config {
239+ Peers : []address.Address {},
240+ TransportClient : pledgeNet .UnaryClient (),
241+ TransportServer : pledgeT1 ,
242+ },
243+ Gossip : gossip.Config {
244+ TransportClient : gossipNet .UnaryClient (),
245+ TransportServer : gossipT1 ,
246+ Interval : 100 * time .Millisecond ,
247+ },
248+ },
249+ ))
250+
251+ kvDB := memkv .New ()
252+ gossipT2 := gossipNet .UnaryServer ("" )
253+ pledgeT2 := pledgeNet .UnaryServer (gossipT2 .Address )
254+ storageKey := []byte ("gob-upgrade-test" )
255+
256+ // Simulate a pre-v0.39 server that wrote state as gob.
257+ oldConfig := cluster.Config {
258+ HostAddress : gossipT2 .Address ,
259+ Pledge : pledge.Config {
260+ Peers : []address.Address {gossipT1 .Address },
261+ TransportClient : pledgeNet .UnaryClient (),
262+ TransportServer : pledgeT2 ,
263+ },
264+ Gossip : gossip.Config {
265+ TransportClient : gossipNet .UnaryClient (),
266+ TransportServer : gossipT2 ,
267+ Interval : 100 * time .Millisecond ,
268+ },
269+ StorageKey : storageKey ,
270+ Storage : kvDB ,
271+ StorageFlushInterval : cluster .FlushOnEvery ,
272+ Codec : gob .Codec ,
273+ }
274+ clusterTwo := MustSucceed (cluster .Open (ctx , oldConfig ))
275+ Expect (clusterTwo .Host ().Key ).To (Equal (node .Key (2 )))
276+ Expect (clusterTwo .Close ()).To (Succeed ())
277+
278+ // Reopen with the default codec, simulating an upgrade to v0.54+.
279+ gossipT3 := gossipNet .UnaryServer (gossipT2 .Address )
280+ pledgeT3 := pledgeNet .UnaryServer (gossipT3 .Address )
281+ upgradedConfig := cluster.Config {
282+ HostAddress : gossipT3 .Address ,
283+ Pledge : pledge.Config {
284+ Peers : []address.Address {gossipT1 .Address },
285+ TransportClient : pledgeNet .UnaryClient (),
286+ TransportServer : pledgeT3 ,
287+ },
288+ Gossip : gossip.Config {
289+ TransportClient : gossipNet .UnaryClient (),
290+ TransportServer : gossipT3 ,
291+ Interval : 100 * time .Millisecond ,
292+ },
293+ StorageKey : storageKey ,
294+ Storage : kvDB ,
295+ StorageFlushInterval : cluster .FlushOnEvery ,
296+ }
297+ clusterTwoAgain := MustSucceed (cluster .Open (ctx , upgradedConfig ))
298+ Expect (clusterTwoAgain .Host ().Key ).To (Equal (node .Key (2 )))
299+ Expect (clusterTwoAgain .Nodes ()).To (HaveLen (2 ))
300+
301+ Expect (clusterOne .Close ()).To (Succeed ())
302+ Expect (clusterTwoAgain .Close ()).To (Succeed ())
303+ Expect (kvDB .Close ()).To (Succeed ())
304+ })
305+
154306 })
155307
156308 })
0 commit comments