Skip to content

Commit

Permalink
Closes #1434: Reimplement context importer module by replacing IS-Loo…
Browse files Browse the repository at this point in the history
…kup with RESTful endpoint

Introducing RESTful context API based concepts importer module. New paramater is required to enable RESTful endpoint based import: `import_context_service_location`.

If the parameter is not provided then ISLookup-based importer mode will be enabled making `import_islookup_service_location` parameter required in such case.
  • Loading branch information
marekhorst committed Oct 13, 2023
1 parent a37fd12 commit db5389d
Show file tree
Hide file tree
Showing 23 changed files with 1,081 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package eu.dnetlib.iis.wf.importer.concept;

import static eu.dnetlib.iis.common.WorkflowRuntimeParameters.OOZIE_ACTION_OUTPUT_FILENAME;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.avro.file.DataFileWriter;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;

import eu.dnetlib.iis.common.counter.NamedCounters;
import eu.dnetlib.iis.common.counter.NamedCountersFileWriter;
import eu.dnetlib.iis.common.java.PortBindings;
import eu.dnetlib.iis.common.java.Process;
import eu.dnetlib.iis.common.java.io.DataStore;
import eu.dnetlib.iis.common.java.io.FileSystemPath;
import eu.dnetlib.iis.common.java.porttype.AvroPortType;
import eu.dnetlib.iis.common.java.porttype.PortType;
import eu.dnetlib.iis.importer.schemas.Concept;
import eu.dnetlib.iis.wf.importer.concept.model.Context;
import eu.dnetlib.iis.wf.importer.concept.model.Param;
import eu.dnetlib.iis.wf.importer.facade.ContextNotFoundException;
import eu.dnetlib.iis.wf.importer.facade.ContextStreamingFacade;
import eu.dnetlib.iis.wf.importer.facade.ServiceFacadeUtils;

/**
* RESTful endpoint based importer reading contexts provided as JSON records.
* @author mhorst
*
*/
public class RestfulEndpointBasedConceptImporter implements Process {

public static final String PARAM_IMPORT_CONTEXT_IDS_CSV = "import.context.ids.csv";

protected static final String CONCEPT_COUNTER_NAME = "CONCEPT_COUNTER";

private static final Logger log = Logger.getLogger(RestfulEndpointBasedConceptImporter.class);

private final NamedCountersFileWriter countersWriter = new NamedCountersFileWriter();


protected static final String PORT_OUT_CONCEPTS = "concepts";

private final Map<String, PortType> outputPorts = new HashMap<String, PortType>();


//------------------------ CONSTRUCTORS -------------------

public RestfulEndpointBasedConceptImporter() {
outputPorts.put(PORT_OUT_CONCEPTS, new AvroPortType(Concept.SCHEMA$));
}

//------------------------ LOGIC --------------------------

@Override
public Map<String, PortType> getInputPorts() {
return Collections.emptyMap();
}

@Override
public Map<String, PortType> getOutputPorts() {
return outputPorts;
}

@Override
public void run(PortBindings portBindings, Configuration conf,
Map<String, String> parameters) throws Exception {

Preconditions.checkArgument(parameters.containsKey(PARAM_IMPORT_CONTEXT_IDS_CSV),
"unknown context identifier, required parameter '%s' is missing!", PARAM_IMPORT_CONTEXT_IDS_CSV);
String contextIdsCSV = parameters.get(PARAM_IMPORT_CONTEXT_IDS_CSV);

try (DataFileWriter<Concept> conceptWriter = getWriter(FileSystem.get(conf), portBindings)) {

NamedCounters counters = new NamedCounters(new String[] { CONCEPT_COUNTER_NAME });

ContextStreamingFacade streamingFacade = ServiceFacadeUtils.instantiate(parameters);

String[] contextIds = StringUtils.split(contextIdsCSV, ',');

for (String contextId : contextIds) {

if (StringUtils.isNotEmpty(contextId)) {

try {
try (InputStream is = streamingFacade.getStream(contextId)) {

Concept[] concepts = buildConcepts(IOUtils.toString(is, StandardCharsets.UTF_8.name()));

for (Concept concept : concepts) {
conceptWriter.append(concept);
}

counters.increment(CONCEPT_COUNTER_NAME, (long) concepts.length);
}
} catch (ContextNotFoundException e) {
log.warn("context not found: " + contextId, e);
}
}
}

countersWriter.writeCounters(counters, System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME));
}
}

/**
* Provides {@link Concept} writer consuming records.
*/
protected DataFileWriter<Concept> getWriter(FileSystem fs, PortBindings portBindings) throws IOException {
return DataStore.create(
new FileSystemPath(fs, portBindings.getOutput().get(PORT_OUT_CONCEPTS)), Concept.SCHEMA$);
}

//------------------------ PRIVATE --------------------------

/**
* Builds an array of concepts based on the JSON representation returned by the context endpoint.
* @param contextsJson contexts encoded in JSON format
* @return array of {@link Concept} avro records.
*/
private static Concept[] buildConcepts(String contextsJson) {

return translate(new Gson().fromJson(contextsJson, Context[].class));
}

