From 191e16e2952aa540501fe6767819acb9f1adb730 Mon Sep 17 00:00:00 2001
From: ssshow16
Date: Thu, 20 Nov 2014 18:06:19 +0900
Subject: [PATCH] Support Kerberos authentication

---
 RHive/DESCRIPTION                                  |   2 +-
 RHive/R/api.R                                      |  13 +-
 RHive/R/hdfs.R                                     |  10 +
 RHive/R/j2r.R                                      |   9 +
 RHive/R/rhive.R                                    |  62 +-
 RHive/R/util.R                                     |   8 +
 RHive/inst/javasrc/build.num                       |   4 +-
 .../src/com/nexr/rhive/hadoop/AuthUtils.java       |  30 +
 .../src/com/nexr/rhive/hadoop/FSUtils.java         | 759 +++++++++---------
 .../nexr/rhive/hive/DatabaseConnection.java        |  19 +-
 .../com/nexr/rhive/hive/HiveJdbcClient.java        |  95 ++-
 .../com/nexr/rhive/hive/HiveOperations.java        |  12 +-
 .../hive/udf/GenericUDFArrayToString.java          |   2 +-
 .../rhive/hive/udf/GenericUDTFExpand.java          |  16 +-
 .../rhive/hive/udf/GenericUDTFUnFold.java          |  18 +-
 .../com/nexr/rhive/hive/udf/RangeKeyUDF.java       |  33 +-
 .../src/com/nexr/rhive/hive/udf/ScaleUDF.java      |   4 +-
 17 files changed, 605 insertions(+), 491 deletions(-)
 create mode 100644 RHive/inst/javasrc/src/com/nexr/rhive/hadoop/AuthUtils.java

diff --git a/RHive/DESCRIPTION b/RHive/DESCRIPTION
index 9ae10be..6c154e8 100644
--- a/RHive/DESCRIPTION
+++ b/RHive/DESCRIPTION
@@ -1,7 +1,7 @@
 Package: RHive
 Type: Package
 Title: R and Hive
-Version: 2.0-0.5
+Version: 2.0-0.6
 Description: RHive is an R extension facilitating
     distributed computing via HIVE query. It provides
    an easy to use HQL like SQL
diff --git a/RHive/R/api.R b/RHive/R/api.R
index e7ea91f..b29059e 100644
--- a/RHive/R/api.R
+++ b/RHive/R/api.R
@@ -82,9 +82,9 @@ rhive.env <- function(ALL=FALSE) {
     )
 }
 
-rhive.connect <- function(host="127.0.0.1", port=10000, hiveServer2=NA, defaultFS=NULL, updateJar=FALSE, user=NULL, password=NULL) {
+rhive.connect <- function(host="127.0.0.1", port=10000, hiveServer2=NA, defaultFS=NULL, updateJar=FALSE, user=NULL, password=NULL, db="default", properties=character(0)) {
     tryCatch ( {
-        .rhive.connect(host=host, port=port, hiveServer2=hiveServer2, defaultFS=defaultFS, updateJar=updateJar, user=user, password=password)
+        .rhive.connect(host=host, port=port, hiveServer2=hiveServer2, defaultFS=defaultFS, updateJar=updateJar, user=user, password=password, db=db, properties=properties)
     }, error=function(e) {
         .handleErr(e)
     }
@@ -516,3 +516,12 @@ rhive.hdfs.info <- function(path) {
         }
     )
 }
+
+rhive.login <- function(keytab, principal, hostname) {
+    tryCatch ( {
+        .rhive.login(keytab=keytab, principal=principal, hostname=hostname)
+    }, error=function(e) {
+        .handleErr(e)
+    }
+    )
+}
diff --git a/RHive/R/hdfs.R b/RHive/R/hdfs.R
index 73ca754..bc6177a 100644
--- a/RHive/R/hdfs.R
+++ b/RHive/R/hdfs.R
@@ -19,6 +19,10 @@
     }
 
     FSUtils <- .j2r.FSUtils()
+
+    AuthUtils <- .j2r.AuthUtils()
+    AuthUtils$setConfigurationUserGroup(FSUtils$getConf())
+
     ok <- FSUtils$checkFileSystem(defaultFS)
     if (!ok) {
         stop(sprintf("Failed to connect to %s.", defaultFS))
@@ -330,3 +334,9 @@
         unlink(files)
     }
 }
+
+.rhive.hdfs.login <- function() {
+    j.auth.properties <- .getEnv("AUTH_PROPERTIES")
+    AuthUtils <- .j2r.AuthUtils()
+    AuthUtils$loginUserFromKeytab(j.auth.properties, "hdfs")
+}
\ No newline at end of file
diff --git a/RHive/R/j2r.R b/RHive/R/j2r.R
index d717027..9c7d0b1 100644
--- a/RHive/R/j2r.R
+++ b/RHive/R/j2r.R
@@ -19,6 +19,10 @@
     FSUtils$getConf()
 }
 
+.j2r.Properties <- function() {
+    .jnew("java/util/Properties")
+}
+
 .j2r.EnvUtils <- function() {
     J("com/nexr/rhive/util/EnvUtils")
 }
@@ -36,9 +40,14 @@
 }
 
 .j2r.FSUtils <- function() {
+    .rhive.hdfs.login()
     J("com/nexr/rhive/hadoop/FSUtils")
 }
 
+.j2r.AuthUtils <- function() {
+    J("com/nexr/rhive/hadoop/AuthUtils")
+}
+
 .j2r.System <- function() {
     J("java/lang/System")
 }
diff --git a/RHive/R/rhive.R b/RHive/R/rhive.R
index f0f3356..9fbf70e 100644
--- a/RHive/R/rhive.R
+++ b/RHive/R/rhive.R
@@ -94,7 +94,7 @@
     .jinit(classpath=cp, parameters=getOption("java.parameters"))
 }
 
-.rhive.connect <- function(host="127.0.0.1", port=10000, hiveServer2=NA, defaultFS=NULL, updateJar=FALSE, user=NULL, password=NULL) {
+.rhive.connect <- function(host="127.0.0.1", port=10000, hiveServer2=NA, defaultFS=NULL, updateJar=FALSE, user=NULL, password=NULL, db="default", properties=character(0)) {
     initialized <- .getEnv("INITIALIZED")
 
     if (is.null(.getEnv("HIVE_HOME")) || is.null(.getEnv("HADOOP_HOME"))) {
@@ -121,19 +121,21 @@
         userName <- EnvUtils$getUserName()
         userHome <- EnvUtils$getUserHome()
         tmpDir <- EnvUtils$getTempDirectory()
+        j.properties <- .auth.properties(user, password, properties)
 
         .setEnv("USERNAME", userName)
         .setEnv("HOME", userHome)
         .setEnv("TMP_DIR", tmpDir)
-
+        .setEnv("AUTH_PROPERTIES", j.properties)
+
         System <- .j2r.System()
-        System$setProperty("RHIVE_UDF_DIR", .FS_UDF_DIR())
+        System$setProperty("RHIVE_UDF_DIR", .FS_UDF_DIR())
         System$setProperty("HADOOP_CONF_DIR", .HADOOP_CONF_DIR())
-
+
         if (is.null(defaultFS)) {
             defaultFS <- .DEFAULT_FS()
         }
-
+
         .rhive.hdfs.connect(defaultFS)
         .copyJarsToHdfs(updateJar)
@@ -142,7 +144,7 @@
         }
 
         hiveClient <- .j2r.HiveJdbcClient(hiveServer2)
-        hiveClient$connect(host, as.integer(port), user, password)
+        hiveClient$connect(host, as.integer(port), db, user, password, j.properties)
         hiveClient$addJar(.FS_JAR_PATH())
 
         .registerUDFs(hiveClient)
@@ -151,9 +153,31 @@
         .setEnv("hiveClient", hiveClient)
 
         .makeBaseDirs()
+
+        if (db != "default") {
+            .rhive.use.database(db)
+        }
     }
 }
 
+.auth.properties <- function(user, password, properties) {
+    j.properties <- .j2r.Properties()
+    if (!is.empty(user)) {
+        j.properties$setProperty("user", user)
+    }
+
+    if (!is.empty(password)) {
+        j.properties$setProperty("password", password)
+    }
+
+    if (!is.empty(properties)) {
+        l <- lapply(strsplit(properties, split = "="), function(x) { gsub("^\\s+|\\s+$", "", x) })
+        lapply(l, function(p) { if (length(p) == 2) { j.properties$setProperty(p[1], p[2]) } })
+    }
+    return(j.properties)
+}
+
 .copyJarsToHdfs <- function(updateJar) {
     jar <- paste(system.file(package="RHive"), "java", "rhive_udf.jar", sep=.Platform$file.sep)
@@ -207,28 +231,28 @@
 
 .makeBaseDirs <- function() {
     if (!.rhive.hdfs.exists(.FS_BASE_DATA_DIR())) {
-        .dfs.mkdir(.FS_BASE_DATA_DIR())
-        .dfs.chmod("777", .FS_BASE_DATA_DIR())
+        .rhive.hdfs.mkdirs(.FS_BASE_DATA_DIR())
+        .rhive.hdfs.chmod("777", .FS_BASE_DATA_DIR())
     }
 
     if (!.rhive.hdfs.exists(.FS_BASE_UDF_DIR())) {
-        .dfs.mkdir(.FS_BASE_UDF_DIR())
-        .dfs.chmod("777", .FS_BASE_UDF_DIR())
+        .rhive.hdfs.mkdirs(.FS_BASE_UDF_DIR())
+        .rhive.hdfs.chmod("777", .FS_BASE_UDF_DIR())
     }
 
     if (!.rhive.hdfs.exists(.FS_BASE_TMP_DIR())) {
-        .dfs.mkdir(.FS_BASE_TMP_DIR())
-        .dfs.chmod("777", .FS_BASE_TMP_DIR())
+        .rhive.hdfs.mkdirs(.FS_BASE_TMP_DIR())
+        .rhive.hdfs.chmod("777", .FS_BASE_TMP_DIR())
     }
 
     if(!.rhive.hdfs.exists(.FS_TMP_DIR())){
-        .dfs.mkdir(.FS_TMP_DIR())
-        .dfs.chmod("777", .FS_TMP_DIR())
+        .rhive.hdfs.mkdirs(.FS_TMP_DIR())
+        .rhive.hdfs.chmod("777", .FS_TMP_DIR())
     }
 
     if (!.rhive.hdfs.exists(.FS_BASE_MR_SCRIPT_DIR())) {
-        .dfs.mkdir(.FS_BASE_MR_SCRIPT_DIR())
-        .dfs.chmod("777", .FS_BASE_MR_SCRIPT_DIR())
+        .rhive.hdfs.mkdirs(.FS_BASE_MR_SCRIPT_DIR())
+        .rhive.hdfs.chmod("777", .FS_BASE_MR_SCRIPT_DIR())
     }
 }
@@ -519,11 +543,11 @@
     recs <- unlist(strsplit(desc, split = "\n", fixed = TRUE))
 
-    l <- list(colname = character(0), type = character(0), comment = character(0))
+    l <- list(col_name = character(0), data_type = character(0), comment = character(0))
     for (i in seq_along(recs)) {
         v <- unlist(strsplit(recs[i], split = "\\s+"))
-        l$colname[i] <- v[1]
-        l$type[i] <- v[2]
+        l$col_name[i] <- v[1]
+        l$data_type[i] <- v[2]
 
         ## support hive (>= 0.13.# )
         if (length(v) >= 3) {
             l$comment[i] <- v[3]
diff --git a/RHive/R/util.R b/RHive/R/util.R
index 482826b..c8d6710 100644
--- a/RHive/R/util.R
+++ b/RHive/R/util.R
@@ -13,3 +13,11 @@
 # limitations under the License.
 
 trim <- function (x) gsub("^\\s+|\\s+$", "", x)
+
+is.empty <- function(val) {
+    if (is.null(val) || length(val) == 0 || all(is.na(val))) {
+        return(TRUE)
+    } else {
+        return(FALSE)
+    }
+}
diff --git a/RHive/inst/javasrc/build.num b/RHive/inst/javasrc/build.num
index 681273b..b7dc4dc 100644
--- a/RHive/inst/javasrc/build.num
+++ b/RHive/inst/javasrc/build.num
@@ -1,3 +1,3 @@
 #Build Number for ANT. Do not edit!
-#Mon Nov 10 10:43:12 KST 2014
-build.number=334
+#Thu Nov 20 17:11:32 KST 2014
+build.number=359
diff --git a/RHive/inst/javasrc/src/com/nexr/rhive/hadoop/AuthUtils.java b/RHive/inst/javasrc/src/com/nexr/rhive/hadoop/AuthUtils.java
new file mode 100644
index 0000000..eff6180
--- /dev/null
+++ b/RHive/inst/javasrc/src/com/nexr/rhive/hadoop/AuthUtils.java
@@ -0,0 +1,30 @@
+package com.nexr.rhive.hadoop;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Kerberos login helpers built on Hadoop's UserGroupInformation.
+ */
+public class AuthUtils {
+
+    public static void loginUserFromKeytab(Properties properties, String service) throws IOException {
+
+        String principal = properties.getProperty(service + ".principal");
+        String keytab = properties.getProperty(service + ".keytab");
+
+        if (StringUtils.isEmpty(principal) || StringUtils.isEmpty(keytab)) {
+            return;
+        }
+
+        UserGroupInformation.loginUserFromKeytab(principal, keytab);
+    }
+
+    public static void setConfigurationUserGroup(Configuration conf) {
+        UserGroupInformation.setConfiguration(conf);
+    }
+}
diff --git a/RHive/inst/javasrc/src/com/nexr/rhive/hadoop/FSUtils.java b/RHive/inst/javasrc/src/com/nexr/rhive/hadoop/FSUtils.java
index 5c857a8..8f86890 100755
--- a/RHive/inst/javasrc/src/com/nexr/rhive/hadoop/FSUtils.java
+++ b/RHive/inst/javasrc/src/com/nexr/rhive/hadoop/FSUtils.java
@@ -14,386 +14,393 @@
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.tools.DFSck;
+import org.apache.hadoop.security.UserGroupInformation;
 
 import java.net.URL;
 import java.io.File;
 import java.io.FilenameFilter;
 
 public class FSUtils {
-    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm");
-
-    public static boolean checkFileSystem(String defaultFS) throws IOException {
-        Configuration conf = getConf(defaultFS);
-        try {
-            FileSystem.get(conf);
-        } catch (Exception e) {
-            return false;
-        }
-
-        return true;
-    }
-
-    public static String[][] ls(String src, String defaultFS) throws IOException {
-        Configuration conf = getConf(defaultFS);
-
-        Path srcPath = new Path(src);
-        FileSystem fs = null;
-
-        List items = new ArrayList();
-        try {
-            fs = srcPath.getFileSystem(conf);
-            FileStatus[] stats = fs.globStatus(srcPath);
-            if (stats != null) {
-                for (FileStatus stat : stats) {
-                    if (!stat.isDir()) {
-                        items.add(stat);
-                    } else {
-                        Path path = stat.getPath();
-                        FileStatus files[] = fs.listStatus(path);
-                        if (files != null) {
-                            for (FileStatus file : files) {
-                                items.add(file);
-                            }
-                        }
-                    }
-                }
-            }
-        } finally {
-            closeFileSystem(fs);
-        }
-
-        String[][] rlist = new String[6][items.size()];
-        for (int i = 0; i < 6; i++) {
-            rlist[i] = new String[items.size()];
-        }
-
-        for (int i = 0; i < 6; i++) {
-            for (int j = 0; j < items.size(); j++) {
-                FileStatus item = items.get(j);
-
-                rlist[0][j] = item.getPermission().toString();
-                rlist[1][j] = item.getOwner();
-                rlist[2][j] = item.getGroup();
-                rlist[3][j] = String.valueOf(item.getLen());
-
-                Date date = new Date(item.getModificationTime());
-                rlist[4][j] = dateFormat.format(date);
-                rlist[5][j] = item.getPath().toUri().getPath();
-            }
-        }
-
-
-        return rlist;
-    }
-
-    private static int ls(FileStatus src, FileSystem srcFs) throws IOException {
-        FileStatus items[] = listStatus(srcFs, src);
-
-        int numOfErrors = 0;
-
-        for (int i = 0; i < items.length; i++) {
-            FileStatus stat = items[i];
-
-            int replication = String.valueOf(stat.getReplication()).length();
-            int len = String.valueOf(stat.getLen()).length();
-            int owner = String.valueOf(stat.getOwner()).length();
-            int group = String.valueOf(stat.getGroup()).length();
-        }
-
-        for (int i = 0; i < items.length; i++) {
-            FileStatus item = items[i];
-            item.getPermission().toString();
-            item.getOwner();
-            item.getGroup();
-            String.valueOf(item.getLen());
-
-            Date date = new Date(item.getModificationTime());
-            dateFormat.format(date);
-            item.getPath().toUri().getPath();
-        }
-
-        return numOfErrors;
-    }
-
-    private static FileStatus[] listStatus(FileSystem srcFs, FileStatus src) throws IOException {
-        if (!src.isDir()) {
-            FileStatus files[] = { src };
-            return files;
-        }
-
-        Path path = src.getPath();
-        FileStatus files[] = srcFs.listStatus(path);
-        if (files == null) {
-            files = new FileStatus[0];
-        }
-
-        return files;
-    }
-
-    public static String[][] du(String src, String defaultFS) throws IOException {
-        Configuration conf = getConf(defaultFS);
-
-        Path srcPath = new Path(src);
-        FileSystem fs = null;
-        FileStatus items[] = null;
-
-        long length[] = null;
-        try {
-            fs = srcPath.getFileSystem(conf);
-            Path pathItems[] = FileUtil.stat2Paths(fs.globStatus(srcPath), srcPath);
-            items = fs.listStatus(pathItems);
-            if (items == null) {
-                items = new FileStatus[0];
-            }
-
-            length = new long[items.length];
-            for (int i = 0; i < items.length; i++) {
-                length[i] = items[i].isDir() ? fs.getContentSummary(items[i].getPath()).getLength() : items[i].getLen();
-            }
-
-        } finally {
-            closeFileSystem(fs);
-        }
-
-        String[][] rlist = new String[2][items.length];
-        for (int i = 0; i < 2; i++) {
-            rlist[i] = new String[items.length];
-        }
-
-        for (int i = 0; i < 2; i++) {
-            for (int j = 0; j < items.length; j++) {
-                FileStatus item = items[j];
-
-                rlist[0][j] = String.valueOf(length[j]);
-                rlist[1][j] = item.getPath().toUri().getPath();
-            }
-        }
-
-        return rlist;
-    }
-
-    public static String[][] dus(String src, String defaultFS) throws IOException {
-        Configuration conf = getConf(defaultFS);
-
-        Path srcPath = new Path(src);
-        FileSystem fs = null;
-        FileStatus status[] = null;
-
-        long length[] = null;
-        try {
-            fs = srcPath.getFileSystem(conf);
-            status = fs.globStatus(new Path(src));
-            if (status == null) {
-                status = new FileStatus[0];
-            }
-
-            length = new long[status.length];
-            for (int i = 0; i < status.length; i++) {
-                length[i] = fs.getContentSummary(status[i].getPath()).getLength();
-            }
-
-        } finally {
-            closeFileSystem(fs);
-        }
-
-        String[][] rlist = new String[2][status.length];
-        for (int i = 0; i < 2; i++) {
-            rlist[i] = new String[status.length];
-        }
-
-        for (int i = 0; i < 2; i++) {
-            for (int j = 0; j < status.length; j++) {
-                FileStatus item = status[j];
-
-                rlist[0][j] = String.valueOf(length[j]);
-                rlist[1][j] = item.getPath().toUri().getPath();
-            }
-        }
-
-        return rlist;
-    }
-
-    public static void copyFromLocalFile(boolean delSrc, boolean overwrite, String src, String dst, String defaultFS) throws IOException {
-        Configuration conf = getConf(defaultFS);
-
-        FileSystem fs = null;
-        try {
-            fs = FileSystem.get(conf);
-            fs.copyFromLocalFile(delSrc, overwrite, new Path(src), new Path(dst));
-        } finally {
-            closeFileSystem(fs);
-        }
-    }
-
-    public static void copyToLocalFile(boolean delSrc, String src, String dst, String defaultFS) throws IOException {
-        Configuration conf = getConf(defaultFS);
-
-        FileSystem fs = null;
-        try {
-            fs = FileSystem.get(conf);
-            fs.copyToLocalFile(delSrc, new Path(src), new Path(dst));
-        } finally {
-            closeFileSystem(fs);
-        }
-    }
-
-    public static boolean delete(String file, String defaultFS) throws IOException {
-        Configuration conf = getConf(defaultFS);
-
-        FileSystem fs = null;
-        try {
-            fs = FileSystem.get(conf);
-            return fs.delete(new Path(file), true);
-        } finally {
-            closeFileSystem(fs);
-        }
-    }
-
-    public static boolean rename(String src, String dst, String defaultFS) throws IOException {
-        Configuration conf = getConf(defaultFS);
-
-        FileSystem fs = null;
-        try {
-            fs = FileSystem.get(conf);
-            return fs.rename(new Path(src), new Path(dst));
-        } finally {
-            closeFileSystem(fs);
-        }
-    }
-
-    public static boolean exists(String file, String defaultFS) throws IOException {
-        Configuration conf = getConf(defaultFS);
-
-        Path path = new Path(file);
-        FileSystem fs = null;
-        try {
-            fs = path.getFileSystem(conf);
-            return fs.exists(path);
-        } finally {
-            closeFileSystem(fs);
-        }
-    }
-
-    public static boolean mkdirs(String file, String defaultFS) throws IOException {
-        Configuration conf = getConf(defaultFS);
-
-        Path path = new Path(file);
-        FileSystem fs = null;
-        try {
-            fs = path.getFileSystem(conf);
-            return fs.mkdirs(path);
-        } finally {
-            closeFileSystem(fs);
-        }
-    }
-
-    public static void cat(String src, String defaultFS) throws Exception {
-        Configuration conf = getConf(defaultFS);
-
-        FsShell fsShell = new FsShell(conf);
-        fsShell.run(new String[] { "-cat", src });
-    }
-
-    public static void tail(String src, String defaultFS) throws Exception {
-        Configuration conf = getConf(defaultFS);
-
-        FsShell fsShell = new FsShell(conf);
-        fsShell.run(new String[] { "-tail", src });
-    }
-
-    public static void chmod(String src, String option, boolean recursive, String defaultFS) throws Exception {
-        Configuration conf = getConf(defaultFS);
-
-        FsShell fsShell = new FsShell(conf);
-
-        if (recursive) {
-            fsShell.run(new String[] { "-chmod", "-R", option, src });
-        } else {
-            fsShell.run(new String[] { "-chmod", option, src });
-        }
-    }
-
-    public static void chown(String src, String option, boolean recursive, String defaultFS) throws Exception {
-        Configuration conf = getConf(defaultFS);
-
-        FsShell fsShell = new FsShell(conf);
-
-        if (recursive) {
-            fsShell.run(new String[] { "-chown", "-R", option, src });
-        } else {
-            fsShell.run(new String[] { "-chown", option, src });
-        }
-    }
-
-    public static void chgrp(String src, String option, boolean recursive, String defaultFS) throws Exception {
-        Configuration conf = getConf(defaultFS);
-
-        FsShell fsShell = new FsShell(conf);
-
-        if (recursive) {
-            fsShell.run(new String[] { "-chgrp", "-R", option, src });
-        } else {
-            fsShell.run(new String[] { "-chgrp", option, src });
-        }
-    }
-
-    public static void info(String src) throws IOException {
-        Configuration conf = getConf(null);
-
-        DFSck dfsCk = new DFSck(conf);
-        dfsCk.run(new String[] { src });
-    }
-
-
-    private static void closeFileSystem(FileSystem fs) {
-        try {
-            if (fs != null) {
-                fs.close();
-            }
-        } catch (IOException e) { }
-    }
-
-    public static Configuration getConf(String defaultFS){
-
-        Configuration conf = getConf();
-
-        //override defaultFS
-        if (StringUtils.isNotEmpty(defaultFS)) {
-            FileSystem.setDefaultUri(conf, defaultFS);
-        }
-
-        return conf;
-    }
-
-    public static Configuration getConf(){
-
-        Configuration conf = new Configuration();
-
-        String hadoopConfPath = System.getProperty("HADOOP_CONF_DIR");
-
-        if(StringUtils.isNotEmpty(hadoopConfPath)){
-
-            File dir = new File(hadoopConfPath);
-            if(!dir.exists() || !dir.isDirectory()){
-                return conf;
-            }
-
-            File[] files = dir.listFiles(
-                new FilenameFilter(){
-                    public boolean accept(File dir, String name){
-                        return name.endsWith("xml");
-                    }
-                }
-            );
-
-            for(File file : files){
-                try{
-                    URL url = new URL("file://" + file.getCanonicalPath());
-                    conf.addResource(url);
-                }catch(Exception e){}
-            }
-        }
-
-        return conf;
-    }
+
+    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+
+    private static Configuration conf;
+
+    public static boolean checkFileSystem(String defaultFS) throws IOException {
+        Configuration conf = getConf(defaultFS);
+        try {
+            FileSystem.get(conf);
+        } catch (Exception e) {
+            return false;
+        }
+
+        return true;
+    }
+
+    public static String[][] ls(String src, String defaultFS) throws IOException {
+        Configuration conf = getConf(defaultFS);
+
+        Path srcPath = new Path(src);
+        FileSystem fs = null;
+
+        List items = new ArrayList();
+        try {
+            fs = srcPath.getFileSystem(conf);
+            FileStatus[] stats = fs.globStatus(srcPath);
+            if (stats != null) {
+                for (FileStatus stat : stats) {
+                    if (!stat.isDir()) {
+                        items.add(stat);
+                    } else {
+                        Path path = stat.getPath();
+                        FileStatus files[] = fs.listStatus(path);
+                        if (files != null) {
+                            for (FileStatus file : files) {
+                                items.add(file);
+                            }
+                        }
+                    }
+                }
+            }
+        } finally {
+            closeFileSystem(fs);
+        }
+
+        String[][] rlist = new String[6][items.size()];
+        for (int i = 0; i < 6; i++) {
+            rlist[i] = new String[items.size()];
+        }
+
+        for (int i = 0; i < 6; i++) {
+            for (int j = 0; j < items.size(); j++) {
+                FileStatus item = items.get(j);
+
+                rlist[0][j] = item.getPermission().toString();
+                rlist[1][j] = item.getOwner();
+                rlist[2][j] = item.getGroup();
+                rlist[3][j] = String.valueOf(item.getLen());
+
+                Date date = new Date(item.getModificationTime());
+                rlist[4][j] = dateFormat.format(date);
+                rlist[5][j] = item.getPath().toUri().getPath();
+            }
+        }
+
+
+        return rlist;
+    }
+
+    private static int ls(FileStatus src, FileSystem srcFs) throws IOException {
+        FileStatus items[] = listStatus(srcFs, src);
+
+        int numOfErrors = 0;
+
+        for (int i = 0; i < items.length; i++) {
+            FileStatus stat = items[i];
+
+            int replication = String.valueOf(stat.getReplication()).length();
+            int len = String.valueOf(stat.getLen()).length();
+            int owner = String.valueOf(stat.getOwner()).length();
+            int group = String.valueOf(stat.getGroup()).length();
+        }
+
+        for (int i = 0; i < items.length; i++) {
+            FileStatus item = items[i];
+            item.getPermission().toString();
+            item.getOwner();
+            item.getGroup();
+            String.valueOf(item.getLen());
+
+            Date date = new Date(item.getModificationTime());
+            dateFormat.format(date);
+            item.getPath().toUri().getPath();
+        }
+
+        return numOfErrors;
+    }
+
+    private static FileStatus[] listStatus(FileSystem srcFs, FileStatus src) throws IOException {
+        if (!src.isDir()) {
+            FileStatus files[] = {src};
+            return files;
+        }
+
+        Path path = src.getPath();
+        FileStatus files[] = srcFs.listStatus(path);
+        if (files == null) {
+            files = new FileStatus[0];
+        }
+
+        return files;
+    }
+
+    public static String[][] du(String src, String defaultFS) throws IOException {
+        Configuration conf = getConf(defaultFS);
+
+        Path srcPath = new Path(src);
+        FileSystem fs = null;
+        FileStatus items[] = null;
+
+        long length[] = null;
+        try {
+            fs = srcPath.getFileSystem(conf);
+            Path pathItems[] = FileUtil.stat2Paths(fs.globStatus(srcPath), srcPath);
+            items = fs.listStatus(pathItems);
+            if (items == null) {
+                items = new FileStatus[0];
+            }
+
+            length = new long[items.length];
+            for (int i = 0; i < items.length; i++) {
+                length[i] = items[i].isDir() ? fs.getContentSummary(items[i].getPath()).getLength() : items[i].getLen();
+            }
+
+        } finally {
+            closeFileSystem(fs);
+        }
+
+        String[][] rlist = new String[2][items.length];
+        for (int i = 0; i < 2; i++) {
+            rlist[i] = new String[items.length];
+        }
+
+        for (int i = 0; i < 2; i++) {
+            for (int j = 0; j < items.length; j++) {
+                FileStatus item = items[j];
+
+                rlist[0][j] = String.valueOf(length[j]);
+                rlist[1][j] = item.getPath().toUri().getPath();
+            }
+        }
+
+        return rlist;
+    }
+
+    public static String[][] dus(String src, String defaultFS) throws IOException {
+        Configuration conf = getConf(defaultFS);
+
+        Path srcPath = new Path(src);
+        FileSystem fs = null;
+        FileStatus status[] = null;
+
+        long length[] = null;
+        try {
+            fs = srcPath.getFileSystem(conf);
+            status = fs.globStatus(new Path(src));
+            if (status == null) {
+                status = new FileStatus[0];
+            }
+
+            length = new long[status.length];
+            for (int i = 0; i < status.length; i++) {
+                length[i] = fs.getContentSummary(status[i].getPath()).getLength();
+            }
+
+        } finally {
+            closeFileSystem(fs);
+        }
+
+        String[][] rlist = new String[2][status.length];
+        for (int i = 0; i < 2; i++) {
+            rlist[i] = new String[status.length];
+        }
+
+        for (int i = 0; i < 2; i++) {
+            for (int j = 0; j < status.length; j++) {
+                FileStatus item = status[j];
+
+                rlist[0][j] = String.valueOf(length[j]);
+                rlist[1][j] = item.getPath().toUri().getPath();
+            }
+        }
+
+        return rlist;
+    }
+
+    public static void copyFromLocalFile(boolean delSrc, boolean overwrite, String src, String dst, String defaultFS) throws IOException {
+        Configuration conf = getConf(defaultFS);
+
+        FileSystem fs = null;
+        try {
+            fs = FileSystem.get(conf);
+            fs.copyFromLocalFile(delSrc, overwrite, new Path(src), new Path(dst));
+        } finally {
+            closeFileSystem(fs);
+        }
+    }
+
+    public static void copyToLocalFile(boolean delSrc, String src, String dst, String defaultFS) throws IOException {
+        Configuration conf = getConf(defaultFS);
+
+        FileSystem fs = null;
+        try {
+            fs = FileSystem.get(conf);
+            fs.copyToLocalFile(delSrc, new Path(src), new Path(dst));
+        } finally {
+            closeFileSystem(fs);
+        }
+    }
+
+    public static boolean delete(String file, String defaultFS) throws IOException {
+        Configuration conf = getConf(defaultFS);
+
+        FileSystem fs = null;
+        try {
+            fs = FileSystem.get(conf);
+            return fs.delete(new Path(file), true);
+        } finally {
+            closeFileSystem(fs);
+        }
+    }
+
+    public static boolean rename(String src, String dst, String defaultFS) throws IOException {
+        Configuration conf = getConf(defaultFS);
+
+        FileSystem fs = null;
+        try {
+            fs = FileSystem.get(conf);
+            return fs.rename(new Path(src), new Path(dst));
+        } finally {
+            closeFileSystem(fs);
+        }
+    }
+
+    public static boolean exists(String file, String defaultFS) throws IOException {
+        Configuration conf = getConf(defaultFS);
+
+        Path path = new Path(file);
+        FileSystem fs = null;
+        try {
+            fs = path.getFileSystem(conf);
+            return fs.exists(path);
+        } finally {
+            closeFileSystem(fs);
+        }
+    }
+
+    public static boolean mkdirs(String file, String defaultFS) throws IOException {
+        Configuration conf = getConf(defaultFS);
+
+        Path path = new Path(file);
+        FileSystem fs = null;
+        try {
+            fs = path.getFileSystem(conf);
+            return fs.mkdirs(path);
+        } finally {
+            closeFileSystem(fs);
+        }
+    }
+
+    public static void cat(String src, String defaultFS) throws Exception {
+        Configuration conf = getConf(defaultFS);
+
+        FsShell fsShell = new FsShell(conf);
+        fsShell.run(new String[]{"-cat", src});
+    }
+
+    public static void tail(String src, String defaultFS) throws Exception {
+        Configuration conf = getConf(defaultFS);
+
+        FsShell fsShell = new FsShell(conf);
+        fsShell.run(new String[]{"-tail", src});
+    }
+
+    public static void chmod(String src, String option, boolean recursive, String defaultFS) throws Exception {
+        Configuration conf = getConf(defaultFS);
+
+        FsShell fsShell = new FsShell(conf);
+
+        if (recursive) {
+            fsShell.run(new String[]{"-chmod", "-R", option, src});
+        } else {
+            fsShell.run(new String[]{"-chmod", option, src});
+        }
+    }
+
+    public static void chown(String src, String option, boolean recursive, String defaultFS) throws Exception {
+        Configuration conf = getConf(defaultFS);
+
+        FsShell fsShell = new FsShell(conf);
+
+        if (recursive) {
+            fsShell.run(new String[]{"-chown", "-R", option, src});
+        } else {
+            fsShell.run(new String[]{"-chown", option, src});
+        }
+    }
+
+    public static void chgrp(String src, String option, boolean recursive, String defaultFS) throws Exception {
+        Configuration conf = getConf(defaultFS);
+
+        FsShell fsShell = new FsShell(conf);
+
+        if (recursive) {
+            fsShell.run(new String[]{"-chgrp", "-R", option, src});
+        } else {
+            fsShell.run(new String[]{"-chgrp", option, src});
+        }
+    }
+
+    public static void info(String src) throws IOException {
+        Configuration conf = getConf(null);
+
+        DFSck dfsCk = new DFSck(conf);
+        dfsCk.run(new String[]{src});
+    }
+
+    private static void closeFileSystem(FileSystem fs) {
+        try {
+            if (fs != null) {
+                fs.close();
+            }
+        } catch (IOException e) {
+        }
+    }
+
+    public static Configuration getConf(String defaultFS) throws IOException {
+
+        Configuration conf = getConf();
+
+        //override defaultFS
+        if (StringUtils.isNotEmpty(defaultFS)) {
+            FileSystem.setDefaultUri(conf, defaultFS);
+        }
+
+        return conf;
+    }
+
+    public static Configuration getConf() throws IOException {
+
+        if (conf != null) {
+            return conf;
+        }
+
+        conf = new Configuration();
+
+        String hadoopConfPath = System.getProperty("HADOOP_CONF_DIR");
+
+        if (StringUtils.isNotEmpty(hadoopConfPath)) {
+
+            File dir = new File(hadoopConfPath);
+            if (!dir.exists() || !dir.isDirectory()) {
+                return conf;
+            }
+
+            File[] files = dir.listFiles(
+                    new FilenameFilter() {
+                        public boolean accept(File dir, String name) {
+                            return name.endsWith("xml");
+                        }
+                    }
+            );
+
+            for (File file : files) {
+                try {
+                    URL url = new URL("file://" + file.getCanonicalPath());
+                    conf.addResource(url);
+                } catch (Exception e) {
+                }
+            }
+        }
+
+        return conf;
+    }
 }
\ No newline at end of file
diff --git a/RHive/inst/javasrc/src/com/nexr/rhive/hive/DatabaseConnection.java b/RHive/inst/javasrc/src/com/nexr/rhive/hive/DatabaseConnection.java
index 02004ba..5b76ef6 100755
--- a/RHive/inst/javasrc/src/com/nexr/rhive/hive/DatabaseConnection.java
+++ b/RHive/inst/javasrc/src/com/nexr/rhive/hive/DatabaseConnection.java
@@ -5,6 +5,7 @@
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Properties;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
@@ -16,16 +17,15 @@ class DatabaseConnection {
     private DatabaseMetaData metaData;
     private final String url;
     private final String username;
-    private final String password;
-
+    private final String password;
     private Schema schema = null;
 
-    public DatabaseConnection(String driver, String url, String username, String password) {
-        this.driver = driver;
-        this.url = url;
-        this.username = username;
-        this.password = password;
-    }
+    public DatabaseConnection(String driver, String url, String username, String password) {
+        this.driver = driver;
+        this.url = url;
+        this.username = username;
+        this.password = password;
+    }
 
     @Override
     public String toString() {
@@ -48,7 +48,8 @@
     boolean connect() throws SQLException {
         close();
 
-        setConnection(DriverManager.getConnection(getUrl(), username, password));
+//        setConnection(DriverManager.getConnection(getUrl(), properties));
+        setConnection(DriverManager.getConnection(getUrl(), username, password));
         setDatabaseMetaData(getConnection(false).getMetaData());
 
         return true;
diff --git a/RHive/inst/javasrc/src/com/nexr/rhive/hive/HiveJdbcClient.java b/RHive/inst/javasrc/src/com/nexr/rhive/hive/HiveJdbcClient.java
index dd94729..e29efb0 100755
--- a/RHive/inst/javasrc/src/com/nexr/rhive/hive/HiveJdbcClient.java
+++ b/RHive/inst/javasrc/src/com/nexr/rhive/hive/HiveJdbcClient.java
@@ -1,17 +1,17 @@
 package com.nexr.rhive.hive;
 
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.sql.*;
+import java.util.Enumeration;
+import java.util.Properties;
 import java.util.StringTokenizer;
 
 public class HiveJdbcClient implements HiveOperations {
     private DatabaseConnection databaseConnection;
     private int version;
     private ResultSet columns;
-
     public HiveJdbcClient(boolean hiveServer2) {
         if (hiveServer2) {
@@ -20,21 +20,14 @@ public HiveJdbcClient(boolean hiveServer2) {
             this.version = 1;
         }
     }
-
-    public void connect(String host, int port) throws SQLException {
-        connect(host, port, "default", null, null);
-    }
 
-    public void connect(String host, int port, String db) throws SQLException {
-        connect(host, port, db, null, null);
-    }
+    public void connect(String host, int port, String db, String user, String password) throws SQLException {
+        connect(host, port, db, user, password, new Properties());
+    }
 
-    public void connect(String host, int port, String user, String password) throws SQLException {
-        connect(host, port, "default", user, password);
-    }
-
-    public void connect(String host, int port, String db, String user, String password) throws SQLException {
-        HiveJdbcConnector hiveConnector = new HiveJdbcConnector(host, port, db, user, password);
+    public void connect(String host, int port, String db, String user, String password, Properties properties) throws SQLException {
+
+        HiveJdbcConnector hiveConnector = new HiveJdbcConnector(host, port, db, user, password, properties);
         hiveConnector.setDaemon(false);
         hiveConnector.start();
         try {
@@ -44,11 +37,11 @@
             if (hiveConnector.isAlive()) {
                 int version = getVersion();
                 throw new SQLException(
-                        String.format(
-                                "Connection to hiveserver has timed out.\n" +
-                                "\t--connection-parameters(version: hiveserver%d, host:%s, port:%d, db:%s, user:%s, password:%s)\n\n" +
-                                "Restart R session and retry with correct arguments.",
-                                version, host, port, db, user, password));
+                        String.format(
+                                "Connection to hiveserver%d has timed out.\n" +
+                                "\t--connection-parameters(host:%s, port:%d, db:%s, properties:%s)\n\n" +
+                                "Restart R session and retry with correct arguments.",
+                                version, host, port, db, properties));
             }
         }
@@ -286,7 +279,6 @@ public boolean deleteArchive(String archive) throws SQLException {
         String command = String.format("DELETE ARCHIVES %s", archive);
         execute(command);
         return true;
-
     }
 
     @Override
@@ -298,34 +290,38 @@ protected boolean isThriftTransportException(Exception e) {
         String msg = e.getMessage();
         return msg.indexOf("TTransportException") != -1;
     }
-
-
+
     private class HiveJdbcConnector extends Thread {
         private String host;
         private int port;
         private String db;
-        private String user;
-        private String password;
+        private String user;
+        private String password;
+        private Properties properties;
 
-        public HiveJdbcConnector(String host, int port, String db, String user, String password) {
+        public HiveJdbcConnector(String host, int port, String db, String user, String password, Properties properties) {
             this.host = host;
            this.port = port;
             this.db = db;
-            this.user = user;
-            this.password = password;
+            this.user = user;
+            this.password = password;
+            this.properties = properties;
         }
 
         @Override
         public void run() {
-            connect(host, port, db, user, password);
+            connect(host, port, db, user, password);
         }
-
-        public void connect(String host, int port, String db, String user, String password) {
+
+        public void connect(String host, int port, String db, String user, String password) {
             try {
-                String url = getUrl(host, port, db);
+                String url = getUrl(host, port, db, properties);
                 String driver = getDriver();
-                DatabaseConnection connection = new DatabaseConnection(driver, url, user, password);
+
+                loginUserFromKeytab(properties);
+
+                DatabaseConnection connection = new DatabaseConnection(driver, url, user, password);
                 connection.connect();
 
                 databaseConnection = connection;
@@ -337,8 +333,18 @@ public void connect(String host, int port, String db, String user, String password) {
             }
         }
     }
-
-    private String getUrl(String host, int port, String db) {
+
+    private void loginUserFromKeytab(Properties p) throws Exception {
+        String principal = p.getProperty("hive.principal");
+        String keytab = p.getProperty("hive.keytab");
+
+        if (StringUtils.isEmpty(principal) || StringUtils.isEmpty(keytab)) {
+            return;
+        }
+        UserGroupInformation.loginUserFromKeytab(principal, keytab);
+    }
+
+    private String getUrl(String host, int port, String db, Properties p) {
         String scheme = getUrlPrefix();
         StringBuilder sb = new StringBuilder(scheme);
         sb.append(host);
@@ -347,10 +353,19 @@
         sb.append("/");
         sb.append(db);
 
+        Enumeration e = p.propertyNames();
+        while (e.hasMoreElements()) {
+            String key = (String) e.nextElement();
+            if (key.startsWith("hive")) {
+                String val = p.getProperty(key);
+                key = key.replace("hive.", "");
+                sb.append(";").append(key).append("=").append(val);
+            }
+        }
+
         return sb.toString();
     }
 
-
     private String getUrlPrefix() {
         switch (version) {
             case 1:
diff --git a/RHive/inst/javasrc/src/com/nexr/rhive/hive/HiveOperations.java b/RHive/inst/javasrc/src/com/nexr/rhive/hive/HiveOperations.java
index a4cb7f8..820dd80 100755
--- a/RHive/inst/javasrc/src/com/nexr/rhive/hive/HiveOperations.java
+++ b/RHive/inst/javasrc/src/com/nexr/rhive/hive/HiveOperations.java
@@ -1,15 +1,15 @@
 package com.nexr.rhive.hive;
 
 import java.sql.SQLException;
-
+import java.util.Properties;
 
 public interface HiveOperations {
-    void connect(String host, int port) throws SQLException;
-    void connect(String host, int port, String db) throws SQLException;
-    void connect(String host, int port, String user, String password) throws SQLException;
-    void connect(String host, int port, String db, String user, String password) throws SQLException;
-    void close() throws SQLException;
+
+    void connect(String host, int port, String db, String user, String password) throws SQLException;
+    void connect(String host, int port, String db, String user, String password, Properties properties) throws SQLException;
+
+    void close() throws SQLException;
 
     void checkConnection() throws SQLException;
 
     boolean execute(String query) throws SQLException;
diff --git a/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/GenericUDFArrayToString.java b/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/GenericUDFArrayToString.java
index f659265..3b3ff77 100644
--- a/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/GenericUDFArrayToString.java
+++ b/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/GenericUDFArrayToString.java
@@ -18,7 +18,7 @@
         + " or the elements of a map to string. ")
 public class GenericUDFArrayToString extends GenericUDF {
 
-    private ObjectInspector inputOI = null;
+    private transient ObjectInspector inputOI = null;
 
     public Object evaluate(DeferredObject[] o) throws HiveException {
diff --git a/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/GenericUDTFExpand.java b/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/GenericUDTFExpand.java
index ba2dec8..b0c0cb0 100644
--- a/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/GenericUDTFExpand.java
+++ b/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/GenericUDTFExpand.java
@@ -49,15 +49,15 @@
     enum DATA_TYPE {
         STRING, DOUBLE, INT, NUMERIC, NULLNAME
     };
-    DATA_TYPE data_type; // mapping from data-type to enum DATA_TYPE
-    Writable[] retRow; // returned row value but only use one.
-    Writable[] row; // object pool of non-null writable, avoid creating
+    transient DATA_TYPE data_type; // mapping from data-type to enum DATA_TYPE
+    transient Writable[] retRow; // returned row value but only use one.
+    transient Writable[] row; // object pool of non-null writable, avoid creating
                        // objects all the time
-    Object[] nullRow; // array of null row value
-    ObjectInspector[] inputOIs; // input ObjectInspectors
-    boolean pathParsed = false;
-    boolean seenErrors = false;
-    String delim = ",";
+    transient Object[] nullRow; // array of null row value
+    transient ObjectInspector[] inputOIs; // input ObjectInspectors
+    transient boolean pathParsed = false;
+    transient boolean seenErrors = false;
+    transient String delim = ",";
 
     @Override
     public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
diff --git a/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/GenericUDTFUnFold.java b/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/GenericUDTFUnFold.java
index a80208a..3731142 100644
--- a/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/GenericUDTFUnFold.java
+++ b/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/GenericUDTFUnFold.java
@@ -49,17 +49,17 @@
     enum DATA_TYPE {
         STRING, DOUBLE, INT, NUMERIC, NULLNAME
     };
-    int numCols; // number of output columns
+    transient int numCols; // number of output columns
 
-    DATA_TYPE[] data_types; // mapping from data-types to enum DATA_TYPE
-    Writable[] retCols; // array of returned column values
-    Writable[] cols; // object pool of non-null writable, avoid creating
+    transient DATA_TYPE[] data_types; // mapping from data-types to enum DATA_TYPE
+    transient Writable[] retCols; // array of returned column values
+    transient Writable[] cols; // object pool of non-null writable, avoid creating
                        // objects all the time
-    Object[] nullCols; // array of null column values
-    ObjectInspector[] inputOIs; // input ObjectInspectors
-    boolean pathParsed = false;
-    boolean seenErrors = false;
-    String delim = ",";
+    transient Object[] nullCols; // array of null column values
+    transient ObjectInspector[] inputOIs; // input ObjectInspectors
+    transient boolean pathParsed = false;
+    transient boolean seenErrors = false;
+    transient String delim = ",";
 
     @Override
     public StructObjectInspector initialize(ObjectInspector[] args)
diff --git a/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/RangeKeyUDF.java b/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/RangeKeyUDF.java
index a120f60..bbc9140 100644
--- a/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/RangeKeyUDF.java
+++ b/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/RangeKeyUDF.java
@@ -35,18 +35,18 @@
 public class RangeKeyUDF extends GenericUDF {
 
-    private Configuration config;
+//    private transient Configuration config;
 
-    private ObjectInspector[] argumentOIs;
+    private transient ObjectInspector[] argumentOIs;
 
-    private RANGEVALUE rangeValue;
+    private transient RANGEVALUE rangeValue;
 
-    private ObjectInspector returnOI;
+//    private transient ObjectInspector returnOI;
 
-    private String breaks = null;
-    private Boolean isRight = null;
+    private transient String breaks = null;
+    private transient Boolean isRight = null;
 
-    private static Map TREES = new LinkedHashMap();
+    private transient static Map TREES = new LinkedHashMap();
 
     public static enum RANGEVALUE {
         INT_TYPE {
@@ -241,10 +241,11 @@ private RANGEVALUE valueOf(String typeName)
 
     @Override
     public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
-        if (config == null) {
-            SessionState session = SessionState.get();
-            config = session == null ? new Configuration() : session.getConf();
-        }
+
+//        if (config == null) {
+//            SessionState session = SessionState.get();
+//            config = session == null ? new Configuration() : session.getConf();
+//        }
 
         if (arguments.length < 3) {
             throw new UDFArgumentLengthException(
@@ -259,7 +260,9 @@ public ObjectInspector initialize(ObjectInspector[] arguments)
         } catch (Exception e) {
             throw new UDFArgumentException(e);
         }
-        this.returnOI = PrimitiveObjectInspectorFactory.writableStringObjectInspector;
+
+//        this.returnOI = PrimitiveObjectInspectorFactory.writableStringObjectInspector;
+        ObjectInspector returnOI = PrimitiveObjectInspectorFactory.writableStringObjectInspector;
 
         return returnOI;
     }
@@ -268,11 +271,9 @@ public ObjectInspector initialize(ObjectInspector[] arguments)
     public Object evaluate(DeferredObject[] records) throws HiveException {
 
         if (breaks == null) {
-            breaks = (String) ((PrimitiveObjectInspector) argumentOIs[1])
-                    .getPrimitiveJavaObject(records[1].get());
+            breaks = (String) ((PrimitiveObjectInspector) argumentOIs[1]).getPrimitiveJavaObject(records[1].get());
             isRight = new Boolean(
-                    (String) ((PrimitiveObjectInspector) argumentOIs[1])
-                    .getPrimitiveJavaObject(records[2].get()));
+                    (String) ((PrimitiveObjectInspector) argumentOIs[1]).getPrimitiveJavaObject(records[2].get()));
         }
 
         RangeTree tree = TREES.get(breaks);
diff --git a/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/ScaleUDF.java b/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/ScaleUDF.java
index 6eb293f..b02db41 100644
--- a/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/ScaleUDF.java
+++ b/RHive/inst/javasrc/src/com/nexr/rhive/hive/udf/ScaleUDF.java
@@ -32,8 +32,8 @@
 public class ScaleUDF extends GenericUDF {
 
-    Converter[] converters;
-    DoubleObjectInspector scaleFieldOI;
+    transient Converter[] converters;
+    transient DoubleObjectInspector scaleFieldOI;
 
     public Object evaluate(DeferredObject[] arguments) throws HiveException {
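--
Usage sketch (editor's note, not part of the patch). With this change, Kerberos
credentials are passed to rhive.connect() as "key=value" strings through the new
`properties` argument. Keys prefixed "hdfs." feed the HDFS keytab login
(AuthUtils.loginUserFromKeytab(..., "hdfs")), while keys prefixed "hive." feed
the HiveServer2 keytab login and are also appended to the JDBC URL with the
"hive." prefix stripped (so "hive.principal" becomes ";principal=..."). The
host names, realm, and keytab paths below are placeholders:

    # Placeholder paths; RHive requires HIVE_HOME/HADOOP_HOME before connecting.
    Sys.setenv(HIVE_HOME="/usr/lib/hive", HADOOP_HOME="/usr/lib/hadoop")
    library(RHive)
    rhive.init()

    # Connect to a Kerberized cluster; each entry is parsed by .auth.properties()
    # into a java.util.Properties object stored as AUTH_PROPERTIES.
    rhive.connect(host="hiveserver.example.com", port=10000, hiveServer2=TRUE,
                  properties=c(
                      "hive.principal=hive/hiveserver.example.com@EXAMPLE.COM",
                      "hive.keytab=/etc/security/keytabs/hive.service.keytab",
                      "hdfs.principal=hdfs@EXAMPLE.COM",
                      "hdfs.keytab=/etc/security/keytabs/hdfs.headless.keytab"))

When no "*.principal"/"*.keytab" pairs are supplied, both login helpers return
without calling UserGroupInformation, so non-Kerberos connections behave as
before.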