dhis2-devs team mailing list archive
-
dhis2-devs team
-
Mailing list archive
-
Message #12945
[Branch ~dhis2-devs-core/dhis2/trunk] Rev 4056: Data mart: Processing of aggregated data element and indicator values now takes place in parallel...
------------------------------------------------------------
revno: 4056
committer: Lars Helge Overland <larshelge@xxxxxxxxx>
branch nick: dhis2
timestamp: Fri 2011-07-01 13:20:25 +0200
message:
Data mart: Processing of aggregated data element and indicator values now takes place in parallel processes (split up on periods). This will enable better utilization of processing power of multi-core processors and make aggregation performance scalable on available processor cores / hardware. Total processing time on benchmark database down 30 % on my dual-core laptop. Currently number of processes are fixed to 2, will implement a system setting for this and provide more test reports on servers with more cores.
added:
dhis-2/dhis-support/dhis-support-system/src/main/java/org/hisp/dhis/system/util/ConcurrentUtils.java
modified:
dhis-2/dhis-api/src/main/java/org/hisp/dhis/period/Period.java
dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DataElementDataMart.java
dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DefaultDataElementDataMart.java
dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/engine/DefaultDataMartEngine.java
dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/DefaultIndicatorDataMart.java
dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/IndicatorDataMart.java
--
lp:dhis2
https://code.launchpad.net/~dhis2-devs-core/dhis2/trunk
Your team DHIS 2 developers is subscribed to branch lp:dhis2.
To unsubscribe from this branch go to https://code.launchpad.net/~dhis2-devs-core/dhis2/trunk/+edit-subscription
=== modified file 'dhis-2/dhis-api/src/main/java/org/hisp/dhis/period/Period.java'
--- dhis-2/dhis-api/src/main/java/org/hisp/dhis/period/Period.java 2011-05-05 21:14:56 +0000
+++ dhis-2/dhis-api/src/main/java/org/hisp/dhis/period/Period.java 2011-07-01 11:20:25 +0000
@@ -231,7 +231,7 @@
@Override
public String toString()
{
- return "[" + periodType.getName() + ": " + startDate + " - " + endDate + "]";
+ return "[" + id + " " + periodType.getName() + ": " + startDate + " - " + endDate + "]";
}
// -------------------------------------------------------------------------
=== modified file 'dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DataElementDataMart.java'
--- dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DataElementDataMart.java 2011-07-01 07:12:30 +0000
+++ dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DataElementDataMart.java 2011-07-01 11:20:25 +0000
@@ -28,6 +28,7 @@
*/
import java.util.Collection;
+import java.util.concurrent.Future;
import org.hisp.dhis.dataelement.DataElementOperand;
import org.hisp.dhis.datamart.DataElementOperandList;
@@ -39,6 +40,6 @@
*/
public interface DataElementDataMart
{
- void exportDataValues( Collection<DataElementOperand> operands, Collection<Period> periods,
+ Future<?> exportDataValues( Collection<DataElementOperand> operands, Collection<Period> periods,
Collection<OrganisationUnit> organisationUnits, DataElementOperandList operandList, String key );
}
=== modified file 'dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DefaultDataElementDataMart.java'
--- dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DefaultDataElementDataMart.java 2011-07-01 07:12:30 +0000
+++ dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/dataelement/DefaultDataElementDataMart.java 2011-07-01 11:20:25 +0000
@@ -34,6 +34,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.Future;
import org.amplecode.quick.BatchHandler;
import org.amplecode.quick.BatchHandlerFactory;
@@ -50,6 +51,7 @@
import org.hisp.dhis.organisationunit.OrganisationUnitHierarchy;
import org.hisp.dhis.organisationunit.OrganisationUnitService;
import org.hisp.dhis.period.Period;
+import org.springframework.scheduling.annotation.Async;
/**
* @author Lars Helge Overland
@@ -132,7 +134,8 @@
// DataMart functionality
// -------------------------------------------------------------------------
- public void exportDataValues( Collection<DataElementOperand> operands, Collection<Period> periods,
+ @Async
+ public Future<?> exportDataValues( Collection<DataElementOperand> operands, Collection<Period> periods,
Collection<OrganisationUnit> organisationUnits, DataElementOperandList operandList, String key )
{
final BatchHandler<AggregatedDataValue> batchHandler = batchHandlerFactory.createBatchHandler( AggregatedDataValueBatchHandler.class ).init();
@@ -199,5 +202,7 @@
batchHandler.flush();
cacheHandler.flush();
+
+ return null;
}
}
=== modified file 'dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/engine/DefaultDataMartEngine.java'
--- dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/engine/DefaultDataMartEngine.java 2011-07-01 07:37:16 +0000
+++ dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/engine/DefaultDataMartEngine.java 2011-07-01 11:20:25 +0000
@@ -32,6 +32,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Future;
import org.hisp.dhis.aggregation.AggregatedDataValueService;
import org.hisp.dhis.common.ProcessState;
@@ -52,7 +53,9 @@
import org.hisp.dhis.period.Period;
import org.hisp.dhis.period.PeriodService;
import org.hisp.dhis.system.util.Clock;
+import org.hisp.dhis.system.util.ConcurrentUtils;
import org.hisp.dhis.system.util.ConversionUtils;
+import org.hisp.dhis.system.util.PaginatedList;
import org.springframework.transaction.annotation.Transactional;
/**
@@ -61,6 +64,8 @@
public class DefaultDataMartEngine
implements DataMartEngine
{
+ private static final int THREAD_NO = 2;
+
// -------------------------------------------------------------------------
// Dependencies
// -------------------------------------------------------------------------
@@ -150,7 +155,7 @@
public void export( Collection<Integer> dataElementIds, Collection<Integer> indicatorIds,
Collection<Integer> periodIds, Collection<Integer> organisationUnitIds, boolean useIndexes, ProcessState state )
{
- Clock clock = new Clock().startClock().logTime( "Data mart export process started" );
+ Clock clock = new Clock().startClock().logTime( "Data mart export process started" + " 2" );
// ---------------------------------------------------------------------
// Get objects
@@ -211,8 +216,6 @@
// Create aggregated data cache
// ---------------------------------------------------------------------
- DataElementOperandList operandList = new DataElementOperandList( indicatorOperands );
-
crossTabService.createAggregatedDataCache( indicatorOperands, key );
clock.logTime( "Created aggregated data cache" );
@@ -243,10 +246,19 @@
state.setMessage( "exporting_data_for_data_elements" );
+ List<List<Period>> periodPages = new PaginatedList<Period>( periods ).setNumberOfPages( THREAD_NO ).getPages();
+
if ( allOperands.size() > 0 )
{
- dataElementDataMart.exportDataValues( allOperands, periods, organisationUnits, operandList, key );
+ List<Future<?>> futures = new ArrayList<Future<?>>();
+
+ for ( List<Period> periodPage : periodPages )
+ {
+ futures.add( dataElementDataMart.exportDataValues( allOperands, periodPage, organisationUnits, new DataElementOperandList( indicatorOperands ), key ) );
+ }
+ ConcurrentUtils.waitForCompletion( futures );
+
clock.logTime( "Exported values for data element operands (" + allOperands.size() + ")" );
}
@@ -266,8 +278,15 @@
if ( isIndicators )
{
- indicatorDataMart.exportIndicatorValues( indicators, periods, organisationUnits, indicatorOperands, key );
-
+ List<Future<?>> futures = new ArrayList<Future<?>>();
+
+ for ( List<Period> periodPage : periodPages )
+ {
+ futures.add( indicatorDataMart.exportIndicatorValues( indicators, periodPage, organisationUnits, indicatorOperands, key ) );
+ }
+
+ ConcurrentUtils.waitForCompletion( futures );
+
clock.logTime( "Exported values for indicators (" + indicators.size() + ")" );
}
=== modified file 'dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/DefaultIndicatorDataMart.java'
--- dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/DefaultIndicatorDataMart.java 2011-07-01 07:12:30 +0000
+++ dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/DefaultIndicatorDataMart.java 2011-07-01 11:20:25 +0000
@@ -35,6 +35,7 @@
import java.util.Collection;
import java.util.Map;
+import java.util.concurrent.Future;
import org.amplecode.quick.BatchHandler;
import org.amplecode.quick.BatchHandlerFactory;
@@ -53,6 +54,7 @@
import org.hisp.dhis.period.Period;
import org.hisp.dhis.period.PeriodType;
import org.hisp.dhis.system.util.DateUtils;
+import org.springframework.scheduling.annotation.Async;
/**
* @author Lars Helge Overland
@@ -116,8 +118,9 @@
// -------------------------------------------------------------------------
// IndicatorDataMart implementation
// -------------------------------------------------------------------------
-
- public void exportIndicatorValues( final Collection<Indicator> indicators, final Collection<Period> periods,
+
+ @Async
+ public Future<?> exportIndicatorValues( final Collection<Indicator> indicators, final Collection<Period> periods,
final Collection<OrganisationUnit> organisationUnits, final Collection<DataElementOperand> operands, String key )
{
final BatchHandler<AggregatedIndicatorValue> batchHandler = batchHandlerFactory.createBatchHandler( AggregatedIndicatorValueBatchHandler.class ).init();
@@ -181,6 +184,8 @@
}
batchHandler.flush();
+
+ return null;
}
// -------------------------------------------------------------------------
=== modified file 'dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/IndicatorDataMart.java'
--- dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/IndicatorDataMart.java 2011-07-01 07:12:30 +0000
+++ dhis-2/dhis-services/dhis-service-datamart-default/src/main/java/org/hisp/dhis/datamart/indicator/IndicatorDataMart.java 2011-07-01 11:20:25 +0000
@@ -28,6 +28,7 @@
*/
import java.util.Collection;
+import java.util.concurrent.Future;
import org.hisp.dhis.dataelement.DataElementOperand;
import org.hisp.dhis.indicator.Indicator;
@@ -39,6 +40,6 @@
*/
public interface IndicatorDataMart
{
- void exportIndicatorValues( Collection<Indicator> indicators, Collection<Period> periods,
+ Future<?> exportIndicatorValues( Collection<Indicator> indicators, Collection<Period> periods,
Collection<OrganisationUnit> organisationUnits, Collection<DataElementOperand> operands, String key );
}
=== added file 'dhis-2/dhis-support/dhis-support-system/src/main/java/org/hisp/dhis/system/util/ConcurrentUtils.java'
--- dhis-2/dhis-support/dhis-support-system/src/main/java/org/hisp/dhis/system/util/ConcurrentUtils.java 1970-01-01 00:00:00 +0000
+++ dhis-2/dhis-support/dhis-support-system/src/main/java/org/hisp/dhis/system/util/ConcurrentUtils.java 2011-07-01 11:20:25 +0000
@@ -0,0 +1,57 @@
+package org.hisp.dhis.system.util;
+
+/*
+ * Copyright (c) 2004-2010, University of Oslo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * * Neither the name of the HISP project nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import java.util.Collection;
+import java.util.concurrent.Future;
+
+/**
+ * @author Lars Helge Overland
+ */
+public class ConcurrentUtils
+{
+ /**
+ * Blocks and waits for all Futures in the given collection to complete.
+ *
+ * @param futures the collection of Futures.
+ */
+ public static void waitForCompletion( Collection<Future<?>> futures )
+ {
+ for ( Future<?> future : futures )
+ {
+ try
+ {
+ future.get();
+ }
+ catch ( Exception ex )
+ {
+ throw new RuntimeException( ex );
+ }
+ }
+ }
+}