Skip to content

Commit 25f3da7

Browse files
genezhang and Copilot authored
feat: Add APOC export procedures (Neo4j-compatible) (#182)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 4297553 commit 25f3da7

10 files changed

Lines changed: 1064 additions & 24 deletions

File tree

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
### 🚀 Features
44

5+
- **APOC Export Procedures**: Neo4j-compatible `CALL apoc.export.{csv|json|parquet}.query(cypher, destination, config)` for exporting query results. Supports local files, S3, GCS, Azure, and HTTP destinations. Works in HTTP server, Bolt protocol, and embedded mode.
6+
- **Destination resolver**: Maps URI schemes to ClickHouse `INSERT INTO FUNCTION` table functions (`file()`, `s3()`, `url()`, `azureBlobStorage()`)
7+
- **Parser fix**: Standalone CALL with positional args now correctly parsed even when inner Cypher contains RETURN/UNION keywords
8+
- **Config**: Parquet compression codecs (snappy, gzip, lz4, zstd, brotli)
9+
510
- **Embedded mode** (PR #179): Run Cypher graph queries entirely in-process via [chdb](https://github.com/chdb-io/chdb) — no external ClickHouse server required. Supports Parquet, CSV, Iceberg, Delta Lake, and S3-compatible storage.
611
- **`QueryExecutor` trait**: Abstracts SQL execution; `RemoteClickHouseExecutor` (existing) and `ChdbExecutor` (new) are the two backends. Default behaviour is unchanged.
712
- **`clickgraph-embedded` crate**: Kuzu-compatible Rust library API — `Database::new(schema, config)`, `Connection::new(&db)`, `conn.query(cypher)`, `result.next()``Row`.

STATUS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ Supports both remote ClickHouse and embedded (in-process) mode via chdb.
5454
- **GraphRAG structured output**: `format: "Graph"` returns deduplicated nodes, edges, and stats
5555
- **ClickHouse cluster load balancing**: `CLICKHOUSE_CLUSTER` for auto-discovery and load balancing
5656
- **Embedded mode** (`--features embedded`): `QueryExecutor` trait + `ChdbExecutor` + `clickgraph-embedded` crate — run Cypher queries in-process over Parquet/Iceberg/Delta/S3 without a ClickHouse server. Kuzu-compatible Rust API (`Database`, `Connection`, `QueryResult`). `source:` URI field in YAML schema. S3/GCS/Azure credential support via `StorageCredentials`.
57+
- **APOC Export Procedures**: Neo4j-compatible `CALL apoc.export.{csv|json|parquet}.query(cypher, destination, config)` — translates inner Cypher to SQL, resolves destination URI (local file, S3, GCS, Azure, HTTP), wraps in `INSERT INTO FUNCTION`. Works in server mode (HTTP + Bolt) and embedded mode.
5758

5859
## Current Limitations
5960

clickgraph-embedded/src/connection.rs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,107 @@ impl<'db> Connection<'db> {
161161
.await
162162
}
163163

164+
/// Handle `CALL apoc.export.{csv|json|parquet}.query(...)` in embedded mode.
165+
///
166+
/// Parses arguments, translates inner Cypher → SQL, builds export SQL, executes.
167+
/// Returns a single-row result with export status.
168+
async fn handle_export_call(&self, cypher: &str) -> Result<QueryResult, EmbeddedError> {
169+
use clickgraph::clickhouse_query_generator::cypher_to_sql;
170+
use clickgraph::open_cypher_parser;
171+
use clickgraph::open_cypher_parser::ast::CypherStatement;
172+
use clickgraph::procedures::apoc_export;
173+
use clickgraph::server::query_context::{
174+
set_current_schema, with_query_context, QueryContext,
175+
};
176+
177+
let schema = Arc::clone(&self.schema);
178+
let executor = Arc::clone(&self.executor);
179+
let cypher = cypher.to_string();
180+
181+
with_query_context(QueryContext::new(None), async move {
182+
set_current_schema(Arc::clone(&schema));
183+
184+
// Parse the CALL statement
185+
let (_, stmt) = open_cypher_parser::parse_cypher_statement(&cypher)
186+
.map_err(|e| EmbeddedError::Query(format!("Parse error: {}", e)))?;
187+
188+
// Extract procedure name and arguments
189+
let (proc_name, expressions): (String, Vec<_>) = match &stmt {
190+
CypherStatement::ProcedureCall(pc) => {
191+
(pc.procedure_name.to_string(), pc.arguments.iter().collect())
192+
}
193+
CypherStatement::Query { query, .. } => {
194+
let cc = query
195+
.call_clause
196+
.as_ref()
197+
.ok_or_else(|| EmbeddedError::Query("No CALL clause found".to_string()))?;
198+
(
199+
cc.procedure_name.to_string(),
200+
cc.arguments.iter().map(|a| &a.value).collect(),
201+
)
202+
}
203+
};
204+
205+
let ch_format = apoc_export::format_from_procedure_name(&proc_name)
206+
.map_err(EmbeddedError::Query)?;
207+
208+
let args =
209+
apoc_export::parse_export_call(&expressions).map_err(EmbeddedError::Query)?;
210+
211+
// Translate inner Cypher → SQL
212+
let inner_sql =
213+
cypher_to_sql(&args.cypher_query, &schema, 100).map_err(EmbeddedError::Query)?;
214+
215+
// Build export SQL using the full destination resolver
216+
let export_sql = apoc_export::build_export_sql(
217+
&inner_sql,
218+
&args.destination,
219+
ch_format,
220+
&args.config,
221+
)
222+
.map_err(EmbeddedError::Query)?;
223+
224+
// Execute
225+
executor
226+
.execute_text(&export_sql, "TabSeparated", None)
227+
.await
228+
.map_err(EmbeddedError::from)?;
229+
230+
// Return status as a single-row result
231+
let columns = vec![
232+
"file".to_string(),
233+
"format".to_string(),
234+
"source".to_string(),
235+
];
236+
let rows = vec![vec![
237+
Value::String(args.destination),
238+
Value::String(ch_format.to_string()),
239+
Value::String(args.cypher_query),
240+
]];
241+
Ok(QueryResult::new(columns, rows))
242+
})
243+
.await
244+
}
245+
164246
async fn query_async(&self, cypher: &str) -> Result<QueryResult, EmbeddedError> {
247+
// Intercept CALL apoc.export.* — parse first to avoid false positives
248+
if let Ok((_, stmt)) = clickgraph::open_cypher_parser::parse_cypher_statement(cypher) {
249+
let proc_name = match &stmt {
250+
clickgraph::open_cypher_parser::ast::CypherStatement::ProcedureCall(pc) => {
251+
Some(pc.procedure_name.to_string())
252+
}
253+
clickgraph::open_cypher_parser::ast::CypherStatement::Query { query, .. } => query
254+
.call_clause
255+
.as_ref()
256+
.map(|cc| cc.procedure_name.to_string()),
257+
};
258+
if let Some(name) = proc_name {
259+
if clickgraph::procedures::apoc_export::is_export_procedure(&name) {
260+
return self.handle_export_call(cypher).await;
261+
}
262+
}
263+
}
264+
165265
use clickgraph::clickhouse_query_generator::cypher_to_sql;
166266
use clickgraph::server::query_context::{
167267
set_current_schema, with_query_context, QueryContext,
@@ -386,4 +486,43 @@ graph_schema:
386486
"unknown extension without format should error"
387487
);
388488
}
489+
490+
#[test]
491+
fn test_call_export_via_query() {
492+
// Verify that CALL apoc.export.*.query() is intercepted and routed
493+
// to the export handler (returns a status result, not SQL error)
494+
let db = make_stub_db();
495+
let conn = Connection::new(&db).unwrap();
496+
let result = conn.query(
497+
r#"CALL apoc.export.parquet.query("MATCH (u:User) RETURN u.name", "/tmp/users.parquet", {})"#,
498+
);
499+
// With stub executor, this should succeed (StubExecutor returns empty string)
500+
assert!(
501+
result.is_ok(),
502+
"CALL export should be handled: {:?}",
503+
result.err()
504+
);
505+
let qr = result.unwrap();
506+
assert_eq!(qr.get_column_names(), &["file", "format", "source"]);
507+
}
508+
509+
#[test]
510+
fn test_call_export_csv_via_query() {
511+
let db = make_stub_db();
512+
let conn = Connection::new(&db).unwrap();
513+
let result = conn.query(
514+
r#"CALL apoc.export.csv.query("MATCH (u:User) RETURN u.name", "/tmp/users.csv", {})"#,
515+
);
516+
assert!(result.is_ok(), "CSV export should work: {:?}", result.err());
517+
}
518+
519+
#[test]
520+
fn test_call_export_s3_destination() {
521+
let db = make_stub_db();
522+
let conn = Connection::new(&db).unwrap();
523+
let result = conn.query(
524+
r#"CALL apoc.export.json.query("MATCH (u:User) RETURN u.name", "s3://mybucket/users.json", {})"#,
525+
);
526+
assert!(result.is_ok(), "S3 export should work: {:?}", result.err());
527+
}
389528
}

docs/wiki/Cypher-Language-Reference.md

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ Complete syntax reference for Cypher queries supported by ClickGraph.
3232
- [System Procedures](#system-procedures) — **NEW**
3333
- [Schema Metadata](#schema-metadata-procedures)
3434
- [Graph Algorithms](#graph-algorithms)
35+
- [Export Procedures (APOC-Compatible)](#export-procedures-apoc-compatible)
3536
- [Query Examples](#query-examples)
3637

3738
---
@@ -2231,7 +2232,52 @@ LIMIT 10
22312232

22322233
---
22332234

2234-
## Query Examples
2235+
### Export Procedures (APOC-Compatible)
2236+
2237+
Export query results to files or cloud storage using Neo4j APOC-compatible syntax.
2238+
2239+
**Syntax:**
2240+
```cypher
2241+
CALL apoc.export.{format}.query(cypher_query, destination, config)
2242+
```
2243+
2244+
**Formats:**
2245+
| Procedure | ClickHouse Format |
2246+
|-----------|------------------|
2247+
| `apoc.export.csv.query` | CSVWithNames |
2248+
| `apoc.export.json.query` | JSONEachRow |
2249+
| `apoc.export.parquet.query` | Parquet |
2250+
2251+
**Destination URIs:**
2252+
| Scheme | Example |
2253+
|--------|---------|
2254+
| Local file | `"/tmp/users.parquet"` or `"./output.csv"` |
2255+
| S3 | `"s3://bucket/path/file.parquet"` |
2256+
| GCS | `"gs://bucket/path/file.json"` |
2257+
| Azure | `"azure://container/path/file.csv"` |
2258+
| HTTP | `"https://webhook.example.com/ingest"` |
2259+
2260+
**Examples:**
2261+
```cypher
2262+
-- Export to local Parquet file
2263+
CALL apoc.export.parquet.query("MATCH (u:User) RETURN u.name, u.email", "/tmp/users.parquet", {})
2264+
2265+
-- Export to CSV
2266+
CALL apoc.export.csv.query("MATCH (u:User)-[:FOLLOWS]->(f:User) RETURN u.name, f.name", "follows.csv", {})
2267+
2268+
-- Export to S3 with compression
2269+
CALL apoc.export.parquet.query("MATCH (u:User) RETURN u.name", "s3://my-bucket/users.parquet", {compression: "zstd"})
2270+
2271+
-- Export to JSON
2272+
CALL apoc.export.json.query("MATCH (u:User) WHERE u.country = 'US' RETURN u", "us_users.json", {})
2273+
```
2274+
2275+
**Config options:**
2276+
- `compression`: Parquet codec — `"none"`, `"snappy"`, `"gzip"`, `"lz4"`, `"zstd"`, `"brotli"`
2277+
2278+
> **Note**: Works in all modes — HTTP server, Bolt protocol, and embedded mode.
2279+
2280+
---
22352281

22362282
### Simple Queries
22372283

docs/wiki/Embedded-Mode.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,30 @@ print(conn.export_to_sql("MATCH (u:User) RETURN u.name", "users.parquet"))
177177

178178
Supported file extensions: `.parquet` / `.pq`, `.csv`, `.tsv`, `.json`, `.ndjson` / `.jsonl`
179179

180+
**APOC Export Procedures (Neo4j-compatible):**
181+
182+
Use Neo4j APOC-style `CALL` syntax for exports with flexible destinations:
183+
184+
```python
185+
conn = db.connect()
186+
187+
# Export to local file
188+
conn.query('CALL apoc.export.parquet.query("MATCH (u:User) RETURN u.name", "/tmp/users.parquet", {})')
189+
190+
# Export to CSV
191+
conn.query('CALL apoc.export.csv.query("MATCH (u:User) RETURN u.name, u.email", "users.csv", {})')
192+
193+
# Export to S3
194+
conn.query('CALL apoc.export.json.query("MATCH (u:User) RETURN u", "s3://bucket/users.json", {})')
195+
196+
# Export with Parquet compression
197+
conn.query('CALL apoc.export.parquet.query("MATCH (u:User) RETURN u.name", "output.parquet", {compression: "zstd"})')
198+
```
199+
200+
Supported destinations: local files, `s3://`, `gs://`, `azure://`, `http://`, `https://`
201+
202+
The APOC export syntax also works in server mode (HTTP and Bolt), providing a unified export interface across all deployment modes.
203+
180204
---
181205

182206
## Schema Configuration

src/open_cypher_parser/mod.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,23 +42,20 @@ pub fn parse_cypher_statement(
4242
) -> IResult<&'_ str, CypherStatement<'_>, OpenCypherParsingError<'_>> {
4343
let (input, _) = multispace0.parse(input)?;
4444

45-
// Check if this looks like a CALL with UNION or RETURN
46-
// If so, parse as Query, not standalone procedure call
47-
let input_upper = input.to_uppercase();
48-
let has_union_in_query = input_upper.contains("UNION");
49-
let has_return_in_query = input_upper.contains("RETURN");
50-
51-
if !has_union_in_query && !has_return_in_query {
52-
// Try to parse as standalone procedure call
53-
if let Ok((remaining, procedure_call)) =
54-
standalone_procedure_call::parse_standalone_procedure_call(input)
55-
{
56-
// Optional trailing semicolon
45+
// Try standalone procedure call first.
46+
// Only use it if parsing consumed the entire input (modulo whitespace/semicolons).
47+
// If the standalone parser succeeds but leaves unconsumed tokens (e.g., RETURN, UNION),
48+
// fall through to the query parser which handles CALL as part of a larger query.
49+
if let Ok((remaining, procedure_call)) =
50+
standalone_procedure_call::parse_standalone_procedure_call(input)
51+
{
52+
let check = remaining.trim();
53+
if check.is_empty() || check == ";" {
5754
let (input, _) = opt(ws(tag(";"))).parse(remaining)?;
5855
return Ok((input, CypherStatement::ProcedureCall(procedure_call)));
5956
}
6057
}
61-
// Has UNION or RETURN (or standalone parse failed) - fall through to regular query parser
58+
// Standalone parse didn't consume everything - fall through to regular query parser
6259

6360
// Parse the first query
6461
let (input, first_query) = parse_query_with_nom.parse(input)?;

0 commit comments

Comments
 (0)