Skip to content

Commit

Permalink
Added support for HDFS
Browse files Browse the repository at this point in the history
* align with the s3 url handler (you can only set one per JVM)
* lots of internal refactorings
  • Loading branch information
conker84 authored and jexp committed Jan 27, 2018
1 parent 8e39e1c commit 4d55cf8
Show file tree
Hide file tree
Showing 38 changed files with 606 additions and 232 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ Thumbs.db
.cache-main
.cache-tests
bin
hadoop
21 changes: 13 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@ ext {
}

repositories {
// mavenLocal()
maven { url "https://m2.neo4j.org/content/repositories/snapshots" }
mavenCentral()
maven { url "https://m2.neo4j.org/content/repositories/snapshots" }
maven { url "http://oss.sonatype.org/content/repositories/snapshots/" }
maven { url "https://m2.neo4j.org/content/repositories/snapshots" }
mavenCentral()
maven { url "http://oss.sonatype.org/content/repositories/snapshots/" }
flatDir { dirs 'lib' }
}

compileJava {
Expand All @@ -72,8 +71,8 @@ dependencies {
compileOnly group: 'net.biville.florent', name: 'neo4j-sproc-compiler', version:'1.2'

compile 'com.novell.ldap:jldap:2009-10-07'
compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.250'
testCompile group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.250'
compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.270'
testCompile group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.270'

testCompile group: 'junit', name: 'junit', version:'4.12'
testCompile group: 'org.hamcrest', name: 'hamcrest-library', version:'1.3'
Expand All @@ -88,7 +87,13 @@ dependencies {
testCompile group: 'org.neo4j', name: 'neo4j-io', version:neo4jVersionEffective, classifier: "tests"

testCompile group: 'org.postgresql', name: 'postgresql', version: '42.1.4'
testCompile 'io.minio:minio:3.0.10'
testCompile 'io.minio:minio:3.0.11'

compileOnly group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.5'
testCompile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.5'
compileOnly group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.5'
testCompile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.5'
testCompile group: 'org.apache.hadoop', name: 'hadoop-minicluster', version: '2.7.5'

compileOnly 'org.mongodb:mongodb-driver:3.2.2'
testCompile 'org.mongodb:mongodb-driver:3.2.2'
Expand Down
Binary file added hadoop/bin/hadoop.dll
Binary file not shown.
Binary file added hadoop/bin/hdfs.dll
Binary file not shown.
Binary file added hadoop/bin/libwinutils.lib
Binary file not shown.
Binary file added hadoop/bin/winutils.exe
Binary file not shown.
3 changes: 0 additions & 3 deletions src/main/java/apoc/ApocConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@

import apoc.cache.Static;
import apoc.util.Util;
import apoc.util.s3util.S3UrlStreamHandlerFactory;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.internal.GraphDatabaseAPI;

import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
Expand All @@ -25,7 +23,6 @@ public class ApocConfiguration {
static {
PARAM_WHITELIST.put("dbms.directories.import", "import.file.directory");
PARAM_WHITELIST.put("dbms.security.allow_csv_import_from_file_urls", "import.file.allow_read_from_filesystem");
URL.setURLStreamHandlerFactory(new S3UrlStreamHandlerFactory());
}

public static void initialize(GraphDatabaseAPI db) {
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/apoc/ApocKernelExtensionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import apoc.index.IndexUpdateTransactionEventHandler;
import apoc.trigger.Trigger;
import apoc.ttl.TTLLifeCycle;
import apoc.util.ApocUrlStreamHandlerFactory;
import org.neo4j.kernel.extension.KernelExtensionFactory;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.proc.Procedures;
Expand All @@ -13,12 +14,17 @@
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;

import java.net.URL;

/**
* @author mh
* @since 14.05.16
*/
public class ApocKernelExtensionFactory extends KernelExtensionFactory<ApocKernelExtensionFactory.Dependencies>{

static {
URL.setURLStreamHandlerFactory(new ApocUrlStreamHandlerFactory());
}
public ApocKernelExtensionFactory() {
super("APOC");
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/apoc/cypher/Cypher.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package apoc.cypher;

import apoc.Pools;
import apoc.export.util.FileUtils;
import apoc.util.FileUtils;
import apoc.result.MapResult;
import apoc.util.QueueBasedSpliterator;
import apoc.util.Util;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/apoc/export/csv/ExportCSV.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import java.util.Map;
import java.util.stream.Stream;

import static apoc.export.util.FileUtils.checkWriteAllowed;
import static apoc.export.util.FileUtils.getPrintWriter;
import static apoc.util.FileUtils.checkWriteAllowed;
import static apoc.util.FileUtils.getPrintWriter;

/**
* @author mh
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/apoc/export/cypher/ExportCypher.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static apoc.export.util.FileUtils.checkWriteAllowed;
import static apoc.util.FileUtils.checkWriteAllowed;

/**
* @author mh
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/apoc/export/cypher/FileManagerFactory.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package apoc.export.cypher;

import apoc.export.util.FileUtils;
import apoc.util.FileUtils;

import java.io.IOException;
import java.io.PrintWriter;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/apoc/export/graphml/ExportGraphML.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package apoc.export.graphml;

import apoc.export.util.ExportConfig;
import apoc.export.util.FileUtils;
import apoc.util.FileUtils;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.export.util.ProgressReporter;
import apoc.result.ProgressInfo;
Expand All @@ -23,7 +23,7 @@
import java.util.Map;
import java.util.stream.Stream;

import static apoc.export.util.FileUtils.getPrintWriter;
import static apoc.util.FileUtils.getPrintWriter;

/**
* @author mh
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/apoc/load/LoadCsv.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package apoc.load;

import apoc.export.util.CountingReader;
import apoc.export.util.FileUtils;
import apoc.util.FileUtils;
import apoc.meta.Meta;
import apoc.util.Util;
import au.com.bytecode.opencsv.CSVReader;
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/apoc/load/Xml.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package apoc.load;

import apoc.export.util.FileUtils;
import apoc.util.FileUtils;
import apoc.result.MapResult;
import apoc.result.NodeResult;
import apoc.util.Util;
Expand All @@ -26,7 +26,6 @@
import javax.xml.xpath.XPathFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.net.URLConnection;
import java.util.*;
Expand Down Expand Up @@ -69,7 +68,7 @@ private Stream<MapResult> xmlXpathToMapResult(@Name("url") String url, boolean s
DocumentBuilder documentBuilder = documentBuilderFactory.newDocumentBuilder();

FileUtils.checkReadAllowed(url);
Document doc = Util.builderDocument(url, documentBuilder);
Document doc = documentBuilder.parse(Util.openInputStream(url, Collections.emptyMap(), null));
XPathFactory xPathFactory = XPathFactory.newInstance();

XPath xPath = xPathFactory.newXPath();
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/apoc/util/ApocUrlStreamHandlerFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package apoc.util;

import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;

/**
 * Single JVM-wide {@link URLStreamHandlerFactory} that multiplexes between the
 * optional S3 and HDFS stream handlers. The JDK allows
 * {@code URL.setURLStreamHandlerFactory} to be called only once per JVM, so every
 * custom protocol APOC supports must be routed through this one factory.
 */
public class ApocUrlStreamHandlerFactory implements URLStreamHandlerFactory {

    // Created reflectively so this class still loads when the optional S3/Hadoop
    // jars are missing from the plugins folder; null when instantiation fails.
    private static final URLStreamHandlerFactory s3StreamHandlerFactory = Util.createInstanceOrNull("apoc.util.s3.S3UrlStreamHandlerFactory");
    private static final URLStreamHandlerFactory hdfsStreamHandlerFactory = Util.createInstanceOrNull("org.apache.hadoop.fs.FsUrlStreamHandlerFactory");

    /**
     * @param protocol the URL scheme being resolved (e.g. "s3", "hdfs")
     * @return a handler for a supported protocol, or {@code null} so the JDK
     *         falls back to its built-in handlers
     */
    @Override
    public URLStreamHandler createURLStreamHandler(String protocol) {
        // Null-check the factories as well as the ENABLED flags: the flag is a
        // class-presence probe while the factory is created reflectively, and the
        // two can disagree (marker class present, instantiation failed). Returning
        // null lets the JDK fall back instead of throwing an NPE here.
        if (s3StreamHandlerFactory != null && FileUtils.S3_ENABLED && FileUtils.S3_PROTOCOL.equalsIgnoreCase(protocol)) {
            return s3StreamHandlerFactory.createURLStreamHandler(protocol);
        }
        if (hdfsStreamHandlerFactory != null && FileUtils.HDFS_ENABLED && FileUtils.HDFS_PROTOCOL.equalsIgnoreCase(protocol)) {
            return hdfsStreamHandlerFactory.createURLStreamHandler(protocol);
        }
        return null;
    }
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package apoc.export.util;
package apoc.util;

import apoc.ApocConfiguration;
import apoc.util.Util;
import apoc.export.util.CountingReader;
import apoc.util.hdfs.HDFSUtils;
import apoc.util.s3.S3URLConnection;

import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;

/**
Expand All @@ -14,17 +15,40 @@
*/
public class FileUtils {

public static final String S3_PROTOCOL = "s3";
public static final boolean S3_ENABLED = Util.classExists("com.amazonaws.services.s3.AmazonS3");
public static final String HDFS_PROTOCOL = "hdfs";
public static final boolean HDFS_ENABLED = Util.classExists("org.apache.hadoop.fs.FileSystem");

public static CountingReader readerFor(String fileName) throws IOException {
checkReadAllowed(fileName);
if (fileName==null) return null;
fileName= changeFileUrlIfImportDirectoryConstrained(fileName);
fileName = changeFileUrlIfImportDirectoryConstrained(fileName);
if (fileName.matches("^\\w+:/.+")) {
return Util.openInputStream(fileName,null,null).asReader();
if (HDFSUtils.isHdfs(fileName)) {
return readHdfs(fileName);
} else {
return Util.openInputStream(fileName,null,null).asReader();
}
}
File file = new File(fileName);
return readFile(fileName);
}

private static CountingReader readHdfs(String fileName) {
try {
StreamConnection streamConnection = HDFSUtils.readFile(fileName);
Reader reader = new BufferedReader(new InputStreamReader(streamConnection.getInputStream(), "UTF-8"));
return new CountingReader(reader, streamConnection.getLength());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static CountingReader readFile(String fileName) throws IOException, FileNotFoundException {
File file = new File(fileName);
if (!file.exists() || !file.isFile() || !file.canRead()) throw new IOException("Cannot open file "+fileName+" for reading.");
return new CountingReader(file);
}
}

public static String changeFileUrlIfImportDirectoryConstrained(String url) throws IOException {
if (isFile(url) && ApocConfiguration.isEnabled("import.file.use_neo4j_config")) {
Expand Down Expand Up @@ -56,13 +80,25 @@ public static String changeFileUrlIfImportDirectoryConstrained(String url) throw
public static boolean isFile(String fileName) {
if (fileName==null) return false;
if (fileName.toLowerCase().startsWith("http")) return false;
if (HDFSUtils.isHdfs(fileName)) return false;
if (fileName.toLowerCase().startsWith("file:")) return true;
return true;
}

public static PrintWriter getPrintWriter(String fileName, Writer out) throws IOException {
if (fileName == null) return null;
Writer writer = fileName.equals("-") ? out : new BufferedWriter(new FileWriter(fileName));
Writer writer;

if (HDFSUtils.isHdfs(fileName)) {
try {
writer = new OutputStreamWriter(HDFSUtils.writeFile(fileName));
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
writer = fileName.equals("-") ? out : new BufferedWriter(new FileWriter(fileName));
}

return new PrintWriter(writer);
}

Expand All @@ -74,4 +110,34 @@ public static void checkWriteAllowed() {
if (!ApocConfiguration.isEnabled("export.file.enabled"))
throw new RuntimeException("Export to files not enabled, please set apoc.export.file.enabled=true in your neo4j.conf");
}

public static StreamConnection openS3InputStream(URL url) throws IOException {
if (!S3_ENABLED) {
throw new MissingDependencyException("Cannot find the S3 jars in the plugins folder. \n" +
"Please put these files into the plugins folder :\n\n" +
"aws-java-sdk-core-x.y.z.jar\n" +
"aws-java-sdk-s3-x.y.z.jar\n" +
"httpclient-x.y.z.jar\n" +
"httpcore-x.y.z.jar\n" +
"joda-time-x.y.z.jar\n" +
"\nSee the documentation: https://neo4j-contrib.github.io/neo4j-apoc-procedures/#_loading_data_from_web_apis_json_xml_csv");
}
return S3URLConnection.openS3InputStream(url);
}

public static StreamConnection openHdfsInputStream(URL url) throws IOException {
if (!HDFS_ENABLED) {
throw new MissingDependencyException("Cannot find the HDFS/Hadoop jars in the plugins folder. \n" +
"Please put these files into the plugins folder :\n\n" +
"commons-cli\n" +
"hadoop-auth\n" +
"hadoop-client\n" +
"hadoop-common\n" +
"hadoop-hdfs\n" +
"htrace-core-3.1.0-incubating\n" +
"protobuf-java\n" +
"\nSee the documentation: https://neo4j-contrib.github.io/neo4j-apoc-procedures/#_loading_data_from_web_apis_json_xml_csv");
}
return HDFSUtils.readFile(url);
}
}
8 changes: 0 additions & 8 deletions src/main/java/apoc/util/JsonUtil.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,18 @@
package apoc.util;

import apoc.ApocConfiguration;
import apoc.export.util.FileUtils;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.MappingIterator;
import org.codehaus.jackson.map.ObjectMapper;
import org.neo4j.procedure.Name;

import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Scanner;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down
43 changes: 43 additions & 0 deletions src/main/java/apoc/util/StreamConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package apoc.util;

import apoc.export.util.CountingInputStream;

import java.io.IOException;
import java.io.InputStream;
import java.net.URLConnection;

/**
* @author mh
* @since 26.01.18
*/
/**
 * Abstraction over a readable stream source (plain URL, S3, HDFS) that exposes
 * the stream together with its reported encoding and content length.
 *
 * @author mh
 * @since 26.01.18
 */
public interface StreamConnection {
    /** @return the underlying input stream; the caller is responsible for closing it */
    InputStream getInputStream() throws IOException;
    /** @return the content encoding reported by the source, or {@code null} if unknown */
    String getEncoding();
    /** @return the content length in bytes, or a negative value if unknown */
    long getLength();
    /** Wraps the stream so consumers can track how many bytes have been read. */
    default CountingInputStream toCountingInputStream() throws IOException {
        return new CountingInputStream(getInputStream(), getLength());
    }

    /** Adapts a plain {@link URLConnection} (http, file, ftp, ...) to this interface. */
    class UrlStreamConnection implements StreamConnection {
        private final URLConnection con;

        public UrlStreamConnection(URLConnection con) {
            this.con = con;
        }

        @Override
        public InputStream getInputStream() throws IOException {
            return con.getInputStream();
        }

        @Override
        public String getEncoding() {
            return con.getContentEncoding();
        }

        @Override
        public long getLength() {
            // getContentLength() returns an int and truncates/overflows for
            // payloads larger than 2 GB; the long variant reports them correctly.
            return con.getContentLengthLong();
        }
    }
}
Loading

0 comments on commit 4d55cf8

Please sign in to comment.