dhis2-devs team mailing list archive
-
dhis2-devs team
-
Mailing list archive
-
Message #38140
[Branch ~dhis2-devs-core/dhis2/trunk] Rev 19473: Modified ADX import to use pipe with finite buffer size and timeout
------------------------------------------------------------
revno: 19473
committer: Bob Jolliffe <bobjolliffe@xxxxxxxxx>
branch nick: dhis2
timestamp: Fri 2015-06-19 15:55:47 +0100
message:
Modified ADX import to use pipe with finite buffer size and timeout
added:
dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/datavalueset/PipedImporter.java
modified:
dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/adx/DefaultADXDataService.java
dhis-2/dhis-services/dhis-service-dxf2/src/test/java/org/hisp/dhis/dxf2/adx/DefaultADXDataServiceTest.java
dhis-2/dhis-services/dhis-service-dxf2/src/test/resources/adx/adx_data_sample1.xml
--
lp:dhis2
https://code.launchpad.net/~dhis2-devs-core/dhis2/trunk
Your team DHIS 2 developers is subscribed to branch lp:dhis2.
To unsubscribe from this branch go to https://code.launchpad.net/~dhis2-devs-core/dhis2/trunk/+edit-subscription
=== modified file 'dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/adx/DefaultADXDataService.java'
--- dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/adx/DefaultADXDataService.java 2015-06-18 14:35:18 +0000
+++ dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/adx/DefaultADXDataService.java 2015-06-19 14:55:47 +0000
@@ -31,43 +31,33 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.StringWriter;
-import java.nio.charset.StandardCharsets;
+import java.io.PipedOutputStream;
import java.util.HashMap;
import java.util.Map;
-
-import org.apache.commons.io.IOUtils;
-
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
-import org.w3c.dom.Document;
-
+import javax.xml.stream.XMLOutputFactory;
import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
-
import javax.xml.stream.XMLStreamException;
-
import org.amplecode.staxwax.factory.XMLFactory;
import org.hisp.dhis.dxf2.common.ImportOptions;
import org.hisp.dhis.dxf2.datavalueset.DataExportParams;
import org.hisp.dhis.dxf2.datavalueset.DataValueSetService;
import org.hisp.dhis.dxf2.importsummary.ImportSummaries;
import org.amplecode.staxwax.reader.XMLReader;
-
import javax.xml.stream.XMLStreamReader;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
+import javax.xml.stream.XMLStreamWriter;
+import org.hisp.dhis.dxf2.datavalueset.PipedImporter;
import org.hisp.dhis.dxf2.importsummary.ImportStatus;
import org.hisp.dhis.dxf2.importsummary.ImportSummary;
import org.hisp.dhis.period.Period;
import org.springframework.beans.factory.annotation.Autowired;
-import org.w3c.dom.Element;
/**
*
@@ -79,6 +69,17 @@
@Autowired
protected DataValueSetService dataValueSetService;
+ protected ExecutorService executor;
+
+ public static final int PIPE_BUFFER_SIZE = 4096;
+
+ public static final int TOTAL_MINUTES_TO_WAIT = 5;
+
+ public void setDataValueSetService( DataValueSetService dataValueSetService )
+ {
+ this.dataValueSetService = dataValueSetService;
+ }
+
@Override
public void getData( DataExportParams params, OutputStream out )
{
@@ -89,39 +90,30 @@
public ImportSummaries postData( InputStream in, ImportOptions importOptions )
throws IOException
{
- XMLReader reader = XMLFactory.getXMLReader( in );
+
+ XMLReader adxReader = XMLFactory.getXMLReader( in );
ImportSummaries importSummaries = new ImportSummaries();
- reader.moveToStartElement( ADXConstants.ROOT, ADXConstants.NAMESPACE );
-
- while ( reader.moveToStartElement( ADXConstants.GROUP, ADXConstants.NAMESPACE ) )
+ adxReader.moveToStartElement( ADXConstants.ROOT, ADXConstants.NAMESPACE );
+
+ // TODO: inject this?
+ executor = Executors.newSingleThreadExecutor();
+
+ while ( adxReader.moveToStartElement( ADXConstants.GROUP, ADXConstants.NAMESPACE ) )
{
- DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
- DocumentBuilder docBuilder;
- try
+ try (PipedOutputStream pipeOut = new PipedOutputStream())
{
- docBuilder = docFactory.newDocumentBuilder();
- Document dxf = docBuilder.newDocument();
+ Future<ImportSummary> futureImportSummary;
+ futureImportSummary = executor.submit(new PipedImporter( dataValueSetService, importOptions, pipeOut ) );
+ XMLOutputFactory factory = XMLOutputFactory.newInstance();
+ XMLStreamWriter dxfWriter = factory.createXMLStreamWriter( pipeOut );
+ parseADXGroupToDxf( adxReader, dxfWriter );
+ pipeOut.flush();
- // buld a dxf2 datavalueset document from each adx group
- parseADXGroupToDxf( reader, dxf );
-
- // write the document to String
- DOMSource source = new DOMSource( dxf );
- StringWriter writer = new StringWriter();
- TransformerFactory transformerFactory = TransformerFactory.newInstance();
- Transformer transformer = transformerFactory.newTransformer();
-
- StreamResult result = new StreamResult(writer);
- transformer.transform( source, result );
- // create an inputstream for the String
- InputStream dxfIn = IOUtils.toInputStream( result.toString(), StandardCharsets.UTF_8 );
-
- // pass off to the dxf2 datavalueset service
- importSummaries.addImportSummary( dataValueSetService.saveDataValueSet( dxfIn, importOptions ) );
+ importSummaries.addImportSummary( futureImportSummary.get( TOTAL_MINUTES_TO_WAIT, TimeUnit.SECONDS ) );
}
- catch ( Exception ex )
+ catch ( IOException | XMLStreamException | InterruptedException | ExecutionException | TimeoutException ex )
{
ImportSummary importSummary = new ImportSummary();
importSummary.setStatus( ImportStatus.ERROR );
@@ -134,43 +126,46 @@
return importSummaries;
}
- protected void parseADXGroupToDxf( XMLReader reader, Document dxf ) throws XMLStreamException
+ protected void parseADXGroupToDxf( XMLReader adxReader, XMLStreamWriter dxfWriter ) throws XMLStreamException
{
- Element root = dxf.createElementNS( "http://dhis2.org/schema/dxf/2.0", "dataValueSet" );
-
- Map<String, String> groupAttributes = readAttributes( reader );
+ dxfWriter.writeStartDocument( "1.0" );
+ dxfWriter.writeStartElement( "dataValueSet" );
+ dxfWriter.writeDefaultNamespace( "http://dhis2.org/schema/dxf/2.0" );
+
+ Map<String, String> groupAttributes = readAttributes( adxReader );
String periodStr = groupAttributes.get( ADXConstants.PERIOD );
groupAttributes.remove( ADXConstants.PERIOD );
Period period = ADXPeriod.parse( periodStr );
- root.setAttribute( "period", period.getIsoDate() );
- // pass through the remaining attributes to dxf
- for ( String attribute : groupAttributes.keySet() )
- {
- root.setAttribute( attribute, groupAttributes.get( attribute ) );
- }
-
- dxf.appendChild( root );
+ dxfWriter.writeAttribute( "period", period.getIsoDate() );
- while ( reader.moveToStartElement( ADXConstants.DATAVALUE, ADXConstants.GROUP ) )
- {
- parseADXDataValueToDxf( reader, dxf );
- }
+ // pass through the remaining attributes to dxf
+ for ( String attribute : groupAttributes.keySet() )
+ {
+ dxfWriter.writeAttribute( attribute, groupAttributes.get( attribute ) );
+ }
+
+ while ( adxReader.moveToStartElement( ADXConstants.DATAVALUE, ADXConstants.GROUP ) )
+ {
+ parseADXDataValueToDxf( adxReader, dxfWriter );
+ }
+ dxfWriter.writeEndElement();
+ dxfWriter.writeEndDocument();
}
- protected void parseADXDataValueToDxf( XMLReader reader, Document dxf ) throws XMLStreamException
+ protected void parseADXDataValueToDxf( XMLReader adxReader, XMLStreamWriter dxfWriter ) throws XMLStreamException
{
- Element dv = dxf.createElementNS( "http://dhis2.org/schema/dxf/2.0","dataValue");
+ dxfWriter.writeStartElement( "dataValue" );
- Map<String, String> groupAttributes = readAttributes( reader );
+ Map<String, String> groupAttributes = readAttributes( adxReader );
// pass through the remaining attributes to dxf
for ( String attribute : groupAttributes.keySet() )
{
- dv.setAttribute( attribute, groupAttributes.get( attribute ) );
+ dxfWriter.writeAttribute( attribute, groupAttributes.get( attribute ) );
}
- dxf.getFirstChild().appendChild( dv );
+ dxfWriter.writeEndElement();
}
// TODO this should really be part of staxwax library
@@ -190,7 +185,7 @@
{
attributes.put( reader.getAttributeLocalName( i ), reader.getAttributeValue( i ) );
}
-
+
return attributes;
}
}
=== added file 'dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/datavalueset/PipedImporter.java'
--- dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/datavalueset/PipedImporter.java 1970-01-01 00:00:00 +0000
+++ dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/datavalueset/PipedImporter.java 2015-06-19 14:55:47 +0000
@@ -0,0 +1,83 @@
+package org.hisp.dhis.dxf2.datavalueset;
+
+/*
+ * Copyright (c) 2004-2015, University of Oslo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * Neither the name of the HISP project nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.Callable;
+import org.hisp.dhis.dxf2.common.ImportOptions;
+import org.hisp.dhis.dxf2.importsummary.ImportStatus;
+import org.hisp.dhis.dxf2.importsummary.ImportSummary;
+
+/**
+ *
+ * @author bobj
+ */
+public class PipedImporter
+ implements Callable<ImportSummary>
+{
+
+ public static final int PIPE_BUFFER_SIZE = 4096;
+
+ public static final int TOTAL_MINUTES_TO_WAIT = 5;
+
+ protected PipedInputStream pipeIn;
+
+ private final DataValueSetService dataValueSetService;
+
+ private final ImportOptions importOptions;
+
+ public PipedImporter( DataValueSetService dataValueSetService, ImportOptions importOptions, PipedOutputStream pipeOut )
+ throws IOException
+ {
+ this.dataValueSetService = dataValueSetService;
+ pipeIn = new PipedInputStream( pipeOut, PIPE_BUFFER_SIZE );
+ this.importOptions = importOptions;
+ }
+
+ @Override
+ public ImportSummary call() throws Exception
+ {
+ ImportSummary result = null;
+ try
+ {
+ result = dataValueSetService.saveDataValueSet( pipeIn, importOptions );
+ }
+ catch ( Exception ex )
+ {
+ result = new ImportSummary();
+ result.setStatus( ImportStatus.ERROR );
+ result.setDescription( "Exception: " + ex.getMessage());
+ }
+ pipeIn.close();
+ return result;
+ }
+
+}
=== modified file 'dhis-2/dhis-services/dhis-service-dxf2/src/test/java/org/hisp/dhis/dxf2/adx/DefaultADXDataServiceTest.java'
--- dhis-2/dhis-services/dhis-service-dxf2/src/test/java/org/hisp/dhis/dxf2/adx/DefaultADXDataServiceTest.java 2015-06-18 12:59:02 +0000
+++ dhis-2/dhis-services/dhis-service-dxf2/src/test/java/org/hisp/dhis/dxf2/adx/DefaultADXDataServiceTest.java 2015-06-19 14:55:47 +0000
@@ -28,89 +28,57 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
-import java.io.IOException;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-import org.amplecode.staxwax.factory.XMLFactory;
-import org.amplecode.staxwax.reader.XMLReader;
-import org.junit.After;
-import org.junit.AfterClass;
-import static org.junit.Assert.fail;
-import org.junit.Before;
-import org.junit.BeforeClass;
+import java.io.InputStream;
+import org.hisp.dhis.DhisSpringTest;
+import org.hisp.dhis.datavalue.DataValue;
+import org.hisp.dhis.dxf2.datavalueset.DataValueSetService;
+import org.hisp.dhis.dxf2.importsummary.ImportSummaries;
+import org.hisp.dhis.jdbc.batchhandler.DataValueBatchHandler;
+import org.hisp.dhis.mock.batchhandler.MockBatchHandler;
+import org.hisp.dhis.mock.batchhandler.MockBatchHandlerFactory;
+import static org.junit.Assert.assertEquals;
import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
-import org.w3c.dom.Document;
/**
*
* @author bobj
*/
public class DefaultADXDataServiceTest
+ extends DhisSpringTest
{
+ @Autowired
+ private DataValueSetService dataValueSetService;
+
+ @Autowired
+ private ADXDataService adxDataService;
+
protected static final String SIMPLE_ADX_SAMPLE = "adx/adx_data_sample1.xml";
- public DefaultADXDataServiceTest()
- {
- }
-
- @BeforeClass
- public static void setUpClass()
- {
- }
-
- @AfterClass
- public static void tearDownClass()
- {
- }
-
- @Before
- public void setUp() throws IOException
- {
- }
-
- @After
- public void tearDown()
- {
- }
-
- /**
- * Test of parseADXGroup method, of class DefaultADXDataService.
- */
+ private MockBatchHandler<DataValue> mockDataValueBatchHandler = null;
+
+ private MockBatchHandlerFactory mockBatchHandlerFactory = null;
+
+ @Override
+ public void setUpTest()
+ {
+ mockDataValueBatchHandler = new MockBatchHandler<>();
+ mockBatchHandlerFactory = new MockBatchHandlerFactory();
+ mockBatchHandlerFactory.registerBatchHandler( DataValueBatchHandler.class, mockDataValueBatchHandler );
+ setDependency( dataValueSetService, "batchHandlerFactory", mockBatchHandlerFactory );
+ setDependency( adxDataService,"dataValueSetService", dataValueSetService );
+ }
+
@Test
- public void testParseADXGroup() throws Exception
+ public void testpostData() throws Exception
{
- try
- {
- XMLReader reader = XMLFactory.getXMLReader( new ClassPathResource( SIMPLE_ADX_SAMPLE ).getInputStream() );
- reader.moveToStartElement( ADXConstants.ROOT, ADXConstants.NAMESPACE );
-
- DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
- DocumentBuilder docBuilder;
- docBuilder = docFactory.newDocumentBuilder();
-
- DefaultADXDataService instance = new DefaultADXDataService();
-
- while ( reader.moveToStartElement( ADXConstants.GROUP, ADXConstants.NAMESPACE ) )
- {
- Document dxf = docBuilder.newDocument();
- instance.parseADXGroupToDxf( reader, dxf );
-
- TransformerFactory transformerFactory = TransformerFactory.newInstance();
- Transformer transformer = transformerFactory.newTransformer();
- DOMSource source = new DOMSource( dxf );
- StreamResult result = new StreamResult( System.out );
-
- transformer.transform( source, result );
- }
- }
- catch ( Exception ex )
- {
- fail( ex.toString() );
- }
+ InputStream in = new ClassPathResource( SIMPLE_ADX_SAMPLE ).getInputStream();
+
+ ImportSummaries importSummaries = adxDataService.postData(in, null);
+
+ assertEquals(importSummaries.getImportSummaries().size(), 2);
+ // only testing this far .. summaries are full of conflicts for now ...
}
+
}
=== modified file 'dhis-2/dhis-services/dhis-service-dxf2/src/test/resources/adx/adx_data_sample1.xml'
--- dhis-2/dhis-services/dhis-service-dxf2/src/test/resources/adx/adx_data_sample1.xml 2015-06-18 12:42:19 +0000
+++ dhis-2/dhis-services/dhis-service-dxf2/src/test/resources/adx/adx_data_sample1.xml 2015-06-19 14:55:47 +0000
@@ -17,7 +17,7 @@
<dataValue dataElement="MAL04" value="10" categoryOptionCombo="McDonalds"/>
</group>
- <group orgUnit="342" period="2015-01-01/P1M" >
+ <group dataSet="Malaria" orgUnit="342" period="2015-01-01/P1M" >
<dataValue dataElement="MAL01" value="32" />
<dataValue dataElement="MAL02" value="20" />
<dataValue dataElement="MAL03" value="0" >