Merge with bugfix-0.5
mp3togo/cluster.py
1 # - cluster.py -
2 # Cluster mode support.
3 #
4 # This file is part of mp3togo
5 #
6 # Convert audio files to play on a mp3 player
7 #
8 # (c) Simeon Veldstra 2006 <reallifesim@gmail.com>
9 #
10 # This software is free.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You may redistribute this program under the terms of the
18 # GNU General Public Licence version 2
19 # Available in this package or at http://www.fsf.org
20
21
22 """Cluster mode.
23
24 Run slave processes on multiple machines communicating by
25 pipes over ssh and sharing data through NFS mounts."""
26
27 import os
28 import sys
29 import pty
30 import time
31 import fcntl
32 import select
33 import signal
34 import termios
35 import cPickle
36
37 import mp3togo.filelist as filelist
38 import mp3togo.track as track
39 import mp3togo.cache as cache
40 import mp3togo.conf as conf
41 import mp3togo.task as task
42 import mp3togo.pool as pool
43 import mp3togo
44
45
# States of the slave's protocol state machine.
BEGIN, READY, INUSE, EXIT = 'begin', 'ready', 'inuse', 'exit'

# Compared against the number of hosts recorded as having failed a file.
MAXTRIES = 3

# Wire-protocol delimiters.
EOT = '\n*****\n\n'  # ends a message sent from the master to a slave
EOR = '\n#####\n\n'  # ends a reply sent from a slave to the master
RS = '\n#**\n**#\n'  # separates key=value records inside one message

# Time to sleep between output messages
INTERVAL = 1.0
59
60
def slave(opts):
    """Run as a slave process.

    The program was run with --cluster-slave. Establish
    communication with the master and do protocol.

    The message protocol, by slave state:

      BEGIN  Master sends its pickled options, terminated by EOT.
             Slave answers "SLAVE: ready" + EOR and becomes READY, or
             "Did not understand request." + EOR and exits.
      READY  Master sends a work unit ("file=<path>" + EOT) or the
             literal "SLAVE EXIT". A work unit moves the slave to INUSE.
      INUSE  Slave streams "Progress: ..." lines, then "DONE: <seconds>"
             or "FAIL: <reason>", then EOR, and returns to READY.
      EXIT   Leave the loop; also entered when nothing arrives from the
             master for fifteen seconds or stdin reaches end-of-file.

    Raises conf.ErrorClusterProtocol when a work unit carries no 'file'
    key or the state machine is entered while INUSE.
    """

    state = BEGIN
    sin = sys.stdin
    sout = sys.stdout

    # stdin is made non-blocking so read() returns whatever is available;
    # select() below does the waiting. stdout is deliberately left blocking.
    fcntl.fcntl(sin.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)

    def initialize(ropts):
        """Integrate remote options into local opts.

        Returns the acknowledgement string, or None when the request
        could not be understood.
        """
        try:
            # NOTE(security): unpickling runs arbitrary code chosen by the
            # sender. This is only acceptable because the peer is the
            # master process that launched this slave.
            opts._d = cPickle.loads(ropts['pickle'])
            opts['verbosity'] = 0
        except Exception:
            return None
        return "SLAVE: ready"

    def perform(ropts):
        """Run the work unit and watch progress.

        Generator yielding the protocol lines to send to the master:
        "Progress: ..." while a task runs, then a final "DONE: ..." or
        "FAIL: ..." line.
        """
        job_start = time.time()
        if 'file' not in ropts:
            raise conf.ErrorClusterProtocol
        if opts['usecache']:
            fc = cache.Cache(opts['usecache'], opts)
        else:
            fc = None
        trk = track.Track(ropts['file'], opts, cooked=fc)
        pl = pool.Pool(opts)
        if not pl.add_track(trk):
            yield "FAIL: Insufficient space\n"
            return
        tsk = trk.getfirst()
        while tsk:
            tsk.run()
            while tsk.status() == task.RUNNING:
                yield "Progress: %s %s\n" % (tsk.name, str(tsk.output()))
                tsk.wait()
            if tsk.status() != task.DONE:
                name = tsk.name
                # Walk back through the chain undoing every task.
                while tsk:
                    tsk.undo()
                    tsk = tsk.prev()
                yield "FAIL: %s\n" % name
                return
            tsk = tsk.next()
        yield "DONE: %f\n" % (time.time() - job_start)

    # The state machine
    while state != EXIT:
        if state == INUSE:
            raise conf.ErrorClusterProtocol

        # Read from master until a complete EOT-terminated message arrives.
        rdata = ''
        while rdata[-len(EOT):] != EOT:
            rfd, wfd, xfd = select.select([sin.fileno()], [], [], 15)
            if not rfd:
                sout.write("No input within fifteen seconds, ")
                sout.write("can't wait around forever.")
                state = EXIT
                break
            chunk = sin.read()
            if not chunk:
                # End-of-file: the master is gone. Without this check
                # select() reports "readable" forever and the loop spins.
                state = EXIT
                break
            rdata += chunk
            if rdata == "SLAVE EXIT":
                state = EXIT
                break
        sout.flush()
        sin.flush()
        if state == EXIT:
            break

        rdata = rdata[:-len(EOT)]
        rdata = rdata.replace('\r', '')
        ropts = {}
        for line in rdata.split(RS):
            if '=' in line:
                key, value = line.split('=', 1)
                ropts[key] = value[:-1]  # drop the trailing newline

        # Write to master
        if state == BEGIN:
            msg = initialize(ropts)
            if msg:
                state = READY
            else:
                state = EXIT
                msg = "Did not understand request."
            sout.write(msg + EOR)
            sout.flush()
            sin.flush()
        elif state == READY:
            state = INUSE
            for msg in perform(ropts):
                time.sleep(INTERVAL)
                sout.write(msg)
                sout.flush()
                sin.flush()
            state = READY
            sout.write(EOR)
            sout.flush()
            sin.flush()
