Skip to content

Commit 9af7478

Browse files
committed
fix: implement proper task_local! with .scope() wrapper for query isolation
Addresses Copilot review feedback from PR #40: - Created unified QueryContext module (src/server/query_context.rs) - Implements .scope() wrapper via with_query_context() function - Contains all per-query state: schema_name, cte_registry, denormalized_aliases, relationship_columns, cte_property_mappings, multi_type_vlp_aliases - Refactored handlers.rs to wrap query processing in with_query_context() - Each request gets isolated context via task_local .scope() - Context automatically cleaned up when task completes - No state leakage between concurrent queries - Updated render_plan/mod.rs, render_expr.rs, to_sql_query.rs - All modules now delegate to unified query_context - Consistent API for all context access This fix ensures concurrent queries don't interfere with each other's state, which was a critical concurrency bug identified by Copilot. Tests: 792 unit tests pass, 233 integration tests pass
1 parent 07b73d7 commit 9af7478

6 files changed

Lines changed: 426 additions & 310 deletions

File tree

src/clickhouse_query_generator/to_sql_query.rs

Lines changed: 18 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -12,198 +12,39 @@ use crate::{
1212
OrderByItems, OrderByOrder, RenderPlan, SelectItems, ToSql, UnionItems, UnionType,
1313
},
1414
},
15+
server::query_context::{
16+
self, clear_all_render_contexts, get_cte_column_registry, get_cte_property_mapping,
17+
get_relationship_columns, is_multi_type_vlp_alias, set_all_render_contexts,
18+
},
1519
};
16-
use std::cell::RefCell;
1720
use std::collections::HashMap;
1821

1922
// Import function translator for Neo4j -> ClickHouse function mappings
2023
use super::function_registry::get_function_mapping;
2124
use super::function_translator::{get_ch_function_name, CH_PASSTHROUGH_PREFIX};
2225

23-
/// TASK-LOCAL RENDER CONTEXT: Per-async-task storage for query rendering
24-
/// Each Axum async task (HTTP request) gets its own isolated context.
25-
/// Multiple concurrent queries on same OS thread have zero interference.
26-
///
27-
/// Why task_local instead of thread_local:
28-
/// - thread_local: Shared by all async tasks on same OS thread (UNSAFE for concurrent queries)
29-
/// - task_local: Each async task gets isolated storage (SAFE)
30-
///
31-
/// Usage:
32-
/// 1. At query handler entry: RENDER_CONTEXT_REGISTRY.scope(registry, async { ... }).await
33-
/// 2. During rendering: get_cte_column_from_context() accesses current task's registry
34-
/// 3. When task completes: context automatically cleaned up
35-
tokio::task_local! {
36-
/// Task-local CTE column registry: (cte_alias, cypher_property) → cte_output_column
37-
/// Populated once per query at render_to_sql() entry
38-
/// Isolated to current async task - no interference with concurrent queries
39-
pub static RENDER_CONTEXT_CTE_REGISTRY: RefCell<Option<CteColumnRegistry>>;
40-
}
41-
42-
/// Retrieve CTE column for property within current render context (task-local)
43-
/// Returns None if not in task context or property not found (safe fallback)
44-
fn get_cte_column_from_context(cte_alias: &str, property: &str) -> Option<String> {
45-
// Access the task-local context
46-
// task_local! uses sync accessors, not .with()
47-
RENDER_CONTEXT_CTE_REGISTRY
48-
.try_with(|ctx| {
49-
ctx.borrow()
50-
.as_ref()
51-
.and_then(|registry| registry.lookup(cte_alias, property))
52-
})
53-
.ok()
54-
.flatten()
55-
}
56-
57-
/// Set the CTE registry for the current async task
58-
/// Call at render_to_sql() entry to make registry available to rendering code
59-
fn set_render_context_cte_registry(registry: CteColumnRegistry) {
60-
// task_local! doesn't support runtime initialization, so we use try_with
61-
// If we're not in a task context, this returns an error which we ignore
62-
let _ = RENDER_CONTEXT_CTE_REGISTRY.try_with(|ctx| {
63-
let prev = ctx.borrow_mut().replace(registry);
64-
if prev.is_some() {
65-
log::warn!("⚠️ Overwriting previous render context - possible re-entrancy");
66-
}
67-
});
68-
}
69-
70-
/// Clear the CTE registry for the current async task
71-
/// Call at render_to_sql() exit to clean up
72-
fn clear_render_context_cte_registry() {
73-
// task_local! sync cleanup
74-
let _ = RENDER_CONTEXT_CTE_REGISTRY.try_with(|ctx| {
75-
ctx.borrow_mut().take();
76-
});
77-
}
78-
7926
// ============================================================================
80-
// RELATIONSHIP COLUMNS CONTEXT (for IS NULL checks on relationship aliases)
27+
// RENDER CONTEXT ACCESSORS (delegating to unified query_context)
8128
// ============================================================================
8229

