diff --git a/mitogen/core.py b/mitogen/core.py index 47b432401..91dd752e0 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -170,7 +170,7 @@ def set_blocking(fd, blocking): PY24 = sys.version_info < (2, 5) PY3 = sys.version_info > (3,) -if PY3: +if sys.version_info >= (3, 0): import pickle import _thread as thread from io import BytesIO @@ -237,6 +237,13 @@ def set_blocking(fd, blocking): #: writing small trailer chunks. CHUNK_SIZE = 131072 +#: Markers sent by bootstrap stages when it's ready to receive the next stage, +#: e.g. compressed :class:`mitogen.core`. +EC0 = b('MITO000') +EC1 = b('MITO001') +EC2 = b('MITO002 %d %d' % sys.version_info[0:2]) +EC2_PATTERN = b(r'MITO002 (\d) (\d\d?)') + _tls = threading.local() @@ -2357,10 +2364,13 @@ class MitogenProtocol(Protocol): #: peer. on_message = None - def __init__(self, router, remote_id, auth_id=None, - local_id=None, parent_ids=None): + def __init__( + self, router, remote_id, auth_id=None, local_id=None, parent_ids=None, + remote_python_version=None, + ): self._router = router self.remote_id = remote_id + self.remote_python_version = remote_python_version #: If not :data:`None`, :class:`Router` stamps this into #: :attr:`Message.auth_id` of every message received on this stream. self.auth_id = auth_id @@ -4130,7 +4140,8 @@ def _setup_master(self): self.router, parent_id, local_id=self.config['context_id'], - parent_ids=self.config['parent_ids'] + parent_ids=self.config['parent_ids'], + remote_python_version=self.config['parent_python_version'], ) for f in in_fp, out_fp: fd = f.fileno() @@ -4304,7 +4315,7 @@ def main(self): _v and LOG.debug('Recovered sys.executable: %r', sys.executable) if self.config.get('send_ec2', True): - self.stream.transmit_side.write(b('MITO002\n')) + self.stream.transmit_side.write(EC2 + b('\n')) self.broker._py24_25_compat() self.log_handler.uncork() self.dispatcher.run() diff --git a/mitogen/fork.py b/mitogen/fork.py index d77ed6f21..ecf51af47 100644 --- a/mitogen/fork.py +++ b/mitogen/fork.py @@ -213,7 +213,7 @@ def _child_main(self, childfp): self.options.on_fork() mitogen.core.set_blocking(childfp.fileno(), True) - childfp.send(b('MITO002\n')) + childfp.send(mitogen.core.EC2 + b('\n')) # Expected by the ExternalContext.main(). os.dup2(childfp.fileno(), 1) diff --git a/mitogen/parent.py b/mitogen/parent.py index 638956b00..2057869cf 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1186,18 +1186,8 @@ def on_unrecognized_partial_line_received(self, line): class BootstrapProtocol(RegexProtocol): """ - Respond to stdout of a child during bootstrap. Wait for :attr:`EC0_MARKER` - to be written by the first stage to indicate it can receive the bootstrap, - then await :attr:`EC1_MARKER` to indicate success, and - :class:`MitogenProtocol` can be enabled. - """ - #: Sentinel value emitted by the first stage to indicate it is ready to - #: receive the compressed bootstrap. For :mod:`mitogen.ssh` this must have - #: length of at least `max(len('password'), len('debug1:'))` - EC0_MARKER = b('MITO000') - EC1_MARKER = b('MITO001') - EC2_MARKER = b('MITO002') - + Respond to readiness markers sent to parent by child doing bootstrap. + """ def __init__(self, broker): super(BootstrapProtocol, self).__init__() self._writer = mitogen.core.BufferedWriter(broker, self) @@ -1213,8 +1203,12 @@ def _on_ec1_received(self, line, match): LOG.debug('%r: first stage received mitogen.core source', self) def _on_ec2_received(self, line, match): - LOG.debug('%r: new child booted successfully', self) - self.stream.conn._complete_connection() + py_major, py_minor = int(match.group(1)), int(match.group(2)) + LOG.debug( + '%r: new child booted successfully on Python %d.%d', + self, py_major, py_minor, + ) + self.stream.conn._complete_connection((py_major, py_minor)) return False def on_unrecognized_line_received(self, line): @@ -1222,9 +1216,9 @@ def on_unrecognized_line_received(self, line): line.decode('utf-8', 'replace')) PATTERNS = [ - (re.compile(EC0_MARKER), _on_ec0_received), - (re.compile(EC1_MARKER), _on_ec1_received), - (re.compile(EC2_MARKER), _on_ec2_received), + (re.compile(mitogen.core.EC0), _on_ec0_received), + (re.compile(mitogen.core.EC1), _on_ec1_received), + (re.compile(mitogen.core.EC2_PATTERN), _on_ec2_received), ] @@ -1510,6 +1504,7 @@ def get_econtext_config(self): 'blacklist': self._router.get_module_blacklist(), 'max_message_size': self.options.max_message_size, 'version': mitogen.__version__, + 'parent_python_version': sys.version_info[0:2], } def get_preamble(self): @@ -1546,7 +1541,7 @@ def _adorn_eof_error(self, e): if self.eof_error_hint: e.args = ('%s\n\n%s' % (e.args[0], self.eof_error_hint),) - def _complete_connection(self): + def _complete_connection(self, remote_python_version): self._timer.cancel() if not self.exception: mitogen.core.unlisten(self._router.broker, 'shutdown', @@ -1556,6 +1551,7 @@ def _complete_connection(self): MitogenProtocol( router=self._router, remote_id=self.context.context_id, + remote_python_version=remote_python_version, ) ) self._router.route_monitor.notice_stream(self.stdio_stream) diff --git a/tests/first_stage_test.py b/tests/first_stage_test.py index 354f7479b..f8c66d7e9 100644 --- a/tests/first_stage_test.py +++ b/tests/first_stage_test.py @@ -312,7 +312,7 @@ def test_valid_syntax(self): stdout, stderr = proc.communicate() self.assertEqual(0, proc.returncode) self.assertEqual(stdout, - mitogen.parent.BootstrapProtocol.EC0_MARKER+b('\n')) + mitogen.core.EC0 + b('\n')) self.assertIn( b("Error -3 while decompressing data"), # Unknown compression method stderr, @@ -356,7 +356,7 @@ def test_premature_eof(self): self.assertEqual(0, returncode) self.assertEqual( proc.stdout.read(), - mitogen.parent.BootstrapProtocol.EC0_MARKER + b("\n"), + mitogen.core.EC0 + b("\n"), ) self.assertIn( b("Error -5 while decompressing data"),