@@ -52,7 +52,7 @@ public extension Flow {
5252 var request = URLRequest ( url: url)
5353 request. timeoutInterval = timeoutInterval
5454
55- let pinner = FoundationSecurity ( allowSelfSigned: true ) // don't validate SSL certificates
55+ let pinner = FoundationSecurity ( allowSelfSigned: true ) // do not validate SSL certificates
5656 socket = WebSocket ( request: request, certPinner: pinner)
5757 socket? . delegate = self
5858 socket? . connect ( )
@@ -69,73 +69,68 @@ public extension Flow {
6969 }
7070
7171 // MARK: - Subscription Methods
72-
72+ @ discardableResult
7373 public func subscribeToBlockDigests(
7474 blockStatus: BlockStatus = . sealed,
7575 startBlockHeight: String ? = nil ,
7676 startBlockId: String ? = nil
77- ) -> AnyPublisher < Flow . WSBlockHeader , Error > {
77+ ) -> AnyPublisher < TopicResponse < Flow . WSBlockHeader > , Error > {
7878 let arguments = BlockDigestArguments (
7979 blockStatus: blockStatus,
8080 startBlockHeight: startBlockHeight,
8181 startBlockId: startBlockId
8282 )
8383 return subscribe ( topic: . blockDigests, arguments: arguments, type: Flow . WSBlockHeader. self)
84+ . map { payload in
85+ TopicResponse ( subscriptionId: payload. subscriptionId, topic: payload. topic, payload: payload. payload, error: payload. error)
86+ }
87+ . eraseToAnyPublisher ( )
8488 }
8589
86- public func subscribeToBlockHeaders( ) -> AnyPublisher < Flow . BlockHeader , Error > {
90+ @discardableResult
91+ public func subscribeToBlockHeaders( ) -> AnyPublisher < TopicResponse < Flow . BlockHeader > , Error > {
8792 return subscribe ( topic: . blockHeaders, arguments: EmptyArguments ( ) , type: Flow . BlockHeader. self)
8893 }
8994
90- public func subscribeToBlocks( ) -> AnyPublisher < Flow . Block , Error > {
95+ @discardableResult
96+ public func subscribeToBlocks( ) -> AnyPublisher < TopicResponse < Flow . Block > , Error > {
9197 return subscribe ( topic: . blocks, arguments: EmptyArguments ( ) , type: Flow . Block. self)
9298 }
9399
94- public func subscribeToEvents( type: String ? = nil , contractID: String ? = nil , address: String ? = nil ) -> AnyPublisher < Flow . Event , Error > {
100+ @discardableResult
101+ public func subscribeToEvents( type: String ? = nil , contractID: String ? = nil , address: String ? = nil ) -> AnyPublisher < TopicResponse < Flow . Event > , Error > {
95102 let arguments = EventArguments ( type: type, contractID: contractID, address: address)
96103 return subscribe ( topic: . events, arguments: arguments, type: Flow . Event. self)
97104 }
98105
99- public func subscribeToAccountStatuses ( address : String ) -> AnyPublisher < Flow . Account , Error > {
100- let arguments = AccountArguments ( address : address )
101- let publisher = subscribe ( topic: . accountStatuses, arguments: arguments , type: Flow . Account . self)
106+ @ discardableResult
107+ public func subscribeToAccountStatuses ( request : AccountArguments ) -> AnyPublisher < TopicResponse < AccountStatusResponse > , Error > {
108+ let publisher = subscribe ( topic: . accountStatuses, arguments: request , type: Flow . Websocket . AccountStatusResponse . self)
102109
103110 // Also publish to central publisher for account updates
104- Flow . Publisher. shared. publishAccountUpdate ( address: Flow . Address ( hex: address) )
111+ // Flow.Publisher.shared.publishAccountUpdate(address: Flow.Address(hex: address))
105112
106113 return publisher
107114 }
108115
109- public func subscribeToTransactionStatus( txId: Flow . ID ) -> AnyPublisher < Flow . WSTransactionResponse , Error > {
116+ @discardableResult
117+ public func subscribeToTransactionStatus( txId: Flow . ID ) -> AnyPublisher < TopicResponse < Flow . WSTransactionResponse > , Error > {
110118 let arguments = TransactionStatusRequest ( txId: txId. hex)
111119 let publisher = subscribe ( topic: . transactionStatuses, arguments: arguments, type: Flow . WSTransactionResponse. self)
112120
113121 // Also publish transaction status updates to central publisher
114122 publisher. sink (
115123 receiveCompletion: { _ in } ,
116- receiveValue: { status in
117- Flow . Publisher. shared. publishTransactionStatus ( id: txId, status: status. transactionResult)
124+ receiveValue: { response in
125+ if let status = response. payload {
126+ Flow . Publisher. shared. publishTransactionStatus ( id: txId, status: status. transactionResult)
127+ }
118128 }
119129 ) . store ( in: & cancellables)
120130
121131 return publisher
122132 }
123133
124- // public func sendAndSubscribeToTransactionStatus(transaction: Flow.Transaction) -> AnyPublisher<Flow.Transaction.Status, Error> {
125- // let arguments = SendTransactionArguments(transaction: transaction)
126- // let publisher = subscribe(topic: .sendAndGetTransactionStatuses, arguments: arguments, type: Flow.Transaction.Status.self)
127- //
128- // // Also publish transaction status updates to central publisher
129- // publisher.sink(
130- // receiveCompletion: { _ in },
131- // receiveValue: { status in
132- // Flow.Publisher.shared.publishTransactionStatus(id: transaction., status: status)
133- // }
134- // ).store(in: &cancellables)
135- //
136- // return publisher
137- // }
138-
139134 public func listSubscriptions( ) {
140135 let request = SubscribeRequest < EmptyArguments > ( id: generateShortUUID ( ) , action: . listSubscriptions, topic: . blocks, arguments: nil )
141136 do {
@@ -146,13 +141,11 @@ public extension Flow {
146141 }
147142 }
148143
149- private func subscribe< T: Encodable , U: Decodable > ( topic: Topic , arguments: T , type: U . Type ) -> AnyPublisher < U , Error > {
144+ private func subscribe< T: Encodable , U: Decodable > ( topic: Topic , arguments: T , type: U . Type ) -> AnyPublisher < TopicResponse < U > , Error > {
150145 let subscriptionId = generateShortUUID ( )
151146 let request = SubscribeRequest ( id: subscriptionId, action: . subscribe, topic: topic, arguments: arguments)
152-
153147 let subject = PassthroughSubject < Any , Error > ( )
154- subscriptions [ subscriptionId] = ( subject: subject, type: U . self)
155-
148+ subscriptions [ subscriptionId] = ( subject: subject, type: TopicResponse< U> . self )
156149 do {
157150 let data = try encoder. encode ( request)
158151 socket? . write ( data: data)
@@ -161,10 +154,9 @@ public extension Flow {
161154 subscriptions. removeValue ( forKey: subscriptionId)
162155 Flow . Publisher. shared. publishError ( error)
163156 }
164-
165157 return subject
166- . compactMap { value -> U ? in
167- return value as? U
158+ . compactMap { value -> TopicResponse < U > ? in
159+ return value as? TopicResponse < U >
168160 }
169161 . eraseToAnyPublisher ( )
170162 }
@@ -181,6 +173,13 @@ public extension Flow {
181173 Flow . Publisher. shared. publishError ( error)
182174 }
183175 }
176+
177+ // Helper method to generate short UUIDs
178+ private func generateShortUUID( ) -> String {
179+ // Generate UUID and take first 20 characters
180+ let fullUUID = UUID ( ) . uuidString
181+ return String ( fullUUID. prefix ( 20 ) )
182+ }
184183 }
185184}
186185
@@ -189,7 +188,7 @@ public extension Flow {
189188extension Flow . Websocket : WebSocketDelegate {
190189 public func didReceive( event: WebSocketEvent , client: any Starscream . WebSocketClient ) {
191190 switch event {
192- case let . connected( data ) :
191+ case . connected:
193192 isConnected = true
194193 Flow . Publisher. shared. publishConnectionStatus ( isConnected: true )
195194
@@ -230,77 +229,33 @@ extension Flow.Websocket: WebSocketDelegate {
230229 }
231230 return
232231 }
233-
234232 // Try to decode as a ListSubscriptionsResponse
235233 if let response = try ? decoder. decode ( ListSubscriptionsResponse . self, from: data) {
236234 print ( " Active subscriptions: \( response. subscriptions) " )
237235 return
238236 }
239-
240237 let object = try JSONSerialization . jsonObject ( with: data)
241238 print ( object)
242-
243239 if let _ = try ? decoder. decode ( SubscribeResponse . self, from: data) {
244240 return
245241 }
246-
247- // Try to decode as a TopicResponse with different types
248- let response = try decoder. decode ( TopicResponse< AnyDecodable> . self , from: data)
249- guard let subscription = subscriptions [ response. subscriptionId] else { return }
250-
251- if let error = response. error {
252- let wsError = WebSocketError . serverError ( error)
253- subscription. subject. send ( completion: . failure( wsError) )
254- Flow . Publisher. shared. publishError ( wsError)
255- return
256- }
257-
258- guard let anyData = response. payload else { return }
259-
260- do {
261- let jsonData = try JSONSerialization . data ( withJSONObject: anyData. value)
262- if let decodableType = subscription. type as? Decodable . Type {
263- let decodedData = try decoder. decode ( decodableType, from: jsonData)
264- subscription. subject. send ( decodedData)
242+ // Directly decode using the TopicResponse<U>.self type stored at subscription time
243+ // First use AnyDecodable to get the subscriptionId
244+ if let anyResponse = try ? decoder. decode ( TopicResponse< AnyDecodable> . self , from: data) ,
245+ let subscription = subscriptions [ anyResponse. subscriptionId] ,
246+ let decodableType = subscription. type as? Decodable . Type {
247+ do {
248+ let decoded = try decoder. decode ( decodableType, from: data)
249+ subscription. subject. send ( decoded)
250+ } catch {
251+ subscription. subject. send ( completion: . failure( error) )
252+ Flow . Publisher. shared. publishError ( error)
265253 }
266- } catch {
267- subscription. subject. send ( completion: . failure( error) )
268- Flow . Publisher. shared. publishError ( error)
254+ return
269255 }
270256 } catch {
271257 print ( " Error decoding message: \( error) " )
272258 Flow . Publisher. shared. publishError ( error)
273259 }
274260 }
275261}
276-
277- // MARK: - Supporting Types
278-
279- extension Flow . Websocket {
280- enum WebSocketError : Error {
281- case serverError( SocketError )
282- }
283-
284- // Helper method to generate short UUIDs
285- private func generateShortUUID( ) -> String {
286- // Generate UUID and take first 20 characters
287- let fullUUID = UUID ( ) . uuidString
288- return String ( fullUUID. prefix ( 20 ) )
289- }
290-
291- struct EmptyArguments : Codable { }
292-
293- struct EventArguments : Codable {
294- let type : String ?
295- let contractID : String ?
296- let address : String ?
297- }
298-
299- struct AccountArguments : Codable {
300- let address : String
301- }
302-
303- struct SendTransactionArguments : Codable {
304- let transaction : Flow . Transaction
305- }
306- }
0 commit comments