diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/concept/RestfulEndpointBasedConceptImporter.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/concept/RestfulEndpointBasedConceptImporter.java
new file mode 100644
index 000000000..a227b2bfe
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/concept/RestfulEndpointBasedConceptImporter.java
@@ -0,0 +1,201 @@
+package eu.dnetlib.iis.wf.importer.concept;
+
+import static eu.dnetlib.iis.common.WorkflowRuntimeParameters.OOZIE_ACTION_OUTPUT_FILENAME;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+
+import eu.dnetlib.iis.common.counter.NamedCounters;
+import eu.dnetlib.iis.common.counter.NamedCountersFileWriter;
+import eu.dnetlib.iis.common.java.PortBindings;
+import eu.dnetlib.iis.common.java.Process;
+import eu.dnetlib.iis.common.java.io.DataStore;
+import eu.dnetlib.iis.common.java.io.FileSystemPath;
+import eu.dnetlib.iis.common.java.porttype.AvroPortType;
+import eu.dnetlib.iis.common.java.porttype.PortType;
+import eu.dnetlib.iis.importer.schemas.Concept;
+import eu.dnetlib.iis.wf.importer.concept.model.Context;
+import eu.dnetlib.iis.wf.importer.concept.model.Param;
+import eu.dnetlib.iis.wf.importer.facade.ContextNotFoundException;
+import eu.dnetlib.iis.wf.importer.facade.ContextStreamingFacade;
+import eu.dnetlib.iis.wf.importer.facade.ServiceFacadeUtils;
+
+/**
+ * RESTful endpoint based importer reading contexts provided as JSON records.
+ *
+ * <p>Every identifier listed in the {@value #PARAM_IMPORT_CONTEXT_IDS_CSV} parameter is requested from the
+ * {@link ContextStreamingFacade}; the returned JSON array of contexts is translated into avro {@link Concept}
+ * records written to the {@value #PORT_OUT_CONCEPTS} port. Contexts reported as not found are logged and skipped.
+ *
+ * @author mhorst
+ */
+public class RestfulEndpointBasedConceptImporter implements Process {
+
+    public static final String PARAM_IMPORT_CONTEXT_IDS_CSV = "import.context.ids.csv";
+
+    protected static final String CONCEPT_COUNTER_NAME = "CONCEPT_COUNTER";
+
+    protected static final String PORT_OUT_CONCEPTS = "concepts";
+
+    private static final Logger log = Logger.getLogger(RestfulEndpointBasedConceptImporter.class);
+
+    /** Shared JSON parser, created once instead of once per imported context. */
+    private static final Gson GSON = new Gson();
+
+    private final NamedCountersFileWriter countersWriter = new NamedCountersFileWriter();
+
+    private final Map<String, PortType> outputPorts = new HashMap<>();
+
+    //------------------------ CONSTRUCTORS -------------------
+
+    public RestfulEndpointBasedConceptImporter() {
+        outputPorts.put(PORT_OUT_CONCEPTS, new AvroPortType(Concept.SCHEMA$));
+    }
+
+    //------------------------ LOGIC --------------------------
+
+    @Override
+    public Map<String, PortType> getInputPorts() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public Map<String, PortType> getOutputPorts() {
+        return outputPorts;
+    }
+
+    /**
+     * Imports concepts for all context identifiers listed in {@link #PARAM_IMPORT_CONTEXT_IDS_CSV}.
+     *
+     * @param portBindings bindings providing the location of the {@link #PORT_OUT_CONCEPTS} output
+     * @param conf hadoop configuration the file system is obtained from
+     * @param parameters workflow parameters, {@link #PARAM_IMPORT_CONTEXT_IDS_CSV} is mandatory
+     * @throws IllegalArgumentException when the context identifiers parameter is missing
+     */
+    @Override
+    public void run(PortBindings portBindings, Configuration conf,
+            Map<String, String> parameters) throws Exception {
+
+        Preconditions.checkArgument(parameters.containsKey(PARAM_IMPORT_CONTEXT_IDS_CSV),
+                "unknown context identifier, required parameter '%s' is missing!", PARAM_IMPORT_CONTEXT_IDS_CSV);
+        String contextIdsCSV = parameters.get(PARAM_IMPORT_CONTEXT_IDS_CSV);
+
+        try (DataFileWriter<Concept> conceptWriter = getWriter(FileSystem.get(conf), portBindings)) {
+
+            NamedCounters counters = new NamedCounters(new String[] { CONCEPT_COUNTER_NAME });
+
+            ContextStreamingFacade streamingFacade = ServiceFacadeUtils.instantiate(parameters);
+
+            for (String rawContextId : StringUtils.split(contextIdsCSV, ',')) {
+
+                // tolerate whitespace around separators ("a, b") and skip whitespace-only entries ("a, ,b")
+                String contextId = StringUtils.trimToNull(rawContextId);
+                if (contextId == null) {
+                    continue;
+                }
+
+                try (InputStream is = streamingFacade.getStream(contextId)) {
+
+                    Concept[] concepts = buildConcepts(IOUtils.toString(is, StandardCharsets.UTF_8.name()));
+
+                    for (Concept concept : concepts) {
+                        conceptWriter.append(concept);
+                    }
+
+                    counters.increment(CONCEPT_COUNTER_NAME, (long) concepts.length);
+
+                } catch (ContextNotFoundException e) {
+                    // a missing context is not fatal: the remaining identifiers are still imported
+                    log.warn("context not found: " + contextId, e);
+                }
+            }
+
+            countersWriter.writeCounters(counters, System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME));
+        }
+    }
+
+    /**
+     * Provides {@link Concept} writer consuming records.
+     * Kept protected so that subclasses can supply a different writer.
+     */
+    protected DataFileWriter<Concept> getWriter(FileSystem fs, PortBindings portBindings) throws IOException {
+        return DataStore.create(
+                new FileSystemPath(fs, portBindings.getOutput().get(PORT_OUT_CONCEPTS)), Concept.SCHEMA$);
+    }
+
+    //------------------------ PRIVATE --------------------------
+
+    /**
+     * Builds an array of concepts based on the JSON representation returned by the context endpoint.
+     *
+     * @param contextsJson contexts encoded in JSON format
+     * @return array of {@link Concept} avro records, empty when the payload holds no contexts
+     */
+    private static Concept[] buildConcepts(String contextsJson) {
+        // Gson yields null for an empty document or a literal JSON null
+        Context[] contexts = GSON.fromJson(contextsJson, Context[].class);
+        return contexts != null ? translate(contexts) : new Concept[0];
+    }
+
+    /**
+     * Translates an array of {@link Context} from the JSON model into the array of {@link Concept} objects from avro model.
+     */
+    private static Concept[] translate(Context[] source) {
+        Concept[] results = new Concept[source.length];
+        for (int i = 0; i < source.length; i++) {
+            results[i] = translate(source[i]);
+        }
+        return results;
+    }
+
+    /**
+     * Translates {@link Context} from the JSON model into the {@link Concept} object from avro model.
+     */
+    private static Concept translate(Context source) {
+        Concept.Builder conceptBuilder = Concept.newBuilder();
+        conceptBuilder.setId(source.getId());
+        conceptBuilder.setLabel(source.getLabel());
+        conceptBuilder.setParams(translate(source.getParams()));
+        return conceptBuilder.build();
+    }
+
+    /**
+     * Translates the list of {@link Param} from the JSON model into the list of {@link eu.dnetlib.iis.importer.schemas.Param} objects from avro model.
+     *
+     * @param source JSON model parameters, {@code null} is treated as an empty list
+     */
+    private static List<eu.dnetlib.iis.importer.schemas.Param> translate(List<Param> source) {
+        if (source == null) {
+            // e.g. the payload carried an explicit null instead of an array of parameters
+            return Collections.emptyList();
+        }
+        return source.stream().map(param -> translate(param)).collect(Collectors.toList());
+    }
+
+    /**
+     * Translates {@link Param} from the JSON model into the {@link eu.dnetlib.iis.importer.schemas.Param} object from avro model.
+     */
+    private static eu.dnetlib.iis.importer.schemas.Param translate(Param source) {
+        eu.dnetlib.iis.importer.schemas.Param.Builder paramBuilder = eu.dnetlib.iis.importer.schemas.Param.newBuilder();
+        paramBuilder.setName(source.getName());
+        paramBuilder.setValue(source.getValue());
+        return paramBuilder.build();
+    }
+
+}
diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/concept/model/Context.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/concept/model/Context.java
new file mode 100644
index 000000000..ae80246f4
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/concept/model/Context.java
@@ -0,0 +1,42 @@
+package eu.dnetlib.iis.wf.importer.concept.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class representing context record.
+ *
+ * @author mhorst
+ *
+ */
+public class Context {
+
+    private String id;
+    private String label;
+    private List<Param> params = new ArrayList<>();
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(final String id) {
+        this.id = id;
+    }
+
+    public String getLabel() {
+        return label;
+    }
+
+    public void setLabel(final String label) {
+        this.label = label;
+    }
+
+    public List<Param> getParams() {
+        return params;
+    }
+
+    public void setParams(final List<Param> params) {
+        this.params = params;
+    }
+
+}
diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/concept/model/Param.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/concept/model/Param.java
new file mode 100644
index 000000000..50afaa688
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/concept/model/Param.java
@@ -0,0 +1,32 @@
+package eu.dnetlib.iis.wf.importer.concept.model;
+
+/**
+ * Class representing context parameter.
+ *
+ * @author mhorst
+ */
+public class Param {
+
+    private String name;
+
+    private String value;
+
+    public String getName() {
+        return name;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public Param setName(final String name) {
+        this.name = name;
+        return this;
+    }
+
+    public Param setValue(final String value) {
+        this.value = value;
+        return this;
+    }
+
+}
diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextNotFoundException.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextNotFoundException.java
new file mode 100644
index 000000000..afae0bfdf
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextNotFoundException.java
@@ -0,0 +1,17 @@
+package eu.dnetlib.iis.wf.importer.facade;
+
+/**
+ * An exception indicating given context was not found.
+ * @author mhorst
+ */
+public class ContextNotFoundException extends ContextStreamingException {
+
+
+    private static final long serialVersionUID = -1546075729881700992L;
+
+    public ContextNotFoundException(String contextId) {
+        super(contextId);
+    }
+
+
+}
diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextStreamingException.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextStreamingException.java
new file mode 100644
index 000000000..97026c1a5
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextStreamingException.java
@@ -0,0 +1,41 @@
+package eu.dnetlib.iis.wf.importer.facade;
+
+/**
+ * An exception indicating fatal error when streaming context content.
+ * @author mhorst
+ */
+public class ContextStreamingException extends Exception {
+
+    private static final long serialVersionUID = -711409479484772617L;
+
+    /**
+     * Context identifier.
+     */
+    private final String contextId;
+
+
+    /**
+     * @param contextId identifier of the context which could not be streamed
+     */
+    public ContextStreamingException(String contextId) {
+        super("Problem occured while streaming context: " + contextId);
+        this.contextId = contextId;
+    }
+
+    /**
+     * @param contextId identifier of the context which could not be streamed
+     * @param e underlying cause of the failure
+     */
+    public ContextStreamingException(String contextId, Exception e) {
+        super("Problem occured while streaming context: " + contextId, e);
+        this.contextId = contextId;
+    }
+
+    /**
+     * @return identifier of the context this exception refers to
+     */
+    public String getContextId() {
+        return contextId;
+    }
+
+}
diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextStreamingFacade.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextStreamingFacade.java
new file mode 100644
index 000000000..fd8af4a9b
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextStreamingFacade.java
@@ -0,0 +1,18 @@
+package eu.dnetlib.iis.wf.importer.facade;
+
+import java.io.InputStream;
+
+/**
+ * Context API streaming facade.
+ * @author mhorst
+ *
+ */
+public interface ContextStreamingFacade {
+
+    /**
+     * Returns stream for a given context identifier.
+     * @return underlying stream
+     */
+    InputStream getStream(String contextId) throws ContextNotFoundException, ContextStreamingException;
+
+}
diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextUrlStreamingFacade.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextUrlStreamingFacade.java
new file mode 100644
index 000000000..03cc6ae50
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextUrlStreamingFacade.java
@@ -0,0 +1,74 @@
+package eu.dnetlib.iis.wf.importer.facade;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Context streaming facade reading data from URL.
+ * @author mhorst
+ *
+ */
+public class ContextUrlStreamingFacade implements ContextStreamingFacade {
+
+    private static final Logger log = Logger.getLogger(ContextUrlStreamingFacade.class);
+
+    private final String endpointLocation;
+
+    private final int readTimeout;
+
+    private final int connectionTimeout;
+
+
+    //------------------------ CONSTRUCTOR --------------------------
+
+    /**
+     * @param endpointLocation stream endpoint URL location
+     * @param readTimeout url read timeout, handed over to {@link HttpURLConnection#setReadTimeout(int)}
+     * @param connectionTimeout url connection timeout, handed over to {@link HttpURLConnection#setConnectTimeout(int)}
+     */
+    public ContextUrlStreamingFacade(String endpointLocation, int readTimeout, int connectionTimeout) {
+        this.endpointLocation = endpointLocation;
+        this.readTimeout = readTimeout;
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    //------------------------ LOGIC --------------------------
+
+    /**
+     * Opens an HTTP connection to the context endpoint and returns the response body stream.
+     *
+     * @param contextId identifier of the context to be streamed
+     * @return response body stream, closing it is the responsibility of the caller
+     * @throws ContextNotFoundException when the endpoint responds with HTTP 404
+     * @throws ContextStreamingException when connecting to or reading from the endpoint fails
+     */
+    @Override
+    public InputStream getStream(String contextId) throws ContextNotFoundException, ContextStreamingException {
+        HttpURLConnection con = null;
+        try {
+            log.info(String.format("setting timeouts for streaming service: read timeout (%s) and connect timeout (%s)",
+                    this.readTimeout, this.connectionTimeout));
+            URL url = new URL(ContextUrlStreamingFacadeUtils.buildUrl(endpointLocation, contextId));
+            con = (HttpURLConnection) url.openConnection();
+            con.setReadTimeout(this.readTimeout);
+            con.setConnectTimeout(this.connectionTimeout);
+            if (HttpURLConnection.HTTP_NOT_FOUND == con.getResponseCode()) {
+                // no stream is handed over to the caller, so the connection has to be released here
+                con.disconnect();
+                throw new ContextNotFoundException(contextId);
+            }
+            return con.getInputStream();
+        } catch (IOException e) {
+            // same reasoning as above: nobody else is going to close this connection
+            if (con != null) {
+                con.disconnect();
+            }
+            throw new ContextStreamingException(contextId, e);
+        }
+    }
+
+}
diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextUrlStreamingFacadeFactory.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextUrlStreamingFacadeFactory.java
new file mode 100644
index 000000000..0ed829dbc
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextUrlStreamingFacadeFactory.java
@@ -0,0 +1,42 @@
+package eu.dnetlib.iis.wf.importer.facade;
+
+import static eu.dnetlib.iis.common.WorkflowRuntimeParameters.DNET_SERVICE_CLIENT_CONNECTION_TIMEOUT;
+import static eu.dnetlib.iis.common.WorkflowRuntimeParameters.DNET_SERVICE_CLIENT_READ_TIMEOUT;
+import static eu.dnetlib.iis.common.WorkflowRuntimeParameters.DNET_SERVICE_CONNECTION_TIMEOUT_DEFAULT_VALUE;
+import static eu.dnetlib.iis.common.WorkflowRuntimeParameters.DNET_SERVICE_READ_TIMEOUT_DEFAULT_VALUE;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+import eu.dnetlib.iis.common.WorkflowRuntimeParameters;
+
+/**
+ * Url based streaming facade factory.
+ * @author mhorst
+ *
+ */
+public class ContextUrlStreamingFacadeFactory implements ServiceFacadeFactory<ContextStreamingFacade> {
+
+    protected static final String IMPORT_CONTEXT_STREAM_ENDPOINT_URL = "import.context.stream.endpoint.url";
+
+
+    //------------------------ LOGIC --------------------------
+    /** Builds the facade from the mandatory endpoint url and the read/connection timeouts (defaults apply). */
+    @Override
+    public ContextStreamingFacade instantiate(Map<String, String> parameters) {
+
+        Preconditions.checkArgument(parameters.containsKey(IMPORT_CONTEXT_STREAM_ENDPOINT_URL),
+                "unknown context stream endpoint url, required parameter '%s' is missing!",
+                IMPORT_CONTEXT_STREAM_ENDPOINT_URL);
+
+        return new ContextUrlStreamingFacade(parameters.get(IMPORT_CONTEXT_STREAM_ENDPOINT_URL),
+                Integer.parseInt(WorkflowRuntimeParameters.getParamValue(DNET_SERVICE_CLIENT_READ_TIMEOUT,
+                        DNET_SERVICE_READ_TIMEOUT_DEFAULT_VALUE, parameters)),
+                Integer.parseInt(WorkflowRuntimeParameters.getParamValue(DNET_SERVICE_CLIENT_CONNECTION_TIMEOUT,
+                        DNET_SERVICE_CONNECTION_TIMEOUT_DEFAULT_VALUE, parameters)));
+
+    }
+
+
+}
diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextUrlStreamingFacadeUtils.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextUrlStreamingFacadeUtils.java
new file mode 100644
index 000000000..90f52469c
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/facade/ContextUrlStreamingFacadeUtils.java
@@ -0,0 +1,35 @@
+package eu.dnetlib.iis.wf.importer.facade;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class facilitating {@link ContextUrlStreamingFacade}.
+ * @author mhorst
+ */
+public final class ContextUrlStreamingFacadeUtils {
+
+    private static final String URL_SLASH = "/";
+
+    private ContextUrlStreamingFacadeUtils() {
+        // static helpers only, instances are never needed
+    }
+
+    /**
+     * Builds an url for a given endpoint location and context identifier.
+ * @param endpointLocation root url + * @param contextId context identifier to be attached + */ + public static String buildUrl(String endpointLocation, String contextId) { + Preconditions.checkArgument(StringUtils.isNotBlank(endpointLocation), "endpointLocation has not been set"); + Preconditions.checkArgument(StringUtils.isNotBlank(contextId), "contextId has not been set"); + + StringBuilder urlBuilder = new StringBuilder(endpointLocation); + if (!endpointLocation.endsWith(URL_SLASH)) { + urlBuilder.append(URL_SLASH); + } + urlBuilder.append(contextId); + return urlBuilder.toString(); + } +} diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/concept/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/concept/oozie_app/workflow.xml index d9e7d8872..9dd42df27 100644 --- a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/concept/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/concept/oozie_app/workflow.xml @@ -1,6 +1,7 @@ + islookup_facade_factory_classname eu.dnetlib.iis.wf.importer.facade.WebServiceISLookupFacadeFactory @@ -8,8 +9,20 @@ islookup_service_location + $UNDEFINED$ islookup service location + + + context_streaming_facade_factory_classname + eu.dnetlib.iis.wf.importer.facade.ContextUrlStreamingFacadeFactory + ServiceFacadeFactory implementation class name producing eu.dnetlib.iis.wf.importer.facade.ContextStreamingFacade + + + context_service_location + $UNDEFINED$ + RESTful context service location + output concept avro datastore output @@ -66,9 +79,16 @@ - + + + + + ${context_service_location eq "$UNDEFINED$"} + + + - + @@ -86,16 +106,47 @@ -Oconcepts=${output} - + + + + + eu.dnetlib.iis.common.java.ProcessWrapper + eu.dnetlib.iis.common.report.ReportGenerator + -Preport.import.concepts=${wf:actionData('islookup-concept-importer')['CONCEPT_COUNTER']} + -Oreport=${output_report_root_path}/${output_report_relative_path} 
+ + + + + + + + + + + + eu.dnetlib.iis.common.java.ProcessWrapper + eu.dnetlib.iis.wf.importer.concept.RestfulEndpointBasedConceptImporter + -Pimport.context.stream.endpoint.url=${context_service_location} + -Pimport.context.ids.csv=${context_ids_csv} + -Pdnet.service.client.read.timeout=${dnet_service_client_read_timeout} + -Pdnet.service.client.connection.timeout=${dnet_service_client_connection_timeout} + -Pimport.facade.factory.classname=${context_streaming_facade_factory_classname} + -Oconcepts=${output} + + + + + - + eu.dnetlib.iis.common.java.ProcessWrapper eu.dnetlib.iis.common.report.ReportGenerator - -Preport.import.concepts=${wf:actionData('concept-importer')['CONCEPT_COUNTER']} + -Preport.import.concepts=${wf:actionData('context-streaming-concept-importer')['CONCEPT_COUNTER']} -Oreport=${output_report_root_path}/${output_report_relative_path} diff --git a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/EmptyResultsContextStreamingFacadeFactory.java b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/EmptyResultsContextStreamingFacadeFactory.java new file mode 100644 index 000000000..b164647b1 --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/EmptyResultsContextStreamingFacadeFactory.java @@ -0,0 +1,41 @@ +package eu.dnetlib.iis.wf.importer.concept; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.Map; + +import eu.dnetlib.iis.wf.importer.facade.ContextStreamingFacade; +import eu.dnetlib.iis.wf.importer.facade.ServiceFacadeFactory; + +/** + * Factory for building {@link ContextStreamingFacade} facade mock returning empty results. 
+ * @author mhorst
+ *
+ */
+public class EmptyResultsContextStreamingFacadeFactory implements ServiceFacadeFactory<ContextStreamingFacade> {
+
+
+    //------------------------ LOGIC --------------------------
+
+    @Override
+    public ContextStreamingFacade instantiate(Map<String, String> parameters) {
+        return new EmptyResultsContextStreamingFacade();
+    }
+
+    //--------------- INNER CLASS -------------------
+
+    /**
+     * {@link ContextStreamingFacade} mock returning an empty JSON array for every context identifier.
+     *
+     */
+    private static class EmptyResultsContextStreamingFacade implements ContextStreamingFacade {
+
+        //------------------------ LOGIC --------------------------
+
+        @Override
+        public InputStream getStream(String contextId) {
+            return new ByteArrayInputStream("[]".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        }
+
+    }
+}
diff --git a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/ExceptionThrowingContextStreamingFacadeFactory.java b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/ExceptionThrowingContextStreamingFacadeFactory.java
new file mode 100644
index 000000000..6fa24d3b3
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/ExceptionThrowingContextStreamingFacadeFactory.java
@@ -0,0 +1,41 @@
+package eu.dnetlib.iis.wf.importer.concept;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import eu.dnetlib.iis.wf.importer.facade.ContextStreamingException;
+import eu.dnetlib.iis.wf.importer.facade.ContextStreamingFacade;
+import eu.dnetlib.iis.wf.importer.facade.ServiceFacadeFactory;
+
+/**
+ * Factory for building {@link ContextStreamingFacade} facade throwing {@link ContextStreamingException}.
+ * @author mhorst
+ *
+ */
+public class ExceptionThrowingContextStreamingFacadeFactory implements ServiceFacadeFactory<ContextStreamingFacade> {
+
+
+    //------------------------ LOGIC --------------------------
+
+    @Override
+    public ContextStreamingFacade instantiate(Map<String, String> parameters) {
+        return new ExceptionThrowingContextStreamingFacade();
+    }
+
+    //--------------- INNER CLASS -------------------
+
+    /**
+     * {@link ContextStreamingFacade} mock failing with {@link ContextStreamingException} for every context identifier.
+     *
+     */
+    private static class ExceptionThrowingContextStreamingFacade implements ContextStreamingFacade {
+
+        //------------------------ LOGIC --------------------------
+
+        @Override
+        public InputStream getStream(String contextId) throws ContextStreamingException {
+            throw new ContextStreamingException(contextId);
+        }
+
+    }
+}
diff --git a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/MockContextStreamingFacadeFactory.java b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/MockContextStreamingFacadeFactory.java
new file mode 100644
index 000000000..09da00a93
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/MockContextStreamingFacadeFactory.java
@@ -0,0 +1,51 @@
+package eu.dnetlib.iis.wf.importer.concept;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import eu.dnetlib.iis.common.ClassPathResourceProvider;
+import eu.dnetlib.iis.wf.importer.facade.ContextNotFoundException;
+import eu.dnetlib.iis.wf.importer.facade.ContextStreamingFacade;
+import eu.dnetlib.iis.wf.importer.facade.ServiceFacadeFactory;
+
+/**
+ * Factory for building {@link ContextStreamingFacade} mocks.
+ * @author mhorst
+ *
+ */
+public class MockContextStreamingFacadeFactory implements ServiceFacadeFactory<ContextStreamingFacade> {
+
+    private static final String supportedContextId = "fet-fp7";
+
+    protected static final String fetProfileLocation = "/eu/dnetlib/iis/wf/importer/concept/data/input/fet-fp7.json";
+
+    //------------------------ LOGIC --------------------------
+
+    @Override
+    public ContextStreamingFacade instantiate(Map<String, String> parameters) {
+        return new MockContextStreamingFacade();
+    }
+
+    //--------------- INNER CLASS -------------------
+
+    /**
+     * {@link ContextStreamingFacade} mock serving a classpath JSON profile for the single supported context identifier.
+     *
+     */
+    private static class MockContextStreamingFacade implements ContextStreamingFacade {
+
+
+
+        //------------------------ LOGIC --------------------------
+
+        @Override
+        public InputStream getStream(String contextId) throws ContextNotFoundException {
+            if (supportedContextId.equals(contextId)) {
+                return ClassPathResourceProvider.getResourceInputStream(fetProfileLocation);
+            } else {
+                throw new ContextNotFoundException(contextId);
+            }
+        }
+
+    }
+}
diff --git a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/RestfulEndpointBasedConceptImporterTest.java b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/RestfulEndpointBasedConceptImporterTest.java
new file mode 100644
index 000000000..4d0a103c0
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/RestfulEndpointBasedConceptImporterTest.java
@@ -0,0 +1,212 @@
+package eu.dnetlib.iis.wf.importer.concept;
+
+import static eu.dnetlib.iis.common.WorkflowRuntimeParameters.OOZIE_ACTION_OUTPUT_FILENAME;
+import static eu.dnetlib.iis.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_FACADE_FACTORY_CLASS;
+import static eu.dnetlib.iis.wf.importer.VerificationUtils.verifyReport;
+import static eu.dnetlib.iis.wf.importer.concept.RestfulEndpointBasedConceptImporter.CONCEPT_COUNTER_NAME;
+import static
eu.dnetlib.iis.wf.importer.concept.RestfulEndpointBasedConceptImporter.PARAM_IMPORT_CONTEXT_IDS_CSV; +import static eu.dnetlib.iis.wf.importer.concept.RestfulEndpointBasedConceptImporter.PORT_OUT_CONCEPTS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.avro.file.DataFileWriter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.google.gson.Gson; + +import eu.dnetlib.iis.common.ClassPathResourceProvider; +import eu.dnetlib.iis.common.java.PortBindings; +import eu.dnetlib.iis.common.java.porttype.AvroPortType; +import eu.dnetlib.iis.common.java.porttype.PortType; +import eu.dnetlib.iis.importer.schemas.Concept; +import eu.dnetlib.iis.wf.importer.concept.model.Context; +import eu.dnetlib.iis.wf.importer.facade.ContextStreamingException; +import eu.dnetlib.iis.wf.importer.facade.ServiceFacadeException;
+
+
+/**
+ * Unit tests of {@link RestfulEndpointBasedConceptImporter}. The avro writer is replaced with a Mockito mock
+ * and the streaming facade is chosen through the {@code IMPORT_FACADE_FACTORY_CLASS} parameter.
+ *
+ * @author mhorst
+ *
+ */
+@ExtendWith(MockitoExtension.class)
+public class RestfulEndpointBasedConceptImporterTest {
+
+    private PortBindings portBindings;
+
+    private Configuration conf;
+
+    private Map<String, String> parameters;
+
+    private RestfulEndpointBasedConceptImporter importer;
+
+    @TempDir
+    public File testFolder;
+
+    @Mock
+    private DataFileWriter<Concept> conceptWriter;
+
+    @Captor
+    private ArgumentCaptor<Concept> conceptCaptor;
+
+
+    /**
+     * Prepares the default fixture: points the {@code OOZIE_ACTION_OUTPUT_FILENAME} system property at a file
+     * inside the per-test temporary folder, registers the mock facade factory together with the
+     * {@code fet-fp7} context id and creates an importer handing out the mocked writer.
+     */
+    @BeforeEach
+    public void init() {
+        // NOTE(review): this is a JVM-wide property and it is never cleared after the test
+        System.setProperty(OOZIE_ACTION_OUTPUT_FILENAME,
+                testFolder.getAbsolutePath() + File.separatorChar + "test.properties");
+
+        Map<String, Path> output = new HashMap<>();
+        output.put(PORT_OUT_CONCEPTS, new Path("/irrelevant/location/as/it/will/be/mocked"));
+        this.portBindings = new PortBindings(Collections.emptyMap(), output);
+        this.conf = new Configuration();
+        this.parameters = new HashMap<>();
+        this.parameters.put(IMPORT_FACADE_FACTORY_CLASS,
+                "eu.dnetlib.iis.wf.importer.concept.MockContextStreamingFacadeFactory");
+        this.parameters.put(PARAM_IMPORT_CONTEXT_IDS_CSV, "fet-fp7");
+        importer = new RestfulEndpointBasedConceptImporter() {
+
+            // the output location is never opened: every record goes to the mocked writer
+            @Override
+            protected DataFileWriter<Concept> getWriter(FileSystem fs, PortBindings portBindings) throws IOException {
+                return conceptWriter;
+            }
+        };
+    }
+
+    // ----------------------------------- TESTS -----------------------------------
+
+    /** The importer declares no input ports. */
+    @Test
+    public void testGetInputPorts() {
+        // execute
+        Map<String, PortType> result = importer.getInputPorts();
+
+        // assert
+        assertNotNull(result);
+        assertTrue(result.isEmpty());
+    }
+
+    /** The concepts output port is an avro port bound to the {@link Concept} schema. */
+    @Test
+    public void testGetOutputPorts() {
+        // execute
+        Map<String, PortType> result = importer.getOutputPorts();
+
+        // assert
+        assertNotNull(result);
+        assertNotNull(result.get(PORT_OUT_CONCEPTS));
+        assertTrue(result.get(PORT_OUT_CONCEPTS) instanceof AvroPortType);
+        assertSame(Concept.SCHEMA$, ((AvroPortType) result.get(PORT_OUT_CONCEPTS)).getSchema());
+    }
+
+    /** Running without the facade factory parameter is rejected with {@link ServiceFacadeException}. */
+    @Test
+    public void testRunWithoutFacade() {
+        // given
+        parameters.remove(IMPORT_FACADE_FACTORY_CLASS);
+
+        // execute
+        assertThrows(ServiceFacadeException.class, () -> importer.run(portBindings, conf, parameters));
+    }
+
+    /** Running without the context ids parameter is rejected with {@link IllegalArgumentException}. */
+    @Test
+    public void testRunWithoutContextIds() {
+        // given
+        parameters.remove(PARAM_IMPORT_CONTEXT_IDS_CSV);
+
+        // execute
+        assertThrows(IllegalArgumentException.class, () -> importer.run(portBindings, conf, parameters));
+    }
+
+    /** A facade delivering no concepts results in no writes and a zero counter. */
+    @Test
+    public void testRunEmptyResult() throws Exception {
+        // given
+        // NOTE(review): this factory is not part of the visible sources, presumably it yields no contexts
+        this.parameters.put(IMPORT_FACADE_FACTORY_CLASS,
+                "eu.dnetlib.iis.wf.importer.concept.EmptyResultsContextStreamingFacadeFactory");
+
+        // execute
+        importer.run(portBindings, conf, parameters);
+
+        // assert
+        verify(conceptWriter, never()).append(any());
+        verifyReport(0, CONCEPT_COUNTER_NAME);
+    }
+
+    /** A context id unknown to the mock facade is tolerated: the run completes without writing anything. */
+    @Test
+    public void testRunUnrecognizedContext() throws Exception {
+        // given
+        this.parameters.put(PARAM_IMPORT_CONTEXT_IDS_CSV, "unrecognized-context-id");
+
+        // execute
+        importer.run(portBindings, conf, parameters);
+
+        // assert
+        verify(conceptWriter, never()).append(any());
+        verifyReport(0, CONCEPT_COUNTER_NAME);
+    }
+
+    /** A {@link ContextStreamingException} raised while streaming is propagated to the caller. */
+    @Test
+    public void testRunReceivingUnexpectedException() throws Exception {
+        // given
+        this.parameters.put(IMPORT_FACADE_FACTORY_CLASS,
+                "eu.dnetlib.iis.wf.importer.concept.ExceptionThrowingContextStreamingFacadeFactory");
+
+        // execute
+        assertThrows(ContextStreamingException.class, () -> importer.run(portBindings, conf, parameters));
+    }
+
+    /**
+     * Every context stored in the mock resource is written as a concept, in the same order and with
+     * identical id, label and parameters.
+     */
+    @Test
+    public void testRun() throws Exception {
+        // given
+        Context[] expectedContexts = new Gson().fromJson(
+                ClassPathResourceProvider.getResourceContent(MockContextStreamingFacadeFactory.fetProfileLocation),
+                Context[].class);
+
+        // execute
+        importer.run(portBindings, conf, parameters);
+
+        // assert
+        verify(conceptWriter, times(expectedContexts.length)).append(conceptCaptor.capture());
+        List<Concept> concepts = conceptCaptor.getAllValues();
+        verifyReport(expectedContexts.length, CONCEPT_COUNTER_NAME);
+        assertEquals(expectedContexts.length, concepts.size());
+
+        for (int i = 0; i < expectedContexts.length; i++) {
+            Context currentExpectedContext = expectedContexts[i];
+            Concept currentConcept = concepts.get(i);
+
+            assertEquals(currentExpectedContext.getId(), currentConcept.getId());
+            assertEquals(currentExpectedContext.getLabel(), currentConcept.getLabel());
+
+            // parameters are compared position by position, so ordering and duplicates matter
+            assertEquals(currentExpectedContext.getParams().size(), currentConcept.getParams().size());
+            for (int j = 0; j < currentExpectedContext.getParams().size(); j++) {
+                assertEquals(currentExpectedContext.getParams().get(j).getName(),
+                        currentConcept.getParams().get(j).getName());
+                assertEquals(currentExpectedContext.getParams().get(j).getValue(),
+                        currentConcept.getParams().get(j).getValue());
+            }
+        }
+
+    }
+
+}
diff --git a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/WorkflowTest.java b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/WorkflowTest.java index f42a6fe35..57a35299c 100644 --- a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/WorkflowTest.java +++ b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/concept/WorkflowTest.java @@ -13,9 +13,16 @@ public class WorkflowTest extends AbstractOozieWorkflowTestCase { @Test - public void testImportConceptWorkflow() { + public void testImportConceptWithISLookupServiceBasedWorkflow() { OozieWorkflowTestConfiguration wfConf = new OozieWorkflowTestConfiguration(); wfConf.setTimeoutInSeconds(720); - testWorkflow("eu/dnetlib/iis/wf/importer/concept/sampletest", wfConf); + testWorkflow("eu/dnetlib/iis/wf/importer/concept/islookup", wfConf); + } + + @Test + public void testImportConceptWithContextStreamingServiceBasedWorkflow() { + OozieWorkflowTestConfiguration wfConf = new OozieWorkflowTestConfiguration(); + wfConf.setTimeoutInSeconds(720); + testWorkflow("eu/dnetlib/iis/wf/importer/concept/contextservice", wfConf); } } diff --git a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/facade/ContextUrlStreamingFacadeUtilsTest.java b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/facade/ContextUrlStreamingFacadeUtilsTest.java new file mode 100644 index
000000000..3019ec1ae --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/facade/ContextUrlStreamingFacadeUtilsTest.java @@ -0,0 +1,75 @@ +package eu.dnetlib.iis.wf.importer.facade; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test;
+
+/**
+ * Tests of {@code ContextUrlStreamingFacadeUtils.buildUrl}: joining the root url with the context id and
+ * rejecting null or empty arguments.
+ *
+ * @author mhorst
+ */
+public class ContextUrlStreamingFacadeUtilsTest {
+
+
+    // ------------------------ TESTS --------------------------
+
+    @Test
+    public void build_url_without_trailing_slash() {
+        // execute & assert: a separating slash is expected in the result
+        assertEquals("someRootUrl/someCtxId",
+                ContextUrlStreamingFacadeUtils.buildUrl("someRootUrl", "someCtxId"));
+    }
+
+    @Test
+    public void build_url_with_trailing_slash() {
+        // execute & assert: the slash already present is expected not to be doubled
+        assertEquals("someRootUrl/someCtxId",
+                ContextUrlStreamingFacadeUtils.buildUrl("someRootUrl/", "someCtxId"));
+    }
+
+    @Test
+    public void build_url_with_null_context() {
+        assertBuildUrlRejected("someRootUrl/", null);
+    }
+
+    @Test
+    public void build_url_with_empty_context() {
+        assertBuildUrlRejected("someRootUrl/", "");
+    }
+
+    @Test
+    public void build_url_with_null_endpoint_location() {
+        assertBuildUrlRejected(null, "someCtxId");
+    }
+
+    @Test
+    public void build_url_with_empty_endpoint_location() {
+        assertBuildUrlRejected("", "someCtxId");
+    }
+
+    // ------------------------ PRIVATE --------------------------
+
+    /**
+     * Asserts that building an url out of the given arguments ends with {@link IllegalArgumentException}.
+     */
+    private static void assertBuildUrlRejected(String rootUrl, String contextId) {
+        assertThrows(IllegalArgumentException.class,
+                () -> ContextUrlStreamingFacadeUtils.buildUrl(rootUrl, contextId));
+    }
+}
diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/sampletest/oozie_app/import.txt b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/contextservice/oozie_app/import.txt similarity index 100% rename from iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/sampletest/oozie_app/import.txt rename to iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/contextservice/oozie_app/import.txt diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/contextservice/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/contextservice/oozie_app/workflow.xml new file mode 100644 index 000000000..777d0e046 --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/contextservice/oozie_app/workflow.xml @@ -0,0 +1,79 @@ + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + + + + + + + ${wf:appPath()}/import_concept + + + + context_streaming_facade_factory_classname + eu.dnetlib.iis.wf.importer.concept.MockContextStreamingFacadeFactory + + + context_service_location + dummyServiceLocation + + + context_ids_csv + fet-fp7 + + + output + ${workingDir}/out + + + output_report_root_path + ${workingDir}/report + + + + + + + + + + + eu.dnetlib.iis.common.java.ProcessWrapper + + eu.dnetlib.iis.common.java.jsonworkflownodes.TestingConsumer + + -C{concepts, + eu.dnetlib.iis.importer.schemas.Concept, + eu/dnetlib/iis/wf/importer/concept/data/output/concept.json} + -C{report,eu.dnetlib.iis.common.schemas.ReportEntry, + eu/dnetlib/iis/wf/importer/concept/data/output/report.json} + + -Iconcepts=${workingDir}/out + -Ireport=${workingDir}/report/import_concept
+ + + + + + + Unfortunately, the process failed -- error message: + [${wf:errorMessage(wf:lastErrorNode())}] + + + + + diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/data/input/fet-fp7.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/data/input/fet-fp7.json new file mode 100644 index 000000000..c29a8ebe0 --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/data/input/fet-fp7.json @@ -0,0 +1,43 @@ +[{ + "id": "fet-fp7::open::301::284566", + "label": "Quantum Propagating Microwaves in Strongly Coupled Environments", + "params": [ + {"name": "CD_PROJECT_NUMBER", "value":"284566"}, + {"name": "CD_ACRONYM", "value":"PROMISCE"}, + {"name": "CD_FRAMEWORK", "value":"FP7"} + ] +}, +{ + "id": "fet-fp7::open::301::284584", + "label": "Quantum Interferometry with Bose-Einstein Condensates", + "params": [ + {"name": "CD_PROJECT_NUMBER", "value":"284584"}, + {"name": "CD_FRAMEWORK", "value":"FP7"} + ] +}, +{ + "id": "fet-fp7::open::301", + "label": "Challenging current thinking", + "params": [ + {"name": "CD_DIVNAME", "value":"7.A.SP1.03.19.01"}, + {"name": "CD_ABBR", "value":"ICT-2011.9.1"}, + {"name": "CD_PARENT_DIVNAME", "value":"7.A.SP1.03.19"}, + {"name": "DUPLICATED_ENTRY", "value":"value1"}, + {"name": "DUPLICATED_ENTRY", "value":"value2"} + ] +}, +{ + "id": "fet-fp7::open", + "label": "FET Open", + "params": [ + {"name": "CD_DIVNAME", "value":"7.A.SP1.03.08"}, + {"name": "CD_ABBR", "value":"ICT-2007.8.0"}, + {"name": "CD_PARENT_DIVNAME", "value":"7.A.SP1.03", "unknown_param_field": null} + ] +}, +{ + "id": "fet-fp7", + "label": "FET", + "params": [], + "unknown_field": null +}] \ No newline at end of file diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/islookup/oozie_app/import.txt b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/islookup/oozie_app/import.txt new file mode 100644 index 
000000000..e1ff2647c --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/islookup/oozie_app/import.txt @@ -0,0 +1,2 @@ +## This is a classpath-based import file (this header is required) +import_concept classpath eu/dnetlib/iis/wf/importer/concept/oozie_app diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/sampletest/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/islookup/oozie_app/workflow.xml similarity index 98% rename from iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/sampletest/oozie_app/workflow.xml rename to iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/islookup/oozie_app/workflow.xml index f44bcf191..f7527b5d6 100644 --- a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/sampletest/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/concept/islookup/oozie_app/workflow.xml @@ -1,5 +1,5 @@ - + ${jobTracker} diff --git a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/import/oozie_app/workflow.xml b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/import/oozie_app/workflow.xml index d6ac25e64..e0bc99740 100644 --- a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/import/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/import/oozie_app/workflow.xml @@ -38,6 +38,10 @@ islookup_service_location IS Lookup service location + + context_service_location + RESTful context service location + project_concepts_context_ids_csv comma separated list of concepts context identifiers to be picked by ISLookup @@ -248,6 +252,10 @@ ${islookup_service_location} + context_service_location + ${context_service_location} + + context_ids_csv ${project_concepts_context_ids_csv} diff --git 
a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml index 14a5a8802..77e3b74b9 100644 --- a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml @@ -87,8 +87,14 @@ import_islookup_service_location + $UNDEFINED$ IS Lookup service location + + import_context_service_location + $UNDEFINED$ + RESTful context service location + import_project_concepts_context_ids_csv fet-fp7,fet-h2020 @@ -524,6 +530,10 @@ islookup_service_location ${import_islookup_service_location} + + context_service_location + ${import_context_service_location} + project_concepts_context_ids_csv ${import_project_concepts_context_ids_csv}