Skip to content

Commit

Permalink
add implementations for eventloop functions sock_*
Browse files Browse the repository at this point in the history
  • Loading branch information
philoinovsky committed Jun 29, 2021
1 parent 5747a3a commit 31264ff
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 14 deletions.
13 changes: 9 additions & 4 deletions include/boost/python/eventloop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,21 @@ class event_loop
}


void sock_recv(object sock, int bytes);
object sock_recv(object sock, size_t nbytes);

void sock_recv_into(object sock, object buffer);
size_t sock_recv_into(object sock, object buffer);

void sock_sendall(object sock, object data);
object sock_sendall(object sock, object data);

void sock_connect(object sock, object address);

void sock_accept(object sock);
object sock_accept(object sock);

void sock_sendfile(object sock, object file, int offset = 0, int count = 0, bool fallback = true);

private:
int64_t _timer_id = 0;
object _pymod_socket = import("socket");
boost::asio::io_context::strand _strand;
std::unordered_map<int, std::unique_ptr<boost::asio::steady_timer>> _id_to_timer_map;
// read: key = fd * 2 + 0, write: key = fd * 2 + 1
Expand All @@ -89,6 +90,10 @@ class event_loop

void _add_reader_or_writer(int fd, object f, int key);
void _remove_reader_or_writer(int key);
static void _sock_connect_cb(
event_loop& loop, std::promise<void>& prom, std::future<void>& fut, object sock, object addr);
static void _sock_accept(
event_loop& loop, std::promise<object>& prom, std::future<object>& fut, object sock);
};

}}}
Expand Down
197 changes: 187 additions & 10 deletions src/eventloop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,44 @@
// TODO:
// 1. posix::stream_descriptor need windows version
// 2. call_* need return async.Handle
// 3. _ensure_fd_no_transport
// 4. _ensure_resolve

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/python.hpp>
#include <boost/python/eventloop.hpp>
#include <boost/mpl/vector.hpp>
#include <Python.h>


