← Back to team overview

dhis2-devs team mailing list archive

[Branch ~dhis2-devs-core/dhis2/trunk] Rev 19473: Modified ADX import to use pipe with finite buffer size and timeout

 

------------------------------------------------------------
revno: 19473
committer: Bob Jolliffe <bobjolliffe@xxxxxxxxx>
branch nick: dhis2
timestamp: Fri 2015-06-19 15:55:47 +0100
message:
  Modified ADX import to use pipe with finite buffer size and timeout
added:
  dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/datavalueset/PipedImporter.java
modified:
  dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/adx/DefaultADXDataService.java
  dhis-2/dhis-services/dhis-service-dxf2/src/test/java/org/hisp/dhis/dxf2/adx/DefaultADXDataServiceTest.java
  dhis-2/dhis-services/dhis-service-dxf2/src/test/resources/adx/adx_data_sample1.xml


--
lp:dhis2
https://code.launchpad.net/~dhis2-devs-core/dhis2/trunk

Your team DHIS 2 developers is subscribed to branch lp:dhis2.
To unsubscribe from this branch go to https://code.launchpad.net/~dhis2-devs-core/dhis2/trunk/+edit-subscription
=== modified file 'dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/adx/DefaultADXDataService.java'
--- dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/adx/DefaultADXDataService.java	2015-06-18 14:35:18 +0000
+++ dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/adx/DefaultADXDataService.java	2015-06-19 14:55:47 +0000
@@ -31,43 +31,33 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.StringWriter;
-import java.nio.charset.StandardCharsets;
+import java.io.PipedOutputStream;
 import java.util.HashMap;
 import java.util.Map;
-
-import org.apache.commons.io.IOUtils;
-
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
-import org.w3c.dom.Document;
-
+import javax.xml.stream.XMLOutputFactory;
 import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
-
 import javax.xml.stream.XMLStreamException;
-
 import org.amplecode.staxwax.factory.XMLFactory;
 import org.hisp.dhis.dxf2.common.ImportOptions;
 import org.hisp.dhis.dxf2.datavalueset.DataExportParams;
 import org.hisp.dhis.dxf2.datavalueset.DataValueSetService;
 import org.hisp.dhis.dxf2.importsummary.ImportSummaries;
 import org.amplecode.staxwax.reader.XMLReader;
-
 import javax.xml.stream.XMLStreamReader;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
