Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ libc = { version = "*", default-features = false }
size = { version = "*", default-features = false, features = ["std"] }
tempfile = { version = "*", default-features = false }
url = { version = "*", default-features = false }
# Must match clickhouse-rs (read UUID columns)
uuid = { version = "*", default-features = false }
humantime = { version = "*", default-features = false }
backtrace = { version = "*", default-features = false, features = ["std"] }
futures = { version = "*", default-features = false, features = ["std"] }
Expand Down
58 changes: 41 additions & 17 deletions src/interpreter/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use chrono::{DateTime, Local};
use chrono_tz::Tz;
use clickhouse_rs::{
Block, Options, Pool,
types::{ColumnType, Complex, FromSql},
types::{ColumnType, Complex, Enum8, Enum16, FromSql, SqlType},
};
use futures_util::StreamExt;
use std::collections::HashMap;
Expand All @@ -23,6 +23,30 @@ use std::str::FromStr;

pub type Columns = Block<Complex>;

// clickhouse-rs reads an Enum column as its bare integer (FromSql drops the name mapping) and a
// UUID as uuid::Uuid, so block.get::<String> fails on both; resolve the enum name from the Vec
// carried by the column type and format the UUID. String and LowCardinality(String) fall through
// to a plain String read (get coerces LC).
pub fn column_as_string<K: ColumnType>(block: &Block<K>, row: usize, name: &str) -> Result<String> {
let column = block
.columns()
.iter()
.find(|c| c.name() == name)
.ok_or_else(|| Error::msg(format!("Cannot get {name} column")))?;
fn name_of<T: PartialEq + ToString>(values: &[(String, T)], v: T) -> String {
values
.iter()
.find(|(_, k)| *k == v)
.map_or_else(|| v.to_string(), |(name, _)| name.clone())
}
Ok(match column.sql_type() {
SqlType::Enum8(values) => name_of(&values, block.get::<Enum8, _>(row, name)?.internal()),
SqlType::Enum16(values) => name_of(&values, block.get::<Enum16, _>(row, name)?.internal()),
SqlType::Uuid => block.get::<uuid::Uuid, _>(row, name)?.to_string(),
_ => block.get::<String, _>(row, name)?,
})
}

