Update Debian packaging
mp3togo/cluster.py
1 # - cluster.py -
2 # Cluster mode support.
3 #
4 # This file is part of mp3togo
5 #
6 # Convert audio files to play on a mp3 player
7 #
8 # (c) Simeon Veldstra 2006 <reallifesim@gmail.com>
9 #
10 # This software is free.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You may redistribute this program under the terms of the
18 # GNU General Public Licence version 2
19 # Available in this package or at http://www.fsf.org
20
21
22 """Cluster mode.
23
24 Run slave processes on multiple machines communicating by
25 pipes over ssh and sharing data through NFS mounts."""
26
27 import os
28 import sys
29 import pty
30 import time
31 import fcntl
32 import select
33 import signal
34 import termios
35 import cPickle
36
37 import mp3togo.filelist as filelist
38 import mp3togo.track as track
39 import mp3togo.cache as cache
40 import mp3togo.conf as conf
41 import mp3togo.task as task
42 import mp3togo.pool as pool
43 import mp3togo
44
45
# States of the slave-side state machine (see slave()).
BEGIN = 'begin'    # waiting for the master's pickled options
READY = 'ready'    # waiting for a work unit or an exit request
INUSE = 'inuse'    # processing a work unit and reporting progress
EXIT = 'exit'      # leave the main loop

# A Boss skips a file once more than this many hosts have been
# recorded as failing on it.
MAXTRIES = 3

# Framing strings of the text protocol spoken over the pipe.
EOT = '\n*****\n\n'    # terminates every master -> slave message
EOR = '\n#####\n\n'    # terminates every slave -> master reply
RS = '\n#**\n**#\n'    # separates key=value records inside one message

# Time to sleep between output messages
INTERVAL = 1.0
59
60
def slave(opts):
    """Run as a slave process.

    The program was run with --cluster-slave. Establish
    communication with the master over stdin/stdout and do protocol.

    The message protocol, by slave state:

      BEGIN  master -> slave: a "pickle=<pickled option dict>" record
                              terminated by EOT
             slave -> master: "SLAVE: ready" + EOR   (state -> READY)
             or "Did not understand request." + EOR  (state -> EXIT)
      READY  master -> slave: a "file=<path>" record terminated by
                              EOT, or the literal text "SLAVE EXIT"
      INUSE  slave -> master: zero or more "Progress: <task> <output>"
                              lines, then "DONE: <seconds>" or
                              "FAIL: <reason>", then EOR
                              (state -> READY)
      EXIT   entered on "SLAVE EXIT", on end of input, or when the
             master stays silent for fifteen seconds.

    opts is the local option object; it is overwritten with the
    options received from the master.
    """

    state = BEGIN
    sin = sys.stdin
    sout = sys.stdout

    # Non-blocking stdin, so read() hands back whatever has arrived
    # instead of waiting for end of file.
    fcntl.fcntl(sin.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)

    def initialize(ropts):
        """Integrate remote options into local opts.

        Returns the acknowledgement text, or None when the request
        carries no 'pickle' record."""
        if 'pickle' not in ropts:
            return None
        # NOTE(security): unpickling executes whatever the peer sent;
        # this is only safe because the peer is our own master process.
        opts._d = cPickle.loads(ropts['pickle'])
        opts['verbosity'] = 0
        return "SLAVE: ready"

    def perform(ropts):
        """Run the work unit and watch progress.

        Generator yielding the protocol lines to send to the master.
        Raises conf.ErrorClusterProtocol if no 'file' was requested."""
        job_start = time.time()
        if 'file' not in ropts:
            raise conf.ErrorClusterProtocol
        if opts['usecache']:
            fc = cache.Cache(opts['usecache'], opts)
        else:
            fc = None
        trk = track.Track(ropts['file'], opts, fc)
        pl = pool.Pool(opts)
        if not pl.add_track(trk):
            yield "FAIL: Insufficient space\n"
            return
        tsk = trk.getfirst()
        while tsk:
            tsk.run()
            while tsk.status() == task.RUNNING:
                yield "Progress: %s %s\n" % (tsk.name, str(tsk.output()))
            tsk.wait()
            if tsk.status() != task.DONE:
                name = tsk.name
                # Roll back this task and every task before it.
                while tsk:
                    tsk.undo()
                    tsk = tsk.prev()
                yield "FAIL: %s\n" % name
                return
            tsk = tsk.next()
        yield "DONE: %f\n" % (time.time() - job_start)

    # The state machine
    while state != EXIT:
        if state == INUSE:
            raise conf.ErrorClusterProtocol

        # Read from master until a complete message (ending in EOT)
        # has arrived.
        rdata = ''
        while not rdata.endswith(EOT):
            rfd, _wfd, _xfd = select.select([sin.fileno()], [], [], 15)
            if not rfd:
                sout.write("No input within fifteen seconds, ")
                sout.write("can't wait around forever.")
                state = EXIT
                break
            chunk = sin.read()
            if not chunk:
                # select() said readable but nothing came: end of
                # input.  Without this the loop would spin forever.
                state = EXIT
                break
            rdata += chunk
            if rdata == "SLAVE EXIT":
                state = EXIT
                break
        sout.flush()
        sin.flush()
        if state == EXIT:
            break

        # Decode the key=value records of the message.
        rdata = rdata[:-len(EOT)].replace('\r', '')
        ropts = {}
        for line in rdata.split(RS):
            if '=' in line:
                key, value = line.split('=', 1)
                ropts[key] = value[:-1]  # drop the trailing '\n'

        # Write to master
        if state == BEGIN:
            msg = initialize(ropts)
            if msg:
                state = READY
            else:
                state = EXIT
                msg = "Did not understand request."
            sout.write(msg + EOR)
            sout.flush()
            sin.flush()
        elif state == READY:
            state = INUSE
            for msg in perform(ropts):
                # Throttle the report stream.
                time.sleep(INTERVAL)
                sout.write(msg)
                sout.flush()
                sin.flush()
            state = READY
            sout.write(EOR)
            sout.flush()
            sin.flush()
