diff --git a/Cargo.lock b/Cargo.lock index cf14abf..df781b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -379,6 +379,7 @@ dependencies = [ "tokio", "unicode-width 0.1.14", "url", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2b2b68a..f7cf061 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,8 @@ libc = { version = "*", default-features = false } size = { version = "*", default-features = false, features = ["std"] } tempfile = { version = "*", default-features = false } url = { version = "*", default-features = false } +# Must match clickhouse-rs (read UUID columns) +uuid = { version = "*", default-features = false } humantime = { version = "*", default-features = false } backtrace = { version = "*", default-features = false, features = ["std"] } futures = { version = "*", default-features = false, features = ["std"] } diff --git a/src/interpreter/clickhouse.rs b/src/interpreter/clickhouse.rs index a178f73..583b1c6 100644 --- a/src/interpreter/clickhouse.rs +++ b/src/interpreter/clickhouse.rs @@ -10,7 +10,7 @@ use chrono::{DateTime, Local}; use chrono_tz::Tz; use clickhouse_rs::{ Block, Options, Pool, - types::{ColumnType, Complex, FromSql}, + types::{ColumnType, Complex, Enum8, Enum16, FromSql, SqlType}, }; use futures_util::StreamExt; use std::collections::HashMap; @@ -23,6 +23,30 @@ use std::str::FromStr; pub type Columns = Block; +// clickhouse-rs reads an Enum column as its bare integer (FromSql drops the name mapping) and a +// UUID as uuid::Uuid, so block.get:: fails on both; resolve the enum name from the Vec +// carried by the column type and format the UUID. String and LowCardinality(String) fall through +// to a plain String read (get coerces LC). +pub fn column_as_string(block: &Block, row: usize, name: &str) -> Result { + let column = block + .columns() + .iter() + .find(|c| c.name() == name) + .ok_or_else(|| Error::msg(format!("Cannot get {name} column")))?; + fn name_of(values: &[(String, T)], v: T) -> String { + values + .iter() + .find(|(_, k)| *k == v) + .map_or_else(|| v.to_string(), |(name, _)| name.clone()) + } + Ok(match column.sql_type() { + SqlType::Enum8(values) => name_of(&values, block.get::(row, name)?.internal()), + SqlType::Enum16(values) => name_of(&values, block.get::(row, name)?.internal()), + SqlType::Uuid => block.get::(row, name)?.to_string(), + _ => block.get::(row, name)?, + }) +} + pub struct ClickHouse { pub quirks: ClickHouseQuirks, // Server has use_shared_merge_tree_log_pipeline enabled (SharedMergeTree-backed system.*_log). @@ -532,7 +556,7 @@ impl ClickHouse { elapsed / {q} AS elapsed, user, initial_user, - ''::String AS exception, + '' AS exception, is_initial_query, is_cancelled, initial_query_id, @@ -1029,9 +1053,9 @@ impl ClickHouse { event_time, event_time_microseconds, thread_id, - level::String AS level, - logger_name::String AS logger_name, - query_id::String AS query_id, + level, + logger_name, + query_id, message FROM {} WHERE @@ -1458,7 +1482,7 @@ impl ClickHouse { fromUnixTimestamp64Nano({start}) AS start_, fromUnixTimestamp64Nano({end}) AS end_ SELECT - event_type::String AS event_type, + event_type, event_time_microseconds, duration_ms, database, @@ -1532,7 +1556,7 @@ impl ClickHouse { {with} SELECT event_time_microseconds, - trace_type::String AS trace_type, + trace_type, cityHash64(trace) AS stack_hash, {host_expr} AS host_name FROM {dbtable} @@ -1602,8 +1626,8 @@ impl ClickHouse { fromUnixTimestamp64Nano({end}) AS end_ SELECT event_time_microseconds, - level::String AS level, - logger_name::String AS logger_name, + level, + logger_name, message, query_id, {host_expr} AS host_name @@ -1867,7 +1891,7 @@ impl ClickHouse { database, table, format, - status::String AS status, + status, bytes, exception, event_time_microseconds, @@ -1934,7 +1958,7 @@ impl ClickHouse { SELECT file_name, rows_processed, - status::String AS status, + status, processing_start_time, processing_end_time, exception @@ -1966,7 +1990,7 @@ impl ClickHouse { table, file_name, rows_processed, - status::String AS status, + status, processing_start_time, processing_end_time, exception @@ -1997,7 +2021,7 @@ impl ClickHouse { fromUnixTimestamp64Nano({start}) AS start_, fromUnixTimestamp64Nano({end}) AS end_ SELECT - event_type::String AS event_type, + event_type, query_id, disk_name, bucket, @@ -2069,11 +2093,11 @@ impl ClickHouse { fromUnixTimestamp64Nano({start}) AS start_, fromUnixTimestamp64Nano({end}) AS end_ SELECT - type::String AS type, + type, -- user/auth_type are Nullable coalesce(user, '') AS user, - coalesce(auth_type::String, '') AS auth_type, - interface::String AS interface, + coalesce(auth_type, '') AS auth_type, + interface, toString(client_address) AS client_address, client_name, failure_reason, @@ -2106,7 +2130,7 @@ impl ClickHouse { event_time, session_id, parent_path, - operation::String AS operation, + operation, count::UInt64 AS count, mapKeys(errors) AS error_names, mapValues(errors) AS error_counts, diff --git a/src/interpreter/perfetto.rs b/src/interpreter/perfetto.rs index 4e05f54..f131f31 100644 --- a/src/interpreter/perfetto.rs +++ b/src/interpreter/perfetto.rs @@ -1,5 +1,5 @@ use crate::interpreter::Query; -use crate::interpreter::clickhouse::{MetricLogRow, QueryMetricRow}; +use crate::interpreter::clickhouse::{MetricLogRow, QueryMetricRow, column_as_string}; use anyhow::Result; use chrono::{DateTime, Local}; use chrono_tz::Tz; @@ -614,7 +614,7 @@ impl PerfettoTraceBuilder { let process_uuid = self.process_track_uuid("Part Log"); for i in 0..columns.row_count() { - let event_type: String = columns.get(i, "event_type").unwrap_or_default(); + let event_type: String = column_as_string(columns, i, "event_type").unwrap_or_default(); let event_time: DateTime = match columns.get(i, "event_time_microseconds") { Ok(v) => v, Err(e) => { @@ -740,7 +740,7 @@ impl PerfettoTraceBuilder { }; for i in 0..columns.row_count() { - let level: String = columns.get(i, "level").unwrap_or_default(); + let level: String = column_as_string(columns, i, "level").unwrap_or_default(); let logger_name: String = columns.get(i, "logger_name").unwrap_or_default(); let message: String = columns.get(i, "message").unwrap_or_default(); let query_id: String = columns.get(i, "query_id").unwrap_or_default(); @@ -854,7 +854,7 @@ impl PerfettoTraceBuilder { let database: String = columns.get(i, "database").unwrap_or_default(); let table: String = columns.get(i, "table").unwrap_or_default(); let format: String = columns.get(i, "format").unwrap_or_default(); - let status: String = columns.get(i, "status").unwrap_or_default(); + let status: String = column_as_string(columns, i, "status").unwrap_or_default(); let bytes: u64 = columns.get(i, "bytes").unwrap_or(0); let exception: String = columns.get(i, "exception").unwrap_or_default(); let query_id: String = columns.get(i, "query_id").unwrap_or_default(); @@ -945,7 +945,7 @@ impl PerfettoTraceBuilder { for i in 0..columns.row_count() { let file_name: String = columns.get(i, "file_name").unwrap_or_default(); let rows_processed: u64 = columns.get(i, "rows_processed").unwrap_or(0); - let status: String = columns.get(i, "status").unwrap_or_default(); + let status: String = column_as_string(columns, i, "status").unwrap_or_default(); let exception: String = columns.get(i, "exception").unwrap_or_default(); let start_ns: u64 = match columns.get::, _>(i, "processing_start_time") { @@ -983,7 +983,7 @@ impl PerfettoTraceBuilder { let table: String = columns.get(i, "table").unwrap_or_default(); let file_name: String = columns.get(i, "file_name").unwrap_or_default(); let rows_processed: u64 = columns.get(i, "rows_processed").unwrap_or(0); - let status: String = columns.get(i, "status").unwrap_or_default(); + let status: String = column_as_string(columns, i, "status").unwrap_or_default(); let exception: String = columns.get(i, "exception").unwrap_or_default(); let start_ns: u64 = match columns.get::, _>(i, "processing_start_time") { @@ -1020,7 +1020,7 @@ impl PerfettoTraceBuilder { let process_uuid = self.process_track_uuid("Blob Storage"); for i in 0..columns.row_count() { - let event_type: String = columns.get(i, "event_type").unwrap_or_default(); + let event_type: String = column_as_string(columns, i, "event_type").unwrap_or_default(); let query_id: String = columns.get(i, "query_id").unwrap_or_default(); let disk_name: String = columns.get(i, "disk_name").unwrap_or_default(); let bucket: String = columns.get(i, "bucket").unwrap_or_default(); @@ -1113,10 +1113,10 @@ impl PerfettoTraceBuilder { let process_uuid = self.process_track_uuid("Sessions"); for i in 0..columns.row_count() { - let session_type: String = columns.get(i, "type").unwrap_or_default(); + let session_type: String = column_as_string(columns, i, "type").unwrap_or_default(); let user: String = columns.get(i, "user").unwrap_or_default(); let auth_type: String = columns.get(i, "auth_type").unwrap_or_default(); - let interface: String = columns.get(i, "interface").unwrap_or_default(); + let interface: String = column_as_string(columns, i, "interface").unwrap_or_default(); let client_address: String = columns.get(i, "client_address").unwrap_or_default(); let client_name: String = columns.get(i, "client_name").unwrap_or_default(); let failure_reason: String = columns.get(i, "failure_reason").unwrap_or_default(); @@ -1159,7 +1159,7 @@ impl PerfettoTraceBuilder { let process_uuid = self.process_track_uuid("ZooKeeper"); for i in 0..columns.row_count() { - let operation: String = columns.get(i, "operation").unwrap_or_default(); + let operation: String = column_as_string(columns, i, "operation").unwrap_or_default(); let count: u64 = columns.get(i, "count").unwrap_or(0); let average_latency: f64 = columns.get(i, "average_latency").unwrap_or(0.0); let parent_path: String = columns.get(i, "parent_path").unwrap_or_default(); @@ -1310,7 +1310,7 @@ impl PerfettoTraceBuilder { /// emitted by finalize_stack_traces() from build(). pub fn add_stack_samples(&mut self, samples: &Block) { for i in 0..samples.row_count() { - let trace_type: String = samples.get(i, "trace_type").unwrap_or_default(); + let trace_type: String = column_as_string(samples, i, "trace_type").unwrap_or_default(); let stack_hash: u64 = samples.get(i, "stack_hash").unwrap_or_default(); let host_name: String = samples.get(i, "host_name").unwrap_or_default(); diff --git a/src/view/providers/backups.rs b/src/view/providers/backups.rs index 71e77f3..7e3f3a7 100644 --- a/src/view/providers/backups.rs +++ b/src/view/providers/backups.rs @@ -24,7 +24,7 @@ impl ViewProvider for BackupsViewProvider { fn show(&self, siv: &mut Cursive, context: ContextArc) { let columns = vec![ "name", - "status::String status", + "status", "error", "start_time", "end_time", diff --git a/src/view/providers/dictionaries.rs b/src/view/providers/dictionaries.rs index 0bbed1f..a9edc74 100644 --- a/src/view/providers/dictionaries.rs +++ b/src/view/providers/dictionaries.rs @@ -19,7 +19,7 @@ impl ViewProvider for DictionariesViewProvider { fn show(&self, siv: &mut Cursive, context: ContextArc) { let columns = vec![ "name", - "status::String status", + "status", "source", "bytes_allocated memory", "query_count queries", diff --git a/src/view/providers/error_log.rs b/src/view/providers/error_log.rs index 955d842..f502d96 100644 --- a/src/view/providers/error_log.rs +++ b/src/view/providers/error_log.rs @@ -26,7 +26,7 @@ impl ViewProvider for ErrorLogViewProvider { } let columns = vec![ - "error::String name", + "error name", "any(code) code", "sum(value) total", "total bar", diff --git a/src/view/providers/logger_names.rs b/src/view/providers/logger_names.rs index 49fa432..7ff06e4 100644 --- a/src/view/providers/logger_names.rs +++ b/src/view/providers/logger_names.rs @@ -38,7 +38,7 @@ impl ViewProvider for LoggerNamesViewProvider { let end = view_options.end; let mut columns = vec![ - "logger_name::String logger_name", + "logger_name", "count() count", "countIf(level = 'Fatal') fatal", "countIf(level = 'Critical') critical", diff --git a/src/view/providers/merges.rs b/src/view/providers/merges.rs index 72d4403..646d93e 100644 --- a/src/view/providers/merges.rs +++ b/src/view/providers/merges.rs @@ -38,7 +38,7 @@ fn get_columns(is_dialog: bool) -> Vec<&'static str> { "rows_written", "memory_usage memory", "now()-elapsed _create_time", - "tables.uuid::String _table_uuid", + "tables.uuid _table_uuid", ] } else { vec![ @@ -54,7 +54,7 @@ fn get_columns(is_dialog: bool) -> Vec<&'static str> { "rows_written", "memory_usage memory", "now()-elapsed _create_time", - "tables.uuid::String _table_uuid", + "tables.uuid _table_uuid", ] } } diff --git a/src/view/providers/part_log.rs b/src/view/providers/part_log.rs index 6294c7e..cb1de37 100644 --- a/src/view/providers/part_log.rs +++ b/src/view/providers/part_log.rs @@ -123,30 +123,30 @@ fn build_query(context: &ContextArc, filters: &FilterParams, is_dialog: bool) -> let select_clause = if is_dialog { r#"event_time, - event_type::String event_type, + event_type, part_name, - merge_algorithm::String merge_algorithm, + merge_algorithm, part_type, rows, size_in_bytes, duration_ms, peak_memory_usage, exception, - table_uuid::String _table_uuid"# + table_uuid _table_uuid"# } else { r#"event_time, - event_type::String event_type, + event_type, database, table, part_name, - merge_algorithm::String merge_algorithm, + merge_algorithm, part_type, rows, size_in_bytes, duration_ms, peak_memory_usage, exception, - table_uuid::String _table_uuid"# + table_uuid _table_uuid"# }; format!( diff --git a/src/view/providers/replicas.rs b/src/view/providers/replicas.rs index a462de4..d1aee37 100644 --- a/src/view/providers/replicas.rs +++ b/src/view/providers/replicas.rs @@ -41,7 +41,7 @@ impl ViewProvider for ReplicasViewProvider { ]; if has_uuid { - columns.push("uuid::String _uuid"); + columns.push("uuid _uuid"); } let (cluster, dbtable, clickhouse, selected_host) = { diff --git a/src/view/providers/table_parts.rs b/src/view/providers/table_parts.rs index 49b5daa..9d1278b 100644 --- a/src/view/providers/table_parts.rs +++ b/src/view/providers/table_parts.rs @@ -66,7 +66,7 @@ fn build_query( parts.data_uncompressed_bytes, parts.modification_time, parts.active, - tables.uuid::String _table_uuid"# + tables.uuid _table_uuid"# } else { r#"parts.database, parts.table, @@ -78,7 +78,7 @@ fn build_query( parts.data_uncompressed_bytes, parts.modification_time, parts.active, - tables.uuid::String _table_uuid"# + tables.uuid _table_uuid"# }; format!( diff --git a/src/view/providers/tables.rs b/src/view/providers/tables.rs index 6edb1d0..20f35a8 100644 --- a/src/view/providers/tables.rs +++ b/src/view/providers/tables.rs @@ -31,7 +31,7 @@ impl ViewProvider for TablesViewProvider { "database", "table", "engine", - "uuid::String _uuid", + "uuid _uuid", "assumeNotNull(total_bytes) total_bytes", "assumeNotNull(total_rows) total_rows", ]; diff --git a/src/view/sql_query_view.rs b/src/view/sql_query_view.rs index e6d8c46..9fba5fa 100644 --- a/src/view/sql_query_view.rs +++ b/src/view/sql_query_view.rs @@ -4,7 +4,10 @@ use std::sync::{Arc, Mutex}; use anyhow::{Result, anyhow}; use size::{Base, SizeFormatter, Style}; -use crate::interpreter::{BackgroundRunner, ContextArc, WorkerEvent, clickhouse::Columns}; +use crate::interpreter::{ + BackgroundRunner, ContextArc, WorkerEvent, + clickhouse::{Columns, column_as_string}, +}; use crate::view::TableViewItem; use crate::view::table_view::TableView; use crate::wrap_impl_no_move; @@ -230,7 +233,6 @@ impl SQLQueryView { .find(|c| c.name() == column) .ok_or(anyhow!("Cannot get {} column", column))?; let field = match sql_column.sql_type() { - SqlType::String => Field::String(block.get::<_, _>(i, column)?), SqlType::Float64 => Field::Float64(block.get::<_, _>(i, column)?), SqlType::Float32 => Field::Float32(block.get::<_, _>(i, column)?), SqlType::UInt64 => Field::UInt64(block.get::<_, _>(i, column)?), @@ -246,7 +248,8 @@ impl SQLQueryView { .get::, _>(i, column)? .with_timezone(&Local), ), - _ => unreachable!("Type for column {} not implemented", column), + // String, LowCardinality(String), Enum8/16 and UUID all render as text + _ => Field::String(column_as_string(&block, i, column)?), }; row.0.push(field); } diff --git a/src/view/text_log_view.rs b/src/view/text_log_view.rs index a188776..263bc4f 100644 --- a/src/view/text_log_view.rs +++ b/src/view/text_log_view.rs @@ -6,6 +6,7 @@ use chrono_tz::Tz; use cursive::view::ViewWrapper; use crate::common::RelativeDateTime; +use crate::interpreter::clickhouse::column_as_string; use crate::interpreter::{BackgroundRunner, ContextArc, TextLogArguments, WorkerEvent}; use crate::view::{LogEntry, LogView}; use crate::wrap_impl_no_move; @@ -145,7 +146,7 @@ impl TextLogView { .get::, _>(i, "event_time_microseconds")? .with_timezone(&Local), thread_id: logs_block.get::<_, _>(i, "thread_id")?, - level: logs_block.get::<_, _>(i, "level")?, + level: column_as_string(&logs_block, i, "level")?, message: logs_block.get::<_, _>(i, "message")?, query_id: logs_block.get::<_, _>(i, "query_id").ok(), logger_name: logs_block.get::<_, _>(i, "logger_name").ok(), diff --git a/tests/integration.rs b/tests/integration.rs index 98b60d2..d8742ae 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -2,7 +2,7 @@ mod common; use chdig::common::RelativeDateTime; use chdig::interpreter::clickhouse::{ - TraceType, parse_metric_log_block, parse_query_metric_log_block, + TraceType, column_as_string, parse_metric_log_block, parse_query_metric_log_block, }; use chdig::interpreter::options::ClickHouseOptions; use chdig::interpreter::{ClickHouse, TextLogArguments}; @@ -363,7 +363,7 @@ async fn test_text_log() { .await; assert_eq!(blocks.iter().map(|b| b.row_count()).sum::(), 1); assert_eq!( - blocks[0].get::(0, "level").unwrap(), + column_as_string(&blocks[0], 0, "level").unwrap(), "Information" ); @@ -450,7 +450,7 @@ async fn test_stack_traces_for_perfetto() { stack_traces_for_perfetto(Some(&["it-stack-1".to_string()]), start, end) ); assert_eq!(samples.row_count(), 1); - assert_eq!(samples.get::(0, "trace_type").unwrap(), "CPU"); + assert_eq!(column_as_string(&samples, 0, "trace_type").unwrap(), "CPU"); assert_eq!(stacks.row_count(), 1); assert_eq!( stacks.get::, _>(0, "stack").unwrap(), @@ -612,7 +612,10 @@ async fn test_part_log_for_perfetto() { part_log_for_perfetto(Some(&["it-part-1".to_string()]), start, end) ); assert_eq!(block.row_count(), 1); - assert_eq!(block.get::(0, "event_type").unwrap(), "NewPart"); + assert_eq!( + column_as_string(&block, 0, "event_type").unwrap(), + "NewPart" + ); assert_eq!(block.get::(0, "part_name").unwrap(), "all_1_1_0"); assert_eq!(block.get::(0, "rows").unwrap(), 100); } @@ -669,7 +672,7 @@ async fn test_asynchronous_insert_log_for_perfetto() { let block = fetch_streamed!(chdig, asynchronous_insert_log_for_perfetto(start, end)); let rows = find_rows(&block, "query_id", "it-ai-1"); assert_eq!(rows.len(), 1); - assert_eq!(block.get::(rows[0], "status").unwrap(), "Ok"); + assert_eq!(column_as_string(&block, rows[0], "status").unwrap(), "Ok"); assert_eq!(block.get::(rows[0], "table").unwrap(), "it_ai"); } @@ -745,10 +748,13 @@ async fn test_session_log_for_perfetto() { let rows = find_rows(&block, "user", "it_sess"); assert_eq!(rows.len(), 1); assert_eq!( - block.get::(rows[0], "type").unwrap(), + column_as_string(&block, rows[0], "type").unwrap(), "LoginSuccess" ); - assert_eq!(block.get::(rows[0], "interface").unwrap(), "TCP"); + assert_eq!( + column_as_string(&block, rows[0], "interface").unwrap(), + "TCP" + ); } async fn test_background_schedule_pool_log() { @@ -884,7 +890,7 @@ async fn test_s3_queue_log_for_perfetto() { let rows = find_rows(&block, "file_name", "it.csv"); assert_eq!(rows.len(), 1); assert_eq!( - block.get::(rows[0], "status").unwrap(), + column_as_string(&block, rows[0], "status").unwrap(), "Processed" ); assert_eq!(block.get::(rows[0], "rows_processed").unwrap(), 5); @@ -932,7 +938,10 @@ async fn test_aggregated_zookeeper_log_for_perfetto() { let block = fetch_streamed!(chdig, aggregated_zookeeper_log_for_perfetto(start, end)); let rows = find_rows(&block, "parent_path", "/it"); assert_eq!(rows.len(), 1); - assert_eq!(block.get::(rows[0], "operation").unwrap(), "Get"); + assert_eq!( + column_as_string(&block, rows[0], "operation").unwrap(), + "Get" + ); assert_eq!(block.get::(rows[0], "count").unwrap(), 3); }