Skip to content

Commit c0100a5

Browse files
authored
Merge pull request #135 from nlocascio/nl/support-audit-schemas
Adds schema support to audit triggers/functions
2 parents 669ee45 + 31d2756 commit c0100a5

File tree

4 files changed

+268
-3
lines changed

4 files changed

+268
-3
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
## 0.16
66

7+
### 0.16.8
8+
9+
- fix: Adds schema support to audit functions (that is, they will now be created in the schema of the table to avoid conflicts).
10+
711
### 0.16.7
812

913
- feat: Add missing support for `exclude` on triggers.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "sqlalchemy-declarative-extensions"
3-
version = "0.16.7"
3+
version = "0.16.8"
44
authors = [{ name = "Dan Cardin", email = "ddcardin@gmail.com" }]
55
description = "Library to declare additional kinds of objects not natively supported by SQLAlchemy/Alembic."
66
license = { file = "LICENSE" }

src/sqlalchemy_declarative_extensions/audit.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
register_trigger,
1111
)
1212
from sqlalchemy_declarative_extensions.dialects.postgresql.trigger import Trigger
13-
from sqlalchemy_declarative_extensions.sql import quote_name
13+
from sqlalchemy_declarative_extensions.sql import qualify_name, quote_name
1414

1515
default_primary_key = Column(
1616
"audit_pk", types.Integer(), primary_key=True, autoincrement=True
@@ -109,6 +109,7 @@ def audit_table(
109109
create_audit_triggers(
110110
table.metadata,
111111
table,
112+
audit_table,
112113
insert=insert,
113114
update=update,
114115
delete=delete,
@@ -242,6 +243,7 @@ def create_audit_functions(
242243
""",
243244
returns="TRIGGER",
244245
language="plpgsql",
246+
schema=audit_table.schema,
245247
)
246248
functions.append(function)
247249
register_function(metadata, function)
@@ -252,6 +254,7 @@ def create_audit_functions(
252254
def create_audit_triggers(
253255
metadata: MetaData,
254256
table: Table,
257+
audit_table: Table,
255258
insert: bool = True,
256259
update: bool = True,
257260
delete: bool = True,
@@ -282,10 +285,15 @@ def create_audit_triggers(
282285
if not enabled:
283286
continue
284287

288+
# Use qualified function name (schema.function_name) for trigger execution
289+
# Use audit_table.schema since functions are created in the audit table's schema
290+
function_qualified_name = qualify_name(
291+
audit_table.schema, "_".join([function_name, op])
292+
)
285293
trigger = Trigger.after(
286294
op,
287295
on=table.fullname,
288-
execute="_".join([function_name, op]),
296+
execute=function_qualified_name,
289297
name="_".join([trigger_name, op]),
290298
).for_each_row()
291299
triggers.append(trigger)

tests/audit/test_schema_support.py

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
"""Tests for schema-aware audit function and trigger creation."""
2+
from pytest_mock_resources import create_postgres_fixture
3+
from sqlalchemy import Column, text, types
4+
5+
from sqlalchemy_declarative_extensions import (
6+
Schemas,
7+
declarative_database,
8+
register_sqlalchemy_events,
9+
)
10+
from sqlalchemy_declarative_extensions.audit import audit
11+
from sqlalchemy_declarative_extensions.sqlalchemy import declarative_base
12+
13+
_Base = declarative_base()
14+
15+
16+
@declarative_database
17+
class Base(_Base): # type: ignore
18+
__abstract__ = True
19+
20+
schemas = Schemas().are("myschema", "otherschema")
21+
22+
23+
# Test table with schema from __table_args__
24+
@audit()
25+
class Product(Base):
26+
__tablename__ = "product"
27+
__table_args__ = {"schema": "myschema"}
28+
29+
id = Column(types.Integer(), primary_key=True)
30+
name = Column(types.Unicode())
31+
price = Column(types.Numeric())
32+
33+
34+
# Test table with explicit schema parameter in @audit decorator
35+
@audit(schema="otherschema")
36+
class Order(Base):
37+
__tablename__ = "order"
38+
__table_args__ = {"schema": "myschema"}
39+
40+
id = Column(types.Integer(), primary_key=True)
41+
product_id = Column(types.Integer())
42+
quantity = Column(types.Integer())
43+
44+
45+
# Test table without schema (should use default)
46+
@audit()
47+
class Customer(Base):
48+
__tablename__ = "customer"
49+
50+
id = Column(types.Integer(), primary_key=True)
51+
name = Column(types.Unicode())
52+
53+
54+
register_sqlalchemy_events(Base.metadata, schemas=True, functions=True, triggers=True)
55+
56+
pg = create_postgres_fixture(engine_kwargs={"echo": True}, session=True)
57+
58+
59+
def test_audit_functions_in_table_schema(pg):
60+
"""Test that audit functions are created in the same schema as the audited table."""
61+
Base.metadata.create_all(bind=pg.connection())
62+
pg.commit()
63+
64+
# Check that audit functions exist in myschema (for Product table)
65+
result = pg.execute(
66+
text(
67+
"""
68+
SELECT routine_schema, routine_name
69+
FROM information_schema.routines
70+
WHERE routine_schema = 'myschema'
71+
AND routine_name LIKE '%product_audit%'
72+
ORDER BY routine_name
73+
"""
74+
)
75+
).fetchall()
76+
77+
# Should have 3 functions: insert, update, delete
78+
assert len(result) == 3
79+
schemas = {r[0] for r in result}
80+
assert schemas == {"myschema"}
81+
82+
function_names = {r[1] for r in result}
83+
assert function_names == {
84+
"myschema_product_audit_insert",
85+
"myschema_product_audit_update",
86+
"myschema_product_audit_delete",
87+
}
88+
89+
90+
def test_audit_functions_with_explicit_schema(pg):
91+
"""Test that audit functions respect explicit schema parameter in @audit decorator."""
92+
Base.metadata.create_all(bind=pg.connection())
93+
pg.commit()
94+
95+
# Check that audit functions exist in otherschema (explicit schema for Order)
96+
result = pg.execute(
97+
text(
98+
"""
99+
SELECT routine_schema, routine_name
100+
FROM information_schema.routines
101+
WHERE routine_schema = 'otherschema'
102+
AND routine_name LIKE '%order_audit%'
103+
ORDER BY routine_name
104+
"""
105+
)
106+
).fetchall()
107+
108+
# Should have 3 functions: insert, update, delete
109+
assert len(result) == 3
110+
schemas = {r[0] for r in result}
111+
assert schemas == {"otherschema"}
112+
113+
114+
def test_audit_table_in_correct_schema(pg):
115+
"""Test that audit tables are created in the correct schema."""
116+
Base.metadata.create_all(bind=pg.connection())
117+
pg.commit()
118+
119+
# Check Product audit table is in myschema
120+
result = pg.execute(
121+
text(
122+
"""
123+
SELECT table_schema, table_name
124+
FROM information_schema.tables
125+
WHERE table_schema = 'myschema'
126+
AND table_name = 'product_audit'
127+
"""
128+
)
129+
).fetchall()
130+
131+
assert len(result) == 1
132+
assert result[0] == ("myschema", "product_audit")
133+
134+
# Check Order audit table is in otherschema (explicit schema)
135+
result = pg.execute(
136+
text(
137+
"""
138+
SELECT table_schema, table_name
139+
FROM information_schema.tables
140+
WHERE table_schema = 'otherschema'
141+
AND table_name = 'order_audit'
142+
"""
143+
)
144+
).fetchall()
145+
146+
assert len(result) == 1
147+
assert result[0] == ("otherschema", "order_audit")
148+
149+
150+
def test_audit_triggers_reference_correct_functions(pg):
151+
"""Test that triggers correctly reference schema-qualified function names."""
152+
Base.metadata.create_all(bind=pg.connection())
153+
pg.commit()
154+
155+
# Check Product triggers reference myschema functions
156+
result = pg.execute(
157+
text(
158+
"""
159+
SELECT trigger_name, action_statement
160+
FROM information_schema.triggers
161+
WHERE event_object_schema = 'myschema'
162+
AND event_object_table = 'product'
163+
ORDER BY trigger_name
164+
"""
165+
)
166+
).fetchall()
167+
168+
assert len(result) == 3
169+
170+
# Each trigger should execute a function from myschema
171+
for trigger_name, action_statement in result:
172+
assert "myschema." in action_statement.lower(), (
173+
f"Trigger {trigger_name} should reference myschema-qualified function, "
174+
f"got: {action_statement}"
175+
)
176+
177+
178+
def test_audit_functionality_with_schema(pg):
179+
"""Integration test: verify audit trail works correctly with schema-qualified functions."""
180+
Base.metadata.create_all(bind=pg.connection())
181+
pg.commit()
182+
183+
# Insert a product
184+
product = Product(id=1, name="Widget", price=19.99)
185+
pg.add(product)
186+
pg.commit()
187+
188+
# Check audit trail
189+
result = pg.execute(
190+
text("SELECT audit_operation, name, price FROM myschema.product_audit ORDER BY audit_pk")
191+
).fetchall()
192+
193+
assert len(result) == 1
194+
assert result[0][0] == "I" # Insert operation
195+
assert result[0][1] == "Widget"
196+
assert float(result[0][2]) == 19.99
197+
198+
# Update the product
199+
product.price = 24.99
200+
pg.commit()
201+
202+
result = pg.execute(
203+
text("SELECT audit_operation, name, price FROM myschema.product_audit ORDER BY audit_pk")
204+
).fetchall()
205+
206+
assert len(result) == 2
207+
assert result[1][0] == "U" # Update operation
208+
assert float(result[1][2]) == 24.99
209+
210+
# Delete the product
211+
pg.delete(product)
212+
pg.commit()
213+
214+
result = pg.execute(
215+
text("SELECT audit_operation FROM myschema.product_audit ORDER BY audit_pk")
216+
).fetchall()
217+
218+
assert len(result) == 3
219+
assert result[2][0] == "D" # Delete operation
220+
221+
222+
def test_audit_functions_default_schema(pg):
223+
"""Test that audit functions work in default schema when no schema is specified."""
224+
Base.metadata.create_all(bind=pg.connection())
225+
pg.commit()
226+
227+
# Check that Customer audit functions exist in public schema
228+
result = pg.execute(
229+
text(
230+
"""
231+
SELECT routine_schema, routine_name
232+
FROM information_schema.routines
233+
WHERE routine_schema = 'public'
234+
AND routine_name LIKE '%customer_audit%'
235+
ORDER BY routine_name
236+
"""
237+
)
238+
).fetchall()
239+
240+
# Should have 3 functions: insert, update, delete
241+
assert len(result) == 3
242+
243+
# Verify Customer audit works
244+
customer = Customer(id=1, name="John Doe")
245+
pg.add(customer)
246+
pg.commit()
247+
248+
result = pg.execute(
249+
text("SELECT audit_operation, name FROM public.customer_audit ORDER BY audit_pk")
250+
).fetchall()
251+
252+
assert len(result) == 1
253+
assert result[0] == ("I", "John Doe")

0 commit comments

Comments
 (0)