Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 117 additions & 4 deletions tests/unit/utils/test_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
import tarfile
import zipfile

from uuid import uuid4

import pymacaroons
import pytest
import yara_x

from warehouse.macaroons import caveats
from warehouse.utils import scanner


Expand All @@ -18,6 +22,41 @@ def rules():
return compiled


def _generate_token(domain="pypi.org", projects_scope=False):
    """Build a serialized, ``pypi-``-prefixed macaroon token for scanner tests.

    ``domain`` is used as the macaroon location. When ``projects_scope`` is
    true the token carries a single ``ProjectID`` caveat; otherwise it carries
    a ``ProjectName`` caveat followed by a ``RequestUser`` caveat.

    Kept as a plain helper so that a factory-style token generator for tests
    can grow out of it later.
    """
    macaroon = pymacaroons.Macaroon(
        location=domain,
        identifier=str(uuid4()),
        key=b"fake key",
        version=pymacaroons.MACAROON_V2,
    )

    if projects_scope:
        project_ids = [str(uuid4()) for _ in range(3)]
        restrictions = [caveats.ProjectID(project_ids=project_ids)]
    else:
        project_names = [f"project-{i}" for i in range(3)]
        restrictions = [
            caveats.ProjectName(normalized_names=project_names),
            caveats.RequestUser(user_id=str(uuid4())),
        ]

    # Caveats are attached in list order, after being serialized by warehouse.
    for restriction in restrictions:
        macaroon.add_first_party_caveat(caveats.serialize(restriction))

    return f"pypi-{macaroon.serialize()}"


@pytest.fixture(
    scope="module",
    params=[False, True],
    ids=["user-scope", "projects-scope"],
)
def pypi_token(request):
    """Token whose location is ``pypi.org``, once per caveat scope."""
    use_projects_scope = request.param
    return _generate_token(domain="pypi.org", projects_scope=use_projects_scope)


@pytest.fixture(
    scope="module",
    params=[False, True],
    ids=["user-scope", "projects-scope"],
)
def localhost_token(request):
    """Token whose location is ``localhost``, once per caveat scope."""
    use_projects_scope = request.param
    return _generate_token(domain="localhost", projects_scope=use_projects_scope)


def _make_wheel(tmp_path, files_dict, name="fake_package", version="1.0"):
whl_path = str(tmp_path / f"{name}-{version}-py3-none-any.whl")
with zipfile.ZipFile(whl_path, "w") as zfp:
Expand Down Expand Up @@ -248,22 +287,22 @@ def test_clean_archive_no_matches(self, tmp_path, rules):
)
assert scanner.scan_archive(whl, rules=rules) == []

def test_skips_non_python_files_in_wheel(self, tmp_path, rules):
def test_skips_excluded_files_in_wheel(self, tmp_path, rules):
whl = _make_wheel(
tmp_path,
{
"pkg/data.json": "__pyarmor__(__name__, __file__, b'x')",
"pkg/readme.txt": "__pyarmor_enter__()",
"pkg/module.so": b"__pyarmor_enter__()",
},
)
assert scanner.scan_archive(whl, rules=rules) == []

def test_skips_non_python_files_in_tarball(self, tmp_path, rules):
def test_skips_excluded_files_in_tarball(self, tmp_path, rules):
tar = _make_tarball(
tmp_path,
{
"fake-1.0/data.json": "__pyarmor__(__name__, __file__, b'x')",
"fake-1.0/readme.txt": "__pyarmor_enter__()",
"fake-1.0/module.so": b"__pyarmor_enter__()",
},
)
assert scanner.scan_archive(tar, rules=rules) == []
Expand Down Expand Up @@ -396,3 +435,77 @@ def test_spoofed_file_size_does_not_bypass_scan(self, tmp_path, rules):
assert len(matches) == 1
assert matches[0][0] == "pkg/__init__.py"
assert "pyarmor_encrypted" in matches[0][1]