pub struct ClickHouse {
pub quirks: ClickHouseQuirks,
// Server has use_shared_merge_tree_log_pipeline enabled (SharedMergeTree-backed system.*_log).
Expand Down Expand Up @@ -532,7 +556,7 @@ impl ClickHouse {
elapsed / {q} AS elapsed,
user,
initial_user,
''::String AS exception,
'' AS exception,
is_initial_query,
is_cancelled,
initial_query_id,
Expand Down Expand Up @@ -1029,9 +1053,9 @@ impl ClickHouse {
event_time,
event_time_microseconds,
thread_id,
level::String AS level,
logger_name::String AS logger_name,
query_id::String AS query_id,
level,
logger_name,
query_id,
message
FROM {}
WHERE
Expand Down Expand Up @@ -1458,7 +1482,7 @@ impl ClickHouse {
fromUnixTimestamp64Nano({start}) AS start_,
fromUnixTimestamp64Nano({end}) AS end_
SELECT
event_type::String AS event_type,
event_type,
event_time_microseconds,
duration_ms,
database,
Expand Down Expand Up @@ -1532,7 +1556,7 @@ impl ClickHouse {
{with}
SELECT
event_time_microseconds,
trace_type::String AS trace_type,
trace_type,
cityHash64(trace) AS stack_hash,
{host_expr} AS host_name
FROM {dbtable}
Expand Down Expand Up @@ -1602,8 +1626,8 @@ impl ClickHouse {
fromUnixTimestamp64Nano({end}) AS end_
SELECT
event_time_microseconds,
level::String AS level,
logger_name::String AS logger_name,
level,
logger_name,
message,
query_id,
{host_expr} AS host_name
Expand Down Expand Up @@ -1867,7 +1891,7 @@ impl ClickHouse {
database,
table,
format,
status::String AS status,
status,
bytes,
exception,
event_time_microseconds,
Expand Down Expand Up @@ -1934,7 +1958,7 @@ impl ClickHouse {
SELECT
file_name,
rows_processed,
status::String AS status,
status,
processing_start_time,
processing_end_time,
exception
Expand Down Expand Up @@ -1966,7 +1990,7 @@ impl ClickHouse {
table,
file_name,
rows_processed,
status::String AS status,
status,
processing_start_time,
processing_end_time,
exception
Expand Down Expand Up @@ -1997,7 +2021,7 @@ impl ClickHouse {
fromUnixTimestamp64Nano({start}) AS start_,
fromUnixTimestamp64Nano({end}) AS end_
SELECT
event_type::String AS event_type,
event_type,
query_id,
disk_name,
bucket,
Expand Down Expand Up @@ -2069,11 +2093,11 @@ impl ClickHouse {
fromUnixTimestamp64Nano({start}) AS start_,
fromUnixTimestamp64Nano({end}) AS end_
SELECT
type::String AS type,
type,
-- user/auth_type are Nullable
coalesce(user, '') AS user,
coalesce(auth_type::String, '') AS auth_type,
interface::String AS interface,
coalesce(auth_type, '') AS auth_type,
interface,
toString(client_address) AS client_address,
client_name,
failure_reason,
Expand Down Expand Up @@ -2106,7 +2130,7 @@ impl ClickHouse {
event_time,
session_id,
parent_path,
operation::String AS operation,
operation,
count::UInt64 AS count,
mapKeys(errors) AS error_names,
mapValues(errors) AS error_counts,
Expand Down
22 changes: 11 additions & 11 deletions src/interpreter/perfetto.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::interpreter::Query;
use crate::interpreter::clickhouse::{MetricLogRow, QueryMetricRow};
use crate::interpreter::clickhouse::{MetricLogRow, QueryMetricRow, column_as_string};
use anyhow::Result;
use chrono::{DateTime, Local};
use chrono_tz::Tz;
Expand Down Expand Up @@ -614,7 +614,7 @@ impl PerfettoTraceBuilder {
let process_uuid = self.process_track_uuid("Part Log");

for i in 0..columns.row_count() {
let event_type: String = columns.get(i, "event_type").unwrap_or_default();
let event_type: String = column_as_string(columns, i, "event_type").unwrap_or_default();
let event_time: DateTime<Tz> = match columns.get(i, "event_time_microseconds") {
Ok(v) => v,
Err(e) => {
Expand Down Expand Up @@ -740,7 +740,7 @@ impl PerfettoTraceBuilder {
};

for i in 0..columns.row_count() {
let level: String = columns.get(i, "level").unwrap_or_default();
let level: String = column_as_string(columns, i, "level").unwrap_or_default();
let logger_name: String = columns.get(i, "logger_name").unwrap_or_default();
let message: String = columns.get(i, "message").unwrap_or_default();
let query_id: String = columns.get(i, "query_id").unwrap_or_default();
Expand Down Expand Up @@ -854,7 +854,7 @@ impl PerfettoTraceBuilder {
let database: String = columns.get(i, "database").unwrap_or_default();
let table: String = columns.get(i, "table").unwrap_or_default();
let format: String = columns.get(i, "format").unwrap_or_default();
let status: String = columns.get(i, "status").unwrap_or_default();
let status: String = column_as_string(columns, i, "status").unwrap_or_default();
let bytes: u64 = columns.get(i, "bytes").unwrap_or(0);
let exception: String = columns.get(i, "exception").unwrap_or_default();
let query_id: String = columns.get(i, "query_id").unwrap_or_default();
Expand Down Expand Up @@ -945,7 +945,7 @@ impl PerfettoTraceBuilder {
for i in 0..columns.row_count() {
let file_name: String = columns.get(i, "file_name").unwrap_or_default();
let rows_processed: u64 = columns.get(i, "rows_processed").unwrap_or(0);
let status: String = columns.get(i, "status").unwrap_or_default();
let status: String = column_as_string(columns, i, "status").unwrap_or_default();
let exception: String = columns.get(i, "exception").unwrap_or_default();

let start_ns: u64 = match columns.get::<DateTime<Tz>, _>(i, "processing_start_time") {
Expand Down Expand Up @@ -983,7 +983,7 @@ impl PerfettoTraceBuilder {
let table: String = columns.get(i, "table").unwrap_or_default();
let file_name: String = columns.get(i, "file_name").unwrap_or_default();
let rows_processed: u64 = columns.get(i, "rows_processed").unwrap_or(0);
let status: String = columns.get(i, "status").unwrap_or_default();
let status: String = column_as_string(columns, i, "status").unwrap_or_default();
let exception: String = columns.get(i, "exception").unwrap_or_default();

let start_ns: u64 = match columns.get::<DateTime<Tz>, _>(i, "processing_start_time") {
Expand Down Expand Up @@ -1020,7 +1020,7 @@ impl PerfettoTraceBuilder {
let process_uuid = self.process_track_uuid("Blob Storage");

for i in 0..columns.row_count() {
let event_type: String = columns.get(i, "event_type").unwrap_or_default();
let event_type: String = column_as_string(columns, i, "event_type").unwrap_or_default();
let query_id: String = columns.get(i, "query_id").unwrap_or_default();
let disk_name: String = columns.get(i, "disk_name").unwrap_or_default();
let bucket: String = columns.get(i, "bucket").unwrap_or_default();
Expand Down Expand Up @@ -1113,10 +1113,10 @@ impl PerfettoTraceBuilder {
let process_uuid = self.process_track_uuid("Sessions");

for i in 0..columns.row_count() {
let session_type: String = columns.get(i, "type").unwrap_or_default();
let session_type: String = column_as_string(columns, i, "type").unwrap_or_default();
let user: String = columns.get(i, "user").unwrap_or_default();
let auth_type: String = columns.get(i, "auth_type").unwrap_or_default();
let interface: String = columns.get(i, "interface").unwrap_or_default();
let interface: String = column_as_string(columns, i, "interface").unwrap_or_default();
let client_address: String = columns.get(i, "client_address").unwrap_or_default();
let client_name: String = columns.get(i, "client_name").unwrap_or_default();
let failure_reason: String = columns.get(i, "failure_reason").unwrap_or_default();
Expand Down Expand Up @@ -1159,7 +1159,7 @@ impl PerfettoTraceBuilder {
let process_uuid = self.process_track_uuid("ZooKeeper");

for i in 0..columns.row_count() {
let operation: String = columns.get(i, "operation").unwrap_or_default();
let operation: String = column_as_string(columns, i, "operation").unwrap_or_default();
let count: u64 = columns.get(i, "count").unwrap_or(0);
let average_latency: f64 = columns.get(i, "average_latency").unwrap_or(0.0);
let parent_path: String = columns.get(i, "parent_path").unwrap_or_default();
Expand Down Expand Up @@ -1310,7 +1310,7 @@ impl PerfettoTraceBuilder {
/// emitted by finalize_stack_traces() from build().
pub fn add_stack_samples<K: ColumnType>(&mut self, samples: &Block<K>) {
for i in 0..samples.row_count() {
let trace_type: String = samples.get(i, "trace_type").unwrap_or_default();
let trace_type: String = column_as_string(samples, i, "trace_type").unwrap_or_default();
let stack_hash: u64 = samples.get(i, "stack_hash").unwrap_or_default();
let host_name: String = samples.get(i, "host_name").unwrap_or_default();

Expand Down
2 changes: 1 addition & 1 deletion src/view/providers/backups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl ViewProvider for BackupsViewProvider {
fn show(&self, siv: &mut Cursive, context: ContextArc) {
let columns = vec![
"name",
"status::String status",
"status",
"error",
"start_time",
"end_time",
Expand Down
2 changes: 1 addition & 1 deletion src/view/providers/dictionaries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl ViewProvider for DictionariesViewProvider {
fn show(&self, siv: &mut Cursive, context: ContextArc) {
let columns = vec![
"name",
"status::String status",
"status",
"source",
"bytes_allocated memory",
"query_count queries",
Expand Down
2 changes: 1 addition & 1 deletion src/view/providers/error_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl ViewProvider for ErrorLogViewProvider {
}

let columns = vec![
"error::String name",
"error name",
"any(code) code",
"sum(value) total",
"total bar",
Expand Down
2 changes: 1 addition & 1 deletion src/view/providers/logger_names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl ViewProvider for LoggerNamesViewProvider {
let end = view_options.end;

let mut columns = vec![
"logger_name::String logger_name",
"logger_name",
"count() count",
"countIf(level = 'Fatal') fatal",
"countIf(level = 'Critical') critical",
Expand Down
4 changes: 2 additions & 2 deletions src/view/providers/merges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fn get_columns(is_dialog: bool) -> Vec<&'static str> {
"rows_written",
"memory_usage memory",
"now()-elapsed _create_time",
"tables.uuid::String _table_uuid",
"tables.uuid _table_uuid",
]
} else {
vec![
Expand All @@ -54,7 +54,7 @@ fn get_columns(is_dialog: bool) -> Vec<&'static str> {
"rows_written",
"memory_usage memory",
"now()-elapsed _create_time",
"tables.uuid::String _table_uuid",
"tables.uuid _table_uuid",
]
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/view/providers/part_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,30 +123,30 @@ fn build_query(context: &ContextArc, filters: &FilterParams, is_dialog: bool) ->

let select_clause = if is_dialog {
r#"event_time,
event_type::String event_type,
event_type,
part_name,
merge_algorithm::String merge_algorithm,
merge_algorithm,
part_type,
rows,
size_in_bytes,
duration_ms,
peak_memory_usage,
exception,
table_uuid::String _table_uuid"#
table_uuid _table_uuid"#
} else {
r#"event_time,
event_type::String event_type,
event_type,
database,
table,
part_name,
merge_algorithm::String merge_algorithm,
merge_algorithm,
part_type,
rows,
size_in_bytes,
duration_ms,
peak_memory_usage,
exception,
table_uuid::String _table_uuid"#
table_uuid _table_uuid"#
};

format!(
Expand Down
2 changes: 1 addition & 1 deletion src/view/providers/replicas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl ViewProvider for ReplicasViewProvider {
];

if has_uuid {
columns.push("uuid::String _uuid");
columns.push("uuid _uuid");
}

let (cluster, dbtable, clickhouse, selected_host) = {
Expand Down
4 changes: 2 additions & 2 deletions src/view/providers/table_parts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn build_query(
parts.data_uncompressed_bytes,
parts.modification_time,
parts.active,
tables.uuid::String _table_uuid"#
tables.uuid _table_uuid"#
} else {
r#"parts.database,
parts.table,
Expand All @@ -78,7 +78,7 @@ fn build_query(
parts.data_uncompressed_bytes,
parts.modification_time,
parts.active,
tables.uuid::String _table_uuid"#
tables.uuid _table_uuid"#
};

format!(
Expand Down
2 changes: 1 addition & 1 deletion src/view/providers/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl ViewProvider for TablesViewProvider {
"database",
"table",
"engine",
"uuid::String _uuid",
"uuid _uuid",
"assumeNotNull(total_bytes) total_bytes",
"assumeNotNull(total_rows) total_rows",
];
Expand Down
Loading
Loading