From aab26f4bf4b206009038e35ac4eae80805b9fcae Mon Sep 17 00:00:00 2001
From: Marek Horst
Date: Wed, 13 Dec 2023 13:19:49 +0100
Subject: [PATCH] Closes #1440: Implement WileyML XML records parser producing
 DocumentText datastore

Introducing the first version of the importer module along with the
workflow.xml definition.

The current version of the importer expects DocumentText avro records at
input (with the text field carrying WileyML records) and produces
DocumentText records at output, with the identifier field updated with a
DOI extracted from the WileyML record.
---
 iis-wf/iis-wf-import/pom.xml                      |   5 +
 .../wiley/ImportWileyXmlContentJob.java           | 128 ++++++++++++++++++
 .../wf/importer/wiley/oozie_app/workflow.xml      | 105 ++++++++++++++
 3 files changed, 238 insertions(+)
 create mode 100644 iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/wiley/ImportWileyXmlContentJob.java
 create mode 100644 iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/wiley/oozie_app/workflow.xml

diff --git a/iis-wf/iis-wf-import/pom.xml b/iis-wf/iis-wf-import/pom.xml
index f6c86dac2..9487e1d07 100644
--- a/iis-wf/iis-wf-import/pom.xml
+++ b/iis-wf/iis-wf-import/pom.xml
@@ -123,6 +123,11 @@
 			<artifactId>spark-sql_2.11</artifactId>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-avro_2.11</artifactId>
+		</dependency>
+
 		<dependency>
 			<groupId>pl.edu.icm.spark-utils</groupId>
 			<artifactId>spark-utils_2.11</artifactId>
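The new spark-avro dependency is what provides org.apache.spark.sql.avro.SchemaConverters, which the job below uses to derive a Spark SQL schema from the DocumentText avro schema. A minimal sketch of that conversion, assuming spark-avro 2.4.x for Scala 2.11; the inline schema here is illustrative only, not the actual DocumentText schema:

    import org.apache.avro.Schema;
    import org.apache.spark.sql.avro.SchemaConverters;
    import org.apache.spark.sql.types.StructType;

    public class SchemaConversionSketch {
        public static void main(String[] args) {
            // illustrative avro schema mirroring the id + text shape of DocumentText
            Schema avroSchema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"DocumentText\",\"fields\":["
                    + "{\"name\":\"id\",\"type\":\"string\"},"
                    + "{\"name\":\"text\",\"type\":[\"null\",\"string\"]}]}");

            // toSqlType returns a SchemaType wrapper; for an avro record its
            // dataType() is the corresponding Spark SQL StructType
            StructType sqlSchema = (StructType) SchemaConverters.toSqlType(avroSchema).dataType();
            sqlSchema.printTreeString(); // prints the id/text struct fields
        }
    }

The same cast appears in the job's createDataFrame call, which is why the module now needs spark-avro at compile time.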
diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/wiley/ImportWileyXmlContentJob.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/wiley/ImportWileyXmlContentJob.java
new file mode 100644
index 000000000..8fd39b2a1
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/wiley/ImportWileyXmlContentJob.java
@@ -0,0 +1,128 @@
+package eu.dnetlib.iis.wf.importer.wiley;
+
+import static eu.dnetlib.iis.common.spark.SparkSessionSupport.runWithSparkSession;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.udf;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.api.java.UDF1;
+import org.apache.spark.sql.avro.SchemaConverters;
+import org.apache.spark.sql.expressions.UserDefinedFunction;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField$;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.StructType$;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXParseException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+import eu.dnetlib.iis.common.java.io.HdfsUtils;
+import eu.dnetlib.iis.common.spark.avro.AvroDataFrameReader;
+import eu.dnetlib.iis.common.spark.avro.AvroDataFrameWriter;
+import eu.dnetlib.iis.metadataextraction.schemas.DocumentText;
+import eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal;
+
+
+/**
+ * Creates a {@link DocumentText} avro datastore from an input {@link DocumentText} datastore by updating both the id and text fields.
+ * The id field is set to the DOI extracted from the WileyML record held in the text payload field.
+ * The text field is set to an appropriate text representation of the WileyML record.
+ * 
+ * @author mhorst
+ *
+ */
+public class ImportWileyXmlContentJob {
+
+    public static void main(String[] args) {
+        JobParams params = new JobParams();
+        JCommander jcommander = new JCommander(params);
+        jcommander.parse(args);
+
+        runWithSparkSession(new SparkConf(), params.isSparkSessionShared, spark -> {
+
+            HdfsUtils.remove(spark.sparkContext().hadoopConfiguration(), params.outputPath);
+
+            AvroDataFrameReader avroDataFrameReader = new AvroDataFrameReader(spark);
+
+            Dataset<Row> inputDocumentTextDF = avroDataFrameReader.read(params.inputPath, DocumentText.SCHEMA$);
+
+            String xPathQueryDOI = params.xPathQueryDOI;
+
+            Dataset<Row> idAndTextDF = inputDocumentTextDF
+                    .select(get_xml_object_string(xPathQueryDOI).apply(col("text")).as("id"), col("text"))
+                    .where(col("text").isNotNull().and(col("text").notEqual("").and(col("id").isNotNull().and(col("id").notEqual("")))));
+
+            Dataset<Row> documentTextDF = spark.createDataFrame(idAndTextDF.javaRDD(),
+                    (StructType) SchemaConverters.toSqlType(DocumentText.SCHEMA$).dataType());
+
+            new AvroDataFrameWriter(documentTextDF).write(params.outputPath, DocumentText.SCHEMA$);
+        });
+    }
+
+    public static UserDefinedFunction get_xml_object_string(String xPathStr) {
+        return udf((UDF1<String, String>) xml -> {
+
+            XPath xPath = XPathFactory.newInstance().newXPath();
+
+            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+            Document xmlDocument = builder.parse(new InputSource(new StringReader(xml)));
+
+            return extractFirstNonEmptyTrimmedTextContent((NodeList) xPath.compile(xPathStr).evaluate(xmlDocument, XPathConstants.NODESET));
+
+        }, DataTypes.StringType);
+    }
+
+    private static String extractFirstNonEmptyTrimmedTextContent(NodeList nodes) {
+        for (int i = 0; i < nodes.getLength(); i++) {
+            Node currentNode = nodes.item(i);
+            String textContent = currentNode.getTextContent();
+            if (StringUtils.isNotBlank(textContent)) {
+                return textContent.trim();
+            }
+        }
+        return null;
+    }
+
+    @Parameters(separators = "=")
+    public static class JobParams {
+
+        @Parameter(names = "-sharedSparkSession")
+        private Boolean isSparkSessionShared = Boolean.FALSE;
+
+        @Parameter(names = "-inputPath", required = true)
+        private String inputPath;
+
+        @Parameter(names = "-outputPath", required = true)
+        private String outputPath;
+
+        @Parameter(names = "-xPathQueryDOI", required = true)
+        private String xPathQueryDOI;
+    }
+}
\ No newline at end of file
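For reference, the DOI extraction inside get_xml_object_string boils down to plain JAXP XPath evaluation over the record text. A standalone sketch using the workflow's default query against a hypothetical, heavily trimmed WileyML-like fragment (the element nesting is illustrative; real WileyML records carry namespaces and far more metadata, but the default non-namespace-aware DocumentBuilderFactory lets the unprefixed query match either way):

    import java.io.StringReader;
    import javax.xml.parsers.DocumentBuilder;
    import javax.xml.parsers.DocumentBuilderFactory;
    import javax.xml.xpath.XPath;
    import javax.xml.xpath.XPathConstants;
    import javax.xml.xpath.XPathFactory;
    import org.w3c.dom.Document;
    import org.w3c.dom.NodeList;
    import org.xml.sax.InputSource;

    public class DoiXPathSketch {
        public static void main(String[] args) throws Exception {
            // hypothetical WileyML-like fragment; 10.1000/xyz123 is the DOI
            // handbook's example DOI, not a real record
            String xml = "<component><header>"
                    + "<publicationMeta level=\"unit\">"
                    + "<doi>10.1000/xyz123</doi>"
                    + "</publicationMeta>"
                    + "</header><body/></component>";

            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            Document doc = builder.parse(new InputSource(new StringReader(xml)));

            // the same query the workflow passes by default via -xPathQueryDOI
            XPath xPath = XPathFactory.newInstance().newXPath();
            NodeList nodes = (NodeList) xPath.compile("//publicationMeta[@level='unit']/doi/text()")
                    .evaluate(doc, XPathConstants.NODESET);

            // mirrors extractFirstNonEmptyTrimmedTextContent: first non-blank hit wins
            System.out.println(nodes.item(0).getTextContent().trim()); // 10.1000/xyz123
        }
    }

Records where this query yields nothing produce a null id and are dropped by the where clause above, so the output datastore only carries texts with a resolvable DOI.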
diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/wiley/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/wiley/oozie_app/workflow.xml
new file mode 100644
index 000000000..2c4e5fba2
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/wiley/oozie_app/workflow.xml
@@ -0,0 +1,105 @@
+<workflow-app xmlns="uri:oozie:workflow:0.4" name="importer_wiley">
+
+	<parameters>
+		<property>
+			<name>input</name>
+			<description>input DocumentText records</description>
+		</property>
+		<property>
+			<name>xPathQueryDOI</name>
+			<value>//publicationMeta[@level='unit']/doi/text()</value>
+			<description>XPath query to retrieve DOI</description>
+		</property>
+		<property>
+			<name>output</name>
+			<description>output DocumentText records</description>
+		</property>
+		<property>
+			<name>sparkDriverMemory</name>
+			<description>memory for driver process</description>
+		</property>
+		<property>
+			<name>sparkExecutorMemory</name>
+			<description>memory for individual executor</description>
+		</property>
+		<property>
+			<name>sparkExecutorCores</name>
+			<description>number of cores used by single executor</description>
+		</property>
+		<property>
+			<name>oozieActionShareLibForSpark2</name>
+			<description>oozie action sharelib for spark 2.*</description>
+		</property>
+		<property>
+			<name>spark2ExtraListeners</name>
+			<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+			<description>spark 2.* extra listeners classname</description>
+		</property>
+		<property>
+			<name>spark2SqlQueryExecutionListeners</name>
+			<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+			<description>spark 2.* sql query execution listeners classname</description>
+		</property>
+		<property>
+			<name>spark2YarnHistoryServerAddress</name>
+			<description>spark 2.* yarn history server address</description>
+		</property>
+		<property>
+			<name>spark2EventLogDir</name>
+			<description>spark 2.* event log dir location</description>
+		</property>
+	</parameters>
+
+	<global>
+		<job-tracker>${jobTracker}</job-tracker>
+		<name-node>${nameNode}</name-node>
+		<configuration>
+			<property>
+				<name>mapreduce.job.queuename</name>
+				<value>${queueName}</value>
+			</property>
+			<property>
+				<name>oozie.launcher.mapred.job.queue.name</name>
+				<value>${oozieLauncherQueueName}</value>
+			</property>
+			<property>
+				<name>oozie.action.sharelib.for.spark</name>
+				<value>${oozieActionShareLibForSpark2}</value>
+			</property>
+		</configuration>
+	</global>
+
+	<start to="import-wiley"/>
+
+	<action name="import-wiley">
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn-cluster</master>
+			<mode>cluster</mode>
+			<name>import-wiley</name>
+			<class>eu.dnetlib.iis.wf.importer.wiley.ImportWileyXmlContentJob</class>
+			<jar>${oozieTopWfApplicationPath}/lib/iis-wf-import-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-memory=${sparkExecutorMemory}
+				--executor-cores=${sparkExecutorCores}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+			</spark-opts>
+			<arg>-inputPath=${input}</arg>
+			<arg>-outputPath=${output}</arg>
+			<arg>-xPathQueryDOI=${xPathQueryDOI}</arg>
+		</spark>
+		<ok to="end"/>
+		<error to="fail"/>
+	</action>
+
+	<kill name="fail">
+		<message>Unfortunately, the process failed -- error message:
+			[${wf:errorMessage(wf:lastErrorNode())}]</message>
+	</kill>
+
+	<end name="end"/>
+
+</workflow-app>
\ No newline at end of file
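For completeness, the workflow takes the standard IIS Oozie submission route. A hypothetical job.properties sketch; every host name, path, and sizing value below is a placeholder for illustration, not a default shipped with this patch:

    # hypothetical cluster endpoints and queues
    nameNode=hdfs://namenode.example.org:8020
    jobTracker=resourcemanager.example.org:8032
    queueName=default
    oozieLauncherQueueName=default
    oozieActionShareLibForSpark2=spark2

    # input/output DocumentText avro datastores (placeholder paths)
    input=/data/wiley/document_text_input
    output=/data/wiley/document_text_with_doi
    # xPathQueryDOI may be omitted; the workflow defaults it to
    # //publicationMeta[@level='unit']/doi/text()

    # spark sizing and history server (placeholder values)
    sparkDriverMemory=2G
    sparkExecutorMemory=4G
    sparkExecutorCores=2
    spark2YarnHistoryServerAddress=http://historyserver.example.org:18089
    spark2EventLogDir=/user/spark/applicationHistory

    # deployed oozie_app location (placeholder)
    oozie.wf.application.path=${nameNode}/lib/iis/wiley_importer/oozie_app

With such a file in place, the job would be launched with the stock Oozie CLI, e.g. oozie job -config job.properties -run.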