190
191 # END REMOTE END
192
193 # START MASTER END
194
class Boss:
    """Manage one slave.

    Forks a child on a pseudo-terminal running mp3togo in
    --cluster-slave mode (optionally through a remote shell) and
    feeds it files from a shared list, one at a time."""

    def __init__(self, host, files, failures, opts, r_schema=""):
        """Manage a remote host. use r_schema='ssh %h ' for remote hosts

        host     -- host name; replaces '%h' in r_schema
        files    -- FileList of files to process; the list should be
                    shared with the cooperating bosses
        failures -- shared dict {file name: [hosts it failed on]}
        opts     -- option object; its _d dict is sent to the slave
        r_schema -- command prefix used to reach the host"""
        self.host = host
        self.files = files
        self.fails = failures
        self.opts = opts
        self.current = None
        self.pid, self.fd1 = pty.fork()
        if self.pid == 0:
            # Child: become the (possibly remote) slave process.
            args = r_schema.replace('%h', host)
            args += " mp3togo --cluster-slave True"
            args = args.split()
            os.execvp(args[0], args)
        else:
            # Parent: separate non-blocking, unbuffered read and write
            # file objects on the pty.
            self.fd2 = os.dup(self.fd1)
            self.rf = os.fdopen(self.fd1, 'r', 0)
            fcntl.fcntl(self.fd1, fcntl.F_SETFL, os.O_NONBLOCK)
            self.wf = os.fdopen(self.fd2, 'w', 0)
            fcntl.fcntl(self.fd2, fcntl.F_SETFL, os.O_NONBLOCK)

    def __del__(self):
        """Close both ends of the pty and reap the child."""
        self.rf.flush()
        self.wf.flush()
        self.rf.close()
        self.wf.close()
        try:
            os.waitpid(self.pid, 0)
        except OSError:
            # Already reaped by poll() or kill().
            pass

    def _dispatch_next(self):
        """Send the next eligible file to the slave.

        Files this host already failed on, or that failed on more than
        MAXTRIES hosts, are skipped and put back at the front of the
        list in their original order.

        Returns True when a file was sent.  Returns False when the
        list is exhausted; the slave has then been told to exit and
        the child has been waited for."""
        skipped = []
        try:
            while 1:
                try:
                    self.current = self.files.pop(0)
                except Exception:
                    # Nothing left to hand out.
                    self.wf.write('SLAVE EXIT')
                    os.waitpid(self.pid, 0)
                    return False
                tried = self.fails.get(self.current)
                if tried is not None and (
                        self.host in tried or len(tried) > MAXTRIES):
                    skipped.append(self.current)
                    continue
                self.wf.write('file=%s\n' % self.current)
                self.wf.write(EOT)
                return True
        finally:
            # Reverse order so the first skipped file ends up first.
            for name in reversed(skipped):
                self.files.push(name, front=True)

    def poll(self):
        """Drive the slave; a generator of status tuples.

        Sends the options to the slave, then keeps handing it files.
        Yields (file, action, percent) tuples where action is a task
        name, 'Finished' (percent is then the elapsed seconds),
        'Subtask Failed', 'Insufficient space' or 'Not Ready'.
        The generator ends when the file list is exhausted, after
        'Insufficient space', after repeated read errors, or when the
        slave is silent for 60 seconds (the child is then killed)."""
        start = 0
        percent = 0.0
        last_action = ''
        optd = self.opts._d.copy()
        optd['arg_files'] = []
        self.wf.write("pickle=" + cPickle.dumps(optd) + '\n')
        self.wf.flush()
        self.wf.write(EOT)
        self.wf.flush()
        errors = 0
        end_of_reply = EOR.replace('\n', '')
        while 1:
            readable, _w, _x = select.select([self.fd1, self.fd2],
                                             [], [], 60)
            if not readable:
                # Timed out
                self.kill()
                return
            try:
                inp = self.rf.read()
            except IOError:
                errors += 1
                if errors > 25:
                    if self.current:
                        self.files.push(self.current, front=True)
                    return
                yield ('Not Ready', 'Not Ready', 0)
                continue
            self.rf.flush()
            for line in inp.split('\n'):
                line = line.replace('\r', '')
                if line == end_of_reply:
                    if self.current:
                        yield (self.current, 'Finished',
                               float(time.time() - start))
                    if not self._dispatch_next():
                        return
                    start = time.time()
                    break
                elif line.startswith('Progress:'):
                    action = ' '.join(line.split()[1:-1])
                    if not action:
                        action = last_action
                    if action != last_action:
                        percent = 0.0
                        last_action = action
                    try:
                        percent = float(line.split()[-1])
                    except ValueError:
                        # Keep the previous figure.
                        pass
                    yield (self.current, action, percent)
                elif line.startswith('FAIL:'):
                    if "Insufficient space" in line:
                        yield (self.current, 'Insufficient space', 0.0)
                        return
                    yield (self.current, 'Subtask Failed', 0.0)
                    failed = self.current
                    # Record the failure per file so other hosts'
                    # records are kept.
                    self.fails.setdefault(failed, []).append(self.host)
                    # NOTE(review): the rest of this read, normally the
                    # slave's EOR for the failed job, is discarded by
                    # the break below; if that EOR arrived in a later
                    # read it would be taken as the new file finishing.
                    try:
                        dispatched = self._dispatch_next()
                    finally:
                        # Leave the failed file for another host.
                        self.files.push(failed, front=True)
                    if not dispatched:
                        return
                    start = time.time()
                    break

    def kill(self):
        """Kill the child
        This kills the ssh connection, it is
        unable to clean up the remote machine."""
        os.kill(self.pid, signal.SIGKILL)
        os.waitpid(self.pid, 0)
