TPCH round 2 (work in progress) #3128
@@ -7,6 +7,7 @@
 #include <duckdb.hpp>
 #include <duckdb/catalog/catalog.hpp>
 #include <duckdb/common/types/uuid.hpp>
+#include <duckdb/common/types/vector_buffer.hpp>
 #include <duckdb/parser/parsed_data/create_table_function_info.hpp>
 #include <duckdb/planner/expression/bound_conjunction_expression.hpp>
 #include <duckdb/planner/expression/bound_reference_expression.hpp>
@@ -22,8 +23,76 @@
 #include <heimdall_common/filtered_dataset.hpp>
 #include <query_core/index_holder.hpp>
 
+#include <chrono>
+
 namespace {
 
+// STRING_PROFILING: Enable detailed timing for string operations
+// Uncomment the following line to enable string operation profiling:
+// #define DEEPLAKE_STRING_PROFILING 1
+
+#ifdef DEEPLAKE_STRING_PROFILING
+// Timing guard for profiling string operations in TPC-H queries.
+// Measures time spent in StringVector operations vs get_range_data.
+struct string_timing_guard
+{
+    const char* name;
+    std::chrono::high_resolution_clock::time_point start;
+    static inline thread_local uint64_t total_string_bytes = 0;
+    static inline thread_local uint64_t total_string_count = 0;
+    static inline thread_local uint64_t total_batch_count = 0;
+
+    explicit string_timing_guard(const char* n)
+        : name(n)
+        , start(std::chrono::high_resolution_clock::now())
+    {
+    }
+
+    ~string_timing_guard()
+    {
+        auto end = std::chrono::high_resolution_clock::now();
+        auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
+        // Log timing data periodically (every 1000 batches)
+        if (total_batch_count % 1000 == 0 && total_batch_count > 0) {
+            elog(DEBUG1,
+                 "String profiling [%s]: batch=%lu strings=%lu bytes=%lu time=%ldns",
+                 name,
+                 total_batch_count,
+                 total_string_count,
+                 total_string_bytes,
+                 ns);
+        }
+    }
+
+    static void record_string_batch(uint64_t string_count, uint64_t total_bytes)
+    {
+        total_string_count += string_count;
+        total_string_bytes += total_bytes;
+        total_batch_count++;
+    }
+};
+#define STRING_TIMING_GUARD(name) string_timing_guard _timing_guard_##__LINE__(name)
+#define STRING_RECORD_BATCH(count, bytes) string_timing_guard::record_string_batch(count, bytes)
+#else
+#define STRING_TIMING_GUARD(name) (void)0
+#define STRING_RECORD_BATCH(count, bytes) (void)0
+#endif
+
+// VectorBuffer wrapper that keeps a nd::array alive for zero-copy string access.
+// This allows DuckDB to reference string data directly from our buffers without copying.
+class DeeplakeStringBuffer : public duckdb::VectorBuffer
+{
+public:
+    explicit DeeplakeStringBuffer(nd::array&& arr)
+        : duckdb::VectorBuffer(duckdb::VectorBufferType::OPAQUE_BUFFER)
+        , array_(std::move(arr))
+    {
+    }
+
+private:
+    nd::array array_;
+};
+
 struct deeplake_scan_bind_data final : public duckdb::TableFunctionData
 {
     pg::table_data& table_data;
@@ -49,6 +118,13 @@ struct deeplake_scan_global_state final : public duckdb::GlobalTableFunctionStat
     heimdall::dataset_view_ptr index_search_result;
     std::atomic<int64_t> current_row = 0;
 
+    // OPTIMIZATION: Cache column dtypes to avoid get_column_view() shared_ptr overhead in hot path.
+    // Populated once during init, used in set_streaming_column_output for nd::switch_dtype.
+    std::vector<nd::dtype> column_dtypes;
+
+    // OPTIMIZATION: Cache total row count to avoid virtual function calls in hot path.
+    int64_t cached_num_rows = 0;
+
     idx_t MaxThreads() const override
     {
         return std::min(base::system_report::cpu_cores(), pg::max_num_threads_for_global_state);
@@ -304,10 +380,25 @@ duckdb::unique_ptr<duckdb::GlobalTableFunctionState> deeplake_scan_init_global(d
                                                                                 duckdb::TableFunctionInitInput& input)
 {
     auto& bind_data = input.bind_data->Cast<deeplake_scan_bind_data>();
-    const auto& td = bind_data.table_data;
+    auto& td = bind_data.table_data;
    auto r = duckdb::make_uniq<deeplake_scan_global_state>();
     r->column_ids = input.column_ids;
 
+    // OPTIMIZATION: Cache row count once to avoid repeated virtual calls during scan.
+    r->cached_num_rows = td.num_rows();
+
+    // OPTIMIZATION: Cache column dtypes to avoid get_column_view() overhead in hot path.
+    // This eliminates shared_ptr atomic operations per chunk per column.
+    r->column_dtypes.reserve(input.column_ids.size());
+    for (const auto col_idx : input.column_ids) {
+        if (td.is_column_requested(col_idx)) {
+            auto col_view = td.get_column_view(col_idx);
+            r->column_dtypes.push_back(col_view->dtype());
+        } else {
+            r->column_dtypes.push_back(nd::dtype::unknown);
+        }
+    }
+
     if (input.filters) {
         duckdb::vector<duckdb::unique_ptr<duckdb::Expression>> filter_exprs;
         for (auto& [output_col_idx, filter] : input.filters->filters) {
@@ -458,16 +549,56 @@ class deeplake_scan_function_helper
     {
         ASSERT(samples.dtype() == nd::dtype::string);
         auto& output_vector = output_.data[output_column_id];
+        auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::string_t>(output_vector);
         pg::impl::string_stream_array_holder string_holder(samples);
-        for (duckdb::idx_t row_in_batch = 0; row_in_batch < output_.size(); ++row_in_batch) {
-            auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::string_t>(output_vector);
-            std::string_view value;
-            if (string_holder.is_valid()) {
-                value = string_holder.data(row_in_batch);
+
+        STRING_TIMING_GUARD("set_string_column_output");
+
+        if (string_holder.is_valid()) {
+            // ZERO-COPY: Add the samples array as a buffer reference to keep data alive.
+            // This allows string_t to point directly into our buffer.
+            duckdb::StringVector::AddBuffer(
+                output_vector,
+                duckdb::make_buffer<DeeplakeStringBuffer>(nd::array(samples)));
+
+            // Construct string_t pointing directly to buffer data (zero-copy)
+            // OPTIMIZATION: Use bulk access path for single-chunk case (common for TPC-H)
+            if (string_holder.is_single_chunk()) [[likely]] {
+                // Bulk access: get raw buffer and offset array pointers
+                auto contiguous = string_holder.get_contiguous_strings(0);
+                const auto* buffer = contiguous.buffer;
+                const auto* offsets = contiguous.offsets;
+                const auto base_offset = contiguous.base_offset;
+                const auto start_idx = contiguous.start_index;
+                const auto batch_size = output_.size();
+
+                // Record batch statistics for profiling
+                STRING_RECORD_BATCH(batch_size, string_holder.get_batch_total_bytes(0, batch_size));
+
+                // Construct all string_t entries using direct buffer access
+                for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) {
+                    const auto local_idx = start_idx + row_in_batch;
+                    const auto str_start = offsets[local_idx] - base_offset;
+                    const auto str_end = offsets[local_idx + 1] - base_offset;
> Potential buffer overrun: accessing `offsets[local_idx + 1]` can read one element past the end of the offsets array when `local_idx` refers to the last string in the chunk. Add assertion or bounds check before the loop:
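A minimal sketch of such a guard, assuming the `contiguous` view also exposes how many offsets are valid (a hypothetical `offset_count` field, not shown in this diff):

```cpp
// Hypothetical bounds check (offset_count is an assumed field, not part of
// the get_contiguous_strings() result shown in this diff). The loop's last
// read is offsets[start_idx + batch_size], so that index must be in range.
ASSERT(start_idx + batch_size < contiguous.offset_count);
```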
+                    const auto len = static_cast<uint32_t>(str_end - str_start);
+                    const auto* str_ptr = reinterpret_cast<const char*>(buffer + str_start);
+                    duckdb_data[row_in_batch] = duckdb::string_t(str_ptr, len);
+                }
             } else {
-                value = base::string_view_cast<const unsigned char>(samples[row_in_batch].data());
+                for (duckdb::idx_t row_in_batch = 0; row_in_batch < output_.size(); ++row_in_batch) {
+                    auto value = string_holder.data(row_in_batch);
+                    const auto len = static_cast<uint32_t>(value.size());
+                    duckdb_data[row_in_batch] = duckdb::string_t(value.data(), len);
+                }
             }
+        } else {
+            // Fallback path: need to copy since we don't have stable buffer
+            for (duckdb::idx_t row_in_batch = 0; row_in_batch < output_.size(); ++row_in_batch) {
+                auto value = base::string_view_cast<const unsigned char>(samples[row_in_batch].data());
+                const auto len = static_cast<uint32_t>(value.size());
+                duckdb_data[row_in_batch] = duckdb::StringVector::AddStringOrBlob(
+                    output_vector, value.data(), len);
+            }
-            duckdb_data[row_in_batch] = add_string(output_vector, value.data(), value.size());
         }
     }
@@ -822,16 +953,20 @@ class deeplake_scan_function_helper
         auto& td = bind_data_.table_data;
         auto& output_vector = output_.data[output_column_id];
 
-        auto col_view = td.get_column_view(col_idx);
-        nd::switch_dtype(col_view->dtype(), [&]<typename T>() {
+        // OPTIMIZATION: Use cached dtype instead of calling get_column_view() each time.
+        // This eliminates shared_ptr atomic operations in the hot scan path.
+        const auto cached_dtype = global_state_.column_dtypes[output_column_id];
+        nd::switch_dtype(cached_dtype, [&]<typename T>() {
             if constexpr (std::is_arithmetic_v<T>) {
                 auto att_type = td.get_atttypid(col_idx);
                 auto* value_ptr = td.get_streamers().value_ptr<T>(col_idx, current_row);
                 if (att_type == VARCHAROID || att_type == CHAROID || att_type == BPCHAROID) {
+                    // Single-character string columns: construct string_t directly without heap allocation.
+                    // For len=1, string_t stores the character inline (no AddString needed).
                     auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::string_t>(output_vector);
                     for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) {
                         duckdb_data[row_in_batch] =
-                            add_string(output_vector, reinterpret_cast<const char*>(value_ptr + row_in_batch), 1);
+                            duckdb::string_t(reinterpret_cast<const char*>(value_ptr + row_in_batch), 1);
                     }
                     return;
                 }
@@ -851,27 +986,75 @@ class deeplake_scan_function_helper
                     }
                 }
             } else {
-                for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) {
-                    const int64_t row_idx = current_row + row_in_batch;
-                    auto value = td.get_streamers().value<std::string_view>(col_idx, row_idx);
-                    // workaround. value is not always remain valid. Trying to make a copy as soon as possible.
-                    // Most likely due to nd::array temporary object destruction.
-                    std::string str_value(value);
-                    if (is_uuid) {
-                        // Treat empty string as NULL for UUID columns
-                        if (str_value.empty()) {
+                // ZERO-COPY string path: the string data already exists in stable buffers
+                // maintained by the streamer. We construct string_t pointing directly to
+                // this data without copying. The batch owner array keeps the buffer alive.
+                //
+                // The streamer batches remain valid until streamers_.reset() is called,
+                // which only happens on commit/refresh. During query execution, the buffers
+                // are guaranteed to be alive. Since the streamer manages the lifetime,
+                // we don't need to add a buffer reference to DuckDB - the data is already
+                // stable for the duration of the query.
+                auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::string_t>(output_vector);
+
+                // Get batch-level access
+                auto [holder, batch_start] = td.get_streamers().get_string_batch(col_idx, current_row);
+
+                if (is_uuid) {
+                    // UUID columns: parse strings as UUIDs
+                    auto* uuid_data = duckdb::FlatVector::GetData<duckdb::hugeint_t>(output_vector);
+                    for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) {
+                        auto value = holder->data(batch_start + row_in_batch);
+                        if (value.empty()) {
                             duckdb::FlatVector::SetNull(output_vector, row_in_batch, true);
                         } else {
                             duckdb::hugeint_t uuid_value;
-                            if (!duckdb::UUID::FromString(str_value, uuid_value)) {
-                                elog(ERROR, "Failed to parse UUID string: %s", str_value.c_str());
+                            if (!duckdb::UUID::FromString(std::string(value), uuid_value)) {
+                                elog(ERROR, "Failed to parse UUID string: %.*s",
+                                     static_cast<int>(value.size()), value.data());
                             }
-                            auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::hugeint_t>(output_vector);
-                            duckdb_data[row_in_batch] = uuid_value;
+                            uuid_data[row_in_batch] = uuid_value;
                         }
                     }
                 } else {
+                    // ZERO-COPY: Construct string_t pointing directly to buffer data.
+                    // For len <= 12: data is copied inline (no heap allocation needed)
+                    // For len > 12: string_t stores pointer to our buffer (zero-copy)
+                    // The streamer owns the batch data which remains stable during query execution.
+
+                    STRING_TIMING_GUARD("string_column_output");
+
+                    // OPTIMIZATION: Use bulk access path for single-chunk case (common for TPC-H)
+                    // This uses get_contiguous_strings() to access the raw buffer and offset arrays
+                    // directly, enabling vectorized construction of string_t entries.
+                    if (holder->is_single_chunk()) [[likely]] {
+                        // Bulk access: get raw buffer and offset array pointers
+                        auto contiguous = holder->get_contiguous_strings(batch_start);
+                        const auto* buffer = contiguous.buffer;
+                        const auto* offsets = contiguous.offsets;
+                        const auto base_offset = contiguous.base_offset;
+                        const auto start_idx = contiguous.start_index;
+
+                        // Record batch statistics for profiling
+                        STRING_RECORD_BATCH(batch_size, holder->get_batch_total_bytes(batch_start, batch_size));
+
+                        // Construct all string_t entries using direct buffer access
+                        for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) {
+                            const auto local_idx = start_idx + row_in_batch;
+                            const auto str_start = offsets[local_idx] - base_offset;
+                            const auto str_end = offsets[local_idx + 1] - base_offset;
> Potential buffer overrun (duplicate): Same issue as line 582 - accessing `offsets[local_idx + 1]` can read past the end of the offsets array on the last row of the batch. Add bounds check or assertion before the loop.
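The same guard sketched above would apply to this batch-based path, again assuming a hypothetical `offset_count` field on the contiguous view:

```cpp
// Hypothetical bounds check, mirroring the earlier suggestion: the loop's
// last read is offsets[start_idx + batch_size], which must stay in range.
ASSERT(start_idx + batch_size < contiguous.offset_count);
```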
+                            const auto len = static_cast<uint32_t>(str_end - str_start);
+                            const auto* str_ptr = reinterpret_cast<const char*>(buffer + str_start);
+                            duckdb_data[row_in_batch] = duckdb::string_t(str_ptr, len);
+                        }
                     } else {
-                        auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::string_t>(output_vector);
-                        duckdb_data[row_in_batch] = add_string(output_vector, str_value.data(), str_value.size());
+                        for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) {
+                            auto value = holder->data(batch_start + row_in_batch);
+                            const auto len = static_cast<uint32_t>(value.size());
+                            // string_t constructor: for len <= 12, copies data inline;
+                            // for len > 12, stores pointer + copies 4-byte prefix (zero-copy)
+                            duckdb_data[row_in_batch] = duckdb::string_t(value.data(), len);
+                        }
                     }
                 }
             }
@@ -914,12 +1097,14 @@ class deeplake_scan_function_helper
 
     int64_t next_chunk()
     {
+        // OPTIMIZATION: Use cached row count for non-index-search case to avoid
+        // repeated virtual function calls through get_read_only_dataset()->num_rows().
        int64_t num_rows = -1;
         if (has_index_search()) {
             ASSERT(is_index_search_done());
             num_rows = global_state_.index_search_result->num_rows();
         } else {
-            num_rows = bind_data_.table_data.num_rows();
+            num_rows = global_state_.cached_num_rows;
         }
 
         // Determine batch size (DuckDB's standard vector size is 2048)
@@ -979,7 +1164,11 @@ class deeplake_scan_function_helper
                 column_promises.emplace_back(request_range_and_set_column_output(cv, i, current_row));
             }
         }
-        async::combine(std::move(column_promises)).get_future().get();
+        // OPTIMIZATION: Skip async::combine overhead when all columns use the streaming path.
+        // This is the common case for TPC-H queries where all columns have streamers.
+        if (!column_promises.empty()) {
+            async::combine(std::move(column_promises)).get_future().get();
+        }
     }
 };
> Potential use-after-free risk: `nd::array(samples)` creates a copy that is moved into `DeeplakeStringBuffer`, but the original `samples` parameter is an rvalue reference. If the caller doesn't maintain the original array, the buffer may be freed while DuckDB still references it.
>
> Consider moving `samples` directly: `duckdb::make_buffer<DeeplakeStringBuffer>(std::move(samples))` to ensure proper ownership transfer.
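A sketch of the reviewer's suggested change at the `AddBuffer` call site, assuming the enclosing function receives `samples` as `nd::array&&` (as the comment states); note that `samples`, and the `string_holder` built from it, must not be read after the move unless `nd::array` copies share the underlying storage:

```cpp
// Suggested ownership transfer (sketch): move the rvalue samples array into
// the DuckDB buffer instead of copying it, tying the buffer's lifetime to
// the output vector. Any zero-copy reads from `samples` must happen before
// this call, since the moved-from array is no longer valid to use.
duckdb::StringVector::AddBuffer(
    output_vector,
    duckdb::make_buffer<DeeplakeStringBuffer>(std::move(samples)));
```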