+import javax.xml.stream.XMLStreamWriter;
+import org.hisp.dhis.dxf2.datavalueset.PipedImporter;
 import org.hisp.dhis.dxf2.importsummary.ImportStatus;
 import org.hisp.dhis.dxf2.importsummary.ImportSummary;
 import org.hisp.dhis.period.Period;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.w3c.dom.Element;
 
 /**
  *
@@ -79,6 +69,17 @@
     @Autowired
     protected DataValueSetService dataValueSetService;

+    protected ExecutorService executor;
+    /** Pipe buffer size in bytes; mirrors {@link PipedImporter#PIPE_BUFFER_SIZE} so the two values cannot drift apart. */
+    public static final int PIPE_BUFFER_SIZE = PipedImporter.PIPE_BUFFER_SIZE;
+    /** Per-group timeout used when waiting for an import result; per its name the unit is minutes. */
+    public static final int TOTAL_MINUTES_TO_WAIT = 5;
+    /** Replaces the (otherwise autowired) data value set service. */
+    public void setDataValueSetService( DataValueSetService dataValueSetService )
+    {
+        this.dataValueSetService = dataValueSetService;
+    }
+
     @Override
     public void getData( DataExportParams params, OutputStream out )
     {
@@ -89,39 +90,30 @@
     public ImportSummaries postData( InputStream in, ImportOptions importOptions )
         throws IOException
     {
-        XMLReader reader = XMLFactory.getXMLReader( in );
+        XMLReader adxReader = XMLFactory.getXMLReader( in );

         ImportSummaries importSummaries = new ImportSummaries();

-        reader.moveToStartElement( ADXConstants.ROOT, ADXConstants.NAMESPACE );
-
-        while ( reader.moveToStartElement( ADXConstants.GROUP, ADXConstants.NAMESPACE ) )
+        adxReader.moveToStartElement( ADXConstants.ROOT, ADXConstants.NAMESPACE );
+
+        // each ADX group becomes one DXF2 document, streamed through a bounded pipe to its own importer thread
+        XMLOutputFactory factory = XMLOutputFactory.newInstance();
+
+        while ( adxReader.moveToStartElement( ADXConstants.GROUP, ADXConstants.NAMESPACE ) )
         {
-            DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
-            DocumentBuilder docBuilder;
-            try
+            try ( PipedOutputStream pipeOut = new PipedOutputStream() )
             {
-                docBuilder = docFactory.newDocumentBuilder();
-                Document dxf = docBuilder.newDocument();
+                ExecutorService importExecutor = Executors.newSingleThreadExecutor();
+                Future<ImportSummary> futureImportSummary = importExecutor.submit( new PipedImporter( dataValueSetService, importOptions, pipeOut ) );
+                importExecutor.shutdown(); // the submitted import still runs; its worker thread then exits instead of leaking
+                XMLStreamWriter dxfWriter = factory.createXMLStreamWriter( pipeOut );
+                parseADXGroupToDxf( adxReader, dxfWriter );
+                dxfWriter.flush(); // the stream writer buffers internally: push the whole document into the pipe
+                pipeOut.close(); // end-of-stream lets the importer finish parsing before we wait on its result

-                // buld a dxf2 datavalueset document from each adx group
-                parseADXGroupToDxf( reader, dxf );
-                
-                // write the document to String
-                DOMSource source = new DOMSource( dxf );
-                StringWriter writer = new StringWriter();
-                TransformerFactory transformerFactory = TransformerFactory.newInstance();
-                Transformer transformer = transformerFactory.newTransformer();
-                
-                StreamResult result = new StreamResult(writer);
-                transformer.transform( source, result );
-                // create an inputstream for the String
-                InputStream dxfIn = IOUtils.toInputStream( result.toString(), StandardCharsets.UTF_8 );
-                
-                // pass off to the dxf2 datavalueset service
-                importSummaries.addImportSummary( dataValueSetService.saveDataValueSet( dxfIn, importOptions ) );
+                importSummaries.addImportSummary( futureImportSummary.get( TOTAL_MINUTES_TO_WAIT, TimeUnit.MINUTES ) );
             } 
-            catch ( Exception ex )
+            catch ( IOException | XMLStreamException | InterruptedException | ExecutionException | TimeoutException ex ) // TODO: restore interrupt flag on InterruptedException
             {
                 ImportSummary importSummary = new ImportSummary();
                 importSummary.setStatus( ImportStatus.ERROR );
@@ -134,43 +126,46 @@
         return importSummaries;
     }
 
-    protected void parseADXGroupToDxf( XMLReader reader, Document dxf ) throws XMLStreamException
+    protected void parseADXGroupToDxf( XMLReader adxReader, XMLStreamWriter dxfWriter ) throws XMLStreamException
     {
-        Element root = dxf.createElementNS( "http://dhis2.org/schema/dxf/2.0", "dataValueSet" );
-        
-        Map<String, String> groupAttributes = readAttributes( reader );
+        dxfWriter.writeStartDocument( "1.0" );
+        dxfWriter.writeStartElement( "dataValueSet" );
+        dxfWriter.writeDefaultNamespace( "http://dhis2.org/schema/dxf/2.0" );
+        // ADX group attributes become dataValueSet attributes; the ADX period is converted to an ISO period below
+        Map<String, String> groupAttributes = readAttributes( adxReader );

         String periodStr = groupAttributes.get( ADXConstants.PERIOD );
         groupAttributes.remove( ADXConstants.PERIOD );

         Period period = ADXPeriod.parse( periodStr );
-        root.setAttribute( "period", period.getIsoDate() );
-        // pass through the remaining attributes to dxf
-        for ( String attribute : groupAttributes.keySet() )
-        {
-            root.setAttribute( attribute, groupAttributes.get( attribute ) );
-        }
-
-        dxf.appendChild( root );
+        dxfWriter.writeAttribute( "period", period.getIsoDate() );
         
-        while ( reader.moveToStartElement( ADXConstants.DATAVALUE, ADXConstants.GROUP ) )
-        {
-            parseADXDataValueToDxf( reader, dxf );
-        }
+        // pass through the remaining attributes to dxf
+        for ( Map.Entry<String, String> attribute : groupAttributes.entrySet() )
+        {
+            dxfWriter.writeAttribute( attribute.getKey(), attribute.getValue() );
+        }
+        while ( adxReader.moveToStartElement( ADXConstants.DATAVALUE, ADXConstants.GROUP ) )
+        {
+            parseADXDataValueToDxf( adxReader, dxfWriter );
+        }
+        dxfWriter.writeEndElement();
+        dxfWriter.writeEndDocument();
+        dxfWriter.flush(); // hand the complete document to the underlying stream before returning
     }
 
-    protected void parseADXDataValueToDxf( XMLReader reader, Document dxf ) throws XMLStreamException
+    protected void parseADXDataValueToDxf( XMLReader adxReader, XMLStreamWriter dxfWriter ) throws XMLStreamException
     {
-        Element dv = dxf.createElementNS( "http://dhis2.org/schema/dxf/2.0","dataValue");
+        dxfWriter.writeStartElement( "dataValue" );

-        Map<String, String> groupAttributes = readAttributes( reader );
+        Map<String, String> groupAttributes = readAttributes( adxReader );

         // pass through the remaining attributes to dxf
         for ( String attribute : groupAttributes.keySet() )
         {
-            dv.setAttribute( attribute, groupAttributes.get( attribute ) );
+            dxfWriter.writeAttribute( attribute, groupAttributes.get( attribute ) );
         }
-        dxf.getFirstChild().appendChild( dv );
+        dxfWriter.writeEndElement();
     }
 
     // TODO  this should really be part of staxwax library
@@ -190,7 +185,7 @@
         {
             attributes.put( reader.getAttributeLocalName( i ), reader.getAttributeValue( i ) );
         }
-        
+
         return attributes;
     }
 }

=== added file 'dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/datavalueset/PipedImporter.java'
--- dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/datavalueset/PipedImporter.java	1970-01-01 00:00:00 +0000
+++ dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/datavalueset/PipedImporter.java	2015-06-19 14:55:47 +0000
@@ -0,0 +1,110 @@
+package org.hisp.dhis.dxf2.datavalueset;
+
+/*
+ * Copyright (c) 2004-2015, University of Oslo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * Neither the name of the HISP project nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.Callable;
+import org.hisp.dhis.dxf2.common.ImportOptions;
+import org.hisp.dhis.dxf2.importsummary.ImportStatus;
+import org.hisp.dhis.dxf2.importsummary.ImportSummary;
+
+/**
+ * Runs a DXF2 data value set import whose input is read from a pipe, so that a
+ * producer on another thread can stream the document through a bounded buffer
+ * instead of building it completely in memory first.
+ * <p>
+ * Do not run this on the thread that writes to the connected
+ * {@link PipedOutputStream}: once the buffer is full the writer blocks until
+ * the importer reads, so sharing a single thread would deadlock.
+ *
+ * @author bobj
+ */
+public class PipedImporter
+    implements Callable<ImportSummary>
+{
+    /**
+     * Capacity in bytes of the pipe buffer between the writer and this importer.
+     */
+    public static final int PIPE_BUFFER_SIZE = 4096;
+
+    /**
+     * Not used by this class; kept so existing references keep compiling.
+     */
+    public static final int TOTAL_MINUTES_TO_WAIT = 5;
+
+    /**
+     * Read end of the pipe, connected to the caller's output stream by the constructor.
+     */
+    protected PipedInputStream pipeIn;
+
+    private final DataValueSetService dataValueSetService;
+
+    private final ImportOptions importOptions;
+
+    /**
+     * @param dataValueSetService service that performs the import
+     * @param importOptions options passed unchanged to the import
+     * @param pipeOut write end of the pipe to which the caller writes the DXF2 document
+     * @throws IOException if {@code pipeOut} cannot be connected, e.g. because it is already connected
+     */
+    public PipedImporter( DataValueSetService dataValueSetService, ImportOptions importOptions, PipedOutputStream pipeOut )
+        throws IOException
+    {
+        this.dataValueSetService = dataValueSetService;
+        pipeIn = new PipedInputStream( pipeOut, PIPE_BUFFER_SIZE );
+        this.importOptions = importOptions;
+    }
+
+    /**
+     * Imports the data value set read from the pipe.
+     * <p>
+     * The read end is closed in every case, including when the import throws, so a
+     * writer blocked on a full buffer fails with an {@link IOException} instead of
+     * continuing to block.
+     *
+     * @return the import summary, or an ERROR summary carrying the exception message if the import failed
+     */
+    @Override
+    public ImportSummary call() throws Exception
+    {
+        try ( PipedInputStream in = pipeIn )
+        {
+            return dataValueSetService.saveDataValueSet( in, importOptions );
+        }
+        catch ( Exception ex )
+        {
+            ImportSummary failure = new ImportSummary();
+            failure.setStatus( ImportStatus.ERROR );
+            failure.setDescription( "Exception: " + ex.getMessage() );
+            return failure;
+        }
+    }
+}

=== modified file 'dhis-2/dhis-services/dhis-service-dxf2/src/test/java/org/hisp/dhis/dxf2/adx/DefaultADXDataServiceTest.java'
--- dhis-2/dhis-services/dhis-service-dxf2/src/test/java/org/hisp/dhis/dxf2/adx/DefaultADXDataServiceTest.java	2015-06-18 12:59:02 +0000
+++ dhis-2/dhis-services/dhis-service-dxf2/src/test/java/org/hisp/dhis/dxf2/adx/DefaultADXDataServiceTest.java	2015-06-19 14:55:47 +0000
@@ -28,89 +28,57 @@
  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
-import java.io.IOException;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-import org.amplecode.staxwax.factory.XMLFactory;
-import org.amplecode.staxwax.reader.XMLReader;
-import org.junit.After;
-import org.junit.AfterClass;
-import static org.junit.Assert.fail;
-import org.junit.Before;
-import org.junit.BeforeClass;
+import java.io.InputStream;
+import org.hisp.dhis.DhisSpringTest;
+import org.hisp.dhis.datavalue.DataValue;
+import org.hisp.dhis.dxf2.datavalueset.DataValueSetService;
+import org.hisp.dhis.dxf2.importsummary.ImportSummaries;
+import org.hisp.dhis.jdbc.batchhandler.DataValueBatchHandler;
+import org.hisp.dhis.mock.batchhandler.MockBatchHandler;
+import org.hisp.dhis.mock.batchhandler.MockBatchHandlerFactory;
+import static org.junit.Assert.assertEquals;
 import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.io.ClassPathResource;
-import org.w3c.dom.Document;
 
 /**
  *
  * @author bobj
  */
 public class DefaultADXDataServiceTest
+    extends DhisSpringTest
 {
+    @Autowired
+    private DataValueSetService dataValueSetService;
+
+    @Autowired
+    private ADXDataService adxDataService;
+
     protected static final String SIMPLE_ADX_SAMPLE = "adx/adx_data_sample1.xml";

-    public DefaultADXDataServiceTest()
-    {
-    }
-
-    @BeforeClass
-    public static void setUpClass()
-    {
-    }
-
-    @AfterClass
-    public static void tearDownClass()
-    {
-    }
-
-    @Before
-    public void setUp() throws IOException
-    {
-    }
-
-    @After
-    public void tearDown()
-    {
-    }
-
-    /**
-     * Test of parseADXGroup method, of class DefaultADXDataService.
-     */
+    private MockBatchHandler<DataValue> mockDataValueBatchHandler = null;
+
+    private MockBatchHandlerFactory mockBatchHandlerFactory = null;
+
+    @Override
+    public void setUpTest()
+    {
+        mockDataValueBatchHandler = new MockBatchHandler<>();
+        mockBatchHandlerFactory = new MockBatchHandlerFactory();
+        mockBatchHandlerFactory.registerBatchHandler( DataValueBatchHandler.class, mockDataValueBatchHandler );
+        setDependency( dataValueSetService, "batchHandlerFactory", mockBatchHandlerFactory );
+        setDependency( adxDataService, "dataValueSetService", dataValueSetService );
+    }
+
     @Test
-    public void testParseADXGroup() throws Exception
+    public void testpostData() throws Exception
     {
-        try
-        {
-            XMLReader reader = XMLFactory.getXMLReader( new ClassPathResource( SIMPLE_ADX_SAMPLE ).getInputStream() );
-            reader.moveToStartElement( ADXConstants.ROOT, ADXConstants.NAMESPACE );
-
-            DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
-            DocumentBuilder docBuilder;
-            docBuilder = docFactory.newDocumentBuilder();
-
-            DefaultADXDataService instance = new DefaultADXDataService();
-
-            while ( reader.moveToStartElement( ADXConstants.GROUP, ADXConstants.NAMESPACE ) )
-            {
-                Document dxf = docBuilder.newDocument();
-                instance.parseADXGroupToDxf( reader, dxf );
-
-                TransformerFactory transformerFactory = TransformerFactory.newInstance();
-                Transformer transformer = transformerFactory.newTransformer();
-                DOMSource source = new DOMSource( dxf );
-                StreamResult result = new StreamResult( System.out );
-
-                transformer.transform( source, result );
-            }
-        } 
-        catch ( Exception ex )
-        {
-            fail( ex.toString() );
-        }
+        try ( InputStream in = new ClassPathResource( SIMPLE_ADX_SAMPLE ).getInputStream() )
+        {
+            ImportSummaries importSummaries = adxDataService.postData( in, null );
+            // expects one summary per ADX group; contents not checked yet, they are full of conflicts for now
+            assertEquals( 2, importSummaries.getImportSummaries().size() );
+        }
     }
+
 }

=== modified file 'dhis-2/dhis-services/dhis-service-dxf2/src/test/resources/adx/adx_data_sample1.xml'
--- dhis-2/dhis-services/dhis-service-dxf2/src/test/resources/adx/adx_data_sample1.xml	2015-06-18 12:42:19 +0000
+++ dhis-2/dhis-services/dhis-service-dxf2/src/test/resources/adx/adx_data_sample1.xml	2015-06-19 14:55:47 +0000
@@ -17,7 +17,7 @@
         <dataValue dataElement="MAL04" value="10" categoryOptionCombo="McDonalds"/>
     </group>
     
-    <group orgUnit="342" period="2015-01-01/P1M" >
+    <group dataSet="Malaria" orgUnit="342" period="2015-01-01/P1M" >
         <dataValue dataElement="MAL01" value="32" />
         <dataValue dataElement="MAL02" value="20" />
         <dataValue dataElement="MAL03" value="0" >