Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: async flush and close #182

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions deps/mpg123/src/output/win32.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,6 @@ static void flush_win32(struct audio_output_struct *ao)
/* If WHDR_PREPARED is not set, this is (potentially) a partial buffer */
if (!(hdr->dwFlags & WHDR_PREPARED))
hdr->dwBufferLength = 0;

/* Finish processing the buffers */
drain_win32(ao);
}

/* output final buffer (if any) */
Expand Down
2 changes: 1 addition & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ declare class Speaker extends Writable {
*
* @param flush Defaults to `true`.
*/
public close(flush: boolean): string;
public close(flush: boolean): void;

/**
* Returns the `MPG123_ENC_*` constant that corresponds to the given "format"
Expand Down
56 changes: 39 additions & 17 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class Speaker extends Writable {
if (!opts) opts = {}
if (opts.lowWaterMark == null) opts.lowWaterMark = 0
if (opts.highWaterMark == null) opts.highWaterMark = 0
// emit close manually
opts.emitClose = false

super(opts)

Expand All @@ -41,12 +43,14 @@ class Speaker extends Writable {
// flipped after close() is called, no write() calls allowed after
this._closed = false

// whether native binding is closing
this._closing = false

// set PCM format
this._format(opts)

// bind event listeners
this._format = this._format.bind(this)
this.on('finish', this._flush)
this.on('pipe', this._pipe)
this.on('unpipe', this._unpipe)
}
Expand Down Expand Up @@ -253,10 +257,13 @@ class Speaker extends Writable {
* @api private
*/

_flush () {
debug('_flush()')
_final (callback) {
debug('_final()')
this.emit('flush')
this.close(false)
this._close(false)
.then(() => callback())
.catch(error => callback(error))
.finally(() => debug('_final() end'))
}

/**
Expand All @@ -270,25 +277,40 @@ class Speaker extends Writable {

close (flush) {
debug('close(%o)', flush)
this._close(flush)
.catch(console.error)
.finally(() => debug('close() end'))
}

/**
* Flush audio backend and close backend
*
* @param {Boolean} flush - if `false`, then don't call the `flush()` native binding call.`.
*/
async _close (flush) {
if (this._closed) return debug('already closed...')

if (this.audio_handle) {
if (flush !== false) {
// TODO: async most likely…
debug('invoking flush() native binding')
binding.flush(this.audio_handle)
}
if (!this.audio_handle) {
debug('not invoking flush() or close() bindings since no `audio_handle`')
this._closed = true
this.emit('close')
return
}

if (flush !== false) {
debug('invoking flush() native binding')
await binding.flush(this.audio_handle)
}

// TODO: async maybe?
if (!this._closing) {
this._closing = true
debug('invoking close() native binding')
binding.close(this.audio_handle)
await binding.close(this.audio_handle)
this.audio_handle = null
} else {
debug('not invoking flush() or close() bindings since no `audio_handle`')
this._closed = true
this.emit('close')
debug('closed')
}

this._closed = true
this.emit('close')
}
}

Expand Down
98 changes: 78 additions & 20 deletions src/binding.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ typedef struct {
napi_deferred deferred;
} WriteData;

typedef struct {
audio_output_t *ao;
napi_deferred deferred;
napi_async_work work;
} FlushData;

typedef struct {
audio_output_t *ao;
int error;
napi_deferred deferred;
napi_async_work work;
} CloseData;

bool is_string(napi_env env, napi_value value) {
napi_valuetype valuetype;
assert(napi_typeof(env, value, &valuetype) == napi_ok);
Expand Down Expand Up @@ -126,6 +139,20 @@ napi_value speaker_write(napi_env env, napi_callback_info info) {
return promise;
}

void flush_execute(napi_env env, void* _data) {
FlushData* data = _data;
data->ao->flush(data->ao);
}

void flush_complete(napi_env env, napi_status status, void* _data) {
FlushData* data = _data;
napi_value undefined;
assert(napi_get_undefined(env, &undefined) == napi_ok);
assert(napi_resolve_deferred(env, data->deferred, undefined) == napi_ok);
assert(napi_delete_async_work(env, data->work) == napi_ok);
free(_data);
}

napi_value speaker_flush(napi_env env, napi_callback_info info) {
size_t argc = 1;
napi_value args[1];
Expand All @@ -135,9 +162,47 @@ napi_value speaker_flush(napi_env env, napi_callback_info info) {
assert(napi_unwrap(env, args[0], (void**) &speaker) == napi_ok);
audio_output_t *ao = &speaker->ao;

/* TODO: async */
ao->flush(ao);
return NULL;
napi_value promise;
FlushData* data = malloc(sizeof(FlushData));
data->ao = ao;
assert(napi_create_promise(env, &data->deferred, &promise) == napi_ok);

napi_value work_name;
assert(napi_create_string_utf8(env, "speaker:flush", NAPI_AUTO_LENGTH, &work_name) == napi_ok);
assert(napi_create_async_work(env, NULL, work_name, flush_execute, flush_complete, (void*) data, &data->work) == napi_ok);
assert(napi_queue_async_work(env, data->work) == napi_ok);

return promise;
}

void close_execute(napi_env env, void* _data) {
CloseData* data = _data;
int r = data->ao->close(data->ao);
if (r != 0) {
data->error = 1;
return;
}
if (data->ao->deinit) {
int r = data->ao->deinit(data->ao);
if (r != 0) {
data->error = 1;
}
}
}

void close_complete(napi_env env, napi_status status, void* _data) {
CloseData* data = _data;
if (data->error == 0) {
napi_value undefined;
assert(napi_get_undefined(env, &undefined) == napi_ok);
assert(napi_resolve_deferred(env, data->deferred, undefined) == napi_ok);
} else {
napi_value error;
assert(napi_create_string_utf8(env, "Close failed", NAPI_AUTO_LENGTH, &error) == napi_ok);
assert(napi_reject_deferred(env, data->deferred, error) == napi_ok);
}
assert(napi_delete_async_work(env, data->work) == napi_ok);
free(_data);
}

napi_value speaker_close(napi_env env, napi_callback_info info) {
Expand All @@ -149,25 +214,18 @@ napi_value speaker_close(napi_env env, napi_callback_info info) {
assert(napi_unwrap(env, args[0], (void**) &speaker) == napi_ok);
audio_output_t *ao = &speaker->ao;

int r = ao->close(ao);

if (r != 0) {
napi_throw_error(env, "ERR_CLOSE", "Failed to initialize output device");
goto cleanup;
}

if (ao->deinit) {
int r = ao->deinit(ao);
napi_value promise;
CloseData* data = malloc(sizeof(CloseData));
data->ao = ao;
data->error = 0;
assert(napi_create_promise(env, &data->deferred, &promise) == napi_ok);

if (r != 0) {
napi_throw_error(env, "ERR_CLOSE", "Failed to initialize output device");
goto cleanup;
}
}
napi_value work_name;
assert(napi_create_string_utf8(env, "speaker:close", NAPI_AUTO_LENGTH, &work_name) == napi_ok);
assert(napi_create_async_work(env, NULL, work_name, close_execute, close_complete, (void*) data, &data->work) == napi_ok);
assert(napi_queue_async_work(env, data->work) == napi_ok);

cleanup:
free(speaker->device);
return NULL;
return promise;
}

int get_formats() {
Expand Down