Skip to content

Commit dcb1f7a

Browse files
committed
Tako visibility refactoring
1 parent 3dad4a1 commit dcb1f7a

File tree

17 files changed

+41
-134
lines changed

17 files changed

+41
-134
lines changed

crates/tako/src/control.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ impl ServerRef {
8484

8585
pub fn stop_worker(&self, worker_id: WorkerId) -> crate::Result<()> {
8686
let mut core = self.core_ref.get_mut();
87-
if let Some(ref mut worker) = core.get_worker_mut(worker_id) {
87+
if let Some(ref mut worker) = core.find_worker_mut(worker_id) {
8888
if !worker.is_stopping() {
8989
worker.set_stop(LostWorkerReason::Stopped);
9090
let mut comm = self.comm_ref.get_mut();

crates/tako/src/internal/common/resources/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ pub use descriptor::{
1111
ResourceDescriptor, ResourceDescriptorCoupling, ResourceDescriptorCouplingItem,
1212
ResourceDescriptorItem, ResourceDescriptorKind,
1313
};
14+
1415
pub use map::{
1516
AMD_GPU_RESOURCE_NAME, CPU_RESOURCE_ID, CPU_RESOURCE_NAME, GlobalResourceMapping,
16-
MEM_RESOURCE_NAME, NVIDIA_GPU_RESOURCE_NAME, ResourceRqMap,
17+
MEM_RESOURCE_NAME, NVIDIA_GPU_RESOURCE_NAME,
1718
};
19+
1820
pub use request::{
1921
AllocationRequest, ResourceAllocRequest, ResourceRequest, ResourceRequestEntries,
2022
ResourceRequestVariants, TimeRequest,

crates/tako/src/internal/common/stablemap.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ where
8686
}
8787

8888
#[inline]
89+
#[allow(dead_code)]
8990
pub fn get<Q>(&self, key: &Q) -> &V
9091
where
9192
K: Borrow<Q>,

crates/tako/src/internal/scheduler/gap_cache.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,10 @@ fn compute_gap(
160160
#[cfg(test)]
161161
mod tests {
162162
use super::*;
163-
164-
163+
165164
use crate::tests::utils::env::TestEnv;
166165
use crate::tests::utils::resources::ResBuilder;
167-
166+
168167
use crate::tests::utils::worker::WorkerBuilder;
169168

170169
#[test]

crates/tako/src/internal/scheduler/main.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,3 @@ pub(crate) fn run_scheduling(core: &mut Core, comm: &mut CommSender, now: Instan
5353
);
5454
comm.reset_scheduling_flag();
5555
}
56-
57-
/*pub(crate) fn collect_assigned_not_running_tasks(core: &mut Core) -> Vec<TaskId> {
58-
let mut result = Vec::new();
59-
for worker in core.get_workers_mut() {
60-
worker.collect_assigned_non_running_tasks(&mut result);
61-
}
62-
result
63-
}*/

crates/tako/src/internal/scheduler/mapping.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ use std::cmp::Reverse;
88

99
#[derive(Debug, Default)]
1010
#[cfg_attr(test, derive(Clone, PartialEq, PartialOrd, Eq, Ord))]
11-
pub struct WorkerTaskUpdate {
11+
pub(crate) struct WorkerTaskUpdate {
1212
pub(crate) assigned: Vec<(TaskId, ResourceVariantId)>,
1313
pub(crate) prefills: Vec<TaskId>,
1414
pub(crate) retracts: Vec<TaskId>,
1515
}
1616

1717
#[derive(Debug, Default)]
18-
pub struct WorkerTaskMapping {
18+
pub(crate) struct WorkerTaskMapping {
1919
pub(crate) workers: Map<WorkerId, WorkerTaskUpdate>,
2020
pub(crate) mn_tasks_to_workers: Vec<TaskId>,
2121
}

crates/tako/src/internal/server/core.rs

Lines changed: 9 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub(crate) struct CoreSplit<'a> {
3939
}
4040

4141
#[derive(Default)]
42-
pub struct Core {
42+
pub(crate) struct Core {
4343
tasks: TaskMap,
4444
workers: WorkerMap,
4545
resource_map: GlobalResourceMapping,
@@ -109,12 +109,12 @@ impl Core {
109109
}
110110
}
111111

112-
pub fn new_worker_id(&mut self) -> WorkerId {
112+
pub(crate) fn new_worker_id(&mut self) -> WorkerId {
113113
self.worker_id_counter += 1;
114114
WorkerId::new(self.worker_id_counter)
115115
}
116116

117-
pub fn worker_counter(&self) -> u32 {
117+
pub(crate) fn worker_counter(&self) -> u32 {
118118
self.worker_id_counter
119119
}
120120

@@ -175,26 +175,22 @@ impl Core {
175175
}
176176

177177
#[inline]
178-
pub fn get_worker_by_id(&self, id: WorkerId) -> Option<&Worker> {
179-
self.workers.get(&id)
180-
}
181-
182-
#[inline]
183-
pub fn get_worker_by_id_or_panic(&self, id: WorkerId) -> &Worker {
178+
pub fn get_worker(&self, id: WorkerId) -> &Worker {
184179
self.workers.get(&id).unwrap_or_else(|| {
185180
panic!("Asking for invalid worker id={id}");
186181
})
187182
}
188183

189184
#[inline]
190-
pub fn get_worker_mut_by_id_or_panic(&mut self, id: WorkerId) -> &mut Worker {
185+
#[cfg(test)]
186+
pub fn get_worker_mut(&mut self, id: WorkerId) -> &mut Worker {
191187
self.workers.get_mut(&id).unwrap_or_else(|| {
192188
panic!("Asking for invalid worker id={id}");
193189
})
194190
}
195191

196192
#[inline]
197-
pub fn get_worker_mut(&mut self, id: WorkerId) -> Option<&mut Worker> {
193+
pub fn find_worker_mut(&mut self, id: WorkerId) -> Option<&mut Worker> {
198194
self.workers.get_mut(&id)
199195
}
200196

@@ -203,21 +199,11 @@ impl Core {
203199
self.workers.values()
204200
}
205201

206-
#[inline]
207-
pub fn get_workers_mut(&mut self) -> impl Iterator<Item = &mut Worker> {
208-
self.workers.values_mut()
209-
}
210-
211202
#[inline]
212203
pub fn get_worker_map(&self) -> &WorkerMap {
213204
&self.workers
214205
}
215206

216-
#[inline]
217-
pub fn has_workers(&self) -> bool {
218-
!self.workers.is_empty()
219-
}
220-
221207
pub fn add_task(&mut self, task: Task, retracted: &mut Vec<TaskId>) {
222208
if task.is_ready() {
223209
self.task_queues.add_ready_task(&task, retracted);
@@ -260,11 +246,6 @@ impl Core {
260246
&self.tasks
261247
}
262248

263-
#[inline]
264-
pub fn task_map_mut(&mut self) -> &mut TaskMap {
265-
&mut self.tasks
266-
}
267-
268249
#[inline]
269250
pub fn get_task(&self, task_id: TaskId) -> &Task {
270251
self.tasks.get_task(task_id)
@@ -453,13 +434,6 @@ impl Core {
453434
self.resource_map.get_or_create_resource_id(name)
454435
}
455436

456-
pub fn convert_client_resource_rq(
457-
&mut self,
458-
resources: &crate::gateway::ResourceRequestVariants,
459-
) -> ResourceRequestVariants {
460-
self.resource_map.convert_client_resource_rq(resources)
461-
}
462-
463437
#[inline]
464438
pub fn resource_map(&self) -> &GlobalResourceMapping {
465439
&self.resource_map
@@ -511,13 +485,10 @@ impl Core {
511485
mod tests {
512486
use crate::internal::server::core::Core;
513487
use crate::internal::server::task::Task;
514-
515-
use crate::internal::server::worker::Worker;
516-
use crate::internal::server::workergroup::WorkerGroup;
517488

518-
489+
use crate::internal::server::workergroup::WorkerGroup;
519490

520-
use crate::{TaskId, WorkerId};
491+
use crate::TaskId;
521492

522493
impl Core {
523494
pub fn worker_group(&self, group_name: &str) -> Option<&WorkerGroup> {
@@ -532,18 +503,6 @@ mod tests {
532503
}
533504
}
534505

535-
pub fn assert_worker_condition<F: Fn(&Worker) -> bool>(
536-
&self,
537-
worker_ids: &[WorkerId],
538-
op: F,
539-
) {
540-
for worker_id in worker_ids {
541-
if !op(self.get_worker_by_id_or_panic(*worker_id)) {
542-
panic!("Worker {worker_id} does not satisfy the condition");
543-
}
544-
}
545-
}
546-
547506
pub fn assert_waiting(&self, task_ids: &[TaskId]) {
548507
self.assert_task_condition(task_ids, |t| t.is_waiting());
549508
}

crates/tako/src/internal/server/rpc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ async fn worker_rpc_loop(
269269
let mut core = core_ref.get_mut();
270270
let mut comm = comm_ref.get_mut();
271271
let reason = core
272-
.get_worker_by_id_or_panic(worker_id)
272+
.get_worker(worker_id)
273273
.stop_reason
274274
.map(|(r, _)| r)
275275
.unwrap_or(reason);
@@ -299,7 +299,7 @@ pub(crate) async fn worker_receive_loop<
299299
on_retract_response(&mut core, &mut *comm, worker_id, &msg.retracted);
300300
}
301301
FromWorkerMessage::Heartbeat => {
302-
if let Some(worker) = core.get_worker_mut(worker_id) {
302+
if let Some(worker) = core.find_worker_mut(worker_id) {
303303
log::trace!("Heartbeat received, worker={worker_id}");
304304
worker.last_heartbeat = Instant::now();
305305
};

crates/tako/src/internal/tests/test_reactor.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -523,12 +523,7 @@ fn test_task_mn_fail() {
523523
comm.emptiness_check();
524524
assert!(rt.core().find_task(1.into()).is_none());
525525
for w in &ws {
526-
assert!(
527-
rt.core()
528-
.get_worker_by_id_or_panic((*w).into())
529-
.mn_assignment()
530-
.is_none()
531-
);
526+
assert!(rt.core().get_worker((*w).into()).mn_assignment().is_none());
532527
}
533528
}
534529

@@ -645,7 +640,7 @@ fn lost_worker_with_running_and_assign_tasks() {
645640
}
646641

647642
fn check_worker_tasks_exact(core: &Core, worker_id: WorkerId, tasks: &[TaskId]) {
648-
let worker = core.get_worker_by_id_or_panic(worker_id.into());
643+
let worker = core.get_worker(worker_id.into());
649644
let sn = worker.sn_assignment().unwrap();
650645
assert_eq!(sn.assign_tasks.len(), tasks.len());
651646
for task in tasks {
@@ -654,7 +649,7 @@ fn check_worker_tasks_exact(core: &Core, worker_id: WorkerId, tasks: &[TaskId])
654649
}
655650

656651
fn worker_has_task(core: &Core, worker_id: WorkerId, task_id: TaskId) -> bool {
657-
core.get_worker_by_id_or_panic(worker_id.into())
652+
core.get_worker(worker_id.into())
658653
.sn_assignment()
659654
.unwrap()
660655
.assign_tasks

crates/tako/src/internal/tests/test_scheduler_mn.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,7 @@ fn test_schedule_mn_simple() {
122122

123123
assert!(!rt.task_exists(t3));
124124
for w in ws3 {
125-
assert!(
126-
rt.core()
127-
.get_worker_by_id_or_panic(w)
128-
.mn_assignment()
129-
.is_none()
130-
);
125+
assert!(rt.core().get_worker(w).mn_assignment().is_none());
131126
}
132127

133128
let mut comm = rt.schedule();

0 commit comments

Comments
 (0)