diff --git a/src/callback_streambuf.hpp b/src/callback_streambuf.hpp new file mode 100644 index 0000000000..2be8e48e21 --- /dev/null +++ b/src/callback_streambuf.hpp @@ -0,0 +1,47 @@ +#ifndef CALLBACK_STREAMBUF_H +#define CALLBACK_STREAMBUF_H + +#include +#include + +template +class callback_streambuf : public std::basic_streambuf +{ +public: + using base_type = std::streambuf; + using char_type = typename base_type::char_type; + using int_type = typename base_type::int_type; + + callback_streambuf(Callback callback) + : buffer_{}, + callback_(callback) + { + base_type::setp(buffer_.begin(), buffer_.end()); + } + +protected: + int sync() + { + bool ok = callback_(base_type::pbase(), + base_type::pptr() - base_type::pbase()); + base_type::setp(buffer_.begin(), buffer_.end()); + return ok ? 0 : -1; + } + + int_type overflow(int_type ch) + { + int ret = sync(); + if (ch == base_type::traits_type::eof()) + { + return ch; + } + base_type::sputc(ch); + return ret == 0 ? 0 : base_type::traits_type::eof(); + } + +private: + std::array buffer_; + Callback callback_; +}; + +#endif diff --git a/src/mapnik_image.cpp b/src/mapnik_image.cpp index 89589d7ecb..34579e0b91 100644 --- a/src/mapnik_image.cpp +++ b/src/mapnik_image.cpp @@ -28,6 +28,7 @@ #include "mapnik_color.hpp" #include "utils.hpp" +#include "callback_streambuf.hpp" #include "agg_rasterizer_scanline_aa.h" #include "agg_basics.h" @@ -82,6 +83,7 @@ void Image::Initialize(v8::Local target) { Nan::SetPrototypeMethod(lcons, "setPixel", setPixel); Nan::SetPrototypeMethod(lcons, "encodeSync", encodeSync); Nan::SetPrototypeMethod(lcons, "encode", encode); + Nan::SetPrototypeMethod(lcons, "encodeChunked", encodeChunked); Nan::SetPrototypeMethod(lcons, "view", view); Nan::SetPrototypeMethod(lcons, "saveSync", saveSync); Nan::SetPrototypeMethod(lcons, "save", save); @@ -3736,6 +3738,188 @@ void Image::EIO_AfterEncode(uv_work_t* req) delete closure; } +struct chunked_encode_image_baton_t +{ + encode_image_baton_t image_baton; + + uv_async_t async; + uv_mutex_t mutex; + + using char_type = char; + using buffer_type = std::vector; + using buffer_list_type = std::vector; + buffer_list_type buffers; + + chunked_encode_image_baton_t() + { + if (int e = uv_async_init(uv_default_loop(), &async, yield_chunk)) + { + throw std::runtime_error("Cannot create async handler"); + } + + if (int e = uv_mutex_init(&mutex)) + { + uv_close(reinterpret_cast(&async), NULL); + throw std::runtime_error("Cannot create mutex"); + } + + async.data = this; + } + + ~chunked_encode_image_baton_t() + { + uv_close(reinterpret_cast(&async), NULL); + uv_mutex_destroy(&mutex); + } + + template + bool operator()(const Char* buffer, Size size) + { + uv_mutex_lock(&mutex); + buffers.emplace_back(buffer, buffer + size); + uv_mutex_unlock(&mutex); + + if (int e = uv_async_send(&async)) + { + image_baton.error = true; + image_baton.error_name = "Cannot call async callback"; + return false; + } + + return true; + } + + static void yield_chunk(uv_async_t* handle) + { + using closure_type = chunked_encode_image_baton_t; + closure_type & closure = *reinterpret_cast(handle->data); + + buffer_list_type local_buffers; + + uv_mutex_lock(&closure.mutex); + closure.buffers.swap(local_buffers); + uv_mutex_unlock(&closure.mutex); + + Nan::HandleScope scope; + + for (auto const & buffer : local_buffers) + { + v8::Local argv[2] = { + Nan::Null(), Nan::CopyBuffer(buffer.data(), + buffer.size()).ToLocalChecked() }; + Nan::MakeCallback(Nan::GetCurrentContext()->Global(), + Nan::New(closure.image_baton.cb), 2, argv); + } + } +}; + +void Image::EIO_EncodeChunked(uv_work_t* work) +{ + using closure_type = chunked_encode_image_baton_t; + closure_type & closure = *reinterpret_cast(work->data); + try + { + callback_streambuf streambuf(closure); + std::ostream stream(&streambuf); + save_to_stream(*closure.image_baton.im->this_, + stream, closure.image_baton.format); + stream.flush(); + + uv_mutex_lock(&closure.mutex); + closure.buffers.emplace_back(); // Signalize end of stream + uv_mutex_unlock(&closure.mutex); + } + catch (std::exception const& ex) + { + closure.image_baton.error = true; + closure.image_baton.error_name = ex.what(); + } +} + +void Image::EIO_AfterEncodeChunked(uv_work_t* work, int status) +{ + using closure_type = chunked_encode_image_baton_t; + closure_type & closure = *reinterpret_cast(work->data); + + if (closure.image_baton.error) + { + v8::Local argv[1] = { + Nan::Error(closure.image_baton.error_name.c_str()) }; + Nan::MakeCallback(Nan::GetCurrentContext()->Global(), + Nan::New(closure.image_baton.cb), 1, argv); + } + else + { + closure_type::yield_chunk(&closure.async); + } + + closure.image_baton.im->Unref(); + closure.image_baton.cb.Reset(); + delete &closure; +} + +NAN_METHOD(Image::encodeChunked) +{ + Image* im = Nan::ObjectWrap::Unwrap(info.Holder()); + + std::string format = "png"; + palette_ptr palette; + + // accept custom format + if (info.Length() >= 1){ + if (!info[0]->IsString()) { + Nan::ThrowTypeError("first arg, 'format' must be a string"); + return; + } + format = TOSTR(info[0]); + } + + // options hash + if (info.Length() >= 2) { + if (!info[1]->IsObject()) { + Nan::ThrowTypeError("optional second arg must be an options object"); + return; + } + + v8::Local options = info[1].As(); + + if (options->Has(Nan::New("palette").ToLocalChecked())) + { + v8::Local format_opt = options->Get(Nan::New("palette").ToLocalChecked()); + if (!format_opt->IsObject()) { + Nan::ThrowTypeError("'palette' must be an object"); + return; + } + + v8::Local obj = format_opt.As(); + if (obj->IsNull() || obj->IsUndefined() || !Nan::New(Palette::constructor)->HasInstance(obj)) { + Nan::ThrowTypeError("mapnik.Palette expected as second arg"); + return; + } + + palette = Nan::ObjectWrap::Unwrap(obj)->palette(); + } + } + + // ensure callback is a function + v8::Local callback = info[info.Length() - 1]; + if (!callback->IsFunction()) { + Nan::ThrowTypeError("last argument must be a callback function"); + return; + } + + chunked_encode_image_baton_t *closure = new chunked_encode_image_baton_t(); + closure->image_baton.request.data = closure; + closure->image_baton.im = im; + closure->image_baton.format = format; + closure->image_baton.palette = palette; + closure->image_baton.error = false; + closure->image_baton.cb.Reset(callback.As()); + + uv_queue_work(uv_default_loop(), &closure->image_baton.request, EIO_EncodeChunked, EIO_AfterEncodeChunked); + im->Ref(); +} + /** * Get a constrained view of this image given x, y, width, height parameters. * @memberof Image diff --git a/src/mapnik_image.hpp b/src/mapnik_image.hpp index 454b58418b..04fa9812b2 100644 --- a/src/mapnik_image.hpp +++ b/src/mapnik_image.hpp @@ -29,8 +29,11 @@ class Image: public Nan::ObjectWrap { static NAN_METHOD(setPixel); static NAN_METHOD(encodeSync); static NAN_METHOD(encode); + static NAN_METHOD(encodeChunked); static void EIO_Encode(uv_work_t* req); static void EIO_AfterEncode(uv_work_t* req); + static void EIO_EncodeChunked(uv_work_t* req); + static void EIO_AfterEncodeChunked(uv_work_t* req, int status); static NAN_METHOD(setGrayScaleToAlpha); static NAN_METHOD(width); diff --git a/test/image.test.js b/test/image.test.js index 92cf5ee363..06ff18c079 100644 --- a/test/image.test.js +++ b/test/image.test.js @@ -146,6 +146,54 @@ describe('mapnik.Image ', function() { assert.equal(im.encodeSync().length, im2.encodeSync().length); }); + it('should be able to encode by chunks', function(done) { + var im = new mapnik.Image(256, 256); + assert.ok(im instanceof mapnik.Image); + + assert.equal(im.width(), 256); + assert.equal(im.height(), 256); + assert.throws(function() { im.view(); }); + + var actual_length = 0; + var chunk_count = 0; + + im.encodeChunked('png32', {}, function(err, result) { + if (err) throw err; + if (result.length == 0) { + assert.equal(1, chunk_count); + assert.equal(im.encodeSync('png32').length, actual_length); + done(); + } else { + chunk_count++; + actual_length += result.length; + } + }); + }); + + it('should be able to encode by chunks - multiple chunks', function(done) { + var im = new mapnik.Image.openSync('./test/data/images/sat_image.png'); + assert.ok(im instanceof mapnik.Image); + + assert.equal(im.width(), 75); + assert.equal(im.height(), 75); + assert.throws(function() { im.view(); }); + + var actual_length = 0; + var chunk_count = 0; + + im.encodeChunked('png32', {}, function(err, result) { + if (err) throw err; + if (result.length == 0) { + assert.equal(16, chunk_count); + assert.equal(im.encodeSync('png32').length, actual_length); + done(); + } else { + chunk_count++; + actual_length += result.length; + } + }); + }); + it('should be able to open via byte stream', function(done) { var im = new mapnik.Image(256, 256); // png