83-
fn set_render_context_relationship_columns(columns: HashMap<String, (String, String)>) {
84-
let _ = RENDER_CONTEXT_RELATIONSHIP_COLUMNS.try_with(|ctx| {
85-
ctx.borrow_mut().replace(columns);
86-
});
30+
/// Retrieve CTE column for property within current render context
31+
fn get_cte_column_from_context(cte_alias: &str, property: &str) -> Option<String> {
32+
get_cte_column_registry().and_then(|registry| registry.lookup(cte_alias, property))
8733
}
8834

35+
/// Get relationship columns for IS NULL checks
8936
fn get_relationship_columns_from_context(alias: &str) -> Option<(String, String)> {
90-
RENDER_CONTEXT_RELATIONSHIP_COLUMNS
91-
.try_with(|ctx| {
92-
ctx.borrow()
93-
.as_ref()
94-
.and_then(|cols| cols.get(alias).cloned())
95-
})
96-
.ok()
97-
.flatten()
98-
}
99-
100-
fn clear_render_context_relationship_columns() {
101-
let _ = RENDER_CONTEXT_RELATIONSHIP_COLUMNS.try_with(|ctx| {
102-
ctx.borrow_mut().take();
103-
});
104-
}
105-
106-
// ============================================================================
107-
// CTE PROPERTY MAPPINGS CONTEXT (for CTE property → column resolution)
108-
// ============================================================================
109-
110-
fn set_render_context_cte_property_mappings(mappings: HashMap<String, HashMap<String, String>>) {
111-
let _ = RENDER_CONTEXT_CTE_PROPERTY_MAPPINGS.try_with(|ctx| {
112-
ctx.borrow_mut().replace(mappings);
113-
});
37+
get_relationship_columns(alias)
11438
}
11539

40+
/// Get CTE property mapping
11641
fn get_cte_property_from_context(cte_alias: &str, property: &str) -> Option<String> {
117-
RENDER_CONTEXT_CTE_PROPERTY_MAPPINGS
118-
.try_with(|ctx| {
119-
ctx.borrow().as_ref().and_then(|mappings| {
120-
mappings
121-
.get(cte_alias)
122-
.and_then(|props| props.get(property).cloned())
123-
})
124-
})
125-
.ok()
126-
.flatten()
127-
}
128-
129-
fn clear_render_context_cte_property_mappings() {
130-
let _ = RENDER_CONTEXT_CTE_PROPERTY_MAPPINGS.try_with(|ctx| {
131-
ctx.borrow_mut().take();
132-
});
133-
}
134-
135-
// ============================================================================
136-
// MULTI-TYPE VLP ALIASES CONTEXT (for multi-type VLP JSON property extraction)
137-
// ============================================================================
138-
139-
fn set_render_context_multi_type_vlp_aliases(aliases: HashMap<String, String>) {
140-
let _ = RENDER_CONTEXT_MULTI_TYPE_VLP_ALIASES.try_with(|ctx| {
141-
ctx.borrow_mut().replace(aliases);
142-
});
42+
get_cte_property_mapping(cte_alias, property)
14343
}
14444

45+
/// Check if alias is a multi-type VLP endpoint
14546
fn is_multi_type_vlp_alias_from_context(alias: &str) -> bool {
146-
RENDER_CONTEXT_MULTI_TYPE_VLP_ALIASES
147-
.try_with(|ctx| {
148-
ctx.borrow()
149-
.as_ref()
150-
.map(|aliases| aliases.contains_key(alias))
151-
.unwrap_or(false)
152-
})
153-
.unwrap_or(false)
154-
}
155-
156-
fn clear_render_context_multi_type_vlp_aliases() {
157-
let _ = RENDER_CONTEXT_MULTI_TYPE_VLP_ALIASES.try_with(|ctx| {
158-
ctx.borrow_mut().take();
159-
});
160-
}
161-
162-
/// Set ALL render context for the current async task
163-
/// Call at render_to_sql() entry
164-
fn set_all_render_contexts(
165-
cte_registry: CteColumnRegistry,
166-
relationship_columns: HashMap<String, (String, String)>,
167-
cte_mappings: HashMap<String, HashMap<String, String>>,
168-
multi_type_aliases: HashMap<String, String>,
169-
) {
170-
set_render_context_cte_registry(cte_registry);
171-
set_render_context_relationship_columns(relationship_columns);
172-
set_render_context_cte_property_mappings(cte_mappings);
173-
set_render_context_multi_type_vlp_aliases(multi_type_aliases);
174-
}
175-
176-
/// Clear ALL render contexts for the current async task
177-
/// Call at render_to_sql() exit (before final return)
178-
fn clear_all_render_contexts() {
179-
clear_render_context_cte_registry();
180-
clear_render_context_relationship_columns();
181-
clear_render_context_cte_property_mappings();
182-
clear_render_context_multi_type_vlp_aliases();
183-
}
184-
185-
/// TASK-LOCAL RENDER CONTEXT: Per-async-task storage for rendering state
186-
/// Each Axum async task (HTTP request) gets its own isolated context.
187-
/// Multiple concurrent queries on same OS thread have zero interference.
188-
///
189-
/// Lifecycle:
190-
/// 1. At render_to_sql() entry: Set all three contexts
191-
/// 2. During rendering: Access contexts for property/column lookups
192-
/// 3. At render_to_sql() exit: Clear all contexts before return
193-
tokio::task_local! {
194-
/// Task-local mapping of relationship alias → (from_id_column, to_id_column)
195-
/// Populated during JOIN rendering, used for IS NULL checks on relationship aliases
196-
pub static RENDER_CONTEXT_RELATIONSHIP_COLUMNS: RefCell<Option<HashMap<String, (String, String)>>>;
197-
198-
/// Task-local mapping of CTE alias → property mapping (Cypher property → CTE column name)
199-
/// Example: "cnt_friend" → { "id" → "friend_id", "firstName" → "friend_firstName" }
200-
/// Populated from RenderPlan CTEs during SQL generation
201-
pub static RENDER_CONTEXT_CTE_PROPERTY_MAPPINGS: RefCell<Option<HashMap<String, HashMap<String, String>>>>;
202-
203-
/// Task-local set of table aliases that are multi-type VLP endpoints
204-
/// Example: "x" for query (u)-[:FOLLOWS|AUTHORED*1..2]->(x)
205-
/// Properties on these aliases need JSON extraction from end_properties column
206-
pub static RENDER_CONTEXT_MULTI_TYPE_VLP_ALIASES: RefCell<Option<HashMap<String, String>>>;
47+
is_multi_type_vlp_alias(alias)
20748
}
20849

20950
/// Check if an expression contains a string literal (recursively for nested + operations)
@@ -270,10 +111,10 @@ fn build_relationship_columns_from_plan(plan: &RenderPlan) -> HashMap<String, (S
270111
}
271112

272113
/// Pre-populate the relationship columns mapping from a RenderPlan
273-
/// This builds the mapping and sets it in the task-local context
114+
/// This builds the mapping and sets it in the query context
274115
fn populate_relationship_columns_from_plan(plan: &RenderPlan) {
275116
let map = build_relationship_columns_from_plan(plan);
276-
set_render_context_relationship_columns(map);
117+
query_context::set_relationship_columns(map);
277118
}
278119

279120
/// Build CTE property mappings from RenderPlan CTEs (for collecting data)
@@ -370,8 +211,8 @@ fn populate_cte_property_mappings(plan: &RenderPlan) {
370211
let cte_mappings = build_cte_property_mappings(plan);
371212
let multi_type_aliases = build_multi_type_vlp_aliases(plan);
372213

373-
set_render_context_cte_property_mappings(cte_mappings);
374-
set_render_context_multi_type_vlp_aliases(multi_type_aliases);
214+
query_context::set_cte_property_mappings(cte_mappings);
215+
query_context::set_multi_type_vlp_aliases(multi_type_aliases);
375216
}
376217

377218
/// Rewrite property access in SELECT, GROUP BY items for VLP queries

src/render_plan/mod.rs

Lines changed: 6 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -31,69 +31,12 @@ pub use filter_pipeline::CategorizedFilters;
3131
pub use from_table::FromTable;
3232
pub use view_table_ref::ViewTableRef;
3333

34-
// Task-local storage for CTE column registry during render phase
35-
// This allows select_builder and filter_builder to resolve property access expressions
36-
// for WITH-exported variables to their CTE output column names
37-
// Task-local: Isolated per async task, no interference between concurrent queries
38-
tokio::task_local! {
39-
static CTE_COLUMN_REGISTRY_CONTEXT: RefCell<Option<CteColumnRegistry>>;
40-
}
41-
42-
/// Set the CTE column registry for the current render phase
43-
/// This should be called before rendering a logical plan that references WITH-exported variables
44-
pub fn set_cte_column_registry(registry: CteColumnRegistry) {
45-
let _ = CTE_COLUMN_REGISTRY_CONTEXT.try_with(|cell| {
46-
*cell.borrow_mut() = Some(registry);
47-
});
48-
}
49-
50-
/// Get the CTE column registry for property resolution
51-
pub fn get_cte_column_registry() -> Option<CteColumnRegistry> {
52-
CTE_COLUMN_REGISTRY_CONTEXT
53-
.try_with(|cell| cell.borrow().clone())
54-
.ok()
55-
.flatten()
56-
}
57-
58-
/// Clear the CTE column registry after rendering is complete
59-
pub fn clear_cte_column_registry() {
60-
let _ = CTE_COLUMN_REGISTRY_CONTEXT.try_with(|cell| {
61-
*cell.borrow_mut() = None;
62-
});
63-
}
64-
65-
// Denormalized edge alias mapping: maps target node alias to edge alias
66-
// For denormalized edges (e.g., AUTHORED where posts_bench is both edge and target node),
67-
// when we skip the second JOIN, we need to map the target node alias to the edge alias
68-
// so property resolution works correctly
69-
// Task-local: Isolated per async task, no interference between concurrent queries
70-
tokio::task_local! {
71-
static DENORMALIZED_EDGE_ALIASES: RefCell<std::collections::HashMap<String, String>>;
72-
}
73-
74-
/// Register an alias mapping for denormalized edges
75-
/// Maps target_node_alias → edge_alias (e.g., "d" → "r2")
76-
pub fn register_denormalized_alias(target_node_alias: &str, edge_alias: &str) {
77-
let _ = DENORMALIZED_EDGE_ALIASES.try_with(|cell| {
78-
cell.borrow_mut()
79-
.insert(target_node_alias.to_string(), edge_alias.to_string());
80-
});
81-
}
82-
83-
/// Look up the edge alias for a target node alias (if denormalized)
84-
pub fn get_denormalized_alias_mapping(target_node_alias: &str) -> Option<String> {
85-
DENORMALIZED_EDGE_ALIASES
86-
.try_with(|cell| cell.borrow().get(target_node_alias).cloned())
87-
.ok()
88-
.flatten()
89-
}
90-
91-
/// Clear all denormalized alias mappings after rendering is complete
92-
pub fn clear_denormalized_aliases() {
93-
let _ = DENORMALIZED_EDGE_ALIASES.try_with(|cell| {
94-
cell.borrow_mut().clear();
95-
});
96-
}
34+
// Re-export CTE column registry and denormalized alias accessors from unified query context
35+
// See server/query_context.rs for the task_local! implementation with .scope() support
36+
pub use crate::server::query_context::{
37+
clear_cte_column_registry, clear_denormalized_aliases, get_cte_column_registry,
38+
get_denormalized_alias_mapping, register_denormalized_alias, set_cte_column_registry,
39+
};
9740

9841
use crate::query_planner::join_context::{
9942
VLP_CTE_FROM_ALIAS, VLP_END_ID_COLUMN, VLP_START_ID_COLUMN,

src/render_plan/render_expr.rs

Lines changed: 5 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,44 +20,11 @@ use crate::query_planner::logical_plan::LogicalPlan;
2020

2121
use super::errors::RenderBuildError;
2222

23-
// Thread-local storage for the current schema name (per-query context)
24-
//
25-
// NOTE: Using thread_local! instead of tokio::task_local! because:
26-
// - task_local! requires wrapping ALL query processing in a .scope() call
27-
// - The current architecture doesn't easily support that pattern
28-
// - thread_local! works correctly for our single-request-at-a-time HTTP handler pattern
29-
// - For true concurrent isolation, refactor to pass schema explicitly through the call chain
30-
//
31-
// Usage:
32-
// 1. At query handler entry: set_current_schema_name(schema_name)
33-
// 2. During all query processing: get_current_schema_name() accesses current query's schema
34-
// 3. At query handler exit: clear_current_schema_name()
35-
thread_local! {
36-
/// Query-scope schema name: which schema this query operates on
37-
static QUERY_SCHEMA_NAME: RefCell<Option<String>> = RefCell::new(None);
38-
}
39-
40-
/// Set the schema name for the current query (task/request)
41-
/// Call this immediately after determining schema_name in query handler
42-
/// Schema context is read-only and available to ALL query processing phases
43-
pub fn set_current_schema_name(name: Option<String>) {
44-
QUERY_SCHEMA_NAME.with(|cell| {
45-
*cell.borrow_mut() = name;
46-
});
47-
}
48-
49-
/// Get the current query's schema name (available to all processing phases)
50-
pub fn get_current_schema_name() -> Option<String> {
51-
QUERY_SCHEMA_NAME.with(|cell| cell.borrow().clone())
52-
}
53-
54-
/// Clear the schema name after query processing completes
55-
/// Call this at query handler exit for cleanup
56-
pub fn clear_current_schema_name() {
57-
QUERY_SCHEMA_NAME.with(|cell| {
58-
*cell.borrow_mut() = None;
59-
});
60-
}
23+
// Re-export schema name accessors from the unified query context
24+
// See server/query_context.rs for the task_local! implementation with .scope() support
25+
pub use crate::server::query_context::{
26+
clear_current_schema_name, get_current_schema_name, set_current_schema_name,
27+
};
6128

6229
/// Generate SQL for an EXISTS subquery directly from the logical plan
6330
/// This is a simplified approach that generates basic EXISTS SQL

0 commit comments

Comments
 (0)