Version 0.5.3
mp3togo/cluster.py
# - cluster.py -
# Cluster mode support.
#
# This file is part of mp3togo
#
# Convert audio files to play on an mp3 player
#
# (c) Simeon Veldstra 2006 <reallifesim@gmail.com>
#
# This software is free.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You may redistribute this program under the terms of the
# GNU General Public Licence version 2
# Available in this package or at http://www.fsf.org


"""Cluster mode.

Run slave processes on multiple machines, communicating by
pipes over ssh and sharing data through NFS mounts."""

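# How the pieces fit together:
#
#   * The master runs slavedriver() below, which starts one Boss per host.
#   * Each Boss spawns 'ssh <host> mp3togo --cluster-slave True' on a
#     pseudo-tty (for localhost, without the ssh prefix); on the far end
#     that command lands in slave().
#   * Work units and progress reports travel over the pty/ssh pipe; the
#     audio files themselves live on storage shared by all hosts (NFS).
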
import os
import sys
import pty
import time
import fcntl
import select
import signal
import termios
import cPickle

import mp3togo.filelist as filelist
import mp3togo.track as track
import mp3togo.cache as cache
import mp3togo.conf as conf
import mp3togo.task as task
import mp3togo.pool as pool
import mp3togo


# Slave protocol states
BEGIN = 'begin'
READY = 'ready'
INUSE = 'inuse'
EXIT = 'exit'

# Maximum number of failed attempts per file before hosts stop retrying it
MAXTRIES = 3

# Wire delimiters: EOT ends a master->slave message, EOR ends a
# slave->master reply, RS separates 'key=value' records within a message.
EOT = '\n*****\n\n'
EOR = '\n#####\n\n'
RS = '\n#**\n**#\n'

# Time to sleep between output messages
INTERVAL = 1.0

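# The wire format, illustrated.  The two helpers below are only a sketch
# for reference -- mp3togo does this framing inline in slave() and in
# Boss.poll() -- and nothing in this module calls them.  A master->slave
# message is one or more 'key=value\n' chunks joined with RS and
# terminated with EOT.

def _frame_msg(pairs):
    """Sketch only: frame a dict of strings the way Boss.poll() does."""
    chunks = ['%s=%s\n' % (k, v) for k, v in pairs.items()]
    return RS.join(chunks) + EOT

def _parse_msg(data):
    """Sketch only: parse a framed message the way slave() does."""
    data = data[:-len(EOT)]          # drop the terminator
    data = data.replace('\r', '')    # the pty inserts carriage returns
    pairs = {}
    for chunk in data.split(RS):
        if '=' in chunk:
            key, value = chunk.split('=', 1)
            pairs[key] = value[:-1]  # strip the trailing '\n'
    return pairs

# Round trip: _parse_msg(_frame_msg({'file': '/music/a.ogg'}))
# returns {'file': '/music/a.ogg'}.
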
def slave(opts):
    """Run as a slave process.

    The program was run with --cluster-slave.  Establish
    communication with the master and run the protocol.

    The message protocol:

      BEGIN  The master sends its pickled options; the slave answers
             with an acknowledgement and moves to READY (or exits if
             it cannot use the options).
      READY  The master sends either a work unit ('file=...') or
             'SLAVE EXIT'.  A work unit moves the slave to INUSE.
      INUSE  The slave streams 'Progress:' records followed by a
             final 'DONE:' or 'FAIL:' record, then returns to READY.
      EXIT   Reached on 'SLAVE EXIT', or after fifteen seconds of
             silence from the master.

    Master to slave messages end with EOT; slave to master replies
    end with EOR.
    """

    state = BEGIN
    sin = sys.stdin
    sout = sys.stdout
    cooked = None
    trk = None
    seen_pickle = True

    # stdin must be non-blocking; stdout stays blocking.
    fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
    #fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
    def initialize(ropts):
        """Integrate the master's pickled options into the local opts."""
        if not ropts.has_key('pickle'):
            return None
        opts._d = cPickle.loads(ropts['pickle'])
        # The slave itself stays quiet; progress goes back over the pipe.
        opts['verbosity'] = 0
        return "SLAVE: ready"
    def perform(ropts):
        """Run the work unit and report progress."""
        job_start = time.time()
        if not ropts.has_key('file'):
            raise conf.ErrorClusterProtocol
        if opts['usecache']:
            fc = cache.Cache(opts['usecache'], opts)
        else:
            fc = None
        trk = track.Track(ropts['file'], opts, fc)
        pl = pool.Pool(opts)
        if pl.add_track(trk):
            tsk = trk.getfirst()
            while tsk:
                tsk.run()
                while tsk.status() == task.RUNNING:
                    yield "Progress: %s %s\n" % (tsk.name, str(tsk.output()))
                tsk.wait()
                if tsk.status() != task.DONE:
                    name = tsk.name
                    while tsk:
                        tsk.undo()
                        tsk = tsk.prev()
                    yield "FAIL: %s\n" % name
                    raise StopIteration
                tsk = tsk.next()
        else:
            yield "FAIL: Insufficient space\n"
            raise StopIteration
        yield "DONE: %f\n" % (time.time() - job_start)
        raise StopIteration

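    # Records emitted by perform() and forwarded to the master, one per
    # line (this only restates the yields above for reference):
    #
    #   Progress: <task name> <task output>
    #   FAIL: <task name>        (or 'FAIL: Insufficient space')
    #   DONE: <seconds of wall clock time>
    #
    # The state machine below terminates each burst of records with EOR.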

    # The state machine
    while state != EXIT:
        if state == INUSE:
            raise conf.ErrorClusterProtocol
        # Read from master
        rdata = ''
        while rdata[-len(EOT):] != EOT:
            rfd, wfd, xfd = select.select([sys.stdin.fileno()],
                                          [], [], 15)
            if not rfd:
                sout.write("No input within fifteen seconds, ")
                sout.write("can't wait around forever.")
                state = EXIT
                break
            rdata += sin.read()
            if rdata == "SLAVE EXIT":
                state = EXIT
                break
        sout.flush()
        sin.flush()
        rdata = rdata[:-len(EOT)]
        rdata = rdata.replace('\r', '')
        ropts = {}
        for line in rdata.split(RS):
            if '=' in line:
                key, value = line.split('=', 1)
                ropts[key] = value[:-1]  # \n

        # Write to master
        if state == BEGIN:
            msg = initialize(ropts)
            if msg:
                state = READY
            else:
                state = EXIT
                msg = "Did not understand request."
            msg += EOR
            sout.write(msg)
            sout.flush()
            sin.flush()
        elif state == READY:
            state = INUSE
            gen = perform(ropts)
            while state == INUSE:
                try:
                    msg = gen.next()
                    time.sleep(INTERVAL)
                except StopIteration:
                    state = READY
                    msg = EOR
                sout.write(msg)
                sout.flush()
                sin.flush()

# END REMOTE END

# START MASTER END

class Boss:
    """Manage one slave"""

    def __init__(self, host, files, failures, opts, r_schema=""):
        """Manage a remote host.

        Use r_schema='ssh %h ' for remote hosts, so the child becomes
        'ssh <host> mp3togo --cluster-slave True'.  files is a FileList
        of files to process; the list should be shared with the
        cooperating bosses."""
        self.host = host
        self.files = files
        self.fails = failures
        self.opts = opts
        self.current = None
        # Run the slave command on the end of a pseudo-tty.
        self.pid, self.fd1 = pty.fork()
        if self.pid == 0:
            args = r_schema.replace('%h', host)
            args += " mp3togo --cluster-slave True"
            #args += " test-mp3togo --cluster-slave True"
            args = args.split()
            os.execvp(args[0], args)
        else:
            # Two unbuffered, non-blocking file objects on the pty master:
            # one to read from the slave, one to write to it.
            self.fd2 = os.dup(self.fd1)
            self.rf = os.fdopen(self.fd1, 'r', 0)
            fcntl.fcntl(self.fd1, fcntl.F_SETFL, os.O_NONBLOCK)
            self.wf = os.fdopen(self.fd2, 'w', 0)
            fcntl.fcntl(self.fd2, fcntl.F_SETFL, os.O_NONBLOCK)

    def __del__(self):
        self.rf.flush()
        self.wf.flush()
        self.rf.close()
        self.wf.close()
        try:
            os.waitpid(self.pid, 0)
        except OSError:
            pass

    def poll(self):
        """Feed the slave files to convert and report its progress.

        A generator that yields (filename, action, percent) tuples
        until the work runs out."""
        # Dispatch time of the file currently on the slave.  Kept in a
        # one element list so the nested next() below can update it
        # (Python 2 has no nonlocal).
        start = [time.time()]
        last_action = ''
        percent = 0.0
        # Hand the slave our options, minus the argument file list.
        optd = self.opts._d.copy()
        optd['arg_files'] = []
        st = "pickle=" + cPickle.dumps(optd) + '\n'
        self.wf.write(st)
        self.wf.flush()
        self.wf.write(EOT)
        self.wf.flush()
        errors = 0
        while 1:
            a, b, c = select.select([self.fd1, self.fd2], [], [], 60)
            if a:
                try:
                    inp = self.rf.read()
                except IOError:
                    errors += 1
                    if errors > 25:
                        if self.current:
                            self.files.push(self.current, front=True)
                        raise StopIteration
                    yield ('Not Ready', 'Not Ready', 0)
                    continue
                self.rf.flush()
                for line in inp.split('\n'):
                    line = line.replace('\r', '')
                    def next():
                        # Send the slave the next file, skipping files
                        # this host has already failed on or that have
                        # failed on too many hosts.
                        try:
                            self.current = self.files.pop(0)
                        except:
                            self.wf.write('SLAVE EXIT')
                            os.waitpid(self.pid, 0)
                            raise StopIteration
                        if self.fails.has_key(self.current) and (
                                self.host in self.fails[self.current]
                                or len(self.fails[self.current]) > MAXTRIES):
                            oldcurrent = self.current
                            try:
                                next()
                            finally:
                                self.files.push(oldcurrent, front=True)
                            return
                        self.wf.write('file=%s\n' % self.current)
                        self.wf.write(EOT)
                        start[0] = time.time()
                    if EOR.replace('\n', '') == line:
                        if self.current:
                            yield (self.current, 'Finished',
                                   float(time.time() - start[0]))
                        next()
                        break
                    elif line.startswith('Progress:'):
                        action = ' '.join(line.split()[1:-1])
                        if not action:
                            action = last_action
                        if action != last_action:
                            percent = 0.0
                            last_action = action
                        try:
                            percent = float(line.split()[-1])
                        except ValueError:
                            pass
                        yield (self.current, action, percent)
                    elif line.startswith('FAIL:'):
                        if "Insufficient space" in line:
                            yield (self.current, 'Insufficient space', 0.0)
                            raise StopIteration
                        else:
                            yield (self.current, 'Subtask Failed', 0.0)
                            if self.fails.has_key(self.current):
                                self.fails[self.current].append(self.host)
                            else:
                                self.fails[self.current] = [self.host]
                            oldcurrent = self.current
                            try:
                                next()
                            finally:
                                self.files.push(oldcurrent, front=True)
                            break
            else:
                # Timed out
                self.kill()
                raise StopIteration

    def kill(self):
        """Kill the child.

        This kills the ssh connection; it cannot clean up
        the remote machine."""
        os.kill(self.pid, signal.SIGKILL)
        os.waitpid(self.pid, 0)

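
# Illustrative use of a single Boss (a sketch only -- slavedriver() below
# is the real driver).  poll() is a generator that yields
# (filename, action, percent) tuples until the host runs out of work.
# 'node1' is a stand-in hostname:
#
#   boss = Boss('node1', playlist, {}, opts, r_schema="ssh %h ")
#   for fname, action, pct in boss.poll():
#       print fname, action, pct
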

def slavedriver(playlist, opts, cooked=None):
    """A simple text mode cluster controller.

    The function signature matches what main.py expects."""

    if opts['verbosity']:
        def p(s):
            sys.stdout.write(s)
    else:
        p = lambda x: None
    L = opts.log

    p("mp3togo version %s\n" % mp3togo.version)
    p("Starting Cluster mode\n\n")

    slaves = opts['cluster']
    slaves = slaves.split(',')
    master = None

    p("Connecting to cluster nodes:\n")
    fails = {}  # {'badfilename': [host, ...]}
    if opts['clusternolocal']:
        ring = []
    else:
        p(" localhost")
        master = Boss('localhost', playlist, fails, opts, r_schema='')
        ring = [('localhost', master.poll(), master)]
    for slave in slaves:
        p(" " + slave)
        theman = Boss(slave, playlist, fails, opts, r_schema="ssh %h ")
        tr = (slave, theman.poll(), theman)
        ring.append(tr)

    p("\n\n")
    # Last file reported by each host, so starts are only logged once.
    files = dict(map(lambda x: (x, ''), slaves))
    if master:
        files[master.host] = ''
    while ring:
        for slave in ring[:]:
            try:
                tup = slave[1].next()
            except StopIteration:
                L(1, "Host %s finished.\n" % slave[0])
                ring.remove(slave)
                break
            if tup[1] == 'Subtask Failed':
                L(1, "Task failed on host: %s\n %s\n" % (
                    slave[0], tup[0]))
            if tup[0] != files[slave[0]]:
                files[slave[0]] = tup[0]
                L(1, "Host %s at %s starting file: \n %s\n" % (
                    slave[0], time.ctime(), tup[0]))
        if ring:
            time.sleep(0.5)
    print "All workers exited"

    # TODO: Test that all files are actually done; if not, loop.
404