Skip to content

Commit

Permalink
Merge pull request #17 from olapiv/mikaelMultiP
Browse files Browse the repository at this point in the history
Test: Mikael PR with CI run
  • Loading branch information
mronstro authored Nov 4, 2024
2 parents ca5db34 + fae78ae commit ef5934d
Show file tree
Hide file tree
Showing 26 changed files with 306 additions and 152 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/build_test_push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,18 @@ on:
types: [opened, synchronize, reopened]

jobs:
build-pink-macos:
runs-on: macos-latest
steps:
- uses: actions/checkout@v4

- run: brew install protobuf

- name: Build
run: cd pink && make static_lib

build-and-run-redis-benchmark:
needs: [build-pink-macos]
runs-on: ubuntu-latest
env:
RONDB_TARBALL_URI: https://repo.hops.works/master/rondb-22.10.5-linux-glibc2.28-x86_64.tar.gz
Expand Down
18 changes: 16 additions & 2 deletions pink/Makefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
CLEAN_FILES = # deliberately empty, so we can append below.
CXX=g++
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Darwin)
LDFLAGS= -lpthread
else
LDFLAGS= -lpthread -lrt
endif

CXXFLAGS= -g -std=c++11 -fno-builtin-memcmp -pipe -fPIC
ifeq ($(shell uname -m), x86_64)
CXXFLAGS += -msse -msse4.2
CXXFLAGS += -msse -msse4.2
endif
ifeq ($(UNAME_S),Darwin)
CXXFLAGS += -I/opt/homebrew/include
endif

PROFILING_FLAGS=-pg
Expand All @@ -15,7 +24,11 @@ DEBUG_LEVEL?=0
NO_PB?=0

ifeq ($(NO_PB),0)
LDFLAGS+= -lprotobuf
ifeq ($(UNAME_S),Darwin)
LDFLAGS += -L/opt/homebrew/lib
else
LDFLAGS+= -lprotobuf
endif
endif

ifeq ($(MAKECMDGOALS),dbg)
Expand All @@ -25,6 +38,7 @@ endif
ifeq ($(ENABLE_SSL),1)
OPT += -D__ENABLE_SSL
endif
OPT += -std=c++17

# compile with -O2 if for release
# if we're compiling for release, compile without debug code (-DNDEBUG) and
Expand Down
15 changes: 2 additions & 13 deletions pink/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,9 @@
set -e

PINK_PATH=$PWD

# We depend on slash
SLASH_PATH=$1
if test -z $SLASH_PATH; then
SLASH_PATH=$PINK_PATH/third/slash
fi

if [[ ! -d $SLASH_PATH ]]; then
echo "Slash library is not available"
exit 1
fi
SLASH_PATH=$PINK_PATH/third/slash
cd $SLASH_PATH/slash && make

# Compile pink
cd $PINK_PATH
make SLASH_PATH=$SLASH_PATH
cd examples && make SLASH_PATH=$SLASH_PATH
SLASH_PATH=$SLASH_PATH make
7 changes: 6 additions & 1 deletion pink/examples/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CXX=g++
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Darwin)
LDFLAGS= -lpthread
else
LDFLAGS= -lpthread -lrt -lprotobuf
CXXFLAGS=-O2 -std=c++11 -fno-builtin-memcmp
endif
CXXFLAGS=-O2 -std=c++17 -fno-builtin-memcmp
ifeq ($(shell uname -m), x86_64)
CXXFLAGS += -msse -msse4.2
endif
Expand Down
2 changes: 0 additions & 2 deletions pink/include/backend_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
#ifndef PINK_INCLUDE_BACKEND_THREAD_H_
#define PINK_INCLUDE_BACKEND_THREAD_H_

#include <sys/epoll.h>

#include <set>
#include <string>
#include <map>
Expand Down
2 changes: 0 additions & 2 deletions pink/include/client_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
#ifndef PINK_INCLUDE_CLIENT_THREAD_H_
#define PINK_INCLUDE_CLIENT_THREAD_H_

#include <sys/epoll.h>

#include <set>
#include <string>
#include <map>
Expand Down
4 changes: 1 addition & 3 deletions pink/include/pink_pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
#ifndef PINK_INCLUDE_PUBSUB_H_
#define PINK_INCLUDE_PUBSUB_H_

