diff --git a/cpp/src/cylon/CMakeLists.txt b/cpp/src/cylon/CMakeLists.txt index 637fb69b2..597e96192 100644 --- a/cpp/src/cylon/CMakeLists.txt +++ b/cpp/src/cylon/CMakeLists.txt @@ -14,6 +14,8 @@ find_package(Threads REQUIRED) +option(CYLON_DEBUG "Enable Cylon Debug") + if (CYLON_BENCH) message("Benchmark timings enabled") add_definitions(-D_CYLON_BENCH) @@ -101,6 +103,11 @@ add_library(cylon SHARED groupby/hash_groupby.hpp groupby/pipeline_groupby.cpp groupby/pipeline_groupby.hpp + window/window.hpp + window/window.cpp + window/window_config.hpp + window/hash_window.hpp + window/hash_window.cpp indexing/index.cpp indexing/index.hpp indexing/index_utils.cpp @@ -250,6 +257,10 @@ if (PYCYLON_BUILD) endif (PYCYLON_BUILD) target_compile_options(cylon PRIVATE) + +if (CYLON_DEBUG) + target_compile_definitions(cylon PRIVATE CYLON_DEBUG) +endif() # target_compile_options(cylon PRIVATE -Werror -Wall -Wextra -Wno-unused-parameter) cylon_install_all_headers("cylon") diff --git a/cpp/src/cylon/groupby/groupby.cpp b/cpp/src/cylon/groupby/groupby.cpp index 2599d3203..72338d680 100644 --- a/cpp/src/cylon/groupby/groupby.cpp +++ b/cpp/src/cylon/groupby/groupby.cpp @@ -21,14 +21,7 @@ namespace cylon { -static const std::vector - ASSOCIATIVE_OPS{compute::SUM, compute::MIN, compute::MAX}; -static inline bool is_associative(const std::vector &aggregate_ops) { - return std::all_of(aggregate_ops.begin(), aggregate_ops.end(), [](const compute::AggregationOpId &op) { - return std::find(ASSOCIATIVE_OPS.begin(), ASSOCIATIVE_OPS.end(), op) != ASSOCIATIVE_OPS.end(); - }); -} Status DistributedHashGroupBy(std::shared_ptr &table, const std::vector &index_cols, diff --git a/cpp/src/cylon/groupby/groupby.hpp b/cpp/src/cylon/groupby/groupby.hpp index a494f333d..0020ac66f 100644 --- a/cpp/src/cylon/groupby/groupby.hpp +++ b/cpp/src/cylon/groupby/groupby.hpp @@ -20,6 +20,15 @@ namespace cylon { +static const std::vector + ASSOCIATIVE_OPS{compute::SUM, compute::MIN, compute::MAX}; + +static inline bool is_associative(const std::vector &aggregate_ops) { + return std::all_of(aggregate_ops.begin(), aggregate_ops.end(), [](const compute::AggregationOpId &op) { + return std::find(ASSOCIATIVE_OPS.begin(), ASSOCIATIVE_OPS.end(), op) != ASSOCIATIVE_OPS.end(); + }); +} + Status DistributedHashGroupBy(std::shared_ptr
&table, const std::vector &index_cols, const std::vector &aggregate_cols, diff --git a/cpp/src/cylon/window/CMakeLists.txt b/cpp/src/cylon/window/CMakeLists.txt new file mode 100644 index 000000000..641bc4f5e --- /dev/null +++ b/cpp/src/cylon/window/CMakeLists.txt @@ -0,0 +1,15 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +cylon_install_all_headers("cylon/window") \ No newline at end of file diff --git a/cpp/src/cylon/window/hash_window.cpp b/cpp/src/cylon/window/hash_window.cpp new file mode 100644 index 000000000..8d4d8e609 --- /dev/null +++ b/cpp/src/cylon/window/hash_window.cpp @@ -0,0 +1,214 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "hash_window.hpp" + +#include "cylon/thridparty/flat_hash_map/bytell_hash_map.hpp" +#include +#include +#include +#include +#include + +#include "cylon/arrow/arrow_comparator.hpp" +#include "cylon/ctx/arrow_memory_pool_utils.hpp" +#include "cylon/util/macros.hpp" +#include + +namespace cylon { +namespace windowing { + +Status HashWindow(const config::WindowConfig &window_config, + const std::shared_ptr
&table, + const std::vector &idx_cols, + const std::vector> &aggregate_cols, + std::shared_ptr
&output) { + std::vector>> aggregations; + aggregations.reserve(aggregate_cols.size()); + for (auto &&p:aggregate_cols) { + // create AggregationOp with nullptr options + aggregations.emplace_back(p.first, compute::MakeAggregationOpFromID(p.second)); + } + + return HashWindow(window_config, table, idx_cols, aggregations, output); +} + + +/** + * Hash group-by operation by using pairs + * NOTE: Nulls in the value columns will be ignored! + * @param table + * @param idx_cols + * @param aggregations + * @param output + * @return + */ +Status HashWindow(const config::WindowConfig &window_config, + const std::shared_ptr
&table, + const std::vector &idx_cols, + const std::vector>> &aggregations, + std::shared_ptr
&output) { + +#ifdef CYLON_DEBUG + auto t1 = std::chrono::steady_clock::now(); +#endif + const auto &ctx = table->GetContext(); + arrow::MemoryPool *pool = ToArrowPool(ctx); + + std::shared_ptr atable = table->get_table(); + + std::vector> slices; + + //slide arrays + if (window_config.GetObservations() > 0 && atable->num_rows() > 0) { + SlicesByObservations(window_config, table, slices, pool); + } + + std::vector> agg_slices; + + if (!slices.empty()) { + + for (const auto &slice : slices) { + //call hash group by for each slice and then combine slices + std::shared_ptr output; + RETURN_CYLON_STATUS_IF_FAILED(cylon::HashGroupBy(slice, idx_cols, aggregations, output)); + agg_slices.emplace_back(std::move(output)); + } + } + + if (!agg_slices.empty()) { + RETURN_CYLON_STATUS_IF_FAILED(Merge(agg_slices, output)); + } + + return cylon::Status::OK(); +} + +Status CreateEmptyTableAndMerge(const std::shared_ptr
*sliced_table, + const std::shared_ptr &schema, + std::shared_ptr
&output, + arrow::MemoryPool *pool, int64_t num_rows) { + std::vector> arrays; + arrays.reserve(schema->num_fields()); + + auto ctx = sliced_table->get()->GetContext(); + + for (int i = 0; i < schema->num_fields(); i++) { + const auto &t = schema->field(i)->type(); + CYLON_ASSIGN_OR_RAISE(auto arr, arrow::MakeArrayOfNull(t, 0, pool)) + arrays.emplace_back(std::make_shared(std::move(arr))); + } + + if (num_rows > 0) { + + std::shared_ptr arrow_empty_table = arrow::Table::Make(schema, std::move(arrays), num_rows); + std::shared_ptr
cylonEmptyTable1; + auto status1 = Table::FromArrowTable(ctx, std::move(arrow_empty_table), cylonEmptyTable1); + + if (!status1.is_ok()) { + return status1; + } + + std::shared_ptr
concat; + + std::shared_ptr
cylonSlicedTable; + + RETURN_CYLON_STATUS_IF_FAILED(Table::FromArrowTable(ctx, sliced_table->get()->get_table(), cylonSlicedTable)); + RETURN_CYLON_STATUS_IF_FAILED(Merge({cylonEmptyTable1, cylonSlicedTable}, concat)); + + output = std::move(cylonSlicedTable); + } else { + output = *sliced_table; + } + + return cylon::Status::OK(); +} + +Status SlicesByObservations(const config::WindowConfig &window_config, + const std::shared_ptr
&table, + std::vector> &output, + arrow::MemoryPool *pool) { + auto observations = window_config.GetObservations(); + + std::shared_ptr atable = table->get_table(); + + if (observations > atable->num_rows()) { //bound observations to number of rows + observations = atable->num_rows(); + } + + for (int64_t i = 0; i < atable->num_rows(); i++) { + + auto adjusted_observations = observations - 1; + + if ((i + 1) < observations ) { //prefill table with nulls + auto num_rows_to_add = adjusted_observations - i; + auto slice_start = i - num_rows_to_add; + + std::shared_ptr
merge_output; + + std::shared_ptr
cylon_output; + + if (slice_start <= 0) { //check if neg + + auto sliceSize = observations - num_rows_to_add; + auto slice = atable->Slice(0, sliceSize); + RETURN_CYLON_STATUS_IF_FAILED(Table::FromArrowTable(table->GetContext(), slice, cylon_output)); + CreateEmptyTableAndMerge(&cylon_output, atable->schema(), merge_output, pool, + num_rows_to_add); + output.emplace_back(std::move(merge_output)); + + } else { + + auto slice = atable->Slice(i - num_rows_to_add, observations); + RETURN_CYLON_STATUS_IF_FAILED(Table::FromArrowTable(table->GetContext(), + slice, + cylon_output)); + CreateEmptyTableAndMerge(&cylon_output, atable->schema(), merge_output, pool, + num_rows_to_add); + output.emplace_back(std::move(merge_output)); + } + + } else { + std::shared_ptr
cylonTable; + auto status = Table::FromArrowTable(table->GetContext(), atable->Slice(i - adjusted_observations, observations), + cylonTable); + if (status.is_ok()) { + output.emplace_back(std::move(cylonTable)); + } + } + } + + return cylon::Status::OK(); + +} + + +Status SlicesByOffset(const config::WindowConfig &window_config, + const std::shared_ptr &table, + std::vector
&output) { + return cylon::Status::OK(); +} + +Status HashWindow(const config::WindowConfig &window_config, + std::shared_ptr
&table, + int32_t idx_col, + const std::vector &aggregate_cols, + const std::vector &aggregate_ops, + std::shared_ptr
&output) { + + return cylon::Status::OK(); +} + +} +} diff --git a/cpp/src/cylon/window/hash_window.hpp b/cpp/src/cylon/window/hash_window.hpp new file mode 100644 index 000000000..31b3def00 --- /dev/null +++ b/cpp/src/cylon/window/hash_window.hpp @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CYLON_SRC_CYLON_WINDOW_HASH_WINDOW_HPP_ + +#include +#include +#include "window_config.hpp" + + +namespace cylon { +namespace windowing { + +/** + * Hash group-by operation by using pairs + * NOTE: Nulls in the value columns will be ignored! + * @param table + * @param idx_cols + * @param aggregate_cols + * @param output + * @return + */ +Status HashWindow(const config::WindowConfig &window_config, + const std::shared_ptr
&table, + const std::vector &idx_cols, + const std::vector> &aggregate_cols, + std::shared_ptr
&output); + +/** + * Hash group-by operation by using pairs + * NOTE: Nulls in the value columns will be ignored! + * @param table + * @param idx_cols + * @param aggregations + * @param output + * @return + */ +Status HashWindow(const config::WindowConfig &window_config, + const std::shared_ptr
&table, + const std::vector &idx_cols, + const std::vector>> &aggregations, + std::shared_ptr
&output); + +/** + * Returns a vector of windows sliced by observation size + * @param window_config + * @param table + * @param output + * @param pool + * @return + */ +Status SlicesByObservations(const config::WindowConfig &window_config, + const std::shared_ptr
&table, + std::vector> &output, + arrow::MemoryPool *pool); + +Status CreateEmptyTableAndMerge(const std::shared_ptr
*sliced_table, + const std::shared_ptr &schema, + std::shared_ptr
&output, + arrow::MemoryPool *pool, int64_t num_rows); + +Status SlicesByOffset(const config::WindowConfig &window_config, + const std::shared_ptr &table, + std::vector
&output); +/** + * Hash group-by operation by using AggregationOpId vector + * NOTE: Nulls in the value columns will be ignored! + * @param table + * @param idx_cols + * @param aggregate_cols + * @param aggregate_ops + * @param output + * @return + */ +Status HashWindow(const config::WindowConfig &window_config, + std::shared_ptr
&table, + const std::vector &idx_cols, + const std::vector &aggregate_cols, + const std::vector &aggregate_ops, + std::shared_ptr
&output); + +Status HashWindow(const config::WindowConfig &window_config, + std::shared_ptr
&table, + int32_t idx_col, + const std::vector &aggregate_cols, + const std::vector &aggregate_ops, + std::shared_ptr
&output); + +} +} + +#endif //CYLON_SRC_CYLON_WINDOW_HASH_WINDOW_HPP_ diff --git a/cpp/src/cylon/window/window.cpp b/cpp/src/cylon/window/window.cpp new file mode 100644 index 000000000..1b23d2c22 --- /dev/null +++ b/cpp/src/cylon/window/window.cpp @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "window.hpp" + +namespace cylon { +namespace windowing { + +Status DistributedHashWindow(const config::WindowConfig &window_config, + std::shared_ptr
&table, + const std::vector &index_cols, + const std::vector &aggregate_cols, + const std::vector &aggregate_ops, + std::shared_ptr
&output) { + + + + + + + return cylon::Status(); +} +Status DistributedHashWindow(const config::WindowConfig &window_config, + std::shared_ptr
&table, + int32_t index_col, + const std::vector &aggregate_cols, + const std::vector &aggregate_ops, + std::shared_ptr
&output) { + return cylon::Status(); +} +} +} diff --git a/cpp/src/cylon/window/window.hpp b/cpp/src/cylon/window/window.hpp new file mode 100644 index 000000000..d051204e2 --- /dev/null +++ b/cpp/src/cylon/window/window.hpp @@ -0,0 +1,45 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef CYLON_SRC_CYLON_WINDOW_WINDOW_HPP_ +#define CYLON_SRC_CYLON_WINDOW_WINDOW_HPP_ + +#include +#include + +#include + +namespace cylon { +namespace windowing { + +class window { + + Status DistributedHashWindow(const config::WindowConfig &window_config, + std::shared_ptr
&table, + const std::vector &index_cols, + const std::vector &aggregate_cols, + const std::vector &aggregate_ops, + std::shared_ptr
&output); + + Status DistributedHashWindow(const config::WindowConfig &window_config, + std::shared_ptr
&table, + int32_t index_col, + const std::vector &aggregate_cols, + const std::vector &aggregate_ops, + std::shared_ptr
&output); + +}; +} +} + +#endif //CYLON_SRC_CYLON_WINDOW_WINDOW_HPP_ diff --git a/cpp/src/cylon/window/window_config.hpp b/cpp/src/cylon/window/window_config.hpp new file mode 100644 index 000000000..d6bf43483 --- /dev/null +++ b/cpp/src/cylon/window/window_config.hpp @@ -0,0 +1,100 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CYLON_SRC_CYLON_WINDOW_WINDOW_CONFIG_HPP_ +#define CYLON_SRC_CYLON_WINDOW_WINDOW_CONFIG_HPP_ + +#include +#include + +namespace cylon { +namespace windowing { +namespace config { + +enum LabelPosition { + LEFT, + CENTER, + RIGHT +}; + +enum WindowOperation { + ROLLING, + WEIGHTED, + EXPANDING, + EXP_WEIGHTED +}; + +class WindowConfig { + public: + WindowConfig() = delete; + + WindowConfig(const WindowOperation window_operation, const int observations) : window_operation(window_operation), + observations(observations) { + + } + + WindowConfig(const WindowOperation window_operation, const std::string offset) : window_operation(window_operation), + offset(std::move(offset)) {} + WindowOperation GetWindowOperation() const { + return window_operation; + } + int GetObservations() const { + return observations; + } + const std::string &GetOffset() const { + return offset; + } + LabelPosition GetLabelPosition() const { + return label_position; + } + int GetMinPeriods() const { + return min_periods; + } + const int GetStep() const { + return step; + } + + /** + * Creates a Windowing Configuration applying defaults and is based on the number of + * observations per window + * @param observations + * @return + */ + static WindowConfig DefaultWithObservations(const WindowOperation window_operation, const int observations) { + return WindowConfig(window_operation, observations); + } + + /** + * Creates a Windowing Configuration applying defaults and is based on an offset or + * a time period for each window. + * @param observations + * @return + */ + static WindowConfig DefaultWithOffset(const WindowOperation window_operation, const std::string offset) { + return WindowConfig(window_operation, offset); + } + + private: + const WindowOperation window_operation; + const int observations{}; + const std::string offset{}; + const LabelPosition label_position = LabelPosition::RIGHT; + const int min_periods{}; + const int step{}; +}; +} +} +} + +#endif //CYLON_SRC_CYLON_WINDOW_WINDOW_CONFIG_HPP_ diff --git a/cpp/src/examples/CMakeLists.txt b/cpp/src/examples/CMakeLists.txt index 510e3a1c8..3dee400b8 100644 --- a/cpp/src/examples/CMakeLists.txt +++ b/cpp/src/examples/CMakeLists.txt @@ -14,6 +14,7 @@ add_definitions(-DEXAMPLE_CMAKE_DIR=\"${CMAKE_CURRENT_SOURCE_DIR}\") + add_library(cylon_example_utils SHARED example_utils.cpp example_utils.hpp) @@ -40,6 +41,8 @@ macro(cylon_add_exe EXENAME) target_link_libraries(${EXENAME} ${GLOG_LIBRARIES}) target_link_libraries(${EXENAME} cylon_example_utils) + + if (CYLON_GLOO) target_link_libraries(${EXENAME} ${GLOO_LIBRARIES}) endif () @@ -76,6 +79,7 @@ cylon_add_exe(parquet_union_example) cylon_add_exe(parquet_join_example) cylon_add_exe(dist_sort_example) cylon_add_exe(slice_example) +cylon_add_exe(window_example) if (CYLON_UCX) cylon_add_exe(ucx_join_example) diff --git a/cpp/src/examples/groupby_perf.cpp b/cpp/src/examples/groupby_perf.cpp index d3df95a1f..056fe398f 100644 --- a/cpp/src/examples/groupby_perf.cpp +++ b/cpp/src/examples/groupby_perf.cpp @@ -55,6 +55,7 @@ int main(int argc, char *argv[]) { } } else if (mem == "f" && argc == 3) { LOG(INFO) << "using files"; + LOG(INFO) << "loading: " << std::string(argv[2]) + std::to_string(ctx->GetRank()) + ".csv"; if (!cylon::FromCSV(ctx, std::string(argv[2]) + std::to_string(ctx->GetRank()) + ".csv", table) .is_ok()) { LOG(ERROR) << "file reading failed!"; diff --git a/cpp/src/examples/window_example.cpp b/cpp/src/examples/window_example.cpp new file mode 100644 index 000000000..1b88031c1 --- /dev/null +++ b/cpp/src/examples/window_example.cpp @@ -0,0 +1,80 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +#define CHECK_STATUS_AND_PRINT(first_table, status, output) \ + if (!status.is_ok()) { \ + LOG(INFO) << "Table GroupBy failed " << status.get_msg(); \ + return 1; \ + }; \ + LOG(INFO) << "table had : " << first_table->Rows() << ", window has : " << output->Rows(); \ + LOG(INFO) << "Output of Window Operation"; \ + std::cout << output->get_table()->schema()->ToString() << std::endl; \ + output->Print(); \ + std::cout << "-----------------------" << std::endl; \ + + + +int main(int argc, char *argv[]) { + if (argc < 2) { + LOG(ERROR) << "There should be two arguments with paths to csv files"; + return 1; + } + + auto start_start = std::chrono::steady_clock::now(); + auto mpi_config = std::make_shared(); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } + + std::shared_ptr first_table, output; + auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); + auto status = cylon::FromCSV(ctx, argv[1], first_table, read_options); + if (!status.is_ok()) { + LOG(INFO) << "Table reading failed " << argv[1]; + ctx->Finalize(); + return 1; + } + + auto read_end_time = std::chrono::steady_clock::now(); + + LOG(INFO) << "Read tables in " + << std::chrono::duration_cast(read_end_time - start_start).count() << "[ms]"; + + LOG(INFO) << "Table Data"; + first_table->Print(); + std::cout << "-----------------------" << std::endl; + + //call windowing operations + + auto window_config_rolling_obs = cylon::windowing::config::WindowConfig::DefaultWithObservations(cylon::windowing::config::WindowOperation::ROLLING, 2); + + + CHECK_STATUS_AND_PRINT(first_table, + cylon::windowing::HashWindow(window_config_rolling_obs, first_table, {0, 1}, + {{2, cylon::compute::SUM}}, output), + output); + + + return 0; +}