From c7d8bab0290ecfcb0f8fb4c3ee0b3fd147e1ff19 Mon Sep 17 00:00:00 2001 From: Matthias Mohr Date: Tue, 30 Jul 2024 01:47:24 +0200 Subject: [PATCH] Implement Batch Jobs using Google Drive --- src/api/files.js | 3 +- src/api/jobs.js | 8 +- src/api/services.js | 2 +- src/api/worker/batchjob.js | 105 +++++++++++++----- src/api/worker/sync.js | 6 +- src/api/worker/webservice.js | 6 +- src/formats/bitmap.js | 5 +- src/formats/fileformat.js | 12 ++- src/formats/gif.js | 3 +- src/formats/gtiff.js | 41 ++++---- src/models/jobstore.js | 187 +++++++++++++++++++++++++++++++++ src/models/userstore.js | 8 +- src/processes/utils/results.js | 77 +++++++++++--- src/processgraph/jsonschema.js | 4 +- src/utils/processingcontext.js | 50 +++++++-- src/utils/servercontext.js | 16 ++- src/utils/utils.js | 10 ++ 17 files changed, 458 insertions(+), 85 deletions(-) diff --git a/src/api/files.js b/src/api/files.js index cf93080..bc9d2e0 100644 --- a/src/api/files.js +++ b/src/api/files.js @@ -94,8 +94,7 @@ export default class FilesAPI { size: newFileStat.size, modified: Utils.getISODateTime(newFileStat.mtime) }); - } - catch (e) { + } catch (e) { if (this.context.debug) { console.error(e); } diff --git a/src/api/jobs.js b/src/api/jobs.js index 28ac637..50baa1f 100644 --- a/src/api/jobs.js +++ b/src/api/jobs.js @@ -54,6 +54,9 @@ export default class JobsAPI { if (!req.user._id) { throw new Errors.AuthenticationRequired(); } + + // Update the task status + this.context.processingContext(req.user).startTaskMonitor(); } async getJobs(req, res) { @@ -200,6 +203,9 @@ export default class JobsAPI { } const makeStorageUrl = obj => { + if (Utils.isUrl(obj.href)) { + return obj; + } obj.href = API.getUrl("/storage/" + job.token + "/" + obj.href); return obj; }; @@ -246,7 +252,7 @@ export default class JobsAPI { if (this.storage.isFieldEditable(key)) { switch(key) { case 'process': { - const pg = new ProcessGraph(req.body.process, this.context.processingContext(req.user)); + const pg = new ProcessGraph(req.body.process, this.context.processingContext(req.user, job)); pg.allowUndefinedParameters(false); promises.push(pg.validate()); break; diff --git a/src/api/services.js b/src/api/services.js index ccbf2d7..550a000 100644 --- a/src/api/services.js +++ b/src/api/services.js @@ -118,7 +118,7 @@ export default class ServicesAPI { if (this.storage.isFieldEditable(key)) { switch(key) { case 'process': { - const pg = new ProcessGraph(req.body.process, this.context.processingContext(req.user)); + const pg = new ProcessGraph(req.body.process, this.context.processingContext(req.user, service)); // ToDo 1.0: Correctly handle service paramaters #79 pg.allowUndefinedParameters(false); promises.push(pg.validate()); diff --git a/src/api/worker/batchjob.js b/src/api/worker/batchjob.js index b22e019..04eddcf 100644 --- a/src/api/worker/batchjob.js +++ b/src/api/worker/batchjob.js @@ -19,33 +19,64 @@ export default async function run(config, storage, user, query) { await Promise.all(cleanupTasks); logger.info("Starting batch job"); + await storage.updateJobStatus(query, 'running'); - const context = config.processingContext(user); + const jobfolder = storage.getJobFolder(job._id); + await fse.ensureDir(path.dirname(jobfolder)); + + const context = config.processingContext(user, job); const pg = new ProcessGraph(job.process, context, logger); await pg.execute(); - const computeTasks = pg.getResults().map(async (datacube) => { - const response = await GeeResults.retrieve(context, datacube, logger); - const params = datacube.getOutputFormatParameters(); - const filename = (params.name || String(Utils.generateHash())) + GeeResults.getFileExtension(datacube, config); - const filepath = storage.getJobFile(job._id, filename); - logger.debug("Storing result to: " + filepath); - await fse.ensureDir(path.dirname(filepath)); - await new Promise((resolve, reject) => { - const writer = fse.createWriteStream(filepath); - response.data.pipe(writer); - writer.on('error', reject); - writer.on('close', resolve); - }); - return { filepath, datacube }; + const computeTasks = pg.getResults().map(async (dc) => { + const format = config.getOutputFormat(dc.getOutputFormat()); + const datacube = format.preprocess(GeeResults.BATCH, context, dc, logger); + + if (format.canExport()) { + const tasks = await format.export(context.ee, dc, context.getResource()); + storage.addTasks(job, tasks); + context.startTaskMonitor(); + const filepath = await new Promise((resolve, reject) => { + setInterval(async () => { + const updatedJob = await storage.getById(job._id, job.user_id); + if (!updatedJob) { + reject(new Error("Job was deleted")); + } + if (['canceled', 'error', 'finished'].includes(updatedJob.status)) { + // todo: resolve google drive URLs + resolve(job.googleDriveResults); + } + }, 10000); + }); + return { filepath, datacube }; + } + else { + const response = await format.retrieve(context.ee, dc); + const params = datacube.getOutputFormatParameters(); + const filename = (params.name || String(Utils.generateHash())) + GeeResults.getFileExtension(datacube, config); + const filepath = storage.getJobFile(job._id, filename); + await new Promise((resolve, reject) => { + const writer = fse.createWriteStream(filepath); + response.data.pipe(writer); + writer.on('error', reject); + writer.on('close', resolve); + }); + return { filepath, datacube }; + } }); await Promise.all(computeTasks); const results = []; for (const task of computeTasks) { - results.push(await task); + const { filepath, datacube } = await task; + if (Array.isArray(filepath)) { + filepath.forEach(fp => results.push({ filepath: fp, datacube })); + } + else { + results.push({ filepath, datacube }); + } } const item = await createSTAC(storage, job, results); @@ -53,6 +84,7 @@ export default async function run(config, storage, user, query) { await fse.writeJSON(stacpath, item, {spaces: 2}); logger.info("Finished"); + // todo: set to error is any task failed storage.updateJobStatus(query, 'finished'); } catch(e) { logger.error(e); @@ -78,17 +110,36 @@ async function createSTAC(storage, job, results) { let endTime = null; const extents = []; for(const { filepath, datacube } of results) { - const filename = path.basename(filepath); - const stat = await fse.stat(filepath); - let asset = { - href: path.relative(folder, filepath), - roles: ["data"], - type: Utils.extensionToMediaType(filepath), - title: filename, - "file:size": stat.size, - created: stat.birthtime, - updated: stat.mtime - }; + if (!filepath) { + continue; + } + + let asset; + let filename; + if (Utils.isUrl(filepath)) { + let url = new URL(filepath); + console.log(url); + filename = path.basename(url.pathname || url.hash.substring(1)); + asset = { + href: filepath, + roles: ["data"], +// type: Utils.extensionToMediaType(filepath), + title: filename + }; + } + else { + filename = path.basename(filepath); + const stat = await fse.stat(filepath); + asset = { + href: path.relative(folder, filepath), + roles: ["data"], + type: Utils.extensionToMediaType(filepath), + title: filename, + "file:size": stat.size, + created: stat.birthtime, + updated: stat.mtime + }; + } if (datacube.hasT()) { const t = datacube.dimT(); diff --git a/src/api/worker/sync.js b/src/api/worker/sync.js index 9934e1f..4ebe730 100644 --- a/src/api/worker/sync.js +++ b/src/api/worker/sync.js @@ -25,7 +25,11 @@ export default async function run(config, user, id, process, log_level) { if (pg.getResults().length > 1) { logger.warn("Multiple results can't be processed in synchronous mode. Only the result from the result node will be returned."); } - return await GeeResults.retrieve(context, resultNode.getResult(), logger); + + const dc = resultNode.getResult(); + const format = config.getOutputFormat(dc.getOutputFormat()); + const dc2 = format.preprocess(GeeResults.SYNC, context, dc, logger); + return await format.retrieve(context.ee, dc2); } export async function getResultLogs(user_id, id, log_level) { diff --git a/src/api/worker/webservice.js b/src/api/worker/webservice.js index 64075ee..ff5d5e4 100644 --- a/src/api/worker/webservice.js +++ b/src/api/worker/webservice.js @@ -9,7 +9,7 @@ export default async function run(config, storage, user, query, xyz) { try { const rect = storage.calculateXYZRect(...xyz); - const context = config.processingContext(user); + const context = config.processingContext(user, service); // Update user id to the user id, which stored the job. // See https://github.com/Open-EO/openeo-earthengine-driver/issues/19 context.setUserId(service.user_id); @@ -29,7 +29,9 @@ export default async function run(config, storage, user, query, xyz) { dc.setOutputFormat('png'); } - return await GeeResults.retrieve(context, dc, logger); + const format = config.getOutputFormat(dc.getOutputFormat()); + const dc2 = format.preprocess(GeeResults.SERVICE, context, dc, logger); + return await format.retrieve(context.ee, dc2); } catch(e) { logger.error(e); throw e; diff --git a/src/formats/bitmap.js b/src/formats/bitmap.js index 679a7d7..b2b0cb2 100644 --- a/src/formats/bitmap.js +++ b/src/formats/bitmap.js @@ -3,6 +3,7 @@ import GeeResults from "../processes/utils/results.js"; import DataCube from "../datacube/datacube.js"; import Utils from "../utils/utils.js"; import FileFormat, { EPSGCODE_PARAMETER, SIZE_PARAMETER } from "./fileformat.js"; +import HttpUtils from "../utils/http.js"; export const EPSGCODE_PARAMETER_BITMAP = Object.assign({}, EPSGCODE_PARAMETER); EPSGCODE_PARAMETER_BITMAP.default = 4326; @@ -96,7 +97,7 @@ export default class BitmapLike extends FileFormat { return renderer === 'filmstrip'; } - preprocess(context, dc, logger) { + preprocess(mode, context, dc, logger) { const ee = context.ee; const parameters = dc.getOutputFormatParameters(); @@ -175,7 +176,7 @@ export default class BitmapLike extends FileFormat { reject('Download URL provided by Google Earth Engine is empty.'); } else { - resolve(url); + resolve(HttpUtils.stream(url)); } }); }); diff --git a/src/formats/fileformat.js b/src/formats/fileformat.js index a812d0b..d8e2c23 100644 --- a/src/formats/fileformat.js +++ b/src/formats/fileformat.js @@ -16,7 +16,7 @@ export const SIZE_PARAMETER = { export const SCALE_PARAMETER = { type: 'number', - description: 'Scale of the image in meters per pixel.', + description: 'Scale of the image in meters per pixel. Defaults to native resolution in batch jobs, and 100 otherwise.', default: 100, minimum: 1 }; @@ -81,15 +81,19 @@ export default class FileFormat { }; } - preprocess(context, dc/*, logger*/) { + preprocess(mode, context, dc/*, logger*/) { return dc; } - async retrieve(/*ee, dc*/) { + async retrieve(/*ee, dc */) { throw new Error('Not implemented'); } - async export(/*ee, dc*/) { + canExport() { + return false; + } + + async export(/*ee, dc */) { throw new Error('Not implemented'); } diff --git a/src/formats/gif.js b/src/formats/gif.js index bc57ed6..27eb9c3 100644 --- a/src/formats/gif.js +++ b/src/formats/gif.js @@ -1,3 +1,4 @@ +import HttpUtils from "../utils/http.js"; import Utils from "../utils/utils.js"; import BitmapLike from "./bitmap.js"; @@ -53,7 +54,7 @@ export default class GifFormat extends BitmapLike { reject('Download URL provided by Google Earth Engine is empty.'); } else { - resolve(url); + resolve(HttpUtils.stream(url)); } }); }); diff --git a/src/formats/gtiff.js b/src/formats/gtiff.js index 979386d..e3ce7a7 100644 --- a/src/formats/gtiff.js +++ b/src/formats/gtiff.js @@ -2,28 +2,24 @@ import GeeResults from "../processes/utils/results.js"; import DataCube from "../datacube/datacube.js"; import Utils from "../utils/utils.js"; import FileFormat, { EPSGCODE_PARAMETER, SCALE_PARAMETER } from "./fileformat.js"; +import HttpUtils from "../utils/http.js"; export default class GTiffFormat extends FileFormat { constructor(title = 'GeoTiff', parameters = {}) { - super(title, parameters); + super(title, parameters, "Cloud-optimized in batch jobs, not cloud-optimized otherwise."); this.addParameter('scale', SCALE_PARAMETER); this.addParameter('epsgCode', EPSGCODE_PARAMETER); - this.addParameter('zipped', { - type: 'boolean', - description: 'Pack the GeoTiff files into ZIP files, one file per band.', - default: false - }); } getGisDataTypes() { return ['raster']; } - getFileExtension(parameters) { - return parameters.zipped ? '.zip' : '.tiff'; + getFileExtension(/*parameters*/) { + return '.tiff'; } - preprocess(context, dc, logger) { + preprocess(mode, context, dc, logger) { const ee = context.ee; const parameters = dc.getOutputFormatParameters(); const dc2 = new DataCube(ee, dc); @@ -33,7 +29,8 @@ export default class GTiffFormat extends FileFormat { if (dc2.hasT()) { dc2.dimT().drop(); } - return dc2.setData(GeeResults.toImageOrCollection(ee, logger, dc.getData())); + const allowMultiple = (mode === GeeResults.BATCH); + return dc2.setData(GeeResults.toImageOrCollection(ee, logger, dc.getData(), allowMultiple)); } async retrieve(ee, dc) { @@ -46,18 +43,13 @@ export default class GTiffFormat extends FileFormat { crs = Utils.crsToString(dc.getCrs()); } - const format = parameters.zipped ? 'ZIPPED_GEO_TIFF' : 'GEO_TIFF'; - const data = dc.getData(); - if (data instanceof ee.ImageCollection) { - // todo: implement - } - else if (data instanceof ee.Image) { + if (data instanceof ee.Image) { const eeOpts = { scale: parameters.scale || 100, region, crs, - format + format: 'GEO_TIFF' }; return await new Promise((resolve, reject) => { data.getDownloadURL(eeOpts, (url, err) => { @@ -68,12 +60,25 @@ export default class GTiffFormat extends FileFormat { reject('Download URL provided by Google Earth Engine is empty.'); } else { - resolve(url); + resolve(HttpUtils.stream(url)); } }); }); } + else { + throw new Error('Only single images are supported in this processing mode for GeoTIFF.'); + } + } + + canExport() { + return true; + } + async export(ee, dc, job) { + return await GeeResults.exportToDrive(ee, dc, job, 'GeoTIFF', { + cloudOptimized: true, + // noData: NaN + }); } } diff --git a/src/models/jobstore.js b/src/models/jobstore.js index 074fa16..90a21ec 100644 --- a/src/models/jobstore.js +++ b/src/models/jobstore.js @@ -4,11 +4,22 @@ import fse from 'fs-extra'; import path from 'path'; import Errors from '../utils/errors.js'; import Logs from './logs.js'; +import Utils from '../utils/utils.js'; + +const TASK_STATUS_MAP = { + PENDING: 'queued', + RUNNING : 'running', + CANCELLING: 'cancelled', + SUCCEEDED: 'finished', + CANCELLED: 'canceled', + FAILED: 'error' +}; export default class JobStore { constructor() { this.db = DB.load('jobs'); + this.taskDb = DB.load('tasks'); this.editableFields = ['title', 'description', 'process', 'plan', 'budget']; this.jobFolder = './storage/job_files'; this.logFileName = 'logs.db'; @@ -72,6 +83,182 @@ export default class JobStore { } } + async addTasks(job, tasks) { + const rows = []; + let logger = null; + for (const task of tasks) { + let status = 'queued'; + if (task.error) { + status = 'error'; + if (logger === null) { + logger = await this.getLogsById(job._id, job.log_level); + } + logger.error(task.error, {taskId: task.taskId}); + } + rows.push({ + task_id: task.taskId, + image_id: task.imageId, + job_id: job._id, + user_id: job.user_id, + google_user_id: Utils.isGoogleUser(job.user_id) ? job.user_id : '', + done: false, + script: null, + status, + stageOffset: 0, + progress: 0, + updated: Utils.toISODate(Date.now()), + results: [], + usage: null + }); + } + await this.taskDb.insertAsync(rows); + } + + async getTaskCount(google_user_id) { + return await this.taskDb.countAsync({google_user_id}); + } + + async updateTasks(ops) { + const documents = {}; + const newLogs = {}; + for (const op of ops) { + try { + const taskId = op.name.split('/').pop(); + const query = { task_id: taskId }; + const task = await this.taskDb.findOneAsync(query); + if (!task || (task.done && op.done)) { + // (1) Task not found, probably started in GEE directly, OR + // (2) Task is done and we synced it already, no need to update + continue; + } + + const jobId = task.job_id; + documents[jobId] = documents[jobId] || []; + newLogs[jobId] = newLogs[jobId] || []; + + if (task.attempt > 1 && task.attempt !== op.metadata.attempt) { + // New attempt, reset stage offset + task.stageOffset = 0; + newLogs[jobId].push({ + level: 'warning', + message: 'Started new attempt, previous attempt was aborted.' + }); + } + + if (Array.isArray(op.metadata.stages)) { + const stages = op.metadata.stages.slice(task.stageOffset); + for (const stage of stages) { + task.stageOffset++; + newLogs[jobId].push({ + level: 'info', + message: stage.description + }); + } + } + if (op.done && op.error) { + newLogs[jobId].push({ + level: 'error', + message: op.error.message, + code: String(op.error.message.code), + data: op.error.details + }); + } + + let progress = ops.done ? 100 : 0; + if (op.metadata.progress > 0) { + progress = op.metadata.progress * 100; + } + + if (!task.script && op.metadata.scriptUri) { + newLogs[jobId].push({ + level: 'info', + message: 'Google Earth Engine script can be found at: ' + op.metadata.scriptUri + }); + } + + const update = { + $set: { + status: TASK_STATUS_MAP[op.metadata.state], + progress, + attempt: op.metadata.attempt || 0, + done: op.done || false, + updated: op.metadata.updateTime, + script: op.metadata.scriptUri || null, + results: op.metadata.destinationUris || [], + usage: op.metadata.batchEecuUsageSeconds || 0, + stageOffset: task.stageOffset + } + }; + + const { affectedDocuments } = await this.taskDb.updateAsync(query, update, { returnUpdatedDocs: true }); + if (affectedDocuments) { + documents[task.job_id].push(affectedDocuments); + } + } catch (e) { + console.error(e); + } + + for(const jobId in documents) { + const tasks = documents[jobId]; + const job = await this.getById(jobId, tasks[0].user_id); + + const jobUpdates = { + updated: Utils.toISODate(Date.now()) + }; + if (tasks.some(t => t.status === 'running')) { + jobUpdates.status = 'running'; + } + else if (tasks.every(t => t.status === 'finished')) { + jobUpdates.status = 'finished'; + } + else if (tasks.some(t => t.status === 'error')) { + jobUpdates.status = 'error'; + } + else if (tasks.some(t => t.status === 'canceled')) { + jobUpdates.status = 'canceled'; + } + else { + jobUpdates.status = 'queued'; + } + + jobUpdates.progress = tasks.reduce((acc, t) => acc + t.progress, 0) / tasks.length; + + jobUpdates.usage = { + cpu: { + value: tasks.reduce((acc, t) => acc + t.batchEecuUsageSeconds, 0), + unit: 'eecu-seconds' + } + }; + + jobUpdates.googleDriveResults = tasks.reduce((acc, t) => acc.concat(t.results), []); + + await this.db.updateAsync({_id: jobId}, { $set: jobUpdates }); + + if (newLogs[jobId].length > 0) { + const logs = await this.getLogsById(jobId, job.log_level); + for (const log of newLogs[jobId]) { + logs.add(log.message, log.level, log.data, logs.trace, log.code); + } + } + } + } + } + + async removeTasks(taskIds) { + if (!Array.isArray(taskIds)) { + taskIds = [taskIds]; + } + + const query = { task_id: { $in: taskIds } }; + const { numRemoved } = await this.taskDb.removeAsync(query); + return numRemoved; + } + + async removeTasksForUser(userId) { + const { numRemoved } = await this.taskDb.removeAsync({ user_id: userId }); + return numRemoved; + } + async removeResults(jobId, removeLogs = true) { const p = this.makeFolder(this.jobFolder, [jobId]); if (!p) { diff --git a/src/models/userstore.js b/src/models/userstore.js index 5cfc0cd..cd3ef2e 100644 --- a/src/models/userstore.js +++ b/src/models/userstore.js @@ -21,6 +21,8 @@ export default class UserStore { "openid", "email", "https://www.googleapis.com/auth/earthengine", + "https://www.googleapis.com/auth/drive.file", + // "https://www.googleapis.com/auth/drive", // "https://www.googleapis.com/auth/cloud-platform", // "https://www.googleapis.com/auth/devstorage.full_control" ]; @@ -157,6 +159,10 @@ export default class UserStore { const jobCount = await jobDb.removeAsync({user_id}, {multi: true}); console.log(`- Removed ${jobCount} batch jobs from database`); + // note: doesn't cancel tasks, just removes them from the database + const taskCount = jobModel.removeTasksForUser(user_id); + console.log(`- Removed ${taskCount} GEE tasks from database`); + await Promise.all(jobs.map(job => jobModel.removeResults(job._id))); console.log('- Removed batch job results and logs from file system'); } catch (err) { @@ -207,7 +213,7 @@ export default class UserStore { async authenticateGoogle(token) { const userInfo = await this.getOidcUserInfo(token); const userData = this.emptyUser(false); - userData._id = "google_" + userInfo.sub; + userData._id = Utils.GOOGLE_USER_PREFIX + userInfo.sub; userData.name = userInfo.name || userInfo.email || null; userData.email = userInfo.email || null; userData.token = token; diff --git a/src/processes/utils/results.js b/src/processes/utils/results.js index 4f6f44e..23f3c3d 100644 --- a/src/processes/utils/results.js +++ b/src/processes/utils/results.js @@ -1,8 +1,13 @@ +import Utils from '../../utils/utils.js'; +import GeeProcessing from './processing.js'; import GeeTypes from './types.js'; -import HttpUtils from '../../utils/http.js'; const GeeResults = { + BATCH: 1, + SYNC: 2, + SERVICE: 3, + toImageOrCollection(ee, logger, data, allowMultiple = false) { const eeData = GeeTypes.toEE(ee, logger, data); if (eeData instanceof ee.Image) { @@ -37,21 +42,67 @@ const GeeResults = { return ext || ''; }, - // Returns AxiosResponse (object) or URL (string) - async retrieve(context, dc, logger) { - const ee = context.ee; - const config = context.server(); - - const format = config.getOutputFormat(dc.getOutputFormat()); - dc = format.preprocess(context, dc, logger); + async exportToDrive(ee, dc, job, fileFormat, formatOptions) { + const parameters = dc.getOutputFormatParameters(); - let response = await format.retrieve(ee, dc); - if (typeof response === 'string') { - logger.debug("Downloading data from Google: " + response); - response = await HttpUtils.stream(response); + let region = null; + let crs = null; + if (dc.hasXY()) { + region = Utils.bboxToGeoJson(dc.getSpatialExtent()); + crs = Utils.crsToString(dc.getCrs()); } - return response; + const data = ee.ImageCollection(dc.getData()); + const imageList = data.toList(data.size()); + const imgCount = await GeeProcessing.evaluate(imageList.size()); + const tasks = []; + for (let i = 0; i < imgCount; i++) { + let taskId = null; + let error = null; + let imageId; + try { + const image = ee.Image(imageList.get(i)); + imageId = await GeeProcessing.evaluate(image.id()); + + let crsTransform, scale; + if (parameters.scale > 0) { + scale = parameters.scale; + } + else { + const projection = await GeeProcessing.evaluate(image.projection()); + crsTransform = projection.transform; + } + + const task = ee.batch.Export.image.toDrive({ + image, + description: job.description, + folder: job._id, + fileNamePrefix: imageId, + skipEmptyTiles: true, + crs, + crsTransform, + region, + scale, + fileFormat, + formatOptions + }); + taskId = await new Promise((resolve, reject) => { + task.start( + () => resolve(task.id), + (message) => reject(new Error(message)) + ) + }); + } catch (e) { + error = e.message; + } finally { + tasks.push({ + taskId, + imageId, + error + }); + } + } + return tasks; } }; diff --git a/src/processgraph/jsonschema.js b/src/processgraph/jsonschema.js index abff0fe..e19f702 100644 --- a/src/processgraph/jsonschema.js +++ b/src/processgraph/jsonschema.js @@ -18,7 +18,7 @@ export default class GeeJsonSchemaValidator extends JsonSchemaValidator { } async validateJobId(data) { - const job = await this.context.getJob(data); + const job = await this.context.server().jobs().getById(data); if (job === null) { throw new ajv.ValidationError([{ message: "Job doesn't exist." @@ -46,7 +46,7 @@ export default class GeeJsonSchemaValidator extends JsonSchemaValidator { } async validateProcessGraphId(data) { - const pg = await this.context.getStoredProcessGraph(data); + const pg = await this.context.server().storedProcessGraphs().getById(data); if (pg === null) { throw new ajv.ValidationError([{ message: "Stored process graph doesn't exist." diff --git a/src/utils/processingcontext.js b/src/utils/processingcontext.js index c01e4d9..1b6231e 100644 --- a/src/utils/processingcontext.js +++ b/src/utils/processingcontext.js @@ -3,22 +3,31 @@ import fse from 'fs-extra'; export default class ProcessingContext { - constructor(serverContext, user = null) { + constructor(serverContext, user = null, parentResource = null) { this.serverContext = serverContext; this.user = user; this.userId = user ? user._id : null; + this.googleUserId = ''; + this.parentResource = parentResource; this.ee = Utils.require('@google/earthengine'); this.eePrivateKey = null; + this.taskMonitorId = null; + this.connected = false; } async connectGee(forcePrivateKey = false) { + if (this.connected) { + return this.ee; + } + const user = this.getUser(); const ee = this.ee; - if (!forcePrivateKey && typeof this.userId === 'string' && this.userId.startsWith("google-")) { + if (!forcePrivateKey && Utils.isGoogleUser(this.userId)) { console.log("Authenticate via user token"); const expires = 59 * 60; // todo auth: get expiration from token and set more parameters #82 ee.apiclient.setAuthToken(null, 'Bearer', user.token, expires, [], null, false, false); + this.googleUserId = this.userId; } else if (this.serverContext.serviceAccountCredentialsFile) { console.log("Authenticate via private key"); @@ -41,6 +50,7 @@ export default class ProcessingContext { } await ee.initialize(); + this.connected = true; return ee; } @@ -52,12 +62,8 @@ export default class ProcessingContext { return this.serverContext.collections().getData(id); } - getStoredProcessGraph(id) { // returns promise - return this.serverContext.storedProcessGraphs().getById(id); - } - - getJob(jobId) { // returns promise - return this.serverContext.jobs().getById(jobId); + getResource() { + return this.parentResource; } getVariable(id) { @@ -76,4 +82,32 @@ export default class ProcessingContext { return this.user; } + getGoogleUserId() { + return Utils.isGoogleUser(this.userId) ? this.userId : ''; + } + + startTaskMonitor() { + const googleUserId = this.getGoogleUserId(); + this.serverContext.removeTaskMonitor(googleUserId); + this.taskMonitorId = setTimeout(this.monitorTasks.bind(this), 60 * 1000); + this.serverContext.addTaskMonitor(googleUserId, this.taskMonitorId); + this.monitorTasks(); + } + + async monitorTasks() { + const jobModel = this.serverContext.jobs(); + const taskCount = await this.serverContext.jobs().getTaskCount(this.getGoogleUserId()); + if (taskCount === 0) { + // Nothing to monitor + return; + } + try { + await this.connectGee(); + this.ee.data.listOperations(null, ops => jobModel.updateTasks(ops)); + } catch (e) { + this.serverContext.removeTaskMonitor(this.googleUserId); + console.error(e); + } + } + } diff --git a/src/utils/servercontext.js b/src/utils/servercontext.js index 22bbce7..e870fdb 100644 --- a/src/utils/servercontext.js +++ b/src/utils/servercontext.js @@ -21,6 +21,7 @@ export default class ServerContext extends Config { this.jobStore = new JobStore(); this.userStore = new UserStore(this); this.serviceStore = new ServiceStore(); + this.taskMonitors = {}; } jobs() { @@ -74,8 +75,19 @@ export default class ServerContext extends Config { return (typeof service_type === 'string' && Utils.isObject(this.services[service_type.toLowerCase()])); } - processingContext(user) { - return new ProcessingContext(this, user); + processingContext(user, parentResource = null) { + return new ProcessingContext(this, user, parentResource); + } + + addTaskMonitor(userId, monitorId) { + this.taskMonitors[userId] = monitorId; + } + + removeTaskMonitor(userId) { + if (userId in this.taskMonitors) { + clearTimeout(this.taskMonitors[userId]); + delete this.taskMonitors[userId]; + } } } diff --git a/src/utils/utils.js b/src/utils/utils.js index 0550d0f..586ae7e 100644 --- a/src/utils/utils.js +++ b/src/utils/utils.js @@ -9,6 +9,16 @@ const Utils = { crsBboxes: {}, + GOOGLE_USER_PREFIX: "google_", + + isGoogleUser(userId) { + return typeof userId === 'string' && userId.startsWith(Utils.GOOGLE_USER_PREFIX); + }, + + isUrl(url) { + return typeof url === 'string' && url.includes('://'); + }, + require(file) { const require = createRequire(import.meta.url); return require(file);