Skip to content

Commit 7b73e2b

Browse files
protocolstardustsingaraiona
authored and committed
Revert "pool fit"
This reverts commit d55a23b.
1 parent 9cb14c9 commit 7b73e2b

4 files changed

Lines changed: 95 additions & 201 deletions

File tree

core/pool.c

Lines changed: 93 additions & 106 deletions
Original file line number | Diff line number | Diff line change
@@ -167,8 +167,8 @@ obj_p pool_call_task_fn(raw_p fn, i64_t argc, raw_p argv[]) {
167167

168168
raw_p executor_run(raw_p arg) {
169169
executor_t *executor = (executor_t *)arg;
170-
task_data_t *task;
171-
i64_t i, task_start, task_end, processed;
170+
task_data_t data;
171+
i64_t i, tasks_count;
172172
obj_p res;
173173
vm_p vm;
174174

@@ -180,33 +180,34 @@ raw_p executor_run(raw_p arg) {
180180
__atomic_store_n(&executor->vm, vm, __ATOMIC_RELAXED);
181181

182182
for (;;) {
183-
// Wait for work using lightweight eventfd (no mutex needed)
184-
event_wait(executor->event_fd);
183+
mutex_lock(&executor->pool->mutex);
184+
cond_wait(&executor->pool->run, &executor->pool->mutex);
185185

186-
// Memory barrier: ensure we see all writes from the signaling thread
187-
__atomic_thread_fence(__ATOMIC_ACQUIRE);
188-
189-
// Check if pool is being destroyed
190-
if (executor->pool->state == RUN_STATE_STOPPED)
186+
if (executor->pool->state == RUN_STATE_STOPPED) {
187+
mutex_unlock(&executor->pool->mutex);
191188
break;
189+
}
190+
191+
tasks_count = executor->pool->tasks_count;
192+
mutex_unlock(&executor->pool->mutex);
193+
194+
// process tasks
195+
for (i = 0; i < tasks_count; i++) {
196+
data = mpmc_pop(executor->pool->task_queue);
192197

193-
// Read assigned task range (set by pool_run before signaling)
194-
task_start = executor->task_start;
195-
task_end = executor->task_end;
196-
197-
// Process assigned tasks directly from array (no queue contention)
198-
processed = 0;
199-
for (i = task_start; i < task_end; i++) {
200-
task = &executor->pool->tasks[i];
201-
res = pool_call_task_fn(task->fn, task->argc, task->argv);
202-
__atomic_store_n(&executor->pool->results[task->id], res, __ATOMIC_RELEASE);
203-
processed++;
198+
// Nothing to do
199+
if (data.id == -1)
200+
break;
201+
202+
// execute task
203+
res = pool_call_task_fn(data.fn, data.argc, data.argv);
204+
data.result = res;
205+
mpmc_push(executor->pool->result_queue, data);
204206
}
205207

206-
if (processed > 0) {
207-
// Atomic increment of done_count + signal main thread
208-
__atomic_fetch_add(&executor->pool->done_count, processed, __ATOMIC_RELEASE);
208+
if (i > 0) {
209209
mutex_lock(&executor->pool->mutex);
210+
executor->pool->done_count += i;
210211
cond_signal(&executor->pool->done);
211212
mutex_unlock(&executor->pool->mutex);
212213
}
@@ -227,9 +228,8 @@ pool_p pool_create(i64_t thread_count) {
227228
pool->executors_count = thread_count;
228229
pool->done_count = 0;
229230
pool->tasks_count = 0;
230-
pool->tasks_capacity = DEFAULT_MPMC_SIZE;
231-
pool->tasks = (task_data_t *)heap_mmap(pool->tasks_capacity * sizeof(task_data_t));
232-
pool->results = NULL; // Allocated per-run
231+
pool->task_queue = mpmc_create(DEFAULT_MPMC_SIZE);
232+
pool->result_queue = mpmc_create(DEFAULT_MPMC_SIZE);
233233
pool->state = RUN_STATE_RUNNING;
234234
pool->mutex = mutex_create();
235235
pool->run = cond_create();
@@ -238,7 +238,6 @@ pool_p pool_create(i64_t thread_count) {
238238
// Executor[0] is the main thread - create VM directly here
239239
pool->executors[0].id = 0;
240240
pool->executors[0].pool = pool;
241-
pool->executors[0].event_fd = -1; // Main thread doesn't need event_fd
242241
vm = vm_create(0, pool);
243242
pool->executors[0].vm = vm;
244243
pool->executors[0].heap = vm->heap;
@@ -254,7 +253,6 @@ pool_p pool_create(i64_t thread_count) {
254253
pool->executors[i].pool = pool;
255254
pool->executors[i].heap = NULL;
256255
pool->executors[i].vm = NULL;
257-
pool->executors[i].event_fd = event_create(); // Per-worker event for wake-up
258256
pool->executors[i].handle = ray_thread_create(executor_run, &pool->executors[i]);
259257
if (thread_pin(pool->executors[i].handle, i) != 0)
260258
LOG_WARN("failed to pin thread %lld", i);
@@ -273,33 +271,24 @@ pool_p pool_create(i64_t thread_count) {
273271
nil_t pool_destroy(pool_p pool) {
274272
i64_t i, n;
275273

276-
n = pool->executors_count;
277-
278-
// Signal stop state
279274
mutex_lock(&pool->mutex);
280275
pool->state = RUN_STATE_STOPPED;
276+
cond_broadcast(&pool->run);
281277
mutex_unlock(&pool->mutex);
282278

283-
// Wake all workers using eventfd (they'll see RUN_STATE_STOPPED and exit)
284-
for (i = 1; i < n; i++) {
285-
event_signal(pool->executors[i].event_fd);
286-
}
279+
n = pool->executors_count;
287280

288281
// Join worker threads (executor[1..n-1]), not main thread
289282
for (i = 1; i < n; i++) {
290283
if (thread_join(pool->executors[i].handle) != 0)
291284
LOG_WARN("failed to join thread %lld", i);
292285
}
293286

294-
// Destroy event fds
295-
for (i = 1; i < n; i++) {
296-
event_destroy(pool->executors[i].event_fd);
297-
}
298-
299287
mutex_destroy(&pool->mutex);
300288
cond_destroy(&pool->run);
301289
cond_destroy(&pool->done);
302-
heap_unmap(pool->tasks, pool->tasks_capacity * sizeof(task_data_t));
290+
mpmc_destroy(pool->task_queue);
291+
mpmc_destroy(pool->result_queue);
303292

304293
// Destroy main thread's VM (executor[0]) last - after all heap operations
305294
vm_destroy(pool->executors[0].vm);
@@ -324,49 +313,63 @@ nil_t pool_prepare(pool_p pool) {
324313
n = pool->executors_count;
325314
for (i = 1; i < n; i++) { // Skip executor[0] (main thread) - no self-borrow
326315
heap_borrow(pool->executors[i].heap);
327-
event_clear(pool->executors[i].event_fd); // Clear any stale signals
328316
}
329317

330318
mutex_unlock(&pool->mutex);
331319
}
332320

333321
nil_t pool_add_task(pool_p pool, raw_p fn, i64_t argc, ...) {
334-
i64_t i, idx;
322+
i64_t i, size;
335323
va_list args;
336-
task_data_t *new_tasks;
324+
task_data_t data, old_data;
325+
mpmc_p queue;
337326

338327
if (pool == NULL)
339328
PANIC("pool is NULL");
340329

341330
mutex_lock(&pool->mutex);
342331

343-
// Grow tasks array if needed
344-
if (pool->tasks_count >= pool->tasks_capacity) {
345-
i64_t new_capacity = pool->tasks_capacity * 2;
346-
new_tasks = (task_data_t *)heap_mmap(new_capacity * sizeof(task_data_t));
347-
memcpy(new_tasks, pool->tasks, pool->tasks_count * sizeof(task_data_t));
348-
heap_unmap(pool->tasks, pool->tasks_capacity * sizeof(task_data_t));
349-
pool->tasks = new_tasks;
350-
pool->tasks_capacity = new_capacity;
351-
}
352-
353-
idx = pool->tasks_count++;
354-
pool->tasks[idx].id = idx;
355-
pool->tasks[idx].fn = fn;
356-
pool->tasks[idx].argc = argc;
332+
data.id = pool->tasks_count++;
333+
data.fn = fn;
334+
data.argc = argc;
357335

358336
va_start(args, argc);
337+
359338
for (i = 0; i < argc; i++)
360-
pool->tasks[idx].argv[i] = va_arg(args, raw_p);
339+
data.argv[i] = va_arg(args, raw_p);
340+
361341
va_end(args);
362342

343+
if (mpmc_push(pool->task_queue, data) == -1) // queue is full
344+
{
345+
size = pool->tasks_count * 2;
346+
// Grow task queue
347+
queue = mpmc_create(size);
348+
349+
for (;;) {
350+
old_data = mpmc_pop(pool->task_queue);
351+
if (old_data.id == -1)
352+
break;
353+
mpmc_push(queue, old_data);
354+
}
355+
356+
if (mpmc_push(queue, data) == -1)
357+
PANIC("oom");
358+
359+
mpmc_destroy(pool->task_queue);
360+
pool->task_queue = queue;
361+
// Grow result queue
362+
mpmc_destroy(pool->result_queue);
363+
pool->result_queue = mpmc_create(size);
364+
}
365+
363366
mutex_unlock(&pool->mutex);
364367
}
365368

366369
obj_p pool_run(pool_p pool) {
367-
i64_t i, n, tasks_count, executors_count, workers_needed, chunk, main_processed;
368-
obj_p e, res, result;
369-
task_data_t *task;
370+
i64_t i, n, tasks_count, executors_count;
371+
obj_p e, res;
372+
task_data_t data;
370373

371374
if (pool == NULL)
372375
PANIC("pool is NULL");
@@ -378,63 +381,47 @@ obj_p pool_run(pool_p pool) {
378381
tasks_count = pool->tasks_count;
379382
executors_count = pool->executors_count;
380383

381-
// Allocate direct results array (workers write directly, avoiding result queue)
382-
pool->results = (obj_p *)heap_alloc(tasks_count * sizeof(obj_p));
383-
for (i = 0; i < tasks_count; i++)
384-
pool->results[i] = NULL_OBJ;
385-
386-
// Distribute tasks evenly among workers (including main thread)
387-
// This eliminates queue contention - each worker has its own range
388-
workers_needed = MINI64(tasks_count, executors_count);
389-
chunk = tasks_count / workers_needed;
390-
391-
// Assign task ranges to workers (executor[1..n-1])
392-
for (i = 1; i < workers_needed; i++) {
393-
pool->executors[i].task_start = i * chunk;
394-
pool->executors[i].task_end = (i == workers_needed - 1) ? tasks_count : (i + 1) * chunk;
395-
}
396-
397-
// Main thread (executor[0]) takes the first chunk
398-
pool->executors[0].task_start = 0;
399-
pool->executors[0].task_end = (workers_needed > 1) ? chunk : tasks_count;
400-
401-
// Memory barrier: ensure task ranges and results array are visible
402-
__atomic_thread_fence(__ATOMIC_RELEASE);
384+
// wake up needed executors
385+
if (executors_count < tasks_count) {
386+
for (i = 0; i < executors_count; i++)
387+
cond_signal(&pool->run);
388+
} else
389+
cond_broadcast(&pool->run);
403390

404391
mutex_unlock(&pool->mutex);
405392

406-
// Wake up needed workers using lightweight eventfd
407-
for (i = 1; i < workers_needed; i++)
408-
event_signal(pool->executors[i].event_fd);
409-
410-
// Process main thread's assigned tasks (no queue, direct array access)
411-
main_processed = 0;
412-
for (i = pool->executors[0].task_start; i < pool->executors[0].task_end; i++) {
413-
task = &pool->tasks[i];
414-
result = pool_call_task_fn(task->fn, task->argc, task->argv);
415-
__atomic_store_n(&pool->results[task->id], result, __ATOMIC_RELEASE);
416-
main_processed++;
417-
}
393+
// process tasks on self too
394+
for (i = 0; i < tasks_count; i++) {
395+
data = mpmc_pop(pool->task_queue);
418396

419-
// Add main thread's contribution to done_count
420-
__atomic_fetch_add(&pool->done_count, main_processed, __ATOMIC_RELEASE);
397+
// Nothing to do
398+
if (data.id == -1)
399+
break;
400+
401+
// execute task
402+
res = pool_call_task_fn(data.fn, data.argc, data.argv);
403+
data.result = res;
404+
mpmc_push(pool->result_queue, data);
405+
}
421406

422407
mutex_lock(&pool->mutex);
423408

409+
pool->done_count += i;
410+
424411
// wait for all tasks to be done
425-
while (__atomic_load_n(&pool->done_count, __ATOMIC_ACQUIRE) < tasks_count)
412+
while (pool->done_count < tasks_count)
426413
cond_wait(&pool->done, &pool->mutex);
427414

428-
// Collect results directly from array (no queue overhead)
415+
// collect results
429416
res = LIST(tasks_count);
417+
430418
for (i = 0; i < tasks_count; i++) {
431-
result = __atomic_load_n(&pool->results[i], __ATOMIC_ACQUIRE);
432-
AS_LIST(res)[i] = result;
433-
}
419+
data = mpmc_pop(pool->result_queue);
420+
if (data.id < 0 || data.id >= (i64_t)tasks_count)
421+
PANIC("corrupted: %lld", data.id);
434422

435-
// Free results array
436-
heap_free(pool->results);
437-
pool->results = NULL;
423+
ins_obj(&res, data.id, data.result);
424+
}
438425

439426
// merge heaps
440427
n = pool->executors_count;

core/pool.h

Lines changed: 2 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -77,9 +77,6 @@ typedef struct executor_t {
7777
ray_thread_t handle;
7878
heap_p heap;
7979
vm_p vm;
80-
i32_t event_fd; // Per-worker event for lightweight wake-up
81-
i64_t task_start; // Start index of assigned tasks
82-
i64_t task_end; // End index (exclusive) of assigned tasks
8380
} executor_t;
8481

8582
typedef struct pool_t {
@@ -90,9 +87,8 @@ typedef struct pool_t {
9087
i64_t done_count; // Number of done VMs
9188
i64_t executors_count; // Number of executors
9289
i64_t tasks_count; // Number of tasks
93-
i64_t tasks_capacity; // Capacity of tasks array
94-
task_data_t *tasks; // Direct task array (workers read by index, no queue contention)
95-
obj_p *results; // Direct result array (avoids result queue contention)
90+
mpmc_p task_queue; // Pool's task queue
91+
mpmc_p result_queue; // Pool's result queue
9692
executor_t executors[]; // Array of executors
9793
} *pool_p;
9894

0 commit comments

Comments
 (0)