
Commit ff409b9: documentation

hongooi73 committed Aug 3, 2017
1 parent fecdd55 commit ff409b9
Showing 17 changed files with 593 additions and 97 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION
@@ -39,6 +39,7 @@ Collate: '00rxArgs.R'
'group_by_xdf.R'
'hdfs_utils.R'
'imports.R'
+ 'is_xdf.R'
'join_utils.R'
'joins_unsupported.R'
'joins_xdf.R'
5 changes: 3 additions & 2 deletions NAMESPACE
@@ -11,8 +11,8 @@ S3method(as_xdf,RxFileData)
S3method(as_xdf,RxXdfData)
S3method(as_xdf,default)
S3method(cbind,RxXdfData)
- S3method(collect,RxFileData)
- S3method(compute,RxFileData)
+ S3method(collect,RxXdfData)
+ S3method(compute,RxXdfData)
S3method(copy_to,RxHdfsFileSystem)
S3method(distinct,RxFileData)
S3method(distinct,grouped_tbl_xdf)
@@ -70,6 +70,7 @@ export(as_standard_xdf)
export(as_xdf)
export(cbind.RxXdfData)
export(clean_dplyrxdf_dir)
+ export(copy_to_hdfs)
export(copy_xdf)
export(delete_xdf)
export(doXdf)
55 changes: 46 additions & 9 deletions R/as_xdf.R
@@ -1,24 +1,56 @@
#' Detect and coerce to Xdf data source objects
#'
#' Functions to detect and coerce to Xdf data source objects.
#'
#' @param .data An R object that can be coerced to an Xdf data source. This includes another existing Xdf data source; see details below.
#' @param file The path/filename for the Xdf data file.
#' @param composite Whether to create a composite Xdf.
#' @param overwrite Whether to overwrite any existing file.
#' @param ... Other arguments to pass to \code{\link{rxDataStep}}.
#'
#' @details
#' The \code{as_xdf} function takes the object given by \code{.data} and imports its data into an Xdf file, returning a data source pointing to that file. The file can be either a standard or a \emph{composite} Xdf, as given by the \code{composite} argument. A composite Xdf is actually a directory containing data and metadata files; it can be manipulated by the RevoScaleR functions as if it were a single dataset.
#'
#' The \code{as_standard_xdf} and \code{as_composite_xdf} functions are shorthand for \code{as_xdf(*, composite=FALSE)} and \code{as_xdf(*, composite=TRUE)} respectively; they always create either a standard or composite Xdf. You can use this to switch an existing Xdf data source from one type of Xdf to the other. Note that Xdf files in HDFS must always be composite.
#'
#' Passing a \code{tbl_xdf} object to an \code{as} function will strip off the tbl information, returning a raw Xdf data source. This can be useful for resetting the beginning of a pipeline.
#'
#' The \code{file} argument gives the name of the Xdf file to create. If not specified, this is taken from the input data source where possible (for Xdf and file data sources, including text). Otherwise, a random name is generated. If no directory is specified, the file is created in the current working directory (if in the native filesystem) or in the user directory (in HDFS).
#'
#' You can use the \code{as} functions with any RevoScaleR data source, or otherwise with any R object that can be turned into a data frame. The resulting Xdf file will be created in the same filesystem as the input data source. If the input does not have a filesystem, for example if it is an in-database table or a data frame, the file is created in the native filesystem.
#'
#' @return
#' For the \code{as} functions, an Xdf data source object pointing to the created file. For the \code{is} functions, a TRUE/FALSE value.
#'
#' @seealso
#' \code{\link{as}}, \code{\link{is}}, \code{\link{inherits}},
#' \code{\link{rxDataStep}}, \code{\link{rxImport}}
#'
#' @rdname as_xdf
#' @export
- as_composite_xdf <- function(.data, ...)
+ as_composite_xdf <- function(...)
{
as_xdf(.data, ..., composite=TRUE)
}


#' @rdname as_xdf
#' @export
as_standard_xdf <- function(.data, ...)
{
as_xdf(.data, ..., composite=FALSE)
}


#' @rdname as_xdf
#' @export
as_xdf <- function(.data, ...)
{
UseMethod("as_xdf")
}


#' @rdname as_xdf
#' @export
as_xdf.RxXdfData <- function(.data, file=NULL, composite=NULL, overwrite=TRUE, ...)
{
@@ -44,12 +76,13 @@ as_xdf.RxXdfData <- function(.data, file=NULL, composite=NULL, overwrite=TRUE, .
return(copy_xdf(.data, file))

out <- modifyXdf(.data, file=file, createCompositeSet=composite)
- rxDataStep(.data, out, rowsPerRead=.dxOptions$rowsPerRead, overwrite=TRUE, ...)
+ rxDataStep(.data, out, rowsPerRead=.dxOptions$rowsPerRead, overwrite=overwrite, ...)
}


#' @rdname as_xdf
#' @export
- as_xdf.RxFileData <- function(.data, file=NULL, composite=in_hdfs(.data), ...)
+ as_xdf.RxFileData <- function(.data, file=NULL, composite=in_hdfs(.data), overwrite=TRUE, ...)
{
if(in_hdfs(.data) && !composite)
stop("only composite Xdf files supported in HDFS")
@@ -59,14 +92,15 @@ as_xdf.RxFileData <- function(.data, file=NULL, composite=in_hdfs(.data), ...)
file <- validateXdfFile(file, composite)

out <- RxXdfData(file=file, fileSystem=rxGetFileSystem(.data), createCompositeSet=composite)
- rxDataStep(.data, out, rowsPerRead=.dxOptions$rowsPerRead, overwrite=TRUE, ...)
+ rxDataStep(.data, out, rowsPerRead=.dxOptions$rowsPerRead, overwrite=overwrite, ...)
}


#' @rdname as_xdf
#' @export
- as_xdf.RxDataSource <- function(.data, file=NULL, composite=NULL, ...)
+ as_xdf.RxDataSource <- function(.data, file=NULL, composite=NULL, overwrite=TRUE, ...)
{
- hdfsDetected <- !is.na(isRemoteHdfsClient(FALSE)) || in_hdfs()
+ hdfsDetected <- !is.na(isRemoteHdfsClient(FALSE)) || in_hdfs(.data)
if(is.null(composite))
composite <- hdfsDetected

@@ -78,12 +112,15 @@ as_xdf.RxDataSource <- function(.data, file=NULL, composite=NULL, ...)
file <- validateXdfFile(file, composite)

out <- RxXdfData(file=file, fileSystem=rxGetFileSystem(.data), createCompositeSet=composite)
- rxDataStep(.data, out, rowsPerRead=.dxOptions$rowsPerRead, overwrite=TRUE, ...)
+ if(in_hdfs(out))
+ rxDataStep(.data, out, rowsPerRead=.dxOptions$rowsPerRead, overwrite=overwrite, ...)
+ else local_exec(rxDataStep(.data, out, rowsPerRead=.dxOptions$rowsPerRead, overwrite=overwrite, ...))
}


#' @rdname as_xdf
#' @export
- as_xdf.default <- function(.data, file=NULL, composite=NULL, ...)
+ as_xdf.default <- function(.data, file=NULL, composite=NULL, overwrite=TRUE, ...)
{
hdfsDetected <- !is.na(isRemoteHdfsClient(FALSE)) || in_hdfs()
if(is.null(composite))
@@ -96,5 +133,5 @@ as_xdf.default <- function(.data, file=NULL, composite=NULL, ...)
.data <- as.data.frame(.data)

out <- RxXdfData(file=file, fileSystem=RxNativeFileSystem(), createCompositeSet=composite)
- local_exec(rxDataStep(.data, out, rowsPerRead=.dxOptions$rowsPerRead, overwrite=TRUE, ...))
+ local_exec(rxDataStep(.data, out, rowsPerRead=.dxOptions$rowsPerRead, overwrite=overwrite, ...))
}
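To make the documented behaviour concrete, here is a minimal usage sketch (not part of the diff above); the filenames are hypothetical, and it assumes the dplyrXdf package, which imports RevoScaleR, is loaded.

library(dplyrXdf)

# import a CSV file to a standard Xdf; as_xdf accepts any RevoScaleR data source
txt <- RxTextData("flights.csv")
flights_xdf <- as_xdf(txt, file="flights.xdf", overwrite=TRUE)

# convert the standard Xdf to a composite Xdf: a directory of data and
# metadata files that RevoScaleR treats as a single dataset
flights_cmp <- as_composite_xdf(flights_xdf, file="flights")

# coerce a data frame; the file is created in the native filesystem
cars_xdf <- as_standard_xdf(mtcars, file="mtcars.xdf")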
32 changes: 32 additions & 0 deletions R/copy_to_hdfs.R
@@ -1,3 +1,25 @@
#' Upload a dataset to HDFS
#'
#' @param dest The destination source: an object of class \code{\link{RxHdfsFileSystem}}.
#' @param df A dataset: can be a filename, an Xdf data source object, another RevoScaleR data source, or anything that can be coerced to a data frame.
#' @param path The HDFS directory in which to store the uploaded dataset. Defaults to the user's HDFS home directory.
#' @param overwrite Whether to overwrite any existing file.
#' @param force_composite Whether to force the uploaded dataset to be a composite Xdf file. See details below.
#' @param ... For \code{copy_to}, further arguments to \code{\link{rxHadoopCommand}}.
#'
#' @details
#' This is the RevoScaleR HDFS method for the dplyr \code{\link[dplyr]{copy_to}} function, for uploading data to a remote database/src. The method should work with any RevoScaleR data source, or with any R object that can be converted into a data frame. If the data is not already in Xdf format, it is first imported into Xdf, and then uploaded.
#'
#' The code handles both the case where you are logged into the edge node of a Hadoop/Spark cluster, and the case where you are a remote client. In the latter case, uploading is a two-stage process: the data is first transferred to the native filesystem of the edge node, and then copied from the edge node into HDFS.
#'
#' @return
#' An Xdf data source object pointing to the uploaded data.
#'
#' @seealso
#' \code{\link{rxHadoopCopyFromClient}}, \code{\link{rxHadoopCopyFromLocal}},
#' \code{\link{collect}} and \code{\link{compute}} for downloading data from HDFS
#' @aliases copy_to
#' @rdname copy_to
#' @export
copy_to.RxHdfsFileSystem <- function(dest, df, path=NULL, overwrite=FALSE, force_composite=TRUE, ...)
{
@@ -37,6 +59,16 @@ copy_to.RxHdfsFileSystem <- function(dest, df, path=NULL, overwrite=FALSE, force
}


#' @details
#' The \code{copy_to_hdfs} function is a simple wrapper that avoids having to create an explicit filesystem object.
#' @rdname copy_to
#' @export
copy_to_hdfs <- function(...)
{
copy_to(RxHdfsFileSystem(), ...)
}


hdfsUpload <- function(src, dest, nativeTarget="/tmp", overwrite, isDir, ...)
{
# based on rxHadoopCopyFromClient
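The upload workflow described above might look like this in practice; a sketch only, assuming a Hadoop/Spark cluster is reachable, with flightsdf standing in for any local data frame and a hypothetical HDFS path.

# upload a data frame, converting it to composite Xdf along the way;
# with no path given, it lands in the user's HDFS home directory
hd <- RxHdfsFileSystem()
flights_hd <- copy_to(hd, flightsdf)

# the same, without constructing the filesystem object explicitly
flights_hd <- copy_to_hdfs(flightsdf, path="/user/me/data")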
90 changes: 82 additions & 8 deletions R/hdfs_utils.R
@@ -1,3 +1,32 @@
#' Utilities for HDFS
#'
#' Functions for working with files in HDFS: directory listing; file copying, moving and deletion; directory creation and deletion; testing for file and directory existence; checking whether a dataset is in HDFS; and emptying the trash.
#'
#' @param path An HDFS pathname.
#' @param full_path For \code{hdfs_dir}, whether to prepend the directory path to filenames to give a full path. If FALSE, only file names are returned.
#' @param include_dirs For \code{hdfs_dir}, whether subdirectory names should be included. Always TRUE for non-recursive listings.
#' @param recursive For \code{hdfs_dir}, whether the listing should recurse into subdirectories.
#' @param dirs_only For \code{hdfs_dir}, whether \emph{only} subdirectory names should be included.
#' @param pattern For \code{hdfs_dir}, an optional \link{regular expression}. Only file names that match will be returned.
#' @param ... For \code{hdfs_dir}, further switches, prefixed by \code{"-"}, to pass to the Hadoop \code{fs -ls} command. For other functions, further arguments to pass to \code{\link{rxHadoopCommand}}.
#' @param convert_backslashes Whether to convert any backslashes found in the input to forward slashes.
#' @param src,dest For \code{hdfs_file_copy} and \code{hdfs_file_move}, the source and destination paths.
#'
#' @details
#' These are utility functions to simplify working with files and directories in HDFS. For the most part, they wrap lower-level functions provided by RevoScaleR, which in turn wrap various Hadoop file system commands. They work with any file that is stored in HDFS, not just Xdf files.
#'
#' The \code{hdfs_dir} function is analogous to \code{dir} for the native filesystem. Like that function, and unlike \code{\link{rxHadoopListFiles}}, it returns a vector of filenames (\code{rxHadoopListFiles} returns a vector of \emph{printed output} from the \code{hadoop fs -ls} command, which is not quite the same thing). Again unlike \code{rxHadoopListFiles}, it does not print anything by default (the \code{print} method takes care of that).
#'
#' @return
#' \code{hdfs_dir} returns a vector of filenames, optionally with the full path attached.
#'
#' @seealso
#' \code{\link{dir}}, \code{\link{dir.exists}}, \code{\link{file.exists}}, \code{\link{dir.create}},
#' \code{\link{file.copy}}, \code{\link{file.rename}}, \code{\link{file.remove}}, \code{\link{unlink}},
#' \code{\link{rxHadoopListFiles}}, \code{\link{rxHadoopFileExists}},
#' \code{\link{rxHadoopMakeDir}}, \code{\link{rxHadoopRemoveDir}},
#' \code{\link{rxHadoopCopy}}, \code{\link{rxHadoopMove}}, \code{\link{rxHadoopRemove}}
#' @rdname hdfs
#' @export
hdfs_dir <- function(path=".", ..., full_path=FALSE, include_dirs=FALSE, recursive=FALSE,
dirs_only=FALSE, pattern=NULL, convert_backslashes=TRUE)
@@ -25,6 +54,7 @@ hdfs_dir <- function(path=".", ..., full_path=FALSE, include_dirs=FALSE, recursi
output <- output[substr(output, 1, 1) == "d"]

#output <- gsub("^[^/]*(/.*)$", "\\1", output)
## NOTE: regex below will break on filenames with a space
output <- substr(output, regexpr("[^ ]+$", output), nchar(output))

if(!full_path && !recursive)
@@ -38,6 +68,7 @@ hdfs_dir <- function(path=".", ..., full_path=FALSE, include_dirs=FALSE, recursi
}


#' @rdname hdfs
#' @export
print.dplyrXdf_hdfs_dir <- function(x, ...)
{
@@ -50,6 +81,12 @@ print.dplyrXdf_hdfs_dir <- function(x, ...)
}


#' @details
#' \code{hdfs_dir_exists} and \code{hdfs_file_exists} test for the existence of a given directory and file, respectively. They are analogous to \code{dir.exists} and \code{file.exists} for the native filesystem.
#'
#' @return
#' \code{hdfs_dir_exists} and \code{hdfs_file_exists} return TRUE or FALSE depending on whether the directory or file exists.
#' @rdname hdfs
#' @export
hdfs_dir_exists <- function(path, convert_backslashes=TRUE)
{
@@ -60,6 +97,7 @@ hdfs_dir_exists <- function(path, convert_backslashes=TRUE)


# for symmetry with hdfs_dir_exists
#' @rdname hdfs
#' @export
hdfs_file_exists <- function(path, convert_backslashes=TRUE)
{
@@ -68,6 +106,12 @@ hdfs_file_exists <- function(path, convert_backslashes=TRUE)
}


#' @details
#' \code{hdfs_dir_create} and \code{hdfs_dir_remove} create and remove directories. They are analogous to \code{dir.create} and \code{unlink(recursive=TRUE)} for the native filesystem.
#'
#' @return
#' The other \code{hdfs_*} functions return TRUE or FALSE depending on whether the operation succeeded.
#' @rdname hdfs
#' @export
hdfs_dir_create <- function(path, ..., convert_backslashes=TRUE)
{
@@ -76,6 +120,7 @@ hdfs_dir_create <- function(path, ..., convert_backslashes=TRUE)
}


#' @rdname hdfs
#' @export
hdfs_dir_remove <- function(path, ..., convert_backslashes=TRUE)
{
@@ -84,6 +129,9 @@ hdfs_dir_remove <- function(path, ..., convert_backslashes=TRUE)
}


#' @details
#' \code{hdfs_file_copy} and \code{hdfs_file_move} copy and move files. They are analogous to \code{file.copy} and \code{file.rename} for the native filesystem. Unlike \code{\link{rxHadoopCopy}} and \code{\link{rxHadoopMove}}, they are vectorised in both \code{src} and \code{dest}.
#' @rdname hdfs
#' @export
hdfs_file_copy <- function(src, dest, ..., overwrite=TRUE, convert_backslashes=TRUE)
{
@@ -95,14 +143,7 @@ hdfs_file_copy <- function(src, dest, ..., overwrite=TRUE, convert_backslashes=T
}


- #' @export
- hdfs_file_remove <- function(path, ..., convert_backslashes=TRUE)
- {
- path <- convertBS(path, convert_backslashes)
- rxHadoopRemove(path, ...)
- }


#' @rdname hdfs
#' @export
hdfs_file_move <- function(src, dest, ..., convert_backslashes=TRUE)
{
@@ -114,13 +155,32 @@ hdfs_file_move <- function(src, dest, ..., convert_backslashes=TRUE)
}


#' @details
#' \code{hdfs_file_remove} deletes files. It is analogous to \code{file.remove} and \code{unlink} for the native filesystem.
#' @rdname hdfs
#' @export
hdfs_file_remove <- function(path, ..., convert_backslashes=TRUE)
{
path <- convertBS(path, convert_backslashes)
rxHadoopRemove(path, ...)
}


#' @details
#' \code{hdfs_expunge} empties the HDFS trash.
#' @rdname hdfs
#' @export
hdfs_expunge <- function()
{
rxHadoopCommand("fs -expunge")
}
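
As an illustration of how these utilities compose (a sketch, not part of the diff; the paths are hypothetical):

# a character vector of filenames, like dir() for the native filesystem
hdfs_dir("/user/me/data", full_path=TRUE)

# subdirectory names only, searched recursively
hdfs_dir("/user/me/data", dirs_only=TRUE, recursive=TRUE)

# create a scratch directory, copy a composite Xdf into it, then clean up
if(!hdfs_dir_exists("/user/me/scratch")) hdfs_dir_create("/user/me/scratch")
hdfs_file_copy("/user/me/data/flights", "/user/me/scratch")
hdfs_dir_remove("/user/me/scratch")
hdfs_expunge()  # empty the trash to reclaim the space immediately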


#' @param obj For \code{in_hdfs}, an R object, typically a RevoScaleR data source object.
#'
#' @return
#' \code{in_hdfs} returns whether the given object is stored in HDFS. This will be TRUE for an Xdf data source or file data source in HDFS, or a Spark data source. Classes for the latter include \code{RxHiveData}, \code{RxParquetData} and \code{RxOrcData}. If no argument is specified, returns whether the default filesystem is HDFS.
#' @rdname hdfs
#' @export
in_hdfs <- function(obj=NULL)
{
@@ -131,6 +191,20 @@ in_hdfs <- function(obj=NULL)
}


#' Run an expression in the local compute context
#'
#' @param expr An expression to execute. Normally something that depends on the compute context, such as \code{rxDataStep}.
#' @param context The compute context in which to execute \code{expr}. Defaults to local.
#'
#' @details
#' This function is useful when you are working with datasets in both the native filesystem and HDFS. The workhorse RevoScaleR function for data transformation, \code{rxDataStep}, will complain if you are in a distributed compute context such as \code{\link{RxHadoopMR}} or \code{\link{RxSpark}} and you try to process a dataset in the native filesystem. You can wrap your code in a \code{local_exec} call to switch to the local compute context temporarily; the previous context is restored when the code finishes running.
#'
#' @return
#' The value of \code{expr}.
#'
#' @seealso
#' \code{\link{eval}}
#' @rdname local_exec
#' @export
local_exec <- function(expr, context="local")
{
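A sketch of how in_hdfs and local_exec are meant to interact, assuming xdf points to a dataset in HDFS, df is a local data frame, and a Spark compute context is active:

in_hdfs(xdf)  # TRUE: the data lives in HDFS
in_hdfs(df)   # FALSE: a data frame lives in local memory
in_hdfs()     # no argument: is the default filesystem HDFS?

# rxDataStep on a native-filesystem file would fail in the RxSpark context;
# local_exec runs it in the local compute context, then restores the old one
out <- local_exec(rxDataStep(df, "df.xdf", overwrite=TRUE))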
37 changes: 37 additions & 0 deletions R/is_xdf.R
@@ -0,0 +1,37 @@
#' Detect and coerce to Xdf data source objects
#'
#' Functions to detect and coerce to Xdf data source objects.
#'
#' @param x An R object.
#'
#' @details
#' The \code{is_xdf} function returns TRUE if \code{x} is an Xdf data source object; i.e., it inherits from the \code{RxXdfData} class. This includes both raw Xdf data sources and \code{tbl_xdf} objects as created by dplyrXdf. The \code{is_composite_xdf} function returns TRUE if \code{x} is a \emph{composite} Xdf data source.
#'
#' Detecting whether an object is a composite Xdf can be tricky and \code{is_composite_xdf} goes through a few steps to do this. If \code{x} has a non-NULL \code{createCompositeSet} slot, then that value is returned. Otherwise, it checks whether the \code{file} slot refers to an existing directory, whose name does \emph{not} have an extension (that is, \code{"foo"} qualifies as a valid filename for a composite Xdf, but not \code{"foo.xdf"}). This is necessary because of the semantics of \code{rxDataStep}.
#'
#' To remove any ambiguity, it's recommended that you always explicitly specify the \code{createCompositeSet} argument when creating an Xdf data source object (objects created by dplyrXdf will always do this).
#'
#' @rdname as_xdf
#' @export
is_xdf <- function(x)
{
inherits(x, "RxXdfData")
}

#' @rdname as_xdf
#' @export
is_composite_xdf <- function(x)
{
if(!is_xdf(x))
return(FALSE)

composite <- x@createCompositeSet
if(!is.null(composite))
return(composite)

# check if this file refers to an existing directory
file <- x@file
if(in_hdfs(x))
return(tools::file_ext(file) == "" && hdfs_dir_exists(file))
else return(tools::file_ext(file) == "" && dir.exists(file))
}
