Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 120 additions & 39 deletions sparse_strips/vello_cpu/src/dispatch/multi_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ pub(crate) struct MultiThreadedDispatcher {
wide: Wide,
/// The thread pool that is used for dispatching tasks.
thread_pool: ThreadPool,
/// The current batch of paths we want to render.
task_batch: Vec<RenderTaskType>,
/// The path containing all elements of the current batch.
batch_path: BezPath,
allocation_group: AllocationGroup,
/// The cost of the current batch.
batch_cost: f32,
/// The sender used to dispatch new rendering tasks from the main thread.
Expand Down Expand Up @@ -102,6 +99,8 @@ pub(crate) struct MultiThreadedDispatcher {
strip_storage: StripStorage,
level: Level,
flushed: bool,
// So that we can reuse memory allocations across different runs.
allocations: Allocations,
}

impl MultiThreadedDispatcher {
Expand All @@ -114,7 +113,6 @@ impl MultiThreadedDispatcher {
// + 1 because the main thread also stores an alpha buffer, used for recordings.
let alpha_storage = MaybePresent::new(vec![vec![]; usize::from(num_threads + 1)]);
let workers = Arc::new(ThreadLocal::new());
let task_batch = vec![];

{
// Start counting from 1, as thread_idx 0 is reserved for the main thread.
Expand All @@ -137,8 +135,8 @@ impl MultiThreadedDispatcher {
let mut dispatcher = Self {
wide,
thread_pool,
task_batch,
batch_path: Default::default(),
allocations: Default::default(),
allocation_group: Default::default(),
batch_cost,
task_idx,
flushed,
Expand Down Expand Up @@ -224,8 +222,8 @@ impl MultiThreadedDispatcher {
self.init();
}

let cost = estimate_render_task_cost(&task, self.batch_path.elements());
self.task_batch.push(task);
let cost = estimate_render_task_cost(&task, &self.allocation_group.path);
self.allocation_group.render_tasks.push(task);
self.batch_cost += cost;

if self.batch_cost > COST_THRESHOLD {
Expand All @@ -237,8 +235,6 @@ impl MultiThreadedDispatcher {
self.send_pending_tasks();

self.batch_cost = 0.0;
self.task_batch.clear();
self.batch_path.truncate(0);
}

fn bump_task_idx(&mut self) -> u32 {
Expand All @@ -249,16 +245,12 @@ impl MultiThreadedDispatcher {

fn send_pending_tasks(&mut self) {
let task_idx = self.bump_task_idx();
let tasks = self.task_batch.as_slice();
let path = self.batch_path.elements();
let allocation_group =
std::mem::replace(&mut self.allocation_group, self.allocations.get());
let task_sender = self.task_sender.as_mut().unwrap();
let task = RenderTask {
idx: task_idx,
// TODO: Explore whether the main thread can hold a store of previous allocations so
// we can reuse them. Same for the strips that are returned from
// a child thread.
path: path.into(),
tasks: tasks.into(),
allocation_group,
};
task_sender.send(task).unwrap();
self.run_coarse(true);
Expand All @@ -281,23 +273,25 @@ impl MultiThreadedDispatcher {

loop {
match result_receiver.try_recv() {
Ok(task) => {
for cmd in task.tasks {
Ok(mut task) => {
let num_tasks = task.allocation_group.coarse_tasks.len();
for cmd in task.allocation_group.coarse_tasks.drain(0..num_tasks) {
match cmd {
CoarseTaskType::RenderPath {
strips: strip_range,
paint,
thread_id,
} => self.wide.generate(
&task.strips[strip_range.start as usize..strip_range.end as usize],
paint,
&task.allocation_group.strips
[strip_range.start as usize..strip_range.end as usize],
paint.clone(),
thread_id,
),
CoarseTaskType::RenderWideCommand {
strips,
paint,
thread_id,
} => self.wide.generate(&strips, paint, thread_id),
} => self.wide.generate(&strips, paint.clone(), thread_id),
CoarseTaskType::PushLayer {
thread_id,
clip_path,
Expand All @@ -306,7 +300,7 @@ impl MultiThreadedDispatcher {
opacity,
} => {
let clip_path = clip_path.map(|strip_range| {
&task.strips
&task.allocation_group.strips
[strip_range.start as usize..strip_range.end as usize]
});

Expand All @@ -316,6 +310,9 @@ impl MultiThreadedDispatcher {
CoarseTaskType::PopLayer => self.wide.pop_layer(),
}
}

// Put the allocation group back so it can be reused in future iterations!
self.allocations.put(task.allocation_group);
}
Err(e) => match e {
TryRecvError::Empty => {
Expand Down Expand Up @@ -391,9 +388,9 @@ impl Dispatcher for MultiThreadedDispatcher {
paint: Paint,
aliasing_threshold: Option<u8>,
) {
let start = self.batch_path.elements().len() as u32;
self.batch_path.extend(path);
let end = self.batch_path.elements().len() as u32;
let start = self.allocation_group.path.len() as u32;
self.allocation_group.path.extend(path);
let end = self.allocation_group.path.len() as u32;
self.register_task(RenderTaskType::FillPath {
path_range: start..end,
transform,
Expand All @@ -411,9 +408,9 @@ impl Dispatcher for MultiThreadedDispatcher {
paint: Paint,
aliasing_threshold: Option<u8>,
) {
let start = self.batch_path.elements().len() as u32;
self.batch_path.extend(path);
let end = self.batch_path.elements().len() as u32;
let start = self.allocation_group.path.len() as u32;
self.allocation_group.path.extend(path);
let end = self.allocation_group.path.len() as u32;
self.register_task(RenderTaskType::StrokePath {
path_range: start..end,
transform,
Expand All @@ -434,9 +431,9 @@ impl Dispatcher for MultiThreadedDispatcher {
mask: Option<Mask>,
) {
let mapped_clip = clip_path.map(|c| {
let start = self.batch_path.elements().len() as u32;
self.batch_path.extend(c);
let end = self.batch_path.elements().len() as u32;
let start = self.allocation_group.path.len() as u32;
self.allocation_group.path.extend(c);
let end = self.allocation_group.path.len() as u32;
(start..end, clip_transform)
});

Expand All @@ -456,9 +453,8 @@ impl Dispatcher for MultiThreadedDispatcher {

fn reset(&mut self) {
self.wide.reset();
self.task_batch.clear();
self.allocation_group.clear();
self.batch_cost = 0.0;
self.batch_path.truncate(0);
self.task_idx = 0;
self.flushed = false;
self.task_sender = None;
Expand Down Expand Up @@ -600,11 +596,96 @@ impl Debug for MultiThreadedDispatcher {
}
}

/// A pool of reusable `Vec<T>` buffers.
///
/// Buffers handed back through [`AllocationManager::put`] are emptied but stay in the
/// pool, so a later [`AllocationManager::get`] can return them instead of starting from
/// a brand-new vector.
struct AllocationManager<T> {
    entries: Vec<Vec<T>>,
}

impl<T> AllocationManager<T> {
    /// Take a buffer out of the pool.
    ///
    /// Returns the most recently stored buffer, or a fresh empty vector when the pool
    /// has nothing left. Either way, the returned vector contains no elements.
    fn get(&mut self) -> Vec<T> {
        match self.entries.pop() {
            Some(recycled) => recycled,
            None => Vec::new(),
        }
    }

    /// Hand a buffer back to the pool.
    ///
    /// Its contents are dropped immediately; the (now empty) buffer is kept for reuse.
    fn put(&mut self, mut allocation: Vec<T>) {
        allocation.clear();
        self.entries.push(allocation);
    }
}

impl<T> Default for AllocationManager<T> {
    /// Create a pool that does not hold any buffers yet.
    fn default() -> Self {
        Self {
            entries: Vec::new(),
        }
    }
}

/// Pools for every kind of buffer that is allocated while rendering with
/// multi-threading, one [`AllocationManager`] per element type.
#[derive(Default)]
struct Allocations {
    /// Buffers for the render tasks of a batch. The main thread fills them as new
    /// fill/stroke commands come in, and worker threads consume them while processing.
    render_tasks: AllocationManager<RenderTaskType>,
    /// Buffers for the path store of a batch. The main thread fills them as new commands
    /// come in, and the worker thread reads them to generate the strips of a path.
    paths: AllocationManager<PathEl>,
    /// Buffers the worker thread uses to produce strips. They are sent back to the main
    /// thread, which then uses them for coarse rasterization.
    strips: AllocationManager<Strip>,
    /// Buffers for the coarse tasks produced by a worker thread and processed by the
    /// main thread.
    coarse_tasks: AllocationManager<CoarseTaskType>,
}

impl Allocations {
/// Assemble an allocation group by taking one buffer from each pool.
///
/// Every buffer in the returned group is empty.
fn get(&mut self) -> AllocationGroup {
    AllocationGroup {
        render_tasks: self.render_tasks.get(),
        path: self.paths.get(),
        strips: self.strips.get(),
        coarse_tasks: self.coarse_tasks.get(),
    }
}

fn put(&mut self, allocation: AllocationGroup) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could assert that the number of stored allocations never grows unreasonably large (to help detect leaks caused by regressions), or add a test that one send causes exactly one `get` and one `run_coarse` causes exactly one `put`.

self.render_tasks.put(allocation.render_tasks);
self.paths.put(allocation.path);
self.strips.put(allocation.strips);
self.coarse_tasks.put(allocation.coarse_tasks);
}
}

/// The set of buffers that travels together with one batch of work.
#[derive(Default, Debug)]
pub(crate) struct AllocationGroup {
    /// Path elements of the batch; render tasks refer to their path through a
    /// `start..end` range into this vector.
    pub(crate) path: Vec<PathEl>,
    /// The render tasks registered for the batch.
    pub(crate) render_tasks: Vec<RenderTaskType>,
    /// Strips of the batch; coarse tasks refer to their strips through a range into
    /// this vector.
    pub(crate) strips: Vec<Strip>,
    /// The coarse tasks of the batch, drained during coarse rasterization.
    pub(crate) coarse_tasks: Vec<CoarseTaskType>,
}

impl AllocationGroup {
    /// Remove all elements from each of the four buffers in this group.
    fn clear(&mut self) {
        let Self {
            path,
            render_tasks,
            strips,
            coarse_tasks,
        } = self;

        path.clear();
        render_tasks.clear();
        strips.clear();
        coarse_tasks.clear();
    }
}

#[derive(Debug)]
pub(crate) struct RenderTask {
pub(crate) idx: u32,
pub(crate) path: Box<[PathEl]>,
pub(crate) tasks: Box<[RenderTaskType]>,
pub(crate) allocation_group: AllocationGroup,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -640,10 +721,10 @@ pub(crate) enum RenderTaskType {
}

pub(crate) struct CoarseTask {
pub(crate) strips: Box<[Strip]>,
pub(crate) tasks: Vec<CoarseTaskType>,
allocation_group: AllocationGroup,
}

#[derive(Debug)]
pub(crate) enum CoarseTaskType {
RenderPath {
thread_id: u8,
Expand Down
Loading
Loading