← Back to team overview

dhis2-devs team mailing list archive

[Branch ~dhis2-devs-core/dhis2/trunk] Rev 9845: Analytics, improved performance of table population and index generation. Instead of defining tas...

 

Merge authors:
  Lars Helge Øverland (larshelge)
------------------------------------------------------------
revno: 9845 [merge]
committer: Lars Helge Øverland <larshelge@xxxxxxxxx>
branch nick: dhis2
timestamp: Tue 2013-02-19 13:24:11 +0100
message:
  Analytics, improved performance of table population and index generation. Instead of defining tasks per process up front, now creating a concurrent queue with all tasks and having each process polling new tasks continuously. This gives more stable usage of cpus.
added:
  dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/AnalyticsIndex.java
modified:
  dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/AnalyticsTableManager.java
  dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/AbstractJdbcTableManager.java
  dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/DefaultAnalyticsTableService.java
  dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/JdbcAnalyticsTableManager.java
  dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/JdbcCompletenessTableManager.java
  dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/JdbcCompletenessTargetTableManager.java


--
lp:dhis2
https://code.launchpad.net/~dhis2-devs-core/dhis2/trunk

Your team DHIS 2 developers is subscribed to branch lp:dhis2.
To unsubscribe from this branch go to https://code.launchpad.net/~dhis2-devs-core/dhis2/trunk/+edit-subscription
=== added file 'dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/AnalyticsIndex.java'
--- dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/AnalyticsIndex.java	1970-01-01 00:00:00 +0000
+++ dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/AnalyticsIndex.java	2013-02-19 10:56:48 +0000
@@ -0,0 +1,65 @@
+package org.hisp.dhis.analytics;
+
+public class AnalyticsIndex
+{
+    private String table;
+    
+    private String column;
+
+    public AnalyticsIndex( String table, String column )
+    {
+        this.table = table;
+        this.column = column;
+    }
+    
+    public String getTable()
+    {
+        return table;
+    }
+
+    public void setTable( String table )
+    {
+        this.table = table;
+    }
+
+    public String getColumn()
+    {
+        return column;
+    }
+
+    public void setColumn( String column )
+    {
+        this.column = column;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + column.hashCode();
+        result = prime * result + table.hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals( Object object )
+    {
+        if ( this == object )
+        {
+            return true;
+        }
+        if ( object == null )
+        {
+            return false;
+        }
+        if ( getClass() != object.getClass() )
+        {
+            return false;
+        }
+        
+        AnalyticsIndex other = (AnalyticsIndex) object;
+        
+        return column.equals( other.column ) && table.equals( other.table );
+    }
+}

=== modified file 'dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/AnalyticsTableManager.java'
--- dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/AnalyticsTableManager.java	2013-01-25 10:33:17 +0000
+++ dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/AnalyticsTableManager.java	2013-02-19 11:54:49 +0000
@@ -30,10 +30,9 @@
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 
-import org.hisp.dhis.period.Period;
-
 public interface AnalyticsTableManager
 {
     public static final String TABLE_TEMP_SUFFIX = "_temp";
@@ -61,10 +60,9 @@
      * Creates single indexes on the given columns of the analytics table with
      * the given name.
      * 
-     * @param tableName the name of the table to create indexes on.
-     * @param columns the columns to create single indexes for.
+     * @param indexes
      */
-    Future<?> createIndexesAsync( String tableName, List<String> columns );
+    Future<?> createIndexesAsync( ConcurrentLinkedQueue<AnalyticsIndex> indexes );
     
     /**
      * Attempts to drop analytics table, then rename temporary table to analytics
@@ -78,10 +76,9 @@
      * Copies and denormalizes rows from data value table into analytics table.
      * The data range is based on the start date of the data value row.
      * 
-     * @param tableName the name of the analytics table.
-     * @param period the data period for which to populate the table.
+     * @param tables
      */
-    Future<?> populateTableAsync( String tableName, Period period );    
+    Future<?> populateTableAsync( ConcurrentLinkedQueue<String> tables );    
 
     /**
      * Returns a list of string arrays in where the first index holds the database
@@ -129,17 +126,17 @@
      * organisation unit level column values to null for the levels above the
      * given aggregation level.
      * 
-     * @param tableName the name of the analytics table.
+     * @param tables
      * @param dataElements the data element uids to apply aggregation levels for.
      * @param aggregationLevel the aggregation level.
      */
-    void applyAggregationLevels( String tableName, Collection<String> dataElements, int aggregationLevel );
+    Future<?> applyAggregationLevels( ConcurrentLinkedQueue<String> tables, Collection<String> dataElements, int aggregationLevel );
     
     /**
      * Performs vacuum or optimization of the given table. The type of operation
      * performed is dependent on the underlying DBMS.
      * 
-     * @param tableName the name of the analytics table.
+     * @param tables
      */
-    Future<?> vacuumTableAsync( String tableName );
+    Future<?> vacuumTablesAsync( ConcurrentLinkedQueue<String> tables );
 }

=== modified file 'dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/AbstractJdbcTableManager.java'
--- dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/AbstractJdbcTableManager.java	2013-01-26 08:20:27 +0000
+++ dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/AbstractJdbcTableManager.java	2013-02-19 11:54:49 +0000
@@ -29,10 +29,12 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.hisp.dhis.analytics.AnalyticsIndex;
 import org.hisp.dhis.analytics.AnalyticsTableManager;
 import org.hisp.dhis.common.CodeGenerator;
 import org.hisp.dhis.dataelement.DataElementService;
@@ -80,21 +82,26 @@
     }
     
     @Async
