diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 7d91954a4..bb01aabfe 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -25,12 +25,6 @@ jobs: fail-fast: false matrix: include: - - tox_env: py27-m_ans-ans2.10 - - tox_env: py27-m_ans-ans4 - - - tox_env: py36-m_ans-ans2.10 - - tox_env: py36-m_ans-ans4 - - tox_env: py27-m_mtg - tox_env: py36-m_mtg @@ -86,34 +80,6 @@ jobs: fail-fast: false matrix: include: - - tox_env: py311-m_ans-ans2.10 - python_version: '3.11' - - tox_env: py311-m_ans-ans3 - python_version: '3.11' - - tox_env: py311-m_ans-ans4 - python_version: '3.11' - - tox_env: py311-m_ans-ans5 - python_version: '3.11' - - tox_env: py313-m_ans-ans6 - python_version: '3.13' - - tox_env: py313-m_ans-ans7 - python_version: '3.13' - - tox_env: py313-m_ans-ans8 - python_version: '3.13' - - tox_env: py314-m_ans-ans9 - python_version: '3.14' - - tox_env: py314-m_ans-ans10 - python_version: '3.14' - - tox_env: py314-m_ans-ans11 - python_version: '3.14' - - tox_env: py314-m_ans-ans12 - python_version: '3.14' - - tox_env: py314-m_ans-ans13 - python_version: '3.14' - - - tox_env: py314-m_ans-ans13-s_lin - python_version: '3.14' - - tox_env: py314-m_mtg python_version: '3.14' @@ -161,11 +127,6 @@ jobs: fail-fast: false matrix: include: - - tox_env: py314-m_lcl-ans13 - python_version: '3.14' - - tox_env: py314-m_lcl-ans13-s_lin - python_version: '3.14' - - tox_env: py314-m_mtg python_version: '3.14' diff --git a/mitogen/core.py b/mitogen/core.py index 666905caf..ef29e74ff 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -141,7 +141,6 @@ def threading__thread_name(thread): return thread.getName() if sys.version_info >= (2, 5): all, any = all, any BaseException = BaseException - def _update_linecache(path, data): pass else: import linecache BaseException = Exception @@ -152,6 +151,7 @@ def _update_linecache(path, data): """ if 'mitogen' not in path: return + data = zlib.decompress(data, -15) linecache.cache[path] = (len(data), 0.0, data.splitlines(True), path) def all(it): @@ -1348,13 +1348,14 @@ def __init__(self, router, context, core_src, whitelist=(), blacklist=()): # Presence of an entry in this map indicates in-flight GET_MODULE. self._callbacks = {} self._cache = {} - if core_src: + if sys.version_info < (2, 5) and core_src: _update_linecache('x/mitogen/core.py', core_src) + if core_src: self._cache['mitogen.core'] = ( 'mitogen.core', None, 'x/mitogen/core.py', - zlib.compress(core_src, 9), + core_src, [], ) self._install_handler(router) @@ -1562,10 +1563,7 @@ def _on_load_module(self, msg): try: self._cache[fullname] = tup if sys.version_info < (2, 5) and tup[2] is not None: - _update_linecache( - path='master:' + tup[2], - data=zlib.decompress(tup[3]) - ) + _update_linecache('master:' + tup[2], tup[3]) callbacks = self._callbacks.pop(fullname, []) finally: self._lock.release() @@ -1721,7 +1719,7 @@ def get_source(self, fullname): if compressed is None: raise ModuleNotFoundError(self.absent_msg % (fullname,)) - source = zlib.decompress(self._cache[fullname][3]) + source = zlib.decompress(self._cache[fullname][3], -15) if sys.version_info >= (3, 0): return to_text(source) return source @@ -4135,16 +4133,19 @@ def _setup_importer(self): importer._install_handler(self.router) importer._context = self.parent else: - core_src_fd = self.config.get('core_src_fd', 101) - if core_src_fd: - fp = os.fdopen(core_src_fd, 'rb', 0) + preamble_fd = self.config.get('preamble_fd', 101) + if preamble_fd: + fp = os.fdopen(preamble_fd, 'rb', 0) try: - core_src = fp.read() - # Strip "ExternalContext.main()" call from last line. - core_src = b('\n').join(core_src.splitlines()[:-1]) + preamble = fp.read() finally: fp.close() + # Remove compressed "ExternalContext.main(...)" suffix. + stage2_prefix = preamble[:self.config['core_src_size']] + # Close deflate stream, append empty block with BFINAL bit set + core_src = stage2_prefix + b('\x03\x00') else: + stage2_prefix = None core_src = None importer = Importer( @@ -4154,6 +4155,7 @@ def _setup_importer(self): self.config.get('whitelist', ()), self.config.get('blacklist', ()), ) + self.router._stage2_prefix = stage2_prefix self.importer = importer self.router.importer = importer diff --git a/mitogen/fork.py b/mitogen/fork.py index d77ed6f21..275601d47 100644 --- a/mitogen/fork.py +++ b/mitogen/fork.py @@ -199,7 +199,7 @@ def _wrap_child_main(self, childfp): def get_econtext_config(self): config = super(Connection, self).get_econtext_config() - config['core_src_fd'] = None + config['preamble_fd'] = None config['importer'] = self.options.importer config['send_ec2'] = False config['setup_package'] = False diff --git a/mitogen/master.py b/mitogen/master.py index 03c897711..efe6004f7 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -234,19 +234,6 @@ def _py_filename(path): return None, False -def _get_core_source(): - """ - Master version of parent.get_core_source(). - """ - source = inspect.getsource(mitogen.core) - return mitogen.minify.minimize_source(source) - - -if mitogen.is_master: - # TODO: find a less surprising way of installing this. - mitogen.parent._get_core_source = _get_core_source - - class ThreadWatcher(object): """ Manage threads that wait for another thread to shut down, before invoking @@ -1040,7 +1027,7 @@ def __init__(self, router): self._log = logging.getLogger('mitogen.responder') self._router = router self._finder = ModuleFinder() - self._cache = {} # fullname -> pickled + self._cache = {} self.blacklist = [] self.whitelist = [''] @@ -1152,7 +1139,9 @@ def _build_tuple(self, fullname): if fullname == '__main__': source = self.neutralize_main(path, source) - compressed = mitogen.core.Blob(zlib.compress(source, 9)) + compressor = zlib.compressobj(9, 8, -15) + compressed = compressor.compress(source) + compressor.flush() + compressed = mitogen.core.Blob(compressed) related = [ to_text(name) for name in self._finder.find_related(fullname) @@ -1411,6 +1400,10 @@ def __init__(self, broker=None, max_message_size=None): self.upgrade() def upgrade(self): + compressor = zlib.compressobj(9, zlib.DEFLATED, -15) + core_src = inspect.getsource(mitogen.core) + core_src = mitogen.minify.minimize_source(core_src) + self._stage2_prefix = compressor.compress(core_src.encode('utf-8')) + compressor.flush(zlib.Z_FULL_FLUSH) self.id_allocator = IdAllocator(self) self.responder = ModuleResponder(self) self.resource_responder = ResourceResponder(self) diff --git a/mitogen/parent.py b/mitogen/parent.py index 721e3c0eb..ecf6ae403 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -58,6 +58,7 @@ select = __import__('select') import mitogen.core +from mitogen.core import BytesIO from mitogen.core import b from mitogen.core import bytes_partition from mitogen.core import IOLOG @@ -146,9 +147,6 @@ def _ioctl_cast(n): if name.startswith('SIG') and not name.startswith('SIG_') ) -_core_source_lock = threading.Lock() -_core_source_partial = None - def get_log_level(): return (LOG.getEffectiveLevel() or logging.INFO) @@ -170,35 +168,6 @@ def get_sys_executable(): return '/usr/bin/python' -def _get_core_source(): - """ - In non-masters, simply fetch the cached mitogen.core source code via the - import mechanism. In masters, this function is replaced with a version that - performs minification directly. - """ - return inspect.getsource(mitogen.core) - - -def get_core_source_partial(): - """ - _get_core_source() is expensive, even with @lru_cache in minify.py, threads - can enter it simultaneously causing severe slowdowns. - """ - global _core_source_partial - - if _core_source_partial is None: - _core_source_lock.acquire() - try: - if _core_source_partial is None: - _core_source_partial = PartialZlib( - _get_core_source().encode('utf-8') - ) - finally: - _core_source_lock.release() - - return _core_source_partial - - def get_default_remote_name(): """ Return the default name appearing in argv[0] of remote machines. @@ -680,43 +649,6 @@ def expire(self): timer.func() -class PartialZlib(object): - """ - Because the mitogen.core source has a line appended to it during bootstrap, - it must be recompressed for each connection. This is not a problem for a - small number of connections, but it amounts to 30 seconds CPU time by the - time 500 targets are in use. - - For that reason, build a compressor containing mitogen.core and flush as - much of it as possible into an initial buffer. Then to append the custom - line, clone the compressor and compress just that line. - - A full compression costs ~6ms on a modern machine, this method costs ~35 - usec. - """ - def __init__(self, s): - self.s = s - if sys.version_info > (2, 5): - self._compressor = zlib.compressobj(9) - self._out = self._compressor.compress(s) - self._out += self._compressor.flush(zlib.Z_SYNC_FLUSH) - else: - self._compressor = None - - def append(self, s): - """ - Append the bytestring `s` to the compressor state and return the - final compressed output. - """ - if self._compressor is None: - return zlib.compress(self.s + s, 9) - else: - compressor = self._compressor.copy() - out = self._out - out += compressor.compress(s) - return out + compressor.flush() - - def _upgrade_broker(broker): """ Extract the poller state from Broker and replace it with the industrial @@ -1446,10 +1378,9 @@ def _first_stage(): V='V' # Stop looping if no more data is needed or EOF is detected (empty bytes). while n-len(C) and V:select.select([0],[],[]);V=os.read(0,n-len(C));C+=V - # Raises `zlib.error` if compressed preamble is truncated or invalid - C=zlib.decompress(C) f=os.fdopen(W,'wb',0) - f.write(C) + # Raises `zlib.error` if compressed preamble is truncated or invalid + f.write(zlib.decompress(C,-15)) f.close() f=os.fdopen(w,'wb',0) f.write(C) @@ -1510,15 +1441,17 @@ def get_econtext_config(self): 'blacklist': self._router.get_module_blacklist(), 'max_message_size': self.options.max_message_size, 'version': mitogen.__version__, + 'core_src_size': len(self._router._stage2_prefix), } def get_preamble(self): - suffix = ( - '\nExternalContext(%r).main()\n' % - (self.get_econtext_config(),) - ) - partial = get_core_source_partial() - return partial.append(suffix.encode('utf-8')) + suffix = u'\nExternalContext(%r).main()\n' % self.get_econtext_config() + compressor = zlib.compressobj(9, zlib.DEFLATED, -15) + f = BytesIO() + f.write(self._router._stage2_prefix) + f.write(compressor.compress(suffix.encode('utf-8'))) + f.write(compressor.flush()) + return f.getvalue() def _get_name(self): """ diff --git a/tests/importer_test.py b/tests/importer_test.py index 33aafe931..81e143afe 100644 --- a/tests/importer_test.py +++ b/tests/importer_test.py @@ -10,13 +10,17 @@ import mock import mitogen.core -import mitogen.utils from mitogen.core import b import testlib import simple_pkg.imports_replaces_self +def compress(s): + compressor = zlib.compressobj(9, 8, -15) + return compressor.compress(s) + compressor.flush() + + class ImporterMixin(testlib.RouterMixin): modname = None @@ -90,7 +94,7 @@ def test_create_module_missing(self): @unittest.skipIf(sys.version_info >= (3, 4), 'Superceded in Python 3.4+') class LoadModuleTest(ImporterMixin, testlib.TestCase): - data = zlib.compress(b("data = 1\n\n")) + data = compress(b("data = 1\n\n")) path = 'fake_module.py' modname = 'fake_module' @@ -121,7 +125,7 @@ def test_module_package_unset(self): @unittest.skipIf(sys.version_info < (3, 4), 'Requires ModuleSpec, Python 3.4+') class ModuleSpecTest(ImporterMixin, testlib.TestCase): - data = zlib.compress(b("data = 1\n\n")) + data = compress(b("data = 1\n\n")) path = 'fake_module.py' modname = 'fake_module' @@ -140,7 +144,7 @@ def test_module_attributes(self): @unittest.skipIf(sys.version_info >= (3, 4), 'Superceded in Python 3.4+') class LoadSubmoduleTest(ImporterMixin, testlib.TestCase): - data = zlib.compress(b("data = 1\n\n")) + data = compress(b("data = 1\n\n")) path = 'fake_module.py' modname = 'mypkg.fake_module' # 0:fullname 1:pkg_present 2:path 3:compressed 4:related @@ -154,7 +158,7 @@ def test_module_package_unset(self): @unittest.skipIf(sys.version_info < (3, 4), 'Requires ModuleSpec, Python 3.4+') class SubmoduleSpecTest(ImporterMixin, testlib.TestCase): - data = zlib.compress(b("data = 1\n\n")) + data = compress(b("data = 1\n\n")) path = 'fake_module.py' modname = 'mypkg.fake_module' # 0:fullname 1:pkg_present 2:path 3:compressed 4:related @@ -172,7 +176,7 @@ def test_module_attributes(self): @unittest.skipIf(sys.version_info >= (3, 4), 'Superceded in Python 3.4+') class LoadModulePackageTest(ImporterMixin, testlib.TestCase): - data = zlib.compress(b("func = lambda: 1\n\n")) + data = compress(b("func = lambda: 1\n\n"), ) path = 'fake_pkg/__init__.py' modname = 'fake_pkg' # 0:fullname 1:pkg_present 2:path 3:compressed 4:related @@ -194,7 +198,8 @@ def test_get_source(self): mod = self.importer.load_module(self.modname) source = mod.__loader__.get_source(self.modname) self.assertEqual(source, - mitogen.core.to_text(zlib.decompress(self.data))) + mitogen.core.to_text(zlib.decompress(self.data, -15)), + ) def test_module_loader_set(self): self.set_get_module_response(self.response) @@ -220,7 +225,7 @@ def test_module_data(self): @unittest.skipIf(sys.version_info < (3, 4), 'Requires ModuleSpec, Python 3.4+') class PackageSpecTest(ImporterMixin, testlib.TestCase): - data = zlib.compress(b("func = lambda: 1\n\n")) + data = compress(b("func = lambda: 1\n\n")) path = 'fake_pkg/__init__.py' modname = 'fake_pkg' # 0:fullname 1:pkg_present 2:path 3:compressed 4:related @@ -250,7 +255,8 @@ def test_get_source(self): _ = self.importer.create_module(spec) source = self.importer.get_source(self.modname) self.assertEqual(source, - mitogen.core.to_text(zlib.decompress(self.data))) + mitogen.core.to_text(zlib.decompress(self.data, -15)), + ) class EmailParseAddrSysTest(testlib.RouterMixin, testlib.TestCase): diff --git a/tests/imports_test.py b/tests/imports_test.py index d150356b4..a31600333 100644 --- a/tests/imports_test.py +++ b/tests/imports_test.py @@ -75,8 +75,6 @@ def test_scoped_class(self): co = testmod_compile('scanning/scoped_class.py') self.assertEqual(list(self.func(co)), []) - pass - def test_scoped_function(self): co = testmod_compile('scanning/scoped_function.py') self.assertEqual(list(self.func(co)), [])