_FILENAMES_TO_SCAN = [
"setup.py",
"README.md",
"PUBLISHING.RST",
"publish.sh",
"info.txt",
".env",
".pypirc",
"pyproject.toml",
"upload.cfg",
"upload.conf",
]


class TestPyPITokenDetection:
    """Scanner tests for the ``secrets_pypi_token`` rule in wheels and tarballs."""

    @pytest.mark.parametrize("filename", _FILENAMES_TO_SCAN)
    def test_detects_pypi_token_in_wheel(self, tmp_path, rules, pypi_token, filename):
        """A pypi.org token in any scanned wheel member yields exactly one match."""
        member = f"pkg/{filename}"
        wheel = _make_wheel(tmp_path, {member: pypi_token})

        found = scanner.scan_archive(wheel, rules=rules)

        assert len(found) == 1
        first = found[0]
        assert first[0] == member
        assert "secrets_pypi_token" in first[1]

    @pytest.mark.parametrize("filename", [*_FILENAMES_TO_SCAN, "PKG-INFO"])
    def test_detects_pypi_token_in_tarball(self, tmp_path, rules, pypi_token, filename):
        """A pypi.org token in any scanned tarball member yields exactly one match."""
        member = f"fake-1.0/pkg/{filename}"
        tarball = _make_tarball(tmp_path, {member: pypi_token})

        found = scanner.scan_archive(tarball, rules=rules)

        assert len(found) == 1
        first = found[0]
        assert first[0] == member
        assert "secrets_pypi_token" in first[1]

    def test_detection_in_wheel_metadata(self, tmp_path, rules, pypi_token):
        """Kept as its own case to catch a regression that disables
        .dist-info scanning."""
        member = "fake_package-1.0.dist-info/METADATA"
        wheel = _make_wheel(tmp_path, {member: pypi_token})

        found = scanner.scan_archive(wheel, rules=rules)

        assert len(found) == 1
        first = found[0]
        assert first[0] == "fake_package-1.0.dist-info/METADATA"
        assert "secrets_pypi_token" in first[1]

    @pytest.mark.parametrize("filename", [*_FILENAMES_TO_SCAN, "METADATA"])
    def test_ignores_localhost_token_in_wheel(
        self, tmp_path, rules, localhost_token, filename
    ):
        """Tokens whose location is ``localhost`` are not reported for wheels."""
        contents = {f"pkg/{filename}": localhost_token}
        wheel = _make_wheel(tmp_path, contents)

        found = scanner.scan_archive(wheel, rules=rules)

        assert len(found) == 0

    @pytest.mark.parametrize("filename", [*_FILENAMES_TO_SCAN, "PKG-INFO"])
    def test_ignores_localhost_token_in_tarball(
        self, tmp_path, rules, localhost_token, filename
    ):
        """Tokens whose location is ``localhost`` are not reported for tarballs."""
        contents = {f"fake-1.0/pkg/{filename}": localhost_token}
        tarball = _make_tarball(tmp_path, contents)

        found = scanner.scan_archive(tarball, rules=rules)

        assert len(found) == 0

    @pytest.mark.parametrize("filename", _FILENAMES_TO_SCAN)
    @pytest.mark.parametrize(
        "token",
        [
            "pypi-AgEIcHlwaS5vcmcCJDxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
            "pypi-AgEIcHlwaS5vcmcCJ...",
        ],
    )
    @pytest.mark.parametrize(
        "dist_factory", [_make_tarball, _make_wheel], ids=["tarball", "wheel"]
    )
    def test_ignores_sample_tokens(
        self, tmp_path, rules, token, filename, dist_factory
    ):
        """Placeholder tokens that only share the real prefix are not reported."""
        contents = {f"pkg/{filename}": token}
        archive = dist_factory(tmp_path, contents)

        found = scanner.scan_archive(archive, rules=rules)

        assert len(found) == 0
34 changes: 27 additions & 7 deletions warehouse/utils/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,25 @@
# YARA rules directory
_RULES_DIR = Path(__file__).parent / "scanner_rules"