-    public Future<?> createIndexesAsync( String tableName, List<String> columns )
+    public Future<?> createIndexesAsync( ConcurrentLinkedQueue<AnalyticsIndex> indexes )
     {
-        for ( String column : columns )
-        {        
-            final String index = PREFIX_INDEX + column + "_" + tableName + "_" + CodeGenerator.generateCode();
-            
-            final String sql = "create index " + index + " on " + tableName + " (" + column + ")";
+        taskLoop : while ( true )
+        {
+            AnalyticsIndex inx = indexes.poll();
+            
+            if ( inx == null )
+            {
+                break taskLoop;
+            }
+            
+            final String index = PREFIX_INDEX + inx.getColumn() + "_" + inx.getTable() + "_" + CodeGenerator.generateCode();
+            
+            final String sql = "create index " + index + " on " + inx.getTable() + " (" + inx.getColumn() + ")";
                 
             executeSilently( sql );
             
             log.info( "Created index: " + index );
         }
         
-        log.info( "Indexes created" );
-        
         return null;
     }
 
@@ -148,13 +155,23 @@
     }
 
     @Async
-    public Future<?> vacuumTableAsync( String tableName )
+    public Future<?> vacuumTablesAsync( ConcurrentLinkedQueue<String> tables )
     {
-        final String sql = statementBuilder.getVacuum( tableName );
-        
-        log.info( "Vacuum SQL: " + sql );
-        
-        jdbcTemplate.execute( sql );
+        taskLoop : while ( true )
+        {
+            String table = tables.poll();
+            
+            if ( table == null )
+            {
+                break taskLoop;
+            }
+            
+            final String sql = statementBuilder.getVacuum( table );
+            
+            log.info( "Vacuum SQL: " + sql );
+            
+            jdbcTemplate.execute( sql );
+        }
         
         return null;
     }

=== modified file 'dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/DefaultAnalyticsTableService.java'
--- dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/DefaultAnalyticsTableService.java	2013-02-19 09:29:27 +0000
+++ dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/DefaultAnalyticsTableService.java	2013-02-19 11:54:49 +0000
@@ -32,19 +32,19 @@
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.hisp.dhis.analytics.AnalyticsIndex;
 import org.hisp.dhis.analytics.AnalyticsTableManager;
 import org.hisp.dhis.analytics.AnalyticsTableService;
 import org.hisp.dhis.common.IdentifiableObjectUtils;
 import org.hisp.dhis.dataelement.DataElementService;
 import org.hisp.dhis.organisationunit.OrganisationUnitService;
