Skip to content

Commit 4297553

Browse files
genezhang and Copilot authored
feat: Result export to Parquet/CSV/JSON/NDJSON for embedded mode (#181)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent e65a199 commit 4297553

7 files changed

Lines changed: 678 additions & 0 deletions

File tree

clickgraph-embedded/src/connection.rs

Lines changed: 150 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@ use clickgraph::graph_catalog::graph_schema::GraphSchema;
1010

1111
use super::database::Database;
1212
use super::error::EmbeddedError;
13+
use super::export::{build_export_sql, ExportOptions};
1314
use super::query_result::QueryResult;
1415
use super::value::Value;
1516

@@ -85,6 +86,81 @@ impl<'db> Connection<'db> {
8586
})
8687
}
8788

89+
/// Export Cypher query results to a file.
90+
///
91+
/// Translates the Cypher query to SQL, wraps it in
92+
/// `INSERT INTO FUNCTION file(...)`, and executes via chdb.
93+
/// The file is written directly by chdb — results are streamed to disk
94+
/// without buffering the full result set in memory.
95+
///
96+
/// # Example
97+
///
98+
/// ```no_run
99+
/// # use clickgraph_embedded::{Database, Connection, SystemConfig, ExportOptions};
100+
/// # let db = Database::new("schema.yaml", SystemConfig::default()).unwrap();
101+
/// # let conn = Connection::new(&db).unwrap();
102+
/// // Auto-detect format from extension
103+
/// conn.export("MATCH (u:User) RETURN u.name", "users.parquet", ExportOptions::default()).unwrap();
104+
///
105+
/// // CSV with explicit options
106+
/// conn.export("MATCH (u:User) RETURN u.name", "users.csv", ExportOptions::default()).unwrap();
107+
/// ```
108+
pub fn export(
109+
&self,
110+
cypher: &str,
111+
output_path: &str,
112+
options: ExportOptions,
113+
) -> Result<(), EmbeddedError> {
114+
self.db
115+
.runtime
116+
.block_on(self.export_async(cypher, output_path, options))
117+
}
118+
119+
/// Generate the export SQL without executing it (for debugging).
120+
pub fn export_to_sql(
121+
&self,
122+
cypher: &str,
123+
output_path: &str,
124+
options: ExportOptions,
125+
) -> Result<String, EmbeddedError> {
126+
let select_sql = self.query_to_sql(cypher)?;
127+
build_export_sql(&select_sql, output_path, &options).map_err(EmbeddedError::Query)
128+
}
129+
130+
async fn export_async(
131+
&self,
132+
cypher: &str,
133+
output_path: &str,
134+
options: ExportOptions,
135+
) -> Result<(), EmbeddedError> {
136+
use clickgraph::clickhouse_query_generator::cypher_to_sql;
137+
use clickgraph::server::query_context::{
138+
set_current_schema, with_query_context, QueryContext,
139+
};
140+
141+
let schema = Arc::clone(&self.schema);
142+
let executor = Arc::clone(&self.executor);
143+
let cypher = cypher.to_string();
144+
let output_path = output_path.to_string();
145+
146+
with_query_context(QueryContext::new(None), async move {
147+
set_current_schema(Arc::clone(&schema));
148+
149+
let select_sql = cypher_to_sql(&cypher, &schema, 100).map_err(EmbeddedError::Query)?;
150+
let export_sql = build_export_sql(&select_sql, &output_path, &options)
151+
.map_err(EmbeddedError::Query)?;
152+
153+
// Execute the INSERT INTO FUNCTION file(...) — no result rows expected
154+
executor
155+
.execute_text(&export_sql, "TabSeparated", None)
156+
.await
157+
.map_err(EmbeddedError::from)?;
158+
159+
Ok(())
160+
})
161+
.await
162+
}
163+
88164
async fn query_async(&self, cypher: &str) -> Result<QueryResult, EmbeddedError> {
89165
use clickgraph::clickhouse_query_generator::cypher_to_sql;
90166
use clickgraph::server::query_context::{
@@ -236,4 +312,78 @@ graph_schema:
236312
let result = conn.query_to_sql("NOT VALID CYPHER @@@@");
237313
assert!(result.is_err(), "invalid Cypher should return error");
238314
}
315+
316+
/// A `.parquet` path with default options should yield a Parquet export statement.
#[test]
fn test_export_to_sql_parquet() {
    use crate::export::ExportOptions;

    let db = make_stub_db();
    let conn = Connection::new(&db).unwrap();

    let cypher = "MATCH (u:User) RETURN u.name";
    let generated = conn
        .export_to_sql(cypher, "output.parquet", ExportOptions::default())
        .expect("should generate export SQL");

    // The statement must begin with the file() wrapper for the requested path and format.
    let expected_prefix = "INSERT INTO FUNCTION file('output.parquet', 'Parquet')";
    assert!(
        generated.starts_with(expected_prefix),
        "should wrap in INSERT INTO FUNCTION file: {}",
        generated
    );

    // Presumably the stub schema maps `u.name` to a `full_name` column — see `make_stub_db`.
    assert!(
        generated.contains("full_name"),
        "property mapping should apply"
    );
}
335+
336+
/// A `.csv` path with default options should select the header-bearing CSV format.
#[test]
fn test_export_to_sql_csv() {
    use crate::export::ExportOptions;

    let db = make_stub_db();
    let conn = Connection::new(&db).unwrap();

    let cypher = "MATCH (u:User) RETURN u.name";
    let generated = conn
        .export_to_sql(cypher, "results.csv", ExportOptions::default())
        .expect("should generate export SQL");

    let has_header_format = generated.contains("CSVWithNames");
    assert!(has_header_format, "CSV should include header: {}", generated);
}
354+
355+
/// An explicitly requested format should be used even when the extension (`.txt`) gives no hint.
#[test]
fn test_export_to_sql_explicit_format() {
    use crate::export::{ExportFormat, ExportOptions};

    let db = make_stub_db();
    let conn = Connection::new(&db).unwrap();

    let mut explicit = ExportOptions::default();
    explicit.format = Some(ExportFormat::JSONEachRow);

    let generated = conn
        .export_to_sql("MATCH (u:User) RETURN u.name", "data.txt", explicit)
        .expect("should generate export SQL");

    let uses_requested_format = generated.contains("JSONEachRow");
    assert!(
        uses_requested_format,
        "explicit format should apply: {}",
        generated
    );
}
373+
374+
/// An unrecognised extension with no explicit format must be rejected.
#[test]
fn test_export_to_sql_unknown_extension() {
    use crate::export::ExportOptions;

    let db = make_stub_db();
    let conn = Connection::new(&db).unwrap();

    let cypher = "MATCH (u:User) RETURN u.name";
    let outcome = conn.export_to_sql(cypher, "output.xyz", ExportOptions::default());

    let rejected = outcome.is_err();
    assert!(rejected, "unknown extension without format should error");
}
239389
}

0 commit comments

Comments
 (0)