-
-
Notifications
You must be signed in to change notification settings - Fork 8
/
t6jobs.js
99 lines (90 loc) · 2.9 KB
/
t6jobs.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"use strict";
// Job helpers: every function below is attached to this object, which is also
// the module's exports.
// NOTE(review): `jobs`, `db_jobs`, `t6console`, `t6events`, `t6preprocessor`,
// `uuid`, `moment`, `logDateFormat`, `queue` and `task` are used below but not
// declared in this file — presumably globals set up elsewhere; confirm.
var t6jobs = module.exports = {};
/**
 * Dump the documents of the jobs collection to the console as a JSON string.
 *
 * The previous body called `JSON.stringify()` without any argument, which
 * always evaluates to `undefined`, so nothing useful was ever logged.
 *
 * @returns {void}
 */
t6jobs.export = function() {
  // `jobs` is not declared in this file — presumably the global collection
  // handle that the other helpers here query; verify against the caller setup.
  t6console.log(JSON.stringify(jobs.chain().data()));
};
/**
 * Count the jobs returned for a query.
 *
 * @param {?Object} query - Filter handed to `find`; an explicit `null` is
 *   replaced by `{}`.
 * @param {?number} limit - Value handed to `limit`; an explicit `null` is
 *   replaced by `1`.
 * @returns {number} Length of the resulting data array.
 */
t6jobs.getLength = function(query, limit) {
  // Only a strict `null` triggers the fallbacks; `undefined` is forwarded as-is.
  const criteria = query === null ? {} : query;
  const maxResults = limit === null ? 1 : limit;
  const matches = jobs.chain().find(criteria).limit(maxResults).data();
  return matches.length;
};
/**
 * Remove the jobs matching a query from the "jobs" collection and persist the
 * database.
 *
 * Fixes over the previous implementation:
 *  - a missing `query` no longer throws when reading `query.job_id`;
 *  - the promise rejects when nothing matches. The old code tested the return
 *    value of the chained `remove()`, which is the (always truthy) result set,
 *    so the rejection branch could never run;
 *  - when no limit is given, no `limit()` call is made. The old code passed
 *    `null`, which in LokiJS truncates the match set to zero rows, so nothing
 *    was removed while success was still reported (see the LokiJS Resultset
 *    documentation).
 *
 * @param {Object} [query] - Filter handed to `find`; defaults to `{}` (all jobs).
 * @param {number} [limit] - Maximum number of matching jobs to remove; all
 *   matches are removed when omitted.
 * @returns {Promise<*>} Resolves with `query.job_id` (possibly `undefined`)
 *   after removal; rejects with the string "Can't find job to be removed."
 *   when no job matches.
 */
t6jobs.remove = function(query, limit) {
  // Refresh the shared collection handle, as the original did.
  jobs = db_jobs.getCollection("jobs");
  return new Promise((resolve, reject) => {
    const criteria = (typeof query !== "undefined" && query !== null) ? query : {};
    let matches = jobs.chain().find(criteria);
    if (typeof limit !== "undefined" && limit !== null) {
      matches = matches.limit(limit);
    }
    if (matches.data().length === 0) {
      // Rejection value kept as a plain string so existing callers see the same payload.
      return reject("Can't find job to be removed.");
    }
    matches.remove();
    db_jobs.saveDatabase();
    t6events.addAudit("t6App", "t6jobs.remove", criteria.job_id, "", {"status": 200});
    return resolve(criteria.job_id);
  });
};
/**
 * Fetch the jobs matching a query from the "jobs" collection.
 *
 * @param {Object} query - Filter handed to `find` unchanged.
 * @param {?number} limit - Value handed to `limit`; an explicit `null` is
 *   replaced by `1`, any other value (including `undefined`) is forwarded.
 * @returns {Array} Whatever `data()` yields for the limited result set.
 */
t6jobs.get = function(query, limit) {
  // Refresh the shared collection handle before querying.
  jobs = db_jobs.getCollection("jobs");
  const maxResults = limit === null ? 1 : limit;
  const matches = jobs.chain().find(query).limit(maxResults);
  return matches.data();
};
/**
 * List summaries of the jobs matching the given optional filters.
 *
 * Only truthy arguments become part of the query, so calling with no
 * arguments queries with `{}`.
 *
 * @param {?string} [job_id=null] - Restrict to this job id.
 * @param {?string} [user_id=null] - Restrict to this user id.
 * @param {?string} [taskType=null] - Restrict to this task type.
 * @returns {Array<Object>} One `{job_id, user_id, metadata, taskType, queue_id}`
 *   entry per matching job; `queue_id` is the job's `$loki` value, or `null`
 *   when the job has none.
 */
