Skip to content

Commit

Permalink
Add background thread to purge jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
SimulPiscator committed Oct 22, 2023
1 parent 020c94a commit f07dcf9
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 19 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ add_executable(${PROJECT_NAME}
server/server.cpp
server/mainpage.cpp
server/scanner.cpp
server/purgethread.cpp
server/scanjob.cpp
server/scannerpage.cpp
sanecpp/sanecpp.cpp
Expand Down
93 changes: 93 additions & 0 deletions server/purgethread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
AirSane Imaging Daemon
Copyright (C) 2018-2023 Simul Piscator
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "purgethread.h"

#include <thread>
#include <unistd.h>
#include <sys/select.h>

struct PurgeThread::Private
{
const ScannerList* mpScanners;
std::thread* mpThread;
int mWriteFd, mReadFd;
int mSleepDuration, mMaxTime;

void start();
void terminate();
void threadFunc();
bool interruptibleSleep(int seconds);
};

void PurgeThread::Private::start()
{
int fds[2] = {0};
::pipe(fds);
mReadFd = fds[0];
mWriteFd = fds[1];
mpThread = new std::thread([this]{threadFunc();});
}

void PurgeThread::Private::terminate()
{
if (mpThread->joinable()) {
char c = 'x';
::write(mWriteFd, &c, 1);
mpThread->join();
}
::close(mReadFd);
::close(mWriteFd);
}

void PurgeThread::Private::threadFunc()
{
while (interruptibleSleep(mSleepDuration)) {
std::clog << "purging jobs with timeout of " << mMaxTime << "seconds";
for (const auto& entry : *mpScanners)
entry.pScanner->purgeJobs(mMaxTime);
}
}

bool PurgeThread::Private::interruptibleSleep(int seconds)
{
fd_set readSet;
FD_ZERO(&readSet);
FD_SET(mReadFd, &readSet);
struct timeval timeout = { seconds, 0 };
int count = 0;
do {
count = ::select(mReadFd + 1, &readSet, nullptr, nullptr, &timeout);
} while (count < 0 && errno == EINTR);
return count == 0;
}

PurgeThread::PurgeThread(const ScannerList& scanners, int sleepDuration, int maxTime)
: p(new Private)
{
p->mpScanners = &scanners;
p->mSleepDuration = sleepDuration;
p->mMaxTime = maxTime;
p->start();
}

PurgeThread::~PurgeThread()
{
p->terminate();
delete p;
}
38 changes: 38 additions & 0 deletions server/purgethread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
AirSane Imaging Daemon
Copyright (C) 2018-2023 Simul Piscator
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef PURGETHREAD_H
#define PURGETHREAD_H

#include "server.h"

class PurgeThread
{
PurgeThread(const Scanner&) = delete;
PurgeThread& operator=(const PurgeThread&) = delete;

public:
PurgeThread(const ScannerList&, int sleepDuration, int maxTime);
~PurgeThread();

private:
struct Private;
Private* p;
};

#endif // PURGETHREAD_H
18 changes: 8 additions & 10 deletions server/scanjob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ struct ScanJob::Private
Scanner* mpScanner;

std::string mUuid;
::time_t mCreated;
::time_t mCreated, mLastActive;
std::atomic<State> mState;
std::atomic<const char*> mStateReason;
SANE_Status mAdfStatus;
Expand All @@ -114,7 +114,7 @@ struct ScanJob::Private
bool mColorScan;
double mLeft_px, mTop_px, mWidth_px, mHeight_px;

int mKind, mImagesToTransfer, mImagesCompleted;
int mKind, mImagesCompleted;
std::shared_ptr<sanecpp::session> mpSession;

OptionsFile::Options mDeviceOptions;
Expand All @@ -126,6 +126,7 @@ ScanJob::ScanJob(Scanner* scanner, const std::string& uuid)
{
p->mpScanner = scanner;
p->mCreated = ::time(nullptr);
p->mLastActive = p->mCreated;
p->mUuid = uuid;
p->mState = pending;
p->mStateReason = PWG_NONE;
Expand All @@ -151,9 +152,9 @@ ScanJob::ageSeconds() const
}

int
ScanJob::imagesToTransfer() const
ScanJob::idleSeconds() const
{
return p->mImagesToTransfer;
return ::time(nullptr) - p->mLastActive;
}

int
Expand Down Expand Up @@ -253,18 +254,15 @@ ScanJob::Private::init(const ScanSettingsXml& settings, bool autoselectFormat, c
mDocumentFormat = HttpServer::MIME_TYPE_PNG;
std::clog << "document format used: " << mDocumentFormat << "\n";

mImagesToTransfer = 1;
mImagesCompleted = 0;

std::string inputSource = settings.getString("InputSource");
if (inputSource == "Platen") {
mScanSource = mpScanner->platenSourceName();
mImagesToTransfer = 1;
mKind = single;
}
else if (inputSource == "Feeder") {
mScanSource = mpScanner->adfSourceName();
mImagesToTransfer = std::numeric_limits<int>::max();
double concatIfPossible = settings.getNumber("ConcatIfPossible");
if (concatIfPossible == 1.0 && mDocumentFormat == HttpServer::MIME_TYPE_PDF)
mKind = adfConcat;
Expand Down Expand Up @@ -508,9 +506,6 @@ ScanJob::writeJobInfoXml(std::ostream& os) const
"<pwg:JobState>"
<< p->statusString()
<< "</pwg:JobState>\r\n"
"<pwg:ImagesToTransfer>"
<< p->mImagesToTransfer
<< "</pwg:ImagesToTransfer>\r\n"
"<pwg:ImagesCompleted>"
<< p->mImagesCompleted
<< "</pwg:ImagesCompleted>\r\n"
Expand Down Expand Up @@ -618,6 +613,7 @@ ScanJob::finishTransfer(std::ostream& os)
void
ScanJob::Private::finishTransfer(std::ostream& os)
{
mLastActive = ::time(nullptr);
std::shared_ptr<ImageEncoder> pEncoder;
if (isProcessing()) {
if (mDocumentFormat == HttpServer::MIME_TYPE_JPEG) {
Expand Down Expand Up @@ -673,6 +669,7 @@ ScanJob::Private::finishTransfer(std::ostream& os)
}
}
while (isProcessing()) {
mLastActive = ::time(nullptr);
std::vector<char> buffer(mpSession->parameters()->bytes_per_line);
SANE_Status status = SANE_STATUS_GOOD;
while (status == SANE_STATUS_GOOD && os && isProcessing()) {
Expand Down Expand Up @@ -705,6 +702,7 @@ ScanJob::Private::finishTransfer(std::ostream& os)
}
if (pEncoder)
pEncoder->endDocument();
mLastActive = ::time(nullptr);
}

ScanJob&
Expand Down
2 changes: 1 addition & 1 deletion server/scanjob.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ScanJob
int kind() const;

int ageSeconds() const;
int imagesToTransfer() const;
int idleSeconds() const;
int imagesCompleted() const;
std::string uri() const;
const std::string& uuid() const;
Expand Down
4 changes: 2 additions & 2 deletions server/scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -914,12 +914,12 @@ Scanner::cancelJob(const std::string& uuid)
}

int
Scanner::purgeJobs(int maxAgeSeconds)
Scanner::purgeJobs(int maxIdleSeconds)
{
int n = 0;
std::lock_guard<std::mutex> lock(p->mJobsMutex);
for (auto i = p->mJobs.begin(); i != p->mJobs.end();) {
if (i->second->ageSeconds() > maxAgeSeconds) {
if (i->second->idleSeconds() > maxIdleSeconds) {
i = p->mJobs.erase(i);
++n;
} else {
Expand Down
29 changes: 23 additions & 6 deletions server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "optionsfile.h"
#include "scanjob.h"
#include "scanner.h"
#include "purgethread.h"
#include "basic/uuid.h"
#include "zeroconf/hotplugnotifier.h"
#include "web/accessfile.h"
Expand Down Expand Up @@ -89,12 +90,15 @@ Server::Server(int argc, char** argv)
, mHotplug(true)
, mRandompaths(false)
, mCompatiblepath(false)
, mJobtimeout(0)
, mPurgeinterval(0)
, mStartupTimeSeconds(0)
, mDoRun(true)
{
std::string port, interface, unixsocket, accesslog, hotplug, announce,
webinterface, resetoption, discloseversion, localonly, optionsfile,
ignorelist, accessfile, randompaths, compatiblepath, debug, announcesecure;
ignorelist, accessfile, randompaths, compatiblepath, debug, announcesecure,
jobtimeout, purgeinterval;
struct
{
const std::string name, def, info;
Expand All @@ -112,10 +116,9 @@ Server::Server(int argc, char** argv)
{ "disclose-version", "true", "disclose version information in web interface", discloseversion },
{ "random-paths", "false", "prepend a random uuid to scanner paths", randompaths },
{ "compatible-path", "true", "use /eSCL as path for first scanner", compatiblepath },
{ "local-scanners-only",
"false",
"ignore SANE network scanners",
localonly },
{ "local-scanners-only", "false", "ignore SANE network scanners", localonly },
{ "job-timeout", "120", "timeout for idle jobs (seconds)", jobtimeout },
{ "purge-interval", "5", "how often job lists are purged (seconds)", purgeinterval },
{ "options-file",
#ifdef __FreeBSD__
"/usr/local/etc/airsane/options.conf",
Expand Down Expand Up @@ -189,6 +192,17 @@ Server::Server(int argc, char** argv)
std::cerr << "invalid port number: " << port << std::endl;
mDoRun = false;
}
if (!(std::istringstream(jobtimeout) >> mJobtimeout) || mJobtimeout < 1) {
std::cerr << "invalid job timeout: " << mJobtimeout << std::endl;
mDoRun = false;
}
if (!(std::istringstream(purgeinterval) >> mPurgeinterval) || mPurgeinterval < 1) {
std::cerr << "invalid purge interval: " << mPurgeinterval << std::endl;
mDoRun = false;
}
if (mJobtimeout <= mPurgeinterval) {
std::cerr << "job timeout must be greater than purge interval" << std::endl;
}
if (help) {
std::cout << "options, and their defaults, are:\n";
for (auto& opt : options)
Expand Down Expand Up @@ -307,7 +321,10 @@ Server::run()
mStartupTimeSeconds = t1 - t0;
std::clog << "startup took " << mStartupTimeSeconds << " secconds" << std::endl;

ok = HttpServer::run();
{
PurgeThread purgethread(mScanners, mPurgeinterval, mJobtimeout);
ok = HttpServer::run();
}
mScanners.clear();
if (ok && terminationStatus() == SIGHUP) {
std::clog << "received SIGHUP, reloading" << std::endl;
Expand Down
1 change: 1 addition & 0 deletions server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Server : public HttpServer
bool mAnnounce, mWebinterface, mResetoption, mDiscloseversion,
mLocalonly, mHotplug, mRandompaths, mCompatiblepath, mAnnouncesecure;
std::string mOptionsfile, mAccessfile, mIgnorelist;
int mJobtimeout, mPurgeinterval;
float mStartupTimeSeconds;
bool mDoRun;
};
Expand Down

0 comments on commit f07dcf9

Please sign in to comment.