194
195 # END REMOTE END
196
197 # START MASTER END
198
class Boss:
    """Manage one slave"""

    def __init__(self, host, files, failures, opts, r_schema=""):
        """Manage a remote host. use r_schema='ssh %h ' for remote hosts
        Files is a FileList of files to process the list should be
        shared with the cooperating bosses

        failures maps a file name to the list of hosts it failed on and
        is likewise shared between bosses.
        """
        self.host = host
        self.files = files
        self.fails = failures
        self.opts = opts
        self.current = None
        self.start = 0.0
        self.pid, self.fd1 = pty.fork()
        if self.pid == 0:
            # Child: become the slave process. If anything goes wrong
            # (typically execvp failing) leave immediately so the child
            # never carries on running the master's code.
            try:
                args = r_schema.replace('%h', host)
                if args:
                    args += ' '
                if self.opts.argv:
                    args += self.opts.argv[0] + " --cluster-slave True"
                else:
                    args += "mp3togo --cluster-slave True"
                args = args.split()
                os.execvp(args[0], args)
            finally:
                os._exit(127)
        else:
            # Parent: one descriptor for reading, a duplicate for writing.
            self.fd2 = os.dup(self.fd1)
            self.rf = os.fdopen(self.fd1, 'r', 0)
            fcntl.fcntl(self.fd1, fcntl.F_SETFL, os.O_NONBLOCK)
            self.wf = os.fdopen(self.fd2, 'w', 0)
            fcntl.fcntl(self.fd2, fcntl.F_SETFL, os.O_NONBLOCK)

    def __del__(self):
        """Close both streams and reap the child.

        Tolerates a partially constructed instance, and does not let a
        flush/close error prevent the child from being waited for.
        """
        streams = [s for s in (getattr(self, 'rf', None),
                               getattr(self, 'wf', None)) if s is not None]
        for action in ('flush', 'close'):
            for stream in streams:
                try:
                    getattr(stream, action)()
                except (IOError, OSError, ValueError):
                    pass
        pid = getattr(self, 'pid', None)
        if pid:
            try:
                os.waitpid(pid, 0)
            except OSError:
                pass

    def _record_failure(self):
        """Note that the current file failed on this host.

        The failure table is keyed by file name; hosts accumulate in the
        list so a file is not retried on the same host.
        """
        self.fails.setdefault(self.current, []).append(self.host)

    def _next_file(self):
        """Send the slave the next file it is allowed to try.

        Files that already failed on this host, or that failed on more
        than MAXTRIES hosts, are skipped and pushed back on the front of
        the shared list once a later file has been chosen.

        Returns True when a file was dispatched, False when the list is
        exhausted (the slave has then been told to exit and reaped).
        """
        try:
            self.current = self.files.pop(0)
        except Exception:
            self.wf.write('SLAVE EXIT')
            os.waitpid(self.pid, 0)
            return False
        if self.current in self.fails and (
                self.host in self.fails[self.current]
                or len(self.fails[self.current]) > MAXTRIES):
            skipped = self.current
            try:
                return self._next_file()
            finally:
                self.files.push(skipped, front=True)
        self.wf.write('file=%s\n' % self.current)
        self.wf.write(EOT)
        self.start = time.time()
        return True

    def poll(self):
        """Drive the slave; a generator of progress tuples.

        Sends the pickled options, then repeatedly reads the slave's
        output and yields (file, action, value) tuples: value is a
        percentage for progress reports, elapsed seconds for 'Finished',
        and 0 for 'Not Ready', 'Insufficient space' and 'Subtask Failed'.
        The generator ends when the file list is exhausted, the slave
        reports insufficient space, reading fails more than 25 times, or
        nothing is heard for 60 seconds (the child is then killed).
        """
        last_action = ''
        percent = 0.0
        optd = self.opts._d.copy()
        optd['arg_files'] = []
        self.wf.write("pickle=" + cPickle.dumps(optd) + '\n')
        self.wf.flush()
        self.wf.write(EOT)
        self.wf.flush()
        errors = 0
        end_of_reply = EOR.replace('\n', '')
        while True:
            readable, _w, _x = select.select([self.fd1, self.fd2],
                                             [], [], 60)
            if not readable:
                # Timed out
                self.kill()
                return
            try:
                inp = self.rf.read()
            except IOError:
                errors += 1
                if errors > 25:
                    if self.current:
                        self.files.push(self.current, front=True)
                    return
                yield ('Not Ready', 'Not Ready', 0)
                # Roll back and try to start again self = Boss(yada, ...)
                continue
            self.rf.flush()
            for line in inp.split('\n'):
                line = line.replace('\r', '')
                if line == end_of_reply:
                    if self.current:
                        yield (self.current, 'Finished',
                               float(time.time() - self.start))
                    if not self._next_file():
                        return
                    break
                elif line.startswith('Progress:'):
                    fields = line.split()
                    action = ' '.join(fields[1:-1])
                    if not action:
                        action = last_action
                    if action != last_action:
                        percent = 0.0
                        last_action = action
                    try:
                        percent = float(fields[-1])
                    except ValueError:
                        # Keep the previous figure if this one is garbled.
                        pass
                    yield (self.current, action, percent)
                elif line.startswith('FAIL:'):
                    if "Insufficient space" in line:
                        yield (self.current, 'Insufficient space', 0.0)
                        return
                    yield (self.current, 'Subtask Failed', 0.0)
                    self._record_failure()
                    failed = self.current
                    try:
                        dispatched = self._next_file()
                    finally:
                        # Put the failed file back for another host.
                        self.files.push(failed, front=True)
                    if not dispatched:
                        return
                    break

    def kill(self):
        """Kill the child
        This kills the ssh connection, it is
        unable to clean up the remote machine."""
        os.kill(self.pid, signal.SIGKILL)
        os.waitpid(self.pid, 0)
