Fix flac tag bug and debian packaging
mp3togo/task.py
1 # - task.py -
2 # This file is part of mp3togo
3
4 # Convert audio files to play on a mp3 player
5 # Manage a sub process.
6 #
7 # (c) Simeon Veldstra 2006 <reallifesim@gmail.com>
8 #
9 # This software is free.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You may redistribute this program under the terms of the
17 # GNU General Public Licence version 2
18 # Available in this package or at http://www.fsf.org
19
20
21 import os
22 import sys
23 import time
24 import select
25 import pty
26 import fcntl
27 import signal
28 import threading
29 import types
30
31 import mp3togo.conf as setup
32
33
# Task lifecycle states.  A task keeps its current state in its _status
# attribute and reports it through status().
READY = 'ready'      # initial state; also restored by a successful undo()
RUNNING = 'running'  # run() has started the action
PAUSED = 'paused'    # set by Task.pause() on a running task
STOPPED = 'stopped'  # set by Task.stop() after killing the subprocess
FAILED = 'failed'    # the action raised, or the subprocess exited badly
DONE = 'done'        # the action finished successfully
40
41
class SimpleTask:
    """Run a callable and save its output.

    A task starts out READY.  run() moves it to RUNNING and then to
    DONE (the callable returned) or FAILED (it raised).  A successful
    undo() puts a DONE, FAILED or STOPPED task back to READY.

    Constructor arguments:

    parent  -- object whose tasks() method returns the queue this task
               lives in; used by next(), prev() and check_sibling().
               If calling parent.tasks() fails the task is treated as
               having no neighbours.
    action  -- the callable to run.  Subclasses may accept other types
               and are responsible for validating them.
    fltr    -- optional callable applied to the action's return value
               before it is stored as the task's output.
    reverse -- optional callable used by undo().
    tmpsize, outsize -- figures handed back by space_req().
    name    -- label for the task.

    Raises TypeError if action is empty, if action is not callable
    (base class only), or if fltr/reverse are given but not callable.
    """

    def __init__(self, parent, action, fltr=None, reverse=None, tmpsize=0, outsize=0, name=''):
        if not action:
            raise TypeError("an action is required")
        # Only the base class insists on a callable action.  Compare the
        # class object itself: the string form of a class differs
        # between old-style and new-style classes, so matching on its
        # text is unreliable.
        if self.__class__ is SimpleTask and not callable(action):
            raise TypeError("action must be callable")
        for func in (fltr, reverse):
            if func and not callable(func):
                raise TypeError("fltr and reverse must be callable")

        self.name = name
        self.tmpsize = tmpsize
        self.outsize = outsize
        self._parent = parent
        self._action = action
        self._filter = fltr
        self._reverse = reverse
        # Hold runlock while accessing self._status
        self._runlock = threading.Lock()
        # Hold outlock while accessing self._output
        self._outlock = threading.Lock()
        self._status = READY
        self._output = None
        self._start_time = 0
        self._finish_time = 0

    def run(self):
        """Run the callable.

        Returns False without doing anything if the task is not READY,
        True otherwise; inspect status() to see whether the action
        ended DONE or FAILED.  Raises setup.TaskNotReadyError (via
        check_sibling) if the previous task in the queue is not DONE.
        """
        self.check_sibling()
        self._runlock.acquire()
        try:
            if self._status != READY:
                return False
            self._status = RUNNING
            self._start_time = time.time()

            try:
                out = self._action()
                if self._filter:
                    out = self._filter(out)
            except Exception:
                # Any ordinary error in the action or filter fails the task.
                self._status = FAILED
                return True
            except:
                # KeyboardInterrupt, SystemExit and friends: leave the
                # task in a consistent state but let them propagate.
                self._status = FAILED
                raise

            self._outlock.acquire()
            try:
                self._output = out
            finally:
                self._outlock.release()
            self._status = DONE
            self._finish_time = time.time()
            return True
        finally:
            self._runlock.release()

    def pause(self):
        """A simple task cannot be paused; always returns False."""
        return False

    def stop(self):
        """A simple task cannot be stopped; always returns False."""
        return False

    def undo(self):
        """Undo the action, if possible.

        Runs the reverse function passed in.
        reverse should return a True value on success or
        False in the event of a failure. undo returns None
        if no reverse function was passed in.

        If reverse succeeds, the task object should be in a
        consistent READY state and run() can start again at the
        top. If it fails or throws an exception, the state will
        remain as it is.

        Raises setup.TaskNotReadyError unless the task is DONE,
        FAILED or STOPPED.
        """
        self._runlock.acquire()
        try:
            if self._status not in (DONE, FAILED, STOPPED):
                raise setup.TaskNotReadyError
            if not self._reverse:
                return None
            try:
                ret = self._reverse()
            except Exception:
                return False
            if ret:
                self._status = READY
            return ret
        finally:
            self._runlock.release()

    def status(self):
        """Return the current state constant."""
        self._runlock.acquire()
        try:
            return self._status
        finally:
            self._runlock.release()

    def wait(self):
        """Nothing to wait for: run() is synchronous."""
        pass

    def wait_unpause(self):
        """Nothing to wait for: a simple task is never paused."""
        pass

    def space_req(self):
        """Return the (tmpsize, outsize) pair given to the constructor."""
        return (self.tmpsize, self.outsize)

    def output(self):
        """Return the stored (filtered) output, or None if there is none yet."""
        self._outlock.acquire()
        try:
            return self._output
        finally:
            self._outlock.release()

    def elapsed_time(self):
        """Return finish time minus start time, or False if not finished."""
        if not self._finish_time:
            return False
        return self._finish_time - self._start_time

    def check_sibling(self):
        """Raise setup.TaskNotReadyError if the previous task is not DONE."""
        bro = self.prev()
        if bro and bro.status() != DONE:
            raise setup.TaskNotReadyError

    def _neighbour(self, offset):
        """Return the task offset places away in the parent's queue, or None."""
        try:
            queue = self._parent.tasks()
        except Exception:
            # No parent, or a parent without a usable queue.
            return None
        pos = list(queue).index(self) + offset
        if 0 <= pos < len(queue):
            return queue[pos]
        return None

    def next(self):
        """Return the next task in the queue."""
        return self._neighbour(1)

    def prev(self):
        """Return the previous task in the queue."""
        return self._neighbour(-1)
