Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 25 additions & 41 deletions src/interpreter/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use chrono::{DateTime, Local};
use chrono_tz::Tz;
use clickhouse_rs::{
Block, Options, Pool,
types::{ColumnType, Complex, Enum8, Enum16, FromSql, SqlType},
types::{ColumnType, Complex, Enum8, Enum16, FromSql, Query, SqlType},
};
use futures_util::StreamExt;
use std::collections::HashMap;
Expand Down Expand Up @@ -974,7 +974,6 @@ impl ClickHouse {
return Ok(statement);
}

// TODO: copy all settings from the query
async fn explain(
&self,
what: &str,
Expand All @@ -984,40 +983,17 @@ impl ClickHouse {
) -> Result<Vec<String>> {
self.execute_simple(&format!("USE {}", database)).await?;

// The query's own settings are passed through the protocol rather than
// appended as a SETTINGS clause: the query may already carry its own
// SETTINGS, and a second one would be dropped from EXPLAIN SYNTAX output.
let mut explain = Query::new(format!("EXPLAIN {} {}", what, query));
if let Some(settings) = settings {
// NOTE: it handles queries with SETTINGS incorrectly, i.e.:
//
// SELECT 1 SETTINGS max_threads=1
//
// EXPLAIN SYNTAX SELECT 1 SETTINGS max_threads=1 SETTINGS max_threads=1, max_insert_threads=1 ->
// SELECT 1 SETTINGS max_threads=1
//
// This can be fixed two ways:
// - in ClickHouse
// - by passing settings in the protocol
if !settings.is_empty() {
return Ok(collect_values(
&self
.execute(&format!(
"EXPLAIN {} {} SETTINGS {}",
what,
query,
settings
.iter()
.map(|kv| format!("{}='{}'", kv.0, kv.1.replace('\'', "\\\'")))
.collect::<Vec<String>>()
.join(",")
))
.await?,
"explain",
));
for (name, value) in settings {
explain = explain.with_setting(name, value.as_str(), false);
}
}

return Ok(collect_values(
&self.execute(&format!("EXPLAIN {} {}", what, query)).await?,
"explain",
));
return Ok(collect_values(&self.execute(explain).await?, "explain"));
}

/// Stream the text_log rows block-by-block into `on_block` (see
Expand Down Expand Up @@ -2170,15 +2146,20 @@ impl ClickHouse {
Ok(filtered)
}

pub async fn execute(&self, query: &str) -> Result<Columns> {
pub async fn execute(&self, query: impl Into<Query>) -> Result<Columns> {
let query = query.into();
let columns = self
.pool
.get_handle()
.await?
.query(query)
.query(query.clone())
.fetch_all()
.await?;
log::trace!("Received {} rows for query: {}", columns.row_count(), query);
log::trace!(
"Received {} rows for query: {:?}",
columns.row_count(),
query
);
Ok(columns)
}

Expand All @@ -2193,21 +2174,24 @@ impl ClickHouse {
) -> Result<()> {
// Blocks are capped by rows, not bytes: with wide rows (e.g. ~2K
// metric_log columns) a default 65K-row block is GBs of allocations,
// and consumers keep a few blocks in flight.
let query = format!("{} SETTINGS max_block_size=8192", query);
self.execute_for_each_raw(&query, on_block).await
// and consumers keep a few blocks in flight. A per-query setting rather
// than a SETTINGS clause, so it composes with queries that already carry
// their own SETTINGS (e.g. allow_introspection_functions).
let query = Query::new(query).with_setting("max_block_size", 8192u64, false);
self.execute_for_each_raw(query, on_block).await
}

/// Same as execute_for_each(), but keeps the server default block size —
/// for narrow-row tables (e.g. text_log) where capping it only slows the
/// scan down.
async fn execute_for_each_raw(
&self,
query: &str,
query: impl Into<Query>,
mut on_block: impl AsyncFnMut(Block) -> bool,
) -> Result<()> {
let query = query.into();
let mut client = self.pool.get_handle().await?;
let mut stream = client.query(&query).stream_blocks();
let mut stream = client.query(query.clone()).stream_blocks();
let mut rows = 0;
while let Some(block) = stream.next().await {
let block = block?;
Expand All @@ -2219,7 +2203,7 @@ impl ClickHouse {
break;
}
}
log::trace!("Received {} rows (streamed) for query: {}", rows, query);
log::trace!("Received {} rows (streamed) for query: {:?}", rows, query);
Ok(())
}

Expand Down
9 changes: 4 additions & 5 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,10 @@ pub fn get_query(query: &str, settings: &HashMap<String, String>) -> String {
})
.collect::<Vec<String>>()
.join("");
if !query.contains("SETTINGS") {
ret.push_str("\nSETTINGS\n");
} else {
ret.push_str(",\n");
}
// ClickHouse accepts multiple SETTINGS clauses (the last value wins per setting),
// so always append our own clause instead of detecting a SETTINGS clause the
// query may already carry and merging into it.
ret.push_str("\nSETTINGS\n");
ret.push_str(&settings_str);
return ret;
}
Expand Down
Loading