2 # Cluster mode support.
4 # This file is part of mp3togo
6 # Convert audio files to play on a mp3 player
8 # (c) Simeon Veldstra 2006 <reallifesim@gmail.com>
10 # This software is free.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You may redistribute this program under the terms of the
18 # GNU General Public Licence version 2
19 # Available in this package or at http://www.fsf.org
24 Run slave processes on multiple machines communicating by
25 pipes over ssh and sharing data through NFS mounts."""
37 import mp3togo.filelist as filelist
38 import mp3togo.track as track
39 import mp3togo.cache as cache
40 import mp3togo.conf as conf
41 import mp3togo.task as task
42 import mp3togo.pool as pool
57 # Time to sleep between output messages
62 """Run as a slave process.
64 The program was run with --cluster-slave. Establish
65 communication with the master and do protocol.
69 Enter: Master --> options --> Slave-\ \
71 /-------<-----------<----------------/ |
73 Slave --> ack --> Master-\ ^ |
75 /-------<-------<--------/ | /
77 |->Master --> [work_unit | quit] --> Slave--| \
79 | /--------------<---------------<---------/ | INUSE
81 | \ Slave --> % ... % ... % ... Done --> Master-\ /
83 \<----------------<---------------<--------------/ - READY
95 fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) #Want: stdin is polled with select() and a timeout, so reads must not block
96 #fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) #Don't: stdout is deliberately left blocking
98 def initialize(ropts):
99 """Integrate remote options into local opts"""
100 # of = file('/home/sim/temp/2go/cluster/opts-slave', 'w')
101 # for key in ropts.keys():
102 # if opts.has_key(key):
103 # opts[key] = ropts[key]
104 # # of.write("%s: %s\n" % (key, opts[key]))
# Replace the local option dict wholesale with the master's pickled copy.
# NOTE(review): `st` is bound on lines not shown here; presumably it is the
# value of the "pickle=" record the master sends. Confirm.
# cPickle.loads can execute arbitrary code from its input, so this is only
# acceptable because the master is a trusted peer.
109 opts._d = cPickle.loads(st)
# Presumably silenced so local chatter cannot corrupt the stdout channel
# used to talk to the master. Confirm.
111 opts['verbosity'] = 0
# NOTE(review): this is a debug dump to a hard-coded developer path. It fails
# when the directory does not exist, and the file handle is never closed.
# Consider removing it or gating it behind a debug option.
112 of = file('/home/sim/temp/2go/cluster/opts-slave', 'w')
113 for key in opts.keys():
114 of.write("%s: %s\n" % (key, opts[key]))
# Acknowledgement message (the "ack" step in the protocol diagram).
116 return "SLAVE: ready"
119 """Run the work unit and watch progress"""
# Generator that yields protocol lines for the master:
#   "Progress: <task> <output>" while a task runs,
#   "FAIL: ..." on failure,
#   "DONE: <elapsed seconds>" at the end.
120 job_start = time.time()
# NOTE(review): the argument is spelled `rotps` here, while initialize uses
# `ropts`. The def line is not shown, so confirm the names match.
121 if not rotps.has_key('file'):
122 raise conf.ErrorClusterProtocol
124 fc = cache.Cache(opts['usecache'], opts)
127 trk = track.Track(rotps['file'], opts, fc)
129 if pl.add_track(trk):
133 while tsk.status() == task.RUNNING:
134 yield "Progress: %s %s\n" % (tsk.name, str(tsk.output()))
136 if tsk.status() != task.DONE:
# NOTE(review): `name` is not bound in the visible lines. Confirm it is set
# (presumably to the failed task's name) on a line not shown.
141 yield "FAIL: %s\n" % name
145 yield "FAIL: Insufficient space\n"
147 yield "DONE: %f\n" % (time.time() - job_start)
154 raise conf.ErrorClusterProtocol
# Accumulate stdin until the buffer ends with the EOT terminator.
157 while rdata[-len(EOT):] != EOT:
158 rfd, wfd, xfd = select.select([sys.stdin.fileno()],
161 sout.write("No input within fifteen seconds, ")
162 sout.write("can't wait around forever.")
166 if rdata == "SLAVE EXIT":
# Strip the EOT terminator and any carriage returns (presumably added by the
# pty/ssh transport), then parse the "key=value" records separated by RS.
171 rdata = rdata[:-len(EOT)]
172 rdata = rdata.replace('\r', '')
174 for line in rdata.split(RS):
176 key, value = line.split('=', 1)
# NOTE(review): this always drops the last character, which assumes every
# record ends with '\n'. A record without one loses a real character.
177 ropts[key] = value[:-1] # \n
181 msg = initialize(ropts)
186 msg = "Did not understand request."
194 while state == INUSE:
198 except StopIteration:
210 """Manage one slave"""
212 def __init__(self, host, files, failures, opts, r_schema=""):
213 """Manage a remote host. Use r_schema='ssh %h ' for remote hosts.
214 files is a FileList of files to process; the list should be
215 shared with the cooperating bosses."""
# failures: dict {filename: [hosts the file failed on]}, shared by all bosses.
218 self.fails = failures
# pty.fork() returns pid 0 in the child, which should exec the slave command.
# In the parent it returns the child's pid and the pty master descriptor.
221 self.pid, self.fd1 = pty.fork()
223 args = r_schema.replace('%h', host)
224 args += " mp3togo --cluster-slave True"
225 #args += " test-mp3togo --cluster-slave True"
# NOTE(review): os.execvp needs an argv sequence, but `args` is a str here,
# so args[0] would be a single character. Presumably a line not shown splits
# it (e.g. args = args.split()). Confirm.
227 os.execvp(args[0], args)
# Parent: build separate unbuffered read and write file objects over the
# same pty and switch both descriptors to non-blocking mode.
229 self.fd2 = os.dup(self.fd1)
230 self.rf = os.fdopen(self.fd1, 'r', 0)
231 fcntl.fcntl(self.fd1, fcntl.F_SETFL, os.O_NONBLOCK)
232 self.wf = os.fdopen(self.fd2, 'w', 0)
233 fcntl.fcntl(self.fd2, fcntl.F_SETFL, os.O_NONBLOCK)
# Reap the slave child process (the enclosing context is not visible here).
241 os.waitpid(self.pid, 0)
248 #for key in self.opts.keys():
249 # self.wf.write('%s=%s\n' % (key, self.opts[key]))
# Send the whole option dict as a single "pickle=" record instead of one
# record per option. arg_files is cleared because files are handed to the
# slave one at a time via "file=" records.
# NOTE(review): the default cPickle protocol (0) emits embedded newlines.
# Confirm this survives the slave's record splitting (RS) and its
# strip-last-character parsing.
250 optd = self.opts._d.copy()
251 optd['arg_files'] = []
252 st = "pickle=" + cPickle.dumps(optd) + '\n'
# Wait up to 60 seconds for the slave to produce output on the pty.
259 a, b, c = select.select([self.fd1, self.fd2], [], [], 60)
267 self.files.push(self.current, front=True)
269 yield ('Not Ready', 'Not Ready', 0)
# Output arrives over a pty, so lines may carry '\r'; strip it.
272 for line in inp.split('\n'):
273 line = line.replace('\r', '')
276 self.current = self.files.pop(0)
278 self.wf.write('SLAVE EXIT')
279 os.waitpid(self.pid, 0)
# Do not retry a file on a host where it already failed, or once it has more
# than MAXTRIES recorded failures. It is presumably handed back to the front
# of the shared list for another boss.
281 if self.fails.has_key(self.current) and (
282 self.host in self.fails[self.current]
283 or len(self.fails[self.current]) > MAXTRIES):
284 oldcurrent = self.current
288 self.files.push(oldcurrent, front=True)
290 self.wf.write('file=%s\n' % self.current)
294 if EOR.replace('\n', '') == line:
296 yield (self.current, 'Finished',
297 float(time.time() - start))
300 elif line.startswith('Progress:'):
# Line format is "Progress: <task name...> <value>". The action is every token
# between the tag and the last one; the last token is parsed as the percentage.
301 action = ' '.join(line.split()[1:-1])
304 if action != last_action:
308 percent = float(line.split()[-1])
311 yield (self.current, action, percent)
312 elif line.startswith('FAIL:'):
313 if "Insufficient space" in line:
314 yield (self.current, 'Insufficient space', 0.0)
317 yield (self.current, 'Subtask Failed', 0.0)
# NOTE(review): BUG. self.fails maps filename -> [hosts], so this test should
# be self.fails.has_key(self.current). As written, the normal path replaces
# the file's existing host list with [self.host]. The list therefore never
# grows past one entry, and the MAXTRIES check above is effectively dead
# (unless MAXTRIES < 1). A file that fails everywhere may keep bouncing
# between hosts.
318 if self.fails.has_key(self.host):
319 self.fails[self.current].append(self.host)
321 self.fails[self.current] = [self.host]
322 oldcurrent = self.current
326 self.files.push(oldcurrent, front=True)
335 This kills the ssh connection, it is
336 unable to clean up the remote machine."""
# SIGKILL cannot be caught or ignored by the local child; waitpid then reaps
# it so no zombie is left behind.
337 os.kill(self.pid, signal.SIGKILL)
338 os.waitpid(self.pid, 0)
342 def slavedriver(playlist, opts, cooked=None):
343 """A simple text mode cluster controller.
345 Function signature for running from main.py"""
# NOTE(review): `cooked` is not used in the visible lines; presumably it is
# kept for signature compatibility with main.py. Confirm.
347 if opts['verbosity']:
354 p("mp3togo version %s\n" % mp3togo.version)
355 p("Starting Cluster mode\n\n")
# opts['cluster'] is a comma-separated list of slave host names.
357 slaves = opts['cluster']
358 slaves = slaves.split(',')
361 p("Connecting to cluster nodes:\n")
362 fails = {} # {'badfilename': [host,...]}
# Presumably a local Boss (run without ssh) is only started when
# clusternolocal is false; the branch lines are not visible here.
363 if opts['clusternolocal']:
367 master = Boss('localhost', playlist, fails, opts, r_schema='')
368 ring = [('localhost', master.poll(), master)]
# Remote bosses reach their slave through "ssh <host> ...".
371 theman = Boss(slave, playlist, fails, opts, r_schema="ssh %h ")
372 tr = (slave, theman.poll(), theman)
# files maps each host to the file it is currently processing, so the
# "starting file" message is logged only when a host moves to a new file.
377 files = dict(map(lambda x: (x, ''), slaves))
379 files[master.host] = ''
# Advance each host's poll generator in turn. Iterate over a copy so that
# finished hosts can be removed from the ring inside the loop.
381 for slave in ring[:]:
383 tup = slave[1].next()
385 except StopIteration:
386 L(1, "Host %s finished.\n" % slave[0])
387 del ring[ring.index(slave)]
389 if tup[1] == 'Subtask Failed':
390 L(1, "Task failed on host: %s\n %s\n" % (
392 if tup[0] != files[slave[0]]:
393 files[slave[0]] = tup[0]
394 L(1, "Host %s at %s starting file: \n %s\n" % (
395 slave[0], time.ctime(), tup[0]))
398 print "All workers exited"
400 # Test that all files are actually done, if not, loop.