2 # Cluster mode support.
4 # This file is part of mp3togo
6 # Convert audio files to play on a mp3 player
8 # (c) Simeon Veldstra 2006 <reallifesim@gmail.com>
10 # This software is free.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You may redistribute this program under the terms of the
18 # GNU General Public Licence version 2
19 # Available in this package or at http://www.fsf.org
24 Run slave processes on multiple machines communicating by
25 pipes over ssh and sharing data through NFS mounts."""
37 import mp3togo.filelist as filelist
38 import mp3togo.track as track
39 import mp3togo.cache as cache
40 import mp3togo.conf as conf
41 import mp3togo.task as task
42 import mp3togo.pool as pool
57 # Time to sleep between output messages
62 """Run as a slave process.
64 The program was run with --cluster-slave. Establish
65 communication with the master and follow the protocol below.
69 Enter: Master --> options --> Slave-\ \
71 /-------<-----------<----------------/ |
73 Slave --> ack --> Master-\ ^ |
75 /-------<-------<--------/ | /
77 |->Master --> [work_unit | quit] --> Slave--| \
79 | /--------------<---------------<---------/ | INUSE
81 | \ Slave --> % ... % ... % ... Done --> Master-\ /
83 \<----------------<---------------<--------------/ - READY
95 fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) #Want
97 def initialize(ropts):
98 """Integrate remote options into local opts"""
101 opts._d = cPickle.loads(st)
102 opts['verbosity'] = 0
103 return "SLAVE: ready"
108 """Run the work unit and watch progress"""
109 job_start = time.time()
110 if not rotps.has_key('file'):
111 raise conf.ErrorClusterProtocol
113 fc = cache.Cache(opts['usecache'], opts)
116 trk = track.Track(rotps['file'], opts, cooked=fc)
118 if pl.add_track(trk):
122 while tsk.status() == task.RUNNING:
123 yield "Progress: %s %s\n" % (tsk.name, str(tsk.output()))
125 if tsk.status() != task.DONE:
130 yield "FAIL: %s\n" % name
134 yield "FAIL: Insufficient space\n"
136 yield "DONE: %f\n" % (time.time() - job_start)
143 raise conf.ErrorClusterProtocol
146 while rdata[-len(EOT):] != EOT:
147 rfd, wfd, xfd = select.select([sys.stdin.fileno()],
150 sout.write("No input within fifteen seconds, ")
151 sout.write("can't wait around forever.")
155 if rdata == "SLAVE EXIT":
160 rdata = rdata[:-len(EOT)]
161 rdata = rdata.replace('\r', '')
163 for line in rdata.split(RS):
165 key, value = line.split('=', 1)
166 ropts[key] = value[:-1] # \n
170 msg = initialize(ropts)
175 msg = "Did not understand request."
183 while state == INUSE:
187 except StopIteration:
199 """Manage one slave"""
201 def __init__(self, host, files, failures, opts, r_schema=""):
202 """Manage a remote host. Use r_schema='ssh %h ' for remote hosts.
203 files is a FileList of files to process; the list should be
204 shared with the cooperating bosses."""
207 self.fails = failures
210 self.pid, self.fd1 = pty.fork()
213 args = r_schema.replace('%h', host)
217 args += self.opts.argv[0] + " --cluster-slave True"
219 args += "mp3togo --cluster-slave True"
221 os.execvp(args[0], args)
223 self.fd2 = os.dup(self.fd1)
224 self.rf = os.fdopen(self.fd1, 'r', 0)
225 fcntl.fcntl(self.fd1, fcntl.F_SETFL, os.O_NONBLOCK)
226 self.wf = os.fdopen(self.fd2, 'w', 0)
227 fcntl.fcntl(self.fd2, fcntl.F_SETFL, os.O_NONBLOCK)
235 os.waitpid(self.pid, 0)
242 optd = self.opts._d.copy()
243 optd['arg_files'] = []
244 st = "pickle=" + cPickle.dumps(optd) + '\n'
251 a, b, c = select.select([self.fd1, self.fd2], [], [], 60)
259 self.files.push(self.current, front=True)
261 yield ('Not Ready', 'Not Ready', 0)
262 # Roll back and try to start again self = Boss(yada, ...)
265 for line in inp.split('\n'):
266 line = line.replace('\r', '')
269 self.current = self.files.pop(0)
271 self.wf.write('SLAVE EXIT')
272 os.waitpid(self.pid, 0)
274 if self.fails.has_key(self.current) and (
275 self.host in self.fails[self.current]
276 or len(self.fails[self.current]) > MAXTRIES):
277 oldcurrent = self.current
281 self.files.push(oldcurrent, front=True)
283 self.wf.write('file=%s\n' % self.current)
285 self.start = time.time()
287 if EOR.replace('\n', '') == line:
289 yield (self.current, 'Finished',
290 float(time.time() - self.start))
293 elif line.startswith('Progress:'):
294 action = ' '.join(line.split()[1:-1])
297 if action != last_action:
301 percent = float(line.split()[-1])
304 yield (self.current, action, percent)
305 elif line.startswith('FAIL:'):
306 if "Insufficient space" in line:
307 yield (self.current, 'Insufficient space', 0.0)
310 yield (self.current, 'Subtask Failed', 0.0)
311 if self.fails.has_key(self.host):
312 self.fails[self.current].append(self.host)
314 self.fails[self.current] = [self.host]
315 oldcurrent = self.current
319 self.files.push(oldcurrent, front=True)
328 This kills the ssh connection; it is
329 unable to clean up the remote machine."""
330 os.kill(self.pid, signal.SIGKILL)
331 os.waitpid(self.pid, 0)
335 def slavedriver(playlist, opts, cooked=None):
336 """A simple text mode cluster controller.
338 Function signature for running from main.py"""
340 if opts['verbosity']:
347 p("mp3togo version %s\n" % mp3togo.version)
348 p("Starting Cluster mode\n\n")
350 slaves = opts['cluster']
351 slaves = slaves.split(',')
354 p("Connecting to cluster nodes:\n")
355 fails = {} # {'badfilename': [host,...]}
356 if opts['clusternolocal']:
360 master = Boss('localhost', playlist, fails, opts, r_schema='')
361 ring = [('localhost', master.poll(), master)]
364 theman = Boss(slave, playlist, fails, opts, r_schema="ssh %h ")
365 tr = (slave, theman.poll(), theman)
370 files = dict(map(lambda x: (x, ''), slaves))
372 files[master.host] = ''
374 for slave in ring[:]:
376 tup = slave[1].next()
378 except StopIteration:
379 L(1, "Host %s finished.\n" % slave[0])
380 del ring[ring.index(slave)]
382 if tup[1] == 'Subtask Failed':
383 L(1, "Task failed on host: %s\n %s\n" % (
385 if tup[0] != files[slave[0]]:
386 files[slave[0]] = tup[0]
387 L(1, "Host %s at %s starting file: \n %s\n" % (
388 slave[0], time.ctime(), tup[0]))
391 print "All workers exited"
393 # Test that all files are actually done; if not, loop.