← Back to team overview

openerp-community-reviewer team mailing list archive

[Merge] lp:~camptocamp/carriers-deliveries/7.0-threaded-dispatch-label-generation into lp:carriers-deliveries

 

Yannick Vaucher @ Camptocamp has proposed merging lp:~camptocamp/carriers-deliveries/7.0-threaded-dispatch-label-generation into lp:carriers-deliveries with lp:~camptocamp/carriers-deliveries/7.0-delivery_carrier_label_dispatch-output-file-yvr as a prerequisite.

Commit message:
delivery_carrier_label_dispatch - Improve efficiency of label generation by threading it. For instance for postlogistics, using serialized calls to the SOAP web service is a really slow as it would wait for a response before sending the next request.

Requested reviews:
  Stock and Logistic Core Editors (stock-logistic-core-editors)

For more details, see:
https://code.launchpad.net/~camptocamp/carriers-deliveries/7.0-threaded-dispatch-label-generation/+merge/215184
-- 
https://code.launchpad.net/~camptocamp/carriers-deliveries/7.0-threaded-dispatch-label-generation/+merge/215184
Your team Stock and Logistic Core Editors is requested to review the proposed merge of lp:~camptocamp/carriers-deliveries/7.0-threaded-dispatch-label-generation into lp:carriers-deliveries.
=== modified file 'delivery_carrier_label_dispatch/__openerp__.py'
--- delivery_carrier_label_dispatch/__openerp__.py	2014-04-10 13:58:38 +0000
+++ delivery_carrier_label_dispatch/__openerp__.py	2014-04-10 13:58:38 +0000
@@ -37,6 +37,20 @@
 
 If you don't define your pack it will be considered a picking is a single pack.
 
+
+Tips
+----
+For picking dispatch with huge number of labels to generate your can add a
+number of worker with ir.config_parameter `shipping_label.num_workers` to
+parallelize the generation on multiple workers.
+This can be really useful for exemple using PostLogistics web service to
+gain speed.
+
+Be careful not to set too many worker as each one will need to instanciate
+a cursor on database and this could generate PoolErrors.
+A good choice would be to set it as `db_maxconn` / number_of_worker / 2
+>>>>>>> MERGE-SOURCE
+
 Contributors
 ------------
 

=== modified file 'delivery_carrier_label_dispatch/wizard/generate_labels.py'
--- delivery_carrier_label_dispatch/wizard/generate_labels.py	2014-04-10 13:58:38 +0000
+++ delivery_carrier_label_dispatch/wizard/generate_labels.py	2014-04-10 13:58:38 +0000
@@ -20,13 +20,21 @@
 ##############################################################################
 from operator import attrgetter
 from itertools import groupby
+import sys
+import Queue
+import threading
+import logging
 
+from openerp import pooler
 from openerp.osv import orm, fields
 from openerp.tools.translate import _
 
 from ..pdf_utils import assemble_pdf
 
 
+_logger = logging.getLogger(__name__)
+
+
 class DeliveryCarrierLabelGenerate(orm.TransientModel):
 
     _name = 'delivery.carrier.label.generate'
@@ -85,37 +93,139 @@
             return None
         return label_obj.browse(cr, uid, label_id[0], context=context)
 
+    def _do_generate_labels(self, cr, uid, wizard, pack, picking,
+                                  label, context=None):
+        """ Generate a label in a thread safe context
+
+        Here we declare a specific cursor so do not launch
+        too many threads
+        """
+        picking_out_obj = self.pool['stock.picking.out']
+        # generate the label of the pack
+        tracking_ids = [pack.id] if pack else None
+        # create a cursor to be thread safe
+        thread_cr = pooler.get_db(cr.dbname).cursor()
+        try:
+            try:
+                picking_out_obj.generate_labels(
+                    thread_cr, uid, [picking.id],
+                    tracking_ids=tracking_ids,
+                    context=context)
+            except orm.except_orm as e:
+                # add information on picking and pack in the exception
+                picking_name = _('Picking: %s') % picking.name
+                pack_num = _('Pack: %s') % pack.name if pack else ''
+                raise orm.except_orm(
+                    e.name,
+                    _('%s %s - %s') % (picking_name, pack_num, e.value))
+            thread_cr.commit()
+        except Exception:
+            thread_cr.rollback()
+            raise
+        finally:
+            thread_cr.close()
+
+    def worker(self, q, q_except):
+        """ A worker to generate labels
+
+        Takes data from queue q
+
+        And if the worker encounters errors, he will add them in
+        q_except queue
+        """
+        while not q.empty():
+            args, kwargs = q.get()
+            try:
+                self._do_generate_labels(*args, **kwargs)
+            except Exception as e:
+                q_except.put(e)
+            finally:
+                q.task_done()
+
+    def _get_num_workers(self, cr, uid, context=None):
+        """ Get number of worker parameter for labels generation
+
+        Optional ir.config_parameter is `shipping_label.num_workers`
+        """
+        param_obj = self.pool['ir.config_parameter']
+        num_workers = param_obj.get_param(cr, uid,
+                                          'shipping_label.num_workers')
+        if not num_workers:
+            return 1
+        return int(num_workers)
+
+
     def _get_all_pdf(self, cr, uid, wizard, dispatch, context=None):
+
+        q = Queue.Queue()
+        q_except = Queue.Queue()
+
+        # create the tasks to generate labels
         for pack, moves, label in self._get_packs(cr, uid, wizard, dispatch,
                                                   context=context):
             if not label or wizard.generate_new_labels:
-                picking_out_obj = self.pool['stock.picking.out']
                 picking = moves[0].picking_id
-                # generate the label of the pack
-                if pack:
-                    tracking_ids = [pack.id]
-                else:
-                    tracking_ids = None
-                try:
-                    picking_out_obj.generate_labels(
-                        cr, uid, [picking.id],
-                        tracking_ids=tracking_ids,
-                        context=context)
-                except orm.except_orm as e:
-                    picking_name = _('Picking: %s') % picking.name
-                    pack_num = _('Pack: %s') % pack.name if pack else ''
-                    raise orm.except_orm(
-                        e.name,
-                        _('%s %s - %s') % (picking_name, pack_num, e.value))
-                if pack:
-                    label = self._find_pack_label(cr, uid, wizard, pack,
-                                                  context=context)
-                else:
-                    label = self._find_picking_label(cr, uid, wizard, picking,
-                                                     context=context)
-                if not label:
-                    continue  # no label could be generated
+                args=(cr, uid, wizard, pack, picking, label)
+                kwargs={'context': context}
+                task = (args, kwargs)
+                q.put(task)
+
+        # create few workers to parallelize label generation
+        num_workers = self._get_num_workers(cr, uid, context=context)
+        _logger.info('Starting %s workers to generate labels', num_workers)
+        for i in range(num_workers):
+            t = threading.Thread(target=self.worker, args=(q, q_except))
+            t.daemon = True
+            t.start()
+
+        # wait for all tasks to be done
+        q.join()
+
+        # We will not create a partial PDF if some labels weren't
+        # generated thus we raise catched exceptions by the workers
+        # We will try to regroup all orm exception in one
+        if not q_except.empty():
+
+            error_count = {}
+            messages = []
+            while not q_except.empty():
+                e = q_except.get()
+                if isinstance(e, orm.except_orm):
+                    if not e.name in error_count:
+                        error_count[e.name] = 1
+                    else:
+                        error_count[e.name] += 1
+                    messages.append(e.value)
+                else:
+                    # raise other exceptions like PoolError if
+                    # too many cursor where created by workers
+                    raise e
+            titles = []
+            for key, v in error_count.iteritems():
+                titles.append('%sx %s' % (v, key))
+
+            title = _('Errors while generating labels: ') + ' '.join(titles)
+            message = _('Some labels couldn\'t be generated. Please correct '
+                        'following errors and generate labels again to create '
+                        'the ones which failed.\n\n'
+                        ) + '\n'.join(messages)
+            raise orm.except_orm(title, message)
+
+        # create a new cursor to be up to date with what was created by workers
+        join_cr = pooler.get_db(cr.dbname).cursor()
+        for pack, moves, label in self._get_packs(join_cr, uid, wizard, dispatch,
+                                                  context=context):
+            picking = moves[0].picking_id
+            if pack:
+                label = self._find_pack_label(join_cr, uid, wizard, pack,
+                                              context=context)
+            else:
+                label = self._find_picking_label(join_cr, uid, wizard, picking,
+                                                 context=context)
+            if not label:
+                continue  # no label could be generated
             yield label
+        join_cr.close()
 
     def action_generate_labels(self, cr, uid, ids, context=None):
         """
@@ -128,7 +238,7 @@
         if not this.dispatch_ids:
             raise orm.except_orm(_('Error'), _('No picking dispatch selected'))
 
-        attachment_obj = self.pool.get('ir.attachment')
+        attachment_obj = self.pool['ir.attachment']
 
         for dispatch in this.dispatch_ids:
             labels = self._get_all_pdf(cr, uid, this, dispatch,


Follow ups