INTPYTHON-946: parallelism "process" crash #391

@CoderJoshDK

Description

I ran find_polars_all with process parallelism (parallelism="processes") to test my fix from #390. I had not actually run it in processes mode when I filed that issue.

The only other context missing from the crash below is that this call runs inside an async loop, more specifically via asyncio.to_thread. The rest of the context is missing because scrubbing the personal details is a bit of a pain, so I didn't. On a better note, my suggestion of using permissive worked!
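For readers who want to picture the calling pattern described above, here is a minimal sketch using only the standard library. It is not taken from the application code and does not claim to reproduce the crash: `blocking_load` is a hypothetical stand-in for the blocking `find_polars_all(..., parallelism="processes")` call, and `_square`, `call_site_info`, `run_blocking`, and `main` are illustrative names. The only details borrowed from the traceback are the use of `asyncio.to_thread` and `multiprocessing.Pool(processes=4)`.

```python
import asyncio
import multiprocessing
import threading


def _square(x: int) -> int:
    # Hypothetical stand-in for the per-batch work done in worker processes.
    return x * x


def blocking_load(values: list[int]) -> list[int]:
    # Hypothetical stand-in for the blocking find_polars_all call.
    # The pool is created in whichever thread executes this function.
    with multiprocessing.Pool(processes=4) as pool:
        return pool.map(_square, values)


def call_site_info() -> dict:
    # Reports where a blocking call is actually executing.
    return {
        "in_main_thread": threading.current_thread() is threading.main_thread(),
        "start_method": multiprocessing.get_start_method(),
    }


async def run_blocking(func, *args):
    # asyncio.to_thread runs func in a worker thread, not the event-loop thread.
    return await asyncio.to_thread(func, *args)


async def main() -> None:
    # First show the call site, then create the pool from the worker thread.
    print(await run_blocking(call_site_info))
    print(await run_blocking(blocking_load, [1, 2, 3]))
```

To try it, call `asyncio.run(main())` from a script under an `if __name__ == "__main__":` guard, which the spawn start method shown in the traceback requires. `call_site_info` makes it visible that the pool would be created from a non-main thread, which is the piece of context the traceback alone does not show.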

Crash

/Users/joshie/work/kroo-tran/.venv/lib/python3.13/site-packages/pymongoarrow/api.py:503 in find_polars_all                                                                                                                                                                               │
│                                                                                                                                                                                                                                                                                          │
│   500 │   .. versionadded:: 1.3                                                                                                                                                                                                                                                          │
│   501 │   """                                                                                                                                                                                                                                                                            │
│   502 │   return _arrow_to_polars(                                                                                                                                                                                                                                                       │
│ ❱ 503 │   │   find_arrow_all(                                                                                                                                                                                                                                                            │
│   504 │   │   │   collection,                                                                                                                                                                                                                                                            │
│   505 │   │   │   query,                                                                                                                                                                                                                                                                 │
│   506 │   │   │   schema=schema,                                                                                                                                                                                                                                                         │
│                                                                                                                                                                                                                                                                                          │
│ ╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── locals ───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ │
│ │ allow_invalid = True                                                                                                                                                                                                                                                                 │ │
│ │    collection = Collection(Database(MongoClient(host=['b-shard-00-00.k9tf3.mongodb.net:27017', 'b-shard-00-02.k9tf3.mongodb.net:27017', 'b-shard-00-01.k9tf3.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, │ │
│ │                 appname='b', authsource='admin', replicaset='atlas-mrgjki-shard-0', tls=True), 'autodesk_cc'), 'form_worklog_entries')                                                                                                                               │ │
│ │        kwargs = {}                                                                                                                                                                                                                                                                   │ │
│ │   parallelism = 'processes'                                                                                                                                                                                                                                                          │ │
│ │         query = {}                                                                                                                                                                                                                                                                   │ │
│ │        schema = None                                                                                                                                                                                                                                                                 │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │
│                                                                                                                                                                                                                                                                                          │
│ /Users/joshie/work/kroo-tran/.venv/lib/python3.13/site-packages/pymongoarrow/api.py:175 in find_arrow_all                                                                                                                                                                                │
│                                                                                                                                                                                                                                                                                          │
│   172 │   │   return pa.concat_tables(results, promote_options="permissive")                                                                                                                                                                                                             │
│   173 │                                                                                                                                                                                                                                                                                  │
│   174 │   if parallelism == "processes":                                                                                                                                                                                                                                                 │
│ ❱ 175 │   │   with multiprocessing.Pool(processes=4) as pool:                                                                                                                                                                                                                            │
│   176 │   │   │   results = pool.starmap(_process_batch, args_iterable())                                                                                                                                                                                                                │
│   177 │   │   return pa.concat_tables(results, promote_options="permissive")                                                                                                                                                                                                             │
│   178                                                                                                                                                                                                                                                                                    │
│                                                                                                                                                                                                                                                                                          │
│ ╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── locals ───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ │
│ │    allow_invalid = True                                                                                                                                                                                                                                                              │ │
│ │       collection = Collection(Database(MongoClient(host=['b-shard-00-00.k9tf3.mongodb.net:27017', 'b-shard-00-02.k9tf3.mongodb.net:27017', 'b-shard-00-01.k9tf3.mongodb.net:27017'], document_class=dict, tz_aware=False,            │ │
│ │                    connect=True, appname='b', authsource='admin', replicaset='atlas-mrgjki-shard-0', tls=True), 'autodesk_cc'), 'form_worklog_entries')                                                                                                              │ │
│ │           kwargs = {}                                                                                                                                                                                                                                                                │ │
│ │              opt = 'cursor_type'                                                                                                                                                                                                                                                     │ │
│ │      parallelism = 'processes'                                                                                                                                                                                                                                                       │ │
│ │            query = {}                                                                                                                                                                                                                                                                │ │
│ │ raw_batch_cursor = <pymongo.synchronous.cursor.RawBatchCursor object at 0x10db0f620>                                                                                                                                                                                                 │ │
│ │           schema = None                                                                                                                                                                                                                                                              │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │
│                                                                                                                                                                                                                                                                                          │
│ /Users/joshie/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/multiprocessing/context.py:119 in Pool                                                                                                                                                            │
│                                                                                                                                                                                                                                                                                          │
│   116 │   │   │    maxtasksperchild=None):                                                     ╭───────────────────────────────────── locals ──────────────────────────────────────╮                                                                                                     │
│   117 │   │   '''Returns a process pool object'''                                              │         initargs = ()                                                             │                                                                                                     │
│   118 │   │   from .pool import Pool                                                           │      initializer = None                                                           │                                                                                                     │
│ ❱ 119 │   │   return Pool(processes, initializer, initargs, maxtasksperchild,                  │ maxtasksperchild = None                                                           │                                                                                                     │
│   120 │   │   │   │   │   context=self.get_context())                                          │        processes = 4                                                              │                                                                                                     │
│   121 │                                                                                        │             self = <multiprocessing.context.DefaultContext object at 0x10794c590> │                                                                                                     │
│   122 │   def RawValue(self, typecode_or_type, *args):                                         ╰───────────────────────────────────────────────────────────────────────────────────╯                                                                                                     │
│                                                                                                                                                                                                                                                                                          │
│ /Users/joshie/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/multiprocessing/pool.py:191 in __init__                                                                                                                                                           │
│                                                                                                                                                                                                                                                                                          │
│   188 │   │   self._state = INIT                                                               ╭──────────────────────────────────── locals ─────────────────────────────────────╮                                                                                                       │
│   189 │   │                                                                                    │          context = <multiprocessing.context.SpawnContext object at 0x10772ba10> │                                                                                                       │
│   190 │   │   self._ctx = context or get_context()                                             │         initargs = ()                                                           │                                                                                                       │
│ ❱ 191 │   │   self._setup_queues()                                                             │      initializer = None                                                         │                                                                                                       │
│   192 │   │   self._taskqueue = queue.SimpleQueue()                                            │ maxtasksperchild = None                                                         │                                                                                                       │
│   193 │   │   # The _change_notifier queue exist to wake up self._handle_workers()             │        processes = 4                                                            │                                                                                                       │
│   194 │   │   # when the cache (self._cache) is empty or when there is a change in             │             self = <multiprocessing.pool.Pool state=INIT pool_size=0>           │                                                                                                       │
│                                                                                                ╰─────────────────────────────────────────────────────────────────────────────────╯                                                                                                       │
│                                                                                                                                                                                                                                                                                          │
│ /Users/joshie/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/multiprocessing/pool.py:346 in _setup_queues                                                                                                                                                      │
│                                                                                                                                                                                                                                                                                          │
│   343 │   │   │   │   │   │   │   │   │   │    wrap_exception)                                 ╭───────────────────────── locals ──────────────────────────╮                                                                                                                             │
│   344 │                                                                                        │ self = <multiprocessing.pool.Pool state=INIT pool_size=0> │                                                                                                                             │
│   345 │   def _setup_queues(self):                                                             ╰───────────────────────────────────────────────────────────╯                                                                                                                             │
│ ❱ 346 │   │   self._inqueue = self._ctx.SimpleQueue()                                                                                                                                                                                                                                    │
│   347 │   │   self._outqueue = self._ctx.SimpleQueue()                                                                                                                                                                                                                                   │
│   348 │   │   self._quick_put = self._inqueue._writer.send                                                                                                                                                                                                                               │
│   349 │   │   self._quick_get = self._outqueue._reader.recv                                                                                                                                                                                                                              │
│                                                                                                                                                                                                                                                                                          │
│ /Users/joshie/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/multiprocessing/context.py:113 in SimpleQueue                                                                                                                                                     │
│                                                                                                                                                                                                                                                                                          │
│   110 │   def SimpleQueue(self):                                                               ╭────────────────────────────── locals ───────────────────────────────╮                                                                                                                   │
│   111 │   │   '''Returns a queue object'''                                                     │ self = <multiprocessing.context.SpawnContext object at 0x10772ba10> │                                                                                                                   │
│   112 │   │   from .queues import SimpleQueue                                                  ╰─────────────────────────────────────────────────────────────────────╯                                                                                                                   │
│ ❱ 113 │   │   return SimpleQueue(ctx=self.get_context())                                                                                                                                                                                                                                 │
│   114 │                                                                                                                                                                                                                                                                                  │
│   115 │   def Pool(self, processes=None, initializer=None, initargs=(),                                                                                                                                                                                                                  │
│   116 │   │   │    maxtasksperchild=None):                                                                                                                                                                                                                                               │
│                                                                                                                                                                                                                                                                                          │
│ /Users/joshie/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/multiprocessing/queues.py:361 in __init__                                                                                                                                                         │
│                                                                                                                                                                                                                                                                                          │
│   358 │                                                                                        ╭────────────────────────────── locals ───────────────────────────────╮                                                                                                                   │
│   359 │   def __init__(self, *, ctx):                                                          │  ctx = <multiprocessing.context.SpawnContext object at 0x10772ba10> │                                                                                                                   │
│   360 │   │   self._reader, self._writer = connection.Pipe(duplex=False)                       │ self = <multiprocessing.queues.SimpleQueue object at 0x10db0e900>   │                                                                                                                   │
│ ❱ 361 │   │   self._rlock = ctx.Lock()                                                         ╰─────────────────────────────────────────────────────────────────────╯                                                                                                                   │
│   362 │   │   self._poll = self._reader.poll                                                                                                                                                                                                                                             │
│   363 │   │   if sys.platform == 'win32':                                                                                                                                                                                                                                                │
│   364 │   │   │   self._wlock = None                                                                                                                                                                                                                                                     │
│                                                                                                                                                                                                                                                                                          │
│ /Users/joshie/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/multiprocessing/context.py:68 in Lock                                                                                                                                                             │
│                                                                                                                                                                                                                                                                                          │
│    65 │   def Lock(self):                                                                      ╭────────────────────────────── locals ───────────────────────────────╮                                                                                                                   │
│    66 │   │   '''Returns a non-recursive lock object'''                                        │ self = <multiprocessing.context.SpawnContext object at 0x10772ba10> │                                                                                                                   │
│    67 │   │   from .synchronize import Lock                                                    ╰─────────────────────────────────────────────────────────────────────╯                                                                                                                   │
│ ❱  68 │   │   return Lock(ctx=self.get_context())                                                                                                                                                                                                                                        │
│    69 │                                                                                                                                                                                                                                                                                  │
│    70 │   def RLock(self):                                                                                                                                                                                                                                                               │
│    71 │   │   '''Returns a recursive lock object'''                                                                                                                                                                                                                                      │
│                                                                                                                                                                                                                                                                                          │
│ /Users/joshie/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/multiprocessing/synchronize.py:169 in __init__                                                                                                                                                    │
│                                                                                                                                                                                                                                                                                          │
│   166 class Lock(SemLock):                                                                     ╭────────────────────────────── locals ───────────────────────────────╮                                                                                                                   │
│   167 │                                                                                        │  ctx = <multiprocessing.context.SpawnContext object at 0x10772ba10> │                                                                                                                   │
│   168 │   def __init__(self, *, ctx):                                                          │ self = <Lock(owner=None)>                                           │                                                                                                                   │
│ ❱ 169 │   │   SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)                                 ╰─────────────────────────────────────────────────────────────────────╯                                                                                                                   │
│   170 │                                                                                                                                                                                                                                                                                  │
│   171 │   def __repr__(self):                                                                                                                                                                                                                                                            │
│   172 │   │   try:                                                                                                                                                                                                                                                                       │
│                                                                                                                                                                                                                                                                                          │
│ /Users/joshie/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/multiprocessing/synchronize.py:80 in __init__                                                                                                                                                     │
│                                                                                                                                                                                                                                                                                          │
│    77 │   │   │   # disabled.  When the object is garbage collected or the                     ╭───────────────────────────────────────────────────────────── locals ─────────────────────────────────────────────────────────────╮                                                      │
│    78 │   │   │   # process shuts down we unlink the semaphore name                            │        ctx = <multiprocessing.context.SpawnContext object at 0x10772ba10>                                                        │                                                      │
│    79 │   │   │   from .resource_tracker import register                                       │          i = 0                                                                                                                   │                                                      │
│ ❱  80 │   │   │   register(self._semlock.name, "semaphore")                                    │       kind = 1                                                                                                                   │                                                      │
│    81 │   │   │   util.Finalize(self, SemLock._cleanup, (self._semlock.name,),                 │   maxvalue = 1                                                                                                                   │                                                      │
│    82 │   │   │   │   │   │     exitpriority=0)                                                │   register = <bound method ResourceTracker.register of <multiprocessing.resource_tracker.ResourceTracker object at 0x10dcd86e0>> │                                                      │
│    83                                                                                          │       self = <Lock(owner=None)>                                                                                                  │                                                      │
│                                                                                                │         sl = <_multiprocessing.SemLock object at 0x10dcea890>                                                                    │                                                      │
│                                                                                                │ unlink_now = False                                                                                                               │                                                      │
│                                                                                                │      value = 1                                                                                                                   │                                                      │
│                                                                                                ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯                                                      │
│                                                                                                                                                                                                                                                                                          │
│ /Users/joshie/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/multiprocessing/resource_tracker.py:272 in register                                                                                                                                               │
│                                                                                                                                                                                                                                                                                          │
│   269 │                                                                                        ╭───────────────────────────────────── locals ─────────────────────────────────────╮                                                                                                      │
│   270 │   def register(self, name, rtype):                                                     │  name = '/mp-qjv2djku'                                                           │                                                                                                      │
│   271 │   │   '''Register name of resource with resource tracker.'''                           │ rtype = 'semaphore'                                                              │                                                                                                      │
│ ❱ 272 │   │   self._send('REGISTER', name, rtype)                                              │  self = <multiprocessing.resource_tracker.ResourceTracker object at 0x10dcd86e0> │                                                                                                      │
│   273 │                                                                                        ╰──────────────────────────────────────────────────────────────────────────────────╯                                                                                                      │
│   274 │   def unregister(self, name, rtype):                                                                                                                                                                                                                                             │
│   275 │   │   '''Unregister name of resource with resource tracker.'''                                                                                                                                                                                                                   │
│                                                                                                                                                                                                                                                                                          │
│ /Users/joshie/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/multiprocessing/resource_tracker.py:289 in _send                                                                                                                                                  │
│                                                                                                                                                                                                                                                                                          │
│   286 │   │   │   │   # posix guarantees that writes to a pipe of less than PIPE_BUF           ╭───────────────────────────────────── locals ─────────────────────────────────────╮                                                                                                      │
│   287 │   │   │   │   # bytes are atomic, and that PIPE_BUF >= 512                             │   cmd = 'REGISTER'                                                               │                                                                                                      │
│   288 │   │   │   │   raise ValueError('msg too long')                                         │   msg = b'REGISTER:/mp-qjv2djku:semaphore\n'                                     │                                                                                                      │
│ ❱ 289 │   │   │   self._ensure_running_and_write(msg)                                          │  name = '/mp-qjv2djku'                                                           │                                                                                                      │
│   290 │   │   │   return                                                                       │ rtype = 'semaphore'                                                              │                                                                                                      │
│   291 │   │                                                                                    │  self = <multiprocessing.resource_tracker.ResourceTracker object at 0x10dcd86e0> │                                                                                                      │
│   292 │   │   # POSIX guarantees that writes to a pipe of less than PIPE_BUF (512 on Linux)    ╰──────────────────────────────────────────────────────────────────────────────────╯                                                                                                      │
│                                                                                                                                                                                                                                                                                          │
│ /Users/joshie/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/multiprocessing/resource_tracker.py:248 in _ensure_running_and_write                                                                                                                              │
│                                                                                                                                                                                                                                                                                          │
│   245 │   │   │   │                                                                            ╭──────────────────────────────────── locals ─────────────────────────────────────╮                                                                                                       │
│   246 │   │   │   │   msg = None  # message was sent in probe                                  │  msg = b'REGISTER:/mp-qjv2djku:semaphore\n'                                     │                                                                                                       │
│   247 │   │   │   else:                                                                        │ self = <multiprocessing.resource_tracker.ResourceTracker object at 0x10dcd86e0> │                                                                                                       │
│ ❱ 248 │   │   │   │   self._launch()                                                           ╰─────────────────────────────────────────────────────────────────────────────────╯                                                                                                       │
│   249 │   │                                                                                                                                                                                                                                                                              │
│   250 │   │   while True:                                                                                                                                                                                                                                                                │
│   251 │   │   │   try:                                                                                                                                                                                                                                                                   │
│                                                                                                                                                                                                                                                                                          │
│ /Users/joshie/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/multiprocessing/resource_tracker.py:200 in _launch                                                                                                                                                │
│                                                                                                                                                                                                                                                                                          │
│   197 │   │   │   try:                                                                         ╭──────────────────────────────────────────────────────────────── locals ────────────────────────────────────────────────────────────────╮                                                │
│   198 │   │   │   │   if _HAVE_SIGMASK:                                                        │         args = [b'/Users/joshie/work/kroo-tran/.venv/bin/python3', '-c', 'from multiprocessing.resource_tracker import main;main(25)'] │                                                │
│   199 │   │   │   │   │   prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIG │          exe = b'/Users/joshie/work/kroo-tran/.venv/bin/python3'                                                                       │                                                │
│ ❱ 200 │   │   │   │   pid = util.spawnv_passfds(exe, args, fds_to_pass)                        │  fds_to_pass = [-1, 25]                                                                                                                │                                                │
│   201 │   │   │   finally:                                                                     │ prev_sigmask = set()                                                                                                                   │                                                │
│   202 │   │   │   │   if prev_sigmask is not None:                                             │            r = 25                                                                                                                      │                                                │
│   203 │   │   │   │   │   signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)             │         self = <multiprocessing.resource_tracker.ResourceTracker object at 0x10dcd86e0>                                                │                                                │
│                                                                                                │            w = 26                                                                                                                      │                                                │
│                                                                                                ╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯                                                │
│                                                                                                                                                                                                                                                                                          │
│ /Users/joshie/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/multiprocessing/util.py:524 in spawnv_passfds                                                                                                                                                     │
│                                                                                                                                                                                                                                                                                          │
│   521 │   passfds = tuple(sorted(map(int, passfds)))                                           ╭─────────────────────────────────────────────────────────────────────── locals ───────────────────────────────────────────────────────────────────────╮                                  │
│   522 │   errpipe_read, errpipe_write = os.pipe()                                              │ _posixsubprocess = <module '_posixsubprocess' (built-in)>                                                                                            │                                  │
│   523 │   try:                                                                                 │             args = [b'/Users/joshie/work/kroo-tran/.venv/bin/python3', '-c', 'from multiprocessing.resource_tracker import main;main(25)']           │                                  │
│ ❱ 524 │   │   return _posixsubprocess.fork_exec(                                               │     errpipe_read = 27                                                                                                                                │                                  │
│   525 │   │   │   args, [path], True, passfds, None, None,                                     │    errpipe_write = 28                                                                                                                                │                                  │
│   526 │   │   │   -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,                         │          passfds = (-1, 25)                                                                                                                          │                                  │
│   527 │   │   │   False, False, -1, None, None, None, -1, None,                                │             path = b'/Users/joshie/work/kroo-tran/.venv/bin/python3'                                                                                 │                                  │
│                                                                                                │       subprocess = <module 'subprocess' from '/Users/joshie/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/subprocess.py'> │                                  │
│                                                                                                ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯                                  │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
ValueError: bad value(s) in fds_to_keep
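
The locals in the `_launch` and `spawnv_passfds` frames show `fds_to_pass = [-1, 25]`, and a negative descriptor looks like the value `fork_exec` is complaining about. Below is a small diagnostic sketch along those lines; it is not a reproduction of the crash. The helper names `invalid_fds` and `stderr_fd` are hypothetical (they are not part of pymongoarrow or `multiprocessing`), and the idea that the `-1` comes from a replaced `sys.stderr` whose `fileno()` returns `-1` is an unconfirmed assumption.

```python
import sys


def invalid_fds(fds):
    """Return the entries that are not usable file descriptors.

    Assumption: fork_exec rejects any descriptor that is negative
    (or not an int), which matches the [-1, 25] seen in the traceback.
    """
    return [fd for fd in fds if not isinstance(fd, int) or fd < 0]


def stderr_fd():
    """Return sys.stderr's descriptor, or None if it does not expose one."""
    try:
        return sys.stderr.fileno()
    except Exception:
        # sys.stderr may be None or a wrapper without a real descriptor.
        return None


# Values copied from the traceback locals above.
print("suspect entries:", invalid_fds([-1, 25]))
# Worth checking inside the asyncio.to_thread worker that crashes.
print("sys.stderr descriptor:", stderr_fd())
```

If `stderr_fd()` reports `-1` (or anything negative) in the thread that calls `find_polars_all`, that would point at whatever replaced `sys.stderr` rather than at pymongoarrow itself.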

I do not have a good minimal reproducible example yet. I will try to make one, but right now I am very busy with other things. If I still don't have anything for you in two weeks, feel free to ping me as a reminder; my higher-priority work should be done by then.
I am still likely to make the permissive PR though.
