diff --git a/Cargo.lock b/Cargo.lock index 456f382..f2bd111 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -125,7 +125,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -136,7 +136,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -483,7 +483,7 @@ checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" [[package]] name = "clickhouse-rs" version = "1.1.0-alpha.1" -source = "git+https://github.com/azat-rust/clickhouse-rs?branch=next#d4572ae7b53e3756676267202ba6a484f7df8e79" +source = "git+https://github.com/azat-rust/clickhouse-rs?branch=next#014e7ecee9348a3f38770ec102662af29db78e23" dependencies = [ "byteorder", "cfg-if", @@ -515,7 +515,7 @@ dependencies = [ [[package]] name = "clickhouse-rs-cityhash-sys" version = "0.1.2" -source = "git+https://github.com/azat-rust/clickhouse-rs?branch=next#d4572ae7b53e3756676267202ba6a484f7df8e79" +source = "git+https://github.com/azat-rust/clickhouse-rs?branch=next#014e7ecee9348a3f38770ec102662af29db78e23" dependencies = [ "cc", ] @@ -1029,7 +1029,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -1664,7 +1664,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2593,7 +2593,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2822,7 +2822,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52d1cfed4120b4d927bf7c0f86d2087a4a7d6027c906d9f9d525a80573b9be51" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -2936,7 +2936,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix 1.1.4", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3513,7 +3513,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] diff --git a/src/interpreter/clickhouse.rs b/src/interpreter/clickhouse.rs index 583b1c6..a6a9748 100644 --- a/src/interpreter/clickhouse.rs +++ b/src/interpreter/clickhouse.rs @@ -10,7 +10,7 @@ use chrono::{DateTime, Local}; use chrono_tz::Tz; use clickhouse_rs::{ Block, Options, Pool, - types::{ColumnType, Complex, Enum8, Enum16, FromSql, SqlType}, + types::{ColumnType, Complex, Enum8, Enum16, FromSql, Query, SqlType}, }; use futures_util::StreamExt; use std::collections::HashMap; @@ -974,7 +974,6 @@ impl ClickHouse { return Ok(statement); } - // TODO: copy all settings from the query async fn explain( &self, what: &str, @@ -984,40 +983,17 @@ impl ClickHouse { ) -> Result> { self.execute_simple(&format!("USE {}", database)).await?; + // The query's own settings are passed through the protocol rather than + // appended as a SETTINGS clause: the query may already carry its own + // SETTINGS, and a second one would be dropped from EXPLAIN SYNTAX output. + let mut explain = Query::new(format!("EXPLAIN {} {}", what, query)); if let Some(settings) = settings { - // NOTE: it handles queries with SETTINGS incorrectly, i.e.: - // - // SELECT 1 SETTINGS max_threads=1 - // - // EXPLAIN SYNTAX SELECT 1 SETTINGS max_threads=1 SETTINGS max_threads=1, max_insert_threads=1 -> - // SELECT 1 SETTINGS max_threads=1 - // - // This can be fixed two ways: - // - in ClickHouse - // - by passing settings in the protocol - if !settings.is_empty() { - return Ok(collect_values( - &self - .execute(&format!( - "EXPLAIN {} {} SETTINGS {}", - what, - query, - settings - .iter() - .map(|kv| format!("{}='{}'", kv.0, kv.1.replace('\'', "\\\'"))) - .collect::>() - .join(",") - )) - .await?, - "explain", - )); + for (name, value) in settings { + explain = explain.with_setting(name, value.as_str(), false); } } - return Ok(collect_values( - &self.execute(&format!("EXPLAIN {} {}", what, query)).await?, - "explain", - )); + return Ok(collect_values(&self.execute(explain).await?, "explain")); } /// Stream the text_log rows block-by-block into `on_block` (see @@ -2170,15 +2146,20 @@ impl ClickHouse { Ok(filtered) } - pub async fn execute(&self, query: &str) -> Result { + pub async fn execute(&self, query: impl Into) -> Result { + let query = query.into(); let columns = self .pool .get_handle() .await? - .query(query) + .query(query.clone()) .fetch_all() .await?; - log::trace!("Received {} rows for query: {}", columns.row_count(), query); + log::trace!( + "Received {} rows for query: {:?}", + columns.row_count(), + query + ); Ok(columns) } @@ -2193,9 +2174,11 @@ impl ClickHouse { ) -> Result<()> { // Blocks are capped by rows, not bytes: with wide rows (e.g. ~2K // metric_log columns) a default 65K-row block is GBs of allocations, - // and consumers keep a few blocks in flight. - let query = format!("{} SETTINGS max_block_size=8192", query); - self.execute_for_each_raw(&query, on_block).await + // and consumers keep a few blocks in flight. A per-query setting rather + // than a SETTINGS clause, so it composes with queries that already carry + // their own SETTINGS (e.g. allow_introspection_functions). + let query = Query::new(query).with_setting("max_block_size", 8192u64, false); + self.execute_for_each_raw(query, on_block).await } /// Same as execute_for_each(), but keeps the server default block size — @@ -2203,11 +2186,12 @@ impl ClickHouse { /// scan down. async fn execute_for_each_raw( &self, - query: &str, + query: impl Into, mut on_block: impl AsyncFnMut(Block) -> bool, ) -> Result<()> { + let query = query.into(); let mut client = self.pool.get_handle().await?; - let mut stream = client.query(&query).stream_blocks(); + let mut stream = client.query(query.clone()).stream_blocks(); let mut rows = 0; while let Some(block) = stream.next().await { let block = block?; @@ -2219,7 +2203,7 @@ impl ClickHouse { break; } } - log::trace!("Received {} rows (streamed) for query: {}", rows, query); + log::trace!("Received {} rows (streamed) for query: {:?}", rows, query); Ok(()) } diff --git a/src/utils.rs b/src/utils.rs index 1f73cca..3cb8fdc 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -238,11 +238,10 @@ pub fn get_query(query: &str, settings: &HashMap) -> String { }) .collect::>() .join(""); - if !query.contains("SETTINGS") { - ret.push_str("\nSETTINGS\n"); - } else { - ret.push_str(",\n"); - } + // ClickHouse accepts multiple SETTINGS clauses (last value wins per setting), + // so always append our own instead of detecting and merging into one the + // query may already carry. + ret.push_str("\nSETTINGS\n"); ret.push_str(&settings_str); return ret; }