Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 109 additions & 35 deletions tests/unit/forklift/test_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,78 @@
import pretend
import pytest

from pyramid.httpexceptions import HTTPBadRequest, HTTPForbidden
from pyramid.testing import DummyRequest
from webob.multidict import MultiDict

from warehouse.admin.flags import AdminFlagValue
from warehouse.forklift import decorators


class TestUploadMetrics:
@pytest.mark.parametrize("filetype", [None, "bdist_foo"])
def test_successful_view(self, filetype):
m = pretend.stub(increment=pretend.call_recorder(lambda n, tags=None: None))
request = pretend.stub(metrics=m, POST={"filetype": filetype})
response = pretend.stub()

@decorators.upload_metrics
def wrapped(context, request):
return response

tags = {f"filetype:{filetype}"} if filetype else set()

assert wrapped(pretend.stub(), request) is response
assert request.metrics.increment.calls == [
pretend.call("warehouse.upload.attempt", tags=tags),
pretend.call("warehouse.upload.ok", tags=tags),
]

@pytest.mark.parametrize("filetype", [None, "bdist_foo"])
def test_forklift_error(self, filetype):
m = pretend.stub(increment=pretend.call_recorder(lambda n, tags=None: None))
request = pretend.stub(metrics=m, POST={"filetype": filetype})

@decorators.upload_metrics
def wrapped(context, request):
raise decorators.InvalidTupleFieldError(field="foo")

with pytest.raises(decorators.InvalidTupleFieldError):
wrapped(pretend.stub(), request)

tags = {f"filetype:{filetype}"} if filetype else set()

assert request.metrics.increment.calls == [
pretend.call("warehouse.upload.attempt", tags=tags),
pretend.call(
"warehouse.upload.failed", tags={"reason:field-is-tuple", "field:foo"}
),
]

@pytest.mark.parametrize("filetype", [None, "bdist_foo"])
def test_unknown_error(self, filetype):
m = pretend.stub(increment=pretend.call_recorder(lambda n, tags=None: None))
request = pretend.stub(metrics=m, POST={"filetype": filetype})

class SomeError(Exception):
pass

@decorators.upload_metrics
def wrapped(context, request):
raise SomeError("blah")

with pytest.raises(SomeError):
wrapped(pretend.stub(), request)

tags = {f"filetype:{filetype}"} if filetype else set()

assert request.metrics.increment.calls == [
pretend.call("warehouse.upload.attempt", tags=tags),
pretend.call(
"warehouse.upload.failed", tags={"reason:unknown-error"} | tags
),
]


class TestSanitizeRequest:
def test_removes_unknowns(self):
req = DummyRequest(
Expand All @@ -21,20 +85,34 @@ def test_removes_unknowns(self):
"foo": "UNKNOWN",
"bar": " UNKNOWN ",
"real": "value",
"content": cgi.FieldStorage(
headers={"content-type": "application/octet-stream"},
),
}
)
)
resp = pretend.stub()

@decorators.sanitize
def wrapped(context, request):
# Remove the content field as it doesn't compare correctly.
del request.POST["content"]
assert MultiDict({"real": "value"}) == request.POST
return resp

assert wrapped(pretend.stub(), req) is resp

def test_escapes_nul_characters(self):
req = DummyRequest(post=MultiDict({"summary": "I want to go to the \x00"}))
req = DummyRequest(
post=MultiDict(
{
"summary": "I want to go to the \x00",
"content": cgi.FieldStorage(
headers={"content-type": "application/octet-stream"},
),
}
)
)
resp = pretend.stub()

@decorators.sanitize
Expand All @@ -46,18 +124,36 @@ def wrapped(context, request):

def test_fails_with_fieldstorage(self):
req = DummyRequest(post=MultiDict({"keywords": cgi.FieldStorage()}))
req.metrics = pretend.stub(increment=lambda key, tags: None)

@decorators.sanitize
def wrapped(context, request):
pytest.fail("wrapped view should not have been called")

with pytest.raises(HTTPBadRequest) as excinfo:
with pytest.raises(decorators.InvalidTupleFieldError) as excinfo:
wrapped(pretend.stub(), req)