-import org.hisp.dhis.period.Period;
 import org.hisp.dhis.system.util.Clock;
 import org.hisp.dhis.system.util.ConcurrentUtils;
-import org.hisp.dhis.system.util.PaginatedList;
 import org.hisp.dhis.system.util.SystemUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Async;
@@ -128,25 +128,16 @@
     
     private void populateTables( List<String> tables )
     {
-        List<List<String>> tablePages = new PaginatedList<String>( tables ).setPageSize( getProcessNo() ).getPages();
-        
-        log.info( "No of table pages: " + tablePages.size() );
-        
-        for ( List<String> tablePage : tablePages )
+        ConcurrentLinkedQueue<String> tableQ = new ConcurrentLinkedQueue<String>( tables );
+        
+        List<Future<?>> futures = new ArrayList<Future<?>>();
+        
+        for ( int i = 0; i < getProcessNo(); i++ )
         {
-            log.info( "Table page: " + tablePage );
-            
-            List<Future<?>> futures = new ArrayList<Future<?>>();
-            
-            for ( String table : tablePage )
-            {
-                Period period = PartitionUtils.getPeriod( table );
-                
-                futures.add( tableManager.populateTableAsync( table, period ) );
-            }
-            
-            ConcurrentUtils.waitForCompletion( futures );
+            futures.add( tableManager.populateTableAsync( tableQ ) );
         }
+        
+        ConcurrentUtils.waitForCompletion( futures );
     }
     
     private void pruneTables( List<String> tables )
@@ -166,55 +157,69 @@
     {
         int maxLevels = organisationUnitService.getMaxOfOrganisationUnitLevels();
         
-        for ( int i = 0; i < maxLevels; i++ )
+        levelLoop : for ( int i = 0; i < maxLevels; i++ )
         {
             int level = maxLevels - i;
             
             Collection<String> dataElements = IdentifiableObjectUtils.getUids( 
                 dataElementService.getDataElementsByAggregationLevel( level ) );
             
-            if ( !dataElements.isEmpty() )
-            {
-                for ( String table : tables )
-                {
-                    tableManager.applyAggregationLevels( table, dataElements, level );
-                }
-            }
+            if ( dataElements.isEmpty() )
+            {
+                continue levelLoop;
+            }
+                        
+            ConcurrentLinkedQueue<String> tableQ = new ConcurrentLinkedQueue<String>( tables );
+
+            List<Future<?>> futures = new ArrayList<Future<?>>();
+            
+            for ( int j = 0; j < getProcessNo(); j++ )
+            {
+                futures.add( tableManager.applyAggregationLevels( tableQ, dataElements, level ) );
+            }
+
+            ConcurrentUtils.waitForCompletion( futures );
         }
     }
     
     private void createIndexes( List<String> tables )
     {
+        ConcurrentLinkedQueue<AnalyticsIndex> indexes = new ConcurrentLinkedQueue<AnalyticsIndex>();
+        
+        List<String> columns = tableManager.getDimensionColumnNames();
+        
         for ( String table : tables )
         {
-            List<Future<?>> futures = new ArrayList<Future<?>>();
-    
-            List<List<String>> columnPages = new PaginatedList<String>( tableManager.getDimensionColumnNames() ).setNumberOfPages( getProcessNo() ).getPages();
-            
-            for ( List<String> columnPage : columnPages )
+            for ( String column : columns )
             {
-                futures.add( tableManager.createIndexesAsync( table, columnPage ) );
+                indexes.add( new AnalyticsIndex( table, column ) );
             }
-            
-            ConcurrentUtils.waitForCompletion( futures );
-        }
+        }
+        
+        log.info( "No of indexes: " + indexes.size() );
+        
+        List<Future<?>> futures = new ArrayList<Future<?>>();
+
+        for ( int i = 0; i < getProcessNo(); i++ )
+        {
+            futures.add( tableManager.createIndexesAsync( indexes ) );
+        }
+
+        ConcurrentUtils.waitForCompletion( futures );
     }
 
     private void vacuumTables( List<String> tables )
     {
-        List<List<String>> tablePages = new PaginatedList<String>( tables ).setPageSize( getProcessNo() ).getPages();
-        
-        for ( List<String> tablePage : tablePages )
+        ConcurrentLinkedQueue<String> tableQ = new ConcurrentLinkedQueue<String>( tables );
+        
+        List<Future<?>> futures = new ArrayList<Future<?>>();
+        
+        for ( int i = 0; i < getProcessNo(); i++ )
         {
-            List<Future<?>> futures = new ArrayList<Future<?>>();
-            
-            for ( String table : tablePage )
-            {
-                futures.add( tableManager.vacuumTableAsync( table ) );
-            }
-            
-            ConcurrentUtils.waitForCompletion( futures );
+            tableManager.vacuumTablesAsync( tableQ );
         }
+        
+        ConcurrentUtils.waitForCompletion( futures );        
     }
     
     private void swapTables( List<String> tables )

