Skip to content

Commit

Permalink
Merge pull request #1204 from trapexit/gc
Browse files Browse the repository at this point in the history
Add manual GC triggering + configurable process queue depth
  • Loading branch information
trapexit authored Jun 27, 2023
2 parents 42836e0 + 5ab0fbc commit 7cdd12b
Show file tree
Hide file tree
Showing 27 changed files with 1,442 additions and 345 deletions.
44 changes: 43 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,11 @@ These options are the same regardless of whether you use them with the
`read-thread-count` refers to the number of threads reading FUSE
messages which are dispatched to process threads. -1 means disabled
otherwise acts like `read-thread-count`. (default: -1)
* **process-thread-queue-depth=UINT**: Sets the number of requests any
single process thread can have queued up at one time. Meaning the
total memory usage of the queues is queue depth multiplied by the
number of process threads plus read thread count. 0 sets the depth
to the same as the process thread count. (default: 0)
* **pin-threads=STR**: Selects a strategy to pin threads to CPUs
(default: unset)
* **scheduling-priority=INT**: Set mergerfs' scheduling
Expand Down Expand Up @@ -1123,7 +1128,9 @@ issue: `umount -l <mergerfs_mountpoint>`. Or you can let mergerfs do
it by setting the option `lazy-umount-mountpoint=true`.


# RUNTIME CONFIG
# RUNTIME INTERFACES

## RUNTIME CONFIG

#### .mergerfs pseudo file ####

Expand Down Expand Up @@ -1206,6 +1213,41 @@ following:
* **user.mergerfs.allpaths**: a NUL ('\0') separated list of full paths to all files found


## SIGNALS

* USR1: This will cause mergerfs to send invalidation notifications to
the kernel for all files. This will cause all unused files to be
released from memory.
* USR2: Trigger a general cleanup of currently unused memory. A more
thorough version of what happens every ~15 minutes.


## IOCTLS

Found in `fuse_ioctl.cpp`:

```C++
typedef char IOCTL_BUF[4096];
#define IOCTL_APP_TYPE 0xDF
#define IOCTL_FILE_INFO _IOWR(IOCTL_APP_TYPE,0,IOCTL_BUF)
#define IOCTL_GC _IO(IOCTL_APP_TYPE,1)
#define IOCTL_GC1 _IO(IOCTL_APP_TYPE,2)
#define IOCTL_INVALIDATE_ALL_NODES _IO(IOCTL_APP_TYPE,3)
```
* IOCTL\_FILE\_INFO: Same as the "file / directory xattrs" mentioned
above. Use a buffer size of 4096 bytes. Pass in a string of
"basepath", "relpath", "fullpath", or "allpaths". Receive details in
same buffer.
* IOCTL\_GC: Triggers a thorough garbage collection of excess
memory. Same as SIGUSR2.
* IOCTL\_GC1: Triggers a simple garbage collection of excess
memory. Same as what happens every 15 minutes normally.
* IOCTL\_INVALIDATE\_ALL\_NODES: Same as SIGUSR1. Send invalidation
notifications to the kernel for all files causing unused files to be
released from memory.
# TOOLING
* https://github.com/trapexit/mergerfs-tools
Expand Down
12 changes: 11 additions & 1 deletion libfuse/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ else
OPT_FLAGS := -O2
endif

ifeq ($(LTO),1)
LTO_FLAGS := -flto
else
LTO_FLAGS :=
endif

DESTDIR =
PREFIX = /usr/local
EXEC_PREFIX = $(PREFIX)
Expand All @@ -39,12 +45,14 @@ SRC_C = \
lib/fuse_dirents.c \
lib/fuse_lowlevel.c \
lib/fuse_mt.c \
lib/node.c \
lib/fuse_node.c \
lib/fuse_opt.c \
lib/fuse_session.c \
lib/fuse_signals.c \
lib/helper.c \
lib/mount.c
lib/mount.c \
lib/syslog.c
SRC_CPP = \
lib/format.cpp \
lib/os.cpp \
Expand All @@ -59,12 +67,14 @@ CFLAGS ?= \
$(OPT_FLAGS)
CFLAGS := \
${CFLAGS} \
$(LTO_FLAGS) \
-std=gnu99 \
-Wall \
-pipe \
-MMD
CXXFLAGS := \
${CXXFLAGS} \
$(LTO_FLAGS) \
-std=c++11 \
-Wall \
-pipe \
Expand Down
5 changes: 5 additions & 0 deletions libfuse/include/fuse.h
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ void fuse_exit(struct fuse *f);

