2 # Cluster mode support.
4 # This file is part of mp3togo
6 # Convert audio files to play on a mp3 player
8 # (c) Simeon Veldstra 2006 <reallifesim@gmail.com>
10 # This software is free.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You may redistribute this program under the terms of the
18 # GNU General Public Licence version 2
19 # Available in this package or at http://www.fsf.org
24 Run slave processes on multiple machines communicating by
25 pipes over ssh and sharing data through NFS mounts."""
37 import mp3togo.filelist as filelist
38 import mp3togo.track as track
39 import mp3togo.cache as cache
40 import mp3togo.conf as conf
41 import mp3togo.task as task
42 import mp3togo.pool as pool
57 # Time to sleep between output messages
62 """Run as a slave process.
64 The program was run with --cluster-slave. Establish
65 communication with the master and do protocol.
69 Enter: Master --> options --> Slave-\ \
71 /-------<-----------<----------------/ |
73 Slave --> ack --> Master-\ ^ |
75 /-------<-------<--------/ | /
77 |->Master --> [work_unit | quit] --> Slave--| \
79 | /--------------<---------------<---------/ | INUSE
81 | \ Slave --> % ... % ... % ... Done --> Master-\ /
83 \<----------------<---------------<--------------/ - READY
95 fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) #Want
97 def initialize(ropts):
98 """Integrate remote options into local opts"""
# NOTE(review): `st` is not assigned in the visible lines; presumably it is
# the 'pickle' entry of ropts (the Boss side builds a "pickle=<dump>" line)
# -- confirm against the full file.
# SECURITY: cPickle.loads() can execute arbitrary code embedded in its
# input, so this is only safe when the other end of the pipe is trusted.
100 opts._d = cPickle.loads(st)
# Whatever the master's options said, verbosity is forced to 0 here.
101 opts['verbosity'] = 0
# The caller stores this string as its reply message (msg = initialize(...)).
102 return "SLAVE: ready"
105 """Run the work unit and watch progress"""
106 job_start = time.time()
107 if not rotps.has_key('file'):
108 raise conf.ErrorClusterProtocol
110 fc = cache.Cache(opts['usecache'], opts)
113 trk = track.Track(rotps['file'], opts, fc)
115 if pl.add_track(trk):
119 while tsk.status() == task.RUNNING:
120 yield "Progress: %s %s\n" % (tsk.name, str(tsk.output()))
122 if tsk.status() != task.DONE:
127 yield "FAIL: %s\n" % name
131 yield "FAIL: Insufficient space\n"
133 yield "DONE: %f\n" % (time.time() - job_start)
140 raise conf.ErrorClusterProtocol
143 while rdata[-len(EOT):] != EOT:
144 rfd, wfd, xfd = select.select([sys.stdin.fileno()],
147 sout.write("No input within fifteen seconds, ")
148 sout.write("can't wait around forever.")
152 if rdata == "SLAVE EXIT":
157 rdata = rdata[:-len(EOT)]
158 rdata = rdata.replace('\r', '')
160 for line in rdata.split(RS):
162 key, value = line.split('=', 1)
163 ropts[key] = value[:-1] # \n
167 msg = initialize(ropts)
172 msg = "Did not understand request."
180 while state == INUSE:
184 except StopIteration:
196 """Manage one slave"""
198 def __init__(self, host, files, failures, opts, r_schema=""):
199 """Manage a remote host. use r_schema='ssh %h ' for remote hosts
200 Files is a FileList of files to process the list should be
201 shared with the cooperating bosses"""
# failures is the failure table handed in by the caller; it is kept as
# self.fails and updated when a slave reports a failed file.
204 self.fails = failures
# pty.fork() returns (pid, fd): pid is 0 in the child, and in the parent fd
# is the descriptor connected to the child's pseudo-terminal.
207 self.pid, self.fd1 = pty.fork()
# Build the slave command line: '%h' in r_schema is replaced by the host
# name and the mp3togo slave invocation is appended. Presumably this and the
# exec below run only in the child (pid == 0); the guarding test is not
# among the visible lines.
209 args = r_schema.replace('%h', host)
210 args += " mp3togo --cluster-slave True"
# NOTE(review): in the visible lines args is still a str here, so args[0]
# would be its first character and execvp would receive a string instead of
# an argv list. Presumably an unseen line splits it -- confirm.
212 os.execvp(args[0], args)
# Parent side: duplicate the pty descriptor so reading (self.rf) and writing
# (self.wf) use separate unbuffered (bufsize 0) file objects, and set
# O_NONBLOCK on both descriptors.
214 self.fd2 = os.dup(self.fd1)
215 self.rf = os.fdopen(self.fd1, 'r', 0)
216 fcntl.fcntl(self.fd1, fcntl.F_SETFL, os.O_NONBLOCK)
217 self.wf = os.fdopen(self.fd2, 'w', 0)
218 fcntl.fcntl(self.fd2, fcntl.F_SETFL, os.O_NONBLOCK)
226 os.waitpid(self.pid, 0)
233 optd = self.opts._d.copy()
234 optd['arg_files'] = []
235 st = "pickle=" + cPickle.dumps(optd) + '\n'
242 a, b, c = select.select([self.fd1, self.fd2], [], [], 60)
250 self.files.push(self.current, front=True)
252 yield ('Not Ready', 'Not Ready', 0)
255 for line in inp.split('\n'):
256 line = line.replace('\r', '')
259 self.current = self.files.pop(0)
261 self.wf.write('SLAVE EXIT')
262 os.waitpid(self.pid, 0)
264 if self.fails.has_key(self.current) and (
265 self.host in self.fails[self.current]
266 or len(self.fails[self.current]) > MAXTRIES):
267 oldcurrent = self.current
271 self.files.push(oldcurrent, front=True)
273 self.wf.write('file=%s\n' % self.current)
277 if EOR.replace('\n', '') == line:
279 yield (self.current, 'Finished',
280 float(time.time() - start))
283 elif line.startswith('Progress:'):
284 action = ' '.join(line.split()[1:-1])
287 if action != last_action:
291 percent = float(line.split()[-1])
294 yield (self.current, action, percent)
295 elif line.startswith('FAIL:'):
296 if "Insufficient space" in line:
297 yield (self.current, 'Insufficient space', 0.0)
300 yield (self.current, 'Subtask Failed', 0.0)
301 if self.fails.has_key(self.host):
302 self.fails[self.current].append(self.host)
304 self.fails[self.current] = [self.host]
305 oldcurrent = self.current
309 self.files.push(oldcurrent, front=True)
318 This kills the ssh connection, it is
319 unable to clean up the remote machine."""
320 os.kill(self.pid, signal.SIGKILL)
321 os.waitpid(self.pid, 0)
325 def slavedriver(playlist, opts, cooked=None):
326 """A simple text mode cluster controller.
328 Function signature for running from main.py"""
# NOTE(review): cooked is not referenced in the visible lines.
# NOTE(review): Python 2 only as written (print statement, generator .next()).
330 if opts['verbosity']:
337 p("mp3togo version %s\n" % mp3togo.version)
338 p("Starting Cluster mode\n\n")
# opts['cluster'] is a comma-separated list of host names.
340 slaves = opts['cluster']
341 slaves = slaves.split(',')
344 p("Connecting to cluster nodes:\n")
345 fails = {} # {'badfilename': [host,...]}
346 if opts['clusternolocal']:
# The local worker is a Boss with an empty r_schema, i.e. no ssh prefix.
350 master = Boss('localhost', playlist, fails, opts, r_schema='')
# ring holds one (host name, poll() generator, Boss) tuple per worker.
351 ring = [('localhost', master.poll(), master)]
# Remote workers are started through ssh; every Boss receives the same
# playlist and fails objects.
354 theman = Boss(slave, playlist, fails, opts, r_schema="ssh %h ")
355 tr = (slave, theman.poll(), theman)
# files maps each host name to the file name it last reported ('' at first).
360 files = dict(map(lambda x: (x, ''), slaves))
# NOTE(review): confirm master is always bound when this line runs; its
# assignment sits next to the opts['clusternolocal'] test and the branch
# structure is not visible here.
362 files[master.host] = ''
# Iterate over a copy of ring so that finished hosts can be deleted from
# ring inside the loop.
364 for slave in ring[:]:
# Advance this host's generator by one step; StopIteration is logged as the
# host having finished and the host is removed from ring.
366 tup = slave[1].next()
368 except StopIteration:
369 L(1, "Host %s finished.\n" % slave[0])
370 del ring[ring.index(slave)]
372 if tup[1] == 'Subtask Failed':
373 L(1, "Task failed on host: %s\n %s\n" % (
# A file name different from the one recorded for this host is logged as the
# host starting a new file.
375 if tup[0] != files[slave[0]]:
376 files[slave[0]] = tup[0]
377 L(1, "Host %s at %s starting file: \n %s\n" % (
378 slave[0], time.ctime(), tup[0]))
381 print "All workers exited"
383 # Test that all files are actually done, if not, loop.