=== modified file 'dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/JdbcAnalyticsTableManager.java'
--- dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/JdbcAnalyticsTableManager.java	2013-02-19 09:29:27 +0000
+++ dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/JdbcAnalyticsTableManager.java	2013-02-19 11:54:49 +0000
@@ -34,6 +34,7 @@
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 
 import org.hisp.dhis.analytics.DataQueryParams;
@@ -97,17 +98,29 @@
     }
     
     @Async
-    public Future<?> populateTableAsync( String tableName, Period period )
+    public Future<?> populateTableAsync( ConcurrentLinkedQueue<String> tables )
     {
-        Date startDate = period.getStartDate();
-        Date endDate = period.getEndDate();
-        
-        populateTable( tableName, startDate, endDate, "cast(dv.value as double precision)", "int", "dv.value != ''" );
-        
-        populateTable( tableName, startDate, endDate, "1" , "bool", "dv.value = 'true'" );
-
-        populateTable( tableName, startDate, endDate, "0" , "bool", "dv.value = 'false'" );
-        
+        taskLoop : while ( true )
+        {
+            String table = tables.poll();
+                
+            if ( table == null )
+            {
+                break taskLoop;
+            }
+            
+            Period period = PartitionUtils.getPeriod( table );
+            
+            Date startDate = period.getStartDate();
+            Date endDate = period.getEndDate();
+            
+            populateTable( table, startDate, endDate, "cast(dv.value as double precision)", "int", "dv.value != ''" );
+            
+            populateTable( table, startDate, endDate, "1" , "bool", "dv.value = 'true'" );
+    
+            populateTable( table, startDate, endDate, "0" , "bool", "dv.value = 'false'" );
+        }
+    
         return null;
     }
     
@@ -221,26 +234,39 @@
         return jdbcTemplate.queryForObject( sql, Date.class );
     }
     
-    public void applyAggregationLevels( String tableName, Collection<String> dataElements, int aggregationLevel )
+    @Async
+    public Future<?> applyAggregationLevels( ConcurrentLinkedQueue<String> tables, Collection<String> dataElements, int aggregationLevel )
     {
-        StringBuilder sql = new StringBuilder( "update " + tableName + " set " );
-        
-        for ( int i = 0; i < aggregationLevel; i++ )
+        taskLoop : while ( true )
         {
-            int level = i + 1;
-            
-            String column = DataQueryParams.LEVEL_PREFIX + level;
-            
-            sql.append( column + " = null," );
+            String table = tables.poll();
+                
+            if ( table == null )
+            {
+                break taskLoop;
+            }
+            
+            StringBuilder sql = new StringBuilder( "update " + table + " set " );
+            
+            for ( int i = 0; i < aggregationLevel; i++ )
+            {
+                int level = i + 1;
+                
+                String column = DataQueryParams.LEVEL_PREFIX + level;
+                
+                sql.append( column + " = null," );
+            }
+            
+            sql.deleteCharAt( sql.length() - ",".length() );
+            
+            sql.append( " where level > " + aggregationLevel );
+            sql.append( " and de in (" + getQuotedCommaDelimitedString( dataElements ) + ")" );
+            
+            log.info( "Aggregation level SQL: " + sql.toString() );
+            
+            jdbcTemplate.execute( sql.toString() );
         }
-        
-        sql.deleteCharAt( sql.length() - ",".length() );
-        
-        sql.append( " where level > " + aggregationLevel );
-        sql.append( " and de in (" + getQuotedCommaDelimitedString( dataElements ) + ")" );
-        
-        log.info( "Aggregation level SQL: " + sql.toString() );
-        
-        jdbcTemplate.execute( sql.toString() );
+
+        return null;
     }
 }

