-
-
Notifications
You must be signed in to change notification settings - Fork 8.9k
Optimization of data initialization for large sparse datasets #11390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 28 commits
11d4b63
4589533
3464f8c
e211ab9
9221573
0a793e3
e249a3b
1be6f5d
396f4b3
55a89d7
8a15c70
085627f
b1e714f
70fd6bc
edef9e7
606c537
6f885b0
c0dbd7e
1cb3693
560a67a
0ac338e
61b3878
98ef541
2b090e6
9f5ba75
8cdd7db
15aa65b
58920a0
0b7037a
820b79a
6d553fb
ea5c4fe
7dc15f2
390efd1
3250cc0
a8df33b
0fcd8a7
e1d0ae9
9f1d131
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -13,6 +13,7 @@ | |||||||
| #include <cstdint> // for uint8_t | ||||||||
| #include <limits> | ||||||||
| #include <memory> | ||||||||
| #include <vector> | ||||||||
| #include <type_traits> // for enable_if_t, is_same_v, is_signed_v | ||||||||
|
|
||||||||
| #include "../data/adapter.h" | ||||||||
|
|
@@ -144,40 +145,83 @@ class DenseColumnIter : public Column<BinIdxT> { | |||||||
| * in a column is below the threshold it's classified as dense column. | ||||||||
| */ | ||||||||
| class ColumnMatrix { | ||||||||
| /** | ||||||||
| * @brief A bit set for indicating whether an element in a dense column is missing. | ||||||||
| */ | ||||||||
| /** | ||||||||
| * @brief A bit set for indicating whether an element in a dense column is missing. | ||||||||
| * Access is carefully managed to ensure thread safety during parallel operations. | ||||||||
| */ | ||||||||
| struct MissingIndicator { | ||||||||
| using BitFieldT = LBitField32; | ||||||||
| using T = typename BitFieldT::value_type; | ||||||||
|
|
||||||||
| BitFieldT missing; | ||||||||
| RefResourceView<T> storage; | ||||||||
| // Feature offset padded to allow concurrent access. | ||||||||
| RefResourceView<std::size_t> feature_offsets_padded; | ||||||||
| static_assert(std::is_same_v<T, std::uint32_t>); | ||||||||
|
|
||||||||
| template <typename U> | ||||||||
| [[nodiscard]] std::enable_if_t<!std::is_signed_v<U>, U> static InitValue(bool init) { | ||||||||
| return init ? ~U{0} : U{0}; | ||||||||
| } | ||||||||
|
|
||||||||
| /** | ||||||||
| * @param feature_offsets Offset of the first element for each feature | ||||||||
| * @param type Type of each column (Dense or Sparse). | ||||||||
| */ | ||||||||
| void InitOffsetsPadded(const RefResourceView<std::size_t>& feature_offsets, | ||||||||
| const RefResourceView<ColumnType>& type) { | ||||||||
| if (feature_offsets_padded.size() != feature_offsets.size()) { | ||||||||
| CHECK(feature_offsets_padded.empty()); | ||||||||
| feature_offsets_padded = common::MakeFixedVecWithMalloc(feature_offsets.size(), | ||||||||
| std::size_t{0}); | ||||||||
| } | ||||||||
|
|
||||||||
| /* | ||||||||
| * For the missing indicator, feature offsets are aligned to be a multiple of | ||||||||
| * BitFieldT::kValueSize (4 bytes). | ||||||||
| * This is a critical requirement for thread-safe access to the bitfield. | ||||||||
| * Each word is processed by one thread. | ||||||||
| */ | ||||||||
| for (std::size_t fid = 1; fid < feature_offsets.size(); ++fid) { | ||||||||
| if (type[fid - 1] == ColumnType::kDenseColumn) { | ||||||||
| std::size_t n_rows = feature_offsets[fid] - feature_offsets[fid - 1]; | ||||||||
| std::size_t n_rows_padded = | ||||||||
| DivRoundUp(n_rows, BitFieldT::kValueSize) * BitFieldT::kValueSize; | ||||||||
| feature_offsets_padded[fid] = feature_offsets_padded[fid - 1] + n_rows_padded; | ||||||||
| } else { | ||||||||
| feature_offsets_padded[fid] = feature_offsets_padded[fid - 1]; | ||||||||
| } | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| MissingIndicator() = default; | ||||||||
| /** | ||||||||
| * @param n_elements Size of the bit set | ||||||||
| * @param init Initialize the indicator to true or false. | ||||||||
| */ | ||||||||
| MissingIndicator(std::size_t n_elements, bool init) { | ||||||||
| MissingIndicator(const RefResourceView<std::size_t>& feature_offsets, | ||||||||
| const RefResourceView<ColumnType>& type, bool init) { | ||||||||
| this->InitOffsetsPadded(feature_offsets, type); | ||||||||
| size_t n_elements = feature_offsets_padded.back(); | ||||||||
| auto m_size = missing.ComputeStorageSize(n_elements); | ||||||||
| storage = common::MakeFixedVecWithMalloc(m_size, InitValue<T>(init)); | ||||||||
| this->InitView(); | ||||||||
| } | ||||||||
| /** @brief Set the i^th element to be a valid element (instead of missing). */ | ||||||||
| void SetValid(typename LBitField32::index_type i) { missing.Clear(i); } | ||||||||
| /** @brief Set the i^th element corresponding to feature fid | ||||||||
| * to be a valid element (instead of missing). */ | ||||||||
| void SetValid(typename LBitField32::index_type i, std::size_t fid) { | ||||||||
| missing.Clear(feature_offsets_padded[fid] + i); | ||||||||
| } | ||||||||
| /** @brief assign the storage to the view. */ | ||||||||
| void InitView() { | ||||||||
| missing = LBitField32{Span{storage.data(), static_cast<size_t>(storage.size())}}; | ||||||||
| } | ||||||||
|
|
||||||||
| void GrowTo(std::size_t n_elements, bool init) { | ||||||||
| void GrowTo(const RefResourceView<std::size_t>& feature_offsets, | ||||||||
| const RefResourceView<ColumnType>& type, bool init) { | ||||||||
| this->InitOffsetsPadded(feature_offsets, type); | ||||||||
| size_t n_elements = feature_offsets_padded.back(); | ||||||||
|
|
||||||||
| CHECK(storage.Resource()->Type() == ResourceHandler::kMalloc) | ||||||||
| << "[Internal Error]: Cannot grow the vector when external memory is used."; | ||||||||
| auto m_size = missing.ComputeStorageSize(n_elements); | ||||||||
|
|
@@ -195,34 +239,42 @@ class ColumnMatrix { | |||||||
| } | ||||||||
| }; | ||||||||
|
|
||||||||
| void InitStorage(GHistIndexMatrix const& gmat, double sparse_threshold); | ||||||||
| void InitStorage(GHistIndexMatrix const& gmat, double sparse_threshold, int n_threads); | ||||||||
|
|
||||||||
| template <typename ColumnBinT, typename BinT, typename RIdx> | ||||||||
| void SetBinSparse(BinT bin_id, RIdx rid, bst_feature_t fid, ColumnBinT* local_index) { | ||||||||
| ColumnBinT* begin = &local_index[feature_offsets_[fid]]; | ||||||||
| if (type_[fid] == kDenseColumn) { | ||||||||
| ColumnBinT* begin = &local_index[feature_offsets_[fid]]; | ||||||||
| begin[rid] = bin_id - index_base_[fid]; | ||||||||
| // not thread-safe with bit field. | ||||||||
| // FIXME(jiamingy): We can directly assign kMissingId to the index to avoid missing | ||||||||
| // flags. | ||||||||
| missing_.SetValid(feature_offsets_[fid] + rid); | ||||||||
| missing_.SetValid(rid, fid); | ||||||||
| } else { | ||||||||
| ColumnBinT* begin = &local_index[feature_offsets_[fid]]; | ||||||||
| begin[num_nonzeros_[fid]] = bin_id - index_base_[fid]; | ||||||||
| row_ind_[feature_offsets_[fid] + num_nonzeros_[fid]] = rid; | ||||||||
| ++num_nonzeros_[fid]; | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| template <typename ColumnBinT, typename BinT, typename RIdx> | ||||||||
| void SetBinSparse(BinT bin_id, RIdx rid, bst_feature_t fid, ColumnBinT* local_index, size_t nnz) { | ||||||||
| ColumnBinT* begin = &local_index[feature_offsets_[fid]]; | ||||||||
| if (type_[fid] == kDenseColumn) { | ||||||||
| begin[rid] = bin_id - index_base_[fid]; | ||||||||
| missing_.SetValid(rid, fid); | ||||||||
| } else { | ||||||||
| begin[nnz] = bin_id - index_base_[fid]; | ||||||||
| row_ind_[feature_offsets_[fid] + nnz] = rid; | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| public: | ||||||||
| // get number of features | ||||||||
| [[nodiscard]] bst_feature_t GetNumFeature() const { | ||||||||
| return static_cast<bst_feature_t>(type_.size()); | ||||||||
| } | ||||||||
|
|
||||||||
| ColumnMatrix() = default; | ||||||||
| ColumnMatrix(GHistIndexMatrix const& gmat, double sparse_threshold) { | ||||||||
| this->InitStorage(gmat, sparse_threshold); | ||||||||
| ColumnMatrix(GHistIndexMatrix const& gmat, double sparse_threshold, int n_threads) { | ||||||||
| this->InitStorage(gmat, sparse_threshold, n_threads); | ||||||||
| } | ||||||||
|
|
||||||||
| /** | ||||||||
|
|
@@ -232,7 +284,7 @@ class ColumnMatrix { | |||||||
| void InitFromSparse(SparsePage const& page, const GHistIndexMatrix& gmat, double sparse_threshold, | ||||||||
| int32_t n_threads) { | ||||||||
| auto batch = data::SparsePageAdapterBatch{page.GetView()}; | ||||||||
| this->InitStorage(gmat, sparse_threshold); | ||||||||
| this->InitStorage(gmat, sparse_threshold, n_threads); | ||||||||
| // ignore base row id here as we always have one column matrix for each sparse page. | ||||||||
| this->PushBatch(n_threads, batch, std::numeric_limits<float>::quiet_NaN(), gmat, 0); | ||||||||
| } | ||||||||
|
|
@@ -283,7 +335,7 @@ class ColumnMatrix { | |||||||
| SetIndexNoMissing(base_rowid, gmat.index.data<RowBinIdxT>(), size, n_features, n_threads); | ||||||||
| }); | ||||||||
| } else { | ||||||||
| SetIndexMixedColumns(base_rowid, batch, gmat, missing); | ||||||||
| SetIndexMixedColumns(base_rowid, batch, gmat, missing, n_threads); | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
|
|
@@ -316,16 +368,21 @@ class ColumnMatrix { | |||||||
| common::Span<const BinIdxType> bin_index = { | ||||||||
| reinterpret_cast<const BinIdxType*>(&index_[feature_offset * bins_type_size_]), | ||||||||
| column_size}; | ||||||||
| /* | ||||||||
| * Pass the pre-calculated starting offset missing_.feature_offsets_padded[fidx] | ||||||||
| * in the bitfield for this specific feature (fidx). | ||||||||
| */ | ||||||||
| return DenseColumnIter<BinIdxType, any_missing>{ | ||||||||
| bin_index, static_cast<bst_bin_t>(index_base_[fidx]), missing_.missing, feature_offset}; | ||||||||
| bin_index, static_cast<bst_bin_t>(index_base_[fidx]), missing_.missing, | ||||||||
| missing_.feature_offsets_padded[fidx]}; | ||||||||
| } | ||||||||
|
|
||||||||
| // all columns are dense column and has no missing value | ||||||||
| // FIXME(jiamingy): We don't need a column matrix if there's no missing value. | ||||||||
| template <typename RowBinIdxT> | ||||||||
| void SetIndexNoMissing(bst_idx_t base_rowid, RowBinIdxT const* row_index, const size_t n_samples, | ||||||||
| const size_t n_features, int32_t n_threads) { | ||||||||
| missing_.GrowTo(feature_offsets_[n_features], false); | ||||||||
| missing_.GrowTo(feature_offsets_, type_, false); | ||||||||
|
|
||||||||
| DispatchBinType(bins_type_size_, [&](auto t) { | ||||||||
| using ColumnBinT = decltype(t); | ||||||||
|
|
@@ -348,11 +405,11 @@ class ColumnMatrix { | |||||||
| * \brief Set column index for both dense and sparse columns | ||||||||
| */ | ||||||||
| template <typename Batch> | ||||||||
| void SetIndexMixedColumns(size_t base_rowid, Batch const& batch, const GHistIndexMatrix& gmat, | ||||||||
| float missing) { | ||||||||
| void SetIndexMixedColumns(bst_idx_t base_rowid, Batch const& batch, const GHistIndexMatrix& gmat, | ||||||||
| float missing, int n_threads) { | ||||||||
| auto n_features = gmat.Features(); | ||||||||
|
|
||||||||
| missing_.GrowTo(feature_offsets_[n_features], true); | ||||||||
| missing_.GrowTo(feature_offsets_, type_, true); | ||||||||
| auto const* row_index = gmat.index.data<std::uint32_t>() + gmat.row_ptr[base_rowid]; | ||||||||
| if (num_nonzeros_.empty()) { | ||||||||
| num_nonzeros_ = common::MakeFixedVecWithMalloc(n_features, std::size_t{0}); | ||||||||
|
|
@@ -366,19 +423,100 @@ class ColumnMatrix { | |||||||
| using ColumnBinT = decltype(t); | ||||||||
| ColumnBinT* local_index = reinterpret_cast<ColumnBinT*>(index_.data()); | ||||||||
| size_t const batch_size = batch.Size(); | ||||||||
| size_t k{0}; | ||||||||
| for (size_t rid = 0; rid < batch_size; ++rid) { | ||||||||
| auto line = batch.GetLine(rid); | ||||||||
| for (size_t i = 0; i < line.Size(); ++i) { | ||||||||
| auto coo = line.GetElement(i); | ||||||||
| if (is_valid(coo)) { | ||||||||
| auto fid = coo.column_idx; | ||||||||
| const uint32_t bin_id = row_index[k]; | ||||||||
| SetBinSparse(bin_id, rid + base_rowid, fid, local_index); | ||||||||
| ++k; | ||||||||
|
|
||||||||
| // Parallel sparse batch processing | ||||||||
|
razdoburdin marked this conversation as resolved.
Outdated
|
||||||||
| dmlc::OMPException exc; | ||||||||
|
trivialfis marked this conversation as resolved.
|
||||||||
| std::vector<size_t> n_elements((n_threads + 1) * n_features, 0); | ||||||||
| std::vector<size_t> k_offsets(n_threads + 1, 0); | ||||||||
|
razdoburdin marked this conversation as resolved.
Outdated
|
||||||||
| size_t block_size = DivRoundUp(batch_size, n_threads); | ||||||||
|
|
||||||||
| /* | ||||||||
| * We use bitfield as a missing indicator. To ensure thread safe access to the bitfield | ||||||||
| * each underlying word of the bitfield should be processed by a single thread. | ||||||||
| * So we need to align the row-blocks. | ||||||||
| */ | ||||||||
| block_size = DivRoundUp(block_size, MissingIndicator::BitFieldT::kValueSize) * | ||||||||
| MissingIndicator::BitFieldT::kValueSize; | ||||||||
|
razdoburdin marked this conversation as resolved.
Outdated
|
||||||||
| /* | ||||||||
| * If base_rowid > 0 we need to shift the block boundaries. | ||||||||
| * Otherwise, two threads may operate on the same word of the bitfield. | ||||||||
| */ | ||||||||
| size_t shift = MissingIndicator::BitFieldT::kValueSize - | ||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't quite understand how this shifting works. Could you please help clarify it? For starters, this should represent the number of samples each thread needs to shift. How is it related to the bit field value size? How is it related to the module? Why set it to 0 when it equals the value size?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
if For instance, if the first batch had 35 rows,
I have modified the comment to make this logic clearer.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are talking about the external memory in XGBoost. Is the bitfield shared across multiple batches of data? Otherwise, the
should not be true since there should be a different column matrix for each batch.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In the original code, the bitfield is allocated for the total number of elements, but each batch uses its own part. I didn't touch this part in my PR.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me do some digging tomorrow.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the two different cases, see:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. In case of external memory the function call goes the following:
xgboost/src/data/gradient_index.cc Line 122 in e4406da
xgboost/src/common/column_matrix.h Line 287 in e4406da
So, the value of
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me do more tests, probably need some cleanup/comments.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we postpone this PR a little bit? It's an optimization for data initialization, and I find it quite difficult to understand. I will merge the inference PR.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can postpone it, but I think it shouldn't be postponed for a long time. Overwise, resolving of merge conflicts will be extremely tricky. |
||||||||
| (base_rowid % MissingIndicator::BitFieldT::kValueSize); | ||||||||
| if (shift == MissingIndicator::BitFieldT::kValueSize) { | ||||||||
| shift = 0; | ||||||||
| } | ||||||||
|
|
||||||||
| // Parallel row processing for thread-local counting. | ||||||||
| #pragma omp parallel num_threads(n_threads) | ||||||||
| { | ||||||||
| exc.Run([&, is_valid]() { | ||||||||
| int tid = omp_get_thread_num(); | ||||||||
| size_t begin = block_size * tid; | ||||||||
| size_t end = std::min(begin + shift + block_size, batch_size); | ||||||||
| // Apply shift for threads > 0 to maintain word alignment across blocks. | ||||||||
| if (tid > 0) { | ||||||||
| begin += shift; | ||||||||
| } | ||||||||
| for (size_t rid = begin; rid < end; ++rid) { | ||||||||
| const auto& line = batch.GetLine(rid); | ||||||||
| for (size_t i = 0; i < line.Size(); ++i) { | ||||||||
| auto coo = line.GetElement(i); | ||||||||
| if (is_valid(coo)) { | ||||||||
| auto fid = coo.column_idx; | ||||||||
| if ((type_[fid] != kDenseColumn)) { | ||||||||
| n_elements[(tid + 1) * n_features + fid] += 1; | ||||||||
| } | ||||||||
| k_offsets[tid + 1] += 1; | ||||||||
| } | ||||||||
| } | ||||||||
| } | ||||||||
| }); | ||||||||
| } | ||||||||
| exc.Rethrow(); | ||||||||
|
|
||||||||
| // Parallel feature processing to aggregate counts & calculate offsets. | ||||||||
|
razdoburdin marked this conversation as resolved.
|
||||||||
| ParallelFor(n_features, n_threads, [&](auto fid) { | ||||||||
| n_elements[fid] += num_nonzeros_[fid]; | ||||||||
| for (int tid = 0; tid < n_threads; ++tid) { | ||||||||
| n_elements[(tid + 1) * n_features + fid] += | ||||||||
| n_elements[tid * n_features + fid]; | ||||||||
| } | ||||||||
| num_nonzeros_[fid] = n_elements[n_threads * n_features + fid]; | ||||||||
| }); | ||||||||
| std::partial_sum(k_offsets.cbegin(), k_offsets.cend(), k_offsets.begin()); | ||||||||
|
|
||||||||
| // Parallel row processing to place data using offsets into sparse structure. | ||||||||
| #pragma omp parallel num_threads(n_threads) | ||||||||
| { | ||||||||
| std::vector<size_t> nnz_offsets(n_features, 0); | ||||||||
|
razdoburdin marked this conversation as resolved.
|
||||||||
| exc.Run([&, is_valid, base_rowid, row_index]() { | ||||||||
| int tid = omp_get_thread_num(); | ||||||||
| size_t begin = block_size * tid; | ||||||||
| size_t end = std::min(begin + shift + block_size, batch_size); | ||||||||
| // Apply shift for threads > 0 to maintain word alignment across blocks. | ||||||||
| if (tid > 0) { | ||||||||
| begin += shift; | ||||||||
| } | ||||||||
|
|
||||||||
| size_t k = 0; | ||||||||
| for (size_t rid = begin; rid < end; ++rid) { | ||||||||
| const auto& line = batch.GetLine(rid); | ||||||||
| for (size_t i = 0; i < line.Size(); ++i) { | ||||||||
| auto coo = line.GetElement(i); | ||||||||
| if (is_valid(coo)) { | ||||||||
| auto fid = coo.column_idx; | ||||||||
| const uint32_t bin_id = row_index[k_offsets[tid] + k]; | ||||||||
| size_t nnz = n_elements[tid * n_features + fid] + nnz_offsets[fid]; | ||||||||
|
razdoburdin marked this conversation as resolved.
|
||||||||
| SetBinSparse(bin_id, rid + base_rowid, fid, local_index, nnz); | ||||||||
| ++k; | ||||||||
| nnz_offsets[fid] += (type_[fid] != kDenseColumn); | ||||||||
| } | ||||||||
| } | ||||||||
| } | ||||||||
| }); | ||||||||
| } | ||||||||
| exc.Rethrow(); | ||||||||
| }); | ||||||||
| } | ||||||||
|
|
||||||||
|
|
@@ -389,7 +527,7 @@ class ColumnMatrix { | |||||||
| void SetIndexMixedColumns(const GHistIndexMatrix& gmat) { | ||||||||
| auto n_features = gmat.Features(); | ||||||||
|
|
||||||||
| missing_ = MissingIndicator{feature_offsets_[n_features], true}; | ||||||||
| missing_ = MissingIndicator{feature_offsets_, type_, true}; | ||||||||
| num_nonzeros_ = common::MakeFixedVecWithMalloc(n_features, std::size_t{0}); | ||||||||
|
|
||||||||
| DispatchBinType(bins_type_size_, [&](auto t) { | ||||||||
|
|
||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this function still used now that we have a new
SetBinSparse?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original SetBinSparse is also used