Skip to content

Commit

Permalink
Merge branch 'master' into patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
joecks authored Feb 6, 2025
2 parents a6b4e5e + d72b40c commit 80e07e2
Show file tree
Hide file tree
Showing 148 changed files with 3,257 additions and 2,568 deletions.
2 changes: 1 addition & 1 deletion lerna.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "node_modules/lerna/schemas/lerna-schema.json",
"version": "3.3.6",
"version": "3.4.3",
"npmClient": "yarn",
"concurrency": 20,
"command": {
Expand Down
126 changes: 76 additions & 50 deletions packages/backend-core/src/queue/inMemoryQueue.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,58 @@
import events from "events"
import { newid } from "../utils"
import { Queue, QueueOptions, JobOptions } from "./queue"
import { helpers } from "@budibase/shared-core"
import { Job, JobId, JobInformation } from "bull"

function jobToJobInformation(job: Job): JobInformation {
let cron = ""
let every = -1
let tz: string | undefined = undefined
let endDate: number | undefined = undefined

const repeat = job.opts?.repeat
if (repeat) {
endDate = repeat.endDate ? new Date(repeat.endDate).getTime() : Date.now()
tz = repeat.tz
if ("cron" in repeat) {
cron = repeat.cron
} else {
every = repeat.every
}
}

interface JobMessage {
return {
id: job.id.toString(),
name: "",
key: job.id.toString(),
tz,
endDate,
cron,
every,
next: 0,
}
}

interface JobMessage<T = any> extends Partial<Job<T>> {
id: string
timestamp: number
queue: string
queue: Queue<T>
data: any
opts?: JobOptions
}

/**
* Bull works with a Job wrapper around all messages that contains a lot more information about
* the state of the message, this object constructor implements the same schema of Bull jobs
* for the sake of maintaining API consistency.
* @param queue The name of the queue which the message will be carried on.
* @param message The JSON message which will be passed back to the consumer.
* @returns A new job which can now be put onto the queue, this is mostly an
* internal structure so that an in memory queue can be easily swapped for a Bull queue.
*/
function newJob(queue: string, message: any, opts?: JobOptions): JobMessage {
return {
id: newid(),
timestamp: Date.now(),
queue: queue,
data: message,
opts,
}
}

/**
* This is designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock.
* It is relatively simple, using an event emitter internally to register when messages are available
* to the consumers - in can support many inputs and many consumers.
* This is designed to replicate Bull (https://github.com/OptimalBits/bull) in
* memory as a sort of mock. It is relatively simple, using an event emitter
* internally to register when messages are available to the consumers - in can
* support many inputs and many consumers.
*/
class InMemoryQueue implements Partial<Queue> {
_name: string
_opts?: QueueOptions
_messages: JobMessage[]
_queuedJobIds: Set<string>
_emitter: NodeJS.EventEmitter
_emitter: NodeJS.EventEmitter<{ message: [JobMessage]; completed: [Job] }>
_runCount: number
_addCount: number

Expand Down Expand Up @@ -69,34 +82,29 @@ class InMemoryQueue implements Partial<Queue> {
*/
async process(concurrencyOrFunc: number | any, func?: any) {
func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc
this._emitter.on("message", async () => {
if (this._messages.length <= 0) {
return
}
let msg = this._messages.shift()

let resp = func(msg)
this._emitter.on("message", async message => {
let resp = func(message)

async function retryFunc(fnc: any) {
try {
await fnc
} catch (e: any) {
await new Promise<void>(r => setTimeout(() => r(), 50))

await retryFunc(func(msg))
await helpers.wait(50)
await retryFunc(func(message))
}
}

if (resp.then != null) {
try {
await retryFunc(resp)
this._emitter.emit("completed", message as Job)
} catch (e: any) {
console.error(e)
}
}
this._runCount++
const jobId = msg?.opts?.jobId?.toString()
if (jobId && msg?.opts?.removeOnComplete) {
const jobId = message.opts?.jobId?.toString()
if (jobId && message.opts?.removeOnComplete) {
this._queuedJobIds.delete(jobId)
}
})
Expand Down Expand Up @@ -130,9 +138,16 @@ class InMemoryQueue implements Partial<Queue> {
}

const pushMessage = () => {
this._messages.push(newJob(this._name, data, opts))
const message: JobMessage = {
id: newid(),
timestamp: Date.now(),
queue: this as unknown as Queue,
data,
opts,
}
this._messages.push(message)
this._addCount++
this._emitter.emit("message")
this._emitter.emit("message", message)
}

const delay = opts?.delay
Expand All @@ -158,13 +173,6 @@ class InMemoryQueue implements Partial<Queue> {
console.log(cronJobId)
}

/**
* Implemented for tests
*/
async getRepeatableJobs() {
return []
}

async removeJobs(_pattern: string) {
// no-op
}
Expand All @@ -176,13 +184,31 @@ class InMemoryQueue implements Partial<Queue> {
return []
}

async getJob() {
async getJob(id: JobId) {
for (const message of this._messages) {
if (message.id === id) {
return message as Job
}
}
return null
}

on() {
// do nothing
return this as any
on(event: string, callback: (...args: any[]) => void): Queue {
// @ts-expect-error - this callback can be one of many types
this._emitter.on(event, callback)
return this as unknown as Queue
}

async count() {
return this._messages.length
}

async getCompletedCount() {
return this._runCount
}

async getRepeatableJobs() {
return this._messages.map(job => jobToJobInformation(job as Job))
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/backend-core/src/sql/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ class InternalBuilder {
}
}

if (typeof input === "string") {
if (typeof input === "string" && schema.type === FieldType.DATETIME) {
if (isInvalidISODateString(input)) {
return null
}
Expand Down
39 changes: 24 additions & 15 deletions packages/backend-core/tests/core/utilities/mocks/licenses.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import { Feature, License, Quotas } from "@budibase/types"
import {
Feature,
License,
MonthlyQuotaName,
QuotaType,
QuotaUsageType,
} from "@budibase/types"
import cloneDeep from "lodash/cloneDeep"
import merge from "lodash/merge"

let CLOUD_FREE_LICENSE: License
let UNLIMITED_LICENSE: License
Expand Down Expand Up @@ -27,18 +34,19 @@ export function initInternal(opts: {

export interface UseLicenseOpts {
features?: Feature[]
quotas?: Quotas
monthlyQuotas?: [MonthlyQuotaName, number][]
}

// LICENSES

export const useLicense = (license: License, opts?: UseLicenseOpts) => {
if (opts) {
if (opts.features) {
license.features.push(...opts.features)
}
if (opts.quotas) {
license.quotas = opts.quotas
if (opts?.features) {
license.features.push(...opts.features)
}
if (opts?.monthlyQuotas) {
for (const [name, value] of opts.monthlyQuotas) {
license.quotas[QuotaType.USAGE][QuotaUsageType.MONTHLY][name].value =
value
}
}

Expand All @@ -57,12 +65,9 @@ export const useCloudFree = () => {

// FEATURES

const useFeature = (feature: Feature) => {
const useFeature = (feature: Feature, extra?: Partial<UseLicenseOpts>) => {
const license = cloneDeep(getCachedLicense() || UNLIMITED_LICENSE)
const opts: UseLicenseOpts = {
features: [feature],
}

const opts: UseLicenseOpts = merge({ features: [feature] }, extra)
return useLicense(license, opts)
}

Expand Down Expand Up @@ -102,8 +107,12 @@ export const useAppBuilders = () => {
return useFeature(Feature.APP_BUILDERS)
}

export const useBudibaseAI = () => {
return useFeature(Feature.BUDIBASE_AI)
export const useBudibaseAI = (opts?: { monthlyQuota?: number }) => {
return useFeature(Feature.BUDIBASE_AI, {
monthlyQuotas: [
[MonthlyQuotaName.BUDIBASE_AI_CREDITS, opts?.monthlyQuota || 1000],
],
})
}

export const useAICustomConfigs = () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/bbui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
"dayjs": "^1.10.8",
"easymde": "^2.16.1",
"svelte-dnd-action": "^0.9.8",
"svelte-portal": "^1.0.0"
"svelte-portal": "^2.2.1"
},
"resolutions": {
"loader-utils": "1.4.1"
Expand Down
Loading

0 comments on commit 80e07e2

Please sign in to comment.