Merge with bugfix-0.5
mp3togo/cluster.py
1 # - cluster.py -
2 # Cluster mode support.
3 #
4 # This file is part of mp3togo
5 #
6 # Convert audio files to play on a mp3 player
7 #
8 # (c) Simeon Veldstra 2006 <reallifesim@gmail.com>
9 #
10 # This software is free.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You may redistribute this program under the terms of the
18 # GNU General Public Licence version 2
19 # Available in this package or at http://www.fsf.org
20
21
22 """Cluster mode.
23
24 Run slave processes on multiple machines communicating by
25 pipes over ssh and sharing data through NFS mounts."""
26
27 import os
28 import sys
29 import pty
30 import time
31 import fcntl
32 import select
33 import signal
34 import termios
35 import cPickle
36
37 import mp3togo.filelist as filelist
38 import mp3togo.track as track
39 import mp3togo.cache as cache
40 import mp3togo.conf as conf
41 import mp3togo.task as task
42 import mp3togo.pool as pool
43 import mp3togo
44
45
# States of the slave's state machine (see slave()).
BEGIN = 'begin'
READY = 'ready'
INUSE = 'inuse'
EXIT = 'exit'

# A file whose failure list (hosts it failed on) holds more than
# MAXTRIES entries is no longer handed out by Boss.poll.
MAXTRIES = 3

# Message framing on the master/slave link:
#   EOT terminates a master -> slave request,
#   EOR terminates a slave -> master reply (the master recognises it
#       with the newlines stripped out),
#   RS  separates key=value records inside a request.
EOT = '\n*****\n\n'
EOR = '\n#####\n\n'
RS = '\n#**\n**#\n'

