Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 0 additions & 39 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@ jobs:
fail-fast: false
matrix:
include:
- tox_env: py27-m_ans-ans2.10
- tox_env: py27-m_ans-ans4

- tox_env: py36-m_ans-ans2.10
- tox_env: py36-m_ans-ans4

- tox_env: py27-m_mtg
- tox_env: py36-m_mtg

Expand Down Expand Up @@ -86,34 +80,6 @@ jobs:
fail-fast: false
matrix:
include:
- tox_env: py311-m_ans-ans2.10
python_version: '3.11'
- tox_env: py311-m_ans-ans3
python_version: '3.11'
- tox_env: py311-m_ans-ans4
python_version: '3.11'
- tox_env: py311-m_ans-ans5
python_version: '3.11'
- tox_env: py313-m_ans-ans6
python_version: '3.13'
- tox_env: py313-m_ans-ans7
python_version: '3.13'
- tox_env: py313-m_ans-ans8
python_version: '3.13'
- tox_env: py314-m_ans-ans9
python_version: '3.14'
- tox_env: py314-m_ans-ans10
python_version: '3.14'
- tox_env: py314-m_ans-ans11
python_version: '3.14'
- tox_env: py314-m_ans-ans12
python_version: '3.14'
- tox_env: py314-m_ans-ans13
python_version: '3.14'

- tox_env: py314-m_ans-ans13-s_lin
python_version: '3.14'

- tox_env: py314-m_mtg
python_version: '3.14'

Expand Down Expand Up @@ -161,11 +127,6 @@ jobs:
fail-fast: false
matrix:
include:
- tox_env: py314-m_lcl-ans13
python_version: '3.14'
- tox_env: py314-m_lcl-ans13-s_lin
python_version: '3.14'

- tox_env: py314-m_mtg
python_version: '3.14'

Expand Down
30 changes: 16 additions & 14 deletions mitogen/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ def threading__thread_name(thread): return thread.getName()
if sys.version_info >= (2, 5):
all, any = all, any
BaseException = BaseException
def _update_linecache(path, data): pass
else:
import linecache
BaseException = Exception
Expand All @@ -152,6 +151,7 @@ def _update_linecache(path, data):
"""
if 'mitogen' not in path:
return
data = zlib.decompress(data, -15)
linecache.cache[path] = (len(data), 0.0, data.splitlines(True), path)

def all(it):
Expand Down Expand Up @@ -1348,13 +1348,14 @@ def __init__(self, router, context, core_src, whitelist=(), blacklist=()):
# Presence of an entry in this map indicates in-flight GET_MODULE.
self._callbacks = {}
self._cache = {}
if core_src:
if sys.version_info < (2, 5) and core_src:
_update_linecache('x/mitogen/core.py', core_src)
if core_src:
self._cache['mitogen.core'] = (
'mitogen.core',
None,
'x/mitogen/core.py',
zlib.compress(core_src, 9),
core_src,
[],
)
self._install_handler(router)
Expand Down Expand Up @@ -1562,10 +1563,7 @@ def _on_load_module(self, msg):
try:
self._cache[fullname] = tup
if sys.version_info < (2, 5) and tup[2] is not None:
_update_linecache(
path='master:' + tup[2],
data=zlib.decompress(tup[3])
)
_update_linecache('master:' + tup[2], tup[3])
callbacks = self._callbacks.pop(fullname, [])
finally:
self._lock.release()
Expand Down Expand Up @@ -1721,7 +1719,7 @@ def get_source(self, fullname):
if compressed is None:
raise ModuleNotFoundError(self.absent_msg % (fullname,))

source = zlib.decompress(self._cache[fullname][3])
source = zlib.decompress(self._cache[fullname][3], -15)
if sys.version_info >= (3, 0):
return to_text(source)
return source
Expand Down Expand Up @@ -4135,16 +4133,19 @@ def _setup_importer(self):
importer._install_handler(self.router)
importer._context = self.parent
else:
core_src_fd = self.config.get('core_src_fd', 101)
if core_src_fd:
fp = os.fdopen(core_src_fd, 'rb', 0)
preamble_fd = self.config.get('preamble_fd', 101)
if preamble_fd:
fp = os.fdopen(preamble_fd, 'rb', 0)
try:
core_src = fp.read()
# Strip "ExternalContext.main()" call from last line.
core_src = b('\n').join(core_src.splitlines()[:-1])
preamble = fp.read()
finally:
fp.close()
# Remove compressed "ExternalContext.main(...)" suffix.
stage2_prefix = preamble[:self.config['core_src_size']]
# Close deflate stream, append empty block with BFINAL bit set
core_src = stage2_prefix + b('\x03\x00')
else:
stage2_prefix = None
core_src = None

importer = Importer(
Expand All @@ -4154,6 +4155,7 @@ def _setup_importer(self):
self.config.get('whitelist', ()),
self.config.get('blacklist', ()),
)
self.router._stage2_prefix = stage2_prefix

self.importer = importer
self.router.importer = importer
Expand Down
2 changes: 1 addition & 1 deletion mitogen/fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def _wrap_child_main(self, childfp):

def get_econtext_config(self):
config = super(Connection, self).get_econtext_config()
config['core_src_fd'] = None
config['preamble_fd'] = None
config['importer'] = self.options.importer
config['send_ec2'] = False
config['setup_package'] = False
Expand Down
23 changes: 8 additions & 15 deletions mitogen/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,19 +234,6 @@ def _py_filename(path):
return None, False


def _get_core_source():
"""
Master version of parent.get_core_source().
"""
source = inspect.getsource(mitogen.core)
return mitogen.minify.minimize_source(source)


if mitogen.is_master:
# TODO: find a less surprising way of installing this.
mitogen.parent._get_core_source = _get_core_source


class ThreadWatcher(object):
"""
Manage threads that wait for another thread to shut down, before invoking
Expand Down Expand Up @@ -1040,7 +1027,7 @@ def __init__(self, router):
self._log = logging.getLogger('mitogen.responder')
self._router = router
self._finder = ModuleFinder()
self._cache = {} # fullname -> pickled
self._cache = {}
self.blacklist = []
self.whitelist = ['']

