diff --git a/include/xgboost/data.h b/include/xgboost/data.h index 5afbc8c63c9c..22967caa3690 100644 --- a/include/xgboost/data.h +++ b/include/xgboost/data.h @@ -230,6 +230,7 @@ class MetaInfo { [[nodiscard]] CatContainer const* Cats() const; [[nodiscard]] CatContainer* Cats(); [[nodiscard]] std::shared_ptr CatsShared() const; + [[nodiscard]] std::shared_ptr CatsShared(); /** * @brief Setter for categories. */ @@ -726,6 +727,7 @@ class DMatrix { [[nodiscard]] std::shared_ptr CatsShared() const { return this->Info().CatsShared(); } + [[nodiscard]] std::shared_ptr CatsShared() { return this->Info().CatsShared(); } protected: virtual BatchSet GetRowBatches() = 0; diff --git a/ops/conda_env/aarch64_test.yml b/ops/conda_env/aarch64_test.yml index f62586627602..7277692bc850 100644 --- a/ops/conda_env/aarch64_test.yml +++ b/ops/conda_env/aarch64_test.yml @@ -26,6 +26,7 @@ dependencies: - llvmlite - loky>=3.5.1 - pyarrow +- polars - pyspark>=4.0.0 - cloudpickle - pip: diff --git a/ops/conda_env/macos_cpu_test.yml b/ops/conda_env/macos_cpu_test.yml index ef57ed3af9a1..1b13048260e6 100644 --- a/ops/conda_env/macos_cpu_test.yml +++ b/ops/conda_env/macos_cpu_test.yml @@ -28,4 +28,5 @@ dependencies: - awscli - loky>=3.5.1 - pyarrow +- polars - cloudpickle diff --git a/ops/conda_env/win64_test.yml b/ops/conda_env/win64_test.yml index 45a0db8f2577..a3266cd30ec4 100644 --- a/ops/conda_env/win64_test.yml +++ b/ops/conda_env/win64_test.yml @@ -17,3 +17,4 @@ dependencies: - py-ubjson - loky>=3.5.1 - pyarrow +- polars diff --git a/src/common/categorical.h b/src/common/categorical.h index de9ffe04b7c2..3f02224daa02 100644 --- a/src/common/categorical.h +++ b/src/common/categorical.h @@ -21,6 +21,19 @@ XGBOOST_DEVICE bst_cat_t AsCat(T const& v) { return static_cast(v); } +/** + * @brief Storage size for a CatBitField whose largest valid bit is @p max_code. + * + * Widens to size_t before +1 so max_code near INT32_MAX cannot trigger signed-overflow + * UB on bst_cat_t = int32_t. + * + * @return Storage size in @c CatBitField::value_type units. + */ +[[nodiscard]] inline std::size_t SizeCatBitsForMaxCode(bst_cat_t max_code) { + CHECK_GE(max_code, 0); + return CatBitField::ComputeStorageSize(static_cast(max_code) + 1); +} + /* \brief Whether is fidx a categorical feature. * * \param ft Feature type for all features. diff --git a/src/common/quantile.cc b/src/common/quantile.cc index f17b2d9b7d47..841979402b02 100644 --- a/src/common/quantile.cc +++ b/src/common/quantile.cc @@ -533,14 +533,18 @@ void AddCategories(std::set const &categories, float *max_cat, HistogramC InvalidCategory(); } auto &cut_values = cuts->cut_values_.HostVector(); - // With column-wise data split, the categories may be empty. - auto feature_max_cat = - categories.empty() ? 
0.0f : *std::max_element(categories.cbegin(), categories.cend()); + if (categories.empty()) { + // column-wise split: emit a placeholder cut and treat the synthetic 0.0f as the + // observed max so downstream sizing (evaluator.cu MaxCategory()+1) does not see -1 + cut_values.push_back(0.0f); + *max_cat = std::max(*max_cat, 0.0f); + return; + } + auto feature_max_cat = *std::max_element(categories.cbegin(), categories.cend()); CheckMaxCat(feature_max_cat, categories.size()); *max_cat = std::max(*max_cat, feature_max_cat); - for (bst_cat_t i = 0; i <= AsCat(feature_max_cat); ++i) { - cut_values.push_back(i); - } + // one cut per observed code; categories is sorted ascending + cut_values.insert(cut_values.end(), categories.cbegin(), categories.cend()); } HistogramCuts HostSketchContainer::MakeCuts(Context const *ctx, MetaInfo const &info) { diff --git a/src/common/quantile.cu b/src/common/quantile.cu index 3ca2dff9ab30..2eeca0434d5d 100644 --- a/src/common/quantile.cu +++ b/src/common/quantile.cu @@ -705,16 +705,23 @@ HistogramCuts SketchContainer::MakeCuts(Context const *ctx, bool is_column_split auto column = Span{h_entries.data() + begin, end - begin}; if (IsCat(h_feature_types, i)) { - auto column_size = std::max(static_cast(1), column.size()); - auto feature_max = column.empty() ? 0.0f : column.back().value; - if (std::any_of(column.cbegin(), column.cend(), - [](auto const &entry) { return InvalidCat(entry.value); })) { - InvalidCategory(); - } - CheckMaxCat(feature_max, column_size); - max_cat = std::max(max_cat, feature_max); - for (std::size_t cat = 0; cat <= static_cast(feature_max); ++cat) { - h_out_cut_values.push_back(cat); + if (column.empty()) { + // column-split worker with no rows: emit a placeholder cut and treat the + // synthetic 0.0f as observed max so MaxCategory() is never -1 + h_out_cut_values.push_back(0.0f); + max_cat = std::max(max_cat, 0.0f); + } else { + auto feature_max = column.back().value; + if (std::any_of(column.cbegin(), column.cend(), + [](auto const &entry) { return InvalidCat(entry.value); })) { + InvalidCategory(); + } + CheckMaxCat(feature_max, column.size()); + max_cat = std::max(max_cat, feature_max); + // one cut per observed physical code; column sorted ascending + for (auto const &entry : column) { + h_out_cut_values.push_back(entry.value); + } } } else { summary.Reserve(column.size()); diff --git a/src/data/adapter.cc b/src/data/adapter.cc index 58d7e84e7e67..0744a4cfafe6 100644 --- a/src/data/adapter.cc +++ b/src/data/adapter.cc @@ -19,11 +19,12 @@ namespace xgboost::data { namespace { -auto GetRefCats(Json handle) { - auto cats = reinterpret_cast(get(handle)); +// Pair the owning ref CatContainer pointer with its host view +[[nodiscard]] std::pair GetRefCats(Json handle) { + auto cats = reinterpret_cast(get(handle)); CHECK(cats); auto h_cats = cats->HostView(); - return h_cats; + return {cats, h_cats}; } } // anonymous namespace @@ -32,7 +33,9 @@ ColumnarAdapter::ColumnarAdapter(StringView columns) { if (IsA(jdf)) { // Has reference categories. 
- this->ref_cats_ = GetRefCats(jdf["ref_categories"]); + auto [ref_cats_ptr, ref_cats_view] = GetRefCats(jdf["ref_categories"]); + this->ref_cats_ptr_ = ref_cats_ptr; + this->ref_cats_ = ref_cats_view; jdf = jdf["columns"]; } diff --git a/src/data/adapter.h b/src/data/adapter.h index 88b7650c75bc..d2bfb82de93c 100644 --- a/src/data/adapter.h +++ b/src/data/adapter.h @@ -11,7 +11,8 @@ #include // for uint8_t #include // for numeric_limits #include // for unique_ptr, make_unique -#include // for move +#include // for once_flag, call_once +#include // for move, forward #include // for variant #include // for vector @@ -437,6 +438,11 @@ using EncColumnarAdapterBatch = EncColumnarAdapterBatchImpl; class ColumnarAdapter : public detail::SingleBatchDataIter { std::vector> columns_; enc::HostColumnsView ref_cats_; + // non-owning; pointee outlives the adapter (caller keeps the ref DMatrix alive) + CatContainer* ref_cats_ptr_{nullptr}; + // cached Recode mapping; write-once via CachedRefMapping() + mutable std::once_flag cache_once_; + mutable std::vector cached_ref_mapping_; std::vector cats_; std::vector cat_segments_; ColumnarAdapterBatch batch_; @@ -454,6 +460,12 @@ class ColumnarAdapter : public detail::SingleBatchDataIter */ explicit ColumnarAdapter(StringView columns); + // non-copyable and non-movable (owns std::once_flag) + ColumnarAdapter(ColumnarAdapter const&) = delete; + ColumnarAdapter& operator=(ColumnarAdapter const&) = delete; + ColumnarAdapter(ColumnarAdapter&&) = delete; + ColumnarAdapter& operator=(ColumnarAdapter&&) = delete; + [[nodiscard]] ColumnarAdapterBatch const& Value() const override { return batch_; } [[nodiscard]] bst_idx_t NumRows() const { @@ -474,18 +486,44 @@ class ColumnarAdapter : public detail::SingleBatchDataIter static_cast(this->cat_segments_.back())}; } [[nodiscard]] enc::HostColumnsView RefCats() const { return this->ref_cats_; } + // non-owning; pointee outlives the adapter; non-const so dispatchers can call Sort() + // on the ref CatContainer through a const adapter + [[nodiscard]] CatContainer* RefCatsPtr() const { return this->ref_cats_ptr_; } [[nodiscard]] common::Span const> Columns() const { return this->columns_; } + + /** @brief Cached Recode mapping; first call wins, later calls ignore @p builder. + * + * @warning The returned span aliases adapter storage; lifetime <= adapter. 
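+   *
+   * Illustrative use, mirroring MakeEncColumnarBatch below (a sketch, not a second entry point):
+   * @code
+   *   auto cached = adapter->CachedRefMapping([&] {
+   *     [[maybe_unused]] auto [acc, mapping] =
+   *         cpu_impl::MakeCatAccessor(ctx, adapter->Cats(), ref_cats_ptr);
+   *     return std::move(mapping);
+   *   });
+   * @endcode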
+ */ + template + [[nodiscard]] common::Span CachedRefMapping(Fn&& builder) const { + std::call_once(this->cache_once_, + [&] { this->cached_ref_mapping_ = std::forward(builder)(); }); + return common::Span{this->cached_ref_mapping_}; + } }; -inline auto MakeEncColumnarBatch(Context const* ctx, ColumnarAdapter const* adapter) { - auto cats = std::make_unique(adapter->RefCats(), true); - cats->Sort(ctx); - auto [acc, mapping] = cpu_impl::MakeCatAccessor(ctx, adapter->Cats(), cats.get()); - return std::tuple{EncColumnarAdapterBatch{adapter->Columns(), acc}, std::move(mapping)}; +inline EncColumnarAdapterBatch MakeEncColumnarBatch(Context const* ctx, + ColumnarAdapter const* adapter) { + // alias the reference dictionary when available; Sort() is idempotent under sort_mu_ + auto* ref_cats_ptr = adapter->RefCatsPtr(); + if (ref_cats_ptr != nullptr) { + ref_cats_ptr->Sort(ctx); + auto cached = adapter->CachedRefMapping([&] { + [[maybe_unused]] auto [acc, mapping] = + cpu_impl::MakeCatAccessor(ctx, adapter->Cats(), ref_cats_ptr); + return std::move(mapping); + }); + auto cats_mapping = enc::MappingView{adapter->Cats().feature_segments, cached}; + return EncColumnarAdapterBatch{adapter->Columns(), CatAccessor{cats_mapping}}; + } + CHECK(!adapter->HasRefCategorical()) + << "ColumnarAdapter has reference categorical view but no CatContainer pointer."; + return EncColumnarAdapterBatch{adapter->Columns(), CatAccessor{}}; } -inline auto MakeEncColumnarBatch(Context const* ctx, - std::shared_ptr const& adapter) { +inline EncColumnarAdapterBatch MakeEncColumnarBatch( + Context const* ctx, std::shared_ptr const& adapter) { return MakeEncColumnarBatch(ctx, adapter.get()); } diff --git a/src/data/cat_container.cc b/src/data/cat_container.cc index f6f0fc0214ae..44da4e2a16a4 100644 --- a/src/data/cat_container.cc +++ b/src/data/cat_container.cc @@ -6,6 +6,7 @@ #include // for copy #include // for size_t #include // for make_unique +#include // for lock_guard, scoped_lock #include // for move #include // for vector @@ -16,6 +17,37 @@ #include "xgboost/json.h" // for Json namespace xgboost { +namespace { +// Validate Arrow StringArray offset invariants before the copy; malformed offsets cause +// SortNames to compute OOB substrings and break stable_sort's strict-weak-ordering. +void ValidateCatStrArrayOffsets(enc::CatStrArrayView const& str) { + if (str.offsets.empty()) { + return; + } + constexpr auto kHint = + " The producing dataframe library is emitting inconsistent Arrow data; update it" + " to the latest version."; + CHECK_EQ(str.offsets.front(), 0) + << "Malformed Arrow categorical dictionary: offsets[0] must be 0." << kHint; + auto const n = str.offsets.size(); + for (std::size_t i = 0; i < n; ++i) { + auto const off = str.offsets[i]; + CHECK_GE(off, 0) + << "Malformed Arrow categorical dictionary: offsets[" << i << "] = " << off + << " is negative." << kHint; + if (i + 1 < n) { + CHECK_LE(off, str.offsets[i + 1]) + << "Malformed Arrow categorical dictionary: offsets not monotonic at i=" << i + << "." << kHint; + } + } + auto last = static_cast(str.offsets.back()); + CHECK_LE(last, str.values.size()) + << "Malformed Arrow categorical dictionary: last offset " << last + << " exceeds values buffer size " << str.values.size() << "." 
<< kHint; +} +} // namespace + CatContainer::CatContainer(enc::HostColumnsView const& df, bool is_ref) : CatContainer{} { this->is_ref_ = is_ref; this->n_total_cats_ = df.n_total_cats; @@ -30,6 +62,7 @@ CatContainer::CatContainer(enc::HostColumnsView const& df, bool is_ref) : CatCon for (auto const& col : df.columns) { std::visit(enc::Overloaded{ [this](enc::CatStrArrayView str) { + ValidateCatStrArrayOffsets(str); using T = typename cpu_impl::ViewToStorageImpl::Type; this->cpu_impl_->columns.emplace_back(); this->cpu_impl_->columns.back().emplace(); @@ -116,6 +149,8 @@ struct PrimToUbj { } // anonymous namespace void CatContainer::Save(Json* p_out) const { + // serializes the full container snapshot against Sort()/Copy() + std::lock_guard guard{sort_mu_}; [[maybe_unused]] auto _ = this->HostView(); auto& out = *p_out; @@ -166,6 +201,9 @@ void CatContainer::Save(Json* p_out) const { out["sorted_idx"] = std::move(jsorted_index); out["feature_segments"] = std::move(jf_segments); out["enc"] = arr; + // persist is_ref_ and sorted_; optional fields for back-compat with pre-field models + out["is_ref"] = Boolean{this->is_ref_}; + out["sorted"] = Boolean{this->sorted_}; } namespace { @@ -187,6 +225,8 @@ void LoadJson(Json jvalues, Vec* p_out) { } // namespace void CatContainer::Load(Json const& in) { + // serializes the full container snapshot against Sort()/Copy() + std::lock_guard guard{sort_mu_}; auto array = get(in["enc"]); auto n_features = array.size(); @@ -266,6 +306,19 @@ void CatContainer::Load(Json const& in) { auto& h_sorted_idx = this->sorted_idx_.HostVector(); LoadJson(in["sorted_idx"], &h_sorted_idx); + // back-compat: missing fields default to is_ref=false, sorted=!sorted_idx.empty() + auto const& obj = get(in); + if (auto it = obj.find("is_ref"); it != obj.cend()) { + this->is_ref_ = get(it->second); + } else { + this->is_ref_ = false; + } + if (auto it = obj.find("sorted"); it != obj.cend()) { + this->sorted_ = get(it->second); + } else { + this->sorted_ = !h_sorted_idx.empty(); + } + this->cpu_impl_->Finalize(); } @@ -275,6 +328,12 @@ CatContainer::CatContainer() : cpu_impl_{std::make_uniquedevice_mu_ guards + // destination writes against a concurrent this->HostView() on another thread + std::scoped_lock guard{this->sort_mu_, that.sort_mu_, this->device_mu_}; [[maybe_unused]] auto h_view = that.HostView(); this->CopyCommon(ctx, that); this->cpu_impl_->Copy(that.cpu_impl_.get()); @@ -290,9 +349,16 @@ void CatContainer::Copy(Context const* ctx, CatContainer const& that) { void CatContainer::Sort(Context const* ctx) { CHECK(ctx->IsCPU()); + // sort_mu_ serializes Sort()/Copy(); HasCategorical() reads n_total_cats_ which + // Copy() writes under sort_mu_, so check inside the lock + std::lock_guard guard{sort_mu_}; + if (!this->HasCategorical() || this->sorted_) { + return; + } auto view = this->HostView(); this->sorted_idx_.HostVector().resize(view.n_total_cats); enc::SortNames(enc::Policy{}, view, this->sorted_idx_.HostSpan()); + this->sorted_ = true; } #endif // !defined(XGBOOST_USE_CUDA) diff --git a/src/data/cat_container.cu b/src/data/cat_container.cu index d957089b8ea1..e9297ac2c058 100644 --- a/src/data/cat_container.cu +++ b/src/data/cat_container.cu @@ -4,6 +4,7 @@ #include // for copy #include // for make_unique +#include // for lock_guard, scoped_lock #include // for vector #include "../common/cuda_context.cuh" // for CUDAContext @@ -154,6 +155,7 @@ CatContainer::CatContainer() // NOLINT CatContainer::CatContainer(Context const* ctx, enc::DeviceColumnsView const& df, 
bool is_ref) : CatContainer{} { + // device path skips ValidateCatStrArrayOffsets; cuDF validates Arrow offsets upstream this->is_ref_ = is_ref; this->n_total_cats_ = df.n_total_cats; @@ -179,6 +181,12 @@ CatContainer::CatContainer(Context const* ctx, enc::DeviceColumnsView const& df, CatContainer::~CatContainer() = default; void CatContainer::Copy(Context const* ctx, CatContainer const& that) { + if (&that == this) { + return; + } + // scoped_lock serializes concurrent a.Copy(b)+b.Copy(a); that.HostView() and + // that.DeviceView() acquire that.device_mu_ internally for the brief migration path + std::scoped_lock guard{this->sort_mu_, that.sort_mu_, this->device_mu_}; if (ctx->IsCPU()) { // Pull data to host [[maybe_unused]] auto h_view = that.HostView(); @@ -247,10 +255,12 @@ void CatContainer::Copy(Context const* ctx, CatContainer const& that) { } void CatContainer::Sort(Context const* ctx) { - if (!this->HasCategorical()) { + // sort_mu_ serializes Sort()/Copy(); HasCategorical() reads n_total_cats_ which + // Copy() writes under sort_mu_, so check inside the lock + std::lock_guard guard{sort_mu_}; + if (!this->HasCategorical() || this->sorted_) { return; } - if (ctx->IsCPU()) { auto view = this->HostView(); CHECK(!view.Empty()) << view.n_total_cats; @@ -263,6 +273,7 @@ void CatContainer::Sort(Context const* ctx) { this->sorted_idx_.Resize(view.n_total_cats); enc::SortNames(cuda_impl::EncPolicy, view, this->sorted_idx_.DeviceSpan()); } + this->sorted_ = true; } [[nodiscard]] enc::HostColumnsView CatContainer::HostView() const { diff --git a/src/data/cat_container.h b/src/data/cat_container.h index 4ad989b16ce4..cf8d65466f73 100644 --- a/src/data/cat_container.h +++ b/src/data/cat_container.h @@ -108,9 +108,10 @@ struct CatContainerImpl; */ class CatContainer { /** - * @brief Implementation of the Copy method, used by both CPU and GPU. Note that this - * method changes the permission in the HostDeviceVector as we need to pull data into - * targeted devices. + * @brief Implementation of the Copy method, used by both CPU and GPU. Changes the + * permission in the HostDeviceVector to pull data into the targeted device. + * + * @pre caller holds @c sort_mu_ on both @c *this and @c *that (see @ref Copy()). */ void CopyCommon(Context const* ctx, CatContainer const& that) { auto device = ctx->Device(); @@ -126,6 +127,9 @@ class CatContainer { this->feature_segments_.Copy(that.feature_segments_); this->n_total_cats_ = that.n_total_cats_; + // sorted_idx_ already copied above; carry the flag so Sort() short-circuits + this->sorted_ = that.sorted_; + this->is_ref_ = that.is_ref_; if (!device.IsCPU()) { // Pull to device @@ -180,9 +184,14 @@ class CatContainer { void Sort(Context const* ctx); /** * @brief Obtain a view to the sorted index created by the @ref Sort method. + * + * @warning Caller is responsible for serializing span use beyond construction. */ [[nodiscard]] common::Span RefSortedIndex(Context const* ctx) const { - std::lock_guard guard{device_mu_}; + // sort_mu_ excludes concurrent Sort(); device_mu_ excludes the SetDevice() / + // permission flip on sorted_idx_'s HostDeviceVector storage; both held atomically + // to follow the class-wide lock order (sort_mu_ then device_mu_) + std::scoped_lock guard{sort_mu_, device_mu_}; if (ctx->IsCPU()) { return this->sorted_idx_.ConstHostSpan(); } else { @@ -212,16 +221,24 @@ class CatContainer { #endif // defined(XGBOOST_USE_CUDA) private: - mutable std::mutex device_mu_; // mutex for copying between devices. 
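+  // Illustrative acquisition patterns for the lock-order invariant documented below
+  // (sketch only, not additional members):
+  //   std::scoped_lock both{sort_mu_, device_mu_};  // both locks, always in this order
+  //   std::lock_guard only_sort{sort_mu_};          // sort_mu_ alone (Sort/Save/Load)
+  // Acquiring device_mu_ first and sort_mu_ second inverts the order and risks deadlock.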
+ // mutex for copying between devices; HostView/DeviceView take only this lock and + // do not touch sort_mu_; lock order: sort_mu_ then device_mu_ + mutable std::mutex device_mu_; + // serializes Sort()/Copy()/Save()/Load() and Copy() flag propagation; + // lock order: sort_mu_ then device_mu_ + mutable std::mutex sort_mu_; HostDeviceVector feature_segments_; bst_cat_t n_total_cats_{0}; std::unique_ptr cpu_impl_; HostDeviceVector sorted_idx_; + // idempotency flag for Sort(); guarded by sort_mu_ + bool sorted_{false}; #if defined(XGBOOST_USE_CUDA) std::unique_ptr cu_impl_; #endif // defined(XGBOOST_USE_CUDA) + // Copy() propagates this from src under sort_mu_ bool is_ref_{false}; }; diff --git a/src/data/cat_container_hash.cuh b/src/data/cat_container_hash.cuh new file mode 100644 index 000000000000..5a2a52e1d13d --- /dev/null +++ b/src/data/cat_container_hash.cuh @@ -0,0 +1,61 @@ +/** + * Copyright 2026, XGBoost Contributors + */ +#pragma once + +#include // for visit + +#include // for uint64_t + +#include "../common/cuda_context.cuh" // for CUDAContext +#include "../common/device_helpers.cuh" // for LaunchN, TemporaryArray +#include "../encoder/ordinal.h" // for DeviceColumnsView +#include "cat_container_hash.h" // for CatContentDigest, k* constants + +namespace xgboost::data { +/** + * @brief Device sibling of @ref HashCatHostContent. + * + * Single-thread kernel because the byte fold is sequential; only the 16-byte digest + * crosses PCIe. Mixed CPU/GPU clusters compare digests directly only when host and device + * column views expose byte-identical offset and value buffers per feature. + * + * @param ctx CUDA context whose stream owns the kernel + memcpy. + * @param view Device columns view; offsets and values bytes are folded per column. + * @return Dual-component digest seeded with `n_total_cats` and `columns.size()`. 
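+ *
+ * Illustrative call site, mirroring MakeSketches in quantile_dmatrix.cu (a sketch, not a
+ * prescribed API):
+ * @code
+ *   auto digest = ref_cats->DeviceCanRead()
+ *                     ? HashCatDeviceContent(ctx, ref_cats->DeviceView(ctx))
+ *                     : HashCatHostContent(ref_cats->HostView());
+ *   AllreduceDigestAndCheck(ctx, digest);
+ * @endcode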
+ */ +[[nodiscard]] inline CatContentDigest HashCatDeviceContent( + Context const* ctx, enc::DeviceColumnsView const& view) { + dh::TemporaryArray d_digest(1); + auto* p_digest = d_digest.data().get(); + auto columns = view.columns; + auto n_total_cats = view.n_total_cats; + dh::LaunchN(1, ctx->CUDACtx()->Stream(), [=] __device__(std::size_t) { + std::uint64_t h1 = kHashSeedPrimary ^ static_cast(n_total_cats); + std::uint64_t h2 = kHashSeedSecondary ^ static_cast(columns.size()); + auto fold = [&](void const* p, std::size_t n) { + auto const* b = static_cast(p); + for (std::size_t i = 0; i < n; ++i) { + h1 = (h1 ^ b[i]) * kHashPrimePrimary; + h2 = (h2 ^ b[i]) * kHashPrimeSecondary; + } + }; + for (auto const& column : columns) { + cuda::std::visit( + enc::Overloaded{ + [&](enc::CatStrArrayView const& s) { + fold(s.offsets.data(), s.offsets.size_bytes()); + fold(s.values.data(), s.values.size_bytes()); + }, + [&](auto const& idx) { fold(idx.data(), idx.size_bytes()); }}, + column); + } + *p_digest = CatContentDigest{h1, h2}; + }); + CatContentDigest result{}; + dh::safe_cuda(cudaMemcpyAsync(&result, p_digest, sizeof(result), cudaMemcpyDeviceToHost, + ctx->CUDACtx()->Stream())); + ctx->CUDACtx()->Stream().Sync(); + return result; +} +} // namespace xgboost::data diff --git a/src/data/cat_container_hash.h b/src/data/cat_container_hash.h new file mode 100644 index 000000000000..ccaafe647e84 --- /dev/null +++ b/src/data/cat_container_hash.h @@ -0,0 +1,68 @@ +/** + * Copyright 2026, XGBoost Contributors + */ +#pragma once + +#include // for size_t +#include // for uint8_t, uint64_t +#include // for visit + +#include "../encoder/ordinal.h" // for HostColumnsView +#include "../encoder/types.h" // for Overloaded + +namespace xgboost::data { +// Primary stream is standard FNV-1a 64-bit (offset basis + prime); secondary stream +// uses CityHash/FarmHash mixing constants for an independent fold. Agreement on both +// components after Allreduce kMin+kMax indicates dictionary parity. +inline constexpr std::uint64_t kHashSeedPrimary = 0xcbf29ce484222325ULL; +inline constexpr std::uint64_t kHashSeedSecondary = 0x9ae16a3b2f90404fULL; +inline constexpr std::uint64_t kHashPrimePrimary = 0x100000001b3ULL; +inline constexpr std::uint64_t kHashPrimeSecondary = 0xc2b2ae3d27d4eb4fULL; +static_assert(kHashPrimePrimary != kHashPrimeSecondary, + "secondary prime must differ from primary to keep streams independent"); +static_assert(kHashSeedPrimary != kHashSeedSecondary, + "secondary seed must differ from primary to keep streams independent"); + +/** @brief Dual 64-bit content digest used for cross-worker dictionary parity. */ +struct CatContentDigest { + std::uint64_t primary; + std::uint64_t secondary; + + friend bool operator==(CatContentDigest const& a, CatContentDigest const& b) { + return a.primary == b.primary && a.secondary == b.secondary; + } + friend bool operator!=(CatContentDigest const& a, CatContentDigest const& b) { + return !(a == b); + } +}; + +/** + * @brief Fold a host CatContainer view's bytes into the dual-stream content digest. + * + * @param view Host columns view; offsets and values bytes are folded per column. + * @return Dual-component digest seeded with `n_total_cats` and `columns.size()`. 
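+ *
+ * Illustrative call site, mirroring GetDataShape in quantile_dmatrix.cc:
+ * @code
+ *   AllreduceDigestAndCheck(ctx, HashCatHostContent(ref->Info().CatsShared()->HostView()));
+ * @endcode
+ *
+ * The fold is byte-exact over the offsets and values buffers, so two dictionaries hash
+ * equal only when their Arrow buffers match bit-for-bit, not merely semantically.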
+ */ +[[nodiscard]] inline CatContentDigest HashCatHostContent(enc::HostColumnsView const& view) { + // seed with n_total_cats + columns.size() so empty-bytes workers diverge on shape + std::uint64_t h1 = kHashSeedPrimary ^ static_cast(view.n_total_cats); + std::uint64_t h2 = kHashSeedSecondary ^ static_cast(view.columns.size()); + auto fold = [&](void const* p, std::size_t n) { + auto const* b = static_cast(p); + for (std::size_t i = 0; i < n; ++i) { + h1 = (h1 ^ b[i]) * kHashPrimePrimary; + h2 = (h2 ^ b[i]) * kHashPrimeSecondary; + } + }; + for (auto const& col : view.columns) { + std::visit(enc::Overloaded{[&](enc::CatStrArrayView const& s) { + fold(s.offsets.data(), s.offsets.size_bytes()); + fold(s.values.data(), s.values.size_bytes()); + }, + [&](auto const& idx) { + fold(idx.data(), idx.size_bytes()); + }}, + col); + } + return {h1, h2}; +} +} // namespace xgboost::data diff --git a/src/data/data.cc b/src/data/data.cc index 919720a634a8..062768c36cd2 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -900,6 +900,7 @@ bool MetaInfo::ShouldHaveLabels() const { [[nodiscard]] std::shared_ptr MetaInfo::CatsShared() const { return this->cats_; } +[[nodiscard]] std::shared_ptr MetaInfo::CatsShared() { return this->cats_; } void MetaInfo::Cats(std::shared_ptr cats) { this->cats_ = std::move(cats); diff --git a/src/data/device_adapter.cu b/src/data/device_adapter.cu index 38a52ebfbb25..988f5a07b4a4 100644 --- a/src/data/device_adapter.cu +++ b/src/data/device_adapter.cu @@ -7,10 +7,13 @@ namespace xgboost::data { namespace { -auto GetRefCats(Context const* ctx, Json handle, - std::vector* p_h_ref_cats) { +// returns both the CatContainer pointer and its device view; keeping the pointer lets +// MakeEncColumnarBatch alias the reference dictionary instead of building a fresh +// CatContainer on every adapter dispatch (same optimization as the CPU path) +std::pair GetRefCats( + Context const* ctx, Json handle, std::vector* p_h_ref_cats) { auto& h_ref_cats = *p_h_ref_cats; - auto cats = reinterpret_cast(get(handle)); + auto cats = reinterpret_cast(get(handle)); CHECK(cats); auto d_cats = cats->DeviceView(ctx); // FIXME(jiamingy): Remove this along with the host copy in the cat container once @@ -18,7 +21,7 @@ auto GetRefCats(Context const* ctx, Json handle, h_ref_cats.resize(d_cats.columns.size()); thrust::copy(dh::tcbegin(d_cats.columns), dh::tcend(d_cats.columns), h_ref_cats.begin()); d_cats.columns = common::Span{h_ref_cats}; - return d_cats; + return {cats, d_cats}; } } // anonymous namespace @@ -28,7 +31,10 @@ CudfAdapter::CudfAdapter(StringView cuda_arrinf) { if (IsA(jdf)) { // Has reference categories. 
auto ctx = Context{}.MakeCUDA(curt::CurrentDevice()); - this->ref_cats_ = GetRefCats(&ctx, jdf["ref_categories"], &this->h_ref_cats_); + auto [ref_cats_ptr, ref_cats_view] = + GetRefCats(&ctx, jdf["ref_categories"], &this->h_ref_cats_); + this->ref_cats_ptr_ = ref_cats_ptr; + this->ref_cats_ = ref_cats_view; jdf = jdf["columns"]; } diff --git a/src/data/device_adapter.cuh b/src/data/device_adapter.cuh index ed63892c5364..7076184c6852 100644 --- a/src/data/device_adapter.cuh +++ b/src/data/device_adapter.cuh @@ -11,7 +11,9 @@ #include // for variant #include // for numeric_limits #include // for make_unique +#include // for once_flag, call_once #include // for string +#include // for forward #include "../common/algorithm.cuh" // for AllOf #include "../common/cuda_context.cuh" @@ -70,6 +72,13 @@ class CudfAdapter : public detail::SingleBatchDataIter { explicit CudfAdapter(std::string cuda_interfaces_str) : CudfAdapter{StringView{cuda_interfaces_str}} {} + // non-copyable and non-movable (owns std::once_flag for the ref mapping cache, + // matches the ColumnarAdapter CPU sibling) + CudfAdapter(CudfAdapter const&) = delete; + CudfAdapter& operator=(CudfAdapter const&) = delete; + CudfAdapter(CudfAdapter&&) = delete; + CudfAdapter& operator=(CudfAdapter&&) = delete; + [[nodiscard]] CudfAdapterBatch const& Value() const override { CHECK_EQ(batch_.Columns().data(), columns_.data().get()); return batch_; @@ -87,6 +96,9 @@ class CudfAdapter : public detail::SingleBatchDataIter { return {dh::ToSpan(this->d_cats_), dh::ToSpan(this->cat_segments_), this->n_total_cats_}; } [[nodiscard]] enc::DeviceColumnsView RefCats() const { return ref_cats_; } + // non-owning; pointee outlives the adapter; non-const so dispatchers can call Sort() + // on the ref CatContainer through a const adapter + [[nodiscard]] CatContainer* RefCatsPtr() const { return this->ref_cats_ptr_; } [[nodiscard]] bool HasCategorical() const { return n_total_cats_ != 0; } [[nodiscard]] bool HasRefCategorical() const { return this->ref_cats_.n_total_cats != 0; } @@ -94,6 +106,18 @@ class CudfAdapter : public detail::SingleBatchDataIter { return dh::ToSpan(this->columns_); } + /** @brief Cached Recode mapping; thread-safe via std::call_once, first call wins. + * + * @warning Returned span aliases adapter storage (lifetime <= adapter) and is pinned + * to the device of the first invocation; later callers must dispatch on that device. + */ + template + [[nodiscard]] common::Span CachedRefMapping(Fn&& builder) const { + std::call_once(this->cache_once_, + [&] { this->cached_ref_mapping_ = std::forward(builder)(); }); + return dh::ToSpan(this->cached_ref_mapping_); + } + private: CudfAdapterBatch batch_; dh::device_vector> columns_; @@ -106,6 +130,11 @@ class CudfAdapter : public detail::SingleBatchDataIter { enc::DeviceColumnsView ref_cats_; // A view to the reference category. 
std::vector h_ref_cats_; // host storage for column view + // non-owning; pointee outlives the adapter (caller keeps the ref DMatrix alive) + CatContainer* ref_cats_ptr_{nullptr}; + // cached Recode mapping; write-once via CachedRefMapping() + mutable std::once_flag cache_once_; + mutable dh::DeviceUVector cached_ref_mapping_; size_t num_rows_{0}; bst_idx_t n_bytes_{0}; @@ -139,15 +168,27 @@ class CupyAdapterBatch : public detail::NoMetaInfo { ArrayInterface<2> array_interface_; }; -inline auto MakeEncColumnarBatch(Context const* ctx, CudfAdapter const* adapter) { - auto cats = std::make_unique(ctx, adapter->RefCats(), true); - cats->Sort(ctx); - auto [acc, mapping] = ::xgboost::cuda_impl::MakeCatAccessor(ctx, adapter->DCats(), cats.get()); - return std::tuple{EncCudfAdapterBatch{adapter->Columns(), acc, adapter->NumRows()}, - std::move(mapping)}; +inline EncCudfAdapterBatch MakeEncColumnarBatch(Context const* ctx, + CudfAdapter const* adapter) { + // alias the reference dictionary; Sort() is idempotent under sort_mu_ + auto* ref_cats_ptr = adapter->RefCatsPtr(); + if (ref_cats_ptr != nullptr) { + ref_cats_ptr->Sort(ctx); + auto cached = adapter->CachedRefMapping([&] { + [[maybe_unused]] auto [acc, mapping] = + ::xgboost::cuda_impl::MakeCatAccessor(ctx, adapter->DCats(), ref_cats_ptr); + return std::move(mapping); + }); + auto cats_mapping = enc::MappingView{adapter->DCats().feature_segments, cached}; + return EncCudfAdapterBatch{adapter->Columns(), CatAccessor{cats_mapping}, adapter->NumRows()}; + } + CHECK(!adapter->HasRefCategorical()) + << "CudfAdapter has reference categorical view but no CatContainer pointer."; + return EncCudfAdapterBatch{adapter->Columns(), CatAccessor{}, adapter->NumRows()}; } -inline auto MakeEncColumnarBatch(Context const* ctx, std::shared_ptr const& adapter) { +inline EncCudfAdapterBatch MakeEncColumnarBatch( + Context const* ctx, std::shared_ptr const& adapter) { return MakeEncColumnarBatch(ctx, adapter.get()); } diff --git a/src/data/extmem_quantile_dmatrix.cc b/src/data/extmem_quantile_dmatrix.cc index d2e05da3ca0d..5f7772bae70c 100644 --- a/src/data/extmem_quantile_dmatrix.cc +++ b/src/data/extmem_quantile_dmatrix.cc @@ -80,7 +80,7 @@ void ExtMemQuantileDMatrix::InitFromCPU( common::HistogramCuts cuts{0}; ExternalDataInfo ext_info; - cpu_impl::GetDataShape(ctx, proxy, iter.get(), missing, &ext_info); + cpu_impl::GetDataShape(ctx, proxy, iter.get(), missing, ref, &ext_info); ext_info.SetInfo(ctx, true, &this->info_); this->n_batches_ = ext_info.n_batches; diff --git a/src/data/gradient_index.cc b/src/data/gradient_index.cc index dd698987ab53..2cc42c0651e0 100644 --- a/src/data/gradient_index.cc +++ b/src/data/gradient_index.cc @@ -171,12 +171,13 @@ void GHistIndexMatrix::ResizeIndex(Context const *ctx, const size_t n_index, con common::Index{common::Span{data.data(), static_cast(data.size())}, t_size}; }; - if ((MaxNumBinPerFeat() - 1 <= static_cast(std::numeric_limits::max())) && - isDense) { + // evaluate once per ResizeIndex call; MaxNumBinPerFeat() is O(n_features) now + auto const max_bin_per_feat = MaxNumBinPerFeat(); + if ((max_bin_per_feat - 1 <= static_cast(std::numeric_limits::max())) && isDense) { // compress dense index to uint8 make_index(std::uint8_t{}, common::kUint8BinsTypeSize); - } else if ((MaxNumBinPerFeat() - 1 > static_cast(std::numeric_limits::max()) && - MaxNumBinPerFeat() - 1 <= static_cast(std::numeric_limits::max())) && + } else if ((max_bin_per_feat - 1 > static_cast(std::numeric_limits::max()) && + max_bin_per_feat - 1 <= 
static_cast(std::numeric_limits::max())) && isDense) { // compress dense index to uint16 make_index(std::uint16_t{}, common::kUint16BinsTypeSize); diff --git a/src/data/gradient_index.h b/src/data/gradient_index.h index 7cd0b814a393..0cba6c1e3130 100644 --- a/src/data/gradient_index.h +++ b/src/data/gradient_index.h @@ -161,7 +161,13 @@ class GHistIndexMatrix { bst_idx_t base_rowid{0}; [[nodiscard]] bst_bin_t MaxNumBinPerFeat() const { - return std::max(static_cast(cut.MaxCategory() + 1), max_numeric_bins_per_feat); + // widest per-feature bin range across cut.Ptrs() (cut_values is no longer dense) + auto const& ptrs = cut.Ptrs(); + bst_bin_t max_bins = max_numeric_bins_per_feat; + for (std::size_t f = 1; f < ptrs.size(); ++f) { + max_bins = std::max(max_bins, static_cast(ptrs[f] - ptrs[f - 1])); + } + return max_bins; } ~GHistIndexMatrix(); diff --git a/src/data/iterative_dmatrix.cc b/src/data/iterative_dmatrix.cc index e5e46d872709..237b13bac4c3 100644 --- a/src/data/iterative_dmatrix.cc +++ b/src/data/iterative_dmatrix.cc @@ -66,7 +66,7 @@ void IterativeDMatrix::InitFromCPU( common::HistogramCuts cuts{0}; ExternalDataInfo ext_info; - cpu_impl::GetDataShape(ctx, proxy, &iter, missing, &ext_info); + cpu_impl::GetDataShape(ctx, proxy, &iter, missing, ref, &ext_info); ext_info.SetInfo(ctx, true, &this->info_); /** diff --git a/src/data/proxy_dmatrix.cc b/src/data/proxy_dmatrix.cc index 3bd680be6bc0..ff6317bb1c27 100644 --- a/src/data/proxy_dmatrix.cc +++ b/src/data/proxy_dmatrix.cc @@ -58,7 +58,7 @@ namespace cuda_impl { common::AssertGPUSupport(); return 0; } -[[nodiscard]] bst_idx_t BatchColumns(DMatrixProxy const *) { +[[nodiscard]] bst_feature_t BatchColumns(DMatrixProxy const *) { common::AssertGPUSupport(); return 0; } diff --git a/src/data/proxy_dmatrix.cu b/src/data/proxy_dmatrix.cu index 34d9221f2ece..644ceab1a219 100644 --- a/src/data/proxy_dmatrix.cu +++ b/src/data/proxy_dmatrix.cu @@ -1,6 +1,8 @@ /** * Copyright 2020-2025, XGBoost contributors */ +#include // for numeric_limits + #include "../encoder/ordinal.h" // for DeviceColumnsView #include "device_adapter.cuh" #include "proxy_dmatrix.cuh" @@ -48,11 +50,19 @@ std::shared_ptr CreateDMatrixFromProxy(Context const* ctx, } [[nodiscard]] bst_idx_t BatchSamples(DMatrixProxy const* proxy) { - return cuda_impl::DispatchAny(proxy, [](auto const& value) { return value.NumRows(); }); + return cuda_impl::DispatchAny( + proxy, [](auto const& adapter) -> bst_idx_t { return adapter->NumRows(); }); } -[[nodiscard]] bst_idx_t BatchColumns(DMatrixProxy const* proxy) { - return cuda_impl::DispatchAny(proxy, [](auto const& value) { return value.NumCols(); }); +[[nodiscard]] bst_feature_t BatchColumns(DMatrixProxy const* proxy) { + return cuda_impl::DispatchAny( + proxy, [](auto const& adapter) -> bst_feature_t { + auto const cols = adapter->NumColumns(); + CHECK_LE(cols, static_cast(std::numeric_limits::max())) + << "Number of features exceeds bst_feature_t range; reduce the column" + " count or update the producing library."; + return static_cast(cols); + }); } [[nodiscard]] bool BatchCatsIsRef(DMatrixProxy const* proxy) { diff --git a/src/data/proxy_dmatrix.cuh b/src/data/proxy_dmatrix.cuh index 06c516d43931..ea4ee688a8dd 100644 --- a/src/data/proxy_dmatrix.cuh +++ b/src/data/proxy_dmatrix.cuh @@ -32,7 +32,7 @@ decltype(auto) DispatchAny(Context const* ctx, std::any x, Fn&& fn, bool* type_e if constexpr (get_value) { auto value = adapter->Value(); if (adapter->HasRefCategorical()) { - auto [batch, mapping] = MakeEncColumnarBatch(ctx, 
adapter); + auto batch = MakeEncColumnarBatch(ctx, adapter); return fn(batch); } return fn(value); diff --git a/src/data/proxy_dmatrix.h b/src/data/proxy_dmatrix.h index b2ea9a2d5afa..5f4046066c17 100644 --- a/src/data/proxy_dmatrix.h +++ b/src/data/proxy_dmatrix.h @@ -7,6 +7,7 @@ #include // for none_of #include // for any, any_cast #include // for uint32_t, int32_t +#include // for numeric_limits #include // for shared_ptr #include // for invoke_result_t, declval #include // for forward @@ -207,7 +208,7 @@ decltype(auto) DispatchAny(Context const* ctx, std::any x, Fn&& fn, bool* type_e return fn(value); } else { auto value = std::any_cast>(x); - fn(value); + return fn(value); } } else if (x.type() == typeid(AddPtrT)) { has_type(); @@ -233,7 +234,7 @@ decltype(auto) DispatchAny(Context const* ctx, std::any x, Fn&& fn, bool* type_e if constexpr (get_value) { auto value = adapter->Value(); if (adapter->HasRefCategorical()) { - auto [batch, mapping] = MakeEncColumnarBatch(ctx, adapter); + auto batch = MakeEncColumnarBatch(ctx, adapter); return fn(batch); } return fn(value); @@ -289,40 +290,43 @@ std::shared_ptr CreateDMatrixFromProxy(Context const* ctx, namespace cuda_impl { [[nodiscard]] bst_idx_t BatchSamples(DMatrixProxy const*); -[[nodiscard]] bst_idx_t BatchColumns(DMatrixProxy const*); +[[nodiscard]] bst_feature_t BatchColumns(DMatrixProxy const*); #if defined(XGBOOST_USE_CUDA) [[nodiscard]] bool BatchCatsIsRef(DMatrixProxy const*); [[nodiscard]] enc::DeviceColumnsView BatchCats(DMatrixProxy const*); #endif // defined(XGBOOST_USE_CUDA) } // namespace cuda_impl -/** - * @brief Get the number of samples for the current batch. - */ +/** @brief Get the number of samples for the current batch. */ [[nodiscard]] inline bst_idx_t BatchSamples(DMatrixProxy const* proxy) { bool type_error = false; - auto n_samples = - cpu_impl::DispatchAny(proxy, [](auto const& value) { return value.NumRows(); }, &type_error); + auto n_samples = cpu_impl::DispatchAny( + proxy, [](auto const& adapter) -> bst_idx_t { return adapter->NumRows(); }, &type_error); if (type_error) { n_samples = cuda_impl::BatchSamples(proxy); } return n_samples; } -/** - * @brief Get the number of features for the current batch. - */ +/** @brief Get the number of features for the current batch. 
*/ [[nodiscard]] inline bst_feature_t BatchColumns(DMatrixProxy const* proxy) { bool type_error = false; - auto n_features = - cpu_impl::DispatchAny(proxy, [](auto const& value) { return value.NumCols(); }, &type_error); + auto n_features = cpu_impl::DispatchAny( + proxy, + [](auto const& adapter) -> bst_feature_t { + auto const cols = adapter->NumColumns(); + CHECK_LE(cols, static_cast(std::numeric_limits::max())) + << "Number of features exceeds bst_feature_t range; reduce the column" + " count or update the producing library."; + return static_cast(cols); + }, + &type_error); if (type_error) { n_features = cuda_impl::BatchColumns(proxy); } return n_features; } -namespace cpu_impl {} // namespace cpu_impl [[nodiscard]] bool BatchCatsIsRef(DMatrixProxy const* proxy); } // namespace xgboost::data #endif // XGBOOST_DATA_PROXY_DMATRIX_H_ diff --git a/src/data/quantile_dmatrix.cc b/src/data/quantile_dmatrix.cc index 2000ebabcb44..b1159268f2e7 100644 --- a/src/data/quantile_dmatrix.cc +++ b/src/data/quantile_dmatrix.cc @@ -3,6 +3,8 @@ */ #include "quantile_dmatrix.h" +#include // for array +#include // for int32_t, uint64_t #include // for accumulate #include "../collective/allreduce.h" // for Allreduce @@ -10,12 +12,51 @@ #include "../common/error_msg.h" // for InconsistentCategories #include "../common/threading_utils.h" // for ParallelFor #include "cat_container.h" // for CatContainer +#include "cat_container_hash.h" // for HashCatHostContent #include "gradient_index.h" // for GHistIndexMatrix #include "proxy_dmatrix.h" // for DispatchAny #include "xgboost/collective/result.h" // for SafeColl #include "xgboost/linalg.h" // for Tensor namespace xgboost::data { +[[nodiscard]] bool AllreduceRefAgreement(Context const* ctx, bool has_ref, bool has_cats) { + CHECK(collective::IsDistributed()); + // pack [has_ref, has_cats] into one payload, exchanged as kMin then kMax (two + // collectives); asymmetric workers fail on CHECK_EQ instead of deadlocking later + std::array lo{has_ref ? 1 : 0, has_cats ? 1 : 0}; + std::array hi{lo}; + collective::SafeColl( + collective::Allreduce(ctx, linalg::MakeVec(lo.data(), 2), collective::Op::kMin)); + collective::SafeColl( + collective::Allreduce(ctx, linalg::MakeVec(hi.data(), 2), collective::Op::kMax)); + CHECK_EQ(lo[0], hi[0]) + << "Inconsistent ref DMatrix presence across workers; every worker must pass" + " the same ref (or none) to QuantileDMatrix construction."; + CHECK_EQ(lo[1], hi[1]) + << "Inconsistent ref categorical state across workers; every worker's ref" + " DMatrix must carry the same categorical dictionary (or none)."; + bool const all_workers_have_cats = (lo[1] == 1); + return all_workers_have_cats; +} + +void AllreduceDigestAndCheck(Context const* ctx, CatContentDigest digest) { + CHECK(collective::IsDistributed()); + std::array lo{digest.primary, digest.secondary}; + std::array hi{lo}; + collective::SafeColl( + collective::Allreduce(ctx, linalg::MakeVec(lo.data(), 2), collective::Op::kMin)); + collective::SafeColl( + collective::Allreduce(ctx, linalg::MakeVec(hi.data(), 2), collective::Op::kMax)); + CHECK_EQ(lo[0], hi[0]) + << "Reference DMatrix categorical dictionary primary hash differs across" + " workers. Check that all workers built the reference from the same" + " source DataFrame (e.g. same polars StringCache scope)."; + CHECK_EQ(lo[1], hi[1]) + << "Reference DMatrix categorical dictionary secondary hash differs across" + " workers. Check that all workers built the reference from the same" + " source DataFrame (e.g. 
same polars StringCache scope)."; +} + void GetCutsFromRef(Context const* ctx, std::shared_ptr ref, bst_feature_t n_features, BatchParam p, common::HistogramCuts* p_cuts) { CHECK(ref); @@ -86,7 +127,7 @@ void SyncFeatureType(Context const* ctx, std::vector* p_h_ft) { void GetDataShape(Context const* ctx, DMatrixProxy* proxy, DataIterProxy* iter, float missing, - ExternalDataInfo* p_info) { + std::shared_ptr const& ref, ExternalDataInfo* p_info) { auto& info = *p_info; auto const is_valid = data::IsValidFunctor{missing}; @@ -127,8 +168,22 @@ void GetDataShape(Context const* ctx, DMatrixProxy* proxy, collective::SafeColl(collective::Allreduce(ctx, &info.n_features, collective::Op::kMax)); info.column_sizes.clear(); info.column_sizes.resize(info.n_features, 0); + // build the per-batch dictionary unconditionally; the predictor's Recode flow + // requires p_info->cats to be the proxy's own dictionary so MakeCatAccessor can + // map predict-side codes against the (separate) ref-side dictionary held in the + // model. Aliasing p_info->cats to ref->Info().CatsShared() collapses the two + // dictionaries and breaks Recode (test_ordinal.py::test_recode_dmatrix_predict + // and ::test_cat_shap surface this immediately). p_info->cats = std::make_shared(cpu_impl::BatchCats(proxy), BatchCatsIsRef(proxy)); + if (collective::IsDistributed()) { + bool const has_cats = + ref && p_info->cats && p_info->cats->HasCategorical(); + if (AllreduceRefAgreement(ctx, ref != nullptr, has_cats)) { + // hash the ref dictionary directly to detect cross-worker divergence + AllreduceDigestAndCheck(ctx, HashCatHostContent(ref->Info().CatsShared()->HostView())); + } + } } else { CHECK_EQ(info.n_features, BatchColumns(proxy)) << "Inconsistent number of columns."; auto cats = cpu_impl::BatchCats(proxy); diff --git a/src/data/quantile_dmatrix.cu b/src/data/quantile_dmatrix.cu index 7c66fe800a85..381b1a819ac8 100644 --- a/src/data/quantile_dmatrix.cu +++ b/src/data/quantile_dmatrix.cu @@ -6,19 +6,22 @@ #include // for partial_sum #include // for vector -#include "../collective/allreduce.h" // for Allreduce -#include "../common/cuda_context.cuh" // for CUDAContext -#include "../common/cuda_rt_utils.h" // for AllVisibleGPUs -#include "../common/device_vector.cuh" // for XGBCachingDeviceAllocator -#include "../common/error_msg.h" // for InconsistentCategories -#include "../common/hist_util.cuh" // for AdapterDeviceSketch -#include "../common/nvtx_utils.h" // for xgboost_NVTX_FN_RANGE -#include "../common/quantile.cuh" // for SketchContainer -#include "cat_container.h" // for CatContainer -#include "ellpack_page.cuh" // for EllpackPage -#include "proxy_dmatrix.cuh" // for DispatchAny -#include "proxy_dmatrix.h" // for DataIterProxy -#include "quantile_dmatrix.h" // for GetCutsFromRef +#include "../collective/allreduce.h" // for Allreduce +#include "../collective/communicator-inl.h" // for IsDistributed +#include "../common/cuda_context.cuh" // for CUDAContext +#include "../common/cuda_rt_utils.h" // for AllVisibleGPUs +#include "../common/device_vector.cuh" // for XGBCachingDeviceAllocator +#include "../common/error_msg.h" // for InconsistentCategories +#include "../common/hist_util.cuh" // for AdapterDeviceSketch +#include "../common/nvtx_utils.h" // for xgboost_NVTX_FN_RANGE +#include "../common/quantile.cuh" // for SketchContainer +#include "cat_container.h" // for CatContainer +#include "cat_container_hash.cuh" // for HashCatDeviceContent +#include "cat_container_hash.h" // for CatContentDigest +#include "ellpack_page.cuh" // 
for EllpackPage +#include "proxy_dmatrix.cuh" // for DispatchAny +#include "proxy_dmatrix.h" // for DataIterProxy +#include "quantile_dmatrix.h" // for GetCutsFromRef namespace xgboost::data { void GetCutsFromEllpack(EllpackPage const& page, common::HistogramCuts* cuts) { @@ -57,14 +60,36 @@ void MakeSketches(Context const* ctx, auto cats = cuda_impl::BatchCats(proxy); if (ext_info.n_features == 0) { ext_info.n_features = data::BatchColumns(proxy); - ext_info.cats = - std::make_shared(p_ctx, cats, ::xgboost::data::BatchCatsIsRef(proxy)); + // kMax(n_features) before the categorical digest -- matches cpu_impl::GetDataShape order auto rc = collective::Allreduce(ctx, linalg::MakeVec(&ext_info.n_features, 1), collective::Op::kMax); SafeColl(rc); + // build the per-batch dictionary unconditionally; the predictor's Recode flow + // requires ext_info.cats to be the proxy's own dictionary so MakeCatAccessor can + // map predict-side codes against the (separate) ref-side dictionary held in the + // model. Aliasing ext_info.cats to ref->Info().CatsShared() collapses the two + // dictionaries and breaks Recode (test_ordinal.py::test_recode_dmatrix_predict + // and ::test_cat_shap surface this immediately). + ext_info.cats = + std::make_shared(p_ctx, cats, ::xgboost::data::BatchCatsIsRef(proxy)); + if (collective::IsDistributed()) { + bool const has_cats = + ref && ext_info.cats && ext_info.cats->HasCategorical(); + if (AllreduceRefAgreement(ctx, ref != nullptr, has_cats)) { + // hash the ref dictionary directly; on a CUDA-side ref keep the fold on-device + auto const ref_cats = ref->Info().CatsShared(); + CatContentDigest digest; + if (ref_cats->DeviceCanRead()) { + digest = HashCatDeviceContent(ctx, ref_cats->DeviceView(ctx)); + } else { + digest = HashCatHostContent(ref_cats->HostView()); + } + AllreduceDigestAndCheck(ctx, digest); + } + } } else { - CHECK_EQ(cats.n_total_cats, ext_info.cats->NumCatsTotal()) << error::InconsistentCategories(); CHECK_EQ(ext_info.n_features, data::BatchColumns(proxy)) << "Inconsistent number of columns."; + CHECK_EQ(cats.n_total_cats, ext_info.cats->NumCatsTotal()) << error::InconsistentCategories(); } auto batch_rows = data::BatchSamples(proxy); diff --git a/src/data/quantile_dmatrix.h b/src/data/quantile_dmatrix.h index 44a53ca72c4e..527796cfcdf1 100644 --- a/src/data/quantile_dmatrix.h +++ b/src/data/quantile_dmatrix.h @@ -6,15 +6,40 @@ #include // for shared_ptr #include // for vector -#include "proxy_dmatrix.h" // for DataIterProxy -#include "xgboost/data.h" // for DMatrix, BatchIterator, SparsePage -#include "xgboost/span.h" // for Span +#include "cat_container_hash.h" // for CatContentDigest +#include "proxy_dmatrix.h" // for DataIterProxy +#include "xgboost/data.h" // for DMatrix, BatchIterator, SparsePage +#include "xgboost/span.h" // for Span namespace xgboost::common { class HistogramCuts; } // namespace xgboost::common namespace xgboost::data { +/** + * @brief Allreduce ref-presence and ref-cats-populated across workers; CHECK_EQ on + * disagreement. + * + * @pre @c collective::IsDistributed() is true. + * @param ctx Distributed context. + * @param has_ref Whether this worker received a non-null @c ref DMatrix. + * @param has_cats Whether this worker's ref CatContainer is populated. + * @return True iff every worker reports populated ref-cats; caller should then run + * the dictionary digest exchange. 
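+ *
+ * Illustrative use, mirroring GetDataShape / MakeSketches (a sketch; `p_info` and `ref`
+ * are the caller's locals there):
+ * @code
+ *   if (collective::IsDistributed()) {
+ *     bool const has_cats = ref && p_info->cats && p_info->cats->HasCategorical();
+ *     if (AllreduceRefAgreement(ctx, ref != nullptr, has_cats)) {
+ *       AllreduceDigestAndCheck(ctx, HashCatHostContent(ref->Info().CatsShared()->HostView()));
+ *     }
+ *   }
+ * @endcode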
+ */ +[[nodiscard]] bool AllreduceRefAgreement(Context const* ctx, bool has_ref, bool has_cats); + +/** + * @brief Allreduce a dual-component digest across workers and CHECK_EQ on either + * stream diverging. + * + * @pre @c collective::IsDistributed() is true. + * @param ctx Distributed context. + * @param digest Local dual-component digest from @c HashCatHostContent or + * @c HashCatDeviceContent. + */ +void AllreduceDigestAndCheck(Context const* ctx, CatContentDigest digest); + /** * @brief Base class for quantile-based DMatrix. * @@ -89,10 +114,19 @@ void SyncFeatureType(Context const *ctx, std::vector *p_h_ft); /** * @brief Fetch the external data shape. + * + * `p_info->cats` is always built from the per-batch proxy dictionary, never aliased to + * `ref->Info().CatsShared()`. The predictor's Recode flow needs the predict-side and + * ref-side dictionaries to be distinct objects; aliasing collapses the Recode mapping + * to identity and silently mis-routes categorical splits at predict time. + * + * When `ref` is set and the cluster is distributed, this function additionally hashes + * `ref->Info().CatsShared()` and Allreduces the digest so divergent ref dictionaries + * across workers fail fast on the matching CHECK_EQ. */ void GetDataShape(Context const *ctx, DMatrixProxy *proxy, DataIterProxy *iter, float missing, - ExternalDataInfo *p_info); + std::shared_ptr const &ref, ExternalDataInfo *p_info); /** * @brief Create quantile sketch for CPU from an external iterator or from a reference diff --git a/src/tree/hist/evaluate_splits.h b/src/tree/hist/evaluate_splits.h index 0b9eed3f3c33..676a53cc0159 100644 --- a/src/tree/hist/evaluate_splits.h +++ b/src/tree/hist/evaluate_splits.h @@ -162,8 +162,7 @@ class HistEvaluator { } if (best.is_cat) { - auto n = common::CatBitField::ComputeStorageSize(n_bins + 1); - best.cat_bits.resize(n, 0); + best.cat_bits.resize(common::SizeCatBitsForMaxCode(common::AsCat(best.split_value)), 0); common::CatBitField cat_bits{best.cat_bits}; cat_bits.Set(best.split_value); } @@ -242,8 +241,10 @@ class HistEvaluator { } if (best_thresh != -1) { - auto n = common::CatBitField::ComputeStorageSize(n_bins_feature); - best.cat_bits = decltype(best.cat_bits)(n, 0); + // size by max observed physical code; cut_val[f_begin..f_end) is sorted ascending + CHECK_GT(f_end, f_begin); + auto max_code = common::AsCat(cut_val[f_end - 1]); + best.cat_bits = decltype(best.cat_bits)(common::SizeCatBitsForMaxCode(max_code), 0); common::CatBitField cat_bits{best.cat_bits}; bst_bin_t partition = d_step == 1 ? (best_thresh - it_begin + 1) : (best_thresh - f_begin); CHECK_GT(partition, 0); @@ -620,8 +621,7 @@ class HistMultiEvaluator { } if (best.is_cat) { - auto n = common::CatBitField::ComputeStorageSize(n_bins + 1); - best.cat_bits.resize(n, 0); + best.cat_bits.resize(common::SizeCatBitsForMaxCode(common::AsCat(best.split_value)), 0); common::CatBitField cat_bits{best.cat_bits}; cat_bits.Set(best.split_value); } diff --git a/src/tree/updater_gpu_hist.cu b/src/tree/updater_gpu_hist.cu index 5e04212aaece..064c6b474c6d 100644 --- a/src/tree/updater_gpu_hist.cu +++ b/src/tree/updater_gpu_hist.cu @@ -607,7 +607,9 @@ struct GPUHistMakerDevice { return true; } - void ApplySplit(const GPUExpandEntry& candidate, RegTree* p_tree) { + void ApplySplit(GPUExpandEntry const& candidate, RegTree* p_tree, + std::vector const& h_cut_ptrs, + std::vector const& h_cut_vals) { RegTree& tree = *p_tree; // Sanity check - have we created a leaf with no training instances? 
@@ -632,9 +634,23 @@ struct GPUHistMakerDevice { std::vector split_cats; auto h_cats = this->evaluator_.GetHostNodeCats(candidate.nidx); - auto n_bins_feature = cuts_->FeatureBins(candidate.split.findex); - split_cats.resize(common::CatBitField::ComputeStorageSize(n_bins_feature), 0); - CHECK_LE(split_cats.size(), h_cats.size()); + // size by max observed physical code; cut_values is compact + sorted + auto f_end = h_cut_ptrs[candidate.split.findex + 1]; + auto f_begin = h_cut_ptrs[candidate.split.findex]; + // sketch invariant: a categorical split must have >= 1 cut for that feature + CHECK_GT(f_end, f_begin) + << "Categorical split on feature " << candidate.split.findex + << " but the feature has zero cuts; sketch invariant violated."; + auto max_code = common::AsCat(h_cut_vals[f_end - 1]); + // per-feature cuts are sorted ascending; bound is the global MaxCategory + CHECK_LE(max_code, common::AsCat(cuts_->MaxCategory())) + << "Per-feature max_code (" << max_code << ") exceeds global MaxCategory (" + << cuts_->MaxCategory() << "); cuts table out of sync with feature " << candidate.split.findex << "."; + split_cats.resize(common::SizeCatBitsForMaxCode(max_code), 0); + // h_cats is global-max-sized; per-feature is always <= + CHECK_LE(split_cats.size(), h_cats.size()) + << "Saved-tree split_cats overruns evaluator h_cats; per-feature max_code=" + << max_code << " exceeds the global MaxCategory the evaluator sized for."; std::copy(h_cats.data(), h_cats.data() + split_cats.size(), split_cats.data()); tree.ExpandCategorical(candidate.nidx, candidate.split.findex, split_cats, @@ -697,11 +713,17 @@ struct GPUHistMakerDevice { p_fmat = this->Reset(gpair_all, p_fmat); driver.Push({this->InitRoot(p_fmat, p_tree)}); + // hoist cuts host vectors out of the per-leaf hot path; cuts_ is fixed for the + // tree so ConstHostVector() may force a one-shot D->H migration here, never per + // categorical split + auto const& h_cut_ptrs = cuts_->cut_ptrs_.ConstHostVector(); + auto const& h_cut_vals = cuts_->cut_values_.ConstHostVector(); + // The set of leaves that can be expanded asynchronously auto expand_set = driver.Pop(); while (!expand_set.empty()) { for (auto& candidate : expand_set) { - this->ApplySplit(candidate, p_tree); + this->ApplySplit(candidate, p_tree, h_cut_ptrs, h_cut_vals); } // Get the candidates we are allowed to expand further // e.g. We do not bother further processing nodes whose children are beyond max depth diff --git a/tests/cpp/common/test_quantile.cc b/tests/cpp/common/test_quantile.cc index d1da5f564357..5d4228c3f928 100644 --- a/tests/cpp/common/test_quantile.cc +++ b/tests/cpp/common/test_quantile.cc @@ -13,6 +13,10 @@ #include "xgboost/context.h" namespace xgboost::common { + +// Forward declaration for the regression tests below; definition is in src/common/quantile.cc. 
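+// (Keep this signature in sync with the definition: the function is only forward-declared
+// here, so any drift surfaces as a link error rather than a compile error.)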
+void AddCategories(std::set const &categories, float *max_cat, HistogramCuts *cuts); + namespace quantile_test { class QuantileSummaryTest : public ::testing::TestWithParam {}; class QuantileContainerTest : public ::testing::TestWithParam {}; @@ -447,4 +451,48 @@ TEST(Quantile, ColumnSplit) { collective::TestDistributedGlobal(4, [&] { DoPropertyColumnSplitQuantile(kRows, kCols); }); } +TEST(Quantile, AddCategoriesSparseCodes) { + HistogramCuts cuts{1}; + std::set categories{0.f, 1.f, 2.f, 87.f, 2526058.f}; + float max_cat{-1.f}; + AddCategories(categories, &max_cat, &cuts); + + auto const& cut_values = cuts.cut_values_.HostVector(); + ASSERT_EQ(cut_values.size(), categories.size()); + EXPECT_FLOAT_EQ(max_cat, 2526058.f); + + std::vector expected(categories.cbegin(), categories.cend()); + for (std::size_t i = 0; i < expected.size(); ++i) { + EXPECT_FLOAT_EQ(cut_values[i], expected[i]) << "entry=" << i; + } +} + +TEST(Quantile, AddCategoriesDenseCodesUnchanged) { + HistogramCuts cuts{1}; + std::set categories{0.f, 1.f, 2.f, 3.f}; + float max_cat{-1.f}; + AddCategories(categories, &max_cat, &cuts); + + auto const& cut_values = cuts.cut_values_.HostVector(); + ASSERT_EQ(cut_values.size(), 4u); + EXPECT_FLOAT_EQ(max_cat, 3.f); + for (bst_cat_t i = 0; i < 4; ++i) { + EXPECT_FLOAT_EQ(cut_values[i], static_cast(i)); + } +} + +TEST(Quantile, AddCategoriesEmptyInputBranch) { + HistogramCuts cuts{1}; + std::set categories; + float max_cat{-1.f}; + AddCategories(categories, &max_cat, &cuts); + + auto const& cut_values = cuts.cut_values_.HostVector(); + ASSERT_EQ(cut_values.size(), 1u); + EXPECT_FLOAT_EQ(cut_values[0], 0.f); + // empty branch must update max_cat to 0.0f so MaxCategory()+1 in downstream sizing + // (gpu_hist/evaluator.cu) never sees the -1.f sentinel + EXPECT_FLOAT_EQ(max_cat, 0.f); +} + } // namespace xgboost::common diff --git a/tests/cpp/data/test_cat_container.cc b/tests/cpp/data/test_cat_container.cc index f59db82872ad..edf414e4a6dc 100644 --- a/tests/cpp/data/test_cat_container.cc +++ b/tests/cpp/data/test_cat_container.cc @@ -4,9 +4,21 @@ #include "test_cat_container.h" +#include // for dmlc::Error #include +#include // for array +#include // for int32_t, uint64_t +#include // for vector + +#include "../../../src/collective/allreduce.h" // for Allreduce +#include "../../../src/data/cat_container.h" +#include "../../../src/data/cat_container_hash.h" // for HashCatHostContent +#include "../collective/test_worker.h" // for TestDistributedGlobal #include "../encoder/df_mock.h" +#include "xgboost/collective/result.h" // for SafeColl +#include "xgboost/json.h" // for Json +#include "xgboost/linalg.h" // for MakeVec namespace xgboost { using DfTest = enc::cpu_impl::DfTest; @@ -24,4 +36,254 @@ TEST(CatContainer, Mixed) { Context ctx; TestCatContainerMixed(&ctx, eq_check); } + +namespace { +struct StrColumnForTest { + std::vector offsets; + std::vector values; + std::vector columns; + std::vector feature_segments; + + StrColumnForTest(std::vector o, std::vector v, + std::vector seg) + : offsets{std::move(o)}, values{std::move(v)}, feature_segments{std::move(seg)} {} + + enc::HostColumnsView AsView() { + columns.clear(); + columns.emplace_back(enc::CatStrArrayView{common::Span{offsets}, common::Span{values}}); + return enc::HostColumnsView{common::Span{columns}, common::Span{feature_segments}, + feature_segments.empty() ? 
+                                feature_segments.empty() ? 0 : feature_segments.back()};
+  }
+};
+}  // anonymous namespace
+
+TEST(CatContainer, ValidatesGoodStrOffsets) {
+  // two strings: "ab" (offsets 0..2) and "cde" (offsets 2..5); offsets.back() == values.size()
+  StrColumnForTest fx{std::vector<std::int32_t>{0, 2, 5},
+                      std::vector<char>{'a', 'b', 'c', 'd', 'e'},
+                      std::vector<std::int32_t>{0, 2}};
+  auto view = fx.AsView();
+  auto build = [&] { CatContainer cats{view, /*is_ref=*/false}; };
+  EXPECT_NO_THROW(build());
+}
+
+TEST(CatContainer, RejectsOverrunStrOffsets) {
+  // the last offset (6) exceeds values.size() == 5; a tail overrun must be rejected
+  StrColumnForTest fx{std::vector<std::int32_t>{0, 2, 6},
+                      std::vector<char>{'a', 'b', 'c', 'd', 'e'},
+                      std::vector<std::int32_t>{0, 2}};
+  auto view = fx.AsView();
+  auto build = [&] { CatContainer cats{view, /*is_ref=*/false}; };
+  EXPECT_THROW(build(), dmlc::Error);
+}
+
+TEST(CatContainer, RejectsNonZeroFirstOffset) {
+  // Arrow invariant: offsets[0] == 0; a non-zero first offset corrupts the SortNames substring
+  StrColumnForTest fx{std::vector<std::int32_t>{1, 2, 5},
+                      std::vector<char>{'a', 'b', 'c', 'd', 'e'},
+                      std::vector<std::int32_t>{0, 2}};
+  auto view = fx.AsView();
+  auto build = [&] { CatContainer cats{view, /*is_ref=*/false}; };
+  EXPECT_THROW(build(), dmlc::Error);
+}
+
+TEST(CatContainer, RejectsNegativeOffset) {
+  // a negative int32 offset casts to a huge size_t; the per-offset non-negativity check catches it
+  StrColumnForTest fx{std::vector<std::int32_t>{0, -1, 5},
+                      std::vector<char>{'a', 'b', 'c', 'd', 'e'},
+                      std::vector<std::int32_t>{0, 2}};
+  auto view = fx.AsView();
+  auto build = [&] { CatContainer cats{view, /*is_ref=*/false}; };
+  EXPECT_THROW(build(), dmlc::Error);
+}
+
+TEST(CatContainer, RejectsNonMonotonicOffset) {
+  // non-monotonic offsets: offsets[1] = 3 > offsets[2] = 2; the tail bound passes, but the
+  // substring (r_beg=3, r_end=2) yields an out-of-bounds unsigned length in SortNames
+  StrColumnForTest fx{std::vector<std::int32_t>{0, 3, 2, 5},
+                      std::vector<char>{'a', 'b', 'c', 'd', 'e'},
+                      std::vector<std::int32_t>{0, 3}};
+  auto view = fx.AsView();
+  auto build = [&] { CatContainer cats{view, /*is_ref=*/false}; };
+  EXPECT_THROW(build(), dmlc::Error);
+}
+
+TEST(CatContainer, SelfCopyReturnsAndPreservesContent) {
+  // a.Copy(&ctx, a) must early-return before scoped_lock{a, a, a}; re-locking a
+  // non-recursive mutex is UB, so the failure mode is unspecified; assert content
+  // preservation as the observable witness
+  StrColumnForTest fx{std::vector<std::int32_t>{0, 2, 5},
+                      std::vector<char>{'a', 'b', 'c', 'd', 'e'},
+                      std::vector<std::int32_t>{0, 2}};
+  auto view = fx.AsView();
+  CatContainer cats{view, /*is_ref=*/false};
+  Context ctx;
+  cats.Copy(&ctx, cats);
+  ASSERT_EQ(cats.NumCatsTotal(), 2u);
+}
+
+// SortIsIdempotent and SelfCopyReturnsAndPreservesContent are CPU-only by design:
+// both exercise the shared sort_mu_ + sorted_ idempotency logic in CatContainer; the
+// CUDA-build sibling code path goes through the same private members (verified by
+// the existing CatContainer.ThreadSafety GPU test, which exercises Sort()).
+TEST(CatContainer, SortIsIdempotent) {
+  // calling Sort twice on the same container must yield the same sorted_idx_ on the
+  // second call; the short-circuit goes through the sorted_ flag guarded by sort_mu_
+  StrColumnForTest fx{std::vector<std::int32_t>{0, 1, 3, 5},
+                      std::vector<char>{'c', 'a', 'b', 'd', 'e'},
+                      std::vector<std::int32_t>{0, 3}};
+  auto view = fx.AsView();
+  CatContainer cats{view, /*is_ref=*/false};
+  Context ctx;
+  cats.Sort(&ctx);
+  std::vector idx_first;
+  {
+    auto span = cats.RefSortedIndex(&ctx);
+    idx_first.assign(span.cbegin(), span.cend());
+  }
+  cats.Sort(&ctx);
+  std::vector idx_second;
+  {
+    auto span = cats.RefSortedIndex(&ctx);
+    idx_second.assign(span.cbegin(), span.cend());
+  }
+  ASSERT_EQ(idx_first, idx_second);
+}
+
+TEST(CatContainer, SaveLoadIsRefAndSortedKeysPersist) {
+  // Save writes is_ref + sorted as JSON booleans; Load reads them back. Direct JSON
+  // inspection of the Save output proves the keys land; a re-Save on the loaded container
+  // with the same key inspection proves Load read both flags into the destination.
+  StrColumnForTest fx{std::vector<std::int32_t>{0, 1, 3, 5},
+                      std::vector<char>{'c', 'a', 'b', 'd', 'e'},
+                      std::vector<std::int32_t>{0, 3}};
+  auto view = fx.AsView();
+  CatContainer src{view, /*is_ref=*/true};
+  Context ctx;
+  src.Sort(&ctx);
+  Json saved{Object{}};
+  src.Save(&saved);
+
+  // Save side: both flags appear in the JSON with the right value
+  auto const& obj = get<Object const>(saved);
+  ASSERT_TRUE(obj.find("is_ref") != obj.cend());
+  ASSERT_TRUE(obj.find("sorted") != obj.cend());
+  ASSERT_TRUE(get<Boolean const>(obj.at("is_ref")));
+  ASSERT_TRUE(get<Boolean const>(obj.at("sorted")));
+
+  // Load side: is_ref is restored (NeedRecode reflects the flag); the re-Save round-trip
+  // on dst proves Load also wrote sorted_ to the destination, since dst.Save()
+  // serialises the in-memory sorted_ field
+  CatContainer dst;
+  dst.Load(saved);
+  ASSERT_EQ(dst.NumCatsTotal(), src.NumCatsTotal());
+  ASSERT_FALSE(dst.NeedRecode());
+
+  Json resaved{Object{}};
+  dst.Save(&resaved);
+  auto const& robj = get<Object const>(resaved);
+  ASSERT_TRUE(get<Boolean const>(robj.at("is_ref")));
+  ASSERT_TRUE(get<Boolean const>(robj.at("sorted")));
+}
+
+TEST(CatContainer, LoadBackCompatIsRefDefaultsFalse) {
+  // pre-PR JSON lacks is_ref; Load must default it to false so that NeedRecode() fires at
+  // predict time (the safe value -- a stale is_ref=true would skip required recoding)
+  StrColumnForTest fx{std::vector<std::int32_t>{0, 1, 3, 5},
+                      std::vector<char>{'c', 'a', 'b', 'd', 'e'},
+                      std::vector<std::int32_t>{0, 3}};
+  auto view = fx.AsView();
+  CatContainer src{view, /*is_ref=*/true};
+  Json saved{Object{}};
+  src.Save(&saved);
+  // simulate pre-PR JSON by removing the new keys
+  auto& obj = get<Object>(saved);
+  obj.erase("is_ref");
+  obj.erase("sorted");
+
+  CatContainer dst;
+  dst.Load(saved);
+  ASSERT_TRUE(dst.NeedRecode());
+}
+
+TEST(CatContainerHash, DistinguishesIdenticalVsDivergentContent) {
+  // identical inputs hash equal; divergent inputs hash differently (the distributed
+  // consistency check in QuantileDMatrix construction relies on this contract)
+  StrColumnForTest fx_a{std::vector<std::int32_t>{0, 2, 5},
+                        std::vector<char>{'a', 'b', 'c', 'd', 'e'},
+                        std::vector<std::int32_t>{0, 2}};
+  StrColumnForTest fx_a_copy{std::vector<std::int32_t>{0, 2, 5},
+                             std::vector<char>{'a', 'b', 'c', 'd', 'e'},
+                             std::vector<std::int32_t>{0, 2}};
+  StrColumnForTest fx_b{std::vector<std::int32_t>{0, 2, 5},
+                        std::vector<char>{'x', 'b', 'c', 'd', 'e'},
+                        std::vector<std::int32_t>{0, 2}};
+
+  auto hash_a = data::HashCatHostContent(fx_a.AsView());
+  auto hash_a_copy = data::HashCatHostContent(fx_a_copy.AsView());
+  auto hash_b = data::HashCatHostContent(fx_b.AsView());
+  ASSERT_EQ(hash_a, hash_a_copy);
+  ASSERT_NE(hash_a, hash_b);
+}
+
+TEST(CatContainerHash, DistributedDivergentDictsAllreduceDiffer) {
+  // two workers with byte-divergent dictionaries; after the kMin + kMax Allreduce on the
+  // dual-component digest, lo != hi on at least one component, firing the production
+  // CHECK_EQ in quantile_dmatrix.{cc,cu}
+  constexpr std::int32_t kWorkers = 2;
+  collective::TestDistributedGlobal(kWorkers, [] {
+    auto rank = collective::GetRank();
+    // rank 0 sees the dictionary "ab"/"cde"; rank 1 sees "xy"/"cde" (the first two bytes differ)
+    std::vector<char> values_r0{'a', 'b', 'c', 'd', 'e'};
+    std::vector<char> values_r1{'x', 'y', 'c', 'd', 'e'};
+    std::vector<std::int32_t> offsets{0, 2, 5};
+    std::vector<std::int32_t> segs{0, 2};
+    std::vector columns;
+    columns.emplace_back(enc::CatStrArrayView{
+        common::Span{offsets},
+        common::Span{rank == 0 ? values_r0 : values_r1}});
+    auto view = enc::HostColumnsView{common::Span{columns}, common::Span{segs}, 2};
+
+    auto digest = data::HashCatHostContent(view);
+    // mirror the packed-Allreduce shape used by quantile_dmatrix.{cc,cu} so a refactor
+    // of either side is caught here too
+    std::array<std::uint64_t, 2> hi{digest.primary, digest.secondary};
+    std::array<std::uint64_t, 2> lo{digest.primary, digest.secondary};
+    Context ctx;
+    collective::SafeColl(
+        collective::Allreduce(&ctx, linalg::MakeVec(lo.data(), 2), collective::Op::kMin));
+    collective::SafeColl(
+        collective::Allreduce(&ctx, linalg::MakeVec(hi.data(), 2), collective::Op::kMax));
+    // independent primes + seeds ensure both streams diverge on this divergent input;
+    // assert both to mirror the production pair of CHECK_EQ calls
+    ASSERT_NE(lo[0], hi[0]);
+    ASSERT_NE(lo[1], hi[1]);
+  });
+}
+
+TEST(CatContainerHash, DistributedAgreeingDictsAllreduceMatch) {
+  // positive sibling of DistributedDivergentDictsAllreduceDiffer: identical
+  // dictionaries on every worker yield equal lo/hi after the kMin + kMax Allreduce, so
+  // the production pair of CHECK_EQ calls does not fire
+  constexpr std::int32_t kWorkers = 2;
+  collective::TestDistributedGlobal(kWorkers, [] {
+    std::vector<char> values{'a', 'b', 'c', 'd', 'e'};
+    std::vector<std::int32_t> offsets{0, 2, 5};
+    std::vector<std::int32_t> segs{0, 2};
+    std::vector columns;
+    columns.emplace_back(enc::CatStrArrayView{common::Span{offsets}, common::Span{values}});
+    auto view = enc::HostColumnsView{common::Span{columns}, common::Span{segs}, 2};
+
+    auto digest = data::HashCatHostContent(view);
+    std::array<std::uint64_t, 2> hi{digest.primary, digest.secondary};
+    std::array<std::uint64_t, 2> lo{digest.primary, digest.secondary};
+    Context ctx;
+    collective::SafeColl(
+        collective::Allreduce(&ctx, linalg::MakeVec(lo.data(), 2), collective::Op::kMin));
+    collective::SafeColl(
+        collective::Allreduce(&ctx, linalg::MakeVec(hi.data(), 2), collective::Op::kMax));
+    ASSERT_EQ(lo[0], hi[0]);
+    ASSERT_EQ(lo[1], hi[1]);
+  });
+}
 }  // namespace xgboost
diff --git a/tests/python-gpu/test_gpu_with_polars.py b/tests/python-gpu/test_gpu_with_polars.py
new file mode 100644
index 000000000000..7dff833b8c8a
--- /dev/null
+++ b/tests/python-gpu/test_gpu_with_polars.py
@@ -0,0 +1,145 @@
+"""GPU-side polars Categorical regressions for the sparse-codes fix.
+
+Auto-skipped when polars is not installed; the surrounding tests/python-gpu/ suite
+already requires CUDA, so the CPU<->GPU training-metric parity check below will
+exercise both paths through the same dictionary.
+""" + +import os +import time +from typing import Any + +import numpy as np +import pytest + +import xgboost as xgb + +pl = pytest.importorskip("polars") + + +def _make_sparse_categorical_df( + rng: np.random.Generator, n_rows: int, n_real_cats: int, primer_size: int +) -> Any: + """Build a polars Categorical with codes that land at sparse positions. + + Primes the StringCache with `primer_size` strings before constructing the real + feature so the actual categorical codes start at offset >= primer_size, matching + the sparse-codes reproducer. + """ + with pl.StringCache(): + # bind the primer Series to a local so its StringCache entries outlive it + primer = pl.Series( + "primer", [f"primer_{i}" for i in range(primer_size)], dtype=pl.Categorical + ) + _unused = primer + cats = [f"cat_{i:02d}" for i in range(n_real_cats)] + col = pl.Series("f0", rng.choice(cats, size=n_rows), dtype=pl.Categorical) + return pl.DataFrame({"f0": col, "f1": rng.normal(size=n_rows)}) + + +def test_categorical_sparse_codes_cpu_gpu_parity() -> None: + """CPU vs GPU training-metric parity on a polars Categorical with sparse codes. + + A single QuantileDMatrix is consumed by both `device=cpu` and `device=cuda` + train calls; the per-iteration train RMSE must match within rtol=1e-2 (the + tolerance band used by tests/python-gpu/test_gpu_updaters.py::test_gpu_hist). + A regression in the categorical bitfield sizing or split path misroutes some + fraction of categories and breaks the bound. + """ + rng = np.random.default_rng(2030) + n_rows = 1024 + with pl.StringCache(): + df = _make_sparse_categorical_df( + rng, n_rows=n_rows, n_real_cats=12, primer_size=20_000 + ) + y_floats = rng.normal(size=n_rows).astype(np.float32) + + params = { + "tree_method": "hist", + "objective": "reg:squarederror", + "max_depth": 6, + "eta": 0.1, + "seed": 0, + } + n_rounds = 16 + + dtrain = xgb.QuantileDMatrix(df, y_floats, enable_categorical=True) + + # CPU train must run first; the device=cuda call below migrates cuts to the + # device and a subsequent CPU train on the same DMatrix would migrate back + cpu_eval: dict = {} + xgb.train( + {**params, "device": "cpu"}, + dtrain, + num_boost_round=n_rounds, + evals=[(dtrain, "train")], + evals_result=cpu_eval, + verbose_eval=False, + ) + + gpu_eval: dict = {} + xgb.train( + {**params, "device": "cuda"}, + dtrain, + num_boost_round=n_rounds, + evals=[(dtrain, "train")], + evals_result=gpu_eval, + verbose_eval=False, + ) + + cpu_rmse = np.asarray(cpu_eval["train"]["rmse"], dtype=np.float64) + gpu_rmse = np.asarray(gpu_eval["train"]["rmse"], dtype=np.float64) + assert cpu_rmse.shape == gpu_rmse.shape == (n_rounds,) + assert np.isfinite(cpu_rmse).all() + assert np.isfinite(gpu_rmse).all() + np.testing.assert_allclose(cpu_rmse, gpu_rmse, rtol=1e-2) + + +@pytest.mark.parametrize("primer_size", [1_000, 50_000, 500_000]) +def test_categorical_sparse_codes_gpu_bench( + primer_size: int, capsys: pytest.CaptureFixture[str] +) -> None: + """Times xgb.train(device=cuda) under a sparse-codes dictionary. + + QuantileDMatrix from a host polars df runs CPU-side construction; the GPU hot + path is engaged only at train time when cut values reach device memory. The + env-gated assertion catches an O(primer_size) cut-layout regression. 
+ """ + rng = np.random.default_rng(2032) + n_real_cats = 16 + n_rows = 2048 + with pl.StringCache(): + primer = pl.Series( + "primer", + [f"primer_{i}" for i in range(primer_size)], + dtype=pl.Categorical, + ) + _unused = primer + cats = [f"cat_{i:02d}" for i in range(n_real_cats)] + df = pl.DataFrame( + {"f0": pl.Series("f0", rng.choice(cats, size=n_rows), dtype=pl.Categorical)} + ) + y = rng.integers(0, 2, size=n_rows) + dm = xgb.QuantileDMatrix(df, y, enable_categorical=True) + t0 = time.perf_counter() + booster = xgb.train( + {"tree_method": "hist", "device": "cuda", "objective": "binary:logistic"}, + dm, + num_boost_round=1, + ) + gpu_train_t = time.perf_counter() - t0 + + # explicit env gate keeps CI green on contended runners; with the gate set, a + # regression that re-introduces O(primer_size) cut-value materialisation pushes + # train well past 10s on this primer range + if os.environ.get("XGB_PERF_ASSERT") == "1": + assert gpu_train_t < 10.0, ( + f"gpu_train_t={gpu_train_t:.3f}s for primer={primer_size}" + ) + pred = booster.inplace_predict(df) + assert np.isfinite(pred).all() + with capsys.disabled(): + print( + f"[sparse_codes_gpu_bench] primer_size={primer_size:>7d} " + f"n_real_cats={n_real_cats} gpu_train_s={gpu_train_t:.4f}" + ) diff --git a/tests/python/test_with_polars.py b/tests/python/test_with_polars.py index 14f0a6e2fa1d..9afcc15ce96c 100644 --- a/tests/python/test_with_polars.py +++ b/tests/python/test_with_polars.py @@ -1,6 +1,8 @@ """Copyright 2024, XGBoost contributors""" import json +import os +import time from pathlib import Path from typing import Type, Union @@ -182,3 +184,248 @@ def test_categorical() -> None: != df_rev["f1"].cat.get_categories().to_list() ) np.testing.assert_allclose(predt_0, predt_1) + + +def test_categorical_sparse_codes() -> None: + """Regression test for AddCategories over-allocation with sparse dictionary codes. + + A polars Categorical built against a pre-populated global StringCache holds a few + unique strings at very large physical codes. Asserts prediction parity against an + unprimed baseline so a silent dictionary-misalignment bug surfaces here rather than + via the platform-specific heap-corruption symptom. 
+ """ + cats = [f"cat_{i}" for i in range(16)] + n_rows = 2048 + rng = np.random.default_rng(2026) + cat_choices = rng.choice(cats, size=n_rows) + f0 = rng.normal(size=n_rows) + y = rng.normal(size=n_rows) + train_params = {"tree_method": "hist", "seed": 0} + + with pl.StringCache(): + primer = pl.Series( + "primer", [f"primer_{i}" for i in range(200_000)], dtype=pl.Categorical + ) + _unused = primer + df_primed = pl.DataFrame( + {"f0": f0, "f1": pl.Series("f1", cat_choices, dtype=pl.Categorical)} + ) + Xy_primed = xgb.QuantileDMatrix(df_primed, y, enable_categorical=True) + booster_primed = xgb.train(train_params, Xy_primed, num_boost_round=4) + predt_primed = booster_primed.inplace_predict(df_primed) + + with pl.StringCache(): + df_baseline = pl.DataFrame( + {"f0": f0, "f1": pl.Series("f1", cat_choices, dtype=pl.Categorical)} + ) + Xy_baseline = xgb.QuantileDMatrix(df_baseline, y, enable_categorical=True) + booster_baseline = xgb.train(train_params, Xy_baseline, num_boost_round=4) + predt_baseline = booster_baseline.inplace_predict(df_baseline) + + assert predt_primed.shape == (n_rows,) + assert np.isfinite(predt_primed).all() + # primed (sparse codes ~200k+) and baseline (codes 0..15) carry the same logical + # rows; predictions must match -- a deviation means dictionary alignment is off + np.testing.assert_allclose(predt_primed, predt_baseline, rtol=1e-6, atol=1e-6) + + +def test_categorical_model_save_load_roundtrip(tmp_path: Path) -> None: + """Save/load round-trip for a model trained on sparse polars Categorical codes. + + Regression safety net for the CatBitField / cut_values / MaxNumBinPerFeat cleanup: the + serialized tree splits store categorical bit fields sized by observed physical codes, and + this test guarantees that a model saved by the current code reloads and predicts identically. + """ + rng = np.random.default_rng(2027) + with pl.StringCache(): + # bind the primer Series to a local so its StringCache entries outlive it + primer = pl.Series( + "primer", [f"primer_{i}" for i in range(20_000)], dtype=pl.Categorical + ) + _unused = primer + cats = [f"cat_{i:02d}" for i in range(12)] + col = pl.Series("f0", rng.choice(cats, size=1024), dtype=pl.Categorical) + df = pl.DataFrame({"f0": col, "f1": rng.normal(size=col.len())}) + y = rng.integers(0, 2, size=col.len()) + + dtrain = xgb.QuantileDMatrix(df, y, enable_categorical=True) + booster = xgb.train( + {"tree_method": "hist", "objective": "binary:logistic"}, + dtrain, + num_boost_round=6, + ) + pred_before = booster.inplace_predict(df) + + model_path = tmp_path / "booster.ubj" + booster.save_model(str(model_path)) + + booster2 = xgb.Booster() + booster2.load_model(str(model_path)) + pred_after = booster2.inplace_predict(df) + + np.testing.assert_allclose(pred_before, pred_after, rtol=1e-6, atol=1e-6) + + +def test_categorical_many_eval_sets_share_ref() -> None: + """Prediction parity across multiple val DMatrices sharing one train as reference. + + Builds the booster once, then predicts on each val DMatrix built with ref=train + and cross-checks against inplace_predict on the raw polars DataFrame. A silent + alias failure (val DMatrix acquiring a fresh empty cats instead of the train's + dictionary) would diverge here. 
+ """ + rng = np.random.default_rng(2028) + with pl.StringCache(): + cats = [f"cat_{i:02d}" for i in range(8)] + df_train = pl.DataFrame( + { + "f0": pl.Series("f0", rng.choice(cats, size=512), dtype=pl.Categorical), + "f1": rng.normal(size=512), + } + ) + df_vals = [ + pl.DataFrame( + { + "f0": pl.Series("f0", rng.choice(cats, size=128), dtype=pl.Categorical), + "f1": rng.normal(size=128), + } + ) + for _ in range(4) + ] + y_tr = rng.integers(0, 2, size=512) + + train_dm = xgb.QuantileDMatrix(df_train, y_tr, enable_categorical=True) + booster = xgb.train( + {"tree_method": "hist", "objective": "binary:logistic"}, + train_dm, + num_boost_round=4, + ) + + train_arrow = train_dm.get_categories(export_to_arrow=True).to_arrow() + for df_v in df_vals: + val_with_ref = xgb.QuantileDMatrix(df_v, ref=train_dm, enable_categorical=True) + val_arrow = val_with_ref.get_categories(export_to_arrow=True).to_arrow() + assert len(val_arrow) == len(train_arrow) + for (vn, va), (tn, ta) in zip(val_arrow, train_arrow): + assert vn == tn + if va is not None: + assert va.equals(ta) + pred_dm = booster.predict(val_with_ref) + pred_inplace = booster.inplace_predict(df_v) + assert pred_dm.shape == (128,) + assert np.isfinite(pred_dm).all() + np.testing.assert_allclose(pred_dm, pred_inplace, rtol=1e-6, atol=1e-7) + + +@pytest.mark.parametrize("dict_size", [1_000, 10_000, 100_000]) +def test_categorical_val_dmatrix_shares_ref_cats( + dict_size: int, capsys: pytest.CaptureFixture[str] +) -> None: + """Correctness + first-call observational timing for the ref-cats alias. + + Builds train and val QuantileDMatrices, trains a booster, and verifies prediction + parity against inplace_predict on the raw polars DataFrame. Emits per-dict-size + first-call val construction time via capsys.disabled(); timing is observational + only. 
+ """ + rng = np.random.default_rng(2029) + with pl.StringCache(): + # bind the primer Series to a local so its StringCache entries outlive it + primer = pl.Series( + "primer", + [f"primer_{i}" for i in range(dict_size)], + dtype=pl.Categorical, + ) + _unused = primer + cats = [f"cat_{i:02d}" for i in range(16)] + train_df = pl.DataFrame( + {"f0": pl.Series("f0", rng.choice(cats, size=2048), dtype=pl.Categorical)} + ) + val_df = pl.DataFrame( + {"f0": pl.Series("f0", rng.choice(cats, size=512), dtype=pl.Categorical)} + ) + y_train = rng.integers(0, 2, size=2048) + + train_dm = xgb.QuantileDMatrix(train_df, y_train, enable_categorical=True) + booster = xgb.train( + {"tree_method": "hist", "objective": "binary:logistic"}, + train_dm, + num_boost_round=4, + ) + + t0 = time.perf_counter() + val_dm = xgb.QuantileDMatrix(val_df, ref=train_dm, enable_categorical=True) + val_t = time.perf_counter() - t0 + + assert val_dm.num_row() == 512 + assert val_dm.num_col() == 1 + + train_arrow = train_dm.get_categories(export_to_arrow=True).to_arrow() + val_arrow = val_dm.get_categories(export_to_arrow=True).to_arrow() + assert len(val_arrow) == len(train_arrow) + for (vn, va), (tn, ta) in zip(val_arrow, train_arrow): + assert vn == tn + if va is not None: + assert va.equals(ta) + + pred_dm = booster.predict(val_dm) + pred_inplace = booster.inplace_predict(val_df) + assert pred_dm.shape == (512,) + assert np.isfinite(pred_dm).all() + np.testing.assert_allclose(pred_dm, pred_inplace, rtol=1e-6, atol=1e-7) + + with capsys.disabled(): + print( + f"[categorical_val_dmatrix_ref_alias_bench] dict_size={dict_size} " + f"first_call_s={val_t:.4f}" + ) + + +@pytest.mark.parametrize("primer_size", [1_000, 50_000, 500_000]) +def test_categorical_sparse_codes_cpu_bench( + primer_size: int, capsys: pytest.CaptureFixture[str] +) -> None: + """Times QuantileDMatrix construction under a sparse-codes dictionary. + + Construction is the CPU-side hot path. The wall-clock budget is observational; + the env-gated assertion catches an O(primer_size) cut-layout regression. 
+ """ + rng = np.random.default_rng(2031) + n_real_cats = 16 + n_rows = 2048 + with pl.StringCache(): + primer = pl.Series( + "primer", + [f"primer_{i}" for i in range(primer_size)], + dtype=pl.Categorical, + ) + _unused = primer + cats = [f"cat_{i:02d}" for i in range(n_real_cats)] + df = pl.DataFrame( + {"f0": pl.Series("f0", rng.choice(cats, size=n_rows), dtype=pl.Categorical)} + ) + y = rng.integers(0, 2, size=n_rows) + + t0 = time.perf_counter() + dm = xgb.QuantileDMatrix(df, y, enable_categorical=True) + construct_t = time.perf_counter() - t0 + + assert dm.num_row() == n_rows + assert dm.num_col() == 1 + # explicit env gate keeps CI green on contended runners; with the gate set, a + # regression that re-introduces O(primer_size) cut materialisation pushes + # construct well past 10s on this primer range + if os.environ.get("XGB_PERF_ASSERT") == "1": + assert construct_t < 10.0, ( + f"construct_t={construct_t:.3f}s for primer={primer_size}" + ) + booster = xgb.train( + {"tree_method": "hist", "objective": "binary:logistic"}, dm, num_boost_round=1 + ) + pred = booster.inplace_predict(df) + assert np.isfinite(pred).all() + with capsys.disabled(): + print( + f"[sparse_codes_cpu_bench] primer_size={primer_size:>7d} " + f"n_real_cats={n_real_cats} construct_s={construct_t:.4f}" + ) diff --git a/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py b/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py index 118c95a39676..59f071b2704b 100644 --- a/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py +++ b/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py @@ -589,6 +589,37 @@ def test_categorical(tmp_path: Path, local_cuda_client: Client) -> None: run_categorical(local_cuda_client, "hist", "cuda", X, X_onehot, y, tmp_path) +@pytest.mark.skipif(**tm.no_dask_cudf()) +def test_categorical_ref_quantile_dmatrix(local_cuda_client: Client) -> None: + """GPU mirror of tests/test_distributed/test_with_dask::test_categorical_ref_quantile_dmatrix. + + Smoke-test that ref-cats-aliased DaskQuantileDMatrix construction + GPU train + succeeds end-to-end. The cross-worker content-hash Allreduce runs as a side effect + of construction; the negative path is covered by the C++ CatContainerHash suite. 
+ """ + X, y = make_categorical(local_cuda_client, 3000, 16, 4) + X = X.to_backend("cudf") + X_val, y_val = make_categorical(local_cuda_client, 1000, 16, 4) + X_val = X_val.to_backend("cudf") + + train = dxgb.DaskQuantileDMatrix(local_cuda_client, X, y, enable_categorical=True) + valid = dxgb.DaskQuantileDMatrix( + local_cuda_client, X_val, y_val, ref=train, enable_categorical=True + ) + out = dxgb.train( + local_cuda_client, + {"tree_method": "hist", "device": "cuda", "seed": 0}, + train, + num_boost_round=8, + evals=[(train, "Train"), (valid, "Valid")], + ) + train_rmse = np.asarray(out["history"]["Train"]["rmse"], dtype=np.float64) + valid_rmse = np.asarray(out["history"]["Valid"]["rmse"], dtype=np.float64) + assert np.isfinite(train_rmse).all() and np.isfinite(valid_rmse).all() + assert train_rmse[-1] / train_rmse[0] < 0.5, train_rmse.tolist() + assert valid_rmse[-1] / valid_rmse[0] < 0.85, valid_rmse.tolist() + + @pytest.mark.skipif(**tm.no_dask_cudf()) def test_recode(local_cuda_client: Client) -> None: with dask.config.set( diff --git a/tests/test_distributed/test_with_dask/test_with_dask.py b/tests/test_distributed/test_with_dask/test_with_dask.py index b0b8d521bc71..e8e80849c491 100644 --- a/tests/test_distributed/test_with_dask/test_with_dask.py +++ b/tests/test_distributed/test_with_dask/test_with_dask.py @@ -297,6 +297,35 @@ def test_categorical(client: "Client", tmp_path: Path) -> None: assert reg.get_booster().feature_types == ft +def test_categorical_ref_quantile_dmatrix(client: "Client") -> None: + """Smoke-test that ref-cats-aliased QuantileDMatrix construction + train succeeds + end-to-end in distributed mode. The cross-worker content-hash Allreduce runs as a + side effect of construction; its negative path is covered by the C++ + CatContainerHash suite under a multi-process gtest harness. + """ + X, y = make_categorical(client, 3000, 16, 4) + X_val, y_val = make_categorical(client, 1000, 16, 4) + + train = dxgb.DaskQuantileDMatrix(client, X, y, enable_categorical=True) + valid = dxgb.DaskQuantileDMatrix( + client, X_val, y_val, ref=train, enable_categorical=True + ) + out = dxgb.train( + client, + {"tree_method": "hist", "seed": 0}, + train, + num_boost_round=8, + evals=[(train, "Train"), (valid, "Valid")], + ) + train_rmse = np.asarray(out["history"]["Train"]["rmse"], dtype=np.float64) + valid_rmse = np.asarray(out["history"]["Valid"]["rmse"], dtype=np.float64) + assert np.isfinite(train_rmse).all() and np.isfinite(valid_rmse).all() + # use ratio form so a future synthetic-data tweak that pushes initial RMSE near + # zero does not silently make the bound trivially true + assert train_rmse[-1] / train_rmse[0] < 0.5, train_rmse.tolist() + assert valid_rmse[-1] / valid_rmse[0] < 0.85, valid_rmse.tolist() + + def test_recode(client: "Client") -> None: run_recode(client, "cpu")