diff --git a/src/include/pgagroal.h b/src/include/pgagroal.h index e32c2bec..4271d0cc 100644 --- a/src/include/pgagroal.h +++ b/src/include/pgagroal.h @@ -436,12 +436,12 @@ struct query_cache atomic_schar lock; /**< lock to protect the cache */ size_t size; /**< size of the cache */ struct hashTable* table; + int max_elements; struct cachev2 { struct hashEntry* key; struct hashEntry* data; } cache[100000]; - int max_elements; } __attribute__ ((aligned (64))); diff --git a/src/libpgagroal/pipeline_session.c b/src/libpgagroal/pipeline_session.c index 5e07f3c0..08c23ecc 100644 --- a/src/libpgagroal/pipeline_session.c +++ b/src/libpgagroal/pipeline_session.c @@ -118,7 +118,6 @@ session_initialize(void* shmem, void** pipeline_shmem, size_t* pipeline_shmem_si cache_key->kind = '\0'; cache_key->key_length = 0; atomic_init(&cache_key->lock, STATE_FREE); - pgagroal_log_info("INIT LOCK STATE %d", cache_key->lock); client_server_shmem = cache_key; @@ -313,8 +312,6 @@ session_client(struct ev_loop* loop, struct ev_io* watcher, int revents) struct query_cache* cache; cache = (struct query_cache*)query_cache_shmem; - pgagroal_log_info("CACHE_SHMEM LOCK STATE %d, client slot %d", client_query->lock, wi->slot); - client_active(wi->slot); if (wi->client_ssl == NULL) @@ -336,123 +333,65 @@ session_client(struct ev_loop* loop, struct ev_io* watcher, int revents) cache_is_free = STATE_FREE; if (atomic_compare_exchange_strong(&client_query->lock, &cache_is_free, STATE_IN_USE)) { - // char* query_with_semicolon = msg->data + 5; - if (msg->kind == 'Q') - { - pgagroal_log_debug("Q message found"); - - pgagroal_log_info("Lock acquired by client %d", wi->client_fd); + char* query_with_semicolon = msg->data + 5; - pgagroal_log_debug("msg: kind %c, strlen %d", msg->kind, strlen(msg->data + 5)); - - // find total length of string 24 + if (msg->kind == 'Q' && query_with_semicolon[strlen(query_with_semicolon) - 1] == ';') + { size_t key_length = strlen(msg->data + 5); - // allocate 25 bytes for key \0 - // client_query->key = malloc(key_length + 1); - // if (client_query->key == NULL) - // { - // pgagroal_log_fatal("failed to allocate memory for key"); - // client_inactive(wi->slot); - // ev_break(loop, EVBREAK_ONE); - // atomic_store(&client_query->lock, STATE_FREE); - // return; - // } - // // set 25 bytes to 0 memset(client_query->key, 0, QUERY_KEY_SIZE); memcpy(client_query->key, msg->data + 5, key_length); client_query->key[key_length + 1] = '\0'; - // copy key to cache_key->key - // strcpy(client_query->key, msg->data + 5); - // client_query->key[strlen(client_query->key)] = '\0'; - client_query->key_length = key_length; client_query->kind = msg->kind; - // pgagroal_log_info("print using shmem %s, %d", client_query->key, client_query->key_length); - struct hashEntry* key = NULL; - key = (struct hashEntry*)malloc(sizeof(struct hashEntry) + key_length + 1); + key = (struct hashEntry*)malloc(sizeof(struct hashEntry) + client_query->key_length + 1); + if (key == NULL) { - pgagroal_log_fatal("failed to allocate memory for key"); + client_inactive(wi->slot); ev_break(loop, EVBREAK_ONE); atomic_store(&client_query->lock, STATE_FREE); return; } - // allocate 25 bytes for key->value - // key->value = (char*)malloc(key_length + 1); - // if (key->value == NULL) - // { - // pgagroal_log_fatal("failed to allocate memory for key"); - // client_inactive(wi->slot); - // ev_break(loop, EVBREAK_ONE); - // atomic_store(&client_query->lock, STATE_FREE); - - // return; - // } - // initialize 25 memory blocks properly - memset(key->key, 0, key_length + 1); + memset(key->key, 0, client_query->key_length + 1); - memcpy(key->key, msg->data + 5, key_length); - key->key[key_length + 1] = '\0'; + memcpy(key->key, client_query->key, key_length); + key->key[client_query->key_length + 1] = '\0'; - key->length = key_length; + key->length = client_query->key_length; struct hashEntry* s = pgagroal_query_cache_get(cache, &(cache->table), key); - // struct hashEntry* s = NULL; - - // free memory for key->value and key - - free(key); if (s != NULL && s->value != NULL) { - pgagroal_log_info("USING CACHED ENTRY"); - pgagroal_log_warn("CACHE_QUERY LOCK STATE %d", cache->lock); struct message* result = NULL; - pgagroal_log_warn("CACHED_ENTRY %s, %d", s->value, s->length); - - int x = pgagroal_create_message(s->value, s->length, &result); - // log - pgagroal_log_fatal("DONE: %d", x); - // pgagroal_log_info("res: %s, %d", result->data, result->length); - status = pgagroal_write_socket_message(wi->client_fd, result); - pgagroal_log_info("status %d", status); - - client_inactive(wi->slot); - - pgagroal_log_debug("CACHE IS IN USE"); - pgagroal_log_debug("CACHE_QUERY STATE %d", cache->lock); - //release lock - ev_break(loop, EVBREAK_ONE); + if (pgagroal_create_message(s->value, s->length, &result)) + { + status = pgagroal_write_socket_message(wi->client_fd, result); + client_inactive(wi->slot); + ev_break(loop, EVBREAK_ONE); + atomic_store(&client_query->lock, STATE_FREE); + return; + } atomic_store(&client_query->lock, STATE_FREE); - return; } - pgagroal_log_info("CACHE MISS"); - } atomic_store(&client_query->lock, STATE_FREE); } else { - pgagroal_log_debug("CACHE IS IN USE %d", client_query->lock); - pgagroal_log_debug("CACHE_QUERY STATE %d", cache->lock); SLEEP_AND_GOTO(1000000L, retry_cache_get); } - // pgagroal_log_warn("CACHE LOCK STATE %d, shq %s, msgQ %s", client_query->lock, client_query->key, msg->data + 5); - pgagroal_log_warn("OUT"); - pgagroal_log_warn("CACHE_SHMEM LOCK STATE %d", client_query->lock); - pgagroal_log_warn("CACHE_QUERY LOCK STATE %d", cache->lock); - pgagroal_log_info("---------------------------------------------------------------------------"); int offset = 0; while (offset < msg->length) @@ -602,7 +541,6 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents) struct client_server_cache* client_query = NULL; client_query = (struct client_server_cache*)client_server_shmem; - //Query Cache struct query_cache* cache; cache = (struct query_cache*)query_cache_shmem; @@ -629,27 +567,21 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents) cache_is_free = STATE_FREE; if (atomic_compare_exchange_strong(&client_query->lock, &cache_is_free, STATE_IN_USE)) { - pgagroal_log_info("SERVER Lock acquired by client %d", wi->client_fd); - if (msg->kind == 'T' && client_query->kind == 'Q' && strlen(client_query->key) == client_query->key_length) + if (msg->kind == 'T' && client_query->kind == 'Q') { - // pgagroal_log_warn("INSIDE SERVER msg %c, %s", client_query->kind, client_query->key); + struct message* tmp = NULL; if (!pgagroal_extract_message('Z', msg, &tmp)) { - pgagroal_log_info("START"); - pgagroal_log_info("MSG Size %d", msg->length); - pgagroal_log_info("KEY Size %d", client_query->key_length); - // pgagroal_log_info("KEY %s", client_query->key); struct hashEntry* key, * data; key = (struct hashEntry*)malloc(sizeof(struct hashEntry) + client_query->key_length + 1); data = (struct hashEntry*)malloc(sizeof(struct hashEntry) + msg->length); - // Check if memory allocation was successful if (key == NULL || data == NULL) { - pgagroal_log_fatal("memory allocaltion failed"); + if (key != NULL) { free(key); @@ -662,23 +594,17 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents) } client_inactive(wi->slot); ev_break(loop, EVBREAK_ONE); - //release lock + atomic_store(&client_query->lock, STATE_FREE); return; } - // Use memset to initialize the memory blocks properly - - // pgagroal_log_info("str len, size %d, %d, %s", client_query->key_length, strlen(client_query->key), client_query->key); - - // key->value = (char*)malloc(client_query->key_length + 1); data->value = malloc(msg->length); memset(key->key, 0, client_query->key_length + 1); - // Check if memory allocation for key->value and data->value was successful if (data->value == NULL) { - pgagroal_log_fatal("memory allocaltion failed"); + if (data->value != NULL) { free(data->value); @@ -691,7 +617,7 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents) } client_inactive(wi->slot); ev_break(loop, EVBREAK_ONE); - //release lock + atomic_store(&client_query->lock, STATE_FREE); return; } @@ -704,35 +630,26 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents) memcpy(data->value, msg->data, msg->length); data->length = msg->length; - int cache_add_status = pgagroal_query_cache_add(cache, &(cache->table), data, key, 1); - pgagroal_log_info("CACHE ADD STATUS: %d", cache_add_status); + pgagroal_query_cache_add(cache, &(cache->table), data, key, 1); + atomic_store(&client_query->lock, STATE_FREE); - free(data); - free(key); } else { - pgagroal_log_info("FAILED TO FIND Z MESSAGE"); atomic_store(&client_query->lock, STATE_FREE); } } else { - pgagroal_log_info("MESSAGE DID NOT HAVE T (ROW DESCRIPTION)"); atomic_store(&client_query->lock, STATE_FREE); } } else { - pgagroal_log_debug("CACHE IS IN USE"); SLEEP_AND_GOTO(1000000L, retry_cache_add) } - pgagroal_log_warn("OUT"); - pgagroal_log_warn("CACHE_SHMEM LOCK STATE %d", client_query->lock); - pgagroal_log_warn("CACHE_QUERY LOCK STATE %d", cache->lock); - - pgagroal_log_info("---------------------------------------------------------------------------"); + client_query->kind = '\0'; int offset = 0; @@ -743,7 +660,6 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents) char kind = pgagroal_read_byte(msg->data + offset); int length = pgagroal_read_int32(msg->data + offset + 1); - /* The Z message tell us the transaction state */ if (kind == 'Z') { char tx_state = pgagroal_read_byte(msg->data + offset + 5); diff --git a/src/libpgagroal/query_cache.c b/src/libpgagroal/query_cache.c index 47541dae..65f117bf 100644 --- a/src/libpgagroal/query_cache.c +++ b/src/libpgagroal/query_cache.c @@ -45,7 +45,7 @@ #include #define QUERY_CACHE_MAX_SIZE 1024 * 1024 * 256 -#define QUERY_CACHE_MAX_ENTRIES 1000 +#define QUERY_CACHE_MAX_ENTRIES 100000 #define HASH_ENTRY_DATA_SIZE 1024 * 1024 //1MB #define HASH_ENTRY_KEY_SIZE 1024 //1KB @@ -95,24 +95,25 @@ pgagroal_query_cache_get(struct query_cache* cache, struct hashTable** Table, st { signed char cache_is_free; retry_get: - pgagroal_log_info("GET"); cache_is_free = STATE_FREE; if (atomic_compare_exchange_strong(&cache->lock, &cache_is_free, STATE_IN_USE)) { - // pgagroal_log_info("START: GETTING cache entry with key: %s ", key->key); + + pgagroal_log_fatal("NUMBER OF ELEMENTS %d", cache->max_elements); for (int i = 0; i < cache->max_elements; i++) { - // pgagroal_log_info("GET %s", cache->cache[i].key->key); int x = strncmp(cache->cache[i].key->key, key->key, key->length); - // pgagroal_log_info("GET X %d", x); + pgagroal_log_warn("from cache GET to comapre %d, %s, %s, %d", x, key->key, cache->cache[i].key->key, i); if (x == 0) { + // Key found, return the corresponding data + atomic_store(&cache->lock, STATE_FREE); return cache->cache[i].data; @@ -125,7 +126,7 @@ pgagroal_query_cache_get(struct query_cache* cache, struct hashTable** Table, st } else { - // pgagroal_log_info("LOCKED GET"); + SLEEP_AND_GOTO(1000000L, retry_get) } } @@ -147,7 +148,7 @@ pgagroal_query_cache_invalidate(struct hashTable** Table, struct hashEntry* key) int pgagroal_query_cache_update(struct hashTable** Table, struct hashEntry* key, struct hashEntry* data) { - pgagroal_log_info("Updating cache entry with key: %s and data: %s", key->value, data->value); + struct hashTable* s = NULL; void* qkey = malloc(strlen(key->value) + 1); strcpy(qkey, key->value); @@ -173,9 +174,9 @@ pgagroal_query_cache_add(struct query_cache* cache, struct hashTable** Table, st { if (cache->max_elements >= QUERY_CACHE_MAX_ENTRIES) { - pgagroal_log_info("fail %d", cache->max_elements); + pgagroal_log_warn("Cache is full %d", cache->max_elements); - return -1; // Cache is full + return -1; } signed char cache_is_free; @@ -183,10 +184,6 @@ pgagroal_query_cache_add(struct query_cache* cache, struct hashTable** Table, st cache_is_free = STATE_FREE; if (atomic_compare_exchange_strong(&cache->lock, &cache_is_free, STATE_IN_USE)) { - /* - add key and data to the cache - */ - // pgagroal_log_info("START: Adding cache entry with key: %s and data: %s", key->key, data->value); struct hashEntry* copy_key = NULL; struct hashEntry* copy_data = NULL; @@ -203,10 +200,26 @@ pgagroal_query_cache_add(struct query_cache* cache, struct hashTable** Table, st memset(copy_data->value, 0, data->length); memcpy(copy_data->value, data->value, data->length); copy_data->length = data->length; + int idx = cache->max_elements; + pgagroal_log_info("Adding to cache %d", idx); + + cache->cache[idx].key = copy_key; + cache->cache[idx].data = copy_data; + cache->max_elements = idx + 1; + pgagroal_log_info("fromcache to cache %s", cache->cache[idx].key->key); + pgagroal_log_info("fromcache to parameter %s", key->key); + for (int i = 0; i < cache->max_elements; i++) + { + if (cache->cache[i].key == NULL) + { + pgagroal_log_info("from cache GET to comapre %s", "NULL"); + continue; + } + char t[cache->cache[i].key->length]; + strcpy(t, cache->cache[i].key->key); + pgagroal_log_info("from cache GET to comapre %s", cache->cache[i].key->key); - cache->cache[cache->max_elements].key = copy_key; - cache->cache[cache->max_elements].data = copy_data; - cache->max_elements = cache->max_elements + 1; + } atomic_store(&cache->lock, STATE_FREE); return 1; @@ -233,115 +246,5 @@ pgagroal_query_cache_clear(struct hashTable** Table) void pgagroal_query_cache_test(void) { - // struct query_cache* cache; - - // cache = (struct query_cache*)query_cache_shmem; - - // struct hashEntry* key, * data; - // key = (struct hashEntry*)malloc(sizeof *key); - // memset(key, 0, sizeof *key); - // data = (struct hashEntry*)malloc(sizeof *data); - // memset(key, 0, sizeof *data); - // key->value = "key"; - // key->length = strlen(key->value); - // data->value = "data"; - // data->length = strlen(data->value); - // pgagroal_log_info("Add cache entry with key: key and data: data"); - // int x = pgagroal_query_cache_add(cache, &(cache->table), data, key, 0); - // if (x == 0) - // { - // pgagroal_log_info("Key already exists"); - // } - // else - // { - // pgagroal_log_info("Key added"); - // } - // pgagroal_log_info("Add cache entry with key: key and data: data"); - // x = pgagroal_query_cache_add(cache, &(cache->table), data, key, 0); - // if (x == 0) - // { - // pgagroal_log_info("Key already exists"); - // } - // else - // { - // pgagroal_log_info("Key added"); - // } - // pgagroal_log_info("Get cache entry with key: key"); - // struct hashTable* resp = pgagroal_query_cache_get(cache, &(cache->table), key); - // if (resp != NULL) - // { - // pgagroal_log_info("resp: %s", resp->data->value); - // } - // else - // { - // pgagroal_log_info("Key not found in cache"); - // } - // pgagroal_log_info("Update cache entry with key: key and data: new data"); - // struct hashEntry* ndata, * nKey; - // ndata = (struct hashEntry*)malloc(sizeof *ndata); - // ndata->value = "new data"; - // ndata->length = strlen(ndata->value); - // nKey = (struct hashEntry*)malloc(sizeof *nKey); - // nKey->value = "newKey"; - // nKey->length = strlen(nKey->value); - // pgagroal_query_cache_update(&(cache->table), key, ndata); - // resp = pgagroal_query_cache_get(cache, &(cache->table), key); - // if (resp) - // { - // pgagroal_log_info("resp: %s", resp->data->value); - // } - // else - // { - // pgagroal_log_info("Key not found in cache"); - // } - // pgagroal_log_info("Invalidate cache entry with key: key"); - // pgagroal_query_cache_invalidate(&(cache->table), key); - // pgagroal_log_info("Get cache entry with key: key"); - // resp = pgagroal_query_cache_get(cache, &(cache->table), key); - // if (resp) - // { - // pgagroal_log_info("resp: %s", resp->data->value); - // } - // else - // { - // pgagroal_log_info("Key not found in cache"); - // } - // pgagroal_log_info("Add cache entry with key: key and data: data"); - // pgagroal_query_cache_add(cache, &(cache->table), data, key, 0); - // pgagroal_log_info("Add cache entry with key: newKey and data: new-data"); - // pgagroal_query_cache_add(cache, &(cache->table), ndata, nKey, 0); - // pgagroal_log_info("Get cache entry with key: newKey"); - // resp = pgagroal_query_cache_get(cache, &(cache->table), nKey); - // if (resp) - // { - // pgagroal_log_info("resp: %s", resp->data->value); - // } - // else - // { - // pgagroal_log_info("Key not found in cache"); - // } - // pgagroal_log_info("Clear cache"); - // pgagroal_query_cache_clear(&(cache->table)); - // pgagroal_log_info("Get cache entry with key: key"); - - // resp = pgagroal_query_cache_get(cache, &(cache->table), nKey); - // if (resp) - // { - // pgagroal_log_info("resp: %s", resp->data->value); - // } - // else - // { - // pgagroal_log_info("Key not found in cache"); - // } - // pgagroal_log_info("Get cache entry with key: newKey"); - // resp = pgagroal_query_cache_get(cache, &(cache->table), key); - // if (resp) - // { - // pgagroal_log_info("resp: %s", resp->data->value); - // } - // else - // { - // pgagroal_log_info("Key not found in cache"); - // } - + return; }