184
185
class Task(SimpleTask):
    """Run a command in a subprocess and monitor its progress.

    action must be a list or tuple of strings: the argument vector of
    the command, with the program name first.  The command is run on a
    pseudo-terminal; a reader thread collects what it prints, passes it
    through the optional filter and stores the latest result as the
    task's output.

    The remaining constructor arguments are the same as SimpleTask's.
    Raises TypeError if action is not a list/tuple of strings.
    """

    def __init__(self, parent, action, filter=None, reverse=None, tmpsize=0, outsize=0, name=''):
        if not isinstance(action, (list, tuple)):
            raise TypeError
        for element in action:
            if not isinstance(element, types.StringTypes):
                raise TypeError
        # Held (locked) for as long as the task is paused.
        self._paused = threading.Lock()
        self._pid = None
        self._eater = None
        SimpleTask.__init__(self, parent, action, filter, reverse, tmpsize, outsize, name)

    def run(self):
        """Run the command in a sub process

        This will probably only work on Linux.

        Called on a PAUSED task it resumes the task instead.  Returns
        True if the command was started or resumed, False if the task
        was in any other state than READY or PAUSED (in which case
        nothing, including the start time, is changed).  Errors raised
        while forking or setting up the pty mark the task FAILED and
        are re-raised.
        """
        self.check_sibling()
        self._runlock.acquire()
        try:
            if self._status == PAUSED:
                self._start_time = time.time()
                self._paused.release()
                self._status = RUNNING
                return True
            if self._status != READY:
                return False
            self._start_time = time.time()
            self._status = RUNNING

            try:
                pid, master_fd = pty.fork()
                if pid == 0:
                    # Child: give the parent a moment to set up, then
                    # become the command.  Whatever goes wrong, never
                    # return into the parent's code.
                    try:
                        time.sleep(0.0125)
                        os.execvp(self._action[0], self._action)
                    finally:
                        os._exit(1)
                self._pid = pid
                fp = os.fdopen(master_fd, 'r', 0)
                if os.uname()[0] == "Linux":
                    fcntl.fcntl(master_fd, fcntl.F_SETFL, os.O_NONBLOCK)
            except:
                # Could not start the command: do not leave the task
                # stuck in RUNNING with nobody watching it.
                self._status = FAILED
                raise

            # Start the reader only once the pid and the pty are ready,
            # so it never has to spin waiting for them.
            self._eater = threading.Thread(None, self._eat_output, None,
                                           (pid, master_fd, fp))
            self._eater.start()
            time.sleep(0.0125)
            return True
        finally:
            self._runlock.release()

    def _exit_state(self, status):
        """Map a waitpid() status to DONE (clean exit 0) or FAILED."""
        if not os.WIFSIGNALED(status) and os.WEXITSTATUS(status) == 0:
            return DONE
        return FAILED

    def _eat_output(self, pid, master_fd, fp):
        """Reader thread: collect the child's output and record its exit."""
        while self._status in (RUNNING, PAUSED):
            wpid, status = os.waitpid(pid, os.WNOHANG)
            if wpid == pid:
                # Child exited
                self._runlock.acquire()
                try:
                    if self._paused.locked():
                        self._paused.release()
                    # stop() may have got in first; keep its STOPPED.
                    if self._status in (RUNNING, PAUSED):
                        self._status = self._exit_state(status)
                finally:
                    self._runlock.release()
                break
            if self._paused.locked():
                # Paused: keep polling for the child's exit, but do not
                # burn a CPU while doing so.
                time.sleep(0.0125)
                continue
            rfd, wfd, xfd = select.select([master_fd], [], [], 5)
            if not rfd:
                # Nothing printed for five seconds.
                break
            try:
                out = fp.read()
            except IOError:
                break
            if self._filter:
                try:
                    out = self._filter(out)
                except Exception:
                    out = ''
            self._outlock.acquire()
            try:
                self._output = out
            finally:
                self._outlock.release()

        # Child still running, no more output: block on child's exit.
        # NOTE: _runlock is held for the duration of the wait, so
        # status(), pause() and stop() block until the child is gone.
        if self._status in (RUNNING, PAUSED):
            self._runlock.acquire()
            try:
                # Re-check under the lock: stop() may have just run.
                if self._status in (RUNNING, PAUSED):
                    if self._paused.locked():
                        self._paused.release()
                    try:
                        wpid, status = os.waitpid(pid, 0)
                        self._status = self._exit_state(status)
                    except OSError:
                        self._status = FAILED
            finally:
                self._runlock.release()
        self._finish_time = time.time()

    def pause(self):
        """Mark a RUNNING task PAUSED; returns False in any other state.

        Takes the pause lock, which stops the reader thread from
        consuming output until run() is called again.
        """
        self._runlock.acquire()
        try:
            if self._status != RUNNING:
                return False
            self._paused.acquire()
            self._status = PAUSED
            return True
        finally:
            self._runlock.release()

    def stop(self):
        """Stop the running process

        Kills the child with SIGKILL, waits for the reader thread to
        finish and reaps the child.  Returns True on success, False if
        the task is not RUNNING or the kill failed.
        """
        self._runlock.acquire()
        try:
            if self._status != RUNNING:
                return False
            try:
                os.kill(self._pid, signal.SIGKILL)
            except OSError:
                return False
            self._status = STOPPED
        finally:
            self._runlock.release()
        self._eater.join()
        # Collect the killed child so it does not linger as a zombie;
        # the reader thread may already have done so.
        try:
            os.waitpid(self._pid, 0)
        except OSError:
            pass
        return True

    def wait(self):
        """Block until the reader thread (and so the command) has finished."""
        if self._eater:
            self._eater.join()

    def wait_unpause(self):
        """Block while the task is paused."""
        self._paused.acquire()
        self._paused.release()
333