|
| 1 | +import hashlib |
| 2 | +import hmac |
| 3 | +import json |
| 4 | +import time |
| 5 | +from typing import get_args, get_type_hints |
| 6 | + |
| 7 | +import pytest |
| 8 | + |
| 9 | +from usesend import types |
| 10 | +from usesend.webhooks import ( |
| 11 | + WEBHOOK_SIGNATURE_HEADER, |
| 12 | + WEBHOOK_TIMESTAMP_HEADER, |
| 13 | + WebhookVerificationError, |
| 14 | + Webhooks, |
| 15 | +) |
| 16 | + |
| 17 | + |
| 18 | +def _sign(secret: str, timestamp: str, body: str) -> str: |
| 19 | + digest = hmac.new( |
| 20 | + secret.encode("utf-8"), |
| 21 | + f"{timestamp}.{body}".encode("utf-8"), |
| 22 | + hashlib.sha256, |
| 23 | + ).hexdigest() |
| 24 | + return f"v1={digest}" |
| 25 | + |
| 26 | + |
| 27 | +def test_verify_returns_false_for_non_utf8_bytes_body() -> None: |
| 28 | + webhooks = Webhooks("whsec_test") |
| 29 | + timestamp = str(int(time.time() * 1000)) |
| 30 | + |
| 31 | + is_valid = webhooks.verify( |
| 32 | + b"\xff", |
| 33 | + headers={ |
| 34 | + WEBHOOK_SIGNATURE_HEADER: "v1=deadbeef", |
| 35 | + WEBHOOK_TIMESTAMP_HEADER: timestamp, |
| 36 | + }, |
| 37 | + ) |
| 38 | + |
| 39 | + assert is_valid is False |
| 40 | + |
| 41 | + |
| 42 | +def test_construct_event_raises_invalid_body_for_non_utf8_bytes() -> None: |
| 43 | + webhooks = Webhooks("whsec_test") |
| 44 | + timestamp = str(int(time.time() * 1000)) |
| 45 | + |
| 46 | + with pytest.raises(WebhookVerificationError) as exc: |
| 47 | + webhooks.construct_event( |
| 48 | + b"\xff", |
| 49 | + headers={ |
| 50 | + WEBHOOK_SIGNATURE_HEADER: "v1=deadbeef", |
| 51 | + WEBHOOK_TIMESTAMP_HEADER: timestamp, |
| 52 | + }, |
| 53 | + ) |
| 54 | + |
| 55 | + assert exc.value.code == "INVALID_BODY" |
| 56 | + |
| 57 | + |
| 58 | +def test_email_webhook_event_type_excludes_specialized_events() -> None: |
| 59 | + email_event_type = get_type_hints(types.EmailWebhookEvent)["type"] |
| 60 | + supported = set(get_args(email_event_type)) |
| 61 | + |
| 62 | + assert "email.delivered" in supported |
| 63 | + assert "email.bounced" not in supported |
| 64 | + assert "email.failed" not in supported |
| 65 | + assert "email.suppressed" not in supported |
| 66 | + assert "email.opened" not in supported |
| 67 | + assert "email.clicked" not in supported |
| 68 | + |
| 69 | + |
| 70 | +def test_construct_event_parses_bounced_event_with_valid_signature() -> None: |
| 71 | + secret = "whsec_test" |
| 72 | + webhooks = Webhooks(secret) |
| 73 | + timestamp = str(int(time.time() * 1000)) |
| 74 | + |
| 75 | + payload = { |
| 76 | + "id": "evt_123", |
| 77 | + "type": "email.bounced", |
| 78 | + "createdAt": "2026-02-08T10:00:00.000Z", |
| 79 | + "data": { |
| 80 | + "id": "email_123", |
| 81 | + "status": "BOUNCED", |
| 82 | + "from": "from@example.com", |
| 83 | + "to": ["to@example.com"], |
| 84 | + "occurredAt": "2026-02-08T10:00:00.000Z", |
| 85 | + "bounce": { |
| 86 | + "type": "Permanent", |
| 87 | + "subType": "General", |
| 88 | + }, |
| 89 | + }, |
| 90 | + } |
| 91 | + body = json.dumps(payload) |
| 92 | + signature = _sign(secret, timestamp, body) |
| 93 | + |
| 94 | + event = webhooks.construct_event( |
| 95 | + body, |
| 96 | + headers={ |
| 97 | + WEBHOOK_SIGNATURE_HEADER: signature, |
| 98 | + WEBHOOK_TIMESTAMP_HEADER: timestamp, |
| 99 | + }, |
| 100 | + ) |
| 101 | + |
| 102 | + assert event["type"] == "email.bounced" |
| 103 | + assert event["data"]["bounce"]["type"] == "Permanent" |
0 commit comments