Two external hosts, turbot and pike, controlled by ibeam, synchronize the
processing of files in the correct sequence around a failure; the retry
bookkeeping that makes this work is sketched just after the output below.
-- output: --
Connecting to cluster nodes:
pike.internal turbot.internal
Host pike.internal at Mon May 22 23:29:06 2006 starting file:
/home/sim/temp/2go/cluster/tst2/01 - 21st Century Schizoid Man
(including Mirrors).flac
Host turbot.internal at Mon May 22 23:29:07 2006 starting file:
/home/sim/temp/2go/cluster/tst2/07-Daft_Punk-Around_the_World.mp3
Host turbot.internal at Mon May 22 23:30:00 2006 starting file:
/home/sim/temp/2go/cluster/tst2/Cat_Stevens-Greatest_Hits-02-Oh_very_young.ogg
Task failed on host: turbot.internal
/home/sim/temp/2go/cluster/tst2/Cat_Stevens-Greatest_Hits-02-Oh_very_young.ogg
Host turbot.internal at Mon May 22 23:30:04 2006 starting file:
/home/sim/temp/2go/cluster/tst2/Creep.mp3
Host pike.internal at Mon May 22 23:30:09 2006 starting file:
/home/sim/temp/2go/cluster/tst2/Cat_Stevens-Greatest_Hits-02-Oh_very_young.ogg
Host pike.internal at Mon May 22 23:30:33 2006 starting file:
/home/sim/temp/2go/cluster/tst2/Nina_Simone-The_Blues-02-Day_And_Night.ogg
Host turbot.internal at Mon May 22 23:30:35 2006 starting file:
/home/sim/temp/2go/cluster/tst2/testflac.flac
Host turbot.internal finished.
Host pike.internal finished.
All workers exited
-- end --
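
The hand-off around the failed Cat Stevens track works because the bosses share
one FileList and one failure map: each filename maps to the hosts that have
already failed on it, a host never retries a file it has failed, and once more
than MAXTRIES hosts have failed a file nobody retries it. A minimal standalone
sketch of that policy (record_failure and should_skip are illustrative names,
not names from the patch; the real logic lives in the Boss next() helper and
the FAIL: branch below):

MAXTRIES = 3

def record_failure(fails, filename, host):
    # Remember which hosts have already failed on this file.
    fails.setdefault(filename, []).append(host)

def should_skip(fails, filename, host):
    # Skip a file this host already failed, or one that has failed on
    # more than MAXTRIES hosts; it goes back to the front of the shared
    # list so another node can pick it up.
    hosts = fails.get(filename, [])
    return host in hosts or len(hosts) > MAXTRIES

fails = {}  # {'badfilename': [host, ...]}
record_failure(fails, 'Oh_very_young.ogg', 'turbot.internal')
print(should_skip(fails, 'Oh_very_young.ogg', 'turbot.internal'))  # True
print(should_skip(fails, 'Oh_very_young.ogg', 'pike.internal'))    # False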
When I threw ibeam in as a worker, I got a traceback.
--- a/mp3togo/cluster.py
+++ b/mp3togo/cluster.py
@@ -34,13 +34,13 @@ import signal
import termios
import cPickle
-import mp3togo
import mp3togo.filelist as filelist
-import mp3togo.conf as conf
import mp3togo.track as track
+import mp3togo.cache as cache
+import mp3togo.conf as conf
import mp3togo.task as task
import mp3togo.pool as pool
-import mp3togo.cache as cache
+import mp3togo
BEGIN = 'begin'
@@ -48,6 +48,8 @@ READY = 'ready'
INUSE = 'inuse'
EXIT = 'exit'
+MAXTRIES = 3
+
EOT = '\n*****\n\n'
EOR = '\n#####\n\n'
RS = '\n#**\n**#\n'
@@ -200,24 +202,29 @@ def slave(opts):
sout.flush()
sin.flush()
- # EXIT reached - clean up
+ # END REMOTE END
+
+ # START MASTER END
class Boss:
"""Manage one slave"""
- def __init__(self, host, files, opts):
+ def __init__(self, host, files, failures, opts, r_schema="ssh %h "):
"""Manage a remote host.
- Files is a list of files to process"""
+ Files is a FileList of files to process; the list should be
+ shared with the cooperating bosses"""
self.host = host
self.files = files
+ self.fails = failures
self.opts = opts
self.current = None
self.pid, self.fd1 = pty.fork()
if self.pid == 0:
- #os.execvp('ssh', ['ssh', host, 'mp3togo',
- # '--cluster-slave', 'True'])
- os.execvp('ssh', ['ssh', host, '/home/sim/temp/2go/cluster/run-mp3togo',
- '--cluster-slave', 'True'])
+ args = r_schema.replace('%h', host)
+ #args += " mp3togo --cluster-slave True"
+ args += " python /home/sim/temp/2go/cluster/run-mp3togo --cluster-slave True"
+ args = args.split()
+ os.execvp(args[0], args)
else:
self.fd2 = os.dup(self.fd1)
self.rf = os.fdopen(self.fd1, 'r', 0)
@@ -254,22 +261,35 @@ class Boss:
self.rf.flush()
for line in inp.split('\n'):
line = line.replace('\r', '')
- if EOR.replace('\n', '') == line:
- if self.current:
- yield (self.current, 'Finished',
- float(time.time() - start))
+ def next():
try:
self.current = self.files.pop(0)
except:
self.wf.write('SLAVE EXIT')
os.waitpid(self.pid, 0)
raise StopIteration
+ if self.fails.has_key(self.current) and (
+ self.host in self.fails[self.current]
+ or len(self.fails[self.current]) > MAXTRIES):
+ oldcurrent = self.current
+ try:
+ print "recurse"
+ next()
+ print "return"
+ finally:
+ self.files.push(oldcurrent, front=True)
+ return
self.wf.write('file=%s\n' % self.current)
self.wf.write(EOT)
start = time.time()
percent = 0.0
+ if EOR.replace('\n', '') == line:
+ if self.current:
+ yield (self.current, 'Finished',
+ float(time.time() - start))
+ next()
break
- if line.startswith('Progress:'):
+ elif line.startswith('Progress:'):
action = ' '.join(line.split()[1:-1])
if not action:
action = last_action
@@ -281,6 +301,22 @@ class Boss:
except ValueError:
pass
yield (self.current, action, percent)
+ elif line.startswith('FAIL:'):
+ if "Insufficient space" in line:
+ yield (self.current, 'Insufficient space', 0.0)
+ raise StopIteration
+ else:
+ yield (self.current, 'Subtask Failed', 0.0)
+ if self.fails.has_key(self.current):
+ self.fails[self.current].append(self.host)
+ else:
+ self.fails[self.current] = [self.host]
+ oldcurrent = self.current
+ try:
+ next()
+ finally:
+ self.files.push(oldcurrent, front=True)
+ break
else:
#Timed out
self.kill()
@@ -297,6 +333,59 @@ class Boss:
def slavedriver(playlist, opts, cooked=None):
"""Signature to run from main.py"""
+
+ if opts['verbosity']:
+ def p(s):
+ sys.stdout.write(s)
+ else:
+ p = lambda x: None
+ L = opts.log
+
+ p("mp3togo version %s\n" % mp3togo.version)
+ p("Starting Cluster mode\n\n")
+
+ slaves = opts['cluster']
+ slaves = slaves.split(',')
+
+ p("Connecting to cluster nodes:\n")
+ fails = {} # {'badfilename': [host,...]}
+ if opts['clusternolocal']:
+ ring = []
+ else:
+ p(" localhost")
+ master = Boss('localhost', playlist, fails, opts, r_schema='')
+ ring = [('localhost', master.poll(), master)]
+ for slave in slaves:
+ p(" " + slave)
+ theman = Boss(slave, playlist, fails, opts)
+ tr = (slave, theman.poll(), theman)
+ ring.append(tr)
+
+
+ p("\n\n")
+ files = dict(map(lambda x: (x, ''), slaves))
+ while ring:
+ for slave in ring[:]:
+ try:
+ tup = slave[1].next()
+ #print tup
+ except StopIteration:
+ L(1, "Host %s finished.\n" % slave[0])
+ del ring[ring.index(slave)]
+ break
+ if tup[1] == 'Subtask Failed':
+ L(1, "Task failed on host: %s\n %s\n" % (
+ slave[0], tup[0]))
+ if tup[0] != files[slave[0]]:
+ files[slave[0]] = tup[0]
+ L(1, "Host %s at %s starting file: \n %s\n" % (
+ slave[0], time.ctime(), tup[0]))
+ if ring:
+ time.sleep(0.5)
+ print "All workers exited"
+
+# Test that all files are actually done, if not, loop.
+
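
One note on the Boss changes above: the new r_schema argument is just a command
template, with %h standing in for the hostname, and an empty schema makes the
slave run locally (that is how the localhost Boss is started in slavedriver).
A rough standalone sketch of the expansion, assuming the hard-coded run-mp3togo
test path from the patch (slave_command is an illustrative name, not part of
the patch):

def slave_command(host, r_schema="ssh %h "):
    # Expand the remote-command template; splitting on whitespace gives
    # the argv handed to os.execvp in Boss.__init__.
    args = r_schema.replace('%h', host)
    args += " python /home/sim/temp/2go/cluster/run-mp3togo --cluster-slave True"
    return args.split()

print(slave_command('pike.internal'))
# ['ssh', 'pike.internal', 'python', '/home/sim/temp/2go/cluster/run-mp3togo', '--cluster-slave', 'True']
print(slave_command('localhost', r_schema=''))
# ['python', '/home/sim/temp/2go/cluster/run-mp3togo', '--cluster-slave', 'True']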
--- a/mp3togo/conf.py
+++ b/mp3togo/conf.py
@@ -76,7 +76,7 @@ class Options(options.Options):
# Options to add:
# ((name, short option, long option, default value, hook, help text), ...)
self._set += [
- ('brwarning', 't', 'file-tag', '-PORTABLE-', None,
+ ('brwarning', 't', 'file-tag', '.2go', None,
'''A string appended to the name of the file to indicate it has been recoded.'''),
('tempdir', 'w', 'work-dir', '/tmp', self._absfile,
'''The path to store temporary files. This could need several hundred megabytes free.'''),
@@ -115,8 +115,10 @@ class Options(options.Options):
'''Create an index file when in wav output mode.'''),
('nonormal', '', 'no-normalize', False, None,
'''Don't normalize the wav file before encoding.'''),
- ('cluster', '', 'cluster', False, None,
+ ('cluster', '', 'cluster', '', None,
'''Manage a cluster of worker machines. Cluster mode distributes tracks between multiple computers connected to a LAN. Provide a comma separated list of IP addresses or hostnames. The master machine must be able to log in to the slaves without a password and the slaves must have shared access to the filesystem. See the website for an example of how to set it up.<li>http://puddle.ca/mp3togo/'''),
+ ('clusternolocal', '', 'cluster-no-local-node', False, None,
+ '''Only use the remote nodes for processing files.'''),
('clusterslave', '', 'cluster-slave', False, None,
'''Start a worker process for cluster mode. This option is used by the cluster master to start slave processes. Do not use.''')]
--- a/mp3togo/filelist.py
+++ b/mp3togo/filelist.py
@@ -35,6 +35,7 @@ class FileList:
self._list = []
self._i = 0
self._lock = threading.Lock()
+ self._poplock = threading.Lock()
self._running = False
def run(self):
@@ -145,12 +146,22 @@ class FileList:
def pop(self, index=-1):
"""Pop off a value atomicly
Use either this or the iterator access, not both."""
- self._lock.acquire()
- ret = self._list.pop(index)
- self._lock.release()
+ self._poplock.acquire()
+ try:
+ ret = self._list.pop(index)
+ finally:
+ self._poplock.release()
return ret
-
+ def push(self, value, front=False):
+ """Put a file back in the list
+ so another cluster node can try it."""
+ self._poplock.acquire()
+ if front:
+ self._list.insert(0, value)
+ else:
+ self._list.append(value)
+ self._poplock.release()
def __iter__(self):
if not self._lock.locked():
--- a/mp3togo/main.py
+++ b/mp3togo/main.py
@@ -259,7 +259,7 @@ def execute_sequential(playlist, opts, c
bad_str = ", %d tracks failed." % bad_ones
else:
bad_str = "."
- #bytes = format_bytes(pl.produced_bytes)
+ #bytes = format_bytes(pl.produced_bytes) # you are using a new pool each time!
tryp("\n%d tracks done%s %s total time elapsed.\n" %
(good_ones, bad_str, ts))
--- a/mp3togo/task.py
+++ b/mp3togo/task.py
@@ -230,6 +230,9 @@ class Task(SimpleTask):
if self._paused.locked():
self._paused.release()
if os.WIFSIGNALED(status):
+ #if os.WSTOPSIG(status) == 0:
+ # self._status = DONE
+ #else:
self._status = FAILED
elif os.WEXITSTATUS(status) == 0:
self._status = DONE
@@ -263,6 +266,7 @@ class Task(SimpleTask):
try:
wpid, status = os.waitpid(pid, 0)
if os.WIFSIGNALED(status):
+ print "Signaled: ", os.WTERMSIG(status)
self._status = FAILED
elif os.WEXITSTATUS(status) == 0:
self._status = DONE