Expand Down Expand Up @@ -1152,7 +1139,9 @@ def _build_tuple(self, fullname):

if fullname == '__main__':
source = self.neutralize_main(path, source)
compressed = mitogen.core.Blob(zlib.compress(source, 9))
compressor = zlib.compressobj(9, 8, -15)
compressed = compressor.compress(source) + compressor.flush()
compressed = mitogen.core.Blob(compressed)
related = [
to_text(name)
for name in self._finder.find_related(fullname)
Expand Down Expand Up @@ -1411,6 +1400,10 @@ def __init__(self, broker=None, max_message_size=None):
self.upgrade()

def upgrade(self):
compressor = zlib.compressobj(9, zlib.DEFLATED, -15)
core_src = inspect.getsource(mitogen.core)
core_src = mitogen.minify.minimize_source(core_src)
self._stage2_prefix = compressor.compress(core_src.encode('utf-8')) + compressor.flush(zlib.Z_FULL_FLUSH)
self.id_allocator = IdAllocator(self)
self.responder = ModuleResponder(self)
self.resource_responder = ResourceResponder(self)
Expand Down
89 changes: 11 additions & 78 deletions mitogen/parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
select = __import__('select')

import mitogen.core
from mitogen.core import BytesIO
from mitogen.core import b
from mitogen.core import bytes_partition
from mitogen.core import IOLOG
Expand Down Expand Up @@ -146,9 +147,6 @@ def _ioctl_cast(n):
if name.startswith('SIG') and not name.startswith('SIG_')
)

_core_source_lock = threading.Lock()
_core_source_partial = None


def get_log_level():
return (LOG.getEffectiveLevel() or logging.INFO)
Expand All @@ -170,35 +168,6 @@ def get_sys_executable():
return '/usr/bin/python'


def _get_core_source():
"""
In non-masters, simply fetch the cached mitogen.core source code via the
import mechanism. In masters, this function is replaced with a version that
performs minification directly.
"""
return inspect.getsource(mitogen.core)


def get_core_source_partial():
"""
_get_core_source() is expensive, even with @lru_cache in minify.py, threads
can enter it simultaneously causing severe slowdowns.
"""
global _core_source_partial

if _core_source_partial is None:
_core_source_lock.acquire()
try:
if _core_source_partial is None:
_core_source_partial = PartialZlib(
_get_core_source().encode('utf-8')
)
finally:
_core_source_lock.release()

return _core_source_partial


def get_default_remote_name():
"""
Return the default name appearing in argv[0] of remote machines.
Expand Down Expand Up @@ -680,43 +649,6 @@ def expire(self):
timer.func()


class PartialZlib(object):
"""
Because the mitogen.core source has a line appended to it during bootstrap,
it must be recompressed for each connection. This is not a problem for a
small number of connections, but it amounts to 30 seconds CPU time by the
time 500 targets are in use.

For that reason, build a compressor containing mitogen.core and flush as
much of it as possible into an initial buffer. Then to append the custom
line, clone the compressor and compress just that line.

A full compression costs ~6ms on a modern machine, this method costs ~35
usec.
"""
def __init__(self, s):
self.s = s
if sys.version_info > (2, 5):
self._compressor = zlib.compressobj(9)
self._out = self._compressor.compress(s)
self._out += self._compressor.flush(zlib.Z_SYNC_FLUSH)
else:
self._compressor = None

def append(self, s):
"""
Append the bytestring `s` to the compressor state and return the
final compressed output.
"""
if self._compressor is None:
return zlib.compress(self.s + s, 9)
else:
compressor = self._compressor.copy()
out = self._out
out += compressor.compress(s)
return out + compressor.flush()


def _upgrade_broker(broker):
"""
Extract the poller state from Broker and replace it with the industrial
Expand Down Expand Up @@ -1446,10 +1378,9 @@ def _first_stage():
V='V'
# Stop looping if no more data is needed or EOF is detected (empty bytes).
while n-len(C) and V:select.select([0],[],[]);V=os.read(0,n-len(C));C+=V
# Raises `zlib.error` if compressed preamble is truncated or invalid
C=zlib.decompress(C)
f=os.fdopen(W,'wb',0)
f.write(C)
# Raises `zlib.error` if compressed preamble is truncated or invalid
f.write(zlib.decompress(C,-15))
f.close()
f=os.fdopen(w,'wb',0)
f.write(C)
Expand Down Expand Up @@ -1510,15 +1441,17 @@ def get_econtext_config(self):
'blacklist': self._router.get_module_blacklist(),
'max_message_size': self.options.max_message_size,
'version': mitogen.__version__,
'core_src_size': len(self._router._stage2_prefix),
}

def get_preamble(self):
suffix = (
'\nExternalContext(%r).main()\n' %
(self.get_econtext_config(),)
)
partial = get_core_source_partial()
return partial.append(suffix.encode('utf-8'))
suffix = u'\nExternalContext(%r).main()\n' % self.get_econtext_config()
compressor = zlib.compressobj(9, zlib.DEFLATED, -15)
f = BytesIO()
f.write(self._router._stage2_prefix)
f.write(compressor.compress(suffix.encode('utf-8')))
f.write(compressor.flush())
return f.getvalue()

def _get_name(self):
"""
Expand Down
Loading
Loading