From 530edc626f40bfaff1cdaad55e242a9b9208f302 Mon Sep 17 00:00:00 2001 From: "Kamil Mankowski (kam193)" Date: Fri, 1 May 2026 21:05:56 +0200 Subject: [PATCH 1/2] feat: prevent uploading PyPI tokens in common places Minimal implementation detecting PyPI API tokens in the commonly used places using the Yara scanner. --- tests/unit/utils/test_scanner.py | 92 +++++++++++++++++++- warehouse/utils/scanner.py | 30 +++++-- warehouse/utils/scanner_rules/pypi_token.yar | 24 +++++ 3 files changed, 135 insertions(+), 11 deletions(-) create mode 100644 warehouse/utils/scanner_rules/pypi_token.yar diff --git a/tests/unit/utils/test_scanner.py b/tests/unit/utils/test_scanner.py index e683f4feac64..55d7387d8dc8 100644 --- a/tests/unit/utils/test_scanner.py +++ b/tests/unit/utils/test_scanner.py @@ -5,9 +5,13 @@ import tarfile import zipfile +from uuid import uuid4 + +import pymacaroons import pytest import yara_x +from warehouse.macaroons import caveats from warehouse.utils import scanner @@ -18,6 +22,41 @@ def rules(): return compiled +def _generate_token(domain="pypi.org", projects_scope=False): + raw_macaroon = pymacaroons.Macaroon( + location=domain, + identifier=str(uuid4()), + key=b"fake key", + version=pymacaroons.MACAROON_V2, + ) + + if projects_scope: + caveats_ = [caveats.ProjectID(project_ids=[str(uuid4()) for _ in range(3)])] + else: + caveats_ = [ + caveats.ProjectName(normalized_names=[f"project-{i}" for i in range(3)]), + caveats.RequestUser(user_id=str(uuid4())), + ] + for caveat in caveats_: + raw_macaroon.add_first_party_caveat(caveats.serialize(caveat)) + + return f"pypi-{raw_macaroon.serialize()}" + + +@pytest.fixture( + scope="module", params=[False, True], ids=["user-scope", "projects-scope"] +) +def pypi_token(request): + return _generate_token(domain="pypi.org", projects_scope=request.param) + + +@pytest.fixture( + scope="module", params=[False, True], ids=["user-scope", "projects-scope"] +) +def localhost_token(request): + return 
_generate_token(domain="localhost", projects_scope=request.param) + + def _make_wheel(tmp_path, files_dict, name="fake_package", version="1.0"): whl_path = str(tmp_path / f"{name}-{version}-py3-none-any.whl") with zipfile.ZipFile(whl_path, "w") as zfp: @@ -248,22 +287,22 @@ def test_clean_archive_no_matches(self, tmp_path, rules): ) assert scanner.scan_archive(whl, rules=rules) == [] - def test_skips_non_python_files_in_wheel(self, tmp_path, rules): + def test_skips_excluded_files_in_wheel(self, tmp_path, rules): whl = _make_wheel( tmp_path, { "pkg/data.json": "__pyarmor__(__name__, __file__, b'x')", - "pkg/readme.txt": "__pyarmor_enter__()", + "pkg/module.so": b"__pyarmor_enter__()", }, ) assert scanner.scan_archive(whl, rules=rules) == [] - def test_skips_non_python_files_in_tarball(self, tmp_path, rules): + def test_skips_excluded_files_in_tarball(self, tmp_path, rules): tar = _make_tarball( tmp_path, { "fake-1.0/data.json": "__pyarmor__(__name__, __file__, b'x')", - "fake-1.0/readme.txt": "__pyarmor_enter__()", + "fake-1.0/module.so": b"__pyarmor_enter__()", }, ) assert scanner.scan_archive(tar, rules=rules) == [] @@ -396,3 +435,48 @@ def test_spoofed_file_size_does_not_bypass_scan(self, tmp_path, rules): assert len(matches) == 1 assert matches[0][0] == "pkg/__init__.py" assert "pyarmor_encrypted" in matches[0][1] + + +_FILENAMES_TO_SCAN = [ + "setup.py", + "README.md", + "PUBLISHING.RST", + "publish.sh", + "info.txt", + ".env", +] + + +class TestPyPITokenDetection: + # TODO: separated METADATA/PKG-INFO tests with correct paths + @pytest.mark.parametrize("filename", [*_FILENAMES_TO_SCAN, "METADATA"]) + def test_detects_pypi_token_in_wheel(self, tmp_path, rules, pypi_token, filename): + whl = _make_wheel(tmp_path, {f"pkg/{filename}": pypi_token}) + matches = scanner.scan_archive(whl, rules=rules) + assert len(matches) == 1 + assert matches[0][0] == f"pkg/{filename}" + assert "secrets_pypi_token" in matches[0][1] + + @pytest.mark.parametrize("filename", 
[*_FILENAMES_TO_SCAN, "PKG-INFO"]) + def test_detects_pypi_token_in_tarball(self, tmp_path, rules, pypi_token, filename): + tar = _make_tarball(tmp_path, {f"fake-1.0/pkg/{filename}": pypi_token}) + matches = scanner.scan_archive(tar, rules=rules) + assert len(matches) == 1 + assert matches[0][0] == f"fake-1.0/pkg/{filename}" + assert "secrets_pypi_token" in matches[0][1] + + @pytest.mark.parametrize("filename", [*_FILENAMES_TO_SCAN, "METADATA"]) + def test_ignores_localhost_token_in_wheel( + self, tmp_path, rules, localhost_token, filename + ): + whl = _make_wheel(tmp_path, {f"pkg/{filename}": localhost_token}) + matches = scanner.scan_archive(whl, rules=rules) + assert len(matches) == 0 + + @pytest.mark.parametrize("filename", [*_FILENAMES_TO_SCAN, "PKG-INFO"]) + def test_ignores_localhost_token_in_tarball( + self, tmp_path, rules, localhost_token, filename + ): + tar = _make_tarball(tmp_path, {f"fake-1.0/pkg/{filename}": localhost_token}) + matches = scanner.scan_archive(tar, rules=rules) + assert len(matches) == 0 diff --git a/warehouse/utils/scanner.py b/warehouse/utils/scanner.py index 71a6e3d7b734..6b56d7d4ac4c 100644 --- a/warehouse/utils/scanner.py +++ b/warehouse/utils/scanner.py @@ -16,9 +16,21 @@ # YARA rules directory _RULES_DIR = Path(__file__).parent / "scanner_rules" -# Extensions to scan inside archives. Python source (.py) for source-level -# rules (e.g. pyarmor), and .pye for SourceDefender-encrypted files. -_SCAN_EXTENSIONS = {".py", ".pye"} +# Extensions to scan inside archives. +_SCAN_EXTENSIONS = { + # Python source for source-level rules (e.g. pyarmor) + ".py", + # .pye for SourceDefender-encrypted files + ".pye", + # Different textual files for common places where PyPI tokens are accidentally left. 
+ ".md", + ".rst", + ".env", + ".sh", + ".txt", + "METADATA", + "PKG-INFO", +} # Max size of individual file to scan inside archive (5 MiB) _SCAN_MAX_FILE_SIZE = 5 * 1024 * 1024 @@ -78,8 +90,10 @@ def iter_zip_members(zfp: zipfile.ZipFile) -> typing.Iterator[tuple[str, int, by for entry in zfp.infolist(): if entry.is_dir(): continue - ext = Path(entry.filename).suffix.lower() - if ext not in _SCAN_EXTENSIONS: + path = Path(entry.filename) + ext = path.suffix.lower() + # Names like "METADATA", ".env" have empty suffix + if ext not in _SCAN_EXTENSIONS and path.name not in _SCAN_EXTENSIONS: continue data = zfp.read(entry.filename) yield entry.filename, len(data), data @@ -90,8 +104,10 @@ def iter_tar_members(tar: tarfile.TarFile) -> typing.Iterator[tuple[str, int, by for member in tar.getmembers(): if not member.isfile(): continue - ext = Path(member.name).suffix.lower() - if ext not in _SCAN_EXTENSIONS: + path = Path(member.name) + ext = path.suffix.lower() + # Names like "PKG-INFO", ".env" have empty suffix + if ext not in _SCAN_EXTENSIONS and path.name not in _SCAN_EXTENSIONS: continue f = tar.extractfile(member) if f is None: # pragma: no cover diff --git a/warehouse/utils/scanner_rules/pypi_token.yar b/warehouse/utils/scanner_rules/pypi_token.yar new file mode 100644 index 000000000000..43058d5b6f58 --- /dev/null +++ b/warehouse/utils/scanner_rules/pypi_token.yar @@ -0,0 +1,24 @@ +rule secrets_pypi_token +{ + meta: + description = "Detects PyPI API tokens exposed in source code." + author = "Kamil Mankowski" + message = "We have detected a PyPI API token exposed in the uploaded file. Publishing it would allow anyone to perform actions on your behalf. For your own security, please revoke the token immediately." + + strings: + // Regex adapted from trufflehog's PyPI token detector + // Intentionally not derived from the official Token format definition to spare unnecessary matches. 
+ // Pre-computed head ensures we match actual pypi.org tokens + // https://github.com/trufflesecurity/trufflehog/blob/main/pkg/detectors/pypi/pypi.go + $pypi_token = /pypi-AgEIcHlwaS5vcmcCJ[a-zA-Z0-9-_]{150,157}/ + + // TODO: look if there are test tokens in use we should exclude + // $test_token = "pypi-AgEIcHlwaS5vcmcCJxxx" + + condition: + $pypi_token + // If we want to allow some test-only tokens, we can use: + // and for all i in (1 .. #pypi_token) : ( + // not $test_token at @pypi_token[i] + // ) +} \ No newline at end of file From e210ba30f4266051113938fe88d3ad380155f8aa Mon Sep 17 00:00:00 2001 From: "Kamil Mankowski (kam193)" Date: Sun, 3 May 2026 23:57:07 +0200 Subject: [PATCH 2/2] add tests for sample tokens, new extensions, better message --- tests/unit/utils/test_scanner.py | 33 ++++++++++++++++++-- warehouse/utils/scanner.py | 10 ++++-- warehouse/utils/scanner_rules/pypi_token.yar | 4 +-- 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/tests/unit/utils/test_scanner.py b/tests/unit/utils/test_scanner.py index 55d7387d8dc8..de3cd4deb790 100644 --- a/tests/unit/utils/test_scanner.py +++ b/tests/unit/utils/test_scanner.py @@ -444,12 +444,15 @@ def test_spoofed_file_size_does_not_bypass_scan(self, tmp_path, rules): "publish.sh", "info.txt", ".env", + ".pypirc", + "pyproject.toml", + "upload.cfg", + "upload.conf", ] class TestPyPITokenDetection: - # TODO: separated METADATA/PKG-INFO tests with correct paths - @pytest.mark.parametrize("filename", [*_FILENAMES_TO_SCAN, "METADATA"]) + @pytest.mark.parametrize("filename", _FILENAMES_TO_SCAN) def test_detects_pypi_token_in_wheel(self, tmp_path, rules, pypi_token, filename): whl = _make_wheel(tmp_path, {f"pkg/{filename}": pypi_token}) matches = scanner.scan_archive(whl, rules=rules) @@ -465,6 +468,14 @@ def test_detects_pypi_token_in_tarball(self, tmp_path, rules, pypi_token, filena assert matches[0][0] == f"fake-1.0/pkg/{filename}" assert "secrets_pypi_token" in matches[0][1] + def 
test_detection_in_wheel_metadata(self, tmp_path, rules, pypi_token): + """Case separated to prevent regressions by disabling .dist-info scanning.""" + whl = _make_wheel(tmp_path, {"fake_package-1.0.dist-info/METADATA": pypi_token}) + matches = scanner.scan_archive(whl, rules=rules) + assert len(matches) == 1 + assert matches[0][0] == "fake_package-1.0.dist-info/METADATA" + assert "secrets_pypi_token" in matches[0][1] + @pytest.mark.parametrize("filename", [*_FILENAMES_TO_SCAN, "METADATA"]) def test_ignores_localhost_token_in_wheel( self, tmp_path, rules, localhost_token, filename @@ -480,3 +491,21 @@ def test_ignores_localhost_token_in_tarball( tar = _make_tarball(tmp_path, {f"fake-1.0/pkg/{filename}": localhost_token}) matches = scanner.scan_archive(tar, rules=rules) assert len(matches) == 0 + + @pytest.mark.parametrize("filename", _FILENAMES_TO_SCAN) + @pytest.mark.parametrize( + "token", + [ + "pypi-AgEIcHlwaS5vcmcCJDxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", + "pypi-AgEIcHlwaS5vcmcCJ...", + ], + ) + @pytest.mark.parametrize( + "dist_factory", [_make_tarball, _make_wheel], ids=["tarball", "wheel"] + ) + def test_ignores_sample_tokens( + self, tmp_path, rules, token, filename, dist_factory + ): + dist = dist_factory(tmp_path, {f"pkg/{filename}": token}) + matches = scanner.scan_archive(dist, rules=rules) + assert len(matches) == 0 diff --git a/warehouse/utils/scanner.py b/warehouse/utils/scanner.py index 6b56d7d4ac4c..c6e1aa3c9498 100644 --- a/warehouse/utils/scanner.py +++ b/warehouse/utils/scanner.py @@ -17,7 +17,7 @@ _RULES_DIR = Path(__file__).parent / "scanner_rules" # Extensions to scan inside archives. -_SCAN_EXTENSIONS = { +_SCAN_TARGETS = { # Python source for source-level rules (e.g. 
pyarmor) ".py", # .pye for SourceDefender-encrypted files @@ -28,6 +28,10 @@ ".env", ".sh", ".txt", + ".pypirc", + ".toml", + ".cfg", + ".conf", "METADATA", "PKG-INFO", } @@ -93,7 +97,7 @@ def iter_zip_members(zfp: zipfile.ZipFile) -> typing.Iterator[tuple[str, int, by path = Path(entry.filename) ext = path.suffix.lower() # Names like "METADATA", ".env" have empty suffix - if ext not in _SCAN_EXTENSIONS and path.name not in _SCAN_EXTENSIONS: + if ext not in _SCAN_TARGETS and path.name not in _SCAN_TARGETS: continue data = zfp.read(entry.filename) yield entry.filename, len(data), data @@ -107,7 +111,7 @@ def iter_tar_members(tar: tarfile.TarFile) -> typing.Iterator[tuple[str, int, by path = Path(member.name) ext = path.suffix.lower() # Names like "PKG-INFO", ".env" have empty suffix - if ext not in _SCAN_EXTENSIONS and path.name not in _SCAN_EXTENSIONS: + if ext not in _SCAN_TARGETS and path.name not in _SCAN_TARGETS: continue f = tar.extractfile(member) if f is None: # pragma: no cover diff --git a/warehouse/utils/scanner_rules/pypi_token.yar b/warehouse/utils/scanner_rules/pypi_token.yar index 43058d5b6f58..2e0cf9ccf963 100644 --- a/warehouse/utils/scanner_rules/pypi_token.yar +++ b/warehouse/utils/scanner_rules/pypi_token.yar @@ -3,7 +3,7 @@ rule secrets_pypi_token meta: description = "Detects PyPI API tokens exposed in source code." author = "Kamil Mankowski" - message = "We have detected a PyPI API token exposed in the uploaded file. Publishing it would allow anyone to perform actions on your behalf. For your own security, please revoke the token immediately." + message = "We have detected a PyPI API token exposed in the uploaded file. Publishing it would allow anyone to perform actions on your behalf. For your own security, please revoke the token immediately. See https://pypi.org/help/#compromised-token for additional help." strings: // Regex adapted from trufflehog's PyPI token detector @@ -21,4 +21,4 @@ rule secrets_pypi_token // and for all i in (1 .. 
#pypi_token) : ( // not $test_token at @pypi_token[i] // ) -} \ No newline at end of file +}