#include <sys/epoll.h>

#include <string>
#include <functional>
#include <queue>
Expand All @@ -29,7 +27,7 @@
namespace pink {

class PinkEpoll;
class PinkFiredEvent;
struct PinkFiredEvent;
class PinkConn;

class PubSubThread : public Thread {
Expand Down
2 changes: 0 additions & 2 deletions pink/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
#ifndef PINK_INCLUDE_SERVER_THREAD_H_
#define PINK_INCLUDE_SERVER_THREAD_H_

#include <sys/epoll.h>

#include <set>
#include <vector>
#include <memory>
Expand Down
14 changes: 13 additions & 1 deletion pink/rondis/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
CXX=g++
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Darwin)
LDFLAGS= -lpthread
else
LDFLAGS= -lpthread -lrt -lprotobuf
endif
CXXFLAGS=-O2 -std=c++17 -fno-builtin-memcmp
ifeq ($(shell uname -m), x86_64)
CXXFLAGS += -msse -msse4.2
Expand All @@ -25,14 +30,21 @@ SLASH_LIBRARY=$(SLASH_PATH)/slash/lib/libslash.a

ifndef RONDB_PATH
$(warning Warning: missing rondb path)
exit 1
$(error Exit due to missing RONDB_PATH)
endif
RONDB_INCLUDE_DIR=$(RONDB_PATH)/include/storage/ndb
ifeq ($(UNAME_S),Darwin)
RONDB_LIBRARY=$(RONDB_PATH)/lib/libndbclient.dylib
else
RONDB_LIBRARY=$(RONDB_PATH)/lib/libndbclient.so
endif

CXXFLAGS+= -I$(PINK_INCLUDE_DIR) -I$(SLASH_INCLUDE_DIR) -I$(RONDB_INCLUDE_DIR)

DEP_LIBS = $(PINK_LIBRARY) $(SLASH_LIBRARY) $(RONDB_LIBRARY)
ifeq ($(UNAME_S),Darwin)
DEP_LIBS+= /opt/homebrew/lib/libprotobuf.dylib
endif
LDFLAGS := $(DEP_LIBS) $(LDFLAGS)

# Use find to locate all .cc files in subdirectories
Expand Down
2 changes: 1 addition & 1 deletion pink/rondis/rondis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ int main(int argc, char *argv[])
{
int port = 6379;
char *connect_string = "localhost:13000";
int worker_threads = 1;
int worker_threads = 2;
if (argc != 4)
{
printf("Not receiving 3 arguments, just using defaults\n");
Expand Down
36 changes: 20 additions & 16 deletions pink/rondis/string/db_operations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@ NdbRecord *entire_key_record = nullptr;
NdbRecord *pk_value_record = nullptr;
NdbRecord *entire_value_record = nullptr;

static void set_length(char *buf, Uint32 key_len) {
Uint8 *ptr = (Uint8*)buf;
ptr[0] = (Uint8)(key_len & 255);
ptr[1] = (Uint8)(key_len >> 8);
}

static Uint32 get_length(char *buf) {
Uint8 *ptr = (Uint8*)buf;
Uint8 low = ptr[0];
Uint8 high = ptr[1];
Uint32 len32 = Uint32(low) + Uint32(256) * Uint32(high);
return len32;
}

int create_key_row(std::string *response,
Ndb *ndb,
const NdbDictionary::Table *tab,
Expand Down Expand Up @@ -141,8 +155,7 @@ int delete_key_row(std::string *response,
}
del_op->deleteTuple();
memcpy(&buf[2], key_str, key_len);
buf[0] = key_len & 255;
buf[1] = key_len >> 8;
set_length(buf, key_len);
del_op->equal(KEY_TABLE_COL_redis_key, buf);

if (del_op->getNdbError().code != 0)
Expand Down Expand Up @@ -242,8 +255,7 @@ void write_data_to_key_op(NdbOperation *ndb_op,
char *buf)
{
memcpy(&buf[2], key_str, key_len);
buf[0] = key_len & 255;
buf[1] = key_len >> 8;
set_length(buf, key_len);
ndb_op->equal(KEY_TABLE_COL_redis_key, buf);

if (rondb_key == 0)
Expand All @@ -265,9 +277,7 @@ void write_data_to_key_op(NdbOperation *ndb_op,
this_value_len = INLINE_VALUE_LEN;
}
memcpy(&buf[2], value_str, this_value_len);
Uint8 *ptr = (Uint8 *)buf;
ptr[0] = (Uint8)(this_value_len & 255);
ptr[1] = (Uint8)(this_value_len >> 8);
set_length(buf, this_value_len);
ndb_op->setValue(KEY_TABLE_COL_value_start, buf);
}

Expand Down Expand Up @@ -301,9 +311,7 @@ int create_value_row(std::string *response,
op->equal(VALUE_TABLE_COL_rondb_key, rondb_key);
op->equal(VALUE_TABLE_COL_ordinal, ordinal);
memcpy(&buf[2], start_value_ptr, this_value_len);
Uint8 *ptr = (Uint8 *)buf;
ptr[0] = (Uint8)(this_value_len & 255);
ptr[1] = (Uint8)(this_value_len >> 8);
set_length(buf, this_value_len);
op->setValue(VALUE_TABLE_COL_value, buf);
{
if (op->getNdbError().code != 0)
Expand Down Expand Up @@ -512,9 +520,7 @@ int read_batched_value_rows(std::string *response,
for (Uint32 i = 0; i < num_rows_to_read; i++)
{
// Transfer char pointer to response's string
Uint8 low = (Uint8)value_rows[i].value[0];
Uint8 high = (Uint8)value_rows[i].value[1];
Uint32 row_value_len = Uint32(low) + (Uint32(256) * Uint32(high));
Uint32 row_value_len = get_length((char*)&value_rows->value[0]);
response->append((const char *)&value_rows[i].value[2], row_value_len);
}
return 0;
Expand Down Expand Up @@ -576,9 +582,7 @@ int get_complex_key_row(std::string *response,
response->append(header_buf);

// Append inline value to response
Uint8 low = (Uint8)key_row->value_start[0];
Uint8 high = (Uint8)key_row->value_start[1];
Uint32 inline_value_len = Uint32(low) + (Uint32(256) * Uint32(high));
Uint32 inline_value_len = get_length((char*)&key_row->value_start[0]);
response->append((const char *)&key_row->value_start[2], inline_value_len);

int ret_code = get_value_rows(response,
Expand Down
28 changes: 14 additions & 14 deletions pink/src/backend_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ Status BackendThread::Close(const int fd) {
}

Status BackendThread::ProcessConnectStatus(PinkFiredEvent* pfe, int* should_close) {
if ((pfe->mask & EPOLLERR) || (pfe->mask & EPOLLHUP)) {
if (pfe->mask & PinkEpoll::kError) {
*should_close = 1;
return Status::Corruption("EPOLLERR or EPOLLHUP");
}
Expand All @@ -119,7 +119,7 @@ Status BackendThread::ProcessConnectStatus(PinkFiredEvent* pfe, int* should_clos
}

void BackendThread::SetWaitConnectOnEpoll(int sockfd) {
pink_epoll_->PinkAddEvent(sockfd, EPOLLIN | EPOLLOUT);
pink_epoll_->PinkAddEvent(sockfd, PinkEpoll::kRead | PinkEpoll::kWrite);
connecting_fds_.insert(sockfd);
}

Expand Down Expand Up @@ -183,7 +183,7 @@ Status BackendThread::Connect(const std::string& dst_ip, const int dst_port, int
}

AddConnection(dst_ip, dst_port, sockfd);
pink_epoll_->PinkAddEvent(sockfd, EPOLLIN | EPOLLOUT);
pink_epoll_->PinkAddEvent(sockfd, PinkEpoll::kRead | PinkEpoll::kWrite);
struct sockaddr_in laddr;
socklen_t llen = sizeof(laddr);
getsockname(sockfd, (struct sockaddr*) &laddr, &llen);
Expand Down Expand Up @@ -248,7 +248,7 @@ void BackendThread::DoCronTask() {
if (keepalive_timeout_ > 0 &&
(now.tv_sec - conn->last_interaction().tv_sec > keepalive_timeout_)) {
log_info("Do cron task del fd %d\n", conn->fd());
pink_epoll_->PinkDelEvent(conn->fd());
pink_epoll_->PinkDelEvent(conn->fd(), 0);
close(conn->fd());
handle_->FdTimeoutHandle(conn->fd(), conn->ip_port());
if (conns_.count(conn->fd())) {
Expand Down Expand Up @@ -315,7 +315,7 @@ void BackendThread::NotifyClose(const int fd) {
}

void BackendThread::ProcessNotifyEvents(const PinkFiredEvent* pfe) {
if (pfe->mask & EPOLLIN) {
if (pfe->mask & PinkEpoll::kRead) {
char bb[2048];
int32_t nread = read(pink_epoll_->notify_receive_fd(), bb, 2048);
if (nread == 0) {
Expand All @@ -332,7 +332,7 @@ void BackendThread::ProcessNotifyEvents(const PinkFiredEvent* pfe) {
continue;
} else {
// connection exist
pink_epoll_->PinkModEvent(fd, 0, EPOLLOUT | EPOLLIN);
pink_epoll_->PinkModEvent(fd, 0, PinkEpoll::kRead | PinkEpoll::kWrite);
}
{
auto iter = to_send_.find(fd);
Expand All @@ -348,7 +348,7 @@ void BackendThread::ProcessNotifyEvents(const PinkFiredEvent* pfe) {
}
} else if (ti.notify_type() == kNotiClose) {
log_info("received kNotiClose\n");
pink_epoll_->PinkDelEvent(fd);
pink_epoll_->PinkDelEvent(fd, 0);
CloseFd(fd);
conns_.erase(fd);
connecting_fds_.erase(fd);
Expand Down Expand Up @@ -413,7 +413,7 @@ void *BackendThread::ThreadMain() {
if (iter == conns_.end()) {
mu_.Unlock();
log_info("fd %d not found in fd_conns\n", pfe->fd);
pink_epoll_->PinkDelEvent(pfe->fd);
pink_epoll_->PinkDelEvent(pfe->fd, 0);
continue;
}
mu_.Unlock();
Expand All @@ -428,11 +428,11 @@ void *BackendThread::ThreadMain() {
connecting_fds_.erase(pfe->fd);
}

if (!should_close && (pfe->mask & EPOLLOUT) && conn->is_reply()) {
if (!should_close && (pfe->mask & PinkEpoll::kWrite) && conn->is_reply()) {
WriteStatus write_status = conn->SendReply();
conn->set_last_interaction(now);
if (write_status == kWriteAll) {
pink_epoll_->PinkModEvent(pfe->fd, 0, EPOLLIN);
pink_epoll_->PinkModEvent(pfe->fd, 0, PinkEpoll::kRead);
conn->set_is_reply(false);
} else if (write_status == kWriteHalf) {
continue;
Expand All @@ -442,11 +442,11 @@ void *BackendThread::ThreadMain() {
}
}

if (!should_close && (pfe->mask & EPOLLIN)) {
if (!should_close && (pfe->mask & PinkEpoll::kRead)) {
ReadStatus read_status = conn->GetRequest();
conn->set_last_interaction(now);
if (read_status == kReadAll) {
// pink_epoll_->PinkModEvent(pfe->fd, 0, EPOLLOUT);
// pink_epoll_->PinkModEvent(pfe->fd, 0, PinkEpoll::kWrite);
} else if (read_status == kReadHalf) {
continue;
} else {
Expand All @@ -455,10 +455,10 @@ void *BackendThread::ThreadMain() {
}
}

if ((pfe->mask & EPOLLERR) || (pfe->mask & EPOLLHUP) || should_close) {
if ((pfe->mask & PinkEpoll::kError) || should_close) {
{
log_info("close connection %d reason %d %d\n", pfe->fd, pfe->mask, should_close);
pink_epoll_->PinkDelEvent(pfe->fd);
pink_epoll_->PinkDelEvent(pfe->fd, 0);
CloseFd(conn);
mu_.Lock();
conns_.erase(pfe->fd);
Expand Down
Loading

0 comments on commit ef5934d

Please sign in to comment.