forked from repowise-dev/repowise
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_repos.py
More file actions
157 lines (118 loc) · 5.13 KB
/
test_repos.py
File metadata and controls
157 lines (118 loc) · 5.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""Tests for /api/repos endpoints."""
from __future__ import annotations
from unittest.mock import AsyncMock, patch
import pytest
from httpx import AsyncClient
from tests.unit.server.conftest import create_test_repo
@pytest.mark.asyncio
async def test_create_repo(client: AsyncClient) -> None:
resp = await client.post(
"/api/repos",
json={
"name": "my-repo",
"local_path": "/tmp/my-repo",
"url": "https://github.com/example/my-repo",
},
)
assert resp.status_code == 201
data = resp.json()
assert data["name"] == "my-repo"
assert data["local_path"] == "/tmp/my-repo"
assert data["url"] == "https://github.com/example/my-repo"
assert data["default_branch"] == "main"
assert "id" in data
@pytest.mark.asyncio
async def test_list_repos_empty(client: AsyncClient) -> None:
resp = await client.get("/api/repos")
assert resp.status_code == 200
assert resp.json() == []
@pytest.mark.asyncio
async def test_list_repos_with_data(client: AsyncClient) -> None:
await create_test_repo(client)
resp = await client.get("/api/repos")
assert resp.status_code == 200
data = resp.json()
assert len(data) == 1
assert data[0]["name"] == "test-repo"
@pytest.mark.asyncio
async def test_get_repo_by_id(client: AsyncClient) -> None:
repo = await create_test_repo(client)
resp = await client.get(f"/api/repos/{repo['id']}")
assert resp.status_code == 200
assert resp.json()["name"] == "test-repo"
@pytest.mark.asyncio
async def test_get_repo_not_found(client: AsyncClient) -> None:
resp = await client.get("/api/repos/nonexistent")
assert resp.status_code == 404
@pytest.mark.asyncio
async def test_update_repo(client: AsyncClient) -> None:
repo = await create_test_repo(client)
resp = await client.patch(
f"/api/repos/{repo['id']}",
json={"name": "updated-name"},
)
assert resp.status_code == 200
assert resp.json()["name"] == "updated-name"
@pytest.mark.asyncio
async def test_sync_repo_returns_202(client: AsyncClient) -> None:
repo = await create_test_repo(client)
with patch("repowise.server.routers.repos.execute_job", new_callable=AsyncMock):
resp = await client.post(f"/api/repos/{repo['id']}/sync")
assert resp.status_code == 202
data = resp.json()
assert "job_id" in data
assert data["status"] == "accepted"
@pytest.mark.asyncio
async def test_sync_repo_not_found(client: AsyncClient) -> None:
resp = await client.post("/api/repos/nonexistent/sync")
assert resp.status_code == 404
@pytest.mark.asyncio
async def test_full_resync_returns_202(client: AsyncClient) -> None:
repo = await create_test_repo(client)
with patch("repowise.server.routers.repos.execute_job", new_callable=AsyncMock):
resp = await client.post(f"/api/repos/{repo['id']}/full-resync")
assert resp.status_code == 202
data = resp.json()
assert "job_id" in data
@pytest.mark.asyncio
async def test_sync_duplicate_returns_409(client: AsyncClient) -> None:
"""A second sync while one is pending/running returns 409."""
repo = await create_test_repo(client)
# Mock execute_job to prevent actual background pipeline run during tests
with patch("repowise.server.routers.repos.execute_job", new_callable=AsyncMock):
resp1 = await client.post(f"/api/repos/{repo['id']}/sync")
assert resp1.status_code == 202
# Second sync should be rejected (job is still pending/running)
resp2 = await client.post(f"/api/repos/{repo['id']}/sync")
assert resp2.status_code == 409
assert "already in progress" in resp2.json()["detail"].lower()
@pytest.mark.asyncio
async def test_full_resync_duplicate_returns_409(client: AsyncClient) -> None:
"""A second full-resync while one is pending/running returns 409."""
repo = await create_test_repo(client)
with patch("repowise.server.routers.repos.execute_job", new_callable=AsyncMock):
resp1 = await client.post(f"/api/repos/{repo['id']}/full-resync")
assert resp1.status_code == 202
resp2 = await client.post(f"/api/repos/{repo['id']}/full-resync")
assert resp2.status_code == 409
@pytest.mark.asyncio
async def test_export_wiki_not_found(client: AsyncClient) -> None:
resp = await client.get("/api/repos/nonexistent/export")
assert resp.status_code == 404
@pytest.mark.asyncio
async def test_export_wiki_returns_zip(client: AsyncClient, session) -> None:
import zipfile
from io import BytesIO
from repowise.core.persistence.crud import upsert_page, upsert_repository
from tests.unit.persistence.helpers import make_page_kwargs
repo = await upsert_repository(session, name="export-test", local_path="/tmp/export-test")
await upsert_page(session, **make_page_kwargs(repo.id))
await session.commit()
resp = await client.get(f"/api/repos/{repo.id}/export")
assert resp.status_code == 200
assert resp.headers["content-type"] == "application/zip"
zf = zipfile.ZipFile(BytesIO(resp.content))
names = zf.namelist()
assert len(names) == 1
assert names[0].startswith("wiki/")
assert names[0].endswith(".md")