Merge with bugfix-0.5
mp3togo/task.py
1 # - task.py -
2 # This file is part of mp3togo
3
4 # Convert audio files to play on a mp3 player
5 # Manage a sub process.
6 #
7 # (c) Simeon Veldstra 2006 <reallifesim@gmail.com>
8 #
9 # This software is free.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You may redistribute this program under the terms of the
17 # GNU General Public Licence version 2
18 # Available in this package or at http://www.fsf.org
19
20
21 import os
22 import sys
23 import time
24 import select
25 import pty
26 import fcntl
27 import signal
28 import threading
29 import types
30
31 import mp3togo.conf as setup
32
33
# Task life-cycle states; SimpleTask.status() returns one of these strings.
READY = 'ready'      # initial state, and the state after a successful undo()
RUNNING = 'running'  # set by run() while the action / child process is active
PAUSED = 'paused'    # set by Task.pause(); run() switches it back to RUNNING
STOPPED = 'stopped'  # set by Task.stop() after the child was sent SIGKILL
FAILED = 'failed'    # the action raised, or the child exited abnormally
DONE = 'done'        # the action / child process completed successfully
40
41
class SimpleTask:
    """Run a callable and save its output.

    A task belongs to a parent object whose tasks() method returns the
    ordered queue of sibling tasks. A task will only run once the task
    before it in that queue has reached the DONE state.

    Constructor arguments:
    parent  -- owner of the task queue; may be any object, in which
               case the task simply has no siblings if tasks() fails.
    action  -- callable taking no arguments; its return value becomes
               the task's output.
    fltr    -- optional callable applied to the action's return value
               before it is stored.
    reverse -- optional callable used by undo().
    tmpsize, outsize -- space figures reported back by space_req().
    name    -- free-form label.

    Raises TypeError if action is false or not callable, or if fltr or
    reverse is given but not callable.

    """

    def __init__(self, parent, action, fltr=None, reverse=None, tmpsize=0, outsize=0, name=''):
        if not action:
            raise TypeError("action is required")
        # Subclasses may use a non-callable action (Task passes an argv
        # list), so only SimpleTask itself insists on a callable. Compare
        # the class object directly: matching on str(self.__class__)
        # depends on how the class happens to print and on its name.
        if self.__class__ is SimpleTask and not callable(action):
            raise TypeError("action must be callable")
        for func in (fltr, reverse):
            if func and not callable(func):
                raise TypeError("fltr and reverse must be callable")

        self.name = name
        self.tmpsize = tmpsize
        self.outsize = outsize
        self._parent = parent
        self._action = action
        self._filter = fltr
        self._reverse = reverse
        # Hold runlock while accessing self._status
        self._runlock = threading.Lock()
        # Hold outlock while accessing self._output
        self._outlock = threading.Lock()
        self._status = READY
        self._output = None
        self._start_time = 0
        self._finish_time = 0

    def run(self):
        """Run the callable.

        Returns False without doing anything unless the task is READY.
        Otherwise runs the action (and the filter, if any), stores the
        result and returns True; the status is DONE afterwards, or
        FAILED if the action or the filter raised.

        Raises setup.TaskNotReadyError (from check_sibling) if the
        previous task in the queue is not DONE.

        """
        self.check_sibling()
        self._runlock.acquire()
        try:
            if self._status != READY:
                return False
            self._status = RUNNING
            self._start_time = time.time()
            try:
                out = self._action()
                if self._filter:
                    out = self._filter(out)
                self._outlock.acquire()
                try:
                    self._output = out
                finally:
                    self._outlock.release()
            except:
                # Deliberately broad: any failure of the action marks the
                # task FAILED instead of propagating. Python 2 also allows
                # exceptions that do not derive from Exception.
                # if verbosity is high enough, print something about the exception
                self._status = FAILED
                return True
            self._status = DONE
            self._finish_time = time.time()
            return True
        finally:
            self._runlock.release()

    def pause(self):
        """A SimpleTask cannot be paused; always returns False."""
        return False

    def stop(self):
        """A SimpleTask cannot be stopped; always returns False."""
        return False

    def undo(self):
        """Undo the action, if possible.

        Runs the reverse function passed in.
        reverse should return a True value on success or
        False in the event of a failure. undo returns None
        if no reverse function was passed in.

        If reverse succeeds, the task object should be in a
        consistent READY state and run() can start again at the
        top. If it fails or throws an exception, the state will
        remain as it is.

        Raises setup.TaskNotReadyError unless the task is DONE,
        FAILED or STOPPED.

        """
        self._runlock.acquire()
        try:
            if self._status not in (DONE, FAILED, STOPPED):
                raise setup.TaskNotReadyError
            if not self._reverse:
                return None
            try:
                ret = self._reverse()
            except:
                # Deliberately broad, see run(): a failing reverse
                # function is reported as False, never raised.
                return False
            if ret:
                self._status = READY
            return ret
        finally:
            self._runlock.release()

    def status(self):
        """Return the current state, one of the module's status strings."""
        self._runlock.acquire()
        try:
            return self._status
        finally:
            self._runlock.release()

    def wait(self):
        """Nothing to wait for: run() only returns once the action is over."""
        pass

    def wait_unpause(self):
        """Nothing to wait for: a SimpleTask is never paused."""
        pass

    def space_req(self):
        """Return the (tmpsize, outsize) pair given to the constructor."""
        return (self.tmpsize, self.outsize)

    def output(self):
        """Return the stored (filtered) output, or None if there is none yet."""
        self._outlock.acquire()
        try:
            return self._output
        finally:
            self._outlock.release()

    def elapsed_time(self):
        """Return the run time in seconds, or False if no finish time is recorded."""
        if not self._finish_time:
            return False
        return self._finish_time - self._start_time

    def check_sibling(self):
        """Raise setup.TaskNotReadyError unless the previous task is DONE.

        A task without a previous sibling always passes.

        """
        bro = self.prev()
        if bro and bro.status() != DONE:
            raise setup.TaskNotReadyError

    def _sibling(self, offset):
        """Return the task offset places away in the parent's queue, or None.

        None is returned when the queue cannot be obtained from the
        parent or when the position falls outside the queue. Raises
        ValueError if the queue does not contain this task.

        """
        try:
            queue = self._parent.tasks()
        except:
            # Deliberately broad: any parent that cannot supply a queue
            # (including None) simply means "no siblings".
            return None
        pos = list(queue).index(self) + offset
        if 0 <= pos < len(queue):
            return queue[pos]
        return None

    def next(self):
        """Return the next task in the queue."""
        return self._sibling(1)

    def prev(self):
        """Return the previous task in the queue."""
        return self._sibling(-1)