335
336
337
def slavedriver(playlist, opts, cooked=None):
    """A simple text mode cluster controller.

    Function signature for running from main.py

    Starts one Boss per host named in the comma separated
    opts['cluster'] (plus one for localhost unless
    opts['clusternolocal'] is set), then polls them in turn until every
    one has finished. cooked is accepted for signature compatibility
    and is not used here.
    """

    if opts['verbosity']:
        def p(s):
            sys.stdout.write(s)
    else:
        p = lambda x: None
    log = opts.log

    p("mp3togo version %s\n" % mp3togo.version)
    p("Starting Cluster mode\n\n")

    # Ignore blank entries and surrounding whitespace: an empty host name
    # would turn "ssh %h mp3togo ..." into "ssh mp3togo ...".
    hosts = [h.strip() for h in opts['cluster'].split(',') if h.strip()]
    master = None

    p("Connecting to cluster nodes:\n")
    fails = {}  # {'badfilename': [host,...]}
    ring = []
    if not opts['clusternolocal']:
        p(" localhost")
        master = Boss('localhost', playlist, fails, opts, r_schema='')
        ring.append(('localhost', master.poll(), master))
    for host in hosts:
        p(" " + host)
        boss = Boss(host, playlist, fails, opts, r_schema="ssh %h ")
        ring.append((host, boss.poll(), boss))

    p("\n\n")
    # Last file reported by each host, so a change can be logged once.
    files = dict([(host, '') for host in hosts])
    if master:
        files[master.host] = ''
    while ring:
        for entry in ring[:]:
            name, progress = entry[0], entry[1]
            try:
                tup = progress.next()
            except StopIteration:
                log(1, "Host %s finished.\n" % name)
                ring.remove(entry)
                break
            if tup[1] == 'Subtask Failed':
                log(1, "Task failed on host: %s\n %s\n" % (name, tup[0]))
            if tup[0] != files[name]:
                files[name] = tup[0]
                log(1, "Host %s at %s starting file: \n %s\n" % (
                    name, time.ctime(), tup[0]))
        if ring:
            time.sleep(0.5)
    sys.stdout.write("All workers exited\n")
395
396 # Test that all files are actually done, if not, loop.
397
398
399
400