2 # Cluster mode support.
4 # This file is part of mp3togo
6 # Convert audio files to play on an mp3 player
8 # (c) Simeon Veldstra 2006 <reallifesim@gmail.com>
10 # This software is free.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You may redistribute this program under the terms of the
18 # GNU General Public Licence version 2
19 # Available in this package or at http://www.fsf.org
24 Run slave processes on multiple machines communicating by
25 pipes over ssh and sharing data through NFS mounts."""
37 import mp3togo.filelist as filelist
38 import mp3togo.track as track
39 import mp3togo.cache as cache
40 import mp3togo.conf as conf
41 import mp3togo.task as task
42 import mp3togo.pool as pool
57 # Time to sleep between output messages
62 """Run as a slave process.
64 The program was run with --cluster-slave. Establish
65 communication with the master and do protocol.
69 Enter: Master --> options --> Slave-\ \
71 /-------<-----------<----------------/ |
73 Slave --> ack --> Master-\ ^ |
75 /-------<-------<--------/ | /
77 |->Master --> [work_unit | quit] --> Slave--| \
79 | /--------------<---------------<---------/ | INUSE
81 | \ Slave --> % ... % ... % ... Done --> Master-\ /
83 \<----------------<---------------<--------------/ - READY
95 fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) #Want
96 #fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) #Don't
98 def initialize(ropts):
99 """Integrate remote options into local opts"""
102 opts._d = cPickle.loads(st)
103 opts['verbosity'] = 0
104 return "SLAVE: ready"
109 """Run the work unit and watch progress"""
110 job_start = time.time()
111 if not rotps.has_key('file'):
112 raise conf.ErrorClusterProtocol
114 fc = cache.Cache(opts['usecache'], opts)
117 trk = track.Track(rotps['file'], opts, cooked=fc)
119 if pl.add_track(trk):
123 while tsk.status() == task.RUNNING:
124 yield "Progress: %s %s\n" % (tsk.name, str(tsk.output()))
126 if tsk.status() != task.DONE:
131 yield "FAIL: %s\n" % name
135 yield "FAIL: Insufficient space\n"
137 yield "DONE: %f\n" % (time.time() - job_start)
144 raise conf.ErrorClusterProtocol
147 while rdata[-len(EOT):] != EOT:
148 rfd, wfd, xfd = select.select([sys.stdin.fileno()],
151 sout.write("No input within fifteen seconds, ")
152 sout.write("can't wait around forever.")
156 if rdata == "SLAVE EXIT":
161 rdata = rdata[:-len(EOT)]
162 rdata = rdata.replace('\r', '')
164 for line in rdata.split(RS):
166 key, value = line.split('=', 1)
167 ropts[key] = value[:-1] # \n
171 msg = initialize(ropts)
176 msg = "Did not understand request."
184 while state == INUSE:
188 except StopIteration:
200 """Manage one slave"""
202 def __init__(self, host, files, failures, opts, r_schema=""):
203 """Manage a remote host. use r_schema='ssh %h ' for remote hosts
204 Files is a FileList of files to process the list should be
205 shared with the cooperating bosses"""
208 self.fails = failures
211 self.pid, self.fd1 = pty.fork()
214 args = r_schema.replace('%h', host)
218 args += self.opts.argv[0] + " --cluster-slave True"
220 args += "mp3togo --cluster-slave True"
222 os.execvp(args[0], args)
224 self.fd2 = os.dup(self.fd1)
225 self.rf = os.fdopen(self.fd1, 'r', 0)
226 fcntl.fcntl(self.fd1, fcntl.F_SETFL, os.O_NONBLOCK)
227 self.wf = os.fdopen(self.fd2, 'w', 0)
228 fcntl.fcntl(self.fd2, fcntl.F_SETFL, os.O_NONBLOCK)
236 os.waitpid(self.pid, 0)
243 #for key in self.opts.keys():
244 # self.wf.write('%s=%s\n' % (key, self.opts[key]))
245 optd = self.opts._d.copy()
246 optd['arg_files'] = []
247 st = "pickle=" + cPickle.dumps(optd) + '\n'
254 a, b, c = select.select([self.fd1, self.fd2], [], [], 60)
262 self.files.push(self.current, front=True)
264 yield ('Not Ready', 'Not Ready', 0)
265 # Roll back and try to start again self = Boss(yada, ...)
268 for line in inp.split('\n'):
269 line = line.replace('\r', '')
272 self.current = self.files.pop(0)
274 self.wf.write('SLAVE EXIT')
275 os.waitpid(self.pid, 0)
277 if self.fails.has_key(self.current) and (
278 self.host in self.fails[self.current]
279 or len(self.fails[self.current]) > MAXTRIES):
280 oldcurrent = self.current
284 self.files.push(oldcurrent, front=True)
286 self.wf.write('file=%s\n' % self.current)
288 self.start = time.time()
290 if EOR.replace('\n', '') == line:
292 yield (self.current, 'Finished',
293 float(time.time() - self.start))
296 elif line.startswith('Progress:'):
297 action = ' '.join(line.split()[1:-1])
300 if action != last_action:
304 percent = float(line.split()[-1])
307 yield (self.current, action, percent)
308 elif line.startswith('FAIL:'):
309 if "Insufficient space" in line:
310 yield (self.current, 'Insufficient space', 0.0)
313 yield (self.current, 'Subtask Failed', 0.0)
314 if self.fails.has_key(self.host):
315 self.fails[self.current].append(self.host)
317 self.fails[self.current] = [self.host]
318 oldcurrent = self.current
322 self.files.push(oldcurrent, front=True)
331 This kills the ssh connection, it is
332 unable to clean up the remote machine."""
333 os.kill(self.pid, signal.SIGKILL)
334 os.waitpid(self.pid, 0)
338 def slavedriver(playlist, opts, cooked=None):
339 """A simple text mode cluster controller.
341 Function signature for running from main.py"""
343 if opts['verbosity']:
350 p("mp3togo version %s\n" % mp3togo.version)
351 p("Starting Cluster mode\n\n")
353 slaves = opts['cluster']
354 slaves = slaves.split(',')
357 p("Connecting to cluster nodes:\n")
358 fails = {} # {'badfilename': [host,...]}
359 if opts['clusternolocal']:
363 master = Boss('localhost', playlist, fails, opts, r_schema='')
364 ring = [('localhost', master.poll(), master)]
367 theman = Boss(slave, playlist, fails, opts, r_schema="ssh %h ")
368 tr = (slave, theman.poll(), theman)
373 files = dict(map(lambda x: (x, ''), slaves))
375 files[master.host] = ''
377 for slave in ring[:]:
379 tup = slave[1].next()
381 except StopIteration:
382 L(1, "Host %s finished.\n" % slave[0])
383 del ring[ring.index(slave)]
385 if tup[1] == 'Subtask Failed':
386 L(1, "Task failed on host: %s\n %s\n" % (
388 if tup[0] != files[slave[0]]:
389 files[slave[0]] = tup[0]
390 L(1, "Host %s at %s starting file: \n %s\n" % (
391 slave[0], time.ctime(), tup[0]))
394 print "All workers exited"
396 # Test that all files are actually done, if not, loop.