From ee0130451d49db3a0b2b120cf41f4547f12732b0 Mon Sep 17 00:00:00 2001 From: Laurenz Stampfl Date: Thu, 18 Sep 2025 11:24:34 +0200 Subject: [PATCH 1/5] Implement --- .../vello_cpu/src/dispatch/multi_threaded.rs | 142 +++++++++++++----- .../src/dispatch/multi_threaded/worker.rs | 56 ++++--- 2 files changed, 142 insertions(+), 56 deletions(-) diff --git a/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs b/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs index 3d7559e8a6..d410ab6c4e 100644 --- a/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs +++ b/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs @@ -51,10 +51,7 @@ pub(crate) struct MultiThreadedDispatcher { wide: Wide, /// The thread pool that is used for dispatching tasks. thread_pool: ThreadPool, - /// The current batch of paths we want to render. - task_batch: Vec, - /// The path containing all elements of the current batch. - batch_path: BezPath, + allocation_group: AllocationGroup, /// The cost of the current batch. batch_cost: f32, /// The sender used to dispatch new rendering tasks from the main thread. @@ -102,6 +99,8 @@ pub(crate) struct MultiThreadedDispatcher { strip_storage: StripStorage, level: Level, flushed: bool, + // So that we can reuse memory allocations across different runs. + allocations: Allocations, } impl MultiThreadedDispatcher { @@ -114,7 +113,6 @@ impl MultiThreadedDispatcher { // + 1 because the main thread also stores an alpha buffer, used for recordings. let alpha_storage = MaybePresent::new(vec![vec![]; usize::from(num_threads + 1)]); let workers = Arc::new(ThreadLocal::new()); - let task_batch = vec![]; { // Start counting from 1, as thread_idx 0 is reserved for the main thread. @@ -137,8 +135,8 @@ impl MultiThreadedDispatcher { let mut dispatcher = Self { wide, thread_pool, - task_batch, - batch_path: Default::default(), + allocations: Default::default(), + allocation_group: Default::default(), batch_cost, task_idx, flushed, @@ -224,8 +222,8 @@ impl MultiThreadedDispatcher { self.init(); } - let cost = estimate_render_task_cost(&task, self.batch_path.elements()); - self.task_batch.push(task); + let cost = estimate_render_task_cost(&task, &self.allocation_group.path); + self.allocation_group.render_tasks.push(task); self.batch_cost += cost; if self.batch_cost > COST_THRESHOLD { @@ -237,8 +235,6 @@ impl MultiThreadedDispatcher { self.send_pending_tasks(); self.batch_cost = 0.0; - self.task_batch.clear(); - self.batch_path.truncate(0); } fn bump_task_idx(&mut self) -> u32 { @@ -249,16 +245,12 @@ impl MultiThreadedDispatcher { fn send_pending_tasks(&mut self) { let task_idx = self.bump_task_idx(); - let tasks = self.task_batch.as_slice(); - let path = self.batch_path.elements(); + let allocation_group = + std::mem::replace(&mut self.allocation_group, self.allocations.get()); let task_sender = self.task_sender.as_mut().unwrap(); let task = RenderTask { idx: task_idx, - // TODO: Explore whether the main thread can hold a store of previous allocations so - // we can reuse them. Same for the strips that are returned from - // a child thread. - path: path.into(), - tasks: tasks.into(), + allocation_group, }; task_sender.send(task).unwrap(); self.run_coarse(true); @@ -281,23 +273,25 @@ impl MultiThreadedDispatcher { loop { match result_receiver.try_recv() { - Ok(task) => { - for cmd in task.tasks { + Ok(mut task) => { + let num_tasks = task.allocation_group.coarse_tasks.len(); + for cmd in task.allocation_group.coarse_tasks.drain(0..num_tasks) { match cmd { CoarseTaskType::RenderPath { strips: strip_range, paint, thread_id, } => self.wide.generate( - &task.strips[strip_range.start as usize..strip_range.end as usize], - paint, + &task.allocation_group.strips + [strip_range.start as usize..strip_range.end as usize], + paint.clone(), thread_id, ), CoarseTaskType::RenderWideCommand { strips, paint, thread_id, - } => self.wide.generate(&strips, paint, thread_id), + } => self.wide.generate(&strips, paint.clone(), thread_id), CoarseTaskType::PushLayer { thread_id, clip_path, @@ -306,7 +300,7 @@ impl MultiThreadedDispatcher { opacity, } => { let clip_path = clip_path.map(|strip_range| { - &task.strips + &task.allocation_group.strips [strip_range.start as usize..strip_range.end as usize] }); @@ -316,6 +310,8 @@ impl MultiThreadedDispatcher { CoarseTaskType::PopLayer => self.wide.pop_layer(), } } + + self.allocations.put(task.allocation_group); } Err(e) => match e { TryRecvError::Empty => { @@ -391,9 +387,9 @@ impl Dispatcher for MultiThreadedDispatcher { paint: Paint, aliasing_threshold: Option, ) { - let start = self.batch_path.elements().len() as u32; - self.batch_path.extend(path); - let end = self.batch_path.elements().len() as u32; + let start = self.allocation_group.path.len() as u32; + self.allocation_group.path.extend(path); + let end = self.allocation_group.path.len() as u32; self.register_task(RenderTaskType::FillPath { path_range: start..end, transform, @@ -411,9 +407,9 @@ impl Dispatcher for MultiThreadedDispatcher { paint: Paint, aliasing_threshold: Option, ) { - let start = self.batch_path.elements().len() as u32; - self.batch_path.extend(path); - let end = self.batch_path.elements().len() as u32; + let start = self.allocation_group.path.len() as u32; + self.allocation_group.path.extend(path); + let end = self.allocation_group.path.len() as u32; self.register_task(RenderTaskType::StrokePath { path_range: start..end, transform, @@ -434,9 +430,9 @@ impl Dispatcher for MultiThreadedDispatcher { mask: Option, ) { let mapped_clip = clip_path.map(|c| { - let start = self.batch_path.elements().len() as u32; - self.batch_path.extend(c); - let end = self.batch_path.elements().len() as u32; + let start = self.allocation_group.path.len() as u32; + self.allocation_group.path.extend(c); + let end = self.allocation_group.path.len() as u32; (start..end, clip_transform) }); @@ -456,9 +452,8 @@ impl Dispatcher for MultiThreadedDispatcher { fn reset(&mut self) { self.wide.reset(); - self.task_batch.clear(); + self.allocation_group.clear(); self.batch_cost = 0.0; - self.batch_path.truncate(0); self.task_idx = 0; self.flushed = false; self.task_sender = None; @@ -600,11 +595,80 @@ impl Debug for MultiThreadedDispatcher { } } +struct AllocationManager { + entries: Vec>, +} + +impl AllocationManager { + fn get(&mut self) -> Vec { + self.entries.pop().unwrap_or(vec![]) + } + + fn put(&mut self, mut allocation: Vec) { + allocation.clear(); + self.entries.push(allocation); + } +} + +impl Default for AllocationManager { + fn default() -> Self { + Self { entries: vec![] } + } +} + +#[derive(Default)] +struct Allocations { + render_tasks: AllocationManager, + paths: AllocationManager, + strips: AllocationManager, + coarse_tasks: AllocationManager, +} + +impl Allocations { + fn get(&mut self) -> AllocationGroup { + let render_tasks = self.render_tasks.get(); + let path = self.paths.get(); + let strips = self.strips.get(); + let coarse_tasks = self.coarse_tasks.get(); + + AllocationGroup { + path, + render_tasks, + coarse_tasks, + strips, + } + } + + fn put(&mut self, allocation: AllocationGroup) { + self.render_tasks.put(allocation.render_tasks); + self.paths.put(allocation.path); + self.strips.put(allocation.strips); + self.coarse_tasks.put(allocation.coarse_tasks); + } +} + +#[derive(Default, Debug)] +pub(crate) struct AllocationGroup { + pub(crate) path: Vec, + pub(crate) render_tasks: Vec, + pub(crate) strips: Vec, + pub(crate) coarse_tasks: Vec, +} + +impl AllocationGroup { + fn clear(&mut self) { + self.path.clear(); + self.render_tasks.clear(); + self.strips.clear(); + self.coarse_tasks.clear(); + } +} + #[derive(Debug)] pub(crate) struct RenderTask { pub(crate) idx: u32, - pub(crate) path: Box<[PathEl]>, - pub(crate) tasks: Box<[RenderTaskType]>, + /// The path elements of the task. + pub(crate) allocation_group: AllocationGroup, } #[derive(Debug, Clone)] @@ -640,10 +704,10 @@ pub(crate) enum RenderTaskType { } pub(crate) struct CoarseTask { - pub(crate) strips: Box<[Strip]>, - pub(crate) tasks: Vec, + allocation_group: AllocationGroup, } +#[derive(Debug)] pub(crate) enum CoarseTaskType { RenderPath { thread_id: u8, diff --git a/sparse_strips/vello_cpu/src/dispatch/multi_threaded/worker.rs b/sparse_strips/vello_cpu/src/dispatch/multi_threaded/worker.rs index 29814e43ce..9d1ca1f71c 100644 --- a/sparse_strips/vello_cpu/src/dispatch/multi_threaded/worker.rs +++ b/sparse_strips/vello_cpu/src/dispatch/multi_threaded/worker.rs @@ -41,16 +41,20 @@ impl Worker { pub(crate) fn run_render_task( &mut self, - render_task: RenderTask, + mut render_task: RenderTask, result_sender: &mut CoarseTaskSender, ) { + let num_tasks = render_task.allocation_group.render_tasks.len(); self.strip_storage.strips.clear(); self.strip_storage .set_generation_mode(GenerationMode::Append); - let mut task_buf = Vec::with_capacity(render_task.tasks.len()); let task_idx = render_task.idx; - for task in render_task.tasks { + for task in render_task + .allocation_group + .render_tasks + .drain(0..num_tasks) + { match task { RenderTaskType::FillPath { path_range, @@ -60,8 +64,8 @@ impl Worker { aliasing_threshold, } => { let start = self.strip_storage.strips.len() as u32; - let path = - &render_task.path[path_range.start as usize..path_range.end as usize]; + let path = &render_task.allocation_group.path + [path_range.start as usize..path_range.end as usize]; self.strip_generator.generate_filled_path( path.iter().copied(), @@ -78,7 +82,10 @@ impl Worker { paint, }; - task_buf.push(coarse_command); + render_task + .allocation_group + .coarse_tasks + .push(coarse_command); } RenderTaskType::StrokePath { path_range, @@ -88,8 +95,8 @@ impl Worker { aliasing_threshold, } => { let start = self.strip_storage.strips.len() as u32; - let path = - &render_task.path[path_range.start as usize..path_range.end as usize]; + let path = &render_task.allocation_group.path + [path_range.start as usize..path_range.end as usize]; self.strip_generator.generate_stroked_path( path.iter().copied(), @@ -106,7 +113,10 @@ impl Worker { paint, }; - task_buf.push(coarse_command); + render_task + .allocation_group + .coarse_tasks + .push(coarse_command); } RenderTaskType::PushLayer { clip_path, @@ -118,8 +128,8 @@ impl Worker { } => { let clip = if let Some((path_range, transform)) = clip_path { let start = self.strip_storage.strips.len() as u32; - let path = - &render_task.path[path_range.start as usize..path_range.end as usize]; + let path = &render_task.allocation_group.path + [path_range.start as usize..path_range.end as usize]; self.strip_generator.generate_filled_path( path.iter().copied(), @@ -144,10 +154,16 @@ impl Worker { opacity, }; - task_buf.push(coarse_command); + render_task + .allocation_group + .coarse_tasks + .push(coarse_command); } RenderTaskType::PopLayer => { - task_buf.push(CoarseTaskType::PopLayer); + render_task + .allocation_group + .coarse_tasks + .push(CoarseTaskType::PopLayer); } RenderTaskType::WideCommand { strip_buf, @@ -160,16 +176,22 @@ impl Worker { paint, }; - task_buf.push(coarse_command); + render_task + .allocation_group + .coarse_tasks + .push(coarse_command); } } } - let strips_slice = self.strip_storage.strips.as_slice(); + let taken_strips = std::mem::replace( + &mut self.strip_storage.strips, + render_task.allocation_group.strips, + ); + render_task.allocation_group.strips = taken_strips; let task = CoarseTask { - strips: strips_slice.into(), - tasks: task_buf, + allocation_group: render_task.allocation_group, }; result_sender.send(task_idx as usize, task).unwrap(); From bcfdc5e71bb8eac66387e66397f053853af1b971 Mon Sep 17 00:00:00 2001 From: Laurenz Stampfl Date: Thu, 18 Sep 2025 12:37:39 +0200 Subject: [PATCH 2/5] Adjust documentation --- .../vello_cpu/src/dispatch/multi_threaded.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs b/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs index d410ab6c4e..653230f855 100644 --- a/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs +++ b/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs @@ -311,6 +311,7 @@ impl MultiThreadedDispatcher { } } + // Put the allocation group back so it can be reused in future iterations! self.allocations.put(task.allocation_group); } Err(e) => match e { @@ -595,15 +596,20 @@ impl Debug for MultiThreadedDispatcher { } } +/// A structure that allows storing and fetching existing allocations. struct AllocationManager { entries: Vec>, } impl AllocationManager { + /// Get a new vector allocation. + /// + /// The vector is guaranteed to have been cleared before. fn get(&mut self) -> Vec { self.entries.pop().unwrap_or(vec![]) } + /// Insert a new allocation in the store. fn put(&mut self, mut allocation: Vec) { allocation.clear(); self.entries.push(allocation); @@ -616,15 +622,27 @@ impl Default for AllocationManager { } } +/// A structure to keep track of allocations that will be done while rendering with +/// multi-threading. #[derive(Default)] struct Allocations { + /// The render tasks of a batch. They will be filled by the main thread as new fill/stroke commands + /// come in and consumed by worker threads as they process them. render_tasks: AllocationManager, + /// The path store of a batch. It will be filled by the main thread as new commands come in + /// and be used by the worker thread to generate the strips of a path. paths: AllocationManager, + /// Stores allocations that are used by the worker thread to produce strips. They will be + /// sent back to the main thread which then uses them for coarse rasterization. strips: AllocationManager, + /// The coarse tasks produced by a worker thread, which will be processed by the main thread. coarse_tasks: AllocationManager, } impl Allocations { + /// Return a new allocation group. + /// + /// The group is guaranteed to have been cleared. fn get(&mut self) -> AllocationGroup { let render_tasks = self.render_tasks.get(); let path = self.paths.get(); @@ -667,7 +685,6 @@ impl AllocationGroup { #[derive(Debug)] pub(crate) struct RenderTask { pub(crate) idx: u32, - /// The path elements of the task. pub(crate) allocation_group: AllocationGroup, } From 518cd23528307e500b2031d22adbfc83bed00bbf Mon Sep 17 00:00:00 2001 From: Laurenz Stampfl Date: Thu, 18 Sep 2025 12:46:08 +0200 Subject: [PATCH 3/5] Fix clippy --- sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs b/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs index 653230f855..4ef1d0490d 100644 --- a/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs +++ b/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs @@ -606,7 +606,7 @@ impl AllocationManager { /// /// The vector is guaranteed to have been cleared before. fn get(&mut self) -> Vec { - self.entries.pop().unwrap_or(vec![]) + self.entries.pop().unwrap_or_default() } /// Insert a new allocation in the store. From e73891732f6aa93a0cf45fb3a22fa38b12ae60a0 Mon Sep 17 00:00:00 2001 From: Laurenz Stampfl Date: Fri, 19 Sep 2025 10:36:19 +0200 Subject: [PATCH 4/5] Add a test case --- .../vello_cpu/src/dispatch/multi_threaded.rs | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs b/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs index 4ef1d0490d..8f6b89fc34 100644 --- a/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs +++ b/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs @@ -789,3 +789,35 @@ impl MaybePresent { std::mem::take(&mut *locked) } } + +#[cfg(test)] +mod tests { + use crate::Level; + use crate::color::palette::css::BLUE; + use crate::dispatch::Dispatcher; + use crate::dispatch::multi_threaded::{MultiThreadedDispatcher}; + use crate::kurbo::{Affine, Rect, Shape}; + use crate::peniko::Fill; + use vello_common::paint::{Paint, PremulColor}; + + /// Ensure we don't cause a memory leak. + #[test] + fn allocations() { + let mut dispatcher = MultiThreadedDispatcher::new(100, 100, 4, Level::new()); + for _ in 0..20 { + dispatcher.fill_path( + &Rect::new(0.0, 0.0, 50.0, 50.0).to_path(0.1), + Fill::NonZero, + Affine::IDENTITY, + Paint::Solid(PremulColor::from_alpha_color(BLUE)), + None, + ); + dispatcher.flush(); + } + + assert_eq!(dispatcher.allocations.paths.entries.len(), 1); + assert_eq!(dispatcher.allocations.strips.entries.len(), 1); + assert_eq!(dispatcher.allocations.render_tasks.entries.len(), 1); + assert_eq!(dispatcher.allocations.coarse_tasks.entries.len(), 1); + } +} From f7766c7a878159ef7f145fa32cbdca0ee96568bb Mon Sep 17 00:00:00 2001 From: Laurenz Stampfl Date: Fri, 19 Sep 2025 10:44:36 +0200 Subject: [PATCH 5/5] Reformat --- sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs b/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs index 8f6b89fc34..925ab07f16 100644 --- a/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs +++ b/sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs @@ -795,7 +795,7 @@ mod tests { use crate::Level; use crate::color::palette::css::BLUE; use crate::dispatch::Dispatcher; - use crate::dispatch::multi_threaded::{MultiThreadedDispatcher}; + use crate::dispatch::multi_threaded::MultiThreadedDispatcher; use crate::kurbo::{Affine, Rect, Shape}; use crate::peniko::Fill; use vello_common::paint::{Paint, PremulColor};