Skip to content

Commit a282673

Browse files
authored
Merge pull request #134 from AgentFlocks/fix/channel-last-message-at
fix(channel): record last_message_at for all channels including DingTalk
2 parents fa8ca0f + f1da52d commit a282673

File tree

5 files changed

+38
-7
lines changed

5 files changed

+38
-7
lines changed

.flocks/plugins/channels/dingtalk/runner.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,10 @@ function startProxy(): Promise<number> {
258258
return;
259259
}
260260

261+
// Record inbound activity as early as possible, before any Flocks API
262+
// calls that might fail (session creation, inference, etc.).
263+
fetch(`${FLOCKS_BASE}/api/channel/dingtalk/record-inbound`, { method: 'POST' }).catch(() => {});
264+
261265
res.writeHead(200, {
262266
'Content-Type': 'text/event-stream',
263267
'Cache-Control': 'no-cache',

flocks/channel/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,4 +286,4 @@ def mark_disconnected(self, error: Optional[str] = None) -> None:
286286

287287
def record_message(self) -> None:
288288
"""Update the last-message timestamp (called by inbound dispatcher)."""
289-
self._status.last_message_at = time.monotonic()
289+
self._status.last_message_at = time.time()

flocks/channel/builtin/feishu/channel.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
import asyncio
1212
import json
13-
import time
1413
from collections import OrderedDict
1514
from typing import Optional
1615

@@ -95,7 +94,7 @@ async def _send_static(self, ctx: OutboundContext) -> DeliveryResult:
9594
reply_to_id=ctx.reply_to_id,
9695
account_id=ctx.account_id,
9796
)
98-
self._status.last_message_at = time.monotonic()
97+
self.record_message()
9998
return DeliveryResult(
10099
channel_id="feishu",
101100
message_id=result["message_id"],
@@ -130,7 +129,7 @@ async def _send_streaming(self, ctx: OutboundContext) -> DeliveryResult:
130129
try:
131130
# Write full text at once (send_text scenario has no per-chunk streaming data)
132131
await card.finalize(ctx.text)
133-
self._status.last_message_at = time.monotonic()
132+
self.record_message()
134133
return DeliveryResult(
135134
channel_id="feishu",
136135
message_id=message_id,
@@ -158,7 +157,7 @@ async def send_media(self, ctx: OutboundContext) -> DeliveryResult:
158157
reply_to_id=ctx.reply_to_id,
159158
account_id=ctx.account_id,
160159
)
161-
self._status.last_message_at = time.monotonic()
160+
self.record_message()
162161
return DeliveryResult(
163162
channel_id="feishu",
164163
message_id=result["message_id"],

flocks/channel/gateway/manager.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ async def start_all(self) -> None:
7979
channel_id=channel_id,
8080
plugin=plugin,
8181
config=config_dict,
82-
on_message=self._dispatcher.dispatch,
82+
on_message=self._make_on_message(plugin, self._dispatcher.dispatch),
8383
abort_event=abort_event,
8484
),
8585
name=f"channel-{channel_id}",
@@ -182,7 +182,7 @@ async def start_channel(self, channel_id: str) -> None:
182182
channel_id=channel_id,
183183
plugin=plugin,
184184
config=config_dict,
185-
on_message=self._dispatcher.dispatch,
185+
on_message=self._make_on_message(plugin, self._dispatcher.dispatch),
186186
abort_event=abort_event,
187187
),
188188
name=f"channel-{channel_id}",
@@ -328,6 +328,23 @@ async def _run_with_reconnect(
328328
# Reconnect helpers
329329
# ------------------------------------------------------------------
330330

331+
@staticmethod
332+
def _make_on_message(
333+
plugin: ChannelPlugin,
334+
dispatch: Callable,
335+
) -> Callable:
336+
"""Wrap *dispatch* so that each inbound message records a timestamp."""
337+
async def _on_message(msg) -> None:
338+
plugin.record_message()
339+
await dispatch(msg)
340+
return _on_message
341+
342+
def record_message(self, channel_id: str) -> None:
343+
"""Update last_message_at on a running channel plugin (e.g. from an HTTP handler)."""
344+
plugin = self._running_plugins.get(channel_id) or self._registry.get(channel_id)
345+
if plugin:
346+
plugin.record_message()
347+
331348
@staticmethod
332349
async def _mark_connected(plugin: ChannelPlugin, channel_id: str) -> None:
333350
plugin.mark_connected()

flocks/server/routes/channel.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,17 @@ async def list_channels():
178178
]
179179

180180

181+
@router.post("/{channel_id}/record-inbound")
182+
async def record_inbound(channel_id: str):
183+
"""Notify the gateway that a message was received on this channel.
184+
185+
Used by out-of-process bridges (e.g. DingTalk's runner.ts) that bypass the
186+
InboundDispatcher so that last_message_at is updated on the plugin status.
187+
"""
188+
default_manager.record_message(channel_id)
189+
return {"ok": True}
190+
191+
181192
@router.post("/{channel_id}/restart")
182193
async def restart_channel(channel_id: str):
183194
"""Restart a single channel connection with the latest config.

0 commit comments

Comments
 (0)