diff --git a/bin/configure-trusted-publisher b/bin/configure-trusted-publisher new file mode 100755 index 000000000000..aac1537eeaa5 --- /dev/null +++ b/bin/configure-trusted-publisher @@ -0,0 +1,353 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +""" +Configure a PyPI trusted publisher via the API. + +Reads credentials from ~/.pypirc (like twine), auto-detects the provider and +repo from the current git checkout, and lets you pick a workflow file. + +Usage: + configure-trusted-publisher [options] + +Examples: + configure-trusted-publisher mypackage + configure-trusted-publisher mypackage --dry-run + configure-trusted-publisher mypackage --environment release + configure-trusted-publisher mypackage --api-url https://test.pypi.org +""" + +from __future__ import annotations + +import argparse +import configparser +import glob +import json +import os +import re +import subprocess +import sys +import urllib.error +import urllib.request +from pathlib import Path + + +CONTENT_TYPE = "application/vnd.pypi.api-v0-danger+json" +DEFAULT_PYPI_URL = "https://pypi.org" +DEFAULT_REPOSITORY = "pypi" + + +def _read_pypirc(config_file: str | None, repository: str) -> dict: + """Read credentials from ~/.pypirc, mirroring twine's logic.""" + path = Path(config_file) if config_file else Path.home() / ".pypirc" + + parser = configparser.RawConfigParser() + if path.exists(): + try: + parser.read(str(path), encoding="utf-8") + except UnicodeDecodeError: + parser.read(str(path)) + + # Collect server-login defaults (deprecated but supported) + defaults: dict[str, str | None] = { + "username": None, + "password": None, + } + if parser.has_section("server-login"): + defaults["username"] = parser.get("server-login", "username", fallback=None) + defaults["password"] = parser.get("server-login", "password", fallback=None) + + if parser.has_section(repository): + return { + "repository": parser.get(repository, "repository", fallback=None), + "username": parser.get(repository, "username", fallback=defaults["username"]), + "password": parser.get(repository, "password", fallback=defaults["password"]), + } + + return defaults + + +def _resolve_token(args: argparse.Namespace) -> str: + """Resolve API token from CLI arg, env var, or .pypirc.""" + if args.token: + return args.token + + token = os.environ.get("PYPI_TOKEN") + if token: + return token + + config = _read_pypirc(getattr(args, "config_file", None), args.repository) + token = config.get("password") + if token: + return token + + print( + "Error: No API token found.\n" + "Provide via --token, PYPI_TOKEN env var, or ~/.pypirc [pypi] password field.", + file=sys.stderr, + ) + sys.exit(1) + + +def _git_remote_url() -> str | None: + """Return the upstream or origin remote URL of the current git checkout.""" + for remote in ("upstream", "origin"): + try: + result = subprocess.run( + ["git", "remote", "get-url", remote], + capture_output=True, + text=True, + check=True, + ) + return result.stdout.strip() + except subprocess.CalledProcessError: + continue + return None + + +def _parse_github_remote(url: str) -> tuple[str, str] | None: + """Parse owner and repo from a GitHub remote URL.""" + patterns = [ + r"github\.com[:/]([^/]+)/([^/.]+?)(?:\.git)?$", + ] + for pattern in patterns: + m = re.search(pattern, url) + if m: + return m.group(1), m.group(2) + return None + + +def _parse_gitlab_remote(url: str) -> tuple[str, str, str] | None: + """Parse namespace, project, and host from a GitLab remote URL.""" + # Matches both gitlab.com and self-hosted instances + patterns = [ + r"(gitlab\.[^:/]+)[:/](.+)/([^/.]+?)(?:\.git)?$", + ] + for pattern in patterns: + m = re.search(pattern, url) + if m: + host = f"https://{m.group(1)}" + namespace = m.group(2) + project = m.group(3) + return namespace, project, host + return None + + +def _list_github_workflows() -> list[str]: + """List workflow files in .github/workflows/.""" + workflows = sorted(glob.glob(".github/workflows/*.yml") + glob.glob(".github/workflows/*.yaml")) + return [Path(w).name for w in workflows] + + +def _list_gitlab_pipelines() -> list[str]: + """List GitLab CI pipeline files.""" + candidates = [".gitlab-ci.yml", ".gitlab-ci.yaml"] + candidates += sorted(glob.glob("ci/**/*.yml", recursive=True)) + candidates += sorted(glob.glob("ci/**/*.yaml", recursive=True)) + return [p for p in candidates if Path(p).exists()] + + +def _prompt_choice(prompt: str, choices: list[str]) -> str: + """Print numbered choices and prompt user to pick one.""" + for i, choice in enumerate(choices, 1): + print(f" {i}. {choice}") + while True: + raw = input(f"{prompt} [1-{len(choices)}]: ").strip() + try: + idx = int(raw) - 1 + if 0 <= idx < len(choices): + return choices[idx] + except ValueError: + pass + print(f"Please enter a number between 1 and {len(choices)}.") + + +def _api_url(base_url: str, project: str, publisher_id: str | None = None) -> str: + base = base_url.rstrip("/") + path = f"/danger-api/projects/{project}/trusted-publishers" + if publisher_id: + path += f"/{publisher_id}" + return base + path + + +def _call_api(url: str, token: str, payload: dict) -> dict: + body = json.dumps(payload).encode() + req = urllib.request.Request( + url, + data=body, + method="POST", + headers={ + "Authorization": f"Bearer {token}", + "Content-Type": CONTENT_TYPE, + "Accept": CONTENT_TYPE, + }, + ) + try: + with urllib.request.urlopen(req) as resp: + return json.loads(resp.read()) + except urllib.error.HTTPError as e: + body = e.read().decode(errors="replace") + try: + detail = json.loads(body) + except Exception: + detail = {"raw": body} + print(f"Error {e.code}: {detail}", file=sys.stderr) + sys.exit(1) + + +def _print_curl(url: str, token: str, payload: dict, *, expose_token: bool = False) -> None: + body = json.dumps(payload, indent=2) + token_value = token if expose_token else "" + print("\nEquivalent curl command:\n") + print( + f"curl -X POST '{url}' \\\n" + f" -H 'Authorization: Bearer {token_value}' \\\n" + f" -H 'Content-Type: {CONTENT_TYPE}' \\\n" + f" -H 'Accept: {CONTENT_TYPE}' \\\n" + f" -d '{body}'" + ) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Configure a PyPI trusted publisher via the API.", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("project", help="PyPI project name") + parser.add_argument( + "--api-url", + default=DEFAULT_PYPI_URL, + help=f"PyPI API base URL (default: {DEFAULT_PYPI_URL})", + ) + parser.add_argument( + "--repository", + default=DEFAULT_REPOSITORY, + help=f"~/.pypirc repository section (default: {DEFAULT_REPOSITORY})", + ) + parser.add_argument("--token", help="API token (overrides .pypirc and PYPI_TOKEN)") + parser.add_argument( + "--environment", + default="", + help="CI environment name (optional, e.g. 'release')", + ) + parser.add_argument( + "-n", "--dry-run", + action="store_true", + help="Print the curl command instead of calling the API", + ) + parser.add_argument( + "--show-token", + action="store_true", + help="Include the real API token in the --dry-run curl output (CAUTION: token will be visible in your shell history and terminal)", + ) + parser.add_argument( + "--workflow", + help="Workflow filename (skips interactive selection)", + ) + parser.add_argument( + "--config-file", + help="Path to .pypirc (default: ~/.pypirc)", + ) + args = parser.parse_args() + + token = _resolve_token(args) + + remote_url = _git_remote_url() + if not remote_url: + print( + "Error: Not in a git repository or no 'upstream'/'origin' remote found.", + file=sys.stderr, + ) + sys.exit(1) + + github_info = _parse_github_remote(remote_url) + gitlab_info = _parse_gitlab_remote(remote_url) + + if github_info: + owner, repo = github_info + print(f"Detected GitHub repository: {owner}/{repo}") + + workflows = _list_github_workflows() + if args.workflow: + workflow = args.workflow + elif not workflows: + print( + "No workflow files found in .github/workflows/.\n" + "Provide --workflow to specify one manually.", + file=sys.stderr, + ) + sys.exit(1) + elif len(workflows) == 1: + workflow = workflows[0] + print(f"Using workflow: {workflow}") + else: + print("\nAvailable workflows:") + workflow = _prompt_choice("Select workflow", workflows) + + payload: dict = { + "publisher": "github", + "owner": owner, + "repository": repo, + "workflow_filename": workflow, + } + if args.environment: + payload["environment"] = args.environment + + elif gitlab_info: + namespace, project, host = gitlab_info + print(f"Detected GitLab repository: {namespace}/{project} at {host}") + + pipelines = _list_gitlab_pipelines() + if args.workflow: + workflow_filepath = args.workflow + elif not pipelines: + print( + "No pipeline files found. " + "Provide --workflow to specify one manually.", + file=sys.stderr, + ) + sys.exit(1) + elif len(pipelines) == 1: + workflow_filepath = pipelines[0] + print(f"Using pipeline: {workflow_filepath}") + else: + print("\nAvailable pipeline files:") + workflow_filepath = _prompt_choice("Select pipeline file", pipelines) + + payload = { + "publisher": "gitlab", + "namespace": namespace, + "project": project, + "workflow_filepath": workflow_filepath, + "issuer_url": host, + } + if args.environment: + payload["environment"] = args.environment + + else: + print( + f"Error: Could not detect GitHub or GitLab from remote URL: {remote_url}\n" + "Only GitHub and GitLab are supported by this auto-detection script.\n" + "For Google or ActiveState publishers, call the API directly.", + file=sys.stderr, + ) + sys.exit(1) + + url = _api_url(args.api_url, args.project) + + if args.dry_run: + _print_curl(url, token, payload, expose_token=args.show_token) + return + + result = _call_api(url, token, payload) + publisher = result.get("trusted_publisher", {}) + print( + f"\nSuccess! Trusted publisher added to {args.project}:\n" + f" Provider: {publisher.get('publisher_name')}\n" + f" Specifier: {publisher.get('specifier')}\n" + f" URL: {publisher.get('publisher_url')}\n" + f" ID: {publisher.get('id')}" + ) + + +if __name__ == "__main__": + main() diff --git a/tests/unit/api/test_oidc_publishers.py b/tests/unit/api/test_oidc_publishers.py new file mode 100644 index 000000000000..946e7b64e687 --- /dev/null +++ b/tests/unit/api/test_oidc_publishers.py @@ -0,0 +1,1106 @@ +# SPDX-License-Identifier: Apache-2.0 + +import uuid + +import pretend +import pytest + +from pyramid.httpexceptions import ( + HTTPBadRequest, + HTTPConflict, + HTTPForbidden, + HTTPNotFound, + HTTPTooManyRequests, +) +from webob.multidict import MultiDict + +from tests.common.db.accounts import EmailFactory, UserFactory +from tests.common.db.oidc import ( + ActiveStatePublisherFactory, + GitHubPublisherFactory, + GitLabPublisherFactory, + GooglePublisherFactory, +) +from tests.common.db.packaging import ProjectFactory, RoleFactory +from warehouse.admin.flags import AdminFlagValue +from warehouse.api import oidc_publishers as views +from warehouse.events.tags import EventTag +from warehouse.metrics import IMetricsService +from warehouse.oidc.interfaces import TooManyOIDCRegistrations +from warehouse.oidc.models import ( + ActiveStatePublisher, + GitHubPublisher, + GitLabPublisher, + GooglePublisher, + OIDCPublisher, +) +from warehouse.rate_limiting import IRateLimiter + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_ratelimiters(user_ok=True, ip_ok=True): + resets = pretend.stub(total_seconds=lambda: 60) + return { + "user": pretend.stub( + test=pretend.call_recorder(lambda uid: user_ok), + hit=pretend.call_recorder(lambda uid: None), + resets_in=pretend.call_recorder(lambda uid: resets), + ), + "ip": pretend.stub( + test=pretend.call_recorder(lambda ip: ip_ok), + hit=pretend.call_recorder(lambda ip: None), + resets_in=pretend.call_recorder(lambda ip: resets), + ), + } + + +def _make_find_service(metrics, ratelimiters=None): + ratelimiters = ratelimiters or _make_ratelimiters() + + def find_service(iface=None, context=None, name=""): + if iface is IMetricsService: + return metrics + if iface is IRateLimiter and name == "user_oidc.publisher.register": + return ratelimiters["user"] + if iface is IRateLimiter and name == "ip_oidc.publisher.register": + return ratelimiters["ip"] + raise LookupError(f"No service: {iface!r} name={name!r}") # pragma: no cover + + return find_service + + +# --------------------------------------------------------------------------- +# _multidict_from_json +# --------------------------------------------------------------------------- + + +class TestMultidictFromJson: + def test_simple_fields(self): + md = views._multidict_from_json( + {"publisher": "github", "owner": "myorg", "environment": ""} + ) + assert md["publisher"] == "github" + assert md["owner"] == "myorg" + # empty string is kept + assert md["environment"] == "" + + def test_none_values_excluded(self): + md = views._multidict_from_json({"owner": "myorg", "sub": None}) + assert "owner" in md + assert "sub" not in md + + def test_non_string_coerced(self): + md = views._multidict_from_json({"count": 42}) + assert md["count"] == "42" + + +# --------------------------------------------------------------------------- +# GET trusted publishers +# --------------------------------------------------------------------------- + + +class TestAPIGetTrustedPublishers: + def test_empty_list(self): + project = pretend.stub(oidc_publishers=[]) + request = pretend.stub() + result = views.api_get_trusted_publishers(project, request) + assert result == {"trusted_publishers": []} + + def test_with_publishers(self, monkeypatch): + pub_id = uuid.uuid4() + publisher = pretend.stub( + id=pub_id, + publisher_name="GitHub", + publisher_url=pretend.call_recorder( + lambda: "https://github.com/owner/repo" + ), + ) + monkeypatch.setattr(publisher.__class__, "__str__", lambda s: "owner/repo/.github/workflows/publish.yml") + project = pretend.stub(oidc_publishers=[publisher]) + request = pretend.stub() + + result = views.api_get_trusted_publishers(project, request) + + assert result == { + "trusted_publishers": [ + { + "id": str(pub_id), + "publisher_name": "GitHub", + "publisher_url": "https://github.com/owner/repo", + "specifier": "owner/repo/.github/workflows/publish.yml", + } + ] + } + + +# --------------------------------------------------------------------------- +# POST add trusted publisher — shared behaviours +# --------------------------------------------------------------------------- + + +class TestAPIAddTrustedPublisher: + def test_oidc_globally_disabled(self, metrics): + project = pretend.stub() + request = pretend.stub( + find_service=_make_find_service(metrics), + flags=pretend.stub(disallow_oidc=pretend.call_recorder(lambda f=None: True)), + json_body={"publisher": "github"}, + user=pretend.stub(id=uuid.uuid4()), + remote_addr="1.2.3.4", + ) + + with pytest.raises(HTTPForbidden) as exc: + views.api_add_trusted_publisher(project, request) + + assert "temporarily disabled" in exc.value.json["error"] + assert request.flags.disallow_oidc.calls == [pretend.call()] + + def test_unknown_publisher_type(self, metrics): + project = pretend.stub() + request = pretend.stub( + find_service=_make_find_service(metrics), + flags=pretend.stub(disallow_oidc=pretend.call_recorder(lambda f=None: False)), + json_body={"publisher": "unknown_provider"}, + user=pretend.stub(id=uuid.uuid4()), + remote_addr="1.2.3.4", + ) + + with pytest.raises(HTTPBadRequest) as exc: + views.api_add_trusted_publisher(project, request) + + assert "Unknown publisher type" in exc.value.json["error"] + + +# --------------------------------------------------------------------------- +# POST add trusted publisher — GitHub +# --------------------------------------------------------------------------- + + +class TestAPIAddTrustedPublisherGitHub: + def _make_request(self, metrics, ratelimiters, data, *, github_disabled=False): + def disallow_oidc(flag=None): + if flag is AdminFlagValue.DISALLOW_GITHUB_OIDC: + return github_disabled + return False + + return pretend.stub( + find_service=_make_find_service(metrics, ratelimiters), + flags=pretend.stub(disallow_oidc=pretend.call_recorder(disallow_oidc)), + json_body=data, + user=pretend.stub(id=uuid.uuid4(), username="testuser"), + remote_addr="1.2.3.4", + registry=pretend.stub(settings={"github.token": "fake-token"}), + ) + + def test_github_admin_disabled(self, metrics): + ratelimiters = _make_ratelimiters() + request = self._make_request( + metrics, + ratelimiters, + {"publisher": "github"}, + github_disabled=True, + ) + + with pytest.raises(HTTPForbidden) as exc: + views.api_add_trusted_publisher(pretend.stub(), request) + + assert "GitHub-based" in exc.value.json["error"] + + def test_ratelimited_by_user(self, metrics): + ratelimiters = _make_ratelimiters(user_ok=False) + request = self._make_request( + metrics, + ratelimiters, + {"publisher": "github"}, + ) + + with pytest.raises(HTTPTooManyRequests): + views.api_add_trusted_publisher(pretend.stub(), request) + + assert metrics.increment.calls == [ + pretend.call( + "warehouse.oidc.add_publisher.attempt", tags=["publisher:GitHub"] + ), + pretend.call( + "warehouse.oidc.add_publisher.ratelimited", tags=["publisher:GitHub"] + ), + ] + + def test_ratelimited_by_ip(self, metrics): + ratelimiters = _make_ratelimiters(user_ok=True, ip_ok=False) + request = self._make_request( + metrics, + ratelimiters, + {"publisher": "github"}, + ) + + with pytest.raises(HTTPTooManyRequests): + views.api_add_trusted_publisher(pretend.stub(), request) + + def test_form_validation_error(self, metrics, monkeypatch): + ratelimiters = _make_ratelimiters() + request = self._make_request( + metrics, + ratelimiters, + {"publisher": "github", "owner": "", "repository": "", "workflow_filename": ""}, + ) + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: False), + errors={"owner": ["This field is required."]}, + ) + form_cls = pretend.call_recorder(lambda *a, **kw: form_obj) + monkeypatch.setattr(views, "GitHubPublisherForm", form_cls) + + with pytest.raises(HTTPBadRequest) as exc: + views.api_add_trusted_publisher(pretend.stub(), request) + + assert exc.value.json["errors"] == form_obj.errors + + def test_add_new_github_publisher(self, monkeypatch, db_request, metrics): + owner = UserFactory.create() + db_request.user = owner + db_request.find_service = _make_find_service(metrics) + db_request.registry.settings["github.token"] = "fake-token" + + project = ProjectFactory.create(oidc_publishers=[]) + RoleFactory.create(user=owner, project=project, role_name="Owner") + project.record_event = pretend.call_recorder(lambda *a, **kw: None) + + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + db_request.json_body = { + "publisher": "github", + "owner": "myorg", + "repository": "myrepo", + "workflow_filename": "publish.yml", + "environment": "release", + } + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + normalized_owner="myorg", + owner_id="12345", + repository=pretend.stub(data="myrepo"), + workflow_filename=pretend.stub(data="publish.yml"), + normalized_environment="release", + ) + monkeypatch.setattr( + views, "GitHubPublisherForm", lambda *a, **kw: form_obj + ) + monkeypatch.setattr( + views, + "send_trusted_publisher_added_email", + pretend.call_recorder(lambda *a, **kw: None), + ) + + result = views.api_add_trusted_publisher(project, db_request) + + assert db_request.response.status == "201 Created" + assert result["trusted_publisher"]["publisher_name"] == "GitHub" + + publisher = db_request.db.query(GitHubPublisher).one() + assert publisher.repository_owner == "myorg" + assert publisher.repository_name == "myrepo" + assert publisher.workflow_filename == "publish.yml" + assert publisher.environment == "release" + assert publisher in project.oidc_publishers + + assert project.record_event.calls == [ + pretend.call( + tag=EventTag.Project.OIDCPublisherAdded, + request=db_request, + additional={ + "publisher": publisher.publisher_name, + "id": str(publisher.id), + "specifier": str(publisher), + "url": publisher.publisher_url(), + "submitted_by": owner.username, + "reified_from_pending_publisher": False, + "constrained_from_existing_publisher": False, + }, + ) + ] + assert metrics.increment.calls[-1] == pretend.call( + "warehouse.oidc.add_publisher.ok", tags=["publisher:GitHub"] + ) + + def test_reuses_existing_github_publisher(self, monkeypatch, db_request, metrics): + owner = UserFactory.create() + db_request.user = owner + db_request.find_service = _make_find_service(metrics) + db_request.registry.settings["github.token"] = "fake-token" + + existing = GitHubPublisherFactory.create( + repository_owner="myorg", + repository_name="myrepo", + workflow_filename="publish.yml", + environment="release", + ) + project = ProjectFactory.create(oidc_publishers=[]) + project.record_event = pretend.call_recorder(lambda *a, **kw: None) + + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + db_request.json_body = { + "publisher": "github", + "owner": "myorg", + "repository": "myrepo", + "workflow_filename": "publish.yml", + "environment": "release", + } + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + normalized_owner="myorg", + owner_id="12345", + repository=pretend.stub(data="myrepo"), + workflow_filename=pretend.stub(data="publish.yml"), + normalized_environment="release", + ) + monkeypatch.setattr(views, "GitHubPublisherForm", lambda *a, **kw: form_obj) + monkeypatch.setattr( + views, + "send_trusted_publisher_added_email", + pretend.call_recorder(lambda *a, **kw: None), + ) + + result = views.api_add_trusted_publisher(project, db_request) + + assert result["trusted_publisher"]["id"] == str(existing.id) + # Only one publisher row exists (reused, not duplicated) + assert db_request.db.query(GitHubPublisher).count() == 1 + assert existing in project.oidc_publishers + + def test_conflict_already_registered(self, monkeypatch, db_request, metrics): + owner = UserFactory.create() + db_request.user = owner + db_request.find_service = _make_find_service(metrics) + db_request.registry.settings["github.token"] = "fake-token" + + publisher = GitHubPublisherFactory.create( + repository_owner="myorg", + repository_name="myrepo", + workflow_filename="publish.yml", + environment="release", + ) + project = ProjectFactory.create(oidc_publishers=[publisher]) + + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + db_request.json_body = { + "publisher": "github", + "owner": "myorg", + "repository": "myrepo", + "workflow_filename": "publish.yml", + "environment": "release", + } + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + normalized_owner="myorg", + owner_id="12345", + repository=pretend.stub(data="myrepo"), + workflow_filename=pretend.stub(data="publish.yml"), + normalized_environment="release", + ) + monkeypatch.setattr(views, "GitHubPublisherForm", lambda *a, **kw: form_obj) + + with pytest.raises(HTTPConflict) as exc: + views.api_add_trusted_publisher(project, db_request) + + assert project.name in exc.value.json["error"] + + +# --------------------------------------------------------------------------- +# POST add trusted publisher — GitLab +# --------------------------------------------------------------------------- + + +class TestAPIAddTrustedPublisherGitLab: + def _make_request(self, metrics, ratelimiters, data, *, gitlab_disabled=False): + def disallow_oidc(flag=None): + if flag is AdminFlagValue.DISALLOW_GITLAB_OIDC: + return gitlab_disabled + return False + + return pretend.stub( + find_service=_make_find_service(metrics, ratelimiters), + flags=pretend.stub(disallow_oidc=pretend.call_recorder(disallow_oidc)), + json_body=data, + user=pretend.stub(id=uuid.uuid4(), username="testuser"), + remote_addr="1.2.3.4", + ) + + def test_gitlab_admin_disabled(self, metrics): + def disallow_oidc(flag=None): + return flag is AdminFlagValue.DISALLOW_GITLAB_OIDC + + request = pretend.stub( + find_service=_make_find_service(metrics), + flags=pretend.stub(disallow_oidc=disallow_oidc), + json_body={"publisher": "gitlab"}, + user=pretend.stub(id=uuid.uuid4()), + remote_addr="1.2.3.4", + registry=pretend.stub(settings={}), + ) + + with pytest.raises(HTTPForbidden) as exc: + views.api_add_trusted_publisher(pretend.stub(), request) + + assert "GitLab-based" in exc.value.json["error"] + + def test_ratelimited_by_user(self, metrics): + ratelimiters = _make_ratelimiters(user_ok=False) + request = self._make_request(metrics, ratelimiters, {"publisher": "gitlab"}) + + with pytest.raises(HTTPTooManyRequests): + views.api_add_trusted_publisher(pretend.stub(), request) + + assert metrics.increment.calls == [ + pretend.call( + "warehouse.oidc.add_publisher.attempt", tags=["publisher:GitLab"] + ), + pretend.call( + "warehouse.oidc.add_publisher.ratelimited", tags=["publisher:GitLab"] + ), + ] + + def test_ratelimited_by_ip(self, metrics): + ratelimiters = _make_ratelimiters(user_ok=True, ip_ok=False) + request = self._make_request(metrics, ratelimiters, {"publisher": "gitlab"}) + + with pytest.raises(HTTPTooManyRequests): + views.api_add_trusted_publisher(pretend.stub(), request) + + def test_form_validation_error(self, metrics, monkeypatch): + ratelimiters = _make_ratelimiters() + request = self._make_request( + metrics, + ratelimiters, + {"publisher": "gitlab", "namespace": "", "project": "", "workflow_filepath": ""}, + ) + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: False), + errors={"namespace": ["This field is required."]}, + ) + monkeypatch.setattr(views, "GitLabPublisherForm", lambda *a, **kw: form_obj) + monkeypatch.setattr( + views.GitLabPublisher, + "get_available_issuer_urls", + pretend.call_recorder(lambda *a, **kw: ["https://gitlab.com"]), + ) + + with pytest.raises(HTTPBadRequest) as exc: + views.api_add_trusted_publisher(pretend.stub(organization=None), request) + + assert exc.value.json["errors"] == form_obj.errors + + def test_add_new_gitlab_publisher(self, monkeypatch, db_request, metrics): + owner = UserFactory.create() + db_request.user = owner + db_request.find_service = _make_find_service(metrics) + + project = ProjectFactory.create(oidc_publishers=[]) + project.record_event = pretend.call_recorder(lambda *a, **kw: None) + + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + db_request.json_body = { + "publisher": "gitlab", + "namespace": "mygroup", + "project": "myrepo", + "workflow_filepath": ".gitlab-ci.yml", + "environment": "production", + "issuer_url": "https://gitlab.com", + } + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + namespace=pretend.stub(data="mygroup"), + project=pretend.stub(data="myrepo"), + workflow_filepath=pretend.stub(data=".gitlab-ci.yml"), + normalized_environment="production", + issuer_url=pretend.stub(data="https://gitlab.com"), + ) + monkeypatch.setattr(views, "GitLabPublisherForm", lambda *a, **kw: form_obj) + monkeypatch.setattr( + views, + "send_trusted_publisher_added_email", + pretend.call_recorder(lambda *a, **kw: None), + ) + monkeypatch.setattr( + views.GitLabPublisher, + "get_available_issuer_urls", + pretend.call_recorder( + lambda *a, **kw: ["https://gitlab.com"] + ), + ) + + result = views.api_add_trusted_publisher(project, db_request) + + assert db_request.response.status == "201 Created" + assert result["trusted_publisher"]["publisher_name"] == "GitLab" + + publisher = db_request.db.query(GitLabPublisher).one() + assert publisher.namespace == "mygroup" + assert publisher in project.oidc_publishers + + def test_reuses_existing_gitlab_publisher(self, monkeypatch, db_request, metrics): + owner = UserFactory.create() + db_request.user = owner + db_request.find_service = _make_find_service(metrics) + + existing = GitLabPublisherFactory.create( + namespace="mygroup", + project="myrepo", + workflow_filepath=".gitlab-ci.yml", + environment="production", + issuer_url="https://gitlab.com", + ) + project = ProjectFactory.create(oidc_publishers=[]) + project.record_event = pretend.call_recorder(lambda *a, **kw: None) + + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + db_request.json_body = { + "publisher": "gitlab", + "namespace": "mygroup", + "project": "myrepo", + "workflow_filepath": ".gitlab-ci.yml", + "environment": "production", + "issuer_url": "https://gitlab.com", + } + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + namespace=pretend.stub(data="mygroup"), + project=pretend.stub(data="myrepo"), + workflow_filepath=pretend.stub(data=".gitlab-ci.yml"), + normalized_environment="production", + issuer_url=pretend.stub(data="https://gitlab.com"), + ) + monkeypatch.setattr(views, "GitLabPublisherForm", lambda *a, **kw: form_obj) + monkeypatch.setattr( + views, + "send_trusted_publisher_added_email", + pretend.call_recorder(lambda *a, **kw: None), + ) + monkeypatch.setattr( + views.GitLabPublisher, + "get_available_issuer_urls", + pretend.call_recorder(lambda *a, **kw: ["https://gitlab.com"]), + ) + + result = views.api_add_trusted_publisher(project, db_request) + + assert result["trusted_publisher"]["id"] == str(existing.id) + assert db_request.db.query(GitLabPublisher).count() == 1 + assert existing in project.oidc_publishers + + +# --------------------------------------------------------------------------- +# POST add trusted publisher — Google +# --------------------------------------------------------------------------- + + +class TestAPIAddTrustedPublisherGoogle: + def _make_request(self, metrics, ratelimiters, data, *, google_disabled=False): + def disallow_oidc(flag=None): + if flag is AdminFlagValue.DISALLOW_GOOGLE_OIDC: + return google_disabled + return False + + return pretend.stub( + find_service=_make_find_service(metrics, ratelimiters), + flags=pretend.stub(disallow_oidc=pretend.call_recorder(disallow_oidc)), + json_body=data, + user=pretend.stub(id=uuid.uuid4(), username="testuser"), + remote_addr="1.2.3.4", + ) + + def test_google_admin_disabled(self, metrics): + def disallow_oidc(flag=None): + return flag is AdminFlagValue.DISALLOW_GOOGLE_OIDC + + request = pretend.stub( + find_service=_make_find_service(metrics), + flags=pretend.stub(disallow_oidc=disallow_oidc), + json_body={"publisher": "google"}, + user=pretend.stub(id=uuid.uuid4()), + remote_addr="1.2.3.4", + registry=pretend.stub(settings={}), + ) + + with pytest.raises(HTTPForbidden) as exc: + views.api_add_trusted_publisher(pretend.stub(), request) + + assert "Google-based" in exc.value.json["error"] + + def test_ratelimited_by_user(self, metrics): + ratelimiters = _make_ratelimiters(user_ok=False) + request = self._make_request(metrics, ratelimiters, {"publisher": "google"}) + + with pytest.raises(HTTPTooManyRequests): + views.api_add_trusted_publisher(pretend.stub(), request) + + assert metrics.increment.calls == [ + pretend.call( + "warehouse.oidc.add_publisher.attempt", tags=["publisher:Google"] + ), + pretend.call( + "warehouse.oidc.add_publisher.ratelimited", tags=["publisher:Google"] + ), + ] + + def test_ratelimited_by_ip(self, metrics): + ratelimiters = _make_ratelimiters(user_ok=True, ip_ok=False) + request = self._make_request(metrics, ratelimiters, {"publisher": "google"}) + + with pytest.raises(HTTPTooManyRequests): + views.api_add_trusted_publisher(pretend.stub(), request) + + def test_form_validation_error(self, metrics, monkeypatch): + ratelimiters = _make_ratelimiters() + request = self._make_request( + metrics, + ratelimiters, + {"publisher": "google", "email": ""}, + ) + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: False), + errors={"email": ["This field is required."]}, + ) + monkeypatch.setattr(views, "GooglePublisherForm", lambda *a, **kw: form_obj) + + with pytest.raises(HTTPBadRequest) as exc: + views.api_add_trusted_publisher(pretend.stub(), request) + + assert exc.value.json["errors"] == form_obj.errors + + def test_add_new_google_publisher(self, monkeypatch, db_request, metrics): + owner = UserFactory.create() + db_request.user = owner + db_request.find_service = _make_find_service(metrics) + + project = ProjectFactory.create(oidc_publishers=[]) + project.record_event = pretend.call_recorder(lambda *a, **kw: None) + + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + db_request.json_body = { + "publisher": "google", + "email": "sa@my-project.iam.gserviceaccount.com", + "sub": "123456", + } + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + email=pretend.stub(data="sa@my-project.iam.gserviceaccount.com"), + sub=pretend.stub(data="123456"), + ) + monkeypatch.setattr(views, "GooglePublisherForm", lambda *a, **kw: form_obj) + monkeypatch.setattr( + views, + "send_trusted_publisher_added_email", + pretend.call_recorder(lambda *a, **kw: None), + ) + + result = views.api_add_trusted_publisher(project, db_request) + + assert db_request.response.status == "201 Created" + assert result["trusted_publisher"]["publisher_name"] == "Google" + + publisher = db_request.db.query(GooglePublisher).one() + assert publisher.email == "sa@my-project.iam.gserviceaccount.com" + assert publisher.sub == "123456" + assert publisher in project.oidc_publishers + + def test_reuses_existing_google_publisher(self, monkeypatch, db_request, metrics): + owner = UserFactory.create() + db_request.user = owner + db_request.find_service = _make_find_service(metrics) + + existing = GooglePublisherFactory.create( + email="sa@my-project.iam.gserviceaccount.com", + sub="123456", + ) + project = ProjectFactory.create(oidc_publishers=[]) + project.record_event = pretend.call_recorder(lambda *a, **kw: None) + + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + db_request.json_body = { + "publisher": "google", + "email": "sa@my-project.iam.gserviceaccount.com", + "sub": "123456", + } + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + email=pretend.stub(data="sa@my-project.iam.gserviceaccount.com"), + sub=pretend.stub(data="123456"), + ) + monkeypatch.setattr(views, "GooglePublisherForm", lambda *a, **kw: form_obj) + monkeypatch.setattr( + views, + "send_trusted_publisher_added_email", + pretend.call_recorder(lambda *a, **kw: None), + ) + + result = views.api_add_trusted_publisher(project, db_request) + + assert result["trusted_publisher"]["id"] == str(existing.id) + assert db_request.db.query(GooglePublisher).count() == 1 + assert existing in project.oidc_publishers + + +# --------------------------------------------------------------------------- +# POST add trusted publisher — ActiveState +# --------------------------------------------------------------------------- + + +class TestAPIAddTrustedPublisherActiveState: + def _make_request(self, metrics, ratelimiters, data, *, activestate_disabled=False): + def disallow_oidc(flag=None): + if flag is AdminFlagValue.DISALLOW_ACTIVESTATE_OIDC: + return activestate_disabled + return False + + return pretend.stub( + find_service=_make_find_service(metrics, ratelimiters), + flags=pretend.stub(disallow_oidc=pretend.call_recorder(disallow_oidc)), + json_body=data, + user=pretend.stub(id=uuid.uuid4(), username="testuser"), + remote_addr="1.2.3.4", + ) + + def test_activestate_admin_disabled(self, metrics): + def disallow_oidc(flag=None): + return flag is AdminFlagValue.DISALLOW_ACTIVESTATE_OIDC + + request = pretend.stub( + find_service=_make_find_service(metrics), + flags=pretend.stub(disallow_oidc=disallow_oidc), + json_body={"publisher": "activestate"}, + user=pretend.stub(id=uuid.uuid4()), + remote_addr="1.2.3.4", + registry=pretend.stub(settings={}), + ) + + with pytest.raises(HTTPForbidden) as exc: + views.api_add_trusted_publisher(pretend.stub(), request) + + assert "ActiveState-based" in exc.value.json["error"] + + def test_ratelimited_by_user(self, metrics): + ratelimiters = _make_ratelimiters(user_ok=False) + request = self._make_request(metrics, ratelimiters, {"publisher": "activestate"}) + + with pytest.raises(HTTPTooManyRequests): + views.api_add_trusted_publisher(pretend.stub(), request) + + assert metrics.increment.calls == [ + pretend.call( + "warehouse.oidc.add_publisher.attempt", tags=["publisher:ActiveState"] + ), + pretend.call( + "warehouse.oidc.add_publisher.ratelimited", + tags=["publisher:ActiveState"], + ), + ] + + def test_ratelimited_by_ip(self, metrics): + ratelimiters = _make_ratelimiters(user_ok=True, ip_ok=False) + request = self._make_request(metrics, ratelimiters, {"publisher": "activestate"}) + + with pytest.raises(HTTPTooManyRequests): + views.api_add_trusted_publisher(pretend.stub(), request) + + def test_form_validation_error(self, metrics, monkeypatch): + ratelimiters = _make_ratelimiters() + request = self._make_request( + metrics, + ratelimiters, + {"publisher": "activestate", "organization": "", "project": "", "actor": ""}, + ) + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: False), + errors={"organization": ["This field is required."]}, + ) + monkeypatch.setattr( + views, "ActiveStatePublisherForm", lambda *a, **kw: form_obj + ) + + with pytest.raises(HTTPBadRequest) as exc: + views.api_add_trusted_publisher(pretend.stub(), request) + + assert exc.value.json["errors"] == form_obj.errors + + def test_add_new_activestate_publisher(self, monkeypatch, db_request, metrics): + owner = UserFactory.create() + db_request.user = owner + db_request.find_service = _make_find_service(metrics) + + project = ProjectFactory.create(oidc_publishers=[]) + project.record_event = pretend.call_recorder(lambda *a, **kw: None) + + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + actor_id = str(uuid.uuid4()) + db_request.json_body = { + "publisher": "activestate", + "organization": "myorg", + "project": "myproject", + "actor": "myuser", + } + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + organization=pretend.stub(data="myorg"), + project=pretend.stub(data="myproject"), + actor=pretend.stub(data="myuser"), + actor_id=actor_id, + ) + monkeypatch.setattr( + views, "ActiveStatePublisherForm", lambda *a, **kw: form_obj + ) + monkeypatch.setattr( + views, + "send_trusted_publisher_added_email", + pretend.call_recorder(lambda *a, **kw: None), + ) + + result = views.api_add_trusted_publisher(project, db_request) + + assert db_request.response.status == "201 Created" + assert result["trusted_publisher"]["publisher_name"] == "ActiveState" + + publisher = db_request.db.query(ActiveStatePublisher).one() + assert publisher.organization == "myorg" + assert publisher.actor == "myuser" + assert publisher in project.oidc_publishers + + def test_reuses_existing_activestate_publisher( + self, monkeypatch, db_request, metrics + ): + owner = UserFactory.create() + db_request.user = owner + db_request.find_service = _make_find_service(metrics) + + actor_id = str(uuid.uuid4()) + existing = ActiveStatePublisherFactory.create( + organization="myorg", + activestate_project_name="myproject", + actor="myuser", + actor_id=actor_id, + ) + project = ProjectFactory.create(oidc_publishers=[]) + project.record_event = pretend.call_recorder(lambda *a, **kw: None) + + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + db_request.json_body = { + "publisher": "activestate", + "organization": "myorg", + "project": "myproject", + "actor": "myuser", + } + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + organization=pretend.stub(data="myorg"), + project=pretend.stub(data="myproject"), + actor=pretend.stub(data="myuser"), + actor_id=actor_id, + ) + monkeypatch.setattr( + views, "ActiveStatePublisherForm", lambda *a, **kw: form_obj + ) + monkeypatch.setattr( + views, + "send_trusted_publisher_added_email", + pretend.call_recorder(lambda *a, **kw: None), + ) + + result = views.api_add_trusted_publisher(project, db_request) + + assert result["trusted_publisher"]["id"] == str(existing.id) + assert db_request.db.query(ActiveStatePublisher).count() == 1 + assert existing in project.oidc_publishers + + +# --------------------------------------------------------------------------- +# DELETE trusted publisher +# --------------------------------------------------------------------------- + + +class TestAPIDeleteTrustedPublisher: + def test_oidc_globally_disabled(self, metrics): + project = pretend.stub() + request = pretend.stub( + find_service=_make_find_service(metrics), + flags=pretend.stub(disallow_oidc=pretend.call_recorder(lambda f=None: True)), + matchdict={"publisher_id": str(uuid.uuid4())}, + user=pretend.stub(id=uuid.uuid4(), username="testuser"), + ) + + with pytest.raises(HTTPForbidden) as exc: + views.api_delete_trusted_publisher(project, request) + + assert "temporarily disabled" in exc.value.json["error"] + + def test_publisher_not_found(self, metrics, db_request): + db_request.find_service = _make_find_service(metrics) + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + db_request.matchdict = {"publisher_id": str(uuid.uuid4())} + + project = ProjectFactory.create(oidc_publishers=[]) + + result = views.api_delete_trusted_publisher(project, db_request) + + assert isinstance(result, HTTPNotFound) + assert "not found" in result.json["error"] + assert metrics.increment.calls == [ + pretend.call("warehouse.oidc.delete_publisher.attempt") + ] + + def test_publisher_not_on_project(self, metrics, db_request): + db_request.user = UserFactory.create() + db_request.find_service = _make_find_service(metrics) + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + + publisher = GitHubPublisherFactory.create() + db_request.db.add(publisher) + db_request.db.flush() + + # Publisher exists in DB but is NOT on this project + project = ProjectFactory.create(oidc_publishers=[]) + db_request.matchdict = {"publisher_id": str(publisher.id)} + + result = views.api_delete_trusted_publisher(project, db_request) + + assert isinstance(result, HTTPNotFound) + + @pytest.mark.parametrize( + "publisher_factory", + [ + GitHubPublisherFactory, + GitLabPublisherFactory, + GooglePublisherFactory, + ActiveStatePublisherFactory, + ], + ) + def test_delete_publisher_entirely( + self, monkeypatch, db_request, metrics, publisher_factory + ): + owner = UserFactory.create() + EmailFactory.create(user=owner, verified=True, primary=True) + db_request.user = owner + db_request.find_service = _make_find_service(metrics) + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + + publisher = publisher_factory.create() + db_request.db.flush() + + project = ProjectFactory.create(oidc_publishers=[publisher]) + RoleFactory.create(user=owner, project=project, role_name="Owner") + project.record_event = pretend.call_recorder(lambda *a, **kw: None) + db_request.matchdict = {"publisher_id": str(publisher.id)} + + monkeypatch.setattr( + views, + "send_trusted_publisher_removed_email", + pretend.call_recorder(lambda *a, **kw: None), + ) + + result = views.api_delete_trusted_publisher(project, db_request) + + assert result == {"message": f"Removed trusted publisher from {project.name}"} + assert publisher not in project.oidc_publishers + # Removed from DB entirely since no other projects reference it + assert db_request.db.query(OIDCPublisher).count() == 0 + + assert project.record_event.calls == [ + pretend.call( + tag=EventTag.Project.OIDCPublisherRemoved, + request=db_request, + additional={ + "publisher": publisher.publisher_name, + "id": str(publisher.id), + "specifier": str(publisher), + "url": publisher.publisher_url(), + "submitted_by": owner.username, + }, + ) + ] + assert metrics.increment.calls == [ + pretend.call("warehouse.oidc.delete_publisher.attempt"), + pretend.call( + "warehouse.oidc.delete_publisher.ok", + tags=[f"publisher:{publisher.publisher_name}"], + ), + ] + assert views.send_trusted_publisher_removed_email.calls == [ + pretend.call( + db_request, + owner, + project_name=project.name, + publisher=publisher, + ) + ] + + def test_delete_shared_publisher_keeps_row( + self, monkeypatch, db_request, metrics + ): + owner = UserFactory.create() + EmailFactory.create(user=owner, verified=True, primary=True) + db_request.user = owner + db_request.find_service = _make_find_service(metrics) + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + + publisher = GitHubPublisherFactory.create() + db_request.db.flush() + + project = ProjectFactory.create(oidc_publishers=[publisher]) + project.record_event = pretend.call_recorder(lambda *a, **kw: None) + other_project = ProjectFactory.create(oidc_publishers=[publisher]) + db_request.matchdict = {"publisher_id": str(publisher.id)} + + monkeypatch.setattr( + views, + "send_trusted_publisher_removed_email", + pretend.call_recorder(lambda *a, **kw: None), + ) + + views.api_delete_trusted_publisher(project, db_request) + + # Publisher is removed from this project … + assert publisher not in project.oidc_publishers + # … but still exists in the DB because other_project still uses it + assert db_request.db.query(OIDCPublisher).one() == publisher + assert publisher in other_project.oidc_publishers diff --git a/tests/unit/packaging/test_models.py b/tests/unit/packaging/test_models.py index fbceea18a18e..7acc7ca07d2b 100644 --- a/tests/unit/packaging/test_models.py +++ b/tests/unit/packaging/test_models.py @@ -201,6 +201,7 @@ def test_acl(self, db_session): Permissions.ProjectsRead, Permissions.ProjectsUpload, Permissions.ProjectsWrite, + Permissions.APITrustedPublishersManage, ], ), ( @@ -210,6 +211,7 @@ def test_acl(self, db_session): Permissions.ProjectsRead, Permissions.ProjectsUpload, Permissions.ProjectsWrite, + Permissions.APITrustedPublishersManage, ], ), ( @@ -219,6 +221,7 @@ def test_acl(self, db_session): Permissions.ProjectsRead, Permissions.ProjectsUpload, Permissions.ProjectsWrite, + Permissions.APITrustedPublishersManage, ], ), ( @@ -228,6 +231,7 @@ def test_acl(self, db_session): Permissions.ProjectsRead, Permissions.ProjectsUpload, Permissions.ProjectsWrite, + Permissions.APITrustedPublishersManage, ], ), ], @@ -366,6 +370,7 @@ def test_acl_for_archived_project(self, db_session): _perms_read_and_write = [ Permissions.ProjectsRead, Permissions.ProjectsWrite, + Permissions.APITrustedPublishersManage, ] assert acls == [ ( @@ -953,6 +958,7 @@ def test_acl(self, db_session): Permissions.ProjectsRead, Permissions.ProjectsUpload, Permissions.ProjectsWrite, + Permissions.APITrustedPublishersManage, ], ), ( @@ -962,6 +968,7 @@ def test_acl(self, db_session): Permissions.ProjectsRead, Permissions.ProjectsUpload, Permissions.ProjectsWrite, + Permissions.APITrustedPublishersManage, ], ), ], diff --git a/tests/unit/test_routes.py b/tests/unit/test_routes.py index 484daabcd8a0..a233827b0849 100644 --- a/tests/unit/test_routes.py +++ b/tests/unit/test_routes.py @@ -625,6 +625,20 @@ def add_redirect_rule(*args, **kwargs): traverse="/{name}", domain=warehouse, ), + pretend.call( + "api.projects.trusted_publishers", + "/danger-api/projects/{name}/trusted-publishers", + factory="warehouse.packaging.models:ProjectFactory", + traverse="/{name}", + domain=warehouse, + ), + pretend.call( + "api.projects.trusted_publisher", + "/danger-api/projects/{name}/trusted-publishers/{publisher_id}", + factory="warehouse.packaging.models:ProjectFactory", + traverse="/{name}", + domain=warehouse, + ), # PEP 740 URLs pretend.call( "integrity.provenance", diff --git a/warehouse/api/oidc_publishers.py b/warehouse/api/oidc_publishers.py new file mode 100644 index 000000000000..3fb1c9635f17 --- /dev/null +++ b/warehouse/api/oidc_publishers.py @@ -0,0 +1,476 @@ +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import http +import typing + +from pyramid.httpexceptions import ( + HTTPBadRequest, + HTTPConflict, + HTTPForbidden, + HTTPNotFound, + HTTPTooManyRequests, +) +from webob.multidict import MultiDict + +from warehouse.admin.flags import AdminFlagValue +from warehouse.authnz import Permissions +from warehouse.email import ( + send_trusted_publisher_added_email, + send_trusted_publisher_removed_email, +) +from warehouse.events.tags import EventTag +from warehouse.metrics import IMetricsService +from warehouse.oidc.forms import ( + ActiveStatePublisherForm, + GitHubPublisherForm, + GitLabPublisherForm, + GooglePublisherForm, +) +from warehouse.oidc.interfaces import TooManyOIDCRegistrations +from warehouse.oidc.models import ( + ActiveStatePublisher, + GitHubPublisher, + GitLabPublisher, + GooglePublisher, + OIDCPublisher, +) +from warehouse.rate_limiting import IRateLimiter + +from .echo import api_v0_view_config + +if typing.TYPE_CHECKING: + from pyramid.request import Request + + from warehouse.packaging.models import Project + + +def _multidict_from_json(data: dict) -> MultiDict: + md = MultiDict() + for key, value in data.items(): + if value is not None: + md.add(key, value if isinstance(value, str) else str(value)) + return md + + +def _check_ratelimits(request: Request) -> None: + user_limiter = request.find_service( + IRateLimiter, name="user_oidc.publisher.register" + ) + ip_limiter = request.find_service( + IRateLimiter, name="ip_oidc.publisher.register" + ) + + if not user_limiter.test(request.user.id): + raise TooManyOIDCRegistrations( + resets_in=user_limiter.resets_in(request.user.id) + ) + if not ip_limiter.test(request.remote_addr): + raise TooManyOIDCRegistrations( + resets_in=ip_limiter.resets_in(request.remote_addr) + ) + + +def _hit_ratelimits(request: Request) -> None: + request.find_service( + IRateLimiter, name="user_oidc.publisher.register" + ).hit(request.user.id) + request.find_service( + IRateLimiter, name="ip_oidc.publisher.register" + ).hit(request.remote_addr) + + +def _publisher_as_dict(publisher: OIDCPublisher) -> dict: + return { + "id": str(publisher.id), + "publisher_name": publisher.publisher_name, + "publisher_url": publisher.publisher_url(), + "specifier": str(publisher), + } + + +@api_v0_view_config( + route_name="api.projects.trusted_publishers", + permission=Permissions.APITrustedPublishersManage, + require_methods=["GET"], +) +def api_get_trusted_publishers(project: Project, request: Request) -> dict: + return { + "trusted_publishers": [ + _publisher_as_dict(p) for p in project.oidc_publishers + ] + } + + +@api_v0_view_config( + route_name="api.projects.trusted_publishers", + permission=Permissions.APITrustedPublishersManage, + require_methods=["POST"], +) +def api_add_trusted_publisher(project: Project, request: Request) -> dict: + metrics = request.find_service(IMetricsService, context=None) + + if request.flags.disallow_oidc(): + raise HTTPForbidden( + json={ + "error": ( + "Trusted publishing is temporarily disabled. " + "See https://pypi.org/help#admin-intervention for details." + ) + } + ) + + data = request.json_body + publisher_type = data.get("publisher", "") + + if publisher_type == "github": + if request.flags.disallow_oidc(AdminFlagValue.DISALLOW_GITHUB_OIDC): + raise HTTPForbidden( + json={ + "error": ( + "GitHub-based trusted publishing is temporarily disabled. " + "See https://pypi.org/help#admin-intervention for details." + ) + } + ) + + metrics.increment( + "warehouse.oidc.add_publisher.attempt", tags=["publisher:GitHub"] + ) + + try: + _check_ratelimits(request) + except TooManyOIDCRegistrations as exc: + metrics.increment( + "warehouse.oidc.add_publisher.ratelimited", tags=["publisher:GitHub"] + ) + raise HTTPTooManyRequests( + json={ + "error": ( + "Too many trusted publisher registrations. Try again later." + ) + }, + headers={ + "Retry-After": str(int(exc.resets_in.total_seconds())) + }, + ) + + _hit_ratelimits(request) + + form = GitHubPublisherForm( + _multidict_from_json(data), + api_token=request.registry.settings.get("github.token"), + ) + + if not form.validate(): + raise HTTPBadRequest(json={"errors": form.errors}) + + publisher = ( + request.db.query(GitHubPublisher) + .filter( + GitHubPublisher.repository_name == form.repository.data, + GitHubPublisher.repository_owner == form.normalized_owner, + GitHubPublisher.workflow_filename == form.workflow_filename.data, + GitHubPublisher.environment == form.normalized_environment, + ) + .one_or_none() + ) + if publisher is None: + publisher = GitHubPublisher( + repository_name=form.repository.data, + repository_owner=form.normalized_owner, + repository_owner_id=form.owner_id, + workflow_filename=form.workflow_filename.data, + environment=form.normalized_environment, + ) + request.db.add(publisher) + + publisher_tag = "GitHub" + + elif publisher_type == "gitlab": + if request.flags.disallow_oidc(AdminFlagValue.DISALLOW_GITLAB_OIDC): + raise HTTPForbidden( + json={ + "error": ( + "GitLab-based trusted publishing is temporarily disabled. " + "See https://pypi.org/help#admin-intervention for details." + ) + } + ) + + metrics.increment( + "warehouse.oidc.add_publisher.attempt", tags=["publisher:GitLab"] + ) + + try: + _check_ratelimits(request) + except TooManyOIDCRegistrations as exc: + metrics.increment( + "warehouse.oidc.add_publisher.ratelimited", tags=["publisher:GitLab"] + ) + raise HTTPTooManyRequests( + json={ + "error": ( + "Too many trusted publisher registrations. Try again later." + ) + }, + headers={ + "Retry-After": str(int(exc.resets_in.total_seconds())) + }, + ) + + _hit_ratelimits(request) + + _gl_issuers = GitLabPublisher.get_available_issuer_urls( + organization=project.organization + ) + form = GitLabPublisherForm( + _multidict_from_json(data), + issuer_url_choices=_gl_issuers, + ) + + if not form.validate(): + raise HTTPBadRequest(json={"errors": form.errors}) + + publisher = ( + request.db.query(GitLabPublisher) + .filter( + GitLabPublisher.namespace == form.namespace.data, + GitLabPublisher.project == form.project.data, + GitLabPublisher.workflow_filepath == form.workflow_filepath.data, + GitLabPublisher.environment == form.normalized_environment, + GitLabPublisher.issuer_url == form.issuer_url.data, + ) + .one_or_none() + ) + if publisher is None: + publisher = GitLabPublisher( + namespace=form.namespace.data, + project=form.project.data, + workflow_filepath=form.workflow_filepath.data, + environment=form.normalized_environment, + issuer_url=form.issuer_url.data, + ) + request.db.add(publisher) + + publisher_tag = "GitLab" + + elif publisher_type == "google": + if request.flags.disallow_oidc(AdminFlagValue.DISALLOW_GOOGLE_OIDC): + raise HTTPForbidden( + json={ + "error": ( + "Google-based trusted publishing is temporarily disabled. " + "See https://pypi.org/help#admin-intervention for details." + ) + } + ) + + metrics.increment( + "warehouse.oidc.add_publisher.attempt", tags=["publisher:Google"] + ) + + try: + _check_ratelimits(request) + except TooManyOIDCRegistrations as exc: + metrics.increment( + "warehouse.oidc.add_publisher.ratelimited", tags=["publisher:Google"] + ) + raise HTTPTooManyRequests( + json={ + "error": ( + "Too many trusted publisher registrations. Try again later." + ) + }, + headers={ + "Retry-After": str(int(exc.resets_in.total_seconds())) + }, + ) + + _hit_ratelimits(request) + + form = GooglePublisherForm(_multidict_from_json(data)) + + if not form.validate(): + raise HTTPBadRequest(json={"errors": form.errors}) + + publisher = ( + request.db.query(GooglePublisher) + .filter( + GooglePublisher.email == form.email.data, + GooglePublisher.sub == form.sub.data, + ) + .one_or_none() + ) + if publisher is None: + publisher = GooglePublisher( + email=form.email.data, + sub=form.sub.data, + ) + request.db.add(publisher) + + publisher_tag = "Google" + + elif publisher_type == "activestate": + if request.flags.disallow_oidc(AdminFlagValue.DISALLOW_ACTIVESTATE_OIDC): + raise HTTPForbidden( + json={ + "error": ( + "ActiveState-based trusted publishing is temporarily disabled. " + "See https://pypi.org/help#admin-intervention for details." + ) + } + ) + + metrics.increment( + "warehouse.oidc.add_publisher.attempt", tags=["publisher:ActiveState"] + ) + + try: + _check_ratelimits(request) + except TooManyOIDCRegistrations as exc: + metrics.increment( + "warehouse.oidc.add_publisher.ratelimited", + tags=["publisher:ActiveState"], + ) + raise HTTPTooManyRequests( + json={ + "error": ( + "Too many trusted publisher registrations. Try again later." + ) + }, + headers={ + "Retry-After": str(int(exc.resets_in.total_seconds())) + }, + ) + + _hit_ratelimits(request) + + form = ActiveStatePublisherForm(_multidict_from_json(data)) + + if not form.validate(): + raise HTTPBadRequest(json={"errors": form.errors}) + + publisher = ( + request.db.query(ActiveStatePublisher) + .filter( + ActiveStatePublisher.organization == form.organization.data, + ActiveStatePublisher.activestate_project_name == form.project.data, + ActiveStatePublisher.actor_id == form.actor_id, + ) + .one_or_none() + ) + if publisher is None: + publisher = ActiveStatePublisher( + organization=form.organization.data, + activestate_project_name=form.project.data, + actor=form.actor.data, + actor_id=form.actor_id, + ) + request.db.add(publisher) + + publisher_tag = "ActiveState" + + else: + raise HTTPBadRequest( + json={"error": f"Unknown publisher type: {publisher_type!r}"} + ) + + if publisher in project.oidc_publishers: + raise HTTPConflict( + json={ + "error": f"{publisher} is already registered with {project.name}" + } + ) + + for user in project.users: + send_trusted_publisher_added_email( + request, + user, + project_name=project.name, + publisher=publisher, + ) + + project.oidc_publishers.append(publisher) + + project.record_event( + tag=EventTag.Project.OIDCPublisherAdded, + request=request, + additional={ + "publisher": publisher.publisher_name, + "id": str(publisher.id), + "specifier": str(publisher), + "url": publisher.publisher_url(), + "submitted_by": request.user.username, + "reified_from_pending_publisher": False, + "constrained_from_existing_publisher": False, + }, + ) + + metrics.increment( + "warehouse.oidc.add_publisher.ok", tags=[f"publisher:{publisher_tag}"] + ) + + request.response.status = http.HTTPStatus.CREATED + return {"trusted_publisher": _publisher_as_dict(publisher)} + + +@api_v0_view_config( + route_name="api.projects.trusted_publisher", + permission=Permissions.APITrustedPublishersManage, + require_methods=["DELETE"], +) +def api_delete_trusted_publisher(project: Project, request: Request) -> dict: + metrics = request.find_service(IMetricsService, context=None) + + if request.flags.disallow_oidc(): + raise HTTPForbidden( + json={ + "error": ( + "Trusted publishing is temporarily disabled. " + "See https://pypi.org/help#admin-intervention for details." + ) + } + ) + + metrics.increment("warehouse.oidc.delete_publisher.attempt") + + publisher_id = request.matchdict.get("publisher_id") + publisher = request.db.get(OIDCPublisher, publisher_id) + + if publisher is None or publisher not in project.oidc_publishers: + return HTTPNotFound( + json={"error": "Publisher not found for this project"} + ) + + for user in project.users: + send_trusted_publisher_removed_email( + request, + user, + project_name=project.name, + publisher=publisher, + ) + + project.record_event( + tag=EventTag.Project.OIDCPublisherRemoved, + request=request, + additional={ + "publisher": publisher.publisher_name, + "id": str(publisher.id), + "specifier": str(publisher), + "url": publisher.publisher_url(), + "submitted_by": request.user.username, + }, + ) + + project.oidc_publishers.remove(publisher) + if len(publisher.projects) == 0: + request.db.delete(publisher) + + metrics.increment( + "warehouse.oidc.delete_publisher.ok", + tags=[f"publisher:{publisher.publisher_name}"], + ) + + return {"message": f"Removed trusted publisher from {project.name}"} diff --git a/warehouse/api/openapi.yaml b/warehouse/api/openapi.yaml index 499155a25a4c..24e71da584bd 100644 --- a/warehouse/api/openapi.yaml +++ b/warehouse/api/openapi.yaml @@ -8,6 +8,113 @@ info: components: schemas: + TrustedPublisher: + type: object + description: A registered trusted publisher for a project + properties: + id: + type: string + format: uuid + example: "7f89a3b2-1234-5678-abcd-ef0123456789" + publisher_name: + type: string + example: "GitHub" + publisher_url: + type: string + nullable: true + example: "https://github.com/myorg/myrepo" + specifier: + type: string + example: "myorg/myrepo/.github/workflows/publish.yml" + TrustedPublisherGitHub: + type: object + description: GitHub Actions trusted publisher configuration + properties: + publisher: + type: string + enum: [github] + owner: + type: string + description: GitHub repository owner (username or organization) + example: "myorg" + repository: + type: string + description: Repository name + example: "myrepo" + workflow_filename: + type: string + description: Workflow filename (must end with .yml or .yaml, no path separators) + example: "publish.yml" + environment: + type: string + description: Optional GitHub Actions environment name + example: "release" + required: [publisher, owner, repository, workflow_filename] + TrustedPublisherGitLab: + type: object + description: GitLab CI trusted publisher configuration + properties: + publisher: + type: string + enum: [gitlab] + namespace: + type: string + description: GitLab namespace (username or group/subgroup) + example: "mygroup" + project: + type: string + description: GitLab project name + example: "myrepo" + workflow_filepath: + type: string + description: Top-level pipeline file path (must end with .yml or .yaml) + example: ".gitlab-ci.yml" + environment: + type: string + description: Optional GitLab environment name + example: "production" + issuer_url: + type: string + description: GitLab issuer URL (defaults to https://gitlab.com) + example: "https://gitlab.com" + required: [publisher, namespace, project, workflow_filepath] + TrustedPublisherGoogle: + type: object + description: Google Cloud trusted publisher configuration + properties: + publisher: + type: string + enum: [google] + email: + type: string + format: email + description: Google service account email + example: "my-sa@my-project.iam.gserviceaccount.com" + sub: + type: string + description: Optional subject claim + example: "123456789" + required: [publisher, email] + TrustedPublisherActiveState: + type: object + description: ActiveState Platform trusted publisher configuration + properties: + publisher: + type: string + enum: [activestate] + organization: + type: string + description: ActiveState organization name + example: "myorg" + project: + type: string + description: ActiveState project name + example: "myproject" + actor: + type: string + description: ActiveState actor username + example: "myuser" + required: [publisher, organization, project, actor] Observation: type: object description: A generic Observation object shape @@ -71,6 +178,125 @@ paths: example: { "username": "someuser" } '400': description: Bad request + /danger-api/projects/{name}/trusted-publishers: + get: + summary: List trusted publishers for a project + description: List all trusted publishers registered to a project. + operationId: listTrustedPublishers + tags: + - danger-api + parameters: + - name: name + in: path + description: The name of the Project + required: true + schema: + type: string + example: "project-name" + responses: + '200': + description: List of trusted publishers + content: + application/vnd.pypi.api-v0-danger+json: + schema: + type: object + properties: + trusted_publishers: + type: array + items: + $ref: '#/components/schemas/TrustedPublisher' + '403': + description: Forbidden + '404': + description: Project not found + post: + summary: Add a trusted publisher to a project + description: | + Add a new trusted publisher to a project. Requires a project-owner API token. + + The `publisher` field selects the provider; remaining fields are provider-specific. + operationId: addTrustedPublisher + tags: + - danger-api + parameters: + - name: name + in: path + description: The name of the Project + required: true + schema: + type: string + example: "project-name" + requestBody: + description: Trusted publisher configuration + required: true + content: + application/vnd.pypi.api-v0-danger+json: + schema: + oneOf: + - $ref: '#/components/schemas/TrustedPublisherGitHub' + - $ref: '#/components/schemas/TrustedPublisherGitLab' + - $ref: '#/components/schemas/TrustedPublisherGoogle' + - $ref: '#/components/schemas/TrustedPublisherActiveState' + discriminator: + propertyName: publisher + responses: + '201': + description: Trusted publisher added + content: + application/vnd.pypi.api-v0-danger+json: + schema: + type: object + properties: + trusted_publisher: + $ref: '#/components/schemas/TrustedPublisher' + '400': + description: Bad request (validation error) + '403': + description: Forbidden + '404': + description: Project not found + '409': + description: Publisher already registered to this project + '429': + description: Rate limit exceeded + /danger-api/projects/{name}/trusted-publishers/{publisher_id}: + delete: + summary: Remove a trusted publisher from a project + description: Remove a trusted publisher from a project by its ID. + operationId: deleteTrustedPublisher + tags: + - danger-api + parameters: + - name: name + in: path + description: The name of the Project + required: true + schema: + type: string + example: "project-name" + - name: publisher_id + in: path + description: The UUID of the trusted publisher + required: true + schema: + type: string + format: uuid + example: "7f89a3b2-1234-5678-abcd-ef0123456789" + responses: + '200': + description: Trusted publisher removed + content: + application/vnd.pypi.api-v0-danger+json: + schema: + type: object + properties: + message: + type: string + example: "Removed trusted publisher from project-name" + '403': + description: Forbidden + '404': + description: Publisher not found for this project /danger-api/projects/{name}/observations: post: summary: Submit a project observation diff --git a/warehouse/authnz/_permissions.py b/warehouse/authnz/_permissions.py index b7241d267f86..65fc095d6844 100644 --- a/warehouse/authnz/_permissions.py +++ b/warehouse/authnz/_permissions.py @@ -87,6 +87,7 @@ class Permissions(StrEnum): # API Permissions APIEcho = "api:echo" APIObservationsAdd = "api:observations:add" + APITrustedPublishersManage = "api:trusted-publishers:manage" # User Permissions Account2FA = "account:2fa" diff --git a/warehouse/packaging/models.py b/warehouse/packaging/models.py index 4b3d1f793c5d..ffd13a29389b 100644 --- a/warehouse/packaging/models.py +++ b/warehouse/packaging/models.py @@ -405,6 +405,7 @@ def __acl__(self): Permissions.ProjectsRead, Permissions.ProjectsUpload, Permissions.ProjectsWrite, + Permissions.APITrustedPublishersManage, ] else: current_permissions = [Permissions.ProjectsUpload] diff --git a/warehouse/routes.py b/warehouse/routes.py index b47317d977bc..8808e142f946 100644 --- a/warehouse/routes.py +++ b/warehouse/routes.py @@ -644,6 +644,20 @@ def includeme(config): traverse="/{name}", domain=warehouse, ) + config.add_route( + "api.projects.trusted_publishers", + "/danger-api/projects/{name}/trusted-publishers", + factory="warehouse.packaging.models:ProjectFactory", + traverse="/{name}", + domain=warehouse, + ) + config.add_route( + "api.projects.trusted_publisher", + "/danger-api/projects/{name}/trusted-publishers/{publisher_id}", + factory="warehouse.packaging.models:ProjectFactory", + traverse="/{name}", + domain=warehouse, + ) # PEP 740 URLs config.add_route(