maposmatic-dev
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Maposmatic-dev] [PATCH maposmatic 2/3] Revamp the job renderer module w


From: Maxime Petazzoni
Subject: [Maposmatic-dev] [PATCH maposmatic 2/3] Revamp the job renderer module with timeout support
Date: Fri, 5 Feb 2010 09:20:30 +0100

The render module, originally a simple extract of the render_job
function from the old daemon, has been rewritten to expose a more
sensible API and to provide self-contained timeout support.

Two classes are now available, exposing the same public API so that
either one can be used in place of the other:

  * JobRenderer is a simple, blocking job renderer. It can be threaded
    if the caller decides to call start() instead of run().
  * TimingOutJobRenderer, as its name suggests, is a timing out job
    renderer that makes use of the threading capability of JobRenderer
    to handle a timeout on the rendering process and kill the rendering
    thread, whatever it is doing, if the given timeout is reached.

The render module also now exposes a few public RESULT_ constants that
can be used to identify the result of a job rendering. This is used in
the daemon to infer an appropriate resultmsg.

As a standalone process, the job renderer now takes an optional second
argument: the timeout, in seconds.

  .../scripts/wrapper.py scripts/render.py <jobid> [timeout]

As of now, the daemon is fully usable in production with the same level
of functionality as before.

Signed-off-by: Maxime Petazzoni <address@hidden>
---
 scripts/daemon.py |   72 ++++++++++++---------
 scripts/render.py |  187 +++++++++++++++++++++++++++++++++++++++--------------
 2 files changed, 179 insertions(+), 80 deletions(-)

diff --git a/scripts/daemon.py b/scripts/daemon.py
index 1639034..95d3327 100755
--- a/scripts/daemon.py
+++ b/scripts/daemon.py
@@ -23,29 +23,38 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import os
-import time
 import sys
 import threading
+import time
 
 import render
 from www.maposmatic.models import MapRenderingJob
 from www.settings import LOG
 from www.settings import RENDERING_RESULT_PATH, RENDERING_RESULT_MAX_SIZE_GB
 
-RESULT_SUCCESSFULL = 'ok'
-RESULT_INTERRUPTED = 'rendering interrupted'
-RESULT_FAILED      = 'rendering failed'
-RESULT_CANCELED    = 'rendering took too long, canceled'
+_DEFAULT_CLEAN_FREQUENCY = 20       # Clean thread polling frequency, in
+                                    # seconds.
+_DEFAULT_POLL_FREQUENCY = 10        # Daemon job polling frequency, in seconds
+
+_RESULT_MSGS = {
+    render.RESULT_SUCCESS: 'ok',
+    render.RESULT_KEYBOARD_INTERRUPT: 'rendering interrupted',
+    render.RESULT_RENDERING_EXCEPTION: 'rendering failed',
+    render.RESULT_TIMEOUT_REACHED: 'rendering took too long, canceled'
+}
 
 class MapOSMaticDaemon:
     """
     This is a basic rendering daemon, base class for the different
     implementations of rendering scheduling. By default, it acts as a
     standalone, single-process MapOSMatic rendering daemon.
+
+    It of course uses the TimingOutJobRenderer to ensure no long-lasting job
+    stalls the queue.
     """
 
-    def __init__(self, frequency):
-        self.frequency = 10
+    def __init__(self, frequency=_DEFAULT_POLL_FREQUENCY):
+        self.frequency = frequency
         LOG.info("MapOSMatic rendering daemon started.")
         self.rollback_orphaned_jobs()
 
@@ -73,31 +82,32 @@ class MapOSMaticDaemon:
         LOG.info("MapOSMatic rendering daemon terminating.")
 
     def dispatch(self, job):
-        """Dispatch the given job. In this simple single-process daemon, this
-        is as simple as rendering it."""
-        self.render(job)
+        """In this simple single-process daemon, dispatching is as easy as
+        calling the render() method. Subclasses probably want to overload this
+        method too and implement a more clever dispatching mechanism.
+
+        Args:
+            job (MapRenderingJob): the job to process and render.
 
-    def render(self, job):
-        """Render the given job, handling the different rendering outcomes
-        (success or failures)."""
+        Returns True if the rendering was successful, False otherwise.
+        """
 
-        LOG.info("Rendering job #%d '%s'..." %
-                 (job.id, job.maptitle))
-        job.start_rendering()
+        return self.render(job, 'maposmaticd_%d_' % os.getpid())
+
+    def render(self, job, prefix=None):
+        """Render the given job using a timing out job renderer.
 
-        ret = render.render_job(job)
-        if ret == 0:
-            msg = RESULT_SUCCESSFULL
-            LOG.info("Finished rendering of job #%d." % job.id)
-        elif ret == 1:
-            msg = RESULT_INTERRUPTED
-            LOG.info("Rendering of job #%d interrupted!" % job.id)
-        else:
-            msg = RESULT_FAILED
-            LOG.info("Rendering of job #%d failed (exception occurred)!" %
-                     job.id)
+        Args:
+            job (MapRenderingJob): the job to process and render.
+
+        Returns True if the rendering was successful, False otherwise.
+        """
 
