Closes #1440: Implement WileyML XML records parser producing DocumentText datastore

This commit introduces the first version of the Wiley importer module along with its workflow.xml definition.

The current version of the importer expects DocumentText avro records as input (with the text field carrying WileyML records) and produces DocumentText records at output, with the identifier (id) field set to the DOI extracted from the WileyML record.
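For illustration, a minimal, hypothetical WileyML-like fragment that the default DOI XPath used by the workflow (//publicationMeta[@level='unit']/doi/text()) would match; real WileyML records carry far richer metadata:

<component>
    <header>
        <publicationMeta level="unit">
            <doi>10.1002/example.12345</doi>
        </publicationMeta>
    </header>
</component>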
marekhorst committed Dec 13, 2023
1 parent d10941d commit aab26f4
Showing 3 changed files with 238 additions and 0 deletions.
5 changes: 5 additions & 0 deletions iis-wf/iis-wf-import/pom.xml
@@ -123,6 +123,11 @@
<artifactId>spark-sql_2.11</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
</dependency>

<dependency>
<groupId>pl.edu.icm.spark-utils</groupId>
<artifactId>spark-utils_2.11</artifactId>
128 changes: 128 additions & 0 deletions iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/wiley/ImportWileyXmlContentJob.java
@@ -0,0 +1,128 @@
package eu.dnetlib.iis.wf.importer.wiley;

import static eu.dnetlib.iis.common.spark.SparkSessionSupport.runWithSparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXParseException;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;

import eu.dnetlib.iis.common.java.io.HdfsUtils;
import eu.dnetlib.iis.common.spark.avro.AvroDataFrameReader;
import eu.dnetlib.iis.common.spark.avro.AvroDataFrameWriter;
import eu.dnetlib.iis.metadataextraction.schemas.DocumentText;
import eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal;


/**
 * Creates a {@link DocumentText} avro datastore from an input {@link DocumentText} datastore by updating the id field.
 * The id field is set to the DOI extracted from the WileyML record held in the text payload field; the text field is
 * propagated unchanged.
*
* @author mhorst
*
*/
public class ImportWileyXmlContentJob {

public static void main(String[] args) {
JobParams params = new JobParams();
JCommander jcommander = new JCommander(params);
jcommander.parse(args);

runWithSparkSession(new SparkConf(), params.isSparkSessionShared, spark -> {

HdfsUtils.remove(spark.sparkContext().hadoopConfiguration(), params.outputPath);

AvroDataFrameReader avroDataFrameReader = new AvroDataFrameReader(spark);

Dataset<Row> inputDocumentTextDF = avroDataFrameReader.read(params.inputPath, DocumentText.SCHEMA$);

String xPathQueryDOI = params.xPathQueryDOI;

Dataset<Row> idAndTextDF = inputDocumentTextDF
.select(get_xml_object_string(xPathQueryDOI).apply(col("text")).as("id"), col("text"))
.where(col("text").isNotNull().and(col("text").notEqual("").and(col("id").isNotNull().and(col("id").notEqual("")))));

Dataset<Row> documentTextDF = spark.createDataFrame(idAndTextDF.javaRDD(),
(StructType) SchemaConverters.toSqlType(DocumentText.SCHEMA$).dataType());

new AvroDataFrameWriter(documentTextDF).write(params.outputPath, DocumentText.SCHEMA$);
});
}

public static UserDefinedFunction get_xml_object_string(String xPathStr) {
return udf((UDF1<String, String>) xml -> {

XPath xPath = XPathFactory.newInstance().newXPath();

DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
Document xmlDocument = builder.parse(new InputSource(new StringReader(xml)));

return extractFirstNonEmptyTrimmedTextContent((NodeList) xPath.compile(xPathStr).evaluate(xmlDocument, XPathConstants.NODESET));

}, DataTypes.StringType);
}

private static String extractFirstNonEmptyTrimmedTextContent(NodeList nodes) {
for (int i = 0; i < nodes.getLength(); i++) {
Node currentNode = nodes.item(i);
String textContent = currentNode.getTextContent();
if (StringUtils.isNotBlank(textContent)) {
return textContent.trim();
}
}
return null;
}

@Parameters(separators = "=")
public static class JobParams {

@Parameter(names = "-sharedSparkSession")
private Boolean isSparkSessionShared = Boolean.FALSE;

@Parameter(names = "-inputPath", required = true)
private String inputPath;

@Parameter(names = "-outputPath", required = true)
private String outputPath;

@Parameter(names = "-xPathQueryDOI", required = true)
private String xPathQueryDOI;
}
}
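Not part of the commit: a minimal standalone sketch, assuming only the JDK's built-in DOM and XPath APIs, that mirrors the extraction performed by get_xml_object_string and extractFirstNonEmptyTrimmedTextContent. It can help sanity-check an XPath query against a sample record without spinning up Spark; the class name and the sample XML are hypothetical.

import java.io.StringReader;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;

import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

public class WileyDoiXPathCheck {

    public static void main(String[] args) throws Exception {
        // Hypothetical, simplified WileyML-like payload; real records are richer and namespaced.
        String xml = "<component><header><publicationMeta level=\"unit\">"
                + "<doi>10.1002/example.12345</doi></publicationMeta></header></component>";
        String xPathStr = "//publicationMeta[@level='unit']/doi/text()";

        // Same approach as the UDF: DOM-parse the payload and evaluate the XPath against it.
        DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        Document document = builder.parse(new InputSource(new StringReader(xml)));
        XPath xPath = XPathFactory.newInstance().newXPath();
        NodeList nodes = (NodeList) xPath.compile(xPathStr).evaluate(document, XPathConstants.NODESET);

        // Mirrors extractFirstNonEmptyTrimmedTextContent: first non-blank text node, trimmed.
        String doi = null;
        for (int i = 0; i < nodes.getLength(); i++) {
            String textContent = nodes.item(i).getTextContent();
            if (textContent != null && !textContent.trim().isEmpty()) {
                doi = textContent.trim();
                break;
            }
        }
        System.out.println(doi != null ? doi : "no DOI found");
    }
}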
105 changes: 105 additions & 0 deletions iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/wiley/oozie_app/workflow.xml
@@ -0,0 +1,105 @@
<workflow-app xmlns="uri:oozie:workflow:0.4" name="importer_wiley">

<parameters>
<property>
<name>input</name>
<description>input DocumentText records</description>
</property>
<property>
<name>xPathQueryDOI</name>
<value>//publicationMeta[@level='unit']/doi/text()</value>
<description>XPath query to retrieve DOI</description>
</property>
<property>
<name>output</name>
<description>output DocumentText records</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>

<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>

<start to="import_wiley"/>

<action name="import_wiley">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>import-wiley</name>
<class>eu.dnetlib.iis.wf.importer.wiley.ImportWileyXmlContentJob</class>
<jar>${oozieTopWfApplicationPath}/lib/iis-wf-import-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>-inputPath=${input}</arg>
<arg>-outputPath=${output}</arg>
<arg>-xPathQueryDOI=${xPathQueryDOI}</arg>
</spark>
<ok to="end"/>
<error to="fail"/>
</action>

<kill name="fail">
<message>Unfortunately, the process failed -- error message:
[${wf:errorMessage(wf:lastErrorNode())}]
</message>
</kill>

<end name="end"/>
</workflow-app>
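For reference, a hypothetical job.properties sketch showing how the workflow parameters declared above might be supplied when submitting this workflow standalone via the Oozie CLI. All hosts, paths, and memory values are placeholders; within the full IIS pipeline these properties (as well as oozieTopWfApplicationPath and projectVersion used for the jar location) are normally provided by the surrounding build and workflows.

nameNode=hdfs://namenode.example.org:8020
jobTracker=resourcemanager.example.org:8032
queueName=default
oozieLauncherQueueName=default
input=/data/wiley/input_document_text
output=/data/wiley/output_document_text
xPathQueryDOI=//publicationMeta[@level='unit']/doi/text()
sparkDriverMemory=2G
sparkExecutorMemory=4G
sparkExecutorCores=2
oozieActionShareLibForSpark2=spark2
spark2YarnHistoryServerAddress=http://historyserver.example.org:18088
spark2EventLogDir=/user/spark/applicationHistory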
