diff --git a/CMakeLists.txt b/CMakeLists.txt index 5aff6c7..de5eb60 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -79,6 +79,7 @@ add_executable(${PROJECT_NAME} basic/uuid.cpp basic/dictionary.cpp basic/fdbuf.cpp + basic/workerthread.cpp web/httpserver.cpp web/webpage.cpp web/errorpage.cpp diff --git a/basic/workerthread.cpp b/basic/workerthread.cpp new file mode 100644 index 0000000..15aabdb --- /dev/null +++ b/basic/workerthread.cpp @@ -0,0 +1,88 @@ +/* +AirSane Imaging Daemon +Copyright (C) 2018-2023 Simul Piscator + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ +#include "workerthread.h" + +#include +#include +#include +#include +#include + +struct WorkerThread::Private +{ + std::mutex mMutex; + std::condition_variable mThreadCondition, mExecuteCondition; + Callable* mpCallable = nullptr; + bool mCallDone = false; + bool mStarted = false; + bool mTerminate = false; + + std::thread mThread; + void threadFunc(); +}; + +WorkerThread::WorkerThread() + : p(new Private) +{ + std::unique_lock lock(p->mMutex); + p->mStarted = false; + p->mThread = std::thread([this](){ p->threadFunc(); }); + p->mThreadCondition.wait(lock, [this](){ return p->mStarted; }); +} + +WorkerThread::~WorkerThread() +{ + std::unique_lock lock(p->mMutex); + p->mTerminate = true; + lock.unlock(); + p->mExecuteCondition.notify_one(); + if (p->mThread.joinable()) + p->mThread.join(); + delete p; +} + +void WorkerThread::executeSynchronously(Callable& c) +{ + std::unique_lock lock(p->mMutex); + assert(p->mpCallable == nullptr); + p->mpCallable = &c; + p->mCallDone = false; + p->mExecuteCondition.notify_one(); + p->mThreadCondition.wait(lock, [this](){ return p->mCallDone; }); +} + +void WorkerThread::Private::threadFunc() +{ + std::unique_lock lock(mMutex); + mStarted = true; + lock.unlock(); + mThreadCondition.notify_one(); + while (!mTerminate) + { + std::unique_lock lock(mMutex); + mExecuteCondition.wait(lock, [this](){ return mTerminate || mpCallable; }); + if (mpCallable) { + assert(!mCallDone); + mpCallable->onCall(); + mpCallable = nullptr; + mCallDone = true; + lock.unlock(); + mThreadCondition.notify_one(); + } + } +} diff --git a/basic/workerthread.h b/basic/workerthread.h new file mode 100644 index 0000000..a530550 --- /dev/null +++ b/basic/workerthread.h @@ -0,0 +1,43 @@ +/* +AirSane Imaging Daemon +Copyright (C) 2018-2023 Simul Piscator + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ +#ifndef WORKER_THREAD_H +#define WORKER_THREAD_H + +class WorkerThread +{ +public: + WorkerThread(); + ~WorkerThread(); + + WorkerThread(const WorkerThread&) = delete; + WorkerThread& operator=(const WorkerThread&) = delete; + + struct Callable + { + virtual ~Callable() {} + virtual void onCall() = 0; + }; + void executeSynchronously(Callable&); + +private: + struct Private; + Private* p; +}; + + +#endif // WORKER_THREAD_H diff --git a/server/scanjob.cpp b/server/scanjob.cpp index f15383d..72ab331 100644 --- a/server/scanjob.cpp +++ b/server/scanjob.cpp @@ -22,6 +22,7 @@ along with this program. If not, see . #include "imageformats/pngencoder.h" #include "scanner.h" #include "web/httpserver.h" +#include "basic/workerthread.h" #include #include @@ -104,21 +105,29 @@ struct ScanJob::Private Scanner* mpScanner; std::string mUuid; - ::time_t mCreated, mLastActive; + std::atomic<::time_t> mCreated, mLastActive; std::atomic mState; std::atomic mStateReason; - SANE_Status mAdfStatus; + std::atomic mAdfStatus; std::string mScanSource, mIntent, mDocumentFormat, mColorMode; int mBitDepth, mRes_dpi; bool mColorScan; double mLeft_px, mTop_px, mWidth_px, mHeight_px; - int mKind, mImagesCompleted; + std::atomic mKind, mImagesCompleted; std::shared_ptr mpSession; OptionsFile::Options mDeviceOptions; std::vector mGammaTable; + + // We need a job-permanent worker thread to execute + // beginTransfer() and finishTransfer(). + // If these functions are called from two different + // threads (e.g., requests for NextDocument), we get + // into difficulties because backends are not required + // to be thread safe. + WorkerThread mWorkerThread; }; ScanJob::ScanJob(Scanner* scanner, const std::string& uuid) @@ -126,7 +135,7 @@ ScanJob::ScanJob(Scanner* scanner, const std::string& uuid) { p->mpScanner = scanner; p->mCreated = ::time(nullptr); - p->mLastActive = p->mCreated; + p->mLastActive = p->mCreated.load(); p->mUuid = uuid; p->mState = pending; p->mStateReason = PWG_NONE; @@ -543,7 +552,18 @@ ScanJob::writeJobInfoXml(std::ostream& os) const bool ScanJob::beginTransfer() { - return p->beginTransfer(); + struct : WorkerThread::Callable + { + void onCall() override + { + result = p->beginTransfer(); + } + bool result = false; + Private* p = nullptr; + } functionCall; + functionCall.p = p; + p->mWorkerThread.executeSynchronously(functionCall); + return functionCall.result; } bool @@ -629,7 +649,18 @@ ScanJob::Private::closeSession() ScanJob& ScanJob::finishTransfer(std::ostream& os) { - p->finishTransfer(os); + struct : WorkerThread::Callable + { + void onCall() override + { + p->finishTransfer(*pOs); + } + Private* p = nullptr; + std::ostream* pOs = nullptr; + } functionCall; + functionCall.p = p; + functionCall.pOs = &os; + p->mWorkerThread.executeSynchronously(functionCall); return *this; }