# Extensions to scan inside archives. Python source (.py) for source-level
# rules (e.g. pyarmor), and .pye for SourceDefender-encrypted files.
_SCAN_EXTENSIONS = {".py", ".pye"}
# File extensions and exact file names to scan inside archives.
_SCAN_TARGETS = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: there might be a better way to express these targets? Maybe as globs, to make it clearer which ones are filenames versus suffixes?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about globs (also to cover your second comment), but I decided it would be better to leave that for a separate change, so we can properly research the best way to do it. My personal preference would be to scan every file under a size limit, but I think that requires more consideration.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, thanks. I'll defer to whatever others think makes sense for an initial merge here.

# Python source for source-level rules (e.g. pyarmor)
".py",
# .pye for SourceDefender-encrypted files
".pye",
# Different textual files for common places where PyPI tokens are accidentally left.
".md",
".rst",
".env",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: it might be good to match anything with *env* in the filename (including suffixes), since .env.prod, .prod.env, env.py, etc. are all conceivable.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - see the previous comment. As a bit of context, the current list is based on where I saw tokens recently.

".sh",
".txt",
".pypirc",
".toml",
".cfg",
".conf",
"METADATA",
"PKG-INFO",
}

# Max size of individual file to scan inside archive (5 MiB)
_SCAN_MAX_FILE_SIZE = 5 * 1024 * 1024
Expand Down Expand Up @@ -78,8 +94,10 @@ def iter_zip_members(zfp: zipfile.ZipFile) -> typing.Iterator[tuple[str, int, by
for entry in zfp.infolist():
if entry.is_dir():
continue
ext = Path(entry.filename).suffix.lower()
if ext not in _SCAN_EXTENSIONS:
path = Path(entry.filename)
ext = path.suffix.lower()
# Names like "METADATA", ".env" have empty suffix
if ext not in _SCAN_TARGETS and path.name not in _SCAN_TARGETS:
continue
data = zfp.read(entry.filename)
yield entry.filename, len(data), data
Expand All @@ -90,8 +108,10 @@ def iter_tar_members(tar: tarfile.TarFile) -> typing.Iterator[tuple[str, int, by
for member in tar.getmembers():
if not member.isfile():
continue
ext = Path(member.name).suffix.lower()
if ext not in _SCAN_EXTENSIONS:
path = Path(member.name)
ext = path.suffix.lower()
# Names like "PKG-INFO", ".env" have empty suffix
if ext not in _SCAN_TARGETS and path.name not in _SCAN_TARGETS:
continue
f = tar.extractfile(member)
if f is None: # pragma: no cover
Expand Down
24 changes: 24 additions & 0 deletions warehouse/utils/scanner_rules/pypi_token.yar
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
rule secrets_pypi_token
{
    meta:
        description = "Detects PyPI API tokens exposed in source code."
        author = "Kamil Mankowski"
        message = "We have detected a PyPI API token exposed in the uploaded file. Publishing it would allow anyone to perform actions on your behalf. For your own security, please revoke the token immediately. See https://pypi.org/help/#compromised-token for additional help."

    strings:
        // Regex adapted from trufflehog's PyPI token detector:
        // https://github.com/trufflesecurity/trufflehog/blob/main/pkg/detectors/pypi/pypi.go
        //
        // The pattern is intentionally not derived from the official token
        // format definition, to avoid unnecessary matches. Instead it requires
        // a fixed, pre-computed prefix so that only tokens issued for pypi.org
        // match. The prefix must be followed by 150-157 characters from
        // [a-zA-Z0-9-_], so shorter placeholder strings that merely start with
        // the prefix are not reported.
        $pypi_token = /pypi-AgEIcHlwaS5vcmcCJ[a-zA-Z0-9-_]{150,157}/

        // TODO: check whether there are test tokens in use that should be excluded.
        // $test_token = "pypi-AgEIcHlwaS5vcmcCJxxx"

    condition:
        $pypi_token
        // To allow specific test-only tokens, extend the condition with:
        // and for all i in (1 .. #pypi_token) : (
        //     not $test_token at @pypi_token[i]
        // )
}