namespace boost { namespace python { namespace asio {
namespace
{

bool _hasattr(object o, const char* name)
{
return PyObject_HasAttrString(o.ptr(), name);
}

void _sock_recv_handler(
std::promise<std::vector<char>>& prom_data,
std::promise<size_t>& prom_nbytes_read,
size_t nbytes,
int fd)
{
std::vector<char> buffer(nbytes);
prom_nbytes_read.set_value(read(fd, buffer.data(), nbytes));
prom_data.set_value(std::move(buffer));
}

void _sock_send_handler(std::promise<size_t>& prom, int fd, const char *py_str, ssize_t len)
{
size_t nwrite = write(fd, py_str, len);
prom.set_value(nwrite);
}

}

void event_loop::_add_reader_or_writer(int fd, object f, int key)
{
Expand Down Expand Up @@ -50,6 +80,85 @@ void event_loop::_remove_reader_or_writer(int key)
}
}

void event_loop::_sock_connect_cb(
event_loop& loop, std::promise<void>& prom, std::future<void>& fut, object sock, object addr)
{
try
{
object err = sock.attr("getsockopt")(
loop._pymod_socket.attr("SOL_SOCKET"), loop._pymod_socket.attr("SO_ERROR"));
if (err != object(0)) {
// TODO: print the address
PyErr_SetString(PyExc_OSError, "Connect call failed {address}");
}
}
catch (const error_already_set& e)
{
if (PyErr_ExceptionMatches(PyExc_BlockingIOError)
|| PyErr_ExceptionMatches(PyExc_InterruptedError))
{
PyErr_Clear();
// pass
}
else if (PyErr_ExceptionMatches(PyExc_SystemExit)
|| PyErr_ExceptionMatches(PyExc_KeyboardInterrupt))
{
// raise
}
else if (PyErr_ExceptionMatches(PyExc_BaseException))
{
PyErr_Clear();
prom.set_exception(std::current_exception());
}
else
{
PyErr_Clear();
prom.set_value();
}
}
}

void event_loop::_sock_accept(
event_loop& loop, std::promise<object>& prom, std::future<object>& fut, object sock)
{
int fd = extract<int>(sock.attr("fileno")());
object conn;
object address;
try
{
object ret = sock.attr("accept")();
conn = ret[0];
address = ret[1];
conn.attr("setblocking")(object(false));
}
catch (const error_already_set& e)
{
if (PyErr_ExceptionMatches(PyExc_BlockingIOError)
|| PyErr_ExceptionMatches(PyExc_InterruptedError))
{
PyErr_Clear();
loop.add_reader(fd, make_function(bind(
_sock_accept, boost::ref(loop), boost::ref(prom), boost::ref(fut), sock),
default_call_policies(), boost::mpl::vector<void, object>()));
}
else if (PyErr_ExceptionMatches(PyExc_SystemExit)
|| PyErr_ExceptionMatches(PyExc_KeyboardInterrupt))
{
// raise
}
else if (PyErr_ExceptionMatches(PyExc_BaseException))
{
PyErr_Clear();
prom.set_exception(std::current_exception());
}
else
{
PyErr_Clear();
prom.set_value(make_tuple(conn, address));
}
}
}

void event_loop::call_later(double delay, object f)
{
// add timer
Expand All @@ -76,34 +185,102 @@ void event_loop::call_at(double when, object f)
return call_soon(f);
}

void event_loop::sock_recv(object sock, int bytes)
object event_loop::sock_recv(object sock, size_t nbytes)
{

int fd = extract<int>(sock.attr("fileno")());
std::promise<std::vector<char>> prom_data;
std::future<std::vector<char>> fut_data = prom_data.get_future();
std::promise<size_t> prom_nbytes_read;
std::future<size_t> fut_nbytes_read = prom_nbytes_read.get_future();
add_reader(fd, make_function(bind(_sock_recv_handler,
boost::ref(prom_data), boost::ref(prom_nbytes_read), nbytes, fd),
default_call_policies(), boost::mpl::vector<void, object>()));
return object(handle<>(PyBytes_FromStringAndSize(fut_data.get().data(), nbytes)));
}

void event_loop::sock_recv_into(object sock, object buffer)
size_t event_loop::sock_recv_into(object sock, object buffer)
{

int fd = extract<int>(sock.attr("fileno")());
ssize_t nbytes = len(buffer);
std::promise<std::vector<char>> prom_data;
std::future<std::vector<char>> fut_data = prom_data.get_future();
std::promise<size_t> prom_nbytes_read;
std::future<size_t> fut_nbytes_read = prom_nbytes_read.get_future();
add_reader(fd, make_function(bind(_sock_recv_handler,
boost::ref(prom_data), boost::ref(prom_nbytes_read), nbytes, fd),
default_call_policies(), boost::mpl::vector<void, object>()));
buffer = object(handle<>(PyBytes_FromStringAndSize(fut_data.get().data(), nbytes)));
return fut_nbytes_read.get();
}

void event_loop::sock_sendall(object sock, object data)
object event_loop::sock_sendall(object sock, object data)
{

int fd = extract<int>(sock.attr("fileno")());
char const* py_str = extract<char const*>(data.attr("decode")());
ssize_t py_str_len = len(data);
std::promise<size_t> prom;
std::future<size_t> fut = prom.get_future();
add_writer(fd, make_function(bind(_sock_send_handler, std::ref(prom), fd, py_str, py_str_len),
default_call_policies(), boost::mpl::vector<void, object>()));
fut.wait();
return object();
}

void event_loop::sock_connect(object sock, object address)
{


if (!_hasattr(_pymod_socket, "AF_UNIX") || sock.attr("family") != _pymod_socket.attr("AF_UNIX"))
{
// TODO: _ensure_resolve
}
std::promise<void> prom;
std::future<void> fut = prom.get_future();
int fd = extract<int>(sock.attr("fileno")());
try
{
sock.attr("connect")(address);
}
catch (const error_already_set& e)
{
if (PyErr_ExceptionMatches(PyExc_BlockingIOError)
|| PyErr_ExceptionMatches(PyExc_InterruptedError))
{
PyErr_Clear();
add_writer(fd, make_function(bind(
_sock_connect_cb, boost::ref(*this), boost::ref(prom), boost::ref(fut), sock, address),
default_call_policies(), boost::mpl::vector<void, object>()));
}
else if (PyErr_ExceptionMatches(PyExc_SystemExit)
|| PyErr_ExceptionMatches(PyExc_KeyboardInterrupt))
{
// raise
}
else if (PyErr_ExceptionMatches(PyExc_BaseException))
{
PyErr_Clear();
prom.set_exception(std::current_exception());
}
else
{
PyErr_Clear();
prom.set_value();
}
}
fut.wait();
}

void event_loop::sock_accept(object sock)
object event_loop::sock_accept(object sock)
{

std::promise<object> prom;
std::future<object> fut = prom.get_future();
_sock_accept(*this, prom, fut, sock);
return fut.get();
}

// TODO: implement this
void event_loop::sock_sendfile(object sock, object file, int offset, int count, bool fallback)
{

PyErr_SetString(PyExc_NotImplementedError, "Not implemented!");
}

}}}

0 comments on commit 31264ff

Please sign in to comment.