Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 34 additions & 15 deletions starcluster/plugins/ipcluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def _start_engines(node, user, n_engines=None, kill_existing=False):
n_engines = node.num_processors
node.ssh.switch_user(user)
if kill_existing:
node.ssh.execute("pkill -f ipengineapp", ignore_exit_status=True)
node.ssh.execute("pkill -f IPython.parallel.engine", ignore_exit_status=True)
node.ssh.execute("ipcluster engines --n=%i --daemonize" % n_engines)
node.ssh.switch_user('root')

Expand All @@ -91,7 +91,7 @@ class IPCluster(DefaultClusterSetup):

"""
def __init__(self, enable_notebook=False, notebook_passwd=None,
notebook_directory=None, packer=None, log_level='INFO'):
notebook_directory=None, packer=None, master_engines=None, node_engines=None, log_level='INFO'):
super(IPCluster, self).__init__()
if isinstance(enable_notebook, basestring):
self.enable_notebook = enable_notebook.lower().strip() == 'true'
Expand All @@ -100,6 +100,8 @@ def __init__(self, enable_notebook=False, notebook_passwd=None,
self.notebook_passwd = notebook_passwd or utils.generate_passwd(16)
self.notebook_directory = notebook_directory
self.log_level = log_level
self.master_engines = master_engines
self.node_engines = node_engines
if packer not in (None, 'json', 'pickle', 'msgpack'):
log.error("Unsupported packer: %s", packer)
self.packer = None
Expand Down Expand Up @@ -163,7 +165,10 @@ def _write_config(self, master, user, profile_dir):
f.close()

def _start_cluster(self, master, profile_dir):
n_engines = max(1, master.num_processors - 1)
if not self.master_engines:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If self.master_engines == 0, won't this branch be taken? I.e., using 0 or None to turn off the master will not work.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... this works in testing. Let me figure this out.

Oh, actually, at this point in the execution, this is the string '0', which evaluates to true. This is bad and confusing, though, so I will change it to check for "None". Thanks.

n_engines = max(1, master.num_processors - 1)
else:
n_engines = int(self.master_engines)
log.info("Starting the IPython controller and %i engines on master"
% n_engines)
# cleanup existing connection files, to prevent their use
Expand Down Expand Up @@ -215,7 +220,7 @@ def _start_cluster(self, master, profile_dir):
self._authorize_port(master, (1000, 65535), "IPython controller")
return local_json, n_engines

def _start_notebook(self, master, user, profile_dir):
def _start_notebook(self, master, user, profile_dir, time_to_dead=30.0):
log.info("Setting up IPython web notebook for user: %s" % user)
user_cert = posixpath.join(profile_dir, '%s.pem' % user)
ssl_cert = posixpath.join(profile_dir, '%s.pem' % user)
Expand All @@ -242,6 +247,7 @@ def _start_notebook(self, master, user, profile_dir):
"c.NotebookApp.open_browser = False",
"c.NotebookApp.password = u'%s'" % sha1pass,
"c.NotebookApp.port = %d" % notebook_port,
"c.NotebookApp.time_to_dead = %d" % time_to_dead,
]))
f.close()
if self.notebook_directory is not None:
Expand Down Expand Up @@ -289,10 +295,15 @@ def run(self, nodes, master, user, user_shell, volumes):
# Start engines on each of the non-master nodes
non_master_nodes = [node for node in nodes if not node.is_master()]
for node in non_master_nodes:
if not self.node_engines:
n_engines = node.num_processors
else:
n_engines = int(self.node_engines)
self.pool.simple_job(
_start_engines, (node, user, node.num_processors),
_start_engines, (node, user, n_engines),
jobid=node.alias)
n_engines_non_master = sum(node.num_processors
n_engines_non_master = 0
n_engines_non_master += sum(node.num_processors if not self.node_engines else int(self.node_engines)
for node in non_master_nodes)
if len(non_master_nodes) > 0:
log.info("Adding %d engines on %d nodes",
Expand All @@ -310,9 +321,12 @@ def run(self, nodes, master, user, user_shell, volumes):

def on_add_node(self, node, nodes, master, user, user_shell, volumes):
self._check_ipython_installed(node)
n_engines = node.num_processors
if not self.node_engines:
n_engines = node.num_processors
else:
n_engines = int(self.node_engines)
log.info("Adding %d engines on %s", n_engines, node.alias)
_start_engines(node, user)
_start_engines(node, user, n_engines)

def on_remove_node(self, node, nodes, master, user, user_shell, volumes):
raise NotImplementedError("on_remove_node method not implemented")
Expand All @@ -332,7 +346,7 @@ def run(self, nodes, master, user, user_shell, volumes):
master.ssh.execute("ipcluster stop", ignore_exit_status=True)
time.sleep(2)
log.info("Stopping IPython controller on %s", master.alias)
master.ssh.execute("pkill -f ipcontrollerapp",
master.ssh.execute("pkill -f IPython.parallel.controller",
ignore_exit_status=True)
master.ssh.execute("pkill -f 'ipython notebook'",
ignore_exit_status=True)
Expand All @@ -344,7 +358,7 @@ def run(self, nodes, master, user, user_shell, volumes):

def _stop_engines(self, node, user):
node.ssh.switch_user(user)
node.ssh.execute("pkill -f ipengineapp", ignore_exit_status=True)
node.ssh.execute("pkill -f IPython.parallel.engine", ignore_exit_status=True)
node.ssh.switch_user('root')

def on_add_node(self, node, nodes, master, user, user_shell, volumes):
Expand All @@ -354,7 +368,7 @@ def on_remove_node(self, node, nodes, master, user, user_shell, volumes):
raise NotImplementedError("on_remove_node method not implemented")


class IPClusterRestartEngines(DefaultClusterSetup):
class IPClusterRestartEngines(IPCluster):
"""Plugin to kill and restart all engines of an IPython cluster

This plugin can be useful to hard-reset the all the engines, for instance
Expand All @@ -364,14 +378,19 @@ class IPClusterRestartEngines(DefaultClusterSetup):
This plugin is meant to be run manually with:

starcluster runplugin plugin_conf_name cluster_name

"""
def run(self, nodes, master, user, user_shell, volumes):
n_total = 0
for node in nodes:
n_engines = node.num_processors
if node.is_master() and n_engines > 2:
n_engines -= 1
if node.is_master() and self.master_engines:
n_engines = int(self.master_engines)
elif self.node_engines:
n_engines = int(self.node_engines)
elif node.is_master():
# and n_engines > 2: # XXX I'm not sure I understand this logic yet.
n_engines = node.num_processors - 1
else:
n_engines = node.num_processors
self.pool.simple_job(
_start_engines, (node, user, n_engines, True),
jobid=node.alias)
Expand Down