resp = excinfo.value
assert resp.status_code == 400
assert resp.status == "400 keywords: Should not be a tuple."
assert excinfo.value.values == {"field": "keywords"}

def test_fails_without_file(self):
req = DummyRequest(post=MultiDict({}))

@decorators.sanitize
def wrapped(context, request):
pytest.fail("wrapped view should not have been called")

with pytest.raises(decorators.NoFileUploadError):
wrapped(pretend.stub(), req)

@pytest.mark.parametrize("content_type", [None, "image/foobar"])
def test_fails_invalid_content_type(self, content_type):
req = DummyRequest(post=MultiDict({"content": pretend.stub(type=content_type)}))

@decorators.sanitize
def wrapped(context, request):
pytest.fail("wrapped view should not have been called")

with pytest.raises(decorators.InvalidContentTypeError):
wrapped(pretend.stub(), req)


class TestEnsureUploadsAllowed:
Expand Down Expand Up @@ -90,58 +186,36 @@ def wrapped(context, request):
assert wrapped(pretend.stub(), req) is resp

@pytest.mark.parametrize(
("flag", "error", "help_url"),
("flag", "error_type"),
[
(
AdminFlagValue.READ_ONLY,
"Read-only mode: Uploads are temporarily disabled.",
"",
decorators.ReadOnlyError,
),
(
AdminFlagValue.DISALLOW_NEW_UPLOAD,
(
"New uploads are temporarily disabled. "
"See /help/url/ for more information."
),
"/help/url/",
decorators.UploadsDisabledError,
),
],
)
def test_disallowed_with_admin_flags(self, flag, error, help_url):
def test_disallowed_with_admin_flags(self, flag, error_type):
req = DummyRequest()
req.flags = pretend.stub(enabled=lambda f: f is flag)
req.help_url = lambda *a, **k: help_url
req.metrics = pretend.stub(increment=lambda key, tags: None)

@decorators.ensure_uploads_allowed
def wrapped(context, request):
pytest.fail("wrapped view should not have been called")

with pytest.raises(HTTPForbidden) as excinfo:
with pytest.raises(error_type):
wrapped(pretend.stub(), req)

resp = excinfo.value

assert resp.status_code == 403
assert resp.status == f"403 {error}"

def test_fails_without_identity(self):
req = DummyRequest()
req.flags = pretend.stub(enabled=lambda f: False)
req.help_url = lambda *a, **k: "/path/to/help/"
req.metrics = pretend.stub(increment=lambda key, tags: None)

@decorators.ensure_uploads_allowed
def wrapped(context, request):
pytest.fail("wrapped view should not have been called")

with pytest.raises(HTTPForbidden) as excinfo:
with pytest.raises(decorators.MissingIdentityError):
wrapped(pretend.stub(), req)

resp = excinfo.value

assert resp.status_code == 403
assert resp.status == (
"403 Invalid or non-existent authentication information. "
"See /path/to/help/ for more information."
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@

from pyramid.httpexceptions import HTTPBadRequest

from warehouse.forklift import utils
from warehouse.forklift import errors


class TestExcWithMessage:
def test_exc_with_message(self):
exc = utils._exc_with_message(HTTPBadRequest, "My Test Message.")
exc = errors._exc_with_message(HTTPBadRequest, "My Test Message.")
assert isinstance(exc, HTTPBadRequest)
assert exc.status_code == 400
assert exc.status == "400 My Test Message."

def test_exc_with_exotic_message(self):
exc = utils._exc_with_message(
exc = errors._exc_with_message(
HTTPBadRequest, "look at these wild chars: аä‗"
)
assert isinstance(exc, HTTPBadRequest)
Expand All @@ -26,8 +26,8 @@ def test_exc_with_missing_message(self, monkeypatch):
sentry_sdk = pretend.stub(
capture_message=pretend.call_recorder(lambda message: None)
)
monkeypatch.setattr(utils, "sentry_sdk", sentry_sdk)
exc = utils._exc_with_message(HTTPBadRequest, "")
monkeypatch.setattr(errors, "sentry_sdk", sentry_sdk)
exc = errors._exc_with_message(HTTPBadRequest, "")
assert isinstance(exc, HTTPBadRequest)
assert exc.status_code == 400
assert exc.status == "400 Bad Request"
Expand Down
Loading
Loading