185
186
class Task(SimpleTask):
    """Run a command in a subprocess and monitor its progress.

    action is an argv-style list or tuple of strings; it is executed
    with os.execvp() in a child process attached to a pseudo-terminal.
    A reader thread collects what the child writes, passes it through
    the optional filter and stores the most recent result for output().

    Raises TypeError if action is not a list/tuple of strings (or, via
    SimpleTask, if it is empty or filter/reverse are not callable).

    """

    def __init__(self, parent, action, filter=None, reverse=None, tmpsize=0, outsize=0, name=''):
        if not isinstance(action, (list, tuple)):
            raise TypeError
        for element in action:
            if not isinstance(element, types.StringTypes):
                raise TypeError
        # Held (locked) for as long as the task is paused.
        self._paused = threading.Lock()
        self._pid = None
        self._eater = None
        SimpleTask.__init__(self, parent, action, filter, reverse, tmpsize, outsize, name)

    def _exit_state(self, status):
        """Map a waitpid() status word to DONE or FAILED.

        DONE only for a normal exit with code 0; death by signal or a
        non-zero exit code is FAILED.

        """
        if os.WIFSIGNALED(status):
            return FAILED
        if os.WEXITSTATUS(status) == 0:
            return DONE
        return FAILED

    def run(self):
        """Run the command in a sub process

        This will probably only work on Linux.

        If the task is PAUSED it is resumed and True is returned. If it
        is in any state other than READY, nothing happens and False is
        returned. Otherwise the child process and the reader thread are
        started and True is returned while the child is still running;
        use wait() to block until it is over.

        The start time is recorded only when the child is actually
        launched, so a rejected call or a resume does not disturb
        elapsed_time().

        Raises setup.TaskNotReadyError (from check_sibling) if the
        previous task in the queue is not DONE.

        """
        self.check_sibling()
        self._runlock.acquire()
        try:
            if self._status == PAUSED:
                self._paused.release()
                self._status = RUNNING
                return True
            if self._status != READY:
                return False
            self._status = RUNNING
            self._start_time = time.time()

            pid, master_fd = pty.fork()
            if pid == 0:
                # Child process: replace ourselves with the command. If
                # exec fails for any reason, exit immediately without
                # running any of the parent's Python cleanup.
                time.sleep(0.0125)
                try:
                    os.execvp(self._action[0], self._action)
                except:
                    os._exit(1)
            self._pid = pid
            fp = os.fdopen(master_fd, 'r', 0)
            if os.uname()[0] == "Linux":
                fcntl.fcntl(master_fd, fcntl.F_SETFL, os.O_NONBLOCK)

            def eatout():
                # Reader thread: poll the child and collect its output.
                # NOTE(review): self._status is read here without holding
                # _runlock, and a STOPPED set by stop() can be replaced by
                # FAILED below if the kill is noticed first - confirm
                # which final state callers of stop() expect.
                while self._status in (RUNNING, PAUSED):
                    wpid, status = os.waitpid(pid, os.WNOHANG)
                    if wpid == pid:
                        # Child exited
                        self._runlock.acquire()
                        try:
                            if self._paused.locked():
                                self._paused.release()
                            self._status = self._exit_state(status)
                        finally:
                            self._runlock.release()
                        break
                    if self._paused.locked():
                        # NOTE(review): this spins without sleeping for
                        # as long as the task stays paused.
                        continue
                    rfd, wfd, xfd = select.select([master_fd], [], [], 5)
                    if not rfd:
                        # Nothing to read for 5 seconds: stop polling.
                        break
                    try:
                        out = fp.read()
                    except IOError:
                        break
                    if self._filter:
                        try:
                            out = self._filter(out)
                        except:
                            # Deliberately broad: a failing filter must
                            # not kill the reader thread.
                            out = ''
                    self._outlock.acquire()
                    try:
                        self._output = out
                    finally:
                        self._outlock.release()
                # Child still running, no more output: block on child's exit
                # NOTE(review): _runlock is held during the blocking
                # waitpid(), so status(), pause() and stop() block until
                # the child exits.
                if self._status in (RUNNING, PAUSED):
                    self._runlock.acquire()
                    try:
                        if self._paused.locked():
                            self._paused.release()
                        try:
                            wpid, status = os.waitpid(pid, 0)
                            self._status = self._exit_state(status)
                        except OSError:
                            self._status = FAILED
                    finally:
                        self._runlock.release()
                self._finish_time = time.time()

            # Start the reader only once the child, the pty file object
            # and the non-blocking flag are all in place, so it never has
            # to busy-wait for them and is never left spinning if the
            # fork or fdopen fails.
            self._eater = threading.Thread(None, eatout)
            self._eater.start()
            time.sleep(0.0125)
            return True
        finally:
            self._runlock.release()

    def pause(self):
        """Pause a RUNNING task; returns True if it was paused.

        Pausing takes the _paused lock, which makes the reader thread
        stop collecting output. No signal is sent to the child here.

        """
        self._runlock.acquire()
        try:
            if self._status != RUNNING:
                return False
            self._paused.acquire()
            self._status = PAUSED
            return True
        finally:
            self._runlock.release()

    def stop(self):
        """Stop the running process

        Sends SIGKILL to the child of a RUNNING task, marks the task
        STOPPED and waits for the reader thread to finish. Returns True
        if the signal was sent, False if the task is not RUNNING or the
        kill failed with OSError.

        """
        self._runlock.acquire()
        try:
            if self._status != RUNNING:
                return False
            try:
                os.kill(self._pid, signal.SIGKILL)
            except OSError:
                return False
            self._status = STOPPED
        finally:
            self._runlock.release()
        # Join only after releasing _runlock: the reader thread takes
        # that lock itself while it records the child's exit.
        self._eater.join()
        return True

    def wait(self):
        """Block until the reader thread has finished, if one was started."""
        if self._eater:
            self._eater.join()

    def wait_unpause(self):
        """Block while the task is paused; return at once otherwise."""
        self._paused.acquire()
        self._paused.release()
334