Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ferry/database/generate_changelog.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,14 @@ def print_courses_diff(
# Process changed courses with indexed lookups
course_updates = ""
course_id_to_changes: dict[int, dict[str, tuple[Any, Any]]] = {}
for _, course in diff["courses"]["added_rows"].iterrows():
course_id = cast(int, course["course_id"])
course_id_to_changes[course_id] = {}
for column in courses_new_indexed.columns:
if column in computed_columns["courses"]:
continue
new_val = courses_new_indexed.loc[course_id, column]
course_id_to_changes[course_id][column] = (None, new_val)
for _, course in diff["courses"]["changed_rows"].iterrows():
course_id = cast(int, course["course_id"])
if course_id not in course_id_to_changes:
Expand Down
6 changes: 6 additions & 0 deletions ferry/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,12 @@ class Course(BaseModel):
comment="[computed] Whether last enrollment offering is with same professor as current.",
)

last_sync_diff = Column(
JSONB,
comment="Last sync: non-computed courses columns as {field: {old, new}}",
nullable=True,
)


class Listing(BaseModel):
"""
Expand Down
88 changes: 86 additions & 2 deletions ferry/database/sync_db_courses.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
from typing import Any, Dict, Union

from sqlalchemy import MetaData, text, inspect, Connection
from sqlalchemy import MetaData, text, inspect, Connection, bindparam
from psycopg2.extensions import register_adapter, AsIs

from ferry.database import Database
Expand Down Expand Up @@ -39,6 +39,58 @@ def safe_isna(value) -> bool:
}


def _jsonable_scalar(v: Any) -> Any:
if v is None:
return None
if isinstance(v, (np.integer, np.int64, np.int32)):
return int(v)
if isinstance(v, (np.floating, np.float64, np.float32)):
if np.isnan(v):
return None
return float(v)
if isinstance(v, np.bool_):
return bool(v)
if isinstance(v, pd.Timestamp):
return v.isoformat()
if isinstance(v, (bytes, bytearray)):
return v.decode("utf-8", errors="replace")
if isinstance(v, np.ndarray):
return _jsonable_scalar(v.tolist())
if isinstance(v, (list, tuple)):
return [_jsonable_scalar(x) for x in v]
if isinstance(v, dict):
return {str(k): _jsonable_scalar(x) for k, x in v.items()}
try:
if pd.isna(v):
return None
except (TypeError, ValueError):
pass
return v


def course_scalar_last_sync_diffs(
    diff: dict[str, DiffRecord],
    tables_old: dict[str, pd.DataFrame],
    tables: dict[str, pd.DataFrame],
) -> dict[int, dict[str, dict[str, Any]]]:
    """Build the per-course ``last_sync_diff`` payloads for this sync.

    Returns a mapping of ``course_id`` -> ``{column: {"old": ..., "new": ...}}``
    covering non-computed columns only. Updated courses contribute just the
    columns that changed; newly added courses contribute every non-computed
    column with ``old`` set to None, so course creations are visible in
    ``last_sync_diff`` rather than ending the sync as NULL.
    """
    co = tables_old["courses"].set_index("course_id")
    cn = tables["courses"].set_index("course_id")
    out: dict[int, dict[str, dict[str, Any]]] = {}

    # Updated courses: only the columns the diff flagged as changed.
    for _, row in diff["courses"]["changed_rows"].iterrows():
        cid = int(row["course_id"])
        fields: dict[str, dict[str, Any]] = {}
        for col in row["columns_changed"]:
            if col in computed_columns["courses"]:
                continue
            fields[col] = {
                "old": _jsonable_scalar(co.loc[cid, col]),
                "new": _jsonable_scalar(cn.loc[cid, col]),
            }
        if fields:
            out[cid] = fields

    # Newly inserted courses: record every non-computed column with old=None.
    # (course_id itself is the index after set_index, so it is not in
    # cn.columns; the guard is defensive in case that ever changes.)
    for _, row in diff["courses"]["added_rows"].iterrows():
        cid = int(row["course_id"])
        fields = {}
        for col in cn.columns:
            if col == "course_id" or col in computed_columns["courses"]:
                continue
            fields[col] = {
                "old": None,
                "new": _jsonable_scalar(cn.loc[cid, col]),
            }
        if fields:
            out[cid] = fields

    return out
Comment on lines +71 to +91
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Include inserted courses in last_sync_diff.

This helper only walks diff["courses"]["changed_rows"], so newly inserted courses never get a payload and end the sync with last_sync_diff = NULL. That leaves course creations invisible to this feature.

Proposed fix
 def course_scalar_last_sync_diffs(
     diff: dict[str, DiffRecord],
     tables_old: dict[str, pd.DataFrame],
     tables: dict[str, pd.DataFrame],
 ) -> dict[int, dict[str, dict[str, Any]]]:
     co = tables_old["courses"].set_index("course_id")
     cn = tables["courses"].set_index("course_id")
     out: dict[int, dict[str, dict[str, Any]]] = {}
     for _, row in diff["courses"]["changed_rows"].iterrows():
         cid = int(row["course_id"])
         fields: dict[str, dict[str, Any]] = {}
         for col in row["columns_changed"]:
             if col in computed_columns["courses"]:
                 continue
             fields[col] = {
                 "old": _jsonable_scalar(co.loc[cid, col]),
                 "new": _jsonable_scalar(cn.loc[cid, col]),
             }
         if fields:
             out[cid] = fields
+
+    for _, row in diff["courses"]["added_rows"].iterrows():
+        cid = int(row["course_id"])
+        fields: dict[str, dict[str, Any]] = {}
+        for col in cn.columns:
+            if col == "course_id" or col in computed_columns["courses"]:
+                continue
+            fields[col] = {
+                "old": None,
+                "new": _jsonable_scalar(cn.loc[cid, col]),
+            }
+        if fields:
+            out[cid] = fields
+
     return out
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ferry/database/sync_db_courses.py` around lines 71 - 91, The helper
course_scalar_last_sync_diffs currently only processes
diff["courses"]["changed_rows"], so new courses are omitted; update it to also
iterate diff["courses"]["inserted_rows"] and add entries for each inserted
course id (use int(row["course_id"])) building a fields dict for non-computed
columns where "old" is None and "new" comes from cn.loc[cid, col] via
_jsonable_scalar (skip columns in computed_columns["courses"]); merge these into
the existing out mapping the same way as changed rows so inserts produce a
last_sync_diff payload.



def get_tables_from_db(database_connect_string: str) -> dict[str, pd.DataFrame]:
db = Database(database_connect_string)
db_meta = MetaData()
Expand All @@ -47,7 +99,8 @@ def get_tables_from_db(database_connect_string: str) -> dict[str, pd.DataFrame]:

return {
table_name: pd.read_sql_table(table_name, con=conn).drop(
columns=["time_added", "last_updated"], errors="ignore"
columns=["time_added", "last_updated", "last_sync_diff"],
errors="ignore",
)
for table_name in primary_keys.keys()
}
Expand Down Expand Up @@ -701,6 +754,8 @@ def sync_db_courses(

diff = generate_diff(tables_old_for_diff, tables_for_diff)

course_sync_diff_by_id = course_scalar_last_sync_diffs(diff, tables_old, tables)

print_diff(diff, tables_old, tables, data_dir / "change_log")

inspector = inspect(db.Engine)
Expand All @@ -726,6 +781,10 @@ def sync_db_courses(
)
)

course_col_names = {c["name"] for c in inspector.get_columns("courses")}
if "last_sync_diff" not in course_col_names:
conn.execute(text("ALTER TABLE courses ADD COLUMN last_sync_diff JSONB"))

# Process tables in dependency order (buildings before locations)
location_mapping = {}
for table_name in tables_order_add:
Expand Down Expand Up @@ -812,6 +871,31 @@ def sync_db_courses(
commit_deletions(table_name, deleted_rows, conn)
print("\033[F", end="")

changed_course_ids = list(course_sync_diff_by_id.keys())
if changed_course_ids:
clear_stmt = text(
"UPDATE courses SET last_sync_diff = NULL WHERE last_sync_diff IS NOT NULL AND course_id NOT IN :ids"
).bindparams(bindparam("ids", expanding=True))
conn.execute(clear_stmt, {"ids": changed_course_ids})
else:
conn.execute(
text(
"UPDATE courses SET last_sync_diff = NULL WHERE last_sync_diff IS NOT NULL"
)
)

upd = text(
"UPDATE courses SET last_sync_diff = CAST(:payload AS jsonb) WHERE course_id = :course_id"
)
for course_id, payload in course_sync_diff_by_id.items():
conn.execute(
upd,
{
"course_id": int(course_id),
"payload": ujson.dumps(payload),
},
)

# Print row counts for each table.
print("\n[Table Statistics]")
with db.Engine.begin() as conn:
Expand Down
Loading