diff --git a/.config/codespell_ignore.txt b/.config/codespell_ignore.txt index cfdb00d50f8..1b79935161f 100644 --- a/.config/codespell_ignore.txt +++ b/.config/codespell_ignore.txt @@ -23,7 +23,7 @@ iff implementors inout interaktive -joinin +JoinIn merchantibility microsof mitre diff --git a/.gitignore b/.gitignore index fc08904957f..f41b47f7782 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ coverage.xml __pycache__/ doc/scapy/_build doc/scapy/api +.idea diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 00000000000..29bd8445480 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,78 @@ +# Change pip's cache directory to be inside the project directory since we can +# only cache local items. +variables: + PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip" + +# Pip's cache doesn't store the python packages +# https://pip.pypa.io/en/stable/reference/pip_install/#caching +# +# If you want to also cache the installed packages, you have to install +# them in a virtualenv and cache it as well. 
+cache: + paths: + - .cache/pip + - venv/ + - venv_pypy/ + +.default: + image: python:3.11 + tags: + - docker-ipv6 + before_script: + - pip install tox + +health: + extends: .default + script: + - tox -e flake8 + - tox -e spell + - tox -e twine + +mypy: + extends: .default + script: + - tox -e mypy + +py311: + image: dissecto/scapy-tests:latest + tags: + - docker-ipv6 + script: + - ./.config/ci/test.sh 3.11 non_root + +pypy3: + image: dissecto/scapy-tests-pypy:latest + tags: + - docker-ipv6 + script: + - ./.config/ci/test.sh pypy3 non_root + +.publish: + image: python:latest + needs: + - pypy3 + - py311 + - mypy + - health + tags: + - docker + script: + - pip install build twine + - python -m build + - TWINE_PASSWORD=${CI_JOB_TOKEN} TWINE_USERNAME=gitlab-ci-token python -m twine upload --verbose --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/* + +publish: + allow_failure: true + rules: + - if: $CI_COMMIT_BRANCH == "master" + when: always + - when: never + extends: .publish + +publish_tags: + extends: .publish + only: + - tags + except: + - branches + diff --git a/README.md b/README.md index 0da1b8ff279..55c62380109 100644 --- a/README.md +++ b/README.md @@ -100,3 +100,4 @@ The documentation (everything unless marked otherwise in `doc/`, and except the Want to contribute? Great! Please take a few minutes to [read this](CONTRIBUTING.md)! 
+ diff --git a/pyproject.toml b/pyproject.toml index dcbfcc6750d..f79696c89a3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,10 +1,11 @@ [build-system] -requires = [ "setuptools>=62.0.0" ] +requires = ["setuptools>=64.0", "setuptools-scm>=8.0"] build-backend = "setuptools.build_meta" [project] name = "scapy" -dynamic = [ "version", "readme" ] +dynamic = [ "version"] +readme = "README.md" authors = [ { name="Philippe BIONDI" }, ] @@ -59,6 +60,7 @@ all = [ "matplotlib", ] doc = [ + "cryptography>=2.0", "sphinx>=7.0.0", "sphinx_rtd_theme>=1.3.0", "tox>=3.0.0", @@ -78,8 +80,7 @@ exclude = [ "doc*", ] -[tool.setuptools.dynamic] -version = { attr="scapy.VERSION" } +[tool.setuptools_scm] # coverage diff --git a/scapy/contrib/automotive/bmw/definitions.py b/scapy/contrib/automotive/bmw/definitions.py index 3746fc9a296..fe75c27cf5c 100644 --- a/scapy/contrib/automotive/bmw/definitions.py +++ b/scapy/contrib/automotive/bmw/definitions.py @@ -9,7 +9,7 @@ from scapy.packet import Packet, bind_layers from scapy.fields import ByteField, ShortField, ByteEnumField, X3BytesField, \ - StrField, StrFixedLenField, LEIntField, LEThreeBytesField, \ + StrField, StrFixedLenField, LEThreeBytesField, \ PacketListField, IntField, IPField, ThreeBytesField, ShortEnumField, \ XStrFixedLenField from scapy.contrib.automotive.uds import UDS, UDS_RDBI, UDS_DSC, UDS_IOCBI, \ @@ -321,14 +321,16 @@ class SVK(Packet): 3: "software entry incompatible to hardware entry", 4: "software entry incompatible with other software entry"} + @staticmethod + def get_length(p: Packet): + return len(p.original) - (8 * p.entries_count + 7) + fields_desc = [ ByteEnumField("prog_status1", 0, prog_status_enum), ByteEnumField("prog_status2", 0, prog_status_enum), ShortField("entries_count", 0), SVK_DateField("prog_date", 0), - ByteField("pad1", 0), - LEIntField("prog_milage", 0), - StrFixedLenField("pad2", b'\x00\x00\x00\x00\x00', length=5), + StrFixedLenField("pad", b'\x00', length_from=get_length), 
PacketListField("entries", [], SVK_Entry, count_from=lambda x: x.entries_count)] diff --git a/scapy/contrib/automotive/gm/gmlan.py b/scapy/contrib/automotive/gm/gmlan.py index ce88513c99d..76c9cf110f1 100644 --- a/scapy/contrib/automotive/gm/gmlan.py +++ b/scapy/contrib/automotive/gm/gmlan.py @@ -41,11 +41,11 @@ if conf.contribs['GMLAN']['treat-response-pending-as-answer']: pass except KeyError: - log_automotive.info("Specify \"conf.contribs['GMLAN'] = " - "{'treat-response-pending-as-answer': True}\" to treat " - "a negative response 'RequestCorrectlyReceived-" - "ResponsePending' as answer of a request. \n" - "The default value is False.") + # log_automotive.info("Specify \"conf.contribs['GMLAN'] = " + # "{'treat-response-pending-as-answer': True}\" to treat " + # "a negative response 'RequestCorrectlyReceived-" + # "ResponsePending' as answer of a request. \n" + # "The default value is False.") conf.contribs['GMLAN'] = {'treat-response-pending-as-answer': False} conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = None diff --git a/scapy/contrib/automotive/obd/obd.py b/scapy/contrib/automotive/obd/obd.py index 2256ed711d2..a165935d2c3 100644 --- a/scapy/contrib/automotive/obd/obd.py +++ b/scapy/contrib/automotive/obd/obd.py @@ -9,7 +9,6 @@ import struct -from scapy.contrib.automotive import log_automotive from scapy.contrib.automotive.obd.iid.iids import * from scapy.contrib.automotive.obd.mid.mids import * from scapy.contrib.automotive.obd.pid.pids import * @@ -24,11 +23,11 @@ if conf.contribs['OBD']['treat-response-pending-as-answer']: pass except KeyError: - log_automotive.info("Specify \"conf.contribs['OBD'] = " - "{'treat-response-pending-as-answer': True}\" to treat " - "a negative response 'requestCorrectlyReceived-" - "ResponsePending' as answer of a request. 
\n" - "The default value is False.") + # log_automotive.info("Specify \"conf.contribs['OBD'] = " + # "{'treat-response-pending-as-answer': True}\" to treat " + # "a negative response 'requestCorrectlyReceived-" + # "ResponsePending' as answer of a request. \n" + # "The default value is False.") conf.contribs['OBD'] = {'treat-response-pending-as-answer': False} diff --git a/scapy/contrib/automotive/scanner/enumerator.py b/scapy/contrib/automotive/scanner/enumerator.py index 98210f86a37..c778b56f53b 100644 --- a/scapy/contrib/automotive/scanner/enumerator.py +++ b/scapy/contrib/automotive/scanner/enumerator.py @@ -191,6 +191,12 @@ def _get_initial_requests(self, **kwargs): def __reduce__(self): # type: ignore f, t, d = super(ServiceEnumerator, self).__reduce__() # type: ignore + + try: + del d["_tester_present_sender"] + except KeyError: + pass + try: for k, v in d["_request_iterators"].items(): d["_request_iterators"][k] = list(v) @@ -287,6 +293,10 @@ def pre_execute(self, socket, state, global_configuration): except KeyError: self._tester_present_sender = None + def post_execute(self, socket, state, global_configuration): + # type: (_SocketUnion, EcuState, AutomotiveTestCaseExecutorConfiguration) -> None # noqa: E501 + self._tester_present_sender = None + def execute(self, socket, state, **kwargs): # type: (_SocketUnion, EcuState, Any) -> None self.check_kwargs(kwargs) diff --git a/scapy/contrib/automotive/scanner/executor.py b/scapy/contrib/automotive/scanner/executor.py index 6d58bc72d26..ec138bba409 100644 --- a/scapy/contrib/automotive/scanner/executor.py +++ b/scapy/contrib/automotive/scanner/executor.py @@ -156,20 +156,20 @@ def reset_target(self): def reconnect(self): # type: () -> None - if self.reconnect_handler: - try: - if self.socket: - self.socket.close() - except Exception as e: - log_automotive.exception( - "Exception '%s' during socket.close", e) - - log_automotive.info("Target reconnect") - socket = self.reconnect_handler() - if not 
isinstance(socket, SingleConversationSocket): - self.socket = SingleConversationSocket(socket) - else: - self.socket = socket + if not self.reconnect_handler: + return + + try: + if self.socket: + self.socket.close() + except Exception as e: + log_automotive.exception( + "Exception '%s' during socket.close", e) + + log_automotive.info("Target reconnect") + socket = self.reconnect_handler() + self.socket = socket if isinstance(socket, SingleConversationSocket) \ + else SingleConversationSocket(socket) if self.socket and self.socket.closed: raise Scapy_Exception( @@ -394,6 +394,20 @@ def enter_state(self, prev_state, next_state): trans_func, trans_kwargs, clean_func = funcs state_changed = trans_func( self.socket, self.configuration, trans_kwargs) + + if self.socket.closed: + for i in range(5): + try: + self.reconnect() + break + except Exception: + if i == 4: + raise + if self.configuration.stop_event: + self.configuration.stop_event.wait(1) + else: + time.sleep(1) + if state_changed: self.target_state = next_state @@ -410,15 +424,11 @@ def cleanup_state(self): Executes all collected cleanup functions from a traversed path :return: None """ - if not self.socket: - log_automotive.warning("Socket is None! 
Leaving cleanup_state") - return - for f in self.cleanup_functions: if not callable(f): continue try: - if not f(self.socket, self.configuration): + if not f(self.socket, self.configuration): # type: ignore log_automotive.info( "Cleanup function %s failed", repr(f)) except (OSError, ValueError, Scapy_Exception) as e: diff --git a/scapy/contrib/automotive/uds.py b/scapy/contrib/automotive/uds.py index 67a7bb59e52..9d038b2807b 100644 --- a/scapy/contrib/automotive/uds.py +++ b/scapy/contrib/automotive/uds.py @@ -21,7 +21,6 @@ PacketField from scapy.packet import Packet, bind_layers, NoPayload, Raw from scapy.config import conf -from scapy.error import log_loading from scapy.utils import PeriodicSenderThread from scapy.contrib.isotp import ISOTP @@ -35,11 +34,11 @@ if conf.contribs['UDS']['treat-response-pending-as-answer']: pass except KeyError: - log_loading.info("Specify \"conf.contribs['UDS'] = " - "{'treat-response-pending-as-answer': True}\" to treat " - "a negative response 'requestCorrectlyReceived-" - "ResponsePending' as answer of a request. \n" - "The default value is False.") + # log_loading.info("Specify \"conf.contribs['UDS'] = " + # "{'treat-response-pending-as-answer': True}\" to treat " + # "a negative response 'requestCorrectlyReceived-" + # "ResponsePending' as answer of a request. 
\n" + # "The default value is False.") conf.contribs['UDS'] = {'treat-response-pending-as-answer': False} diff --git a/scapy/contrib/automotive/uds_scan.py b/scapy/contrib/automotive/uds_scan.py index a23c7c6027d..2d077b28f7b 100644 --- a/scapy/contrib/automotive/uds_scan.py +++ b/scapy/contrib/automotive/uds_scan.py @@ -93,7 +93,8 @@ class UDS_DSCEnumerator(UDS_Enumerator, StateGeneratingServiceEnumerator): _supported_kwargs = copy.copy(ServiceEnumerator._supported_kwargs) _supported_kwargs.update({ 'delay_state_change': (int, lambda x: x >= 0), - 'overwrite_timeout': (bool, None) + 'overwrite_timeout': (bool, None), + 'close_socket_when_entering_session_2': (bool, None) }) _supported_kwargs["scan_range"] = ( (list, tuple, range), lambda x: max(x) < 0x100 and min(x) >= 0) @@ -112,7 +113,12 @@ class UDS_DSCEnumerator(UDS_Enumerator, StateGeneratingServiceEnumerator): unit-test scenarios, this value should be set to False, in order to use the timeout specified by the 'timeout' - argument.""" + argument. + :param bool close_socket_when_entering_session_2: False by default. + This enumerator will close the socket + if session 2 (ProgrammingSession) + was entered, if True. This will + force a reconnect by the executor.""" def _get_initial_requests(self, **kwargs): # type: (Any) -> Iterable[Packet] @@ -165,10 +171,21 @@ def get_new_edge(self, config # type: AutomotiveTestCaseExecutorConfiguration ): # type: (...) 
-> Optional[_Edge] edge = super(UDS_DSCEnumerator, self).get_new_edge(socket, config) + + try: + close_socket = config[UDS_DSCEnumerator.__name__]["close_socket_when_entering_session_2"] # noqa: E501 + except KeyError: + close_socket = False + if edge: state, new_state = edge # Force TesterPresent if session is changed new_state.tp = 1 # type: ignore + try: + if close_socket and new_state.session == 2: # type: ignore + new_state.tp = 0 # type: ignore + except (AttributeError, KeyError): + pass return state, new_state return None @@ -184,9 +201,30 @@ def enter_state_with_tp(sock, # type: _SocketUnion delay = conf[UDS_DSCEnumerator.__name__]["delay_state_change"] except KeyError: delay = 5 + + try: + close_socket = conf[UDS_DSCEnumerator.__name__]["close_socket_when_entering_session_2"] # noqa: E501 + except KeyError: + close_socket = False + conf.stop_event.wait(delay) state_changed = UDS_DSCEnumerator.enter_state( sock, conf, kwargs["req"]) + + try: + session = kwargs["req"].diagnosticSessionType + except AttributeError: + session = 0 + + if close_socket and session == 2: + if not hasattr(sock, "ip"): + log_automotive.warning("Likely closing a CAN based socket! 
" + "This might be a configuration issue.") + log_automotive.info( + "Entered Programming Session: Closing socket connection") + sock.close() + conf.stop_event.wait(delay) + if not state_changed: UDS_TPEnumerator.cleanup(sock, conf) return state_changed diff --git a/scapy/layers/sixlowpan.py b/scapy/layers/sixlowpan.py index 22bf979cf87..6de8ee38f55 100644 --- a/scapy/layers/sixlowpan.py +++ b/scapy/layers/sixlowpan.py @@ -1095,7 +1095,7 @@ class SixLoWPAN(Packet): @classmethod def dispatch_hook(cls, _pkt=b"", *args, **kargs): - """Depending on the payload content, the frame type we should interpret""" + """Depending on the payload content, the frame type we should interpret""" # noqa: E501 if _pkt and len(_pkt) >= 1: fb = ord(_pkt[:1]) if fb == 0x41: diff --git a/scapy/libs/crc.py b/scapy/libs/crc.py new file mode 100644 index 00000000000..849c4179779 --- /dev/null +++ b/scapy/libs/crc.py @@ -0,0 +1,909 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information +# Copyright (C) Philippe Biondi + +"""CRC (Cyclic Redundancy Check) Library + +This module provides a flexible and extensible framework for computing CRC +(Cyclic Redundancy Check) checksums. 
It supports various CRC algorithms with +different parameters and provides utilities for: + +- Computing CRCs with standard and custom parameters +- Searching for CRC patterns in binary data +- Testing CRC implementations against known test vectors +- Creating custom CRC variants dynamically + +Key Features: + - Pre-defined CRC algorithms (CRC-16, CRC-32, CRC-16-CCITT, CRC-32-AUTOSAR) + - Table-driven computation for performance + - Support for reflected/non-reflected input and output + - Custom header/trailer support + - CRC search and detection in binary streams + - Test vector validation + +CRC Parameters: + The CRC algorithms are defined by the following parameters: + + - poly: The generator polynomial (without the implicit highest bit) + - size: Size of the CRC in bits (e.g., 16, 32) + - init_crc: Initial value for the CRC register + - xor: Final XOR value applied to the result + - reflect_input: Whether to reflect (reverse bits of) input bytes + - reflect_output: Whether to reflect the final CRC value + - header: Optional bytes prepended before computing CRC + - trailer: Optional bytes appended before finalizing CRC + +Example Usage: + # Using pre-defined CRC algorithms + >>> from scapy.libs.crc import CRC_32 + >>> crc = CRC_32() + >>> checksum = crc(b"123456789") + >>> print(f"{checksum:#010x}") + 0xcbf43926 + + # Using context API for incremental computation + >>> crc = CRC_32() + >>> crc.init() + >>> crc.update(b"1234") + >>> crc.update(b"56789") + >>> checksum = crc.finish() + >>> print(f"{checksum:#010x}") + 0xcbf43926 + + # Creating custom CRC variant + >>> CustomCRC = CRC.from_parameters( + ... name="Custom-CRC16", + ... poly=0x1021, + ... size=16, + ... init_crc=0xFFFF, + ... xor=0, + ... reflect_input=False, + ... reflect_output=False + ... 
) + >>> crc = CustomCRC() + >>> checksum = crc(b"Hello") + + # Searching for CRCs in binary data + >>> results = CRC.search(binary_data, min_substring_len=4) + >>> for (start, end), crc_value, crc_class in results: + ... print(f"Found {crc_class.name} at bytes {start}-{end}: {crc_value:#x}") + +Well-Known Polynomials: + The module includes well-known CRC polynomials from Wikipedia: + - 16-bit: CRC-16-CCITT (0x1021), CRC-16-IBM (0x8005), and others + - 32-bit: CRC-32 (0x04c11db7), CRC-32C (0x1edc6f41), and others + +References: + - https://en.wikipedia.org/wiki/Cyclic_redundancy_check + - https://reveng.sourceforge.io/crc-catalogue/ +""" + +from functools import lru_cache +from collections import defaultdict +import itertools +from typing import Set, List, Tuple, Any, Dict, Optional + + +# Taken from https://en.wikipedia.org/wiki/Cyclic_redundancy_check +# Only direct representation. Reversed, reciprocal, +# reversed reciprocal polynoms can be deduced. +WELL_KNOWN_POLY = { + 16: [0x1021, 0x8005, 0xa02b, 0x2f15, 0xc867, 0x0589, 0x8bb7, 0x3d65, + 0x5935, 0x755b, 0x1dcf], + 32: [0x04c11db7, 0x1edc6f41, 0x741b8cd7, 0x32583499, 0x814141ab, 0xf4acfb13] +} + + +class CRCParam: + """Container for CRC algorithm parameters. + + This class encapsulates all parameters needed to define a CRC algorithm, + including the polynomial, size, initialization value, XOR output, + reflection settings, and optional header/trailer bytes. 
+ + Attributes: + poly (int): Generator polynomial (without implicit highest bit) + size (int): CRC size in bits (e.g., 16, 32) + init_crc (int): Initial value for the CRC register + xor (int): Final XOR value applied to the result + reflect_input (bool): Whether to reflect input bytes + reflect_output (bool): Whether to reflect final CRC value + header (bytes): Optional bytes prepended before CRC computation + trailer (bytes): Optional bytes appended before finalization + name (str): Descriptive name for this CRC variant + test_vectors (list): List of (input, expected_output) tuples for validation + + Example: + >>> param = CRCParam( + ... name="CRC-16", + ... poly=0x8005, + ... size=16, + ... init_crc=0, + ... xor=0, + ... reflect_input=True, + ... reflect_output=True + ... ) + >>> print(param) + + """ + + MISC = ["name", "test_vectors"] + PARAMETERS = ["poly", "size", "init_crc", "xor", + "reflect_input", "reflect_output"] + OPTIONS = ["header", "trailer"] + FMT = {"size": "", "reflect_input": "", "reflect_output": ""} + + def __init__(self, **args): + # type: (Any) -> None + """Initialize CRC parameters. + + Args: + **args: Keyword arguments for CRC parameters. Required parameters + are: poly, size, init_crc, xor, reflect_input, reflect_output. + Optional: header, trailer, name, test_vectors. + + Raises: + Exception: If any mandatory parameter is missing. 
+ """ + self.remain = set(args) - set(self.PARAMETERS + self.OPTIONS + self.MISC) + + self.param = dict(header=b"", trailer=b"", test_vectors=[], + reflect_input=False, reflect_output=False) + try: + self.param.update({n: args[n] for n in self.PARAMETERS}) + except KeyError as e: + raise Exception(f"CRC parameter {e} is mandatory") + + self.param.update({n: args[n] for n in self.OPTIONS + self.MISC if n in args}) + self.__dict__.update(self.param) + if "name" not in self.param or self.param["name"] is None: + self.name = self.param["name"] = f"CRCsig_{self.signature()}" + + def copy(self): + # type: () -> CRCParam + """Create a deep copy of this CRCParam object. + + Returns: + CRCParam: A new CRCParam instance with the same parameters. + """ + return self.__class__(**self.param) + + def param_repr(self): + # type: () -> str + """Generate a string representation of the parameters. + + Returns: + str: Formatted string showing key CRC parameters. + """ + s = [f"{k}={getattr(self, k): {self.FMT.get(k, '#x')}}" + for k in self.PARAMETERS] + s += [f"+{k}" for k in self.OPTIONS if getattr(self, k)] + return ", ".join(s) + + def __repr__(self): + # type: () -> str + name = self.name if hasattr(self, "name") else "CRC param" + s = self.param_repr() + return f"" + + def __eq__(self, other): + # type: (object) -> bool + """Check equality based on all parameters and options. + + Args: + other: Object to compare with. + + Returns: + bool: True if all parameters match, False otherwise. + """ + return all(getattr(self, k) == getattr(other, k) + for k in self.PARAMETERS + self.OPTIONS) + + def __hash__(self): + # type: () -> int + """Compute hash based on parameters and options. + + Returns: + int: Hash value for this parameter set. + """ + return hash(tuple(getattr(self, k) for k in self.PARAMETERS + self.OPTIONS)) + + def __iter__(self): + """Iterate over all parameter names and their values. + + Yields: + tuple: (parameter_name, parameter_value) pairs. 
+ """ + for k in self.PARAMETERS + self.MISC + self.OPTIONS: + yield (k, getattr(self, k)) + + def signature(self): + # type: () -> str + """Generate a unique signature string for this parameter set. + + The signature encodes all CRC parameters in a compact hexadecimal + string format that uniquely identifies this CRC variant. + + Returns: + str: Hexadecimal signature string. + """ + sig_end = ((self.reflect_input << 3) | (self.reflect_output << 2) + | (bool(self.header) << 1) | bool(self.trailer)) + # Use consistent zero-padding for all hex fields in signature + return f"{self.poly:0{self.size // 4}x}_{self.init_crc:0{self.size // 4}x}_{self.xor:0{self.size // 4}x}_{sig_end:x}" # noqa: E231,E501 + + +class _CRC_metaclass(type): + """Metaclass for CRC implementations. + + This metaclass automatically: + - Creates CRCParam from class attributes + - Pre-computes CRC lookup tables for performance + - Registers CRC classes in a global registry + - Provides factory methods for creating CRC variants + + The metaclass enables declarative CRC class definitions where you simply + specify the parameters as class attributes, and the metaclass handles + initialization and optimization automatically. + """ + + REGISTRY = set() # type: Set[CRC] + + def __new__(cls, name, bases, dct): + """Create a new CRC class with automatic initialization. + + Args: + name (str): Name of the class being created. + bases (tuple): Base classes. + dct (dict): Class dictionary with attributes. + + Returns: + type: The new CRC class with initialized tables and parameters. 
+ """ + newcls = super(_CRC_metaclass, cls).__new__(cls, name, bases, dct) + if not hasattr(newcls, "name"): + newcls.name = newcls.__name__ + if bases: # exclude parent class because it is virtual + newcls.param = CRCParam(**dct) + newcls.precal_table = ( + cls._precalc_table_reflect + if newcls.reflect_input + else cls._precalc_table + ) + newcls.table = newcls.precal_table(newcls.poly, newcls.size) + if not getattr(newcls, "do_not_register", False): + newcls.REGISTRY.add(newcls) + newcls.mask = (1 << newcls.size) - 1 + else: + newcls.param = None + return newcls + + @staticmethod + @lru_cache(maxsize=128) + def _precalc_table_reflect(crcpoly, sz): + # type: (int, int) -> List[int] + """Pre-compute CRC lookup table for reflected (LSB-first) algorithms. + + For reflected CRCs, input bytes are processed LSB-first and the + polynomial operates on the lower bits of the CRC register. + + Args: + crcpoly (int): CRC polynomial. + sz (int): CRC size in bits. + + Returns: + List[int]: 256-entry lookup table for fast CRC computation. + """ + revpoly = CRC._reverse_bits(crcpoly, sz) + t = [] + for i in range(256): + crc = i + for j in range(8): + b0 = crc & 1 + crc >>= 1 + if b0: + crc ^= revpoly + t.append(crc) + return t + + @staticmethod + @lru_cache(maxsize=128) + def _precalc_table(crcpoly, sz): + # type: (int, int) -> List[int] + """Pre-compute CRC lookup table for non-reflected (MSB-first) algorithms. + + For non-reflected CRCs, input bytes are processed MSB-first and the + polynomial operates on the upper bits of the CRC register. + + Args: + crcpoly (int): CRC polynomial. + sz (int): CRC size in bits. + + Returns: + List[int]: 256-entry lookup table for fast CRC computation. 
+ """ + t = [] + hbmsk = (1 << (sz - 1)) + msk = (1 << sz) - 1 + for i in range(256): + crc = i << (sz - 8) + for j in range(8): + bsz = crc & hbmsk + crc <<= 1 + if bsz: + crc ^= crcpoly + t.append(crc & msk) + return t + + @staticmethod + def _reverse_bits(x, sz): + # type: (int, int) -> int + """Reverse the bit order of an integer. + + Args: + x (int): Integer value to reverse. + sz (int): Number of significant bits to consider. + + Returns: + int: Integer with reversed bit order. + + Example: + >>> _reverse_bits(0b10110, 5) # Returns 0b01101 + 13 + """ + y = 0 + for i in range(sz): + y <<= 1 + y |= x & 1 + x >>= 1 + return y + + def from_parameters(self, crc_param=None, name=None, + do_not_register=False, **kargs): + # type: (Optional[CRCParam], Optional[str], bool, Any) -> type + """Create a new CRC class from parameters. + + This factory method creates a new CRC class dynamically with the + specified parameters, inheriting from the current CRC class. + + Args: + crc_param (CRCParam, optional): Pre-built CRC parameters object. + name (str, optional): Name for the new CRC class. + do_not_register (bool): If True, don't add to global registry. + **kargs: CRC parameters (poly, size, init_crc, xor, etc.). + + Returns: + type: A new CRC class with the specified parameters. + + Example: + >>> CustomCRC = CRC.from_parameters( + ... name="Custom16", + ... poly=0x1021, + ... size=16, + ... init_crc=0xFFFF, + ... xor=0, + ... reflect_input=False, + ... reflect_output=False + ... ) + >>> crc = CustomCRC() + >>> checksum = crc(b"test") + """ + if crc_param is None: + crc_param = CRCParam(name=name, **kargs) + p = dict(crc_param) + if name is not None: + p["name"] = name + p["do_not_register"] = do_not_register + cls = type(self).__new__(type(self), p["name"], (self,), p) + return cls + + def create_context(self): + # type: () -> CRC + """Create a new CRC computation context. 
+ + This creates an instance of the CRC class without initializing it, + allowing manual control over the init/update/finish cycle. + + Returns: + CRC: A new CRC instance ready for init(). + """ + i = self.__new__(self) + i.__init__() + return i + + def _init(self): + # type: () -> int + """Initialize CRC computation with header bytes. + + Returns: + int: Initial CRC register value after processing header. + """ + return self._update(self.param.init_crc, self.param.header) + + def _update(self, crc, msg): + # type: (int, bytes) -> int + """Update CRC with message bytes. + + This is the core CRC computation using the pre-computed lookup table. + The algorithm varies based on whether input reflection is enabled. + + Args: + crc (int): Current CRC register value. + msg (bytes): Message bytes to process. + + Returns: + int: Updated CRC register value. + """ + if self.param.reflect_input: + # Reflected: process from LSB, shift right + for c in msg: + idx = (crc & 0xff) ^ c + crc >>= 8 + crc ^= self.table[idx] + else: + # Non-reflected: process from MSB, shift left + for c in msg: + idx = (crc >> (self.param.size - 8)) ^ c + crc <<= 8 + crc &= self.mask + crc ^= self.table[idx] + return crc + + def _finish(self, crc): + # type: (int) -> int + """Finalize CRC computation with trailer and XOR. + + Args: + crc (int): Current CRC register value. + + Returns: + int: Final CRC value after trailer, XOR, and output reflection. + """ + crc = self._update(crc, self.param.trailer) + crc = (crc ^ self.param.xor) & self.mask + if self.param.reflect_input ^ self.param.reflect_output: + crc = self._reverse_bits(crc, self.param.size) + return crc + + def __call__(self, msg): + # type: (bytes) -> int + """Compute CRC of a message in one call. + + Args: + msg (bytes): Message to compute CRC for. + + Returns: + int: Computed CRC value. + + Raises: + AssertionError: If msg is not bytes type. 
+ """ + assert type(msg) is bytes, "type of input is bytes" + crc = self._init() + crc = self._update(crc, msg) + return self._finish(crc) + + def test(self): + # type: () -> bool + """Test CRC implementation against test vectors. + + Runs all test vectors defined in the CRC parameters and prints + results, showing whether each test passed or failed. + + Returns: + bool: True if all tests passed, False otherwise. + """ + ok = True + for (tvin, tvout) in self.param.test_vectors: + out = self(tvin) + ok &= (out == tvout) + # Format with 0x prefix and proper zero-padding (size//4 + 2 for '0x') + width = self.size // 4 + 2 + print(f"{self.name}\t({tvin.hex()})\t = {out:#0{width}x}\t{'ok' if out == tvout else f'FAILED. Expected {tvout:#0{width}x}'}".expandtabs(32)) # noqa: E501,E231 + return ok + + def __eq__(self, other): + # type: (object) -> bool + return hasattr(other, "param") and (self.param == other.param) + + def __hash__(self): + # type: () -> int + return hash(self.param) # if hasattr(self, "param") else 0) + + def __repr__(self): + # type: () -> str + repr = self.param.param_repr() if self.param else "-" + return f"<{self.name} {repr}>" + + def autotest(self): + # type: () -> bool + """Run tests on all registered CRC classes. + + Tests every CRC class in the global registry against their test + vectors and prints a summary. + + Returns: + bool: True if all tests passed, False otherwise. + """ + ok = 0 + n = len(self.REGISTRY) + ok = sum(c.test() for c in self.REGISTRY) + print(f"TOTAL: {ok}/{n} CRC test passed") + return ok == n + + def lookup(self, crc): + # type: (Any) -> Optional[type] + """Look up a CRC class in the registry by parameters. + + Args: + crc: Either a CRC instance or CRCParam to search for. + + Returns: + type or None: The matching CRC class if found, None otherwise. 
+ """ + param = crc.param if isinstance(crc, self.__class__) else crc + for c in self.REGISTRY: + if c.param == param: + return c + return None + + def find_substring_from_crc(self, s, *target_crc): + # type: (bytes, int) -> List[Tuple[Tuple[int,int],int]] + """Find substrings in a byte sequence that produce specific CRC values. + + Searches for all substrings of the input that produce any of the + target CRC values when processed with this CRC algorithm. + + Args: + s (bytes): Byte sequence to search within. + *target_crc: One or more target CRC values to search for. + + Returns: + List[Tuple[Tuple[int,int],int]]: List of ((start, end), crc_value) + tuples for each matching substring. + + Example: + >>> crc = CRC_32() + >>> results = crc.find_substring_from_crc(data, 0x12345678) + >>> for (start, end), crc_val in results: + ... print(f"Substring at {start}:{end} has CRC {crc_val:#x}") + """ + data_len = len(s) + i = 0 + res = [] + while i < data_len: + j = i + c = self.create_context() + c.init() + while j < data_len: + c.update(s[j:j + 1]) + crc = c.finish() + if crc in target_crc: + res.append(((i, j), crc)) + j += 1 + i += 1 + return res + + def find_crc_from_string(self, s, *target_crc): + # type: (bytes, int) -> List[Tuple[int, type]] + """Find which registered CRC algorithms produce specific values for input. + + Tests all registered CRC algorithms on the input string and returns + those that produce any of the target CRC values. + + Args: + s (bytes): Input bytes to compute CRC for. + *target_crc: One or more target CRC values to match. + + Returns: + List[Tuple[int, type]]: List of (crc_value, CRC_class) tuples + for each matching algorithm. + + Example: + >>> results = CRC.find_crc_from_string(b"test", 0xabcd1234) + >>> for crc_val, crc_class in results: + ... 
print(f"{crc_class.name} produced {crc_val:#x}") + """ + res = [] + for crc in self.REGISTRY: + c = crc(s) + if c in target_crc: + res.append((c, crc)) + return res + + def search(self, s, min_substring_len=4, only_registry=False): + # type: (bytes, int, bool) -> List[Tuple[Tuple[int,int],int,type]] + """Search for CRC values embedded in a byte sequence. + + This powerful search function looks for CRC checksums within binary + data by: + 1. Extracting all possible CRC-sized values from the data + 2. Computing CRCs of all substrings with various algorithms + 3. Matching computed CRCs against extracted values + + This can help identify where CRCs are used in unknown protocols or + file formats. + + Args: + s (bytes): Binary data to search within. + min_substring_len (int): Minimum substring length to consider. + Default is 4 bytes. + only_registry (bool): If True, only test registered CRC algorithms. + If False (default), also test combinations of well-known + polynomials with different parameter variations. + + Returns: + List[Tuple[Tuple[int,int],int,type]]: List of + ((start, end), crc_value, CRC_class) tuples for each potential + CRC found in the data. + + Example: + >>> # Search for potential CRCs in a binary file + >>> with open("data.bin", "rb") as f: + ... data = f.read() + >>> results = CRC.search(data, min_substring_len=8) + >>> for (start, end), crc_val, crc_class in results: + ... print(f"Potential {crc_class.name} at offset {start}: " + ... 
f"data[{start}:{end}] -> CRC {crc_val:#x}") + """ + if only_registry: + crc_list = self.REGISTRY + else: + # Generate comprehensive set of CRC variants from well-known polynomials + crc_list = set() + for sz, poly_lst in WELL_KNOWN_POLY.items(): + msk = (1 << sz) - 1 + # Include both direct and bit-reversed polynomials + poly_lst_and_rev = ( + poly_lst + + [self._reverse_bits(p, sz) for p in poly_lst] + ) + # Test all combinations of parameters + crc_list |= { + self.from_parameters( + do_not_register=True, + poly=poly, size=sz, init_crc=init & msk, xor=xor & msk, + reflect_input=r_in, reflect_output=r_out) + for poly, init, xor, r_in, r_out + in itertools.product(poly_lst_and_rev, [0, -1], [0, -1], + [False, True], [False, True]) + } + + data_len = len(s) + sizes = set(c.size // 8 for c in crc_list) + + # Extract all potential CRC values from the data (both endiannesses) + targets = defaultdict(set) # type: Dict[int, Set[int]] + for sz in sizes: + i = 0 + while i <= data_len - sz: + ss = s[i:i + sz] + targets[sz].add(int.from_bytes(ss, "little")) + targets[sz].add(int.from_bytes(ss, "big")) + i += 1 + + # Group CRCs by size for efficient processing + crcs = defaultdict(list) # type: Dict[int, List[type]] + for c in crc_list: + crcs[c.size].append(c) + + res = [] + + # Create CRC contexts for each algorithm + ctx = {k // 8: [c.create_context() for c in v] for k, v in crcs.items()} + + # Search for matching CRCs + i = 0 + while i < data_len: + # Initialize all contexts + for clst in ctx.values(): + for c in clst: + c.init() + j = i + while j < data_len: + # Update all contexts with next byte + for sz in sizes: + for c in ctx[sz]: + c.update(s[j:j + 1]) + if j - i + 1 >= min_substring_len: + crc = c.finish() + # Check if this CRC matches any extracted value + if crc in targets[sz]: + res.append(((i, j + 1), crc, c.__class__)) + j += 1 + i += 1 + return res + + +class CRC(metaclass=_CRC_metaclass): + """Base class for CRC implementations. 
+ + This is the base class for all CRC algorithms. Subclasses define specific + CRC variants by setting class attributes (poly, size, init_crc, etc.). + + The class provides two interfaces: + 1. Single-shot: crc_value = CRC_class()(message) + 2. Incremental (context API): init(), update(), finish() + + Example: + # Define a custom CRC-16 variant + >>> class MyCRC16(CRC): + ... name = "My-CRC16" + ... size = 16 + ... poly = 0x1021 + ... init_crc = 0xFFFF + ... xor = 0 + ... reflect_input = False + ... reflect_output = False + ... test_vectors = [(b"123456789", 0x29b1)] + + # Use it + >>> crc = MyCRC16() + >>> checksum = crc(b"test data") + """ + + def __init__(self): + """Initialize a CRC computation context. + + This prepares the CRC for incremental computation using the + init/update/finish pattern. + """ + self.init() + + # Context API: init()/update()/finish() + # finish() does not change state, so update()/finish() can be called again + + def init(self): + # type: () -> None + """Initialize CRC computation state. + + Resets the CRC register to the initial value (including header + processing). Call this before the first update() or to restart + computation. + """ + self.crc = self.__class__._init() + + def update(self, msg): + # type: (bytes) -> None + """Update CRC with additional message bytes. + + Incrementally processes message bytes, updating the internal CRC + state. Can be called multiple times to process a message in chunks. + + Args: + msg (bytes): Message bytes to process. + + Example: + >>> crc = CRC_32() + >>> crc.init() + >>> crc.update(b"Hello ") + >>> crc.update(b"World") + >>> checksum = crc.finish() + """ + self.crc = self.__class__._update(self.crc, msg) + + def finish(self): + # type: () -> int + """Finalize and return the CRC value. + + Completes CRC computation by processing trailer, applying final XOR, + and optionally reflecting the output. Does not modify the internal + state, so update() can be called again followed by another finish(). 
+ + Returns: + int: The computed CRC value. + """ + return self.__class__._finish(self.crc) + + def __repr__(self): + # type: () -> str + return f"<{self.name} CTX>" + + +# Pre-defined CRC algorithms with test vectors + +class CRC_16(CRC): + """CRC-16 (also known as CRC-16-ANSI or CRC-16-IBM). + + This is one of the most common 16-bit CRC algorithms, used in many + protocols including Modbus, USB, and XMODEM. + + Parameters: + - Polynomial: 0x8005 (x^16 + x^15 + x^2 + 1) + - Initial value: 0x0000 + - Final XOR: 0x0000 + - Reflect input: Yes + - Reflect output: Yes + + Test vector: CRC("123456789") = 0xbb3d + """ + name = "CRC-16" + size = 16 + poly = 0x8005 + init_crc = 0 + xor = 0 + reflect_input = True + reflect_output = True + test_vectors = [(b"123456789", 0xbb3d)] + + +class CRC_32(CRC): + """CRC-32 (also known as CRC-32-IEEE 802.3). + + This is the standard 32-bit CRC used in Ethernet, ZIP, PNG, and many + other applications. It provides strong error detection for typical + data transmission scenarios. + + Parameters: + - Polynomial: 0x04c11db7 (x^32 + x^26 + x^23 + ... + x^2 + x + 1) + - Initial value: 0xffffffff + - Final XOR: 0xffffffff + - Reflect input: Yes + - Reflect output: Yes + + Test vector: CRC("123456789") = 0xcbf43926 + """ + name = "CRC-32" + size = 32 + poly = 0x4c11db7 + init_crc = 0xffffffff + xor = 0xffffffff + reflect_input = True + reflect_output = True + test_vectors = [(b"123456789", 0xcbf43926)] + + +class CRC_16_CCITT(CRC): + """CRC-16-CCITT (also known as KERMIT CRC or CRC-16-CCITT-TRUE). + + Used in XMODEM, Bluetooth, and many telecommunications protocols. + Note: There are several variants called "CRC-16-CCITT" with different + parameters; this is the "true" CCITT version with init=0. 
+ + Parameters: + - Polynomial: 0x1021 (x^16 + x^12 + x^5 + 1) + - Initial value: 0x0000 + - Final XOR: 0x0000 + - Reflect input: Yes + - Reflect output: Yes + + Test vector: CRC(0xcb37) = 0x6b3e + """ + name = "CRC16 CCITT" + size = 16 + poly = 0x1021 + init_crc = 0 + xor = 0 + reflect_input = True + reflect_output = True + test_vectors = [(b"\xcb\x37", 0x6b3e)] + + +class CRC_32_AUTOSAR(CRC): + """CRC-32 used in AUTOSAR automotive standard. + + This CRC is specified in the AUTOSAR (Automotive Open System Architecture) + standard for automotive electronic control units (ECUs). It uses a + different polynomial than the standard CRC-32. + + Parameters: + - Polynomial: 0xf4acfb13 + - Initial value: 0xffffffff + - Final XOR: 0xffffffff + - Reflect input: Yes + - Reflect output: Yes + + Test vectors: + - CRC(0x00000000) = 0x6fb32240 + - CRC(0x332255aabbccddeeff) = 0xa65a343d + """ + name = "CRC32 AUTOSAR" + size = 32 + poly = 0xf4acfb13 + init_crc = 0xffffffff + xor = 0xffffffff + reflect_input = True + reflect_output = True + test_vectors = [(b"\0\0\0\0", 0x6fb32240), + (b"\x33\x22\x55\xAA\xBB\xCC\xDD\xEE\xFF", 0xa65a343d)] diff --git a/scapy/sendrecv.py b/scapy/sendrecv.py index 437241c4d9b..8732e34dc7c 100644 --- a/scapy/sendrecv.py +++ b/scapy/sendrecv.py @@ -74,28 +74,75 @@ class debug: # Send / Receive # #################### -_DOC_SNDRCV_PARAMS = """ +_DOC_SNDRCV_PARAMS_HEAD = """ :param pks: SuperSocket instance to send/receive packets - :param pkt: the packet to send - :param timeout: how much time to wait after the last packet has been sent - :param inter: delay between two packets during sending - :param verbose: set verbosity level - :param chainCC: if True, KeyboardInterrupts will be forwarded - :param retry: if positive, how many times to resend unanswered packets - if negative, how many times to retry when no more packets - are answered - :param multi: whether to accept multiple answers for the same stimulus - :param rcv_pks: if set, will be used instead 
of pks to receive packets. - packets will still be sent through pks - :param prebuild: pre-build the packets before starting to send them. - Automatically enabled when a generator is passed as the packet - :param _flood: - :param threaded: if True, packets are sent in a thread and received in another. - defaults to False. - :param session: a flow decoder used to handle stream of packets - :param chainEX: if True, exceptions during send will be forwarded + :type pks: SuperSocket + """ + +_DOC_SNDRCV_PARAMS_BODY = """ + :param pkt: Packet or iterable of packets to be sent. + :type pkt: _PacketIterable + :param timeout: How much time to wait after the last packet + has been sent. Defaults to None. + :type timeout: Optional[int] + :param inter: Delay between two packets during sending. Defaults to 0. + :type inter: Optional[int] + + :param verbose: Set verbosity level. Defaults to None. + :type verbose: Optional[int] + + :param chainCC: If True, KeyboardInterrupts will be forwarded. + Defaults to False. + :type chainCC: Optional[bool] + + :param retry: If positive, how many times to resend unanswered packets. + If negative, how many times to retry when no more packets + are answered. Defaults to 0. + :type retry: Optional[int] + + :param multi: Whether to accept multiple answers for the same stimulus. + Defaults to False. + :type multi: Optional[bool] + + :param rcv_pks: If set, will be used instead of pks to receive packets. + Packets will still be sent through pks. + Defaults to None. + :type rcv_pks: Optional[SuperSocket] + + :param prebuild: Pre-build the packets before starting to send them. + Automatically enabled when a generator is passed as the + packet. Defaults to False. + :type prebuild: Optional[bool] + + :param _flood: _FloodGenerator object, internally used by `flood()` + methods. Defaults to None. + :type _flood: Optional[_FloodGenerator] + + :param threaded: If True, packets will be sent in an individual thread. + Defaults to False. 
+ :type threaded: Optional[bool] + + :param session: A flow decoder used to handle the stream of packets. + Defaults to None. + :type session: Optional[_GlobSessionType] + + :param chainEX: If True, exceptions during send will be forwarded. + Defaults to False. + :type chainEX: Optional[bool] :param stop_filter: Python function applied to each packet to determine if - we have to stop the capture after this packet. + we have to stop the capture after this packet. + :type stop_filter: Optional[Callable[[Packet], bool]] +""" + +_DOC_SNDRCV_PARAMS_TAIL = """ + :return: A tuple, consisting of two packet lists, one with + answered packets, the other with unanswered packets + :rtype: Tuple[SndRcvList, PacketList] + """ + +_DOC_SNDRCV1_PARAMS_TAIL = """ + :return: A received Packet answering the sent packet, or None + :rtype: Optional[Packet] """ @@ -747,9 +794,26 @@ def srp1(*args, **kargs): # Append doc -for sr_func in [srp, srp1, sr, sr1]: +for sr_func in [srp, sr]: + if sr_func.__doc__ is not None: + sr_func.__doc__ += (_DOC_SNDRCV_PARAMS_HEAD + + _DOC_SNDRCV_PARAMS_BODY + + _DOC_SNDRCV_PARAMS_TAIL) + +for sr_func in [srp1, sr1]: if sr_func.__doc__ is not None: - sr_func.__doc__ += _DOC_SNDRCV_PARAMS + sr_func.__doc__ += (_DOC_SNDRCV_PARAMS_HEAD + + _DOC_SNDRCV_PARAMS_BODY + + _DOC_SNDRCV1_PARAMS_TAIL) + +# Append doc in SuperSocket +for sr_func in [SuperSocket.sr]: + if sr_func.__doc__ is not None: + sr_func.__doc__ += _DOC_SNDRCV_PARAMS_BODY + _DOC_SNDRCV_PARAMS_TAIL + +for sr_func in [SuperSocket.sr1]: + if sr_func.__doc__ is not None: + sr_func.__doc__ += _DOC_SNDRCV_PARAMS_BODY + _DOC_SNDRCV1_PARAMS_TAIL # SEND/RECV LOOP METHODS @@ -1029,40 +1093,49 @@ def srp1flood(x, # type: _PacketIterable # SNIFF METHODS -class AsyncSniffer(object): - """ - Sniff packets and return a list of packets. - - Args: - count: number of packets to capture. 0 means infinity. - store: whether to store sniffed packets or discard them - prn: function to apply to each packet. 
If something is returned, it - is displayed. - --Ex: prn = lambda x: x.summary() - session: a session = a flow decoder used to handle stream of packets. - --Ex: session=TCPSession - See below for more details. - filter: BPF filter to apply. - lfilter: Python function applied to each packet to determine if - further action may be done. - --Ex: lfilter = lambda x: x.haslayer(Padding) - offline: PCAP file (or list of PCAP files) to read packets from, - instead of sniffing them - quiet: when set to True, the process stderr is discarded - (default: False). - timeout: stop sniffing after a given time (default: None). - L2socket: use the provided L2socket (default: use conf.L2listen). - opened_socket: provide an object (or a list of objects) ready to use - .recv() on. - stop_filter: Python function applied to each packet to determine if - we have to stop the capture after this packet. - --Ex: stop_filter = lambda x: x.haslayer(TCP) - iface: interface or list of interfaces (default: None for sniffing - on the default interface). - monitor: use monitor mode. May not be available on all OS - started_callback: called as soon as the sniffer starts sniffing - (default: None). +_DOC_SNIFF_PARAMS = """ + :param count: Number of packets to capture. 0 means infinity. + :type count: int + :param store: Whether to store sniffed packets or discard them. + :type store: bool + :param offline: PCAP file (or list of PCAP files) to read packets from, + instead of sniffing them. + :type offline: Any + :param quiet: When set to True, the process stderr is discarded. + (default: False). + :type quiet: bool + :param prn: Function to apply to each packet. If something is returned, + it is displayed. + --Ex: prn = lambda x: x.summary() + :type prn: Optional[Callable[[Packet], Any]] + :param lfilter: Python function applied to each packet to determine if + further action may be done. 
+ :type lfilter: Optional[Callable[[Packet], bool]] + :param L2socket: Use the provided L2socket (default: use conf.L2listen). + :type L2socket: Optional[Type[SuperSocket]] + :param timeout: Stop sniffing after a given time (default: None). + :type timeout: Optional[int] + :param opened_socket: Provide an object (or a list of objects) ready to + use .recv() on. + :type opened_socket: Optional[SuperSocket] + :param stop_filter: Python function applied to each packet to determine if + we have to stop the capture after this packet. + :type stop_filter: Optional[Callable[[Packet], bool]] + :param iface: Interface or list of interfaces (default: None for sniffing + on all interfaces). + :type iface: Optional[_GlobInterfaceType] + :param started_callback: Called as soon as the sniffer starts sniffing + (default: None). + :type started_callback: Optional[Callable[[], Any]] + :param session: A session, which is a flow decoder used to handle a stream + of packets. See the documentation for more details. + :type session: Optional[_GlobSessionType] + :param session_kwargs: Additional keyword arguments for session initialization. + :type session_kwargs: Dict[str, Any] +""" + +_DOC_ASYNC_SNIFF = """ The iface, offline and opened_socket parameters can be either an element, a list of elements, or a dict object mapping an element to a label (see examples below). @@ -1092,6 +1165,11 @@ class AsyncSniffer(object): >>> t.stop() """ + +class AsyncSniffer(object): + """Sniff packets and return a list of packets. 
+ """ + def __init__(self, *args, **kwargs): # type: (*Any, **Any) -> None # Store keyword arguments @@ -1312,6 +1390,7 @@ def stop_cb(): try: # Make sure it's closed s.close() + log_interactive.exception("Closed socket %s", s) except Exception as ex2: msg = " close() failed with '%s'" % ex2 warning( @@ -1371,6 +1450,10 @@ def join(self, *args, **kwargs): raise self.exception +AsyncSniffer.__doc__ = ((AsyncSniffer.__doc__ or "") + _DOC_SNIFF_PARAMS + + _DOC_ASYNC_SNIFF) + + @conf.commands.register def sniff(*args, **kwargs): # type: (*Any, **Any) -> PacketList @@ -1379,7 +1462,7 @@ def sniff(*args, **kwargs): return cast(PacketList, sniffer.results) -sniff.__doc__ = AsyncSniffer.__doc__ +SuperSocket.sniff.__doc__ = sniff.__doc__ = AsyncSniffer.__doc__ @conf.commands.register @@ -1396,18 +1479,21 @@ def bridge_and_sniff(if1, # type: _GlobInterfaceType """Forward traffic between interfaces if1 and if2, sniff and return the exchanged packets. - :param if1: the interfaces to use (interface names or opened sockets). - :param if2: - :param xfrm12: a function to call when forwarding a packet from if1 to + :param if1: The interfaces to use (interface names or opened sockets). + :type if1: _GlobInterfaceType + + :param if2: The interfaces to use (interface names or opened sockets). + :type if2: _GlobInterfaceType + + :param xfrm12: A function to call when forwarding a packet from if1 to if2. If it returns True, the packet is forwarded as it. If it returns False or None, the packet is discarded. If it returns a - packet, this packet is forwarded instead of the original packet - one. - :param xfrm21: same as xfrm12 for packets forwarded from if2 to if1. + packet, this packet is forwarded instead of the original packet. + :type xfrm12: Optional[Callable[[Packet], Union[Packet, bool]]] + + :param xfrm21: Same as xfrm12 for packets forwarded from if2 to if1. 
+ :type xfrm21: Optional[Callable[[Packet], Union[Packet, bool]]] - The other arguments are the same than for the function sniff(), - except for offline, opened_socket and iface that are ignored. - See help(sniff) for more. """ for arg in ['opened_socket', 'offline', 'iface']: if arg in kargs: @@ -1480,11 +1566,15 @@ def prn(pkt): *args, **kargs) +bridge_and_sniff.__doc__ = (bridge_and_sniff.__doc__ or "") + _DOC_SNIFF_PARAMS + + @conf.commands.register def tshark(*args, **kargs): # type: (Any, Any) -> None """Sniff packets and print them calling pkt.summary(). - This tries to replicate what text-wireshark (tshark) would look like""" + This tries to replicate what text-wireshark (tshark) would look like. + """ if 'iface' in kargs: iface = kargs.get('iface') @@ -1505,3 +1595,7 @@ def _cb(pkt): sniff(prn=_cb, store=False, *args, **kargs) print("\n%d packet%s captured" % (i[0], 's' if i[0] > 1 else '')) + + +tshark.__doc__ = (tshark.__doc__ or "") + _DOC_SNIFF_PARAMS +SuperSocket.tshark.__doc__ = tshark.__doc__ diff --git a/setup.py b/setup.py deleted file mode 100755 index 9869dc1ce9d..00000000000 --- a/setup.py +++ /dev/null @@ -1,88 +0,0 @@ -#! /usr/bin/env python - -""" -Setuptools setup file for Scapy. -""" - -import io -import os -import sys - -if sys.version_info[0] <= 2: - raise OSError("Scapy no longer supports Python 2 ! 
Please use Scapy 2.5.0") - -try: - from setuptools import setup - from setuptools.command.sdist import sdist - from setuptools.command.build_py import build_py -except: - raise ImportError("setuptools is required to install scapy !") - - -def get_long_description(): - """ - Extract description from README.md, for PyPI's usage - """ - def process_ignore_tags(buffer): - return "\n".join( - x for x in buffer.split("\n") if "" not in x - ) - try: - fpath = os.path.join(os.path.dirname(__file__), "README.md") - with io.open(fpath, encoding="utf-8") as f: - readme = f.read() - desc = readme.partition("")[2] - desc = desc.partition("")[0] - return process_ignore_tags(desc.strip()) - except IOError: - return None - - -# Note: why do we bother including a 'scapy/VERSION' file and doing our -# own versioning stuff, instead of using more standard methods? -# Because it's all garbage. - -# If you remain fully standard, there's no way -# of adding the version dynamically, even less when using archives -# (currently, we're able to add the version anytime someone exports Scapy -# on github). - -# If you use setuptools_scm, you'll be able to have the git tag set into -# the wheel (therefore the metadata), that you can then retrieve using -# importlib.metadata, BUT it breaks sdist (source packages), as those -# don't include metadata. 
- - -def _build_version(path): - """ - This adds the scapy/VERSION file when creating a sdist and a wheel - """ - fn = os.path.join(path, 'scapy', 'VERSION') - with open(fn, 'w') as f: - f.write(__import__('scapy').VERSION) - - -class SDist(sdist): - """ - Modified sdist to create scapy/VERSION file - """ - def make_release_tree(self, base_dir, *args, **kwargs): - super(SDist, self).make_release_tree(base_dir, *args, **kwargs) - # ensure there's a scapy/VERSION file - _build_version(base_dir) - - -class BuildPy(build_py): - """ - Modified build_py to create scapy/VERSION file - """ - def build_package_data(self): - super(BuildPy, self).build_package_data() - # ensure there's a scapy/VERSION file - _build_version(self.build_lib) - -setup( - cmdclass={'sdist': SDist, 'build_py': BuildPy}, - long_description=get_long_description(), - long_description_content_type='text/markdown', -) diff --git a/test/contrib/automotive/uds.uts b/test/contrib/automotive/uds.uts index 10691710a91..904222f2127 100644 --- a/test/contrib/automotive/uds.uts +++ b/test/contrib/automotive/uds.uts @@ -1046,20 +1046,6 @@ assert rdtcipr.DTCCount == 0xddaa assert rdtcipr.answers(rdtci) -rdtcipr1 = UDS(b'\x59\x02\xff\x11\x07\x11\'\x022\x12\'\x01\x07\x11\'\x01\x18\x12\'\x01\x13\x12\'\x01"\x11\'\x06C\x00\'\x06S\x00\'\x161\x00\'\x14\x03\x12\'') - -assert len(rdtcipr1.DTCAndStatusRecord) == 10 -assert rdtcipr1.DTCAndStatusRecord[0].dtc.system == 0 -assert rdtcipr1.DTCAndStatusRecord[0].dtc.type == 1 -assert rdtcipr1.DTCAndStatusRecord[0].dtc.numeric_value_code == 263 -assert rdtcipr1.DTCAndStatusRecord[0].dtc.additional_information_code == 17 -assert rdtcipr1.DTCAndStatusRecord[0].status == 0x27 -assert rdtcipr1.DTCAndStatusRecord[-1].dtc.system == 0 -assert rdtcipr1.DTCAndStatusRecord[-1].dtc.type == 1 -assert rdtcipr1.DTCAndStatusRecord[-1].dtc.numeric_value_code == 1027 -assert rdtcipr1.DTCAndStatusRecord[-1].dtc.additional_information_code == 18 -assert rdtcipr1.DTCAndStatusRecord[-1].status == 
0x27 - = Check UDS_RDTCI rdtci = UDS(b'\x19\x02\xff') @@ -1173,6 +1159,21 @@ assert rdtcipr.DTCAndStatusRecord[0].status == 2 assert not rdtcipr.answers(rdtci) += Check UDS_RDTCIPR extended data + +p = UDS(b'Y\x06\x80SV`\x01\x00\x02\x01\x03\x15') + +assert len(p.extendedDataRecord.extendedData) == 3 + +assert p.extendedDataRecord.extendedData[0].data_type == 1 +assert p.extendedDataRecord.extendedData[1].data_type == 2 +assert p.extendedDataRecord.extendedData[2].data_type == 3 + +assert p.extendedDataRecord.extendedData[0].record == 0 +assert p.extendedDataRecord.extendedData[1].record == 1 +assert p.extendedDataRecord.extendedData[2].record == 0x15 + + = Check UDS_RDTCIPR rdtcipr = UDS(b'\x59\x03\xff\xee\xdd\xaa') diff --git a/test/regression.uts b/test/regression.uts index fb648178ab1..4670052b2e4 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -1006,7 +1006,7 @@ save_object(fname, 2807) assert load_object(fname) == 2807 = Test whois function -~ netaccess +~ netaccess disabled if not WINDOWS: result = whois("193.0.6.139") @@ -1830,7 +1830,7 @@ def _test(): retry_test(_test) = Whois request -~ netaccess IP +~ netaccess IP disabled * This test retries on failure because it often fails def _test(): IP(src="8.8.8.8").whois() @@ -1838,7 +1838,7 @@ def _test(): retry_test(_test) = AS resolvers -~ netaccess IP as_resolvers +~ netaccess IP as_resolvers disabled * This test retries on failure because it often fails def _test(): @@ -1874,7 +1874,7 @@ assert len(tmp) == 3 assert [l[1] for l in tmp] == ['AS24776', 'AS36459', 'AS26496'] = AS resolver - IPv6 -~ netaccess IP +~ netaccess IP disabled * This test retries on failure because it often fails def _test():