Skip to content

Commit e230aef

Browse files
authored
Wrap connection handling in a mutex for some thread-safe handling (#100)
1 parent 07a3073 commit e230aef

File tree

1 file changed

+38
-9
lines changed

1 file changed

+38
-9
lines changed

src/cable/server.cr

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ module Cable
2121
include Debug
2222

2323
# The String key is the `connection_identifier` value for `Cable::Connection`
24-
getter connections = {} of String => Cable::Connection
2524
getter errors = 0
2625
getter fiber_channel = ::Channel({String, String}).new
2726
getter pinger : Cable::BackendPinger do
@@ -39,10 +38,14 @@ module Cable
3938

4039
@channels : Hash(String, Channels)
4140
@channel_mutex : Mutex
41+
@connections : Hash(String, Cable::Connection)
42+
@connections_mutex : Mutex
4243

4344
def initialize
4445
@channels = {} of String => Channels
4546
@channel_mutex = Mutex.new
47+
@connections = {} of String => Cable::Connection
48+
@connections_mutex = Mutex.new
4649

4750
begin
4851
# load the connections
@@ -59,12 +62,24 @@ module Cable
5962
RemoteConnections.new(self)
6063
end
6164

65+
# Returns the current connections
66+
def connections : Hash(String, Cable::Connection)
67+
@connections_mutex.synchronize do
68+
@connections.dup
69+
end
70+
end
71+
6272
def add_connection(connection)
63-
connections[connection.connection_identifier] = connection
73+
@connections_mutex.synchronize do
74+
@connections[connection.connection_identifier] = connection
75+
end
6476
end
6577

6678
def remove_connection(connection_id)
67-
connections.delete(connection_id).try(&.close)
79+
connection = @connections_mutex.synchronize do
80+
@connections.delete(connection_id)
81+
end
82+
connection.try(&.close)
6883
end
6984

7085
# You shouldn't rely on these following two methods
@@ -73,7 +88,9 @@ module Cable
7388

7489
# Only returns connections opened on this instance.
7590
def active_connections_for(token : String) : Array(Connection)
76-
connections.values.select { |connection| connection.token == token && !connection.closed? }
91+
@connections_mutex.synchronize do
92+
@connections.values.select { |connection| connection.token == token && !connection.closed? }
93+
end
7794
end
7895

7996
# Only returns channel subscriptions opened on this instance.
@@ -139,7 +156,10 @@ module Cable
139156
end
140157

141158
def send_to_internal_connections(connection_identifier : String, message : String)
142-
if internal_connection = connections[connection_identifier]?
159+
internal_connection = @connections_mutex.synchronize do
160+
@connections[connection_identifier]?
161+
end
162+
if internal_connection
143163
case message
144164
when Cable.message(:disconnect)
145165
Cable::Logger.info { "Removing connection (#{connection_identifier})" }
@@ -169,8 +189,11 @@ module Cable
169189
Cable::Logger.debug { "Cable::Server#shutdown Connection to backend was severed: #{e.message}" }
170190
end
171191
pinger.stop
172-
connections.each do |_k, v|
173-
v.close
192+
connections_to_close = @connections_mutex.synchronize do
193+
@connections.values.dup
194+
end
195+
connections_to_close.each do |connection|
196+
connection.close
174197
end
175198
end
176199

@@ -191,8 +214,8 @@ module Cable
191214
channel, message = received
192215
if channel.starts_with?("cable_internal")
193216
identifier = channel.split('/').last
194-
connection_identifier = server.connections.keys.find!(&.starts_with?(identifier))
195-
server.send_to_internal_connections(connection_identifier, message)
217+
connection_identifier = server.find_connection_identifier(identifier)
218+
server.send_to_internal_connections(connection_identifier, message) if connection_identifier
196219
else
197220
server.send_to_channels(channel, message)
198221
end
@@ -201,6 +224,12 @@ module Cable
201224
end
202225
end
203226

227+
protected def find_connection_identifier(identifier : String) : String?
228+
@connections_mutex.synchronize do
229+
@connections.keys.find(&.starts_with?(identifier))
230+
end
231+
end
232+
204233
private def subscribe
205234
spawn(name: "Cable::Server - subscribe") do
206235
backend.open_subscribe_connection("_internal")

0 commit comments

Comments
 (0)