/**
* Translates an array of {@link IISConfigurationEntry} from the JSON model into the array of {@link Concept} objects from avro model.
*/
private static Concept[] translate(Context[] source) {
Concept[] results = new Concept[source.length];
for (int i=0; i < source.length; i++) {
results[i] = translate(source[i]);
}
return results;
}

/**
* Translates {@link IISConfigurationEntry} from the JSON model into the {@link Concept} object from avro model.
*/
private static Concept translate(Context source) {
Concept.Builder conceptBuilder = Concept.newBuilder();
conceptBuilder.setId(source.getId());
conceptBuilder.setLabel(source.getLabel());
conceptBuilder.setParams(translate(source.getParams()));
return conceptBuilder.build();
}

/**
* Translates the list of {@link Param} from the JSON model into the list of {@link eu.dnetlib.iis.importer.schemas.Param} objects from avro model.
*/
private static List<eu.dnetlib.iis.importer.schemas.Param> translate(List<Param> source) {
return source.stream().map(x -> translate(x)).collect(Collectors.toList());
}

/**
* Translates {@link Param} from the JSON model into the {@link eu.dnetlib.iis.importer.schemas.Param} object from avro model.
*/
private static eu.dnetlib.iis.importer.schemas.Param translate(Param source) {
eu.dnetlib.iis.importer.schemas.Param.Builder paramBuilder = eu.dnetlib.iis.importer.schemas.Param.newBuilder();
paramBuilder.setName(source.getName());
paramBuilder.setValue(source.getValue());
return paramBuilder.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package eu.dnetlib.iis.wf.importer.concept.model;

import java.util.ArrayList;
import java.util.List;

/**
* Class representing context record.
*
* @author mhorst
*
*/
public class Context {

private String id;
private String label;
private List<Param> params = new ArrayList<>();

public String getId() {
return id;
}

public void setId(final String id) {
this.id = id;
}

public String getLabel() {
return label;
}

public void setLabel(final String label) {
this.label = label;
}

public List<Param> getParams() {
return params;
}

public void setParams(final List<Param> params) {
this.params = params;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package eu.dnetlib.iis.wf.importer.concept.model;

/**
* Class representing context parameter.
*
* @author mhorst
*/
public class Param {

private String name;

private String value;

public String getName() {
return name;
}

public String getValue() {
return value;
}

public Param setName(final String name) {
this.name = name;
return this;
}

public Param setValue(final String value) {
this.value = value;
return this;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package eu.dnetlib.iis.wf.importer.facade;

/**
* An exception indicating given context was not found.
* @author mhorst
*/
public class ContextNotFoundException extends ContextStreamingException {


private static final long serialVersionUID = -1546075729881700992L;

public ContextNotFoundException(String contextId) {
super(contextId);
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package eu.dnetlib.iis.wf.importer.facade;

/**
* An exception indicating fatal error when streaming context content.
* @author mhorst
*/
public class ContextStreamingException extends Exception {

private static final long serialVersionUID = -711409479484772617L;

/**
* Context identifier.
*/
private final String contextId;


public ContextStreamingException(String contextId) {
super("Problem occured while streaming context: " + contextId);
this.contextId = contextId;
}

public ContextStreamingException(String contextId, Exception e) {
super("Problem occured while streaming context: " + contextId, e);
this.contextId = contextId;
}

public String getContextId() {
return contextId;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package eu.dnetlib.iis.wf.importer.facade;

import java.io.InputStream;

/**
* Context API streaming facade.
* @author mhorst
*
*/
public interface ContextStreamingFacade {

/**
* Returns stream for a given context identifier.
* @return underlying stream
*/
InputStream getStream(String contextId) throws ContextNotFoundException, ContextStreamingException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package eu.dnetlib.iis.wf.importer.facade;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

import org.apache.log4j.Logger;

/**
* Context straming facade reading data from URL.
* @author mhorst
*
*/
public class ContextUrlStreamingFacade implements ContextStreamingFacade {

private static final Logger log = Logger.getLogger(ContextUrlStreamingFacade.class);

private final String endpointLocation;

private final int readTimeout;

private final int connectionTimeout;


//------------------------ CONSTRUCTOR --------------------------

/**
* @param endpointLocation stream endpoint URL location
* @param readTimeout url read timeout
* @param connectionTimeout url connection timeout
*/
public ContextUrlStreamingFacade(String endpointLocation, int readTimeout, int connectionTimeout) {
this.endpointLocation = endpointLocation;
this.readTimeout = readTimeout;
this.connectionTimeout = connectionTimeout;
}

//------------------------ LOGIC --------------------------

@Override
public InputStream getStream(String contextId) throws ContextNotFoundException, ContextStreamingException {
try {
log.info(String.format("setting timeouts for streaming service: read timeout (%s) and connect timeout (%s)",
this.readTimeout, this.connectionTimeout));
URL url = new URL(ContextUrlStreamingFacadeUtils.buildUrl(endpointLocation, contextId));
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setReadTimeout(this.readTimeout);
con.setConnectTimeout(this.connectionTimeout);
if (HttpURLConnection.HTTP_NOT_FOUND == con.getResponseCode()) {
throw new ContextNotFoundException(contextId);
} else {
return con.getInputStream();
}
} catch (IOException e) {
throw new ContextStreamingException(contextId, e);
}

}

}
Loading

0 comments on commit db5389d

Please sign in to comment.