diff --git a/DESCRIPTION b/DESCRIPTION
index 30bcb74..0a44a98 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -60,12 +60,13 @@ Collate:
     'MapServerMap-class.R'
     'MapServerWeb-class.R'
     'ProcessGraph-class.R'
-    'Product-class.R'
+    'Product-class.R'
     'R2Generic-class.R'
     'data.R'
     'processes.R'
     'Server-class.R'
     'Service-class.R'
+    'Udf-class.R'
     'User-class.R'
     'View-class.R'
     'api_processes.R'
@@ -75,5 +76,4 @@ Collate:
     'api_job.R'
     'api_data.R'
     'api.R'
-    'prepare_UDF.R'
     'serializer_proxy.R'
diff --git a/NAMESPACE b/NAMESPACE
index 98653fd..48d52ea 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -21,6 +21,7 @@ export(R2Generic)
 export(Service)
 export(SpaceView)
 export(TimeView)
+export(UdfTransaction)
 export(User)
 export(View)
 export(code)
@@ -28,6 +29,7 @@ export(createServerInstance)
 export(create_dimensionality)
 export(create_dimensionality_modifier)
 export(exists.Service)
+export(exists.Udf)
 export(exists.User)
 export(getCollectionFromImageryStatement)
 export(is.MapServerConfig)
@@ -37,12 +39,11 @@ export(is.MapServerWeb)
 export(is.Process)
 export(is.Product)
 export(is.Service)
+export(is.Udf)
 export(is.User)
 export(loadSentinel2Data)
-export(raster_collection_export)
 export(read_legend)
 export(serializer_proxy)
-export(udf_export)
 export(write_generics)
 exportMethods(crs)
 exportMethods(extent)
diff --git a/R/Collection-class.R b/R/Collection-class.R
index 1633bd5..4e1d9f2 100644
--- a/R/Collection-class.R
+++ b/R/Collection-class.R
@@ -4,6 +4,7 @@
 #' intermediate results from one process to an other (the result of a process is a Collection).
 #'
 #' @field dimensions A dimensionality object containing information about the existence of dimensions
+#' @field space A tibble containing an index and the geometry
 #' @importFrom R6 R6Class
 #' @export
 Collection <- R6Class(
@@ -657,3 +658,33 @@ crs.Collection = function(x, ...) {
 }
 
 
+is.st_raster = function(x) {
+  dims = x$dimensions
+  return(dims$space && dims$time && dims$raster)
+}
+# what about multiband / attribute ?
+
+is.st_feature = function(x) {
+  dims = x$dimensions
+  return(dims$space && dims$time && dims$feature)
+}
+
+is.raster = function(x) {
+  dims = x$dimensions
+  return(dims$space && dims$raster && !dims$time)
+}
+
+is.feature = function(x) {
+  dims = x$dimensions
+  return(dims$space && dims$feature && !dims$time)
+}
+
+is.timeseries = function(x) {
+  dims = x$dimensions
+  return(dims$time && !dims$space && !dims$raster && !dims$feature)
+}
+
+is.scalar = function(x) {
+  dims = x$dimensions
+  return(!dims$time && !dims$space && !dims$raster && !dims$feature)
+}
\ No newline at end of file
diff --git a/R/Job-class.R b/R/Job-class.R
index a7035c4..7b5007a 100644
--- a/R/Job-class.R
+++ b/R/Job-class.R
@@ -6,12 +6,15 @@
 #'
 #' @field job_id The unique identifier of the job
 #' @field status The current status in the job lifecycle
-#' @field process_graph graph of nested processes that is executable
-#' @field view Spatio-Temporal Extent to be used for the calculation
+#' @field process_graph graph of nested processes that is executable (ExecutableProcess)
+#' @field view Spatio-Temporal Extent to be used for the calculation (currently not in use)
 #' @field submitted Timestamp when the job was submitted to the server
 #' @field user_id The user who owns the job
 #' @field consumed_credits For accounting and billing the amount of credits consumed by this job
-#' @field filePath The system filepath that links to the stored JSON process graph
+#' @field last_update Timestamp when the job was last updated
+#' @field output List containing the output configuration like output$format (or additional GDAL commands), copied from the process graph
+#' @field results Contains the result of the process_graph after execution (Collection)
+#' @field persistent Whether or not the job is stored in the database
 #'
 #' @include Process-class.R
 #' @importFrom R6 R6Class
@@ -31,9 +34,9 @@ Job <- R6Class(
     last_update=NULL,
     user_id=NULL,
     consumed_credits=NULL,
-    output=NULL,
-    results = NULL,
-    persistent = FALSE,
+    output=NULL, # output configuration like output$format copied from the process graph (pg)
+    results = NULL, # contains the results of the process_graph after execution
+    persistent = FALSE, # whether or not the job is stored in the database
 
     # functions ----
     initialize = function(job_id=NULL,process_graph=NULL,user_id = NULL) {
@@ -304,4 +307,16 @@ exists.Job = function(job_id) {
   } else {
     return(FALSE)
   }
+}
+
+syncJobId = function() {
+  randomString = paste("SYNC",createAlphaNumericId(n=1,length=11),sep="")
+
+  if (exists.Job(randomString)) {
+    # if id exists get a new one (recursive)
+    return(syncJobId())
+  } else {
+    return(randomString)
+  }
 }
\ No newline at end of file
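The six predicates appended to Collection-class.R classify a Collection purely by its dimension flags; the new Udf-class.R below uses is.st_raster() to decide whether a collection can be serialized for the UDF service. A minimal sketch of that kind of dispatch (the export_tiles helper is hypothetical; only the spatio-temporal raster branch exists in this changeset):

    dispatch_export = function(collection) {
      if (is.st_raster(collection)) {
        # raster time series: can be turned into RasterCollectionTile JSON
        export_tiles(collection)   # hypothetical helper
      } else if (is.st_feature(collection) || is.timeseries(collection)) {
        stop("Not yet implemented")
      } else {
        stop("Not yet implemented")
      }
    }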
diff --git a/R/R2Generic-class.R b/R/R2Generic-class.R
index ce53485..54dcdf9 100644
--- a/R/R2Generic-class.R
+++ b/R/R2Generic-class.R
@@ -85,3 +85,25 @@ R2Generic = R6Class(
   )
 )
 
+
+#' Write generics to disk
+#'
+#' @description This function writes rasters in a consistent directory structure to disk in generic GeoTIFF format.
+#' It takes as input an object of class `Collection` and the path where the files are to be written to disk. Once the files
+#' have been written to disk, they can be loaded into a `stars` object by the user, after which custom functions can
+#' be applied.
+#'
+#' @param collection_obj Object of class Collection as produced in the previous step while executing the process graph before encountering the UDF
+#' @param dir_name Path where the generics are to be written to disk. This could be obtained from the UDF process if it is defined by the user while registering it.
+#'
+#' @export
+#'
+write_generics = function(collection_obj, dir_name = "disk") # dir_name could be obtained if it is defined while registering the UDF
+{
+  scene_table = collection_obj$getData()
+  R2G_obj = R2Generic$new(scenes = scene_table)
+  R2G_obj$write_scenes(dir_name = dir_name)
+
+  R2G_obj$legend_to_disk(dir_name)
+
+}
diff --git a/R/Server-class.R b/R/Server-class.R
index 83eaa55..a142a01 100644
--- a/R/Server-class.R
+++ b/R/Server-class.R
@@ -32,10 +32,11 @@ OpenEOServer <- R6Class(
 
     sqlite.path = NULL,
     udf_transactions.path = NULL,
+    udf_cleanup = TRUE,
 
     api.port = NULL,
     host = NULL,
-    baseserver.url = "http:localhost:8000/api/",
+    baseserver.url = "http://localhost:8000/api/",
     mapserver.url = NULL, #assuming here a url, if not specified the backend is probably started with docker-compose
 
     processes = NULL,
@@ -190,6 +191,16 @@ OpenEOServer <- R6Class(
         )")
       }
 
+      if (!dbExistsTable(con,"udf")) {
+        dbExecute(con, "create table udf (
+          udf_id text,
+          job_id text,
+          start_date datetime default current_timestamp,
+          end_date datetime,
+          status text
+        )")
+      }
+
       dbDisconnect(con)
     },
@@ -264,6 +275,7 @@ OpenEOServer <- R6Class(
       }, error = function(e) {
         cat(str(e))
       }, finally={
+        removeJobsUdfData(job)
         logToConsole()
       })
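The new udf table gives every UDF transaction a persistent row keyed by udf_id and job_id; removeJobsUdfData() (defined in Udf-class.R below) uses the job_id column to clean up after a job. An ad-hoc status query could look like this (a sketch; assumes an initialized openeo.server and the DBI calls used throughout this package):

    con = openeo.server$getConnection()
    # transactions that were started but never closed
    stale = dbGetQuery(con, "select udf_id, job_id, start_date from udf where status = 'started'")
    dbDisconnect(con)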
diff --git a/R/Udf-class.R b/R/Udf-class.R
new file mode 100644
index 0000000..e72590e
--- /dev/null
+++ b/R/Udf-class.R
@@ -0,0 +1,362 @@
+#' @export
+UdfTransaction <- R6Class(
+  "UdfTransaction",
+  inherit = DatabaseEntity,
+  # public ----
+  public = list(
+    # attributes ====
+    udf_id = NULL,
+    job_id = NULL,
+    start_date = NULL,
+    end_date = NULL,
+    status = NULL,
+
+    script = NULL,
+
+    # functions ====
+    initialize = function(udf_id = NA) {
+      self$udf_id = udf_id
+      self$job_id = NA
+      self$start_date = NA
+      self$end_date = NA
+      self$status = NA
+
+      invisible(self)
+    },
+
+    load = function() {
+      udf_id = self$udf_id
+
+      # check if exists
+      if (exists.Udf(udf_id)) {
+        # load information from db
+        con = openeo.server$getConnection()
+        udf_info = dbGetQuery(con, "select * from udf where udf_id = :id"
+                              ,param = list(id=udf_id))
+        dbDisconnect(con)
+
+        self$udf_id = udf_info$udf_id
+        self$job_id = udf_info$job_id
+        self$start_date = udf_info$start_date
+        self$end_date = udf_info$end_date
+        self$status = udf_info$status
+
+        invisible(self)
+      } else {
+        stop(paste("Cannot find udf with id:",udf_id))
+      }
+    },
+
+    store = function() {
+
+      if (is.na(self$udf_id)) {
+        # create new id
+        self$udf_id = private$newUdfId()
+        self$start_date = format(now(),format="%Y-%m-%d %H:%M:%S")
+
+        udf_transaction_folder = self$workspace
+
+        if (!dir.exists(udf_transaction_folder)) {
+          dir.create(udf_transaction_folder,recursive = TRUE)
+        }
+
+        results.file.path = self$results_workspace
+        if (!dir.exists(results.file.path)) {
+          dir.create(results.file.path,recursive = TRUE)
+        }
+
+        insertQuery = "insert into udf (
+          udf_id, job_id, status, start_date
+        ) VALUES (
+          :uid, :jid, :status, :start
+        )"
+
+        tryCatch({
+          con = openeo.server$getConnection()
+          dbExecute(con, insertQuery, param=list(
+            uid = self$udf_id,
+            jid = self$job_id,
+            status = "started",
+            start = self$start_date
+          ))
+        },
+        finally= {
+          dbDisconnect(con)
+        })
+
+      } else {
+        # update values
+
+        updateQuery = "update udf
+          set end_date = :end, status = :status
+          where udf_id = :uid
+        "
+        con = openeo.server$getConnection()
+        dbExecute(con, updateQuery, param=list(
+          end = self$end_date,
+          status = self$status,
+          uid = self$udf_id
+        ))
+        dbDisconnect(con)
+
+      }
+      invisible(self)
+    },
+
+    remove = function() {
+      udf_id = self$udf_id
+
+      con = openeo.server$getConnection()
+      deleteQuery = "delete from udf where udf_id = :uid"
+      dbExecute(con, deleteQuery, param=list(uid=udf_id))
+      dbDisconnect(con)
+
+      if (dir.exists(self$workspace)) {
+        unlink(self$workspace, recursive = TRUE)
+      }
+    },
+
+    clearExportData = function() {
+      if (openeo.server$udf_cleanup) {
+        # deletes all export files except the results
+        files = list.files(path=self$workspace, recursive = TRUE,full.names = TRUE)
+        unlink(files[!grepl("result",files)],recursive = TRUE)
+
+        dirs=list.dirs(self$workspace)
+        unlink(dirs[!grepl("result",dirs)][-1], recursive = TRUE) # -1 removes the first entry (the transaction folder)
+      }
+    },
+
+    prepareExportData = function(collection, export_type="file") {
+      if (!all(export_type %in% c("file","json"))) {
+        stop("Can only support file and json based data export")
+      }
+
+      if (! is.Collection(collection)) {
+        stop("Passed object is not a Collection object")
+      }
+
+      if ("json" %in% export_type) {
+        json = private$createJsonRequest(collection = collection, strategy = NULL, language = "R")
+        # TODO tiling, coordinate HTTP requests, stitch everything together
+        # for now just write to disk
+        write(toJSON(json, auto_unbox=TRUE,pretty = TRUE),
+              paste(self$workspace,"udf_request.json",sep="/"))
+      }
+
+      if ("file" %in% export_type) {
+        write_generics(collection,dir_name = self$workspace)
+      }
+
+      invisible(self)
+    }
+  ),
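prepareExportData() bundles what processes.R previously did with free-standing write_generics()/udf_request() calls. The intended call sequence, as wired up in the processes.R hunks further down (sketch):

    udf_transaction = prepare_udf_transaction(user, script, job$job_id)
    # "file": write_generics() GeoTIFF tree in the transaction workspace;
    # "json": udf_request.json combining the script source and the serialized collection
    udf_transaction$prepareExportData(collection, export_type = c("json", "file"))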
+  # active ----
+  active = list(
+    workspace = function() {
+      if (!is.null(self$udf_id)) {
+        return(paste(openeo.server$udf_transactions.path,self$udf_id,sep="/"))
+      } else {
+        stop("Uninitialized Udf object: no id.")
+      }
+    },
+    results_workspace = function() {
+      if (!is.null(self$udf_id)) {
+        return(paste(self$workspace,"results",sep="/"))
+      } else {
+        stop("Uninitialized Udf object: no id.")
+      }
+    }
+  ),
+  # private ----
+  private = list(
+    # attributes ====
+    # functions ====
+    newUdfId = function() {
+      randomString = paste("U",createAlphaNumericId(n=1,length=12),sep="")
+
+      if (exists.Udf(randomString)) {
+        # if id exists get a new one (recursive)
+        return(private$newUdfId())
+      } else {
+        return(randomString)
+      }
+    },
+
+    # Prepares the collection data for the UDF service request
+    #
+    # Transforms the data contained in a Collection into a JSON representation. It will be passed along with the code script URL as data
+    # to the specified UDF REST processing service. Currently implemented only for raster time series collections.
+    #
+    # @param collection Collection object
+    # @param strategy the tiling strategy (not implemented yet)
+    # @return list that can be transformed into "UdfData" JSON
+    exportCollection.json = function(collection,strategy) {
+      # TODO prepare data with some sort of tiling strategy
+
+      if (is.st_raster(collection)) {
+        udf_data = list()
+        udf_data[["proj"]] = as.character(collection$getGlobalSRS())
+        udf_data[["raster_collection_tiles"]] = list()
+
+        udf_data[["raster_collection_tiles"]] = append(udf_data[["raster_collection_tiles"]],private$raster_collection_export(collection))
+        return(udf_data)
+      } else {
+        stop("Not yet implemented")
+      }
+    },
+
+    createJsonRequest = function(collection,strategy=NULL,language="R") {
+      # TODO remove the hard coded backend selection
+      request = list(
+        code = list(
+          language = language,
+          source = readChar(self$script, file.info(self$script)$size)
+        ),
+        data = private$exportCollection.json(collection = collection, strategy = strategy)
+      )
+
+      return(request)
+    },
+
+    # Creates RasterCollectionTile representation
+    #
+    # Subsets and groups Collection data by band and space in order to create the specified UDF RasterCollectionTile JSON output.
+    #
+    # @param collection Collection object
+    # @return list that can be transformed into "UdfData" JSON
+    raster_collection_export = function(collection) {
+      if (! is.Collection(collection)) {
+        stop("Passed object is not a Collection object")
+      }
+
+      data = collection$getData()
+      extents = collection$space
+
+      modified = data %>% group_by(band,space) %>% dplyr::summarise(
+        exported = tibble(band,space,data,time) %>% (function(x,...) {
+          raster_collection_tiles = list()
+          raster_collection_tiles[["id"]] = "test1"
+
+          raster_collection_tiles[["wavelength"]] = unique(x[["band"]])
+
+          # select sf polygons by ids stored in the data table, then give bbox from all of the sf
+          b = st_bbox(extents[x %>% dplyr::select(space) %>% unique() %>% unlist() %>% unname(),])
+          raster_collection_tiles[["extent"]] = list(
+            north = b[["ymax"]],
+            south = b[["ymin"]],
+            west = b[["xmin"]],
+            east = b[["xmax"]],
+            height = yres(x[[1,"data"]]),
+            width = xres(x[[1,"data"]])
+          )
+
+          # times
+          times = x[["time"]]
+          tres = round(median(diff(times)))
+          raster_collection_tiles[["start_times"]] = strftime(times, "%Y-%m-%dT%H:%M:%S", usetz = TRUE)
+          raster_collection_tiles[["end_times"]] = strftime(times+tres, "%Y-%m-%dT%H:%M:%S", usetz = TRUE)
+
+          # fetch data from raster files as matrix (store as list first, otherwise it messes up the matrix
+          # structure by creating row/col as rows for each attribute)
+          raster_collection_tiles[["data"]] = x %>% apply(MARGIN=1,FUN= function(row) {
+
+            return(list(raster::values(x=row$data, format="matrix")))
+
+          })
+
+          # unlist it again
+          raster_collection_tiles[["data"]] = lapply(raster_collection_tiles[["data"]], function(arr_list) {
+            arr_list[[1]]
+          })
+
+          return(list(raster_collection_tiles))
+        })
+      )
+      return(modified[["exported"]])
+    }
+  )
+)
+
+# statics ====
+
+#' @export
+exists.Udf = function(udf_id) {
+  if (nchar(udf_id) == 13) {
+    con = openeo.server$getConnection()
+    result = dbGetQuery(con, "select count(*) from udf where udf_id = :id"
+                        ,param = list(id=udf_id)) == 1
+    dbDisconnect(con)
+    return(result)
+  } else {
+    return(FALSE)
+  }
+}
+
+#' @export
+is.Udf = function(obj) {
+  return("UdfTransaction" %in% class(obj))
+}
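For orientation, the list assembled by exportCollection.json() serializes (via toJSON) into a structure like the following; the field names come from the code above, the values are purely illustrative:

    udf_data = list(
      proj = "+proj=utm +zone=32 +datum=WGS84",   # illustrative CRS string
      raster_collection_tiles = list(list(
        id = "test1",
        wavelength = "B04",                       # illustrative band name
        extent = list(north = 52.1, south = 52.0, west = 7.0, east = 7.1,
                      height = 10, width = 10),
        start_times = c("2018-06-01T00:00:00+0200"),
        end_times = c("2018-06-06T00:00:00+0200"),
        data = list(matrix(0, nrow = 2, ncol = 2))  # one pixel matrix per time step
      ))
    )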
+
+udfIdsByJobId = function(jobid) {
+  tryCatch({
+    query = "select udf_id from udf where job_id = :jid"
+
+    db = openeo.server$getConnection()
+    result = dbGetQuery(db, query, param = list(jid=jobid))
+    return(result)
+
+  },finally = {
+    dbDisconnect(db)
+  })
+}
+
+removeJobsUdfData = function(job) {
+  if (openeo.server$udf_cleanup) {
+    udfids = udfIdsByJobId(job$job_id)
+
+    if (length(udfids)>0) {
+      for (id in udfids) {
+        udf = UdfTransaction$new()
+        udf$udf_id = id
+        udf$load()
+        udf$remove()
+        udf=NULL
+      }
+    }
+  }
+}
+
+prepare_udf_transaction = function(user,script,job_id = NULL) {
+  udf_transaction = UdfTransaction$new()
+
+  # TODO maybe script is a URL
+  isURL = FALSE
+
+  # TODO implement the URL check and also download the script if necessary
+  if (isURL) {
+    # download the script and store it in the user workspace
+    script.url = script
+    file.path = script.url
+  } else {
+    # then we need to make the script accessible as a URL
+    file.path = paste(user$workspace,"files", script, sep="/")
+  }
+
+  udf_transaction$script = file.path
+
+  if (!is.null(job_id)) {
+    udf_transaction$job_id = job_id
+  } else {
+    udf_transaction$job_id = "sync_job"
+  }
+
+  udf_transaction$store()
+
+  return(udf_transaction)
+}
\ No newline at end of file
diff --git a/R/api.R b/R/api.R
index 5c2203c..d67ac6a 100644
--- a/R/api.R
+++ b/R/api.R
@@ -147,6 +147,7 @@
   tryCatch({
 
     job = Job$new(process_graph=process_graph,user_id = req$user$user_id)
+    job$job_id = syncJobId()
 
     job = job$run()
 
@@ -157,6 +158,8 @@
     return(.create_output(res = res,result = job$results, format = format))
   }, error= function(e) {
     return(openEO.R.Backend:::error(res=res, status = 500, msg = e))
+  }, finally = {
+    removeJobsUdfData(job)
   })
 }
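prepare_udf_transaction() is now the single entry point a process uses to obtain a transaction: it resolves the script against the user's file workspace, links the transaction to the job (or to the "sync_job" placeholder when no job id is given) and persists it. A usage sketch (the script name is illustrative):

    udf_transaction = prepare_udf_transaction(user, "my_udf.R", job$job_id)
    udf_transaction$workspace           # <udf_transactions.path>/<udf_id>
    udf_transaction$results_workspace   # <workspace>/results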
diff --git a/R/prepare_UDF.R b/R/prepare_UDF.R
deleted file mode 100644
index 9cfccb9..0000000
--- a/R/prepare_UDF.R
+++ /dev/null
@@ -1,159 +0,0 @@
-#' Write generics to disk
-#'
-#' @description This function writes rasters in a consistent directory structure to disk in generic GeoTIFF formats.
-#' It takes as input an object of class `Collection` and the path where the files are to be written to disk. Once the files
-#' have been written to disk, it can be loaded into a `stars` object by the user after which custom functions could
-#' be applied to it.
-#'
-#' @param collection_obj Object of class Collection as produced in the previous step while executing the process graph before encountering the UDF
-#' @param dir_name Path where the generics are to be written to disk. This could be obtained from the UDF process if it is defined by the user while registering it.
-#'
-#' @export
-#'
-write_generics = function(collection_obj, dir_name = "disk") #dir_name could be obtained if it is defined while registering the UDF
-{
-  scene_table = collection_obj$getData()
-  R2G_obj = R2Generic$new(scenes = scene_table)
-  R2G_obj$write_scenes(dir_name = dir_name)
-
-  R2G_obj$legend_to_disk(dir_name)
-
-}
-
-#' Prepares the collection data for the UDF service request
-#'
-#' Transforms the data contained in a Collection into a JSON representation. It will be passed along the code script URL as data
-#' to the specified UDF REST processing service. Currently implemented only for raster timeserires collections.
-#'
-#' @param collection Collection object
-#' @param strategy the tiling strategy (not implemented yet)
-#' @return list that can be transformed into "UdfData" JSON
-#' @export
-udf_export = function(collection,strategy) {
-  if (! is.Collection(collection)) {
-    stop("Passed object is not a Collection object")
-  }
-
-  # TODO prepare some sort of tiling strategy
-
-  if (collection$dimensions$raster && collection$dimensions$space && collection$dimensions$time) {
-    udf_data = list()
-    udf_data[["proj"]] = as.character(collection$getGlobalSRS())
-    udf_data[["raster_collection_tiles"]] = list()
-
-    udf_data[["raster_collection_tiles"]] = append(udf_data[["raster_collection_tiles"]],raster_collection_export(collection))
-    return(udf_data)
-  } else {
-    stop("Not yet implemented")
-  }
-
-}
-
-udf_request = function(collection,strategy=NULL,udf_transaction) {
-  # TODO remove the hard coded backend selection
-  request = list(
-    code = list(
-      language = "R",
-      source = readChar(udf_transaction$script, file.info(udf_transaction$script)$size)
-    ),
-    data = udf_export(collection = collection, strategy = strategy)
-  )
-
-  return(request)
-}
-
-#' Creates RasterCollectionTile representation
-#'
-#' Subsets and groups Collection data by band and space in order to create the specified UDF RasterCollectionTile JSON output.
-#'
-#' @param collection Collection object
-#' @return list that can be transformed into "UdfData" JSON
-#' @export
-raster_collection_export = function(collection) {
-  if (! is.Collection(collection)) {
-    stop("Passed object is not a Collection object")
-  }
-
-  data = collection$getData()
-  extents = collection$space
-
-  modified = data %>% group_by(band,space) %>% dplyr::summarise(
-    exported = tibble(band,space,data,time) %>% (function(x,...) {
-      raster_collection_tiles = list()
-      raster_collection_tiles[["id"]] = "test1"
-
-      raster_collection_tiles[["wavelength"]] = unique(x[["band"]])
-
-      # select sf polygons by ids stored in the data table, then give bbox from all of the sf
-      b = st_bbox(extents[x %>% dplyr::select(space) %>% unique() %>% unlist() %>% unname(),])
-      raster_collection_tiles[["extent"]] = list(
-        north = b[["ymax"]],
-        south = b[["ymin"]],
-        west = b[["xmin"]],
-        east = b[["xmax"]],
-        height = yres(x[[1,"data"]]),
-        width = xres(x[[1,"data"]])
-      )
-
-      # times
-      times = x[["time"]]
-      tres = round(median(diff(times)))
-      raster_collection_tiles[["start_times"]] = strftime(times, "%Y-%m-%dT%H:%M:%S", usetz = TRUE)
-      raster_collection_tiles[["end_times"]] = strftime(times+tres, "%Y-%m-%dT%H:%M:%S", usetz = TRUE)
-
-      # fetch data from raster files as matrix (store as list first other wise it messes up the matrix
-      # structure by creating row/col as rows for each attribute)
-      raster_collection_tiles[["data"]] = x %>% apply(MARGIN=1,FUN= function(row) {
-
-        return(list(raster::values(x=row$data, format="matrix")))
-
-      })
-
-      # unlist it again
-      raster_collection_tiles[["data"]] = lapply(raster_collection_tiles[["data"]], function(arr_list) {
-        arr_list[[1]]
-      })
-
-      return(list(raster_collection_tiles))
-    })
-  )
-  return(modified[["exported"]])
-}
-
-prepare_udf_transaction = function(user,script) {
-  # TODO mayb script is URL
-  isURL = FALSE
-
-  # /udf//
-  transaction_id = createAlphaNumericId(n=1,length=18)
-
-  if (isURL) {
-    # download the script and store it in the user workspace
-    script.url = script
-  } else {
-    # then we need to make the script accessable as URL
-    file.path = paste(user$workspace,"files", script, sep="/")
-  }
-
-  udf_transaction_folder = paste(openeo.server$udf_transactions.path,transaction_id,sep="/")
-
-  if (!dir.exists(udf_transaction_folder)) {
-    dir.create(udf_transaction_folder,recursive = TRUE)
-  }
-
-  results.file.path = paste(udf_transaction_folder, "results", sep = "/")
-  if (!dir.exists(results.file.path)) {
-    dir.create(results.file.path,recursive = TRUE)
-  }
-
-  udf_transaction = list(
-    id = transaction_id,
-    script = file.path,
-    input = udf_transaction_folder,
-    result = results.file.path
-  )
-  class(udf_transaction) = "udf_transaction"
-
-  return(udf_transaction)
-}
\ No newline at end of file
"/") - if (!dir.exists(results.file.path)) { - dir.create(results.file.path,recursive = TRUE) - } - - udf_transaction = list( - id = transaction_id, - script = file.path, - input = udf_transaction_folder, - result = results.file.path - ) - class(udf_transaction) = "udf_transaction" - - return(udf_transaction) -} \ No newline at end of file diff --git a/R/processes.R b/R/processes.R index 4c64099..42b8d8d 100644 --- a/R/processes.R +++ b/R/processes.R @@ -320,19 +320,19 @@ aggregate_time = Process$new( # else we need to download it first. # prepare paths - udf_transaction = prepare_udf_transaction(user,script) + udf_transaction = prepare_udf_transaction(user,script,job$job_id) + udf_transaction$prepareExportData(collection,export_type=c("json","file")) - # export data - write_generics(collection,dir_name = udf_transaction$input) - #testing - write(toJSON(udf_request(collection=collection,udf_transaction = udf_transaction),auto_unbox=TRUE,pretty = TRUE),paste(udf_transaction$input,"udf_request.json",sep="/")) - + # # export data + # write_generics(collection,dir_name = udf_transaction$workspace) + # #testing + # write(toJSON(udf_request(collection=collection,udf_transaction = udf_transaction),auto_unbox=TRUE,pretty = TRUE),paste(udf_transaction$workspace,"udf_request.json",sep="/")) + # oldwd = getwd() tryCatch({ - setwd(udf_transaction$input) - + setwd(udf_transaction$workspace) source(file = udf_transaction$script, local = TRUE) # Now read back results present at results.file.path # To be implemented once classes for data I/O have been re-written @@ -340,24 +340,30 @@ aggregate_time = Process$new( # -> modification is applied afterwards # TODO replace code with something that is read from a global meta data file - result.collection = read_legend(legend.path = paste(udf_transaction$result, "out_legend.csv", sep = "/"), code = "11110") + result.collection = read_legend(legend.path = paste(udf_transaction$results_workspace, "out_legend.csv", sep = "/"), code = "11110") + + udf_transaction = udf_transaction$load() + udf_transaction$status = "finished" + udf_transaction$end_date = format(now(),format="%Y-%m-%d %H:%M:%S") + udf_transaction$store() return(result.collection) }, error = function(e) { cat(paste("ERROR:",e)) + udf_transaction = udf_transaction$load() + udf_transaction$status = "error" + udf_transaction$end_date = NA + udf_transaction$store() },finally = { - # cleanup at this point the results should been written to disk already, clear export! - files = list.files(path=".", recursive = TRUE,full.names = TRUE) - unlink(files[!grepl("result",files)],recursive = TRUE) - - dirs=list.dirs(".") - unlink(dirs[!grepl("result",dirs)][-1], recursive = TRUE) # -1 removes the first argument (the transaction folder) + udf_transaction$clearExportData() setwd(oldwd) }) + + } ) @@ -392,34 +398,35 @@ apply_pixel = Process$new( # fla: if the file is hosted at this backend # else we need to download it first. 
@@ -392,34 +398,35 @@ apply_pixel = Process$new(
       # fla: if the file is hosted at this backend
       # else we need to download it first.
 
       # prepare paths
-      udf_transaction = prepare_udf_transaction(user,script)
-
-      # file.path = paste(user$workspace,"files", script, sep="/")
+      udf_transaction = prepare_udf_transaction(user,script,job$job_id)
 
-      # export data
-      write_generics(collection,dir_name = udf_transaction$input)
+      udf_transaction$prepareExportData(collection,export_type="file")
 
       oldwd = getwd()
       tryCatch({
-        setwd(udf_transaction$input)
+        setwd(udf_transaction$workspace)
 
         source(file = udf_transaction$script, local = TRUE)
 
         # TODO replace code with something that is read from a global meta data file
-        result.collection = read_legend(legend.path = paste(udf_transaction$result, "out_legend.csv", sep = "/"), code = "11110")
+        result.collection = read_legend(legend.path = paste(udf_transaction$results_workspace, "out_legend.csv", sep = "/"), code = "11110")
+
+        udf_transaction = udf_transaction$load()
+        udf_transaction$status = "finished"
+        udf_transaction$end_date = format(now(),format="%Y-%m-%d %H:%M:%S")
+        udf_transaction$store()
 
         return(result.collection)
       }, error = function(e) {
         cat(paste("ERROR:",e))
-      },finally= function(){
-        # cleanup at this point the results should been written to disk already, clear export!
-        files = list.files(path=".", recursive = TRUE,full.names = TRUE)
-        unlink(files[!grepl("result",files)],recursive = TRUE)
-
-        dirs=list.dirs(".")
-        unlink(dirs[!grepl("result",dirs)][-1], recursive = TRUE) # -1 removes the first argument (the transaction folder)
+        udf_transaction = udf_transaction$load()
+        udf_transaction$status = "error"
+        udf_transaction$end_date = NA
+        udf_transaction$store()
+      },finally = {
+        udf_transaction$clearExportData()
         setwd(oldwd)
       })
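Both UDF processes now share the same transaction bookkeeping around the script run. Condensed to its skeleton (all calls as in the two hunks above; note that tryCatch's finally takes an expression, not a function, which is why the cleanup runs unconditionally):

    oldwd = getwd()
    tryCatch({
      setwd(udf_transaction$workspace)
      source(file = udf_transaction$script, local = TRUE)
      udf_transaction$status = "finished"
      udf_transaction$store()
    }, error = function(e) {
      udf_transaction$status = "error"
      udf_transaction$store()
    }, finally = {
      udf_transaction$clearExportData()
      setwd(oldwd)
    })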
diff --git a/doc/Main.jpg b/doc/Main.jpg
index ba41ebf..b9f150b 100644
Binary files a/doc/Main.jpg and b/doc/Main.jpg differ
diff --git a/man/Collection.Rd b/man/Collection.Rd
index 176f6b1..409a782 100644
--- a/man/Collection.Rd
+++ b/man/Collection.Rd
@@ -16,6 +16,8 @@ intermediate results from one process to an other (the result of a process is a
 \describe{
 \item{\code{dimensions}}{A dimensionality object containing information about the existence of dimensions}
+
+\item{\code{space}}{A tibble containing an index and the geometry}
 }}
 
 \keyword{datasets}
diff --git a/man/Job.Rd b/man/Job.Rd
index 4e735ea..f01d176 100644
--- a/man/Job.Rd
+++ b/man/Job.Rd
@@ -20,9 +20,9 @@ graph.
 \item{\code{status}}{The current status in the job lifecycle}
 
-\item{\code{process_graph}}{graph of nested processes that is executable}
+\item{\code{process_graph}}{graph of nested processes that is executable (ExecutableProcess)}
 
-\item{\code{view}}{Spatio-Temporal Extent to be used for the calculation}
+\item{\code{view}}{Spatio-Temporal Extent to be used for the calculation (currently not in use)}
 
 \item{\code{submitted}}{Timestamp when the job was submitted to the server}
 
@@ -30,7 +30,13 @@ graph.
 \item{\code{consumed_credits}}{For accounting and billing the amount of credits consumed by this job}
 
-\item{\code{filePath}}{The system filepath that links to the stored JSON process graph}
+\item{\code{last_update}}{Timestamp when the job was last updated}
+
+\item{\code{output}}{List containing the output configuration like output$format (or additional GDAL commands), copied from the process graph}
+
+\item{\code{results}}{Contains the result of the process_graph after execution (Collection)}
+
+\item{\code{persistent}}{Whether or not the job is stored in the database}
 }}
 
 \keyword{datasets}
diff --git a/man/raster_collection_export.Rd b/man/raster_collection_export.Rd
deleted file mode 100644
index 727321e..0000000
--- a/man/raster_collection_export.Rd
+++ /dev/null
@@ -1,17 +0,0 @@
-% Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/prepare_UDF.R
-\name{raster_collection_export}
-\alias{raster_collection_export}
-\title{Creates RasterCollectionTile representation}
-\usage{
-raster_collection_export(collection)
-}
-\arguments{
-\item{collection}{Collection object}
-}
-\value{
-list that can be transformed into "UdfData" JSON
-}
-\description{
-Subsets and groups Collection data by band and space in order to create the specified UDF RasterCollectionTile JSON output.
-}
diff --git a/man/udf_export.Rd b/man/udf_export.Rd
deleted file mode 100644
index 6da66b9..0000000
--- a/man/udf_export.Rd
+++ /dev/null
@@ -1,20 +0,0 @@
-% Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/prepare_UDF.R
-\name{udf_export}
-\alias{udf_export}
-\title{Prepares the collection data for the UDF service request}
-\usage{
-udf_export(collection, strategy)
-}
-\arguments{
-\item{collection}{Collection object}
-
-\item{strategy}{the tiling strategy (not implemented yet)}
-}
-\value{
-list that can be transformed into "UdfData" JSON
-}
-\description{
-Transforms the data contained in a Collection into a JSON representation. It will be passed along the code script URL as data
-to the specified UDF REST processing service. Currently implemented only for raster timeserires collections.
-}
diff --git a/man/write_generics.Rd b/man/write_generics.Rd
index 232dcc7..99fad42 100644
--- a/man/write_generics.Rd
+++ b/man/write_generics.Rd
@@ -1,5 +1,5 @@
 % Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/prepare_UDF.R
+% Please edit documentation in R/R2Generic-class.R
 \name{write_generics}
 \alias{write_generics}
 \title{Write generics to disk}
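Taken together, the persistence layer introduced in Udf-class.R round-trips as follows; a minimal sketch assuming an initialized openeo.server (the generated id is illustrative):

    t = UdfTransaction$new()
    t$job_id = "sync_job"
    t$store()        # insert branch: assigns a fresh id such as "U1a2b3c4d5e6f", writes the row, creates the workspace
    t = UdfTransaction$new(udf_id = t$udf_id)$load()   # reload from the udf table
    t$status = "finished"
    t$end_date = format(now(), format = "%Y-%m-%d %H:%M:%S")
    t$store()        # update branch: writes end_date and status
    t$remove()       # deletes the row and the workspace directory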