322
323
324
def slavedriver(playlist, opts, cooked=None):
    """A simple text mode cluster controller.

    Function signature for running from main.py

    playlist -- list of files to convert, shared by all workers
    opts     -- option object; reads 'verbosity', 'cluster' (comma
                separated host names) and 'clusternolocal', and logs
                through opts.log
    cooked   -- unused here

    Starts one Boss per host (plus one for localhost unless
    'clusternolocal' is set) and polls them round-robin until every
    one of them has finished."""

    if opts['verbosity']:
        def p(s):
            sys.stdout.write(s)
    else:
        p = lambda x: None
    log = opts.log

    p("mp3togo version %s\n" % mp3togo.version)
    p("Starting Cluster mode\n\n")

    # Skip blank entries (trailing comma, empty option): an empty host
    # name would make the remote command "ssh mp3togo ...".
    hosts = [h.strip() for h in opts['cluster'].split(',') if h.strip()]
    master = None

    p("Connecting to cluster nodes:\n")
    fails = {}  # {'badfilename': [host,...]}
    ring = []   # (host name, poll generator, Boss) per live worker
    if not opts['clusternolocal']:
        p(" localhost")
        master = Boss('localhost', playlist, fails, opts, r_schema='')
        ring.append(('localhost', master.poll(), master))
    for host in hosts:
        p(" " + host)
        boss = Boss(host, playlist, fails, opts, r_schema="ssh %h ")
        ring.append((host, boss.poll(), boss))

    p("\n\n")
    # Last file reported by each host, to notice when a new one starts.
    files = dict((host, '') for host in hosts)
    if master is not None:
        files[master.host] = ''
    while ring:
        for worker in ring[:]:
            host, poller = worker[0], worker[1]
            try:
                tup = poller.next()
            except StopIteration:
                log(1, "Host %s finished.\n" % host)
                ring.remove(worker)
                break
            if tup[1] == 'Not Ready':
                # Placeholder from a failed read, not a file name.
                continue
            if tup[1] == 'Subtask Failed':
                log(1, "Task failed on host: %s\n %s\n" % (
                    host, tup[0]))
            if tup[0] != files[host]:
                files[host] = tup[0]
                log(1, "Host %s at %s starting file: \n %s\n" % (
                    host, time.ctime(), tup[0]))
        if ring:
            time.sleep(0.5)
    sys.stdout.write("All workers exited\n")
382
383 # Test that all files are actually done, if not, loop.
384
385
386
387