--- a/mp3togo/cluster.py
+++ b/mp3togo/cluster.py
@@ -24,12 +24,15 @@
Run slave processes on multiple machines communicating by
pipes over ssh and sharing data through NFS mounts."""
-import sys
import os
+import sys
+import pty
import time
-import cPickle
import fcntl
import select
+import signal
+import termios
+import cPickle
import mp3togo
import mp3togo.filelist as filelist
@@ -40,13 +43,20 @@ import mp3togo.pool as pool
import mp3togo.cache as cache
-BEGIN = 1
-READY = 2
-INUSE = 3
-EXIT = 4
+BEGIN = 'begin'
+READY = 'ready'
+INUSE = 'inuse'
+EXIT = 'exit'
+
+EOT = '\n*****\n\n'
+EOR = '\n#####\n\n'
+RS = '\n#**\n**#\n'
+
+# Time to sleep between output messages
+INTERVAL = 1.0
-def run_slave(opts):
+def slave(opts):
"""Run as a slave process.
The program was run with --cluster-slave. Establish
@@ -54,17 +64,17 @@ def run_slave(opts):
The message protocol:
- Master --> options --> Slave-\ \
- | |
- /-------<-----------<-------/ |
- | EXIT | BEGIN
- Slave --> ack --> Master-\ ^ |
- | | |
- /-------<-------<--------/ | /
- | | - READY
- |->Master --> [work_unit | quit] --> Slave-| \
- | | |
- | /--------------<---------------<--------/ | INUSE
+ Enter: Master --> options --> Slave-\ \
+ | |
+ /-------<-----------<----------------/ |
+ | EXIT | BEGIN
+ Slave --> ack --> Master-\ ^ |
+ | | |
+ /-------<-------<--------/ | /
+ | | - READY
+ |->Master --> [work_unit | quit] --> Slave--| \
+ | | |
+ | /--------------<---------------<---------/ | INUSE
^ | |
| \ Slave --> % ... % ... % ... Done --> Master-\ /
| |
@@ -75,42 +85,218 @@ def run_slave(opts):
state = BEGIN
sin = sys.stdin
sout = sys.stdout
+ cooked = None
+ trk = None
+ seen_pickle = True
+
fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) #Want
#fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) #Don't
- sout.write("foo bar baz output quux\n")
- sout.flush()
- sin.flush()
-
- sout.write("Below now, foo bar baz output quux\n")
- sout.flush()
- sin.flush()
-
- rcuke = ''
- while rcuke[-5:] != '\n***\n':
- rfd, wfd, xfd = select.select([sys.stdin.fileno()], [], [], 40)
- if not rfd:
- sout.write("No input in thirty seconds")
- sys.exit()
- rcuke = sin.read()
- rcuke = rcuke[:-4]
- ropts = {}
- for line in rcuke.split('\n'):
- if '=' in line:
- key, value = line.split('=', 1)
- ropts[key] = value
- sout.flush()
- sin.flush()
-
- sout.write("Real Below now, foo bar baz output quux\n")
- sout.flush()
- sin.flush()
-
- of = file("XXX-Test-Cluster.out", 'w')
- for k, v in ropts.items():
- of.write("%s, %s\n" % (k, v))
- of.close()
+ def initialize(ropts):
+ """Integrate remote options into local opts"""
+ # of = file('/home/sim/temp/2go/cluster/opts-slave', 'w')
+# for key in ropts.keys():
+# if opts.has_key(key):
+# opts[key] = ropts[key]
+# # of.write("%s: %s\n" % (key, opts[key]))
+# else:
+# return None
+ # of.close()
+ st = ropts['pickle']
+ opts._d = cPickle.loads(st)
+
+ opts['verbosity'] = 0
+ of = file('/home/sim/temp/2go/cluster/opts-slave', 'w')
+ for key in opts.keys():
+ of.write("%s: %s\n" % (key, opts[key]))
+ of.close()
+ return "SLAVE: ready"
+
+ def perform(rotps):
+ """Run the work unit and watch progress"""
+ job_start = time.time()
+ if not rotps.has_key('file'):
+ raise conf.ErrorClusterProtocol
+ if opts['usecache']:
+ fc = cache.Cache(opts['usecache'], opts)
+ else:
+ fc = None
+ trk = track.Track(rotps['file'], opts, fc)
+ pl = pool.Pool(opts)
+ if pl.add_track(trk):
+ tsk = trk.getfirst()
+ while tsk:
+ tsk.run()
+ while tsk.status() == task.RUNNING:
+ yield "Progress: %s %s\n" % (tsk.name, str(tsk.output()))
+ tsk.wait()
+ if tsk.status() != task.DONE:
+ name = tsk.name
+ while tsk:
+ tsk.undo()
+ tsk = tsk.prev()
+ yield "FAIL: %s\n" % name
+ raise StopIteration
+ tsk = tsk.next()
+ else:
+ yield "FAIL: Insufficient space\n"
+ raise StopIteration
+ yield "DONE: %f\n" % (time.time() - job_start)
+ raise StopIteration
+
+
+ # The state machine
+ while state != EXIT:
+ if state == INUSE:
+ raise conf.ErrorClusterProtocol
+ # Read from master
+ rdata = ''
+ while rdata[-len(EOT):] != EOT:
+ rfd, wfd, xfd = select.select([sys.stdin.fileno()],
+ [], [], 15)
+ if not rfd:
+ sout.write("No input within fifteen seconds, ")
+ sout.write("can't wait around forever.")
+ state = EXIT
+ break
+ rdata += sin.read()
+ if rdata == "SLAVE EXIT":
+ state = EXIT
+ break
+ sout.flush()
+ sin.flush()
+ rdata = rdata[:-len(EOT)]
+ rdata = rdata.replace('\r', '')
+ ropts = {}
+ for line in rdata.split(RS):
+ if '=' in line:
+ key, value = line.split('=', 1)
+ ropts[key] = value[:-1] # \n
+
+ # Write to master
+ if state == BEGIN:
+ msg = initialize(ropts)
+ if msg:
+ state = READY
+ else:
+ state = EXIT
+ msg = "Did not understand request."
+ msg += EOR
+ sout.write(msg)
+ sout.flush()
+ sin.flush()
+ elif state == READY:
+ state = INUSE
+ gen = perform(ropts)
+ while state == INUSE:
+ try:
+ msg = gen.next()
+ time.sleep(INTERVAL)
+ except StopIteration:
+ state = READY
+ msg = EOR
+ sout.write(msg)
+ sout.flush()
+ sin.flush()
+
+ # EXIT reached - clean up
+
+class Boss:
+ """Manage one slave"""
+
+ def __init__(self, host, files, opts):
+ """Manage a remote host.
+ Files is a list of files to process"""
+ self.host = host
+ self.files = files
+ self.opts = opts
+ self.current = None
+ self.pid, self.fd1 = pty.fork()
+ if self.pid == 0:
+ #os.execvp('ssh', ['ssh', host, 'mp3togo',
+ # '--cluster-slave', 'True'])
+ os.execvp('ssh', ['ssh', host, '/home/sim/temp/2go/cluster/run-mp3togo',
+ '--cluster-slave', 'True'])
+ else:
+ self.fd2 = os.dup(self.fd1)
+ self.rf = os.fdopen(self.fd1, 'r', 0)
+ fcntl.fcntl(self.fd1, fcntl.F_SETFL, os.O_NONBLOCK)
+ self.wf = os.fdopen(self.fd2, 'w', 0)
+ fcntl.fcntl(self.fd2, fcntl.F_SETFL, os.O_NONBLOCK)
+
+ def __del__(self):
+ self.rf.flush()
+ self.wf.flush()
+ self.rf.close()
+ self.wf.close()
+ try:
+ os.waitpid(self.pid, 0)
+ except OSError:
+ pass
+
+ def poll(self):
+ start = 0
+ last_action = ''
+ #for key in self.opts.keys():
+ # self.wf.write('%s=%s\n' % (key, self.opts[key]))
+ optd = self.opts._d.copy()
+ optd['arg_files'] = []
+ st = "pickle=" + cPickle.dumps(optd) + '\n'
+ self.wf.write(st)
+ self.wf.flush()
+ self.wf.write(EOT)
+ self.wf.flush()
+ while 1:
+ a, b, c = select.select([self.fd1, self.fd2], [], [], 60)
+ if a:
+ inp = self.rf.read()
+ self.rf.flush()
+ for line in inp.split('\n'):
+ line = line.replace('\r', '')
+ if EOR.replace('\n', '') == line:
+ if self.current:
+ yield (self.current, 'Finished',
+ float(time.time() - start))
+ try:
+ self.current = self.files.pop(0)
+ except:
+ self.wf.write('SLAVE EXIT')
+ os.waitpid(self.pid, 0)
+ raise StopIteration
+ self.wf.write('file=%s\n' % self.current)
+ self.wf.write(EOT)
+ start = time.time()
+ percent = 0.0
+ break
+ if line.startswith('Progress:'):
+ action = ' '.join(line.split()[1:-1])
+ if not action:
+ action = last_action
+ if action != last_action:
+ percent = 0.0
+ last_action = action
+ try:
+ percent = float(line.split()[-1])
+ except ValueError:
+ pass
+ yield (self.current, action, percent)
+ else:
+ #Timed out
+ self.kill()
+ raise StopIteration
+
+ def kill(self):
+ """Kill the child
+ This kills the ssh connection, it is
+ unable to clean up the remote machine."""
+ os.kill(self.pid, signal.SIGKILL)
+ os.waitpid(self.pid, 0)
+
+
+
+def slavedriver(playlist, opts, cooked=None):
+ """Signature to run from main.py"""
--- a/mp3togo/conf.py
+++ b/mp3togo/conf.py
@@ -62,6 +62,10 @@ class ErrorUnlocked(Error):
class TaskNotReadyError(Error):
pass
+class ErrorClusterProtocol(Error):
+ pass
+
+
class Options(options.Options):
"""Subclass options.Options and fill in application specific
@@ -111,6 +115,8 @@ class Options(options.Options):
'''Create an index file when in wav output mode.'''),
('nonormal', '', 'no-normalize', False, None,
'''Don't normalize the wav file before encoding.'''),
+ ('cluster', '', 'cluster', False, None,
+ '''Manage a cluster of worker machines. Cluster mode distributes tracks between multiple computers connected to a LAN. Provide a comma separated list of IP addresses or hostnames. The master machine must be able to log in to the slaves without a password and the slaves must have shared access to the filesystem. See the website for an example of how to set it up.<li>http://puddle.ca/mp3togo/'''),
('clusterslave', '', 'cluster-slave', False, None,
'''Start a worker process for cluster mode. This option is used by the cluster master to start slave processes. Do not use.''')]
--- a/mp3togo/filelist.py
+++ b/mp3togo/filelist.py
@@ -142,6 +142,16 @@ class FileList:
self._lock.release()
return True
+ def pop(self, index=-1):
+ """Pop off a value atomicly
+ Use either this or the iterator access, not both."""
+ self._lock.acquire()
+ ret = self._list.pop(index)
+ self._lock.release()
+ return ret
+
+
+
def __iter__(self):
if not self._lock.locked():
raise setup.ErrorUnlocked
--- a/mp3togo/main.py
+++ b/mp3togo/main.py
@@ -58,7 +58,7 @@ def main(argv):
# Cluster mode is handled in the cluster module
if opts['clusterslave']:
- cluster.run_slave(opts)
+ cluster.slave(opts)
return 0
# Are we creating a cache?
@@ -129,7 +129,13 @@ def main(argv):
fail('')
# Start work
+ if opts['cluster']:
+ cluster.slavedriver(playlist, opts, cooked)
+ return
+
+ # Nothing fancy
execute_sequential(playlist, opts, cooked)
+ return
def execute_sequential(playlist, opts, cooked=None):
@@ -153,13 +159,16 @@ def execute_sequential(playlist, opts, c
oldflags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)
- tryp = conf.try_print
+ if opts['verbosity']:
+ tryp = conf.try_print
+ else:
+ tryp = lambda x: None
for name in playlist:
track_start = time.time()
tryp("(%d/%d) %s: \n" % (playlist.cur_file() + 1, len(playlist), name))
trk = track.Track(name, opts, cooked)
- pl = pool.Pool(opts)
+ pl = pool.Pool(opts) # make a new one for every track? Huh?
if pl.add_track(trk):
tsk = trk.getfirst()
try:
@@ -211,6 +220,7 @@ def execute_sequential(playlist, opts, c
tryp("\r%s: Failed. Cleaning up.\n" % tsk.name)
while tsk:
tryp("Undo: %s\n" % tsk.name)
+ tsk.undo()
tsk = tsk.prev()
except KeyboardInterrupt:
tsk.stop()
--- a/mp3togo/tags.py
+++ b/mp3togo/tags.py
@@ -61,6 +61,7 @@ class Tags(UserDict.DictMixin):
"""
def __init__(self, filename):
+ print filename
if not os.path.exists(filename):
raise setup.ErrorNoFile
self._file = filename