Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@tus/server: add GCS locker #616

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4,136 changes: 2,300 additions & 1,836 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions packages/gcs-store/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"debug": "^4.3.4"
},
"devDependencies": {
"@google-cloud/storage": "^6.12.0",
"@google-cloud/storage": "^7.15.0",
"@tus/server": "^1.10.0",
"@types/debug": "^4.1.12",
"@types/mocha": "^10.0.6",
Expand All @@ -27,7 +27,7 @@
"should": "^13.2.3"
},
"peerDependencies": {
"@google-cloud/storage": "*"
"@google-cloud/storage": "^7.12.0"
},
"engines": {
"node": ">=16"
Expand Down
3 changes: 3 additions & 0 deletions packages/gcs-store/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const log = debug('tus-node-server:stores:gcsstore')

type Options = {bucket: Bucket}

export {GCSLocker} from './locker/GCSLocker'

export class GCSStore extends DataStore {
bucket: Bucket

Expand Down Expand Up @@ -166,6 +168,7 @@ export class GCSStore extends DataStore {

await this.bucket.file(id).setMetadata({metadata: this.#stringifyUploadKeys(upload)})
}

/**
* Convert the Upload object to a format that can be stored in GCS metadata.
*/
Expand Down
126 changes: 126 additions & 0 deletions packages/gcs-store/src/locker/GCSLock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import type {RequestRelease} from '@tus/utils'
import type {Bucket} from '@google-cloud/storage'
import GCSLockFile, {type FileMetadata} from './GCSLockFile'
import debug from 'debug'

const log = debug('tus-node-server:lockers:gcs')

/**
* Handles interaction with a lock.
*/
export default class GCSLock {
protected resourceId: string
protected file: GCSLockFile
protected ttl: number
protected watchInterval: number
protected watcher: NodeJS.Timeout | undefined

constructor(
resourceId: string,
lockBucket: Bucket,
ttl: number,
watchInterval: number
) {
this.resourceId = resourceId
this.file = new GCSLockFile(lockBucket, `${resourceId}.lock`)
this.ttl = ttl
this.watchInterval = watchInterval
}

/**
* Try to create the lockfile and start the watcher. If lock is already taken, requests for release and returns FALSE.
*/
public async take(cancelHandler: RequestRelease): Promise<boolean> {
try {
//Try to create lock file
const exp = Date.now() + this.ttl
await this.file.create(exp)

//Lock acquired, start watcher
this.startWatcher(cancelHandler)

log('lock acquired and started watcher')

return true
} catch (err) {
log('failed creating lock file', err.code, err.message)
//Probably lock is already taken
const isHealthy = await this.insureHealth()

if (!isHealthy) {
log('lock not healthy. calling GCSLock.take() again')
return await this.take(cancelHandler)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when the lock is unhealthy and cannot be taken again? Is access to the upload resources on GCP then taken away? Since the locker cannot ensure exclusive access, saving uploaded data to GCS should be stopped.

}
//Lock is still healthy, request release
await this.file.requestRelease()

return false
}
}

/**
* Release the lock - clear watcher and delete the file.
*/
public async release() {
//Clear watcher
clearInterval(this.watcher)

//Delete the lock file
await this.file.deleteOwn()
}

/**
* Check if the lock is healthy, delete if not.
* Returns TRUE if the lock is healthy.
*/
protected async insureHealth() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
protected async insureHealth() {
protected async ensureHealth() {

Could this be a naming mistake? "ensure" seems more appropriate than "insure".

try {
const meta = await this.file.getMeta()

if (this.hasExpired(meta)) {
//TTL expired, delete unhealthy lock
await this.file.deleteUnhealthy(meta.metageneration as number)
log('insureHealth deleted unhealthy')

return false
}
} catch (err) {
//Probably lock does not exist (anymore)
return false
}

return true
}

/**
* Start watching the lock file - keep it healthy and handle release requests.
*/
protected startWatcher(cancelHandler: RequestRelease) {
this.watcher = setInterval(() => {
log('watcher interval')
const handleError = () => {
//Probably the watched lock is freed, terminate watcher
clearInterval(this.watcher)
}

this.file.checkOwnReleaseRequest().then((shouldRelease) => {
log('watcher shouldRelease', shouldRelease)
if (shouldRelease) {
cancelHandler()
}

//Update TTL to keep the lock healthy
const exp = Date.now() + this.ttl
this.file.refreshOwn(exp).catch(handleError)
}, handleError)
}, this.watchInterval)
}

/**
* Compare lock expiration timestamp with the current time.
*/
protected hasExpired(meta: FileMetadata) {
const date = Number.parseInt(meta.metadata.exp, 10)
return !date || date < Date.now()
}
}
136 changes: 136 additions & 0 deletions packages/gcs-store/src/locker/GCSLockFile.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import type {Bucket, File, SaveOptions} from '@google-cloud/storage'
import debug from 'debug'

type MetaGeneration = string | number | undefined
export type FileMetadata = NonNullable<SaveOptions['metadata']>

const log = debug('tus-node-server:lockers:gcs')

/**
* Handles communication with GCS.
*/
export default class GCSLockFile {
/**
* Name of the file in the bucket.
*/
protected name: string
/**
* GCS File instance for the lock.
*/
protected lockFile: File
/**
* GCS File instance for release request.
*/
protected releaseFile: File
/**
* The last known metageneration of the file. If it does not match the GCS metageneration, this lockfile has been deleted and another instance has already created a new one.
*/
protected currentMetaGeneration: MetaGeneration

constructor(bucket: Bucket, name: string) {
this.name = name
this.lockFile = bucket.file(name)
this.releaseFile = bucket.file(`${name}.release`)
}
/**
* Create the lockfile with the specified exp time. Throws if the file already exists
*/
public async create(exp: number) {
const metadata = {
metadata: {exp},
// TODO: this does nothing?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about this TODO comment?

cacheControl: 'no-store',
}

await this.lockFile.save('', {
preconditionOpts: {ifGenerationMatch: 0},
metadata,
})

this.currentMetaGeneration = (await this.getMeta()).metageneration
}

/**
* Fetch metadata of the lock file.
*/
public async getMeta() {
return (await this.lockFile.getMetadata())[0]
}

/**
* Refresh our own lockfile. Throws if it does not exist or the file is modified by another instance.
*/
public async refreshOwn(exp: number) {
const metadata: FileMetadata = {
exp,
}
const res = await this.lockFile.setMetadata(metadata, {
ifMetaGenerationMatch: this.currentMetaGeneration,
})
this.currentMetaGeneration = res[0].metageneration
log('updated currentMetaGeneration', this.currentMetaGeneration)
}
/**
* Check if a release request has been submitted to our own lockfile. Throws if it does not exist or the file is modified by another instance.
*/
public async checkOwnReleaseRequest() {
const meta = await this.getMeta()
if (meta.metageneration !== this.currentMetaGeneration) {
throw new Error('This lockfile has been already taken by another instance.')
}

const releaseRequestExists = (await this.releaseFile.exists())[0]
return releaseRequestExists
}

/**
* Delete our own lockfile if it still exists.
*/
public async deleteOwn() {
try {
await this.deleteReleaseRequest()
await this.lockFile.delete({ifGenerationMatch: this.currentMetaGeneration})
} catch (err) {
//Probably already deleted, no need to report
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only errors about the object not existing should be ignored. All other errors should be thrown.

}
}

/**
* Request releasing the lock from another instance.
* As metadata edits are only prohibited for the owner
* (so it can keep track of metageneration),
* we write to a separate file.
*/
public async requestRelease() {
try {
await this.releaseFile.save('', {
preconditionOpts: {ifGenerationMatch: 0},
})
log('requestRelease success')
} catch (err) {
if (err.code === 412) {
//Release file already created, no need to report
return
}
log('requestRelease error', err)
throw err
}
}

/**
* Delete the unhealthy file of a previous lock.
*/
public async deleteUnhealthy(metaGeneration: number) {
await this.deleteReleaseRequest()
await this.lockFile.delete({ifMetagenerationMatch: metaGeneration})
}

/**
* Delete release request file of the lock if exists.
*/
protected async deleteReleaseRequest() {
try {
await this.releaseFile.delete()
} catch (err) {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only errors about the object not existing should be ignored. All other errors should be thrown.

}
}
Loading
Loading