int fuse_config_read_thread_count(const struct fuse *f);
int fuse_config_process_thread_count(const struct fuse *f);
int fuse_config_process_thread_queue_depth(const struct fuse *f);
const char* fuse_config_pin_threads(const struct fuse *f);

/**
Expand Down Expand Up @@ -764,6 +765,10 @@ void fuse_set_getcontext_func(struct fuse_context *(*func)(void));
/** Get session from fuse object */
struct fuse_session *fuse_get_session(struct fuse *f);

void fuse_gc1();
void fuse_gc();
void fuse_invalidate_all_nodes();

EXTERN_C_END

#endif /* _FUSE_H_ */
1 change: 1 addition & 0 deletions libfuse/include/fuse_lowlevel.h
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,7 @@ void fuse_session_process(struct fuse_session *se,
int fuse_session_loop_mt(struct fuse_session *se,
const int read_thread_count,
const int process_thread_count,
const int process_thread_queue_depth,
const char *pin_threads_type);

/* ----------------------------------------------------------- *
Expand Down
3 changes: 1 addition & 2 deletions libfuse/include/fuse_msgbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
typedef struct fuse_msgbuf_t fuse_msgbuf_t;
struct fuse_msgbuf_t
{
char *mem;
uint32_t size;
uint32_t used;
char *mem;
};
31 changes: 16 additions & 15 deletions libfuse/lib/bounded_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,26 @@ class BoundedQueue
explicit
BoundedQueue(std::size_t max_size_,
bool block_ = true)
: _block(block),
_max_size(max_size_)
: _block(block_),
_max_size(max_size_ ? max_size_ : 1)
{
if(_max_size == 0)
_max_size = 1;
}

BoundedQueue(const BoundedQueue&) = delete;
BoundedQueue(BoundedQueue&&) = default;

bool
push(const T& item_)
{
{
std::unique_lock guard(_queue_lock);
std::unique_lock<std::mutex> guard(_queue_lock);

_condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; });

if(_queue.size() == _max_size)
return false;

_queue.push(item);
_queue.push(item_);
}

_condition_pop.notify_one();
Expand All @@ -43,7 +44,7 @@ class BoundedQueue
push(T&& item_)
{
{
std::unique_lock guard(_queue_lock);
std::unique_lock<std::mutex> guard(_queue_lock);

_condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; });

Expand All @@ -61,7 +62,7 @@ class BoundedQueue
emplace(Args&&... args_)
{
{
std::unique_lock guard(_queue_lock);
std::lock_guard<std::mutex> guard(_queue_lock);

_condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; });

Expand All @@ -80,7 +81,7 @@ class BoundedQueue
pop(T& item_)
{
{
std::unique_lock guard(_queue_lock);
std::unique_lock<std::mutex> guard(_queue_lock);

_condition_pop.wait(guard, [&]() { return !_queue.empty() || !_block; });
if(_queue.empty())
Expand All @@ -99,7 +100,7 @@ class BoundedQueue
std::size_t
size() const
{
std::lock_guard guard(_queue_lock);
std::lock_guard<std::mutex> guard(_queue_lock);

return _queue.size();
}
Expand All @@ -113,31 +114,31 @@ class BoundedQueue
bool
empty() const
{
std::lock_guard guard(_queue_lock);
std::lock_guard<std::mutex> guard(_queue_lock);

return _queue.empty();
}

bool
full() const
{
std::lock_guard lock(_queue_lock);
std::lock_guard<std::mutex> lock(_queue_lock);

return (_queue.size() == capacity());
}

void
block()
{
std::lock_guard guard(_queue_lock);
std::lock_guard<std::mutex> guard(_queue_lock);
_block = true;
}

void
unblock()
{
{
std::lock_guard guard(_queue_lock);
std::lock_guard<std::mutex> guard(_queue_lock);
_block = false;
}

Expand All @@ -148,7 +149,7 @@ class BoundedQueue
bool
blocking() const
{
std::lock_guard guard(_queue_lock);
std::lock_guard<std::mutex> guard(_queue_lock);

return _block;
}
Expand Down
Loading

0 comments on commit 7cdd12b

Please sign in to comment.