Skip to content

Commit

Permalink
[FOUND PROBLEM]: Empty Keys
Browse files Browse the repository at this point in the history
  • Loading branch information
SudeepRed committed Nov 5, 2023
1 parent ad0c83a commit e4c8dc3
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 239 deletions.
2 changes: 1 addition & 1 deletion src/include/pgagroal.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,12 @@ struct query_cache
atomic_schar lock; /**< lock to protect the cache */
size_t size; /**< size of the cache */
struct hashTable* table;
int max_elements;
struct cachev2
{
struct hashEntry* key;
struct hashEntry* data;
} cache[100000];
int max_elements;

} __attribute__ ((aligned (64)));

Expand Down
138 changes: 27 additions & 111 deletions src/libpgagroal/pipeline_session.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ session_initialize(void* shmem, void** pipeline_shmem, size_t* pipeline_shmem_si
cache_key->kind = '\0';
cache_key->key_length = 0;
atomic_init(&cache_key->lock, STATE_FREE);
pgagroal_log_info("INIT LOCK STATE %d", cache_key->lock);

client_server_shmem = cache_key;

Expand Down Expand Up @@ -313,8 +312,6 @@ session_client(struct ev_loop* loop, struct ev_io* watcher, int revents)
struct query_cache* cache;
cache = (struct query_cache*)query_cache_shmem;

pgagroal_log_info("CACHE_SHMEM LOCK STATE %d, client slot %d", client_query->lock, wi->slot);

client_active(wi->slot);

if (wi->client_ssl == NULL)
Expand All @@ -336,123 +333,65 @@ session_client(struct ev_loop* loop, struct ev_io* watcher, int revents)
cache_is_free = STATE_FREE;
if (atomic_compare_exchange_strong(&client_query->lock, &cache_is_free, STATE_IN_USE))
{
// char* query_with_semicolon = msg->data + 5;
if (msg->kind == 'Q')
{
pgagroal_log_debug("Q message found");

pgagroal_log_info("Lock acquired by client %d", wi->client_fd);
char* query_with_semicolon = msg->data + 5;

pgagroal_log_debug("msg: kind %c, strlen %d", msg->kind, strlen(msg->data + 5));

// find total length of string 24
if (msg->kind == 'Q' && query_with_semicolon[strlen(query_with_semicolon) - 1] == ';')
{
size_t key_length = strlen(msg->data + 5);

// allocate 25 bytes for key \0
// client_query->key = malloc(key_length + 1);
// if (client_query->key == NULL)
// {
// pgagroal_log_fatal("failed to allocate memory for key");
// client_inactive(wi->slot);
// ev_break(loop, EVBREAK_ONE);
// atomic_store(&client_query->lock, STATE_FREE);
// return;
// }
// // set 25 bytes to 0
memset(client_query->key, 0, QUERY_KEY_SIZE);
memcpy(client_query->key, msg->data + 5, key_length);
client_query->key[key_length + 1] = '\0';

// copy key to cache_key->key
// strcpy(client_query->key, msg->data + 5);
// client_query->key[strlen(client_query->key)] = '\0';

client_query->key_length = key_length;
client_query->kind = msg->kind;

// pgagroal_log_info("print using shmem %s, %d", client_query->key, client_query->key_length);

struct hashEntry* key = NULL;
key = (struct hashEntry*)malloc(sizeof(struct hashEntry) + key_length + 1);
key = (struct hashEntry*)malloc(sizeof(struct hashEntry) + client_query->key_length + 1);

if (key == NULL)
{
pgagroal_log_fatal("failed to allocate memory for key");

client_inactive(wi->slot);
ev_break(loop, EVBREAK_ONE);

atomic_store(&client_query->lock, STATE_FREE);
return;
}
// allocate 25 bytes for key->value
// key->value = (char*)malloc(key_length + 1);
// if (key->value == NULL)
// {
// pgagroal_log_fatal("failed to allocate memory for key");
// client_inactive(wi->slot);
// ev_break(loop, EVBREAK_ONE);
// atomic_store(&client_query->lock, STATE_FREE);

// return;
// }

// initialize 25 memory blocks properly
memset(key->key, 0, key_length + 1);
memset(key->key, 0, client_query->key_length + 1);

memcpy(key->key, msg->data + 5, key_length);
key->key[key_length + 1] = '\0';
memcpy(key->key, client_query->key, key_length);
key->key[client_query->key_length + 1] = '\0';

key->length = key_length;
key->length = client_query->key_length;
struct hashEntry* s = pgagroal_query_cache_get(cache, &(cache->table), key);
// struct hashEntry* s = NULL;

// free memory for key->value and key

free(key);

if (s != NULL && s->value != NULL)
{
pgagroal_log_info("USING CACHED ENTRY");
pgagroal_log_warn("CACHE_QUERY LOCK STATE %d", cache->lock);

struct message* result = NULL;
pgagroal_log_warn("CACHED_ENTRY %s, %d", s->value, s->length);

int x = pgagroal_create_message(s->value, s->length, &result);
// log
pgagroal_log_fatal("DONE: %d", x);
// pgagroal_log_info("res: %s, %d", result->data, result->length);

status = pgagroal_write_socket_message(wi->client_fd, result);
pgagroal_log_info("status %d", status);

client_inactive(wi->slot);

pgagroal_log_debug("CACHE IS IN USE");
pgagroal_log_debug("CACHE_QUERY STATE %d", cache->lock);
//release lock
ev_break(loop, EVBREAK_ONE);
if (pgagroal_create_message(s->value, s->length, &result))
{
status = pgagroal_write_socket_message(wi->client_fd, result);
client_inactive(wi->slot);
ev_break(loop, EVBREAK_ONE);
atomic_store(&client_query->lock, STATE_FREE);
return;
}

atomic_store(&client_query->lock, STATE_FREE);
return;
}

pgagroal_log_info("CACHE MISS");

}
atomic_store(&client_query->lock, STATE_FREE);
}
else
{
pgagroal_log_debug("CACHE IS IN USE %d", client_query->lock);
pgagroal_log_debug("CACHE_QUERY STATE %d", cache->lock);
SLEEP_AND_GOTO(1000000L, retry_cache_get);
}
// pgagroal_log_warn("CACHE LOCK STATE %d, shq %s, msgQ %s", client_query->lock, client_query->key, msg->data + 5);
pgagroal_log_warn("OUT");
pgagroal_log_warn("CACHE_SHMEM LOCK STATE %d", client_query->lock);
pgagroal_log_warn("CACHE_QUERY LOCK STATE %d", cache->lock);

pgagroal_log_info("---------------------------------------------------------------------------");
int offset = 0;

while (offset < msg->length)
Expand Down Expand Up @@ -602,7 +541,6 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents)
struct client_server_cache* client_query = NULL;
client_query = (struct client_server_cache*)client_server_shmem;

//Query Cache
struct query_cache* cache;
cache = (struct query_cache*)query_cache_shmem;

Expand All @@ -629,27 +567,21 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents)
cache_is_free = STATE_FREE;
if (atomic_compare_exchange_strong(&client_query->lock, &cache_is_free, STATE_IN_USE))
{
pgagroal_log_info("SERVER Lock acquired by client %d", wi->client_fd);

if (msg->kind == 'T' && client_query->kind == 'Q' && strlen(client_query->key) == client_query->key_length)
if (msg->kind == 'T' && client_query->kind == 'Q')
{
// pgagroal_log_warn("INSIDE SERVER msg %c, %s", client_query->kind, client_query->key);

struct message* tmp = NULL;
if (!pgagroal_extract_message('Z', msg, &tmp))
{
pgagroal_log_info("START");
pgagroal_log_info("MSG Size %d", msg->length);
pgagroal_log_info("KEY Size %d", client_query->key_length);
// pgagroal_log_info("KEY %s", client_query->key);

struct hashEntry* key, * data;
key = (struct hashEntry*)malloc(sizeof(struct hashEntry) + client_query->key_length + 1);
data = (struct hashEntry*)malloc(sizeof(struct hashEntry) + msg->length);

// Check if memory allocation was successful
if (key == NULL || data == NULL)
{
pgagroal_log_fatal("memory allocaltion failed");

if (key != NULL)
{
free(key);
Expand All @@ -662,23 +594,17 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents)
}
client_inactive(wi->slot);
ev_break(loop, EVBREAK_ONE);
//release lock

atomic_store(&client_query->lock, STATE_FREE);
return;
}

// Use memset to initialize the memory blocks properly

// pgagroal_log_info("str len, size %d, %d, %s", client_query->key_length, strlen(client_query->key), client_query->key);

// key->value = (char*)malloc(client_query->key_length + 1);
data->value = malloc(msg->length);
memset(key->key, 0, client_query->key_length + 1);

// Check if memory allocation for key->value and data->value was successful
if (data->value == NULL)
{
pgagroal_log_fatal("memory allocaltion failed");

if (data->value != NULL)
{
free(data->value);
Expand All @@ -691,7 +617,7 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents)
}
client_inactive(wi->slot);
ev_break(loop, EVBREAK_ONE);
//release lock

atomic_store(&client_query->lock, STATE_FREE);
return;
}
Expand All @@ -704,35 +630,26 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents)
memcpy(data->value, msg->data, msg->length);
data->length = msg->length;