-        job.end_rendering(msg)
+        renderer = render.TimingOutJobRenderer(job, prefix=prefix)
+        job.start_rendering()
+        ret = renderer.run()
+        job.end_rendering(_RESULT_MSGS[ret])
+        return ret == 0
 
 
 class RenderingsGarbageCollector(threading.Thread):
@@ -107,7 +117,7 @@ class RenderingsGarbageCollector(threading.Thread):
     of RENDERING_RESULT_MAX_SIZE_GB.
     """
 
-    def __init__(self, frequency=20):
+    def __init__(self, frequency=_DEFAULT_CLEAN_FREQUENCY):
         threading.Thread.__init__(self)
 
         self.frequency = frequency
@@ -217,8 +227,8 @@ if __name__ == '__main__':
                   "Please use a valid RENDERING_RESULT_PATH.")
         sys.exit(1)
 
-    daemon = MapOSMaticDaemon(10)
-    cleaner = RenderingsGarbageCollector(20)
+    cleaner = RenderingsGarbageCollector()
+    daemon = MapOSMaticDaemon()
 
     cleaner.start()
     daemon.serve()
diff --git a/scripts/render.py b/scripts/render.py
index 32ae347..ea370f6 100755
--- a/scripts/render.py
+++ b/scripts/render.py
@@ -25,77 +25,166 @@
 import Image
 import os
 import sys
+import threading
 
 from ocitysmap.coords import BoundingBox
 from ocitysmap.street_index import OCitySMap
 from www.maposmatic.models import MapRenderingJob
-from www.settings import RENDERING_RESULT_PATH, RENDERING_RESULT_FORMATS
+from www.settings import LOG
 from www.settings import OCITYSMAP_CFG_PATH
+from www.settings import RENDERING_RESULT_PATH, RENDERING_RESULT_FORMATS
 
-def render_job(job, prefix=None):
-    """Renders the given job, encapsulating all processing errors and
-    exceptions.
+RESULT_SUCCESS = 0
+RESULT_KEYBOARD_INTERRUPT = 1
+RESULT_RENDERING_EXCEPTION = 2
+RESULT_TIMEOUT_REACHED = 3
 
-    This does not affect the job entry in the database in any way. It's the
-    responsibility of the caller to do maintain the job status in the database.
+class TimingOutJobRenderer:
+    """
+    The TimingOutJobRenderer is a wrapper around JobRenderer implementing
+    timeout management. It uses JobRenderer as a thread, and tries to join it
+    for the given timeout. If the timeout is reached, the thread is suspended,
+    cleaned up and killed.
 
-    Returns:
-        * 0 on success;
-        * 1 on ^C;
-        * 2 on a rendering exception from OCitySMap.
+    The TimingOutJobRenderer has exactly the same API as the non-threading
+    JobRenderer, so it can be used in place of JobRenderer very easily.
     """
 
