Hacking at cluster mode

file:00c3a3cdd67786960c1ea8de57361f8bdc201598(new)
--- /dev/null
+++ b/mp3togo/cluster.py
@@ -0,0 +1,116 @@
+# - cluster.py -
+# Cluster mode support.
+#
+# This file is part of mp3togo
+#
+# Convert audio files to play on a mp3 player
+#
+# (c) Simeon Veldstra 2006 <reallifesim@gmail.com>
+#
+# This software is free.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You may redistribute this program under the terms of the
+# GNU General Public Licence version 2
+# Available in this package or at http://www.fsf.org
+
+
+"""Cluster mode.
+
+ Run slave processes on multiple machines communicating by
+ pipes over ssh and sharing data through NFS mounts."""
+
+import sys
+import os
+import time
+import cPickle
+import fcntl
+import select
+
+import mp3togo
+import mp3togo.filelist as filelist
+import mp3togo.conf as conf
+import mp3togo.track as track
+import mp3togo.task as task
+import mp3togo.pool as pool
+import mp3togo.cache as cache
+
+
+BEGIN = 1
+READY = 2
+INUSE = 3
+EXIT = 4
+
+
+def run_slave(opts):
+ """Run as a slave process.
+
+ The program was run with --cluster-slave. Establish
+ communication with the master and do protocol.
+
+ The message protocol:
+
+ Master --> options --> Slave-\ \
+ | |
+ /-------<-----------<-------/ |
+ | EXIT | BEGIN
+ Slave --> ack --> Master-\ ^ |
+ | | |
+ /-------<-------<--------/ | /
+ | | - READY
+ |->Master --> [work_unit | quit] --> Slave-| \
+ | | |
+ | /--------------<---------------<--------/ | INUSE
+ ^ | |
+ | \ Slave --> % ... % ... % ... Done --> Master-\ /
+ | |
+ \<----------------<---------------<--------------/ - READY
+
+ """
+
+ state = BEGIN
+ sin = sys.stdin
+ sout = sys.stdout
+
+ fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) #Want
+ #fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) #Don't
+
+ sout.write("foo bar baz output quux\n")
+ sout.flush()
+ sin.flush()
+
+ sout.write("Below now, foo bar baz output quux\n")
+ sout.flush()
+ sin.flush()
+
+ rcuke = ''
+ while rcuke[-5:] != '\n***\n':
+ rfd, wfd, xfd = select.select([sys.stdin.fileno()], [], [], 40)
+ if not rfd:
+ sout.write("No input in thirty seconds")
+ sys.exit()
+ rcuke = sin.read()
+ rcuke = rcuke[:-4]
+ ropts = {}
+ for line in rcuke.split('\n'):
+ if '=' in line:
+ key, value = line.split('=', 1)
+ ropts[key] = value
+ sout.flush()
+ sin.flush()
+
+ sout.write("Real Below now, foo bar baz output quux\n")
+ sout.flush()
+ sin.flush()
+
+ of = file("XXX-Test-Cluster.out", 'w')
+ for k, v in ropts.items():
+ of.write("%s, %s\n" % (k, v))
+ of.close()
+
+
+
file:8580d89dd10e30e44d83e45767bbaa5c816fbcee -> file:62a48999290a624144b97ced935e26ec7dceae95
--- a/mp3togo/conf.py
+++ b/mp3togo/conf.py
@@ -110,7 +110,9 @@ class Options(options.Options):
('index', '', 'index', False, None,
'''Create an index file when in wav output mode.'''),
('nonormal', '', 'no-normalize', False, None,
- '''Don't normalize the wav file before encoding.''')]
+ '''Don't normalize the wav file before encoding.'''),
+ ('clusterslave', '', 'cluster-slave', False, None,
+ '''Start a worker process for cluster mode. This option is used by the cluster master to start slave processes. Do not use.''')]
# Override hook defined in Base class
self._conffile = self._absfile
file:5a2bbd43c5dd19e0c05b0ec8a0bc54845b336497 -> file:86b594d49949a2d2a3f9f3751d6b67559bbeaf31
--- a/mp3togo/main.py
+++ b/mp3togo/main.py
@@ -39,6 +39,7 @@ import mp3togo.track as track
import mp3togo.task as task
import mp3togo.pool as pool
import mp3togo.cache as cache
+import mp3togo.cluster as cluster
def fail(mesg='', code=1):
@@ -55,6 +56,11 @@ def main(argv):
print conf.Options().usage()
fail(str(msg))
+ # Cluster mode is handled in the cluster module
+ if opts['clusterslave']:
+ cluster.run_slave(opts)
+ return 0
+
# Are we creating a cache?
if opts['makecache']:
if not opts['cachesize']: