← Back to team overview

dhis2-devs team mailing list archive

[Branch ~dhis2-devs-core/dhis2/trunk] Rev 4056: Data mart: Processing of aggregated data element and indicator values now takes place in parallel...

 

------------------------------------------------------------
revno: 4056
committer: Lars Helge Overland <larshelge@xxxxxxxxx>
branch nick: dhis2
timestamp: Fri 2011-07-01 13:20:25 +0200
message:
  Data mart: Processing of aggregated data element and indicator values now takes place in parallel processes (split up by period). This will enable better utilization of the processing power of multi-core processors and make aggregation performance scale with the available processor cores / hardware. Total processing time on the benchmark database is down 30% on my dual-core laptop. Currently the number of processes is fixed at 2; I will implement a system setting for this and provide more test reports from servers with more cores.
added:
  dhis-2/dhis-support/dhis-support-system/src/main/java/org/hisp/dhis/system/util/ConcurrentUtils.java
modified:
  dhis-2/dhis-api/src/main/java/org/hisp/dhis/period/Period.java
  dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DataElementDataMart.java
  dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DefaultDataElementDataMart.java
  dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/engine/DefaultDataMartEngine.java
  dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/DefaultIndicatorDataMart.java
  dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/IndicatorDataMart.java


--
lp:dhis2
https://code.launchpad.net/~dhis2-devs-core/dhis2/trunk

Your team DHIS 2 developers is subscribed to branch lp:dhis2.
To unsubscribe from this branch go to https://code.launchpad.net/~dhis2-devs-core/dhis2/trunk/+edit-subscription
=== modified file 'dhis-2/dhis-api/src/main/java/org/hisp/dhis/period/Period.java'
--- dhis-2/dhis-api/src/main/java/org/hisp/dhis/period/Period.java	2011-05-05 21:14:56 +0000
+++ dhis-2/dhis-api/src/main/java/org/hisp/dhis/period/Period.java	2011-07-01 11:20:25 +0000
@@ -231,7 +231,7 @@
     @Override
     public String toString()
     {
-        return "[" + periodType.getName() + ": " + startDate + " - " + endDate + "]";
+        return "[" + id + " " + periodType.getName() + ": " + startDate + " - " + endDate + "]";
     }
 
     // -------------------------------------------------------------------------

=== modified file 'dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DataElementDataMart.java'
--- dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DataElementDataMart.java	2011-07-01 07:12:30 +0000
+++ dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DataElementDataMart.java	2011-07-01 11:20:25 +0000
@@ -28,6 +28,7 @@
  */
 
 import java.util.Collection;
+import java.util.concurrent.Future;
 
 import org.hisp.dhis.dataelement.DataElementOperand;
 import org.hisp.dhis.datamart.DataElementOperandList;
@@ -39,6 +40,6 @@
  */
 public interface DataElementDataMart
 {
-    void exportDataValues( Collection<DataElementOperand> operands, Collection<Period> periods, 
+    Future<?> exportDataValues( Collection<DataElementOperand> operands, Collection<Period> periods, 
         Collection<OrganisationUnit> organisationUnits, DataElementOperandList operandList, String key );
 }

=== modified file 'dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DefaultDataElementDataMart.java'
--- dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DefaultDataElementDataMart.java	2011-07-01 07:12:30 +0000
+++ dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DefaultDataElementDataMart.java	2011-07-01 11:20:25 +0000
@@ -34,6 +34,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.Future;
 
 import org.amplecode.quick.BatchHandler;
 import org.amplecode.quick.BatchHandlerFactory;
@@ -50,6 +51,7 @@
 import org.hisp.dhis.organisationunit.OrganisationUnitHierarchy;
 import org.hisp.dhis.organisationunit.OrganisationUnitService;
 import org.hisp.dhis.period.Period;
+import org.springframework.scheduling.annotation.Async;
 
 /**
  * @author Lars Helge Overland
@@ -132,7 +134,8 @@
     // DataMart functionality
     // -------------------------------------------------------------------------
     
-    public void exportDataValues( Collection<DataElementOperand> operands, Collection<Period> periods, 
+    @Async
+    public Future<?> exportDataValues( Collection<DataElementOperand> operands, Collection<Period> periods, 
         Collection<OrganisationUnit> organisationUnits, DataElementOperandList operandList, String key )
     {
         final BatchHandler<AggregatedDataValue> batchHandler = batchHandlerFactory.createBatchHandler( AggregatedDataValueBatchHandler.class ).init();
@@ -199,5 +202,7 @@
         batchHandler.flush();
         
         cacheHandler.flush();
+
+        return null;
     }
 }

=== modified file 'dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/engine/DefaultDataMartEngine.java'
--- dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/engine/DefaultDataMartEngine.java	2011-07-01 07:37:16 +0000
+++ dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/engine/DefaultDataMartEngine.java	2011-07-01 11:20:25 +0000
@@ -32,6 +32,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Future;
 
 import org.hisp.dhis.aggregation.AggregatedDataValueService;
 import org.hisp.dhis.common.ProcessState;
@@ -52,7 +53,9 @@
 import org.hisp.dhis.period.Period;
 import org.hisp.dhis.period.PeriodService;
 import org.hisp.dhis.system.util.Clock;
+import org.hisp.dhis.system.util.ConcurrentUtils;
 import org.hisp.dhis.system.util.ConversionUtils;
+import org.hisp.dhis.system.util.PaginatedList;
 import org.springframework.transaction.annotation.Transactional;
 
 /**
@@ -61,6 +64,8 @@
 public class DefaultDataMartEngine
     implements DataMartEngine
 {
+    private static final int THREAD_NO = 2;
+    
     // -------------------------------------------------------------------------
     // Dependencies
     // -------------------------------------------------------------------------
@@ -150,7 +155,7 @@
     public void export( Collection<Integer> dataElementIds, Collection<Integer> indicatorIds,
         Collection<Integer> periodIds, Collection<Integer> organisationUnitIds, boolean useIndexes, ProcessState state )
     {
-        Clock clock = new Clock().startClock().logTime( "Data mart export process started" );
+        Clock clock = new Clock().startClock().logTime( "Data mart export process started" + " 2" );
         
         // ---------------------------------------------------------------------
         // Get objects
@@ -211,8 +216,6 @@
         // Create aggregated data cache
         // ---------------------------------------------------------------------
 
-        DataElementOperandList operandList = new DataElementOperandList( indicatorOperands );
-
         crossTabService.createAggregatedDataCache( indicatorOperands, key );
         
         clock.logTime( "Created aggregated data cache" );
@@ -243,10 +246,19 @@
 
         state.setMessage( "exporting_data_for_data_elements" );
 
+        List<List<Period>> periodPages = new PaginatedList<Period>( periods ).setNumberOfPages( THREAD_NO ).getPages();
+        
         if ( allOperands.size() > 0 )
         {
-            dataElementDataMart.exportDataValues( allOperands, periods, organisationUnits, operandList, key );
+            List<Future<?>> futures = new ArrayList<Future<?>>();
+            
+            for ( List<Period> periodPage : periodPages )
+            {
+                futures.add( dataElementDataMart.exportDataValues( allOperands, periodPage, organisationUnits, new DataElementOperandList( indicatorOperands ), key ) );
+            }
 
+            ConcurrentUtils.waitForCompletion( futures );
+            
             clock.logTime( "Exported values for data element operands (" + allOperands.size() + ")" );
         }
 
@@ -266,8 +278,15 @@
 
         if ( isIndicators )
         {
-            indicatorDataMart.exportIndicatorValues( indicators, periods, organisationUnits, indicatorOperands, key );
-
+            List<Future<?>> futures = new ArrayList<Future<?>>();
+
+            for ( List<Period> periodPage : periodPages )
+            {
+                futures.add( indicatorDataMart.exportIndicatorValues( indicators, periodPage, organisationUnits, indicatorOperands, key ) );
+            }
+
+            ConcurrentUtils.waitForCompletion( futures );
+            
             clock.logTime( "Exported values for indicators (" + indicators.size() + ")" );
         }
 

=== modified file 'dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/DefaultIndicatorDataMart.java'
--- dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/DefaultIndicatorDataMart.java	2011-07-01 07:12:30 +0000
+++ dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/DefaultIndicatorDataMart.java	2011-07-01 11:20:25 +0000
@@ -35,6 +35,7 @@
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.amplecode.quick.BatchHandler;
 import org.amplecode.quick.BatchHandlerFactory;
@@ -53,6 +54,7 @@
 import org.hisp.dhis.period.Period;
 import org.hisp.dhis.period.PeriodType;
 import org.hisp.dhis.system.util.DateUtils;
+import org.springframework.scheduling.annotation.Async;
 
 /**
  * @author Lars Helge Overland
@@ -116,8 +118,9 @@
     // -------------------------------------------------------------------------
     // IndicatorDataMart implementation
     // -------------------------------------------------------------------------
-    
-    public void exportIndicatorValues( final Collection<Indicator> indicators, final Collection<Period> periods, 
+
+    @Async
+    public Future<?> exportIndicatorValues( final Collection<Indicator> indicators, final Collection<Period> periods, 
         final Collection<OrganisationUnit> organisationUnits, final Collection<DataElementOperand> operands, String key )
     {
         final BatchHandler<AggregatedIndicatorValue> batchHandler = batchHandlerFactory.createBatchHandler( AggregatedIndicatorValueBatchHandler.class ).init();
@@ -181,6 +184,8 @@
         }
         
         batchHandler.flush();
+        
+        return null;
     }
     
     // -------------------------------------------------------------------------

=== modified file 'dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/IndicatorDataMart.java'
--- dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/IndicatorDataMart.java	2011-07-01 07:12:30 +0000
+++ dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/IndicatorDataMart.java	2011-07-01 11:20:25 +0000
@@ -28,6 +28,7 @@
  */
 
 import java.util.Collection;
+import java.util.concurrent.Future;
 
 import org.hisp.dhis.dataelement.DataElementOperand;
 import org.hisp.dhis.indicator.Indicator;
@@ -39,6 +40,6 @@
  */
 public interface IndicatorDataMart
 {
-    void exportIndicatorValues( Collection<Indicator> indicators, Collection<Period> periods, 
+    Future<?> exportIndicatorValues( Collection<Indicator> indicators, Collection<Period> periods, 
         Collection<OrganisationUnit> organisationUnits, Collection<DataElementOperand> operands, String key );
 }

=== added file 'dhis-2/dhis-support/dhis-support-system/src/main/java/org/hisp/dhis/system/util/ConcurrentUtils.java'
--- dhis-2/dhis-support/dhis-support-system/src/main/java/org/hisp/dhis/system/util/ConcurrentUtils.java	1970-01-01 00:00:00 +0000
+++ dhis-2/dhis-support/dhis-support-system/src/main/java/org/hisp/dhis/system/util/ConcurrentUtils.java	2011-07-01 11:20:25 +0000
@@ -0,0 +1,57 @@
+package org.hisp.dhis.system.util;
+
+/*
+ * Copyright (c) 2004-2010, University of Oslo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright notice, this
+ *   list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * * Neither the name of the HISP project nor the names of its contributors may
+ *   be used to endorse or promote products derived from this software without
+ *   specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import java.util.Collection;
+import java.util.concurrent.Future;
+
+/**
+ * @author Lars Helge Overland
+ */
+public class ConcurrentUtils
+{
+    /**
+     * Blocks until every Future in the given collection has completed, calling
+     * get() on each in iteration order; a failure is rethrown as a RuntimeException.
+     * @param futures the collection of Futures to wait for.
+     */
+    public static void waitForCompletion( Collection<Future<?>> futures )
+    {
+        for ( Future<?> future : futures )
+        {
+            try
+            {
+                future.get(); // blocks until this Future is done
+            }
+            catch ( Exception ex ) // NOTE(review): also catches InterruptedException; interrupt flag is not restored
+            {
+                throw new RuntimeException( ex ); // aborts the loop; remaining Futures are not waited for
+            }
+        }
+    }
+}