Skip to content

Commit

Permalink
refactor(fs/watcher): handle 'AbortSignal' and encoding preference
Browse files Browse the repository at this point in the history
  • Loading branch information
jwerle committed Oct 30, 2023
1 parent 3fb31ad commit 9a3d590
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 7 deletions.
6 changes: 4 additions & 2 deletions api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1025,14 +1025,15 @@ Unlinks (removes) file at `path`.
| options.signal | AbortSignal? | | true | |
| callback | function(Error?) | | false | |

## [`watch(, options, callback)`](https://github.com/socketsupply/socket/blob/master/api/fs/index.js#L860)
## [`watch(, options, callback)`](https://github.com/socketsupply/socket/blob/master/api/fs/index.js#L861)

Watch for changes at `path` calling `callback`

| Argument | Type | Default | Optional | Description |
| :--- | :--- | :---: | :---: | :--- |
| (Position 0) | string | | false | |
| options | function \| object | | true | |
| options.encoding | string | utf8 | true | |
| callback | ?function | | true | |

| Return Value | Type | Description |
Expand Down Expand Up @@ -1330,14 +1331,15 @@ External docs: https://nodejs.org/dist/latest-v20.x/docs/api/fs.html#fspromisesw
| :--- | :--- | :--- |
| Not specified | Promise<void> | |

## [`watch(, options)`](https://github.com/socketsupply/socket/blob/master/api/fs/promises.js#L496)
## [`watch(, options)`](https://github.com/socketsupply/socket/blob/master/api/fs/promises.js#L497)

Watch for changes at `path` calling `callback`

| Argument | Type | Default | Optional | Description |
| :--- | :--- | :---: | :---: | :--- |
| (Position 0) | string | | false | |
| options | function \| object | | true | |
| options.encoding | string | utf8 | true | |
| options.signal | AbortSignal | | true | |

| Return Value | Type | Description |
Expand Down
1 change: 1 addition & 0 deletions api/fs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,7 @@ export function writeFile (path, data, options, callback) {
* Watch for changes at `path` calling `callback`
* @param {string}
* @param {function|object=} [options]
* @param {string=} [options.encoding = 'utf8']
* @param {?function} [callback]
* @return {Watcher}
*/
Expand Down
1 change: 1 addition & 0 deletions api/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ export async function writeFile (path, data, options) {
* Watch for changes at `path` calling `callback`
* @param {string}
* @param {function|object=} [options]
* @param {string=} [options.encoding = 'utf8']
* @param {AbortSignal=} [options.signal]
* @return {Watcher}
*/
Expand Down
75 changes: 70 additions & 5 deletions api/fs/watcher.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,30 @@
import { EventEmitter } from '../events.js'
import { AbortError } from '../errors.js'
import { rand64 } from '../crypto.js'
import { Buffer } from '../buffer.js'
import hooks from '../hooks.js'
import ipc from '../ipc.js'
import gc from '../gc.js'

/**
* Encodes filename based on encoding preference.
* @ignore
* @param {Watcher} watcher
* @param {string} filename
* @return {string|Buffer}
*/
function encodeFilename (watcher, filename) {
if (!watcher.encoding || watcher.encoding === 'utf8') {
return filename.toString()
}

if (watcher.encoding === 'buffer') {
return Buffer.from(filename.toString())
}

return filename
}

/**
* Starts the `fs.Watcher`
* @ignore
Expand All @@ -24,7 +45,7 @@ async function start (watcher) {
/**
* Internal watcher data event listeer.
* @ignore
* @param {Watcher}
* @param {Watcher} watcher
* @return {function}
*/
function listen (watcher) {
Expand All @@ -40,7 +61,7 @@ function listen (watcher) {

const { path, events } = data

watcher.emit('change', events[0], path)
watcher.emit('change', events[0], encodeFilename(watcher, path))
})
}

Expand All @@ -67,6 +88,24 @@ export class Watcher extends EventEmitter {
*/
closed = false

/**
* `true` if aborted, otherwise `false`.
* @type {boolean}
*/
aborted = false

/**
* The encoding of the `filename`
* @type {'utf8'|'buffer'}
*/
encoding = 'utf8'

/**
* A `AbortController` `AbortSignal` for async aborts.
* @type {AbortSignal?}
*/
signal = null

/**
* Internal event listener cancellation.
* @ignore
Expand All @@ -79,16 +118,36 @@ export class Watcher extends EventEmitter {
* @ignore
* @param {string} path
* @param {object=} [options]
* @param {AbortSignal=} [options.signal}
* @param {string|number|bigint=} [options.id]
* @param {string=} [options.encoding = 'utf8']
*/
constructor (path, options = {}) {
constructor (path, options = null) {
super()

this.id = options?.id || String(rand64())
this.path = path
this.signal = options?.signal || null
this.aborted = this.signal?.aborted === true
this.encoding = options?.encoding || this.encoding

gc.ref(this)

if (this.signal?.aborted) {
throw new AbortError(this.signal)
}

if (typeof this.signal?.addEventListener === 'function') {
this.signal.addEventListener('abort', async () => {
this.aborted = true
try {
await this.close()
} catch (err) {
console.warn('Failed to close fs.Watcher in AbortSignal:', err.message)
}
})
}

// internal
if (options?.start !== false) {
this.start()
Expand Down Expand Up @@ -155,14 +214,20 @@ export class Watcher extends EventEmitter {
* @return {AsyncIterator<{ eventType: string, filename: string }>}
*/
[Symbol.asyncIterator] () {
let watcher = this
return {
async next () {
if (this.closed) {
if (watcher?.aborted) {
throw new AbortError(watcher.signal)
}

if (watcher.closed) {
watcher = null
return { done: true, value: null }
}

const event = await new Promise((resolve) => {
this.once('change', (eventType, filename) => {
watcher.once('change', (eventType, filename) => {
resolve({ eventType, filename })
})
})
Expand Down
19 changes: 19 additions & 0 deletions api/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2628,7 +2628,9 @@ declare module "socket:fs/watcher" {
* @ignore
* @param {string} path
* @param {object=} [options]
* @param {AbortSignal=} [options.signal}
* @param {string|number|bigint=} [options.id]
* @param {string=} [options.encoding = 'utf8']
*/
constructor(path: string, options?: object | undefined);
/**
Expand All @@ -2647,6 +2649,21 @@ declare module "socket:fs/watcher" {
* @type {boolean}
*/
closed: boolean;
/**
* `true` if aborted, otherwise `false`.
* @type {boolean}
*/
aborted: boolean;
/**
* The encoding of the `filename`
* @type {'utf8'|'buffer'}
*/
encoding: 'utf8' | 'buffer';
/**
* A `AbortController` `AbortSignal` for async aborts.
* @type {AbortSignal?}
*/
signal: AbortSignal | null;
/**
* Internal event listener cancellation.
* @ignore
Expand Down Expand Up @@ -2831,6 +2848,7 @@ declare module "socket:fs/promises" {
* Watch for changes at `path` calling `callback`
* @param {string}
* @param {function|object=} [options]
* @param {string=} [options.encoding = 'utf8']
* @param {AbortSignal=} [options.signal]
* @return {Watcher}
*/
Expand Down Expand Up @@ -3060,6 +3078,7 @@ declare module "socket:fs/index" {
* Watch for changes at `path` calling `callback`
* @param {string}
* @param {function|object=} [options]
* @param {string=} [options.encoding = 'utf8']
* @param {?function} [callback]
* @return {Watcher}
*/
Expand Down

0 comments on commit 9a3d590

Please sign in to comment.