Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 96 additions & 21 deletions langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import atexit
import os
import threading
import weakref
from queue import Full, Queue
from typing import Any, Callable, Dict, List, Optional, cast

Expand Down Expand Up @@ -170,6 +171,7 @@ def _initialize_instance(
self.base_url = base_url
self.mask = mask
self.environment = environment
self._shutdown = False

# Store additional client settings for get_client() to use
self.timeout = timeout
Expand Down Expand Up @@ -243,7 +245,9 @@ def _initialize_instance(
x_langfuse_public_key=self.public_key,
timeout=timeout,
)
score_ingestion_client = LangfuseClient(

# Store as instance variable so _at_fork_reinit can reuse without recreation
self._score_ingestion_client = LangfuseClient(
public_key=self.public_key,
secret_key=secret_key,
base_url=base_url,
Expand All @@ -257,6 +261,52 @@ def _initialize_instance(
LANGFUSE_MEDIA_UPLOAD_ENABLED, "True"
).lower() not in ("false", "0")

self._media_upload_thread_count = media_upload_thread_count or max(
int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1
)

self._init_consumer_threads()

# Prompt cache
self.prompt_cache = PromptCache()

# Register shutdown handler
atexit.register(self.shutdown)

# Register fork handler to reinitialize consumer threads in child process.
# When using Gunicorn with --preload, os.fork() copies memory but not threads
# (POSIX.1: https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html).
# Without this, media upload and score ingestion threads are lost after fork,
# causing silent data loss.
#
# Note: LangfuseSpanProcessor (BatchSpanProcessor) already handles fork-safety
# for span export via its own os.register_at_fork. This handler covers the
# remaining background threads managed by LangfuseResourceManager.
#
# weakref.WeakMethod prevents os.register_at_fork from holding a permanent strong
# reference to this instance, which would block garbage collection.
# See: https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py
if hasattr(os, "register_at_fork"):
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
os.register_at_fork(
# Walrus operator resolves the weak reference once and stores it in
# a temporary variable before calling it. This avoids a TOCTOU window
# where GC could collect the referent between checking for None and
# invoking the method.
after_in_child=lambda: (m := weak_reinit()) and m()
)

langfuse_logger.info(
f"Startup: Langfuse tracer successfully initialized | "
f"public_key={self.public_key} | "
f"base_url={base_url} | "
f"environment={environment or 'default'} | "
f"sample_rate={sample_rate if sample_rate is not None else 1.0} | "
f"media_threads={self._media_upload_thread_count}"
)

def _init_consumer_threads(self) -> None:
"""Initialize media upload and score ingestion consumer threads."""
self._media_upload_queue: Queue[Any] = Queue(100_000)
self._media_manager = MediaManager(
api_client=self.api,
Expand All @@ -266,48 +316,71 @@ def _initialize_instance(
)
self._media_upload_consumers = []

media_upload_thread_count = media_upload_thread_count or max(
int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1
)

if self._media_upload_enabled:
for i in range(media_upload_thread_count):
for i in range(self._media_upload_thread_count):
media_upload_consumer = MediaUploadConsumer(
identifier=i,
media_manager=self._media_manager,
)
media_upload_consumer.start()
self._media_upload_consumers.append(media_upload_consumer)

# Prompt cache
self.prompt_cache = PromptCache()

# Score ingestion
self._score_ingestion_queue: Queue[Any] = Queue(100_000)
self._ingestion_consumers = []

ingestion_consumer = ScoreIngestionConsumer(
ingestion_queue=self._score_ingestion_queue,
identifier=0,
client=score_ingestion_client,
flush_at=flush_at,
flush_interval=flush_interval,
client=self._score_ingestion_client,
flush_at=self.flush_at,
flush_interval=self.flush_interval,
max_retries=3,
public_key=self.public_key,
)
ingestion_consumer.start()
self._ingestion_consumers.append(ingestion_consumer)

# Register shutdown handler
atexit.register(self.shutdown)
def _at_fork_reinit(self) -> None:
Comment thread
wochinge marked this conversation as resolved.
"""Reinitialize consumer threads after fork in child process.

langfuse_logger.info(
f"Startup: Langfuse tracer successfully initialized | "
f"public_key={self.public_key} | "
f"base_url={base_url} | "
f"environment={environment or 'default'} | "
f"sample_rate={sample_rate if sample_rate is not None else 1.0} | "
f"media_threads={media_upload_thread_count or 1}"
Called automatically via os.register_at_fork() after fork().
Necessary for Gunicorn --preload deployments where os.fork() is used:
threads are not copied to child processes (POSIX standard), so without
reinitialization, the child process has no consumer threads and all
media upload and score ingestion events are silently lost.

Note: LangfuseSpanProcessor (BatchSpanProcessor) handles span export
fork-safety separately via its own os.register_at_fork handler.

Skipped if shutdown() was already called on this instance, to avoid
restarting threads on an intentionally torn-down manager.
"""
if self._shutdown:
return

langfuse_logger.debug(
f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads."
)

# Queues are intentionally recreated after fork. Items enqueued before fork
# belong to the preloaded parent process and must not be processed by every
# worker — otherwise uploads/scores would be duplicated across workers.
#
# HTTP clients (self.httpx_client, self._score_ingestion_client) are not recreated
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid we need to recreate them too.
From what I saw in the httpx docs, it promises thread-safety (which makes the singleton usage fine) but not process-safety. So copying these clients across processes might lead to obscure issues.

Copy link
Copy Markdown
Author

@pyg410 pyg410 May 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid we need to recreate them too. From what I saw in the docs for httpx it is promising thread-safety (which makes the singleton usage fine) but no process-safety. So copying that stuff cross processes might lead to obscure issues.

Thanks for the feedback!
You're right that httpx.Client is not process-safe across fork().
We've updated _at_fork_reinit to always recreate self.httpx_client, self.api, and self._score_ingestion_client with a fresh client after fork.
For the case where a user passes a custom httpx_client: we considered preserving it, but users have no practical way to handle this themselves — there's no public API to replace the client inside the singleton after fork. So after fork we always create a new default client, which means any custom transport/SSL/proxy settings on a user-provided client will not carry over into child processes.
We're treating this as a known limitation for now: custom httpx_client + fork() (e.g. Gunicorn --preload) is not fully supported.
A proper fix would require accepting a factory callable (e.g. httpx_client_factory: Callable[[], httpx.Client]) instead of an instance, so the library can recreate the client with the original settings after fork — but this is a public API change that requires broader team discussion before we can proceed. We can follow up on that as a separate improvement. (eba27cc)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pyg410 Thanks for addressing the comments.

Regarding the httpx.Client:
I think I'd explicitly do the opposite: don't reinitialize a custom httpx client upon fork (but rather just use the fork-inherited copy).

My reasoning:
Users with a custom httpx client can't use the SDK with forking in the current implementation. If we let them pass it (but don't reinitialize it), then we at least give them the chance to handle the reinitialization themselves (or otherwise make the client process-safe).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wochinge

Thanks! I made the following changes (157122e):

  • Custom httpx_client is no longer recreated after fork. The fork-inherited copy is reused as-is, so callers retain the opportunity to handle process-safety themselves (e.g. via their own os.register_at_fork(after_in_child=...) handler).
  • Stored the original httpx_client reference as self._custom_httpx_client instead of a boolean flag, to avoid potential state drift.
  • Updated the docstring to reflect the new behavior.

# here to keep this handler minimal; this mirrors the existing singleton client
# lifecycle. If preload-time network I/O is introduced in the future, clients
# may need fork-specific reinitialization as well.
try:
self._init_consumer_threads()
except Exception as e:
langfuse_logger.error(
f"[PID {os.getpid()}] Failed to reinitialize consumer threads after fork: {e}. "
f"Media upload and score ingestion will be unavailable in this worker."
)

langfuse_logger.debug(
f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork"
)
Comment on lines +345 to 426
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Unhandled exception in after_in_child callback crashes Gunicorn worker

_at_fork_reinit calls _init_consumer_threads, which calls Thread.start(). If the OS refuses to create a thread — e.g., due to resource exhaustion (OSError: can't start new thread) — the exception propagates through the after_in_child callback chain and surfaces as an exception from os.fork() in the child process. Gunicorn would see the worker fail immediately at startup rather than the telemetry path degrading gracefully. Wrapping _init_consumer_threads() in a try/except Exception and logging the error would allow the child to continue (without consumer threads) instead of crashing.

Prompt To Fix With AI
This is a comment left during a code review.
Path: langfuse/_client/resource_manager.py
Line: 344-378

Comment:
**Unhandled exception in `after_in_child` callback crashes Gunicorn worker**

`_at_fork_reinit` calls `_init_consumer_threads`, which calls `Thread.start()`. If the OS refuses to create a thread — e.g., due to resource exhaustion (`OSError: can't start new thread`) — the exception propagates through the `after_in_child` callback chain and surfaces as an exception from `os.fork()` in the child process. Gunicorn would see the worker fail immediately at startup rather than the telemetry path degrading gracefully. Wrapping `_init_consumer_threads()` in a `try/except Exception` and logging the error would allow the child to continue (without consumer threads) instead of crashing.

How can I resolve this? If you propose a fix, please make it concise.


@classmethod
Expand Down Expand Up @@ -449,6 +522,8 @@ def flush(self) -> None:
langfuse_logger.debug("Successfully flushed media upload queue")

def shutdown(self) -> None:
self._shutdown = True

# Unregister the atexit handler first
atexit.unregister(self.shutdown)

Expand Down
60 changes: 60 additions & 0 deletions tests/unit/test_resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,66 @@ def test_media_upload_consumer_signal_shutdown_wakes_blocked_thread():
assert not consumer.is_alive()


def test_at_fork_reinit_creates_new_queues_and_consumers(monkeypatch):
    """_at_fork_reinit() must replace queues and start fresh consumer threads.

    Media upload is disabled via the environment so that only the score
    ingestion consumer is started, which keeps the thread bookkeeping in this
    test limited to ``_ingestion_consumers``.
    """
    monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false")

    # Start from a clean registry so this test gets its own resource manager.
    with LangfuseResourceManager._lock:
        LangfuseResourceManager._instances.clear()

    client = Langfuse(
        public_key="pk-fork-reinit",
        secret_key="sk-fork-reinit",
        span_exporter=NoOpSpanExporter(),
    )

    # Populated once the pre-reinit consumers are captured; kept outside the
    # try block so the cleanup in `finally` can always iterate it safely.
    old_ingestion_consumers = []

    try:
        rm = client._resources
        assert rm is not None

        old_score_queue = rm._score_ingestion_queue
        old_media_queue = rm._media_upload_queue
        old_ingestion_consumers = list(rm._ingestion_consumers)

        rm._at_fork_reinit()

        assert rm._score_ingestion_queue is not old_score_queue
        assert rm._media_upload_queue is not old_media_queue
        assert len(rm._ingestion_consumers) == 1
        assert rm._ingestion_consumers[0].is_alive()
    finally:
        # In a real fork, old threads don't exist in the child process.
        # In this unit test they do — stop them explicitly to avoid leaking
        # threads. Running this in `finally` ensures the cleanup also happens
        # when one of the assertions above fails.
        for consumer in old_ingestion_consumers:
            consumer.pause()
            consumer.join(timeout=1.0)

        client.shutdown()


def test_at_fork_reinit_skips_when_shutdown(monkeypatch):
    """_at_fork_reinit() must not restart threads after intentional shutdown.

    The shutdown flag is set directly on the resource manager, and the test
    then verifies that the fork handler leaves the existing score ingestion
    queue untouched.
    """
    monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false")

    # Start from a clean registry so this test gets its own resource manager.
    with LangfuseResourceManager._lock:
        LangfuseResourceManager._instances.clear()

    client = Langfuse(
        public_key="pk-fork-shutdown",
        secret_key="sk-fork-shutdown",
        span_exporter=NoOpSpanExporter(),
    )

    try:
        rm = client._resources
        assert rm is not None

        old_score_queue = rm._score_ingestion_queue

        # Mark the manager as shut down without tearing the threads down yet;
        # the real teardown happens in `finally` below.
        rm._shutdown = True
        rm._at_fork_reinit()

        # Queue must not be replaced when the manager is flagged as shut down.
        assert rm._score_ingestion_queue is old_score_queue
    finally:
        # Always stop the consumer threads, even if an assertion above failed.
        client.shutdown()


def test_stop_and_join_consumer_threads_broadcasts_media_shutdown_after_pausing_all():
events = []

Expand Down