# Time to sleep between output messages (seconds, passed to time.sleep)
INTERVAL = 1.0
59
60
def slave(opts):
    """Run as a slave process.

    The program was run with --cluster-slave.  Establish communication
    with the master over stdin/stdout and run the message protocol:

      BEGIN:  the master sends the pickled options ("pickle=..." + EOT).
              The slave answers "SLAVE: ready" + EOR and moves to READY,
              or answers "Did not understand request." + EOR and exits.
      READY:  the master sends a work unit ("file=..." + EOT), or
              "SLAVE EXIT" to make the slave quit.
      INUSE:  while the work unit runs the slave streams
              "Progress: <task> <output>" lines, then one
              "DONE: <seconds>" or "FAIL: <reason>" line, then EOR, and
              goes back to READY.

    The slave also exits when no complete request arrives within fifteen
    seconds.  Raises conf.ErrorClusterProtocol on a protocol violation.
    """
    state = BEGIN
    sin = sys.stdin
    sout = sys.stdout

    # Non-blocking stdin: select() tells us data is waiting, and read()
    # then returns what is available instead of waiting for EOF.  Keep
    # whatever other status flags the descriptor already had.
    flags = fcntl.fcntl(sin.fileno(), fcntl.F_GETFL)
    fcntl.fcntl(sin.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def initialize(ropts):
        """Integrate remote options into local opts.

        Returns the ready message, or None if the request did not carry
        usable pickled options."""
        # NOTE(review): cPickle.loads can execute arbitrary code.  The
        # pickle comes from the master that launched this process and is
        # trusted; never expose this protocol to untrusted peers.
        try:
            opts._d = cPickle.loads(ropts['pickle'])
            # Presumably silences normal output, since stdout carries
            # the protocol.
            opts['verbosity'] = 0
        except Exception:
            return None
        return "SLAVE: ready"

    def perform(ropts):
        """Run the work unit and watch progress.

        A generator of protocol lines: "Progress: ..." while each task
        runs, then exactly one "DONE: <seconds>" or "FAIL: <reason>".
        Raises conf.ErrorClusterProtocol if the request names no file."""
        job_start = time.time()
        if 'file' not in ropts:
            raise conf.ErrorClusterProtocol
        if opts['usecache']:
            fc = cache.Cache(opts['usecache'], opts)
        else:
            fc = None
        trk = track.Track(ropts['file'], opts, cooked=fc)
        pl = pool.Pool(opts)
        if not pl.add_track(trk):
            yield "FAIL: Insufficient space\n"
            # Plain return ends the generator (raising StopIteration here
            # is a RuntimeError under PEP 479).
            return
        tsk = trk.getfirst()
        while tsk:
            tsk.run()
            while tsk.status() == task.RUNNING:
                yield "Progress: %s %s\n" % (tsk.name, str(tsk.output()))
            tsk.wait()
            if tsk.status() != task.DONE:
                name = tsk.name
                # Undo the failed task and every task that ran before it.
                while tsk:
                    tsk.undo()
                    tsk = tsk.prev()
                yield "FAIL: %s\n" % name
                return
            tsk = tsk.next()
        yield "DONE: %f\n" % (time.time() - job_start)

    # The state machine
    while state != EXIT:
        if state == INUSE:
            raise conf.ErrorClusterProtocol

        # Read one request from the master, terminated by EOT.
        rdata = ''
        while not rdata.endswith(EOT):
            rfd, wfd, xfd = select.select([sin.fileno()], [], [], 15)
            if not rfd:
                sout.write("No input within fifteen seconds, ")
                sout.write("can't wait around forever.")
                state = EXIT
                break
            rdata += sin.read()
            # The exit request carries no EOT; tolerate a trailing
            # newline, which a line-buffered pty needs to deliver it.
            if rdata.strip() == "SLAVE EXIT":
                state = EXIT
                break
        sout.flush()
        sin.flush()
        if state == EXIT:
            break

        # Parse "key=value\n" records; value[:-1] drops the newline.
        rdata = rdata[:-len(EOT)].replace('\r', '')
        ropts = {}
        for line in rdata.split(RS):
            if '=' in line:
                key, value = line.split('=', 1)
                ropts[key] = value[:-1]

        # Write to master
        if state == BEGIN:
            msg = initialize(ropts)
            if msg:
                state = READY
            else:
                state = EXIT
                msg = "Did not understand request."
            sout.write(msg + EOR)
            sout.flush()
            sin.flush()
        elif state == READY:
            state = INUSE
            gen = perform(ropts)
            while state == INUSE:
                try:
                    msg = gen.next()
                    time.sleep(INTERVAL)
                except StopIteration:
                    # Job over: close the reply and wait for more work.
                    state = READY
                    msg = EOR
                sout.write(msg)
                sout.flush()
                sin.flush()
193
194 # END REMOTE END
195
196 # START MASTER END
197
class Boss:
    """Manage one slave.

    Forks a child on a pseudo-terminal running the slave command
    ('<prog> --cluster-slave True', optionally prefixed by r_schema such
    as 'ssh %h ') and feeds it work from a file list shared with the
    other bosses.

    Attributes:
      host    -- host name; also recorded in the failure table
      files   -- shared FileList of files still to be processed
      fails   -- shared dict {filename: [host, ...]} of failed attempts
      opts    -- local options, pickled and sent to the slave
      current -- file currently assigned to the slave, or None
      pid     -- pid of the forked child
      rf, wf  -- unbuffered read/write file objects on the pty master
    """

    def __init__(self, host, files, failures, opts, r_schema=""):
        """Manage a host.  Use r_schema='ssh %h ' for remote hosts
        (%h is replaced by the host name).  files is a FileList of files
        to process; the list should be shared with the cooperating
        bosses, as should the failures dict."""
        self.host = host
        self.files = files
        self.fails = failures
        self.opts = opts
        self.current = None

        # Build the child's argv as a list, so a program path containing
        # spaces is not split apart.
        if self.opts.argv:
            prog = self.opts.argv[0]
        else:
            prog = "mp3togo"
        argv = r_schema.replace('%h', host).split()
        argv.extend([prog, "--cluster-slave", "True"])

        self.pid, self.fd1 = pty.fork()
        if self.pid == 0:
            # Child: become the slave command.  If exec fails we must
            # exit here rather than fall back into the parent's code.
            try:
                os.execvp(argv[0], argv)
            finally:
                os._exit(127)
        self.fd2 = os.dup(self.fd1)
        self.rf = os.fdopen(self.fd1, 'r', 0)
        self.wf = os.fdopen(self.fd2, 'w', 0)
        for fd in (self.fd1, self.fd2):
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def __del__(self):
        """Close the pty and reap the child; safe after a partial __init__."""
        for name in ('rf', 'wf'):
            f = getattr(self, name, None)
            if f is not None:
                try:
                    f.close()
                except (IOError, OSError):
                    pass
        pid = getattr(self, 'pid', None)
        if pid:
            try:
                os.waitpid(pid, 0)
            except OSError:
                # Already reaped (e.g. by kill() or _dispatch_next()).
                pass

    def _requeue_current(self):
        """Return the file assigned to the slave, if any, to the shared list."""
        if self.current:
            self.files.push(self.current, front=True)
            self.current = None

    def _dispatch_next(self):
        """Send the next eligible file to the slave.

        Files this host already failed on, or whose failure list holds
        more than MAXTRIES hosts, are skipped and put back at the front
        of the shared list in their original order.  Returns True when a
        file was sent.  Returns False when the list is exhausted, after
        telling the slave to exit and waiting for the child."""
        skipped = []
        try:
            while 1:
                try:
                    candidate = self.files.pop(0)
                except Exception:
                    # No work left.  The newline is needed for a
                    # line-buffered pty to deliver the request.
                    self.current = None
                    self.wf.write('SLAVE EXIT\n')
                    os.waitpid(self.pid, 0)
                    return False
                tried = self.fails.get(candidate, [])
                if self.host in tried or len(tried) > MAXTRIES:
                    skipped.append(candidate)
                    continue
                self.current = candidate
                self.wf.write('file=%s\n' % candidate)
                self.wf.write(EOT)
                self.start = time.time()
                return True
        finally:
            skipped.reverse()
            for name in skipped:
                self.files.push(name, front=True)

    def poll(self):
        """Drive the slave; a generator of (file, action, value) tuples.

        Sends the pickled options, then hands out files one at a time and
        translates the slave's replies:
          ('Not Ready', 'Not Ready', 0)      reading the pty failed
          (file, 'Finished', seconds)        the slave finished a job
          (file, action, percent)            progress report
          (file, 'Insufficient space', 0.0)  the slave refused the job;
                                             this boss stops
          (file, 'Subtask Failed', 0.0)      the job failed here; the file
                                             goes back to the shared list
        The generator ends when no work is left, after more than 25 read
        errors, or after 60 seconds of silence (the slave is killed).  In
        the last two cases the current file is returned to the list."""
        last_action = ''
        percent = 0.0
        errors = 0
        eor = EOR.replace('\n', '')
        optd = self.opts._d.copy()
        optd['arg_files'] = []
        self.wf.write("pickle=" + cPickle.dumps(optd) + '\n')
        self.wf.write(EOT)
        self.wf.flush()
        while 1:
            ready = select.select([self.fd1, self.fd2], [], [], 60)[0]
            if not ready:
                # Timed out: assume the slave hung, but keep its file.
                self._requeue_current()
                self.kill()
                return
            try:
                inp = self.rf.read()
            except IOError:
                errors += 1
                if errors > 25:
                    self._requeue_current()
                    return
                yield ('Not Ready', 'Not Ready', 0)
                continue
            for line in inp.split('\n'):
                line = line.replace('\r', '')
                if line == eor:
                    # End of a reply: any job is over and the slave is
                    # ready for the next one.
                    if self.current:
                        yield (self.current, 'Finished',
                               float(time.time() - self.start))
                    if not self._dispatch_next():
                        return
                    break
                elif line.startswith('Progress:'):
                    words = line.split()
                    action = ' '.join(words[1:-1])
                    if not action:
                        action = last_action
                    if action != last_action:
                        percent = 0.0
                        last_action = action
                    try:
                        percent = float(words[-1])
                    except ValueError:
                        pass
                    yield (self.current, action, percent)
                elif line.startswith('FAIL:'):
                    if "Insufficient space" in line:
                        yield (self.current, 'Insufficient space', 0.0)
                        return
                    failed = self.current
                    if failed:
                        # Record the failure under the file name and hand
                        # the file back so another host can try it.
                        self.fails.setdefault(failed, []).append(self.host)
                        self.files.push(failed, front=True)
                    # The slave still sends an EOR for this job.  That EOR
                    # triggers the next dispatch, so don't dispatch here.
                    self.current = None
                    yield (failed, 'Subtask Failed', 0.0)

    def kill(self):
        """Kill the child
        This kills the ssh connection, it is
        unable to clean up the remote machine."""
        os.kill(self.pid, signal.SIGKILL)
        os.waitpid(self.pid, 0)
332
333
334
def slavedriver(playlist, opts, cooked=None):
    """A simple text mode cluster controller.

    Function signature for running from main.py.

    playlist -- shared FileList of files to process; every Boss pops
                work from it and pushes failed work back onto it.
    opts     -- options; reads 'verbosity', 'cluster' (comma separated
                host names) and 'clusternolocal', and logs via opts.log.
    cooked   -- not used by this function.
    """
    if opts['verbosity']:
        def p(s):
            sys.stdout.write(s)
    else:
        p = lambda x: None
    L = opts.log

    p("mp3togo version %s\n" % mp3togo.version)
    p("Starting Cluster mode\n\n")

    # Ignore empty entries, e.g. from a trailing comma or "a, ,b".
    slaves = [s.strip() for s in opts['cluster'].split(',') if s.strip()]
    master = None

    p("Connecting to cluster nodes:\n")
    fails = {}  # {'badfilename': [host, ...]}
    if opts['clusternolocal']:
        ring = []
    else:
        p(" localhost")
        master = Boss('localhost', playlist, fails, opts, r_schema='')
        ring = [('localhost', master.poll(), master)]
    for host in slaves:
        p(" " + host)
        boss = Boss(host, playlist, fails, opts, r_schema="ssh %h ")
        ring.append((host, boss.poll(), boss))
    p("\n\n")

    # Last file reported by each host, so we can log when it starts another.
    last_file = dict.fromkeys(slaves, '')
    if master is not None:
        last_file[master.host] = ''

    while ring:
        for entry in ring[:]:
            host, gen = entry[0], entry[1]
            try:
                tup = gen.next()
            except StopIteration:
                L(1, "Host %s finished.\n" % host)
                ring.remove(entry)
                continue
            if tup[1] == 'Subtask Failed':
                L(1, "Task failed on host: %s\n %s\n" % (host, tup[0]))
            if tup[0] != last_file[host]:
                last_file[host] = tup[0]
                L(1, "Host %s at %s starting file: \n %s\n" % (
                    host, time.ctime(), tup[0]))
        if ring:
            time.sleep(0.5)
    p("All workers exited\n")

    # TODO: test that all files are actually done; if not, loop.
394
395
396
397