t6jobs.getJobs = function(job_id=null, user_id=null, taskType=null) {
  const filters = {};
  if (taskType) {
    filters.taskType = taskType;
  }
  if (user_id) {
    filters.user_id = user_id;
  }
  if (job_id) {
    filters.job_id = job_id;
  }
  t6console.debug("query on get Jobs Ids", filters);
  const matches = jobs.chain().find(filters).data();
  return matches.map((job) => ({
    "job_id": job.job_id,
    "user_id": job.user_id,
    "metadata": job.metadata,
    "taskType": job.taskType,
    "queue_id": typeof job.$loki === "undefined" ? null : job.$loki,
  }));
};
/**
 * Insert a new job into the jobs collection under a freshly generated id.
 *
 * `job.ttl` falls back to 3600 when undefined. It is added to `job.time / 1000`
 * and the sum is multiplied by 1000 to form `execTime`; the stored `ttl` is the
 * parsed value multiplied by 1000.
 * NOTE(review): this suggests `time` is in milliseconds and `ttl` in seconds —
 * confirm against callers.
 *
 * @param {Object} job - Source fields: `taskType`, `flow_id`, `time`, `ttl`,
 *   `track_id`, `user_id`, `metadata`.
 * @returns {string} The id produced by `uuid.v4()` for the inserted job.
 */
t6jobs.add = function(job) {
  const job_id = uuid.v4();
  const ttlValue = parseInt(typeof job.ttl === "undefined" ? 3600 : job.ttl, 10);
  const startValue = parseInt(job.time, 10) / 1000;
  const hasMetadata = typeof job.metadata !== "undefined";
  jobs.insert({
    "job_id": job_id,
    "taskType": job.taskType,
    "flow_id": job.flow_id,
    // Same operation order as before so floating-point results are unchanged.
    "execTime": (startValue + ttlValue) * 1000,
    "ttl": ttlValue * 1000,
    "track_id": job.track_id,
    "user_id": job.user_id,
    "metadata": hasMetadata ? job.metadata : null,
  });
  return job_id;
};
/**
 * Run the "fuse" jobs whose `execTime` is less than or equal to `moment()`.
 *
 * For each selected job: log it, call `t6preprocessor.fuse()`, remove the job
 * from the collection and save the database.
 *
 * @param {number} limit - Value handed to `limit` on the query.
 * @returns {void}
 */
t6jobs.start = function(limit) {
  const dueFuseJobs = {
    "$and": [
      { "taskType": "fuse" },
      { "execTime": { "$lte": moment() } },
    ],
  };
  const pending = jobs.chain().find(dueFuseJobs).limit(limit).data();
  if (!pending) {
    return;
  }
  pending.forEach((job) => {
    t6console.log(`Executing Job ${job.job_id}`, moment(job.execTime).format(logDateFormat));
    t6console.log(JSON.stringify(job));
    t6preprocessor.fuse();
    jobs.remove(job);
    // Saved after every removal, exactly as before.
    db_jobs.saveDatabase();
  });
};
/**
 * Log the queue length, then either "process" the current task or report when
 * it will run.
 *
 * A task is processed when its job type is "fuse" and `time + ttl` is greater
 * than `Date.now()`; processing only logs, with a final message emitted from a
 * 10000 ms timer.
 * NOTE(review): `queue` and `task` are not declared in this file — presumably
 * globals; confirm. The "will be run later" branch is taken when `time + ttl`
 * is NOT in the future, which looks inverted — confirm intent.
 *
 * @returns {void}
 */
t6jobs.next = function() {
  const queued = queue.getLength();
  t6console.log(`t6queue contains ${queued} job/s`);
  const job = task.job;
  const deadline = job.time + job.ttl;
  const shouldProcess = job.taskType === "fuse" && deadline > Date.now();
  if (!shouldProcess) {
    t6console.log(`next task ${task.id} will be run later at ${moment(deadline).format(logDateFormat)}`);
    return;
  }
  t6console.log("next task", job.time, job.ttl, ">", Date.now());
  t6console.log("Processing task", JSON.stringify(task));
  setTimeout(() => {
    // `task.id` is read when the timer fires, as in the original.
    t6console.log("Processed fusion task", task.id);
  }, 10000);
};
module.exports = t6jobs;