From a2a2bbeaa59d88a5102fb834d0980df96940aab0 Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Sun, 27 Jun 2021 18:20:00 +0000 Subject: [PATCH] add implementations for eventloop functions sock_* --- include/boost/python/eventloop.hpp | 9 +- src/eventloop.cpp | 195 +++++++++++++++++++++++++++-- 2 files changed, 190 insertions(+), 14 deletions(-) diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp index 4a7c3b30a..c8338d73d 100644 --- a/include/boost/python/eventloop.hpp +++ b/include/boost/python/eventloop.hpp @@ -67,20 +67,21 @@ class event_loop } - void sock_recv(object sock, int bytes); + object sock_recv(object sock, size_t nbytes); - void sock_recv_into(object sock, object buffer); + size_t sock_recv_into(object sock, object buffer); - void sock_sendall(object sock, object data); + object sock_sendall(object sock, object data); void sock_connect(object sock, object address); - void sock_accept(object sock); + object sock_accept(object sock); void sock_sendfile(object sock, object file, int offset = 0, int count = 0, bool fallback = true); private: int64_t _timer_id = 0; + object _pymod_socket = import("socket"); boost::asio::io_context::strand _strand; std::unordered_map> _id_to_timer_map; // read: key = fd * 2 + 0, write: key = fd * 2 + 1 diff --git a/src/eventloop.cpp b/src/eventloop.cpp index c4ed98076..676a66126 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -6,14 +6,121 @@ // TODO: // 1. posix::stream_descriptor need windows version // 2. call_* need return async.Handle +// 3. _ensure_fd_no_transport +// 4. _ensure_resolve #include #include #include #include +#include +#include namespace boost { namespace python { namespace asio { +namespace +{ + +bool _hasattr(object o, const char* name) +{ + return PyObject_HasAttrString(o.ptr(), name); +} + +void _sock_recv_handler( + std::promise>& prom_data, + std::promise& prom_nbytes_read, + size_t nbytes, + int fd) +{ + std::vector buffer(nbytes); + prom_nbytes_read.set_value(read(fd, buffer.data(), nbytes)); + prom_data.set_value(std::move(buffer)); +} + +void _sock_send_handler(std::promise& prom, int fd, const char *py_str, ssize_t len) +{ + size_t nwrite = write(fd, py_str, len); + prom.set_value(nwrite); +} + +void _sock_connect_cb(object pymod_socket, std::promise& prom, std::future& fut, object sock, object addr) +{ + try + { + object err = sock.attr("getsockopt")( + pymod_socket.attr("SOL_SOCKET"), pymod_socket.attr("SO_ERROR")); + if (err != object(0)) { + // TODO: print the address + PyErr_SetString(PyExc_OSError, "Connect call failed {address}"); + } + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_BlockingIOError) + || PyErr_ExceptionMatches(PyExc_InterruptedError)) + { + PyErr_Clear(); + // pass + } + else if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else if (PyErr_ExceptionMatches(PyExc_BaseException)) + { + PyErr_Clear(); + prom.set_exception(std::current_exception()); + } + else + { + PyErr_Clear(); + prom.set_value(); + } + } +} + +void _sock_accept(event_loop& loop, std::promise& prom, std::future& fut, object sock) +{ + int fd = extract(sock.attr("fileno")()); + object conn; + object address; + try + { + object ret = sock.attr("accept")(); + conn = ret[0]; + address = ret[1]; + conn.attr("setblocking")(object(false)); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_BlockingIOError) + || PyErr_ExceptionMatches(PyExc_InterruptedError)) + { + PyErr_Clear(); + loop.add_reader(fd, make_function(bind( + _sock_accept, boost::ref(loop), boost::ref(prom), boost::ref(fut), sock), + default_call_policies(), boost::mpl::vector())); + } + else if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else if (PyErr_ExceptionMatches(PyExc_BaseException)) + { + PyErr_Clear(); + prom.set_exception(std::current_exception()); + } + else + { + PyErr_Clear(); + prom.set_value(make_tuple(conn, address)); + } + } +} + +} void event_loop::_add_reader_or_writer(int fd, object f, int key) { @@ -76,34 +183,102 @@ void event_loop::call_at(double when, object f) return call_soon(f); } -void event_loop::sock_recv(object sock, int bytes) +object event_loop::sock_recv(object sock, size_t nbytes) { - + int fd = extract(sock.attr("fileno")()); + std::promise> prom_data; + std::future> fut_data = prom_data.get_future(); + std::promise prom_nbytes_read; + std::future fut_nbytes_read = prom_nbytes_read.get_future(); + add_reader(fd, make_function(bind(_sock_recv_handler, + boost::ref(prom_data), boost::ref(prom_nbytes_read), nbytes, fd), + default_call_policies(), boost::mpl::vector())); + return object(handle<>(PyBytes_FromStringAndSize(fut_data.get().data(), nbytes))); } -void event_loop::sock_recv_into(object sock, object buffer) +size_t event_loop::sock_recv_into(object sock, object buffer) { - + int fd = extract(sock.attr("fileno")()); + ssize_t nbytes = len(buffer); + std::promise> prom_data; + std::future> fut_data = prom_data.get_future(); + std::promise prom_nbytes_read; + std::future fut_nbytes_read = prom_nbytes_read.get_future(); + add_reader(fd, make_function(bind(_sock_recv_handler, + boost::ref(prom_data), boost::ref(prom_nbytes_read), nbytes, fd), + default_call_policies(), boost::mpl::vector())); + buffer = object(handle<>(PyBytes_FromStringAndSize(fut_data.get().data(), nbytes))); + return fut_nbytes_read.get(); } -void event_loop::sock_sendall(object sock, object data) +object event_loop::sock_sendall(object sock, object data) { - + int fd = extract(sock.attr("fileno")()); + char const* py_str = extract(data.attr("decode")()); + ssize_t py_str_len = len(data); + std::promise prom; + std::future fut = prom.get_future(); + add_writer(fd, make_function(bind(_sock_send_handler, std::ref(prom), fd, py_str, py_str_len), + default_call_policies(), boost::mpl::vector())); + fut.wait(); + return object(); } void event_loop::sock_connect(object sock, object address) { - + + if (!_hasattr(_pymod_socket, "AF_UNIX") || sock.attr("family") != _pymod_socket.attr("AF_UNIX")) + { + // TODO: _ensure_resolve + } + std::promise prom; + std::future fut = prom.get_future(); + int fd = extract(sock.attr("fileno")()); + try + { + sock.attr("connect")(address); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_BlockingIOError) + || PyErr_ExceptionMatches(PyExc_InterruptedError)) + { + PyErr_Clear(); + add_writer(fd, make_function(bind( + _sock_connect_cb, _pymod_socket, boost::ref(prom), boost::ref(fut), sock, address), + default_call_policies(), boost::mpl::vector())); + } + else if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else if (PyErr_ExceptionMatches(PyExc_BaseException)) + { + PyErr_Clear(); + prom.set_exception(std::current_exception()); + } + else + { + PyErr_Clear(); + prom.set_value(); + } + } + fut.wait(); } -void event_loop::sock_accept(object sock) +object event_loop::sock_accept(object sock) { - + std::promise prom; + std::future fut = prom.get_future(); + _sock_accept(*this, prom, fut, sock); + return fut.get(); } +// TODO: implement this void event_loop::sock_sendfile(object sock, object file, int offset, int count, bool fallback) { - + PyErr_SetString(PyExc_NotImplementedError, "Not implemented!"); } }}} \ No newline at end of file