Ucx process bootstrap#598
Conversation
55ec34e to
6cefdcc
Compare
|
@kaiyingshan can you send this commit 19d17b7 as a separate PR to the main branch? |
| redis->lpush("ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(rank), | ||
| "0"); | ||
|
|
||
| for (int i = 0; i < world_size; i++) { |
There was a problem hiding this comment.
we might need a barrier like mechanism here to guarantee that all processes have completed pushing data.
| void *coll_info, void **req) { | ||
| int world_size = ((UCXUCCCommunicator*) coll_info)->GetWorldSize(); | ||
| int rank = ((UCXUCCCommunicator*) coll_info)->GetRank(); | ||
| int num_comm = ((UCXUCCCommunicator *)coll_info)->num_oob_allgather; |
There was a problem hiding this comment.
why do we need this num_comm?
| this->displacements_ = new std::vector<std::vector<int>>(num_buffers); | ||
| this->all_recv_counts_ = new std::vector<std::vector<int>>(num_buffers); |
There was a problem hiding this comment.
I dont think you need to malloc for a vector object. You can simply do this,
this->displacements_ = std::vector<std::vector<int>>(num_buffers);
this->all_recv_counts_ = std::vector<std::vector<int>>(num_buffers);There was a problem hiding this comment.
Oh I think I understand why you did this. This is because of the const in GatherBufferSizes method, right?
|
|
||
| args.mask = 0; | ||
| args.coll_type = UCC_COLL_TYPE_GATHER; | ||
| args.coll_type = UCC_COLL_TYPE_ALLGATHER; |
There was a problem hiding this comment.
put a TODO and refer to the github issue.
Add a comment explaining why we are using allgather
| int sum = 0; | ||
| auto& recv_counts_ = (*all_recv_counts_)[buf_idx]; | ||
| for(auto count: recv_counts_) { | ||
| sum += count; | ||
| } |
There was a problem hiding this comment.
use std::accumulate
| for (int32_t i = 0; i < num_buffers; ++i) { | ||
| (*all_recv_counts_)[i] = cylon::net::receiveCounts(all_buffer_sizes, i, | ||
| num_buffers, world_size); | ||
| (*displacements_)[i] = std::move(cylon::net::displacementsPerBuffer(all_buffer_sizes, i, |
There was a problem hiding this comment.
don't think you need the move here.
| if(rank == gather_root) { | ||
| args.dst.info.buffer = rcv_data; | ||
| args.dst.info.count = num_buffers * world_size; | ||
| args.dst.info.count = total_sz; | ||
| args.dst.info.datatype = UCC_DT_INT32; | ||
| args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST; | ||
| } else { | ||
| all_buffer_sizes.resize(total_sz); | ||
| args.dst.info.buffer = all_buffer_sizes.data(); | ||
| args.dst.info.count = total_sz; | ||
| args.dst.info.datatype = UCC_DT_INT32; | ||
| args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST; | ||
| } |
There was a problem hiding this comment.
can we do this?
if(rank == gather_root) {
args.dst.info.buffer = rcv_data;
} else {
all_buffer_sizes.resize(total_sz);
args.dst.info.buffer = all_buffer_sizes.data();
}
args.dst.info.count = total_sz;
args.dst.info.datatype = UCC_DT_INT32;
args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST;| args.dst.info_v.counts = (ucc_count_t *)(*all_recv_counts_)[buf_idx].data(); | ||
| args.dst.info_v.displacements = (ucc_aint_t *)(*displacements_)[buf_idx].data(); | ||
| args.dst.info_v.datatype = UCC_DT_UINT8; | ||
| args.dst.info_v.mem_type = UCC_MEMORY_TYPE_HOST; |
There was a problem hiding this comment.
I actually think we dont need these! we just need to pass recv_data_placeholder buffer and make sure everyone (other than the root) copies data to this dummy buffer. So you might not even need to track all_recv_counts_ and displacements_.
| std::vector<std::vector<int>>* displacements_; | ||
| std::vector<std::vector<int>>* all_recv_counts_; |
There was a problem hiding this comment.
I have a feeling that you might not even need these. I've explained it in a previous comment
#594