-    if job.administrative_city is None:
-        bbox = BoundingBox(job.lat_upper_left, job.lon_upper_left,
-                           job.lat_bottom_right, job.lon_bottom_right)
-        renderer = OCitySMap(config_file=OCITYSMAP_CFG_PATH,
-                             map_areas_prefix=prefix,
-                             boundingbox=bbox,
-                             language=job.map_language)
-    else:
-        renderer = OCitySMap(config_file=OCITYSMAP_CFG_PATH,
-                             map_areas_prefix=prefix,
-                             osmid=job.administrative_osmid,
-                             language=job.map_language)
-
-    prefix = os.path.join(RENDERING_RESULT_PATH, job.files_prefix())
+    def __init__(self, job, timeout=1200, prefix=None):
+        """Initializes this TimingOutJobRenderer with a given job and a timeout.
+
+        Args:
+            job (MapRenderingJob): the job to render.
+            timeout (int): a timeout, in seconds (defaults to 20 minutes).
+            prefix (string): renderer map_areas table prefix.
+        """
+
+        self.__timeout = timeout
+        self.__thread = JobRenderer(job, prefix)
+
+    def run(self):
+        """Renders the job using a JobRendered, encapsulating all processing
+        errors and exceptions, with the addition here of a processing timeout.
+
+        Returns one of the RESULT_ constants.
+        """
+
+        self.__thread.start()
+        self.__thread.join(self.__timeout)
+
+        # If the thread is no longer alive, the timeout was not reached and all
+        # is well.
+        if not self.__thread.isAlive():
+            return self.__thread.result
+
+        LOG.info("Rendering of job #%d took too long (timeout reached)!" %
+                 self.__thread.job.id)
+
+        # Remove the job files
+        self.__thread.job.remove_all_files()
+
+        # Kill the thread and return TIMEOUT_REACHED
+        del self.__thread
+        return RESULT_TIMEOUT_REACHED
+
+class JobRenderer(threading.Thread):
+    """
+    A simple, blocking job rendered. It can be used as a thread, or directly in
+    the main processing path of the caller if it chooses to call run()
+    directly.
+    """
+
+    def __init__(self, job, prefix):
+        threading.Thread.__init__(self)
+        self.job = job
+        self.prefix = prefix
+        self.result = None
+
+    def run(self):
+        """Renders the given job, encapsulating all processing errors and
+        exceptions.
+
+        This does not affect the job entry in the database in any way. It's the
+        responsibility of the caller to do maintain the job status in the
+        database.
+
+        Returns one of the RESULT_ constants.
+        """
+
+        LOG.info("Rendering job #%d '%s'..." % (self.job.id, self.job.maptitle))
+
+        if self.job.administrative_city is None:
+            bbox = BoundingBox(self.job.lat_upper_left,
+                               self.job.lon_upper_left,
+                               self.job.lat_bottom_right,
+                               self.job.lon_bottom_right)
+            renderer = OCitySMap(config_file=OCITYSMAP_CFG_PATH,
+                                 map_areas_prefix=self.prefix,
+                                 boundingbox=bbox,
+                                 language=self.job.map_language)
+        else:
+            renderer = OCitySMap(config_file=OCITYSMAP_CFG_PATH,
+                                 map_areas_prefix=self.prefix,
+                                 osmid=self.job.administrative_osmid,
+                                 language=self.job.map_language)
+
+        prefix = os.path.join(RENDERING_RESULT_PATH, self.job.files_prefix())
+
+        try:
+            # Render the map in all RENDERING_RESULT_FORMATS
+            result = renderer.render_map_into_files(self.job.maptitle, prefix,
+                                                    RENDERING_RESULT_FORMATS,
+                                                    'zoom:16')
+
+            # Render the index in all RENDERING_RESULT_FORMATS, using the
+            # same map size.
+            renderer.render_index(self.job.maptitle, prefix,
+                                  RENDERING_RESULT_FORMATS,
+                                  result.width, result.height)
+
+            # Create thumbnail
+            if 'png' in RENDERING_RESULT_FORMATS:
+                img = Image.open(prefix + '.png')
+                img.thumbnail((200, 200), Image.ANTIALIAS)
+                img.save(prefix + '_small.png')
+
+            self.result = RESULT_SUCCESS
+            LOG.info("Finished rendering of job #%d." % self.job.id)
+        except KeyboardInterrupt:
+            self.result = RESULT_KEYBOARD_INTERRUPT
+            LOG.info("Rendering of job #%d interrupted!" % self.job.id)
+        except:
+            self.result = RESULT_RENDERING_EXCEPTION
+            LOG.info("Rendering of job #%d failed (exception occurred)!" %
+                     self.job.id)
+
+        # Remove the job files if the rendering was not successful.
+        if self.result:
+            self.job.remove_all_files()
+
+        return self.result
 
-    try:
-        # Render the map in all RENDERING_RESULT_FORMATS
-        result = renderer.render_map_into_files(job.maptitle, prefix,
-                                                RENDERING_RESULT_FORMATS,
-                                                'zoom:16')
-
-        # Render the index in all RENDERING_RESULT_FORMATS, using the
-        # same map size.
-        renderer.render_index(job.maptitle, prefix, RENDERING_RESULT_FORMATS,
-                              result.width, result.height)
-
-        # Create thumbnail
-        if 'png' in RENDERING_RESULT_FORMATS:
-            img = Image.open(prefix + '.png')
-            img.thumbnail((200, 200), Image.ANTIALIAS)
-            img.save(prefix + '_small.png')
-
-        return 0
-    except KeyboardInterrupt:
-        return 1
-    except:
-        return 2
 
 if __name__ == '__main__':
     def usage():
-        sys.stderr.write('usage: %s <jobid>' % sys.argv[0])
+        sys.stderr.write('usage: %s <jobid> [timeout]\n' % sys.argv[0])
 
-    if len(sys.argv) != 2:
+    if len(sys.argv) < 2 or len(sys.argv) > 3:
         usage()
         sys.exit(3)
 
     try:
         jobid = int(sys.argv[1])
         job = MapRenderingJob.objects.get(id=jobid)
+
         if job:
-            sys.exit(render_job(job, 'renderer_%d' % os.getpid()))
+            prefix = 'renderer_%d_' % os.getpid()
+            if len(sys.argv) == 3:
+                renderer = TimingOutJobRenderer(job, int(sys.argv[2]), prefix)
+            else:
+                renderer = JobRenderer(job, prefix)
+
+            sys.exit(renderer.run())
         else:
             sys.stderr.write('Job #%d not found!' % jobid)
             sys.exit(4)
-- 
1.6.3.3.277.g88938c





reply via email to

[Prev in Thread] Current Thread [Next in Thread]