=== modified file 'dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/JdbcCompletenessTableManager.java'
--- dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/JdbcCompletenessTableManager.java	2013-01-31 10:59:22 +0000
+++ dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/JdbcCompletenessTableManager.java	2013-02-19 11:54:49 +0000
@@ -31,6 +31,7 @@
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 
 import org.hisp.dhis.organisationunit.OrganisationUnitGroupSet;
@@ -69,46 +70,58 @@
     }
     
     @Async
-    public Future<?> populateTableAsync( String tableName, Period period )
+    public Future<?> populateTableAsync( ConcurrentLinkedQueue<String> tables )
     {
-        final String start = DateUtils.getMediumDateString( period.getStartDate() );
-        final String end = DateUtils.getMediumDateString( period.getEndDate() );
-        
-        String insert = "insert into " + tableName + " (";
-        
-        for ( String[] col : getDimensionColumns() )
-        {
-            insert += col[0] + ",";
-        }
-        
-        insert += "value) ";
-        
-        String select = "select ";
-        
-        for ( String[] col : getDimensionColumns() )
-        {
-            select += col[2] + ",";
-        }
-        
-        select = select.replace( "organisationunitid", "sourceid" ); // Legacy fix TODO remove
-        
-        select += 
-            "cdr.date as value " +
-            "from completedatasetregistration cdr " +
-            "left join _organisationunitgroupsetstructure ougs on cdr.sourceid=ougs.organisationunitid " +
-            "left join _orgunitstructure ous on cdr.sourceid=ous.organisationunitid " +
-            "left join _periodstructure ps on cdr.periodid=ps.periodid " +
-            "left join period pe on cdr.periodid=pe.periodid " +
-            "left join dataset ds on cdr.datasetid=ds.datasetid " +
-            "where pe.startdate >= '" + start + "' " +
-            "and pe.startdate <= '" + end + "'" +
-            "and cdr.date is not null";
-
-        final String sql = insert + select;
-        
-        log.info( "Populate SQL: "+ sql );
-        
-        jdbcTemplate.execute( sql );
+        taskLoop : while ( true )
+        {
+            String table = tables.poll();
+                
+            if ( table == null )
+            {
+                break taskLoop;
+            }
+            
+            Period period = PartitionUtils.getPeriod( table );
+            
+            final String start = DateUtils.getMediumDateString( period.getStartDate() );
+            final String end = DateUtils.getMediumDateString( period.getEndDate() );
+        
+            String insert = "insert into " + table + " (";
+            
+            for ( String[] col : getDimensionColumns() )
+            {
+                insert += col[0] + ",";
+            }
+            
+            insert += "value) ";
+            
+            String select = "select ";
+            
+            for ( String[] col : getDimensionColumns() )
+            {
+                select += col[2] + ",";
+            }
+            
+            select = select.replace( "organisationunitid", "sourceid" ); // Legacy fix TODO remove
+            
+            select += 
+                "cdr.date as value " +
+                "from completedatasetregistration cdr " +
+                "left join _organisationunitgroupsetstructure ougs on cdr.sourceid=ougs.organisationunitid " +
+                "left join _orgunitstructure ous on cdr.sourceid=ous.organisationunitid " +
+                "left join _periodstructure ps on cdr.periodid=ps.periodid " +
+                "left join period pe on cdr.periodid=pe.periodid " +
+                "left join dataset ds on cdr.datasetid=ds.datasetid " +
+                "where pe.startdate >= '" + start + "' " +
+                "and pe.startdate <= '" + end + "'" +
+                "and cdr.date is not null";
+    
+            final String sql = insert + select;
+            
+            log.info( "Populate SQL: "+ sql );
+            
+            jdbcTemplate.execute( sql );
+        }
         
         return null;
     }
