forked from darshanDevrai/brahmand
-
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathdatabase.rs
More file actions
346 lines (310 loc) · 12.7 KB
/
database.rs
File metadata and controls
346 lines (310 loc) · 12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
//! `Database` — the top-level handle for an embedded ClickGraph database.
//!
//! Analogous to `kuzu::Database`. Holds the schema and the chdb executor.
//! Created once; multiple `Connection`s can be created from a single `Database`.
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_trait::async_trait;
#[cfg(feature = "embedded")]
use clickgraph::executor::chdb_embedded::ChdbExecutor;
#[cfg(feature = "embedded")]
pub use clickgraph::executor::chdb_embedded::StorageCredentials;
use clickgraph::executor::remote::RemoteClickHouseExecutor;
use clickgraph::executor::{ExecutorError, QueryExecutor};
use clickgraph::graph_catalog::config::GraphSchemaConfig;
use clickgraph::graph_catalog::graph_schema::GraphSchema;
use clickgraph::server::connection_pool::RoleConnectionPool;
use super::error::EmbeddedError;
/// Default maximum CTE recursion depth for remote ClickHouse queries.
const DEFAULT_REMOTE_MAX_CTE_DEPTH: u32 = 100;
/// Configuration for connecting to a remote ClickHouse cluster.
///
/// When provided in `SystemConfig`, enables `Connection::query_remote()` and
/// `Connection::query_remote_graph()` to execute Cypher queries against a
/// remote ClickHouse instance while storing results locally via chdb.
#[derive(Clone)]
pub struct RemoteConfig {
/// ClickHouse HTTP endpoint URL (e.g., `"http://ch-cluster:8123"`).
pub url: String,
/// ClickHouse username.
pub user: String,
/// ClickHouse password.
pub password: String,
/// Database name. Defaults to `"default"` if `None`.
pub database: Option<String>,
/// Cluster name for multi-node round-robin. If `None`, single-node mode.
pub cluster_name: Option<String>,
}
impl std::fmt::Debug for RemoteConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RemoteConfig")
.field("url", &self.url)
.field("user", &self.user)
.field("password", &"********")
.field("database", &self.database)
.field("cluster_name", &self.cluster_name)
.finish()
}
}
/// Configuration for an embedded database session.
///
/// Mirrors `kuzu::SystemConfig`.
#[derive(Debug, Clone, Default)]
pub struct SystemConfig {
/// Directory where chdb stores its session data.
/// Defaults to a temporary directory (auto-cleaned on drop).
pub session_dir: Option<PathBuf>,
/// Base directory for resolving relative `source:` paths in the schema.
/// If `None`, relative paths are resolved from the current working directory.
/// Reserved for future use -- not yet wired into source resolution.
pub data_dir: Option<PathBuf>,
/// Maximum number of threads for chdb query execution.
/// `None` uses the chdb default (typically number of CPU cores).
pub max_threads: Option<usize>,
/// Maximum memory a single query may use, in bytes.
/// `None` uses the chdb/ClickHouse default (no cap).
/// Set this in test environments to prevent runaway memory usage.
pub max_memory_usage_bytes: Option<u64>,
/// Storage credentials for remote sources (S3, GCS, Azure Blob, Iceberg).
///
/// Applied as ClickHouse session-level `SET` commands before any VIEWs are
/// created, so they apply automatically to every `s3()` / `iceberg()` /
/// `deltaLake()` call inside the session.
///
/// If all fields are `None` (the default), chdb falls back to environment
/// variables (`AWS_ACCESS_KEY_ID`, etc.) or instance-profile credentials.
#[cfg(feature = "embedded")]
pub credentials: StorageCredentials,
/// Optional remote ClickHouse connection for hybrid query + local storage.
///
/// When set, `Connection::query_remote()` and `query_remote_graph()` execute
/// Cypher queries on the remote cluster. Results can then be stored locally
/// via `store_subgraph()` for fast re-querying.
pub remote: Option<RemoteConfig>,
}
/// An embedded ClickGraph database.
///
/// # Example
///
/// ```no_run
/// use clickgraph_embedded::{Database, SystemConfig};
///
/// let db = Database::new("schema.yaml", SystemConfig::default()).unwrap();
/// ```
pub struct Database {
pub(crate) executor: Arc<dyn QueryExecutor>,
/// Optional remote ClickHouse executor for hybrid query + local storage.
pub(crate) remote_executor: Option<Arc<dyn QueryExecutor>>,
pub(crate) schema: Arc<GraphSchema>,
/// Shared Tokio runtime for blocking `Connection::query()` calls.
/// Created once, reused by all connections -- avoids per-call overhead.
pub(crate) runtime: tokio::runtime::Runtime,
}
impl Database {
/// Open a database using a YAML schema file (requires `embedded` feature for chdb).
///
/// Loads the schema, creates a chdb session, and:
/// - Creates VIEWs for schema entries WITH a `source:` field
/// - Creates writable ReplacingMergeTree tables for entries WITHOUT `source:`
#[cfg(feature = "embedded")]
pub fn new(schema_path: impl AsRef<Path>, config: SystemConfig) -> Result<Self, EmbeddedError> {
let graph_schema = load_graph_schema(schema_path.as_ref())?;
Self::from_schema(Arc::new(graph_schema), config)
}
/// Open an in-memory database using a YAML schema file (requires `embedded` feature).
///
/// Equivalent to `new()` with a temporary session directory that is
/// automatically cleaned up when the `Database` is dropped.
#[cfg(feature = "embedded")]
pub fn in_memory(
schema_path: impl AsRef<Path>,
config: SystemConfig,
) -> Result<Self, EmbeddedError> {
let config = SystemConfig {
session_dir: None,
..config
};
Self::new(schema_path, config)
}
/// Open a database backed by an existing `GraphSchema` and a chdb session
/// (requires `embedded` feature).
#[cfg(feature = "embedded")]
pub fn from_schema(
schema: Arc<GraphSchema>,
config: SystemConfig,
) -> Result<Self, EmbeddedError> {
let runtime = build_runtime()?;
let (session_dir, auto_cleanup) = match config.session_dir {
Some(dir) => (dir, false),
None => {
let tmp =
std::env::temp_dir().join(format!("clickgraph-{}", pseudo_random_suffix()));
(tmp, true)
}
};
let executor =
ChdbExecutor::new_with_credentials(&session_dir, auto_cleanup, &config.credentials)
.map_err(|e| EmbeddedError::Executor(e.to_string()))?;
if let Some(threads) = config.max_threads {
executor
.execute_blocking_ddl(&format!("SET max_threads = {threads}"))
.map_err(|e| EmbeddedError::Executor(e.to_string()))?;
}
if let Some(bytes) = config.max_memory_usage_bytes {
executor
.execute_blocking_ddl(&format!("SET max_memory_usage = {bytes}"))
.map_err(|e| EmbeddedError::Executor(e.to_string()))?;
}
let view_count = clickgraph::executor::data_loader::load_schema_sources(&executor, &schema)
.map_err(|e| EmbeddedError::Executor(e.to_string()))?;
if view_count > 0 {
log::info!(
"Created {} chdb VIEW(s) from schema source: entries",
view_count
);
}
let table_count =
clickgraph::executor::data_loader::create_writable_tables(&executor, &schema)
.map_err(|e| EmbeddedError::Executor(e.to_string()))?;
if table_count > 0 {
log::info!(
"Created {} writable ReplacingMergeTree table(s)",
table_count
);
}
let remote_executor = Self::build_remote_executor(&runtime, config.remote.as_ref())?;
Ok(Database {
executor: Arc::new(executor),
remote_executor,
schema,
runtime,
})
}
/// Open a database connected to a remote ClickHouse cluster (no chdb needed).
///
/// Cypher is translated to SQL locally and executed on the remote ClickHouse.
/// Use `Connection::query_remote()` to run queries.
pub fn new_remote(
schema_path: impl AsRef<Path>,
remote: RemoteConfig,
) -> Result<Self, EmbeddedError> {
let graph_schema = load_graph_schema(schema_path.as_ref())?;
let runtime = build_runtime()?;
let remote_executor =
Self::build_remote_executor(&runtime, Some(&remote))?.ok_or_else(|| {
EmbeddedError::Executor("Failed to connect to remote ClickHouse".to_string())
})?;
Ok(Database {
executor: Arc::new(NullExecutor),
remote_executor: Some(remote_executor),
schema: Arc::new(graph_schema),
runtime,
})
}
/// Build a remote executor from an optional `RemoteConfig`.
fn build_remote_executor(
runtime: &tokio::runtime::Runtime,
remote: Option<&RemoteConfig>,
) -> Result<Option<Arc<dyn QueryExecutor>>, EmbeddedError> {
let Some(remote) = remote else {
return Ok(None);
};
let pool = runtime
.block_on(RoleConnectionPool::new_with_params(
&remote.url,
&remote.user,
&remote.password,
remote.database.as_deref(),
remote.cluster_name.as_deref(),
DEFAULT_REMOTE_MAX_CTE_DEPTH,
))
.map_err(EmbeddedError::Executor)?;
log::info!("Remote ClickHouse executor initialized: {}", remote.url);
Ok(Some(
Arc::new(RemoteClickHouseExecutor::new(Arc::new(pool))) as Arc<dyn QueryExecutor>,
))
}
/// Return a reference to the graph schema.
pub fn schema(&self) -> &Arc<GraphSchema> {
&self.schema
}
/// Create a `Database` from a pre-built schema and executor.
///
/// Primarily intended for testing -- allows injection of a custom executor
/// (e.g. a stub) without needing a chdb session.
pub fn from_executor(
schema: Arc<GraphSchema>,
executor: Arc<dyn QueryExecutor>,
) -> Result<Self, EmbeddedError> {
Ok(Database {
executor,
remote_executor: None,
schema,
runtime: build_runtime()?,
})
}
/// Open a database in SQL-only mode -- schema loaded, no chdb session.
///
/// This mode supports `query_to_sql()` and `export_to_sql()` for
/// Cypher -> SQL translation without requiring the chdb native library.
/// Calling `query()` or `export()` will return an error.
///
/// Useful for testing, debugging, and build-time SQL validation.
pub fn sql_only(schema_path: impl AsRef<Path>) -> Result<Self, EmbeddedError> {
let graph_schema = load_graph_schema(schema_path.as_ref())?;
Self::from_executor(Arc::new(graph_schema), Arc::new(NullExecutor))
}
}
/// A no-op executor for SQL-only mode. Returns an error if execution is attempted.
struct NullExecutor;
#[async_trait]
impl QueryExecutor for NullExecutor {
async fn execute_json(
&self,
_sql: &str,
_role: Option<&str>,
) -> Result<Vec<serde_json::Value>, ExecutorError> {
Err(ExecutorError::QueryFailed(
"Cannot execute queries in sql_only mode -- no backend is configured".to_string(),
))
}
async fn execute_text(
&self,
_sql: &str,
_format: &str,
_role: Option<&str>,
) -> Result<String, ExecutorError> {
Err(ExecutorError::QueryFailed(
"Cannot execute queries in sql_only mode -- no backend is configured".to_string(),
))
}
}
/// Load and parse a YAML schema file into a `GraphSchema`.
fn load_graph_schema(schema_path: &Path) -> Result<GraphSchema, EmbeddedError> {
let yaml_content = std::fs::read_to_string(schema_path).map_err(|e| {
EmbeddedError::Io(format!(
"Cannot read schema '{}': {}",
schema_path.display(),
e
))
})?;
let schema_config: GraphSchemaConfig = serde_yaml::from_str(&yaml_content)
.map_err(|e| EmbeddedError::Schema(format!("YAML parse error: {}", e)))?;
schema_config
.to_graph_schema()
.map_err(|e| EmbeddedError::Schema(format!("Schema build error: {}", e)))
}
/// Build a single-threaded Tokio runtime for blocking `Connection` calls.
fn build_runtime() -> Result<tokio::runtime::Runtime, EmbeddedError> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| EmbeddedError::Query(format!("Failed to create runtime: {}", e)))
}
fn pseudo_random_suffix() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| format!("{}", d.subsec_nanos()))
.unwrap_or_else(|_| "0".to_string())
}