Skip to content

Commit

Permalink
Merge pull request #13 from Aircoookie/response_queue
Browse files Browse the repository at this point in the history
Add request queue with limits
  • Loading branch information
willmmiles authored Sep 14, 2024
2 parents 5d1f89a + 490cb87 commit d274e92
Show file tree
Hide file tree
Showing 8 changed files with 581 additions and 127 deletions.
4 changes: 2 additions & 2 deletions library.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
"platforms": "espressif8266"
},
{
"owner": "me-no-dev",
"owner": "willmmiles",
"name": "AsyncTCP",
"version": "^1.1.1",
"version": "^1.3.0",
"platforms": "espressif32"
},
{
Expand Down
12 changes: 12 additions & 0 deletions src/DynamicBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ DynamicBuffer::DynamicBuffer(SharedBuffer&& b) : _data(nullptr), _len(0) {
if (b) *this = std::move(*b._buf);
}

size_t DynamicBuffer::resize(size_t s) {
if (_len != s) {
auto next = realloc(_data, s);
if (next) {
_data = reinterpret_cast<char*>(next);
_len = s;
}
}

return _len;
}

String toString(DynamicBuffer buf) {
auto dbstr = DynamicBufferString(std::move(buf));
return std::move(*static_cast<String*>(&dbstr)); // Move-construct the result string from dbstr
Expand Down
99 changes: 93 additions & 6 deletions src/DynamicBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,33 @@ class DynamicBuffer {
DynamicBuffer() : _data(nullptr), _len(0) {};
explicit DynamicBuffer(size_t len) : _data(len ? reinterpret_cast<char*>(malloc(len)): nullptr), _len(_data ? len : 0) {};
DynamicBuffer(const char* buf, size_t len) : DynamicBuffer(len) { if (_data) memcpy(_data, buf, len); };
explicit DynamicBuffer(const String& s) : DynamicBuffer(s.begin(), s.length()) {};
explicit DynamicBuffer(String&&); // Move string contents in to buffer if possible
DynamicBuffer(const SharedBuffer&);
DynamicBuffer(SharedBuffer&&);

~DynamicBuffer() { clear(); };

// Move
DynamicBuffer(DynamicBuffer&& d) : _data(d._data), _len(d._len) { d._data = nullptr; d._len = 0; };
DynamicBuffer& operator=(DynamicBuffer&& d) { std::swap(_data, d._data); std::swap(_len, d._len); return *this; };
DynamicBuffer(SharedBuffer&&); // Move data, leaving shared buffer empty
explicit DynamicBuffer(String&&); // Move string contents in to buffer if possible

// Copy
DynamicBuffer(const DynamicBuffer& d) : DynamicBuffer(d._data, d._len) {}; // copy
DynamicBuffer& operator=(const DynamicBuffer& d) { *this = DynamicBuffer(d); return *this; }; // use move to copy

DynamicBuffer(const SharedBuffer&); // Copy data
explicit DynamicBuffer(const String& s) : DynamicBuffer(s.begin(), s.length()) {};

// Accessors
char* data() const { return _data; };
size_t size() const { return _len; };
char& operator[](ptrdiff_t p) const { return *(_data + p); };

explicit operator bool() const { return (_data != nullptr) && (_len > 0); }

// Release the buffer without freeing it
char* release() { char* temp = _data; _data = nullptr; _len = 0; return temp; }

// TODO, if it ever matters - resizing
// Resize the buffer. Returns new size on success, current size on failure.
size_t resize(size_t);
};

// Same interface as DynamicBuffer, but with shared_ptr semantics: buffer is held until last copy releases it.
Expand All @@ -67,6 +70,7 @@ class SharedBuffer {

char* data() const { return _buf ? _buf->data() : nullptr; };
size_t size() const { return _buf ? _buf->size() : 0U; };
char& operator[](ptrdiff_t p) const { return *(data() + p); };
void clear() { _buf.reset(); };

explicit operator bool() const { return _buf && *_buf; };
Expand Down Expand Up @@ -150,3 +154,86 @@ class BufferListPrint : public Print {
typedef BufferListPrint<DynamicBufferList> DynamicBufferListPrint;
typedef BufferListPrint<SharedBufferList> SharedBufferListPrint;


// Walkable buffer
// A buffer class that permits "consuming" data from either end, adjusting data() and size() to match
template<typename buffer_type>
class Walkable
{
buffer_type _buf;
size_t _left, _right;

public:
Walkable() : _left(0), _right(0) {};
explicit Walkable(size_t len) : _buf(len), _left(0), _right(0) {};
Walkable(const char* buf, size_t len) : _buf(buf, len), _left(0), _right(0) {};
Walkable(buffer_type&& buf) : _buf(std::move(buf)), _left(0), _right(0) {};
explicit Walkable(const String& s) : _buf(s), _left(0), _right(0) {};
explicit Walkable(String&& s) : _buf(std::move(s)), _left(0), _right(0) {};

// Accessors
// Buffer interface
char* data() const { return _buf.data() + _left; };
size_t size() const { return _buf.size() - (_left + _right); };
size_t capacity() const { return _buf.size(); }; // for similarity with STL types
explicit operator bool() const { return (buffer_type::data() != nullptr) && (size() > 0); }
char& operator[](ptrdiff_t p) const { return *(data() + p); };
void clear() { _buf.clear(); reset(); };

// Raw interface
const buffer_type& buffer() const { return _buf; };
size_t offset() const { return _left; }
size_t roffset() const { return _right; }

// Modifiers
void reset() { _left = _right = 0; }; // Reset the walking counters
void advance(ptrdiff_t count) { // Consume some data from the left hand side
if (count > 0) {
_left = std::min(_left+count, _buf.size() - _right);
} else {
if (abs(count) <= _left) {
_left += count;
} else {
_left = 0; // do not wrap
}
}
}
void radvance(ptrdiff_t count) { // Consume some data from the right hand side
if (count > 0) {
_right = std::min(_right+count, _buf.size() - _left);
} else {
if (abs(count) <= _right) {
_right += count;
} else {
_right = 0; // do not wrap
}
}
}

// Contract buffer to specified size
size_t resize(size_t s) {
auto bs = _buf.size() - _left;
_right = s <= bs ? (bs - s) : 0U;
return size();
}

// Resize the underlying buffer storage, preserving contents
// Returns new size on success, current size on failure.
size_t reallocate(size_t s) {
if (s <= size()) {
auto new_buf = buffer_type(data(), s);
if (new_buf) {
_buf = std::move(new_buf);
reset();
}
} else {
auto new_buf = buffer_type(s);
if (new_buf) {
memcpy(new_buf.data(), data(), size());
_buf = std::move(new_buf);
reset();
}
}
return _buf.size();
}
};
45 changes: 43 additions & 2 deletions src/ESPAsyncWebServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
#include "StringArray.h"

#ifdef ESP32
#include <mutex>
#include <WiFi.h>
#include <AsyncTCP.h>
#define ASYNCWEBSERVER_NEEDS_MUTEX
#elif defined(ESP8266)
#include <ESP8266WiFi.h>
#include <ESPAsyncTCP.h>
Expand Down Expand Up @@ -195,6 +197,9 @@ class AsyncWebServerRequest {
void _parsePlainPostChar(uint8_t data);
void _parseMultipartPostByte(uint8_t data, bool last);
void _addGetParams(const String& params);

void _requestReady();
void _handleRequest(); // called when the queue permits this request to run

void _handleUploadStart();
void _handleUploadByte(uint8_t data, bool last);
Expand Down Expand Up @@ -253,6 +258,8 @@ class AsyncWebServerRequest {
AsyncWebServerResponse *beginResponse_P(int code, const String& contentType, const uint8_t * content, size_t len, AwsTemplateProcessor callback=nullptr);
AsyncWebServerResponse *beginResponse_P(int code, const String& contentType, PGM_P content, AwsTemplateProcessor callback=nullptr);

void deferResponse(); // Move to the back of the queue

size_t headers() const; // get header count
bool hasHeader(const String& name) const; // check if header exists
bool hasHeader(const __FlashStringHelper * data) const; // check if header exists
Expand Down Expand Up @@ -369,6 +376,7 @@ class AsyncWebServerResponse {
size_t _writtenLength; // size of data written to client
WebResponseState _state;
static const __FlashStringHelper* _responseCodeToString(int code);
friend class AsyncWebServer;

public:
AsyncWebServerResponse();
Expand All @@ -386,6 +394,20 @@ class AsyncWebServerResponse {
virtual size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time);
};

/*
* Queue limit structure for Server
*
* Any value set to 0 indicates no limit.
* */
struct AsyncWebServerQueueLimits {
// Count limits
size_t nParallel; // Permit up to this number of active parallel requests.
size_t nMax; // Permit up to this number of active + queued requests - send 503 otherwise.
// Heap limits
size_t queueHeapRequired; // Require at least this much free heap before queuing a new request, otherwise send a 503.
size_t requestHeapRequired; // Require at least this much free heap before handling a new request, except if no requests are active.
};

/*
* SERVER :: One instance
* */
Expand All @@ -396,14 +418,22 @@ typedef std::function<void(AsyncWebServerRequest *request, uint8_t *data, size_t

class AsyncWebServer {
protected:
AsyncWebServerQueueLimits _queueLimits;
AsyncServer _server;
LinkedList<AsyncWebRewrite*> _rewrites;
LinkedList<AsyncWebHandler*> _handlers;
LinkedList<AsyncWebHandler*> _handlers;
AsyncCallbackWebHandler* _catchAllHandler;

#ifdef ASYNCWEBSERVER_NEEDS_MUTEX
std::mutex _mutex;
#endif
LinkedList<AsyncWebServerRequest*> _requestQueue;
bool _queueActive;

public:
AsyncWebServer(IPAddress addr, uint16_t port);
AsyncWebServer(uint16_t port);
AsyncWebServer(IPAddress addr, uint16_t port, const AsyncWebServerQueueLimits& limits);
AsyncWebServer(uint16_t port, const AsyncWebServerQueueLimits& limits);
~AsyncWebServer();

void begin();
Expand Down Expand Up @@ -433,10 +463,21 @@ class AsyncWebServer {
void onRequestBody(ArBodyHandlerFunction fn); //handle posts with plain body content (JSON often transmitted this way as a request)

void reset(); //remove all writers and handlers, with onNotFound/onFileUpload/onRequestBody

// Queue interface
size_t numClients(); // Number of active clients, active and pending
size_t queueLength(); // Number of queued clients
const AsyncWebServerQueueLimits& getQueueLimits() { return _queueLimits; };
void setQueueLimits(const AsyncWebServerQueueLimits& limits);
void printStatus(Print&); // Write queue status in human-readable format
void processQueue(); // Consider the current queue state against the limits; may retry deferred handlers.

void _handleDisconnect(AsyncWebServerRequest *request);
void _attachHandler(AsyncWebServerRequest *request);
void _rewriteRequest(AsyncWebServerRequest *request);

void _dequeue(AsyncWebServerRequest *request);
void _defer(AsyncWebServerRequest *request);
};

class DefaultHeaders {
Expand Down
Loading

0 comments on commit d274e92

Please sign in to comment.