int cache_add_status = pgagroal_query_cache_add(cache, &(cache->table), data, key, 1);
pgagroal_log_info("CACHE ADD STATUS: %d", cache_add_status);
pgagroal_query_cache_add(cache, &(cache->table), data, key, 1);

atomic_store(&client_query->lock, STATE_FREE);
free(data);
free(key);

}
else
{
pgagroal_log_info("FAILED TO FIND Z MESSAGE");
atomic_store(&client_query->lock, STATE_FREE);
}
}
else
{
pgagroal_log_info("MESSAGE DID NOT HAVE T (ROW DESCRIPTION)");
atomic_store(&client_query->lock, STATE_FREE);
}
}
else
{
pgagroal_log_debug("CACHE IS IN USE");
SLEEP_AND_GOTO(1000000L, retry_cache_add)
}
pgagroal_log_warn("OUT");
pgagroal_log_warn("CACHE_SHMEM LOCK STATE %d", client_query->lock);
pgagroal_log_warn("CACHE_QUERY LOCK STATE %d", cache->lock);

pgagroal_log_info("---------------------------------------------------------------------------");
client_query->kind = '\0';

int offset = 0;

Expand All @@ -743,7 +660,6 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents)
char kind = pgagroal_read_byte(msg->data + offset);
int length = pgagroal_read_int32(msg->data + offset + 1);

/* The Z message tell us the transaction state */
if (kind == 'Z')
{
char tx_state = pgagroal_read_byte(msg->data + offset + 5);
Expand Down
Loading

0 comments on commit e4c8dc3

Please sign in to comment.