Skip to content

Commit

Permalink
support coroutine lock
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFreeman committed Oct 17, 2024
1 parent 4fe3e4b commit bda24ee
Show file tree
Hide file tree
Showing 11 changed files with 264 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/iouring.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on: [push, pull_request]
jobs:
test-linux:
if: "!contains(github.event.head_commit.message, '--filter=') || contains(github.event.head_commit.message, '[iouring]')"
runs-on: ubuntu-latest
runs-on: ubuntu-24.04
strategy:
fail-fast: false
matrix:
Expand Down
38 changes: 37 additions & 1 deletion ext-src/swoole_lock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ using swoole::SpinLock;
using swoole::RWLock;
#endif

#ifdef HAVE_IOURING_FUTEX
#include "swoole_coroutine.h"
using swoole::Coroutine;
using swoole::CoroutineLock;
#endif

static zend_class_entry *swoole_lock_ce;
static zend_object_handlers swoole_lock_handlers;

Expand Down Expand Up @@ -117,6 +123,9 @@ void php_swoole_lock_minit(int module_number) {
#endif
#ifdef HAVE_SPINLOCK
zend_declare_class_constant_long(swoole_lock_ce, ZEND_STRL("SPINLOCK"), Lock::SPIN_LOCK);
#endif
#ifdef HAVE_IOURING_FUTEX
zend_declare_class_constant_long(swoole_lock_ce, ZEND_STRL("COROUTINELOCK"), Lock::COROUTINE_LOCK);
#endif
zend_declare_property_long(swoole_lock_ce, ZEND_STRL("errCode"), 0, ZEND_ACC_PUBLIC);

Expand All @@ -127,6 +136,9 @@ void php_swoole_lock_minit(int module_number) {
#ifdef HAVE_SPINLOCK
SW_REGISTER_LONG_CONSTANT("SWOOLE_SPINLOCK", Lock::SPIN_LOCK);
#endif
#ifdef HAVE_IOURING_FUTEX
SW_REGISTER_LONG_CONSTANT("SWOOLE_COROUTINE_LOCK", Lock::COROUTINE_LOCK);
#endif
}

static PHP_METHOD(swoole_lock, __construct) {
Expand Down Expand Up @@ -158,8 +170,14 @@ static PHP_METHOD(swoole_lock, __construct) {
case Lock::MUTEX:
lock = new Mutex(Mutex::PROCESS_SHARED);
break;
#ifdef HAVE_IOURING_FUTEX
case Lock::COROUTINE_LOCK:
lock = new CoroutineLock();
break;
#endif
default:
zend_throw_exception(swoole_exception_ce, "lock type[%d] is not support", type);
zend_throw_exception_ex(
swoole_exception_ce, SW_ERROR_INVALID_PARAMS, "lock type[%d] is not support", (zend_long) type);
RETURN_FALSE;
break;
}
Expand All @@ -171,6 +189,15 @@ static PHP_METHOD(swoole_lock, __destruct) {}

static PHP_METHOD(swoole_lock, lock) {
Lock *lock = php_swoole_lock_get_and_check_ptr(ZEND_THIS);
#ifdef HAVE_IOURING_FUTEX
if (lock->get_type() == Lock::COROUTINE_LOCK && (SwooleTG.reactor == nullptr || !Coroutine::get_current())) {
zend_throw_exception_ex(swoole_exception_ce,
SW_ERROR_OPERATION_NOT_SUPPORT,
"lock type[%d] must be used in a coroutine environment",
(zend_long) Lock::COROUTINE_LOCK);
RETURN_FALSE;
}
#endif
SW_LOCK_CHECK_RETURN(lock->lock());
}

Expand All @@ -197,6 +224,15 @@ static PHP_METHOD(swoole_lock, lockwait) {

static PHP_METHOD(swoole_lock, unlock) {
Lock *lock = php_swoole_lock_get_and_check_ptr(ZEND_THIS);
#ifdef HAVE_IOURING_FUTEX
if (lock->get_type() == Lock::COROUTINE_LOCK && (SwooleTG.reactor == nullptr || !Coroutine::get_current())) {
zend_throw_exception_ex(swoole_exception_ce,
SW_ERROR_OPERATION_NOT_SUPPORT,
"lock type[%d] must be used in a coroutine environment",
(zend_long) Lock::COROUTINE_LOCK);
RETURN_FALSE;
}
#endif
SW_LOCK_CHECK_RETURN(lock->unlock());
}

Expand Down
31 changes: 31 additions & 0 deletions ext-src/swoole_thread_lock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ using swoole::SpinLock;
#ifdef HAVE_RWLOCK
using swoole::RWLock;
#endif
#ifdef HAVE_IOURING_FUTEX
#include "swoole_coroutine.h"
using swoole::Coroutine;
using swoole::CoroutineLock;
#endif

zend_class_entry *swoole_thread_lock_ce;
static zend_object_handlers swoole_thread_lock_handlers;
Expand All @@ -50,6 +55,11 @@ struct LockResource : public ThreadResource {
case Lock::RW_LOCK:
lock_ = new RWLock(0);
break;
#endif
#ifdef HAVE_IOURING_FUTEX
case Lock::CoroutineLock:
lock_ = new COROUTINE_LOCK();
break;
#endif
case Lock::MUTEX:
default:
Expand Down Expand Up @@ -152,6 +162,9 @@ void php_swoole_thread_lock_minit(int module_number) {
#endif
#ifdef HAVE_SPINLOCK
zend_declare_class_constant_long(swoole_thread_lock_ce, ZEND_STRL("SPINLOCK"), Lock::SPIN_LOCK);
#endif
#ifdef HAVE_IOURING_FUTEX
zend_declare_class_constant_long(swoole_lock_ce, ZEND_STRL("COROUTINELOCK"), Lock::COROUTINE_LOCK);
#endif
zend_declare_property_long(swoole_thread_lock_ce, ZEND_STRL("errCode"), 0, ZEND_ACC_PUBLIC);
}
Expand All @@ -177,6 +190,15 @@ static PHP_METHOD(swoole_thread_lock, __destruct) {}

static PHP_METHOD(swoole_thread_lock, lock) {
Lock *lock = lock_get_and_check_ptr(ZEND_THIS);
#ifdef HAVE_IOURING_FUTEX
if (lock->get_type() == Lock::COROUTINE_LOCK && (SwooleTG.reactor == nullptr || !Coroutine::get_current())) {
zend_throw_exception_ex(swoole_exception_ce,
SW_ERROR_OPERATION_NOT_SUPPORT,
"lock type[%d] must be used in a coroutine environment",
(zend_long) Lock::COROUTINE_LOCK);
RETURN_FALSE;
}
#endif
SW_LOCK_CHECK_RETURN(lock->lock());
}

Expand All @@ -203,6 +225,15 @@ static PHP_METHOD(swoole_thread_lock, lockwait) {

static PHP_METHOD(swoole_thread_lock, unlock) {
Lock *lock = lock_get_and_check_ptr(ZEND_THIS);
#ifdef HAVE_IOURING_FUTEX
if (lock->get_type() == Lock::COROUTINE_LOCK && (SwooleTG.reactor == nullptr || !Coroutine::get_current())) {
zend_throw_exception_ex(swoole_exception_ce,
SW_ERROR_OPERATION_NOT_SUPPORT,
"lock type[%d] must be used in a coroutine environment",
(zend_long) Lock::COROUTINE_LOCK);
RETURN_FALSE;
}
#endif
SW_LOCK_CHECK_RETURN(lock->unlock());
}

Expand Down
8 changes: 8 additions & 0 deletions include/swoole.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ extern std::mutex sw_thread_lock;
#define SW_THREAD_LOCAL
#endif

#ifdef SW_USE_IOURING
#include <liburing.h>
#include <linux/version.h>
#if LINUX_VERSION_CODE >= KERNEL_VERSION(6, 7, 0)
#define HAVE_IOURING_FUTEX 1 // futex lock available since kernel 6.7.
#endif
#endif

/**
* API naming rules
* -----------------------------------
Expand Down
46 changes: 22 additions & 24 deletions include/swoole_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@
#include <atomic>
#include <queue>

#ifdef SW_USE_IOURING
#include <liburing.h>
#endif

#ifndef O_DIRECT
#define O_DIRECT 040000
#endif
Expand All @@ -40,42 +36,37 @@ enum AsyncFlag {
};

struct AsyncEvent {
size_t task_id;
#ifdef SW_USE_IOURING
size_t count;
#endif
uint8_t canceled;
int error;
/**
* input & output
*/
void *data;
#ifdef SW_USE_IOURING
const char *pathname;
const char *pathname2;
struct statx *statxbuf;
void *rbuf;
const void *wbuf;
#endif
/**
* output
*/
ssize_t retval;
#ifdef SW_USE_IOURING
int fd;
int flags;
int fd; // alias futex_flags
uint32_t flags;
int opcode;
mode_t mode;
#ifdef HAVE_IOURING_FUTEX
uint32_t *futex;
uint64_t value;
uint64_t mask;
#endif
#endif
/**
* internal use only
*/

// internal use only
network::Socket *pipe_socket;
double timestamp;
void *object;
void (*handler)(AsyncEvent *event);
void (*callback)(AsyncEvent *event);

size_t task_id;
ssize_t retval; // output
void *data; // input & output
uint8_t canceled;
int error;

bool catch_error() {
return (error == SW_ERROR_AIO_TIMEOUT || error == SW_ERROR_AIO_CANCELED);
}
Expand Down Expand Up @@ -168,6 +159,10 @@ class AsyncIouring {
SW_IORING_OP_RENAMEAT = IORING_OP_RENAMEAT,
SW_IORING_OP_UNLINKAT = IORING_OP_UNLINKAT,
SW_IORING_OP_MKDIRAT = IORING_OP_MKDIRAT,
#ifdef HAVE_IOURING_FUTEX
SW_IORING_OP_FUTEX_WAIT = IORING_OP_FUTEX_WAIT,
SW_IORING_OP_FUTEX_WAKE = IORING_OP_FUTEX_WAKE,
#endif

SW_IORING_OP_FSTAT = 1000,
SW_IORING_OP_LSTAT = 1001,
Expand All @@ -193,6 +188,9 @@ class AsyncIouring {
bool unlink(AsyncEvent *event);
bool rename(AsyncEvent *event);
bool fsync(AsyncEvent *event);
#ifdef HAVE_IOURING_FUTEX
bool futex(AsyncEvent *event);
#endif
inline bool is_empty_waiting_tasks() {
return waiting_tasks.size() == 0;
}
Expand Down
9 changes: 9 additions & 0 deletions include/swoole_coroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,15 @@ int async(AsyncIouring::opcodes opcode,
struct statx *statxbuf = nullptr,
size_t count = 0,
double timeout = -1);
#ifdef HAVE_IOURING_FUTEX
int futex(AsyncIouring::opcodes opcode,
uint32_t *futex,
uint64_t value,
uint64_t mask,
uint32_t futex_flags,
uint32_t flags,
double timeout = -1);
#endif
#endif
bool run(const CoroutineFunc &fn, void *arg = nullptr);
} // namespace coroutine
Expand Down
27 changes: 26 additions & 1 deletion include/swoole_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

#include <system_error>

#ifdef HAVE_IOURING_FUTEX
#include "swoole_coroutine_system.h"
#endif

namespace swoole {

class Lock {
Expand All @@ -31,11 +35,15 @@ class Lock {
RW_LOCK = 1,
MUTEX = 3,
SPIN_LOCK = 5,
ATOMIC_LOCK = 6,
#ifdef HAVE_IOURING_FUTEX
COROUTINE_LOCK = 6,
#endif
};

Type get_type() {
return type_;
}

virtual ~Lock(){};
virtual int lock_rd() = 0;
virtual int lock() = 0;
Expand Down Expand Up @@ -106,6 +114,23 @@ class SpinLock : public Lock {
};
#endif

#ifdef HAVE_IOURING_FUTEX
class CoroutineLock : public Lock {
private:
sw_atomic_t *value = nullptr;
Coroutine *current_coroutine = nullptr;

public:
CoroutineLock();
~CoroutineLock();
int lock_rd() override;
int lock() override;
int unlock() override;
int trylock_rd() override;
int trylock() override;
};
#endif

#if defined(HAVE_PTHREAD_BARRIER) && !(defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__))
#define SW_USE_PTHREAD_BARRIER
#endif
Expand Down
2 changes: 1 addition & 1 deletion scripts/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ if [ "${SWOOLE_BRANCH}" = "valgrind" ]; then
elif [ "$SWOOLE_THREAD" = 1 ]; then
dir="swoole_thread"
elif [ "$SWOOLE_USE_IOURING" = 1 ]; then
dir="swoole_runtime/file_hook"
dir="swoole_runtime/file_hook swoole_lock/lock.phpt"
else
dir="swoole_*"
fi
Expand Down
34 changes: 34 additions & 0 deletions src/coroutine/system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,40 @@ int async(AsyncIouring::opcodes opcode,

return event.retval;
}

#ifdef HAVE_IOURING_FUTEX
int futex(AsyncIouring::opcodes opcode,
uint32_t *futex,
uint64_t value,
uint64_t mask,
uint32_t futex_flags,
uint32_t flags,
double timeout) {
if (SwooleTG.async_iouring == nullptr) {
SwooleTG.async_iouring = new AsyncIouring(SwooleTG.reactor);
SwooleTG.async_iouring->add_event();
}

AsyncEvent event{};
AsyncLambdaTask task{Coroutine::get_current_safe(), nullptr};

event.object = &task;
event.callback = async_lambda_callback;
event.opcode = opcode;
event.fd = futex_flags;
event.value = value;
event.futex = futex;
event.flags = flags;
event.mask = mask;

bool result = SwooleTG.async_iouring->futex(&event);
if (!result || !task.co->yield_ex(timeout)) {
return 0;
}

return event.retval;
}
#endif
#endif

AsyncLock::AsyncLock(void *resource) {
Expand Down
Loading

0 comments on commit bda24ee

Please sign in to comment.