@@ -165,9 +178,10 @@
         
         return jdbcTemplate.queryForObject( sql, Date.class );
     }
-    
-    public void applyAggregationLevels( String tableName, Collection<String> dataElements, int aggregationLevel )
+
+    @Async
+    public Future<?> applyAggregationLevels( ConcurrentLinkedQueue<String> tables, Collection<String> dataElements, int aggregationLevel )
     {
-        // Not relevant
+        return null; // Not relevant
     }
 }

=== modified file 'dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/JdbcCompletenessTargetTableManager.java'
--- dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/JdbcCompletenessTargetTableManager.java	2013-01-25 10:33:17 +0000
+++ dhis-2/dhis-services/dhis-service-analytics/src/main/java/org/hisp/dhis/analytics/table/JdbcCompletenessTargetTableManager.java	2013-02-19 11:54:49 +0000
@@ -31,11 +31,11 @@
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 
 import org.hisp.dhis.organisationunit.OrganisationUnitGroupSet;
 import org.hisp.dhis.organisationunit.OrganisationUnitLevel;
-import org.hisp.dhis.period.Period;
 import org.springframework.scheduling.annotation.Async;
 
 public class JdbcCompletenessTargetTableManager
@@ -67,35 +67,45 @@
     }
 
     @Async
-    public Future<?> populateTableAsync( String tableName, Period period )
+    public Future<?> populateTableAsync( ConcurrentLinkedQueue<String> tables )
     {
-        String sql = "insert into " + tableName + " (";
-
-        for ( String[] col : getDimensionColumns() )
-        {
-            sql += col[0] + ",";
-        }
-
-        sql = sql.substring( 0, sql.length() - 1 );
-        
-        sql += ") select ";
-
-        for ( String[] col : getDimensionColumns() )
-        {
-            sql += col[2] + ",";
-        }
-        
-        sql = sql.substring( 0, sql.length() - 1 ) + " ";
-        
-        sql +=
-            "from datasetsource dss " +
-            "left join dataset ds on dss.datasetid=ds.datasetid " +
-            "left join _orgunitstructure ous on dss.sourceid=ous.organisationunitid " +
-            "left join _organisationunitgroupsetstructure ougs on dss.sourceid=ougs.organisationunitid";            
-
-        log.info( "Populate SQL: "+ sql );
-        
-        jdbcTemplate.execute( sql );
+        taskLoop : while ( true )
+        {
+            String table = tables.poll();
+                
+            if ( table == null )
+            {
+                break taskLoop;
+            }
+            
+            String sql = "insert into " + table + " (";
+    
+            for ( String[] col : getDimensionColumns() )
+            {
+                sql += col[0] + ",";
+            }
+    
+            sql = sql.substring( 0, sql.length() - 1 );
+            
+            sql += ") select ";
+    
+            for ( String[] col : getDimensionColumns() )
+            {
+                sql += col[2] + ",";
+            }
+            
+            sql = sql.substring( 0, sql.length() - 1 ) + " ";
+            
+            sql +=
+                "from datasetsource dss " +
+                "left join dataset ds on dss.datasetid=ds.datasetid " +
+                "left join _orgunitstructure ous on dss.sourceid=ous.organisationunitid " +
+                "left join _organisationunitgroupsetstructure ougs on dss.sourceid=ougs.organisationunitid";            
+    
+            log.info( "Populate SQL: "+ sql );
+            
+            jdbcTemplate.execute( sql );
+        }
         
         return null;
     }
@@ -140,8 +150,9 @@
         return null; // Not relevant
     }
 
-    public void applyAggregationLevels( String tableName, Collection<String> dataElements, int aggregationLevel )
+    @Async
+    public Future<?> applyAggregationLevels( ConcurrentLinkedQueue<String> tables, Collection<String> dataElements, int aggregationLevel )
     {
-        // Not relevant
+        return null; // Not relevant
     }
 }