Skip to content

Commit dcd47c1

Browse files
authored
Merge pull request #32 from NodeDB-Lab/bug/pgwire-tenant-scoping-29-30
fix(pgwire): scope catalog resolution and trust auth to the connecting tenant
2 parents 6513eca + 3483711 commit dcd47c1

File tree

17 files changed

+437
-68
lines changed

17 files changed

+437
-68
lines changed

nodedb/src/control/event_trigger.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ async fn execute_then_action(
124124
.replace("$collection", &event.collection)
125125
.replace("$operation", event.operation.as_str());
126126

127-
let query_ctx = QueryContext::for_state(shared, 1);
127+
let query_ctx = QueryContext::for_state(shared);
128128

129129
match query_ctx.plan_sql(&sql, event.tenant_id).await {
130130
Ok(tasks) => {

nodedb/src/control/planner/context.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,29 +50,33 @@ pub struct QueryContext {
5050
}
5151

5252
/// Inputs needed to construct an `OriginCatalog` per plan call.
53+
///
54+
/// Tenant is intentionally **not** stored here: every plan call passes the
55+
/// effective tenant to `build_adapter`, so a single `QueryContext` shared
56+
/// across a pgwire handler can serve queries from connections belonging to
57+
/// different tenants without cross-tenant catalog resolution.
5358
#[derive(Clone)]
5459
struct CatalogInputs {
5560
credentials: Arc<CredentialStore>,
5661
shared: Option<std::sync::Weak<crate::control::state::SharedState>>,
57-
tenant_id: u32,
5862
retention_policy_registry:
5963
Option<Arc<crate::engine::timeseries::retention_policy::RetentionPolicyRegistry>>,
6064
}
6165

6266
impl CatalogInputs {
63-
fn build_adapter(&self) -> super::catalog_adapter::OriginCatalog {
67+
fn build_adapter(&self, tenant_id: u32) -> super::catalog_adapter::OriginCatalog {
6468
if let Some(weak) = &self.shared
6569
&& let Some(shared) = weak.upgrade()
6670
{
6771
super::catalog_adapter::OriginCatalog::new_with_lease(
6872
&shared,
69-
self.tenant_id,
73+
tenant_id,
7074
self.retention_policy_registry.clone(),
7175
)
7276
} else {
7377
super::catalog_adapter::OriginCatalog::new(
7478
Arc::clone(&self.credentials),
75-
self.tenant_id,
79+
tenant_id,
7680
self.retention_policy_registry.clone(),
7781
)
7882
}
@@ -97,10 +101,9 @@ impl QueryContext {
97101
/// path would return instantly anyway, but going through the
98102
/// sub-planner without a direct `Arc<SharedState>` reference
99103
/// would require threading one through every call site.
100-
pub fn for_state(state: &crate::control::state::SharedState, tenant_id: u32) -> Self {
104+
pub fn for_state(state: &crate::control::state::SharedState) -> Self {
101105
Self::with_catalog(
102106
Arc::clone(&state.credentials),
103-
tenant_id,
104107
Some(Arc::clone(&state.retention_policy_registry)),
105108
)
106109
}
@@ -110,16 +113,12 @@ impl QueryContext {
110113
/// query's plan acquires descriptor leases before execution.
111114
/// Callers must hold an `Arc<SharedState>` — the adapter
112115
/// downgrades to `Weak` internally.
113-
pub fn for_state_with_lease(
114-
state: &Arc<crate::control::state::SharedState>,
115-
tenant_id: u32,
116-
) -> Self {
116+
pub fn for_state_with_lease(state: &Arc<crate::control::state::SharedState>) -> Self {
117117
let retention = Some(Arc::clone(&state.retention_policy_registry));
118118
Self {
119119
catalog_inputs: Some(CatalogInputs {
120120
credentials: Arc::clone(&state.credentials),
121121
shared: Some(Arc::downgrade(state)),
122-
tenant_id,
123122
retention_policy_registry: retention.clone(),
124123
}),
125124
retention_registry: retention,
@@ -136,15 +135,13 @@ impl QueryContext {
136135
/// that construct a context without an `Arc<SharedState>`.
137136
pub fn with_catalog(
138137
credentials: Arc<CredentialStore>,
139-
tenant_id: u32,
140138
retention_policy_registry: Option<
141139
Arc<crate::engine::timeseries::retention_policy::RetentionPolicyRegistry>,
142140
>,
143141
) -> Self {
144142
let catalog_inputs = Some(CatalogInputs {
145143
credentials,
146144
shared: None,
147-
tenant_id,
148145
retention_policy_registry: retention_policy_registry.clone(),
149146
});
150147

@@ -193,7 +190,7 @@ impl QueryContext {
193190
// `recorded_versions` field is per-plan state, and
194191
// two concurrent plans through a shared QueryContext
195192
// would otherwise interleave their recorded sets.
196-
let catalog = inputs.build_adapter();
193+
let catalog = inputs.build_adapter(tenant_id.as_u32());
197194
let plans = nodedb_sql::plan_sql(sql, &catalog).map_err(|e| match e {
198195
nodedb_sql::SqlError::RetryableSchemaChanged { descriptor } => {
199196
crate::Error::RetryableSchemaChanged { descriptor }
@@ -292,7 +289,7 @@ impl QueryContext {
292289
// through a different cache key), but constructing the
293290
// adapter fresh keeps the adapter's state per-plan and
294291
// allows future extension.
295-
let catalog = inputs.build_adapter();
292+
let catalog = inputs.build_adapter(tenant_id.as_u32());
296293
let plans = nodedb_sql::plan_sql_with_params(sql, params, &catalog).map_err(|e| {
297294
crate::Error::PlanError {
298295
detail: format!("{e}"),

nodedb/src/control/planner/procedural/executor/core/dispatch.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,7 @@ impl<'a> StatementExecutor<'a> {
4444
pub(super) async fn execute_dml(&self, sql: &str, bindings: &RowBindings) -> crate::Result<()> {
4545
let bound_sql = fold_literal_string_concat(&bindings.substitute(sql));
4646

47-
let ctx = crate::control::planner::context::QueryContext::for_state(
48-
self.state,
49-
self.tenant_id.as_u32(),
50-
);
47+
let ctx = crate::control::planner::context::QueryContext::for_state(self.state);
5148
let tasks = ctx.plan_sql(&bound_sql, self.tenant_id).await?;
5249

5350
if let Some(ref tx_ctx) = self.tx_ctx {

nodedb/src/control/scatter_gather.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,6 @@ pub async fn coordinate_cross_shard_hop(
343343
// (same pattern as QueryContext::for_state but without &SharedState).
344344
let plan_ctx = crate::control::planner::context::QueryContext::with_catalog(
345345
std::sync::Arc::clone(&credentials_clone),
346-
tenant_id_u32,
347346
Some(std::sync::Arc::clone(&retention_clone)),
348347
);
349348

nodedb/src/control/server/http/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ pub async fn run_with_listener(
159159
let mut shutdown_rx = bus.handle().flat_watch().raw_receiver();
160160

161161
let query_ctx = Arc::new(crate::control::planner::context::QueryContext::for_state(
162-
&shared, 1,
162+
&shared,
163163
));
164164
let state = AppState {
165165
shared,
@@ -198,7 +198,7 @@ pub async fn run(
198198
let mut shutdown_rx = bus.handle().flat_watch().raw_receiver();
199199

200200
let query_ctx = Arc::new(crate::control::planner::context::QueryContext::for_state(
201-
&shared, 1,
201+
&shared,
202202
));
203203
let state = AppState {
204204
shared,

nodedb/src/control/server/native/session.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl NativeSession {
4848
state: Arc<SharedState>,
4949
auth_mode: AuthMode,
5050
) -> Self {
51-
let query_ctx = QueryContext::for_state(&state, 1); // default tenant
51+
let query_ctx = QueryContext::for_state(&state);
5252
Self {
5353
stream,
5454
peer_addr,

nodedb/src/control/server/pgwire/ddl/collection/check_constraint.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,7 @@ async fn enforce_subquery_check(
102102
// General fallback: wrap in subselect.
103103
let restructured = restructure_subquery_check(&substituted);
104104

105-
let query_ctx =
106-
crate::control::planner::context::QueryContext::for_state(state, tenant_id.as_u32());
105+
let query_ctx = crate::control::planner::context::QueryContext::for_state(state);
107106

108107
let tasks = match query_ctx.plan_sql(&restructured.sql, tenant_id).await {
109108
Ok(t) => t,

nodedb/src/control/server/pgwire/ddl/collection/insert_parse.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,7 @@ pub(super) async fn plan_and_dispatch(
304304
tenant_id: nodedb_types::TenantId,
305305
sql: &str,
306306
) -> PgWireResult<()> {
307-
let query_ctx =
308-
crate::control::planner::context::QueryContext::for_state(state, tenant_id.as_u32());
307+
let query_ctx = crate::control::planner::context::QueryContext::for_state(state);
309308
let tasks = query_ctx
310309
.plan_sql(sql, tenant_id)
311310
.await

nodedb/src/control/server/pgwire/ddl/maintenance/analyze.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ pub async fn handle_analyze(
6969

7070
// Dispatch a scan to the Data Plane to collect all rows.
7171
let scan_sql = format!("SELECT * FROM {collection}");
72-
let query_ctx = crate::control::planner::context::QueryContext::for_state(state, tenant_id);
72+
let query_ctx = crate::control::planner::context::QueryContext::for_state(state);
7373
let rows = match query_ctx.plan_sql(&scan_sql, identity.tenant_id).await {
7474
Ok(tasks) => {
7575
let mut json_rows = Vec::new();

nodedb/src/control/server/pgwire/ddl/typeguard/validate.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ pub async fn validate_typeguard(
5757

5858
// Scan all documents.
5959
let scan_sql = format!("SELECT * FROM {coll_name}");
60-
let query_ctx =
61-
crate::control::planner::context::QueryContext::for_state(state, tenant_id.as_u32());
60+
let query_ctx = crate::control::planner::context::QueryContext::for_state(state);
6261
let tasks = query_ctx
6362
.plan_sql(&scan_sql, tenant_id)
6463
.await

0 commit comments

Comments
 (0)