Skip to content

Commit

Permalink
Refine commands and move Error Macros in jhelper.h
Browse files Browse the repository at this point in the history
  • Loading branch information
JBenda committed Jan 18, 2023
1 parent af5f241 commit 46934e2
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 140 deletions.
63 changes: 63 additions & 0 deletions include/core/jhelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,69 @@

G_BEGIN_DECLS

/**
* Execudes a command which returns true on success.
* on error a warning is written and jumps to <err_label> in the form:
* EXE failed at <File>:<Line>: with
* "<format string>", args
*
* \param cmd command to execute
* \param err_label label to jump to, when failed
* \param ... warning message in form: <format string>, args
**/
#define EXE(cmd, err_label, ...) \
do \
{ \
if ((cmd) == FALSE) \
{ \
g_warning("EXE failed at %s:%d with:", __FILE__, __LINE__); \
g_warning(__VA_ARGS__); \
goto err_label; \
} \
} while (FALSE)

/**
* Checks if <res> is an non negative number, if it negative jumps to <err_label>
* and print a warning in the form:
* CHECK failed at <File>:<Line> with (<res>):
* "<format string>", args
*
* \param res result value
* \param err_label label to jump to at error
* \param ... warning message in form: <format string>, args
**/
#define CHECK(res, err_label, ...) \
do \
{ \
if (res < 0) \
{ \
g_warning("CHECK failed at %s:%d with (%d):", __FILE__, __LINE__, res); \
g_warning(__VA_ARGS__); \
goto err_label; \
} \
} while (FALSE)

/**
* Check if <error> is an filled GError, if print warning and jumps to <err_label>
* warnings has the form:
* CHECK failed at <File>:<Line> with:
* "<format string>", args
*
* \param error GError
* \param err_label label to jump to at error
* \param ... warning message in form: <format sting>, args
**/
#define CHECK_GERROR(error, err_label, ...) \
do \
{ \
if (error != NULL) \
{ \
g_warning("CHECK failed at %s:%d with:\n\t%s", __FILE__, __LINE__, error->message); \
g_error_free(error); \
goto err_label; \
} \
} while (FALSE)

/**
* \defgroup JHelper Helper
*
Expand Down
19 changes: 10 additions & 9 deletions include/core/jmanagedbackends.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,23 +164,24 @@ typedef struct
* \param[in] obj_id continuous index which is for the live time of a object unique
* \param[in] tier identifier for the storage tier
* the object is currently stored on
* \param[in] access type of access performed \sa JObjectBackendAccessType
* \param[in] access type of access performed \sa JObjectBackendAccessType
* \param[in] data corresponding to the access type.
* For more details see \ref JObjectBackendAccessDetails
**/
gboolean (*process_access)(gpointer policy_data, const gchar* namespace, const gchar* path, guint obj_id, guint tier, JObjectBackendAccessTypes access, gconstpointer data);

/// short processing period to match new object to a storage tier
/** \retval FALSE on error
* \sa j_backend_stack_get_tiers */
/**
* Signals a new object creation to the policy, sets instal storage tier.
*
* \attention Ensure a short processing period because it blocks the write process
*
* \param[inout] policy_data pointer usage depends on policy
* \param[in] namespace of the object which will be created
* \param[in] path of the object which will be created
* \param[in] obj_id \ref process_access
* \param[out] storage_tier ID of storage tier to create object on
*
* \sa j_backend_stack_get_tiers
**/
gboolean (*process_create)(gpointer policy_data, const gchar* namespace, const gchar* path, guint obj_id, guint* storage_tier);

Expand Down Expand Up @@ -251,8 +252,8 @@ gboolean j_backend_managed_policy_process(JManagedBackends* this, gboolean* keep

/**
* Fetches backend where a given object is stored. Also blocks migration for that object.
* Use j_backend_managed_object_open() to allow migration again. If the object not already exists
* use j_backend_managed_object_create() to create it
* Use j_backend_managed_object_close() to allow migration again.
* If the object not already exists use j_backend_managed_object_create() to create it
*
* \param[in] this JManagedBackends instance
* \param[in] namespace of the object
Expand Down Expand Up @@ -330,17 +331,17 @@ guint j_backend_managed_get_tier(JManagedBackends* this, const gchar* namespace,
gboolean j_backend_managed_object_migrate(JManagedBackends* this, const gchar* namespace, const gchar* path, guint dest);

/**
* If resources a free migrates the specified object to the destination tier, if not return.
* If resources are free migrates the specified object to the destination tier, if not return.
*
* \retval FALSE on error or if resources are busy
**/
gboolean j_backend_managed_object_migrate_if_free(JManagedBackends* this, const gchar* namespace, const gchar* path, guint dest);

/**
* Stops all migrations at all backends, used for maintenance.
* use j_backend_stack_unlock() to re-enable migration
* use j_backend_managed_unlock() to re-enable migration
*
* \attention changing object locations or editing them might crashes the policy!
* \attention changing object locations or editing them might crash the policy!
*
* \param[in] this JManagedBackends instance
* \param[inout] address to backends array of backends managed, set to NULL if not needed
Expand Down
78 changes: 41 additions & 37 deletions lib/core/jmanagedbackends.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <core/jsemantics.h>
#include <core/jtrace.h>
#include <core/jconfiguration.h>
#include <core/jhelper.h>

#include <julea-config.h>

Expand All @@ -37,17 +38,6 @@
#include <sys/select.h>
#include <sys/time.h>

#define EXE(cmd, ...) \
do \
{ \
if ((cmd) == FALSE) \
{ \
g_warning("EXE failed at %s:%d with:", __FILE__, __LINE__); \
g_warning(__VA_ARGS__); \
goto end; \
} \
} while (FALSE)

/// an read write spin lock to avoid data access on migration
typedef struct
{
Expand Down Expand Up @@ -402,9 +392,11 @@ j_backend_managed_init(JConfiguration* config, JList* object_backends, JManagedB
J_BACKEND_TYPE_KV,
&this->kv_module,
&this->kv_store),
end,
"failed to create kv for backend manager!");
EXE(this->kv_store->kv.backend_init(j_configuration_get_object_policy_kv_path(config),
&this->kv_store->data),
end,
"failed to init kv for backend manager!");
this->kv_semantics = j_semantics_new(J_SEMANTICS_TEMPLATE_DEFAULT);

Expand Down Expand Up @@ -464,34 +456,30 @@ j_backend_managed_object_open(JManagedBackends* this,
struct KVEntry* entry;
struct JBackendWrapper* wrapper;
gboolean ret = FALSE;
guint generation;

if (kv_get(this, namespace, path, &entry))
if (!kv_get(this, namespace, path, &entry))
{
guint generation;

generation = read_lock(&g_array_index(this->rw_spin_locks, RWSpinLock, entry->lock_id));
if (generation != entry->generation)
{
EXE(kv_get(this, namespace, path, &entry), "failed to fetch kvEntry for open");
}

*backend = g_memdup2(this->object_backend[entry->backend_id], sizeof(struct JBackendWrapper));
wrapper = (void*)*backend;
wrapper->scope = (JManagedBackendScope){
.mem = wrapper,
.lock_id = entry->lock_id,
.tier = entry->backend_id,
.stack = this,
.namespace = namespace,
.path = path,
};
*scope = &wrapper->scope;
goto end;
}
else

generation = read_lock(&g_array_index(this->rw_spin_locks, RWSpinLock, entry->lock_id));
if (generation != entry->generation)
{
EXE(j_backend_managed_object_create(this, namespace, path, backend, scope),
"failed to create new entry");
EXE(kv_get(this, namespace, path, &entry), end, "failed to fetch kvEntry for open");
}

*backend = g_memdup2(this->object_backend[entry->backend_id], sizeof(struct JBackendWrapper));
wrapper = (void*)*backend;
wrapper->scope = (JManagedBackendScope){
.mem = wrapper,
.lock_id = entry->lock_id,
.tier = entry->backend_id,
.stack = this,
.namespace = namespace,
.path = path,
};
*scope = &wrapper->scope;
ret = TRUE;
end:
return ret;
Expand All @@ -518,6 +506,7 @@ j_backend_managed_object_create(JManagedBackends* this,

// ask policy for tier
EXE(this->policy->process_create(this->policy->data, namespace, path, lock_id, &tier),
end,
"failed to match storage tier for new object!");
*backend = g_memdup2(this->object_backend[tier], sizeof(struct JBackendWrapper));
wrapper = (void*)*backend;
Expand All @@ -540,6 +529,7 @@ j_backend_managed_object_create(JManagedBackends* this,
};

EXE(kv_put(this, namespace, path, &entry),
end,
"failed to store new object in kv");

wrapper->scope = (JManagedBackendScope){
Expand Down Expand Up @@ -593,6 +583,7 @@ j_backend_managed_get_tier(JManagedBackends* this,
{
struct KVEntry* entry;
EXE(kv_get(this, namespace, path, &entry),
end,
"failed to fetch kventry");
return entry->backend_id;
end:
Expand All @@ -605,10 +596,13 @@ kv_put(JManagedBackends* this, const gchar* namespace, const gchar* key,
{
gpointer batch;
EXE(j_backend_kv_batch_start(this->kv_store, namespace, this->kv_semantics, &batch),
end,
"failed do start kv batch");
EXE(j_backend_kv_put(this->kv_store, batch, key, entry, sizeof(struct KVEntry)),
end,
"failed batch put value command");
EXE(j_backend_kv_batch_execute(this->kv_store, batch),
end,
"failed to execute put value batch");
return TRUE;
end:
Expand All @@ -623,9 +617,11 @@ kv_get(JManagedBackends* this, const gchar* namespace, const gchar* key,
guint32 len;
gboolean ret;
EXE(j_backend_kv_batch_start(this->kv_store, namespace, this->kv_semantics, &batch),
end,
"failed do start kv batch");
ret = j_backend_kv_get(this->kv_store, batch, key, (gpointer*)entry, &len);
EXE(j_backend_kv_batch_execute(this->kv_store, batch),
end,
"failed to execute get value batch");
#ifndef NDEBUG
if (len != sizeof(struct KVEntry))
Expand All @@ -645,9 +641,11 @@ kv_rm(JManagedBackends* this, const gchar* namespace, const gchar* key)
gboolean ret = FALSE;

EXE(j_backend_kv_batch_start(this->kv_store, namespace, this->kv_semantics, &batch),
end,
"failed to start kv backend");
ret = j_backend_kv_delete(this->kv_store, batch, key);
EXE(j_backend_kv_batch_execute(this->kv_store, batch),
end,
"failed to execute delete kv value");
end:
return ret;
Expand Down Expand Up @@ -743,7 +741,7 @@ j_backend_managed_object_migrate(JManagedBackends* this,
gpointer data = NULL;

read_lock(&this->global_lock);
EXE(kv_get(this, namespace, path, &entry), "Unable to migrate, because entry not found!");
EXE(kv_get(this, namespace, path, &entry), end, "Unable to migrate, because entry not found!");
if (entry->backend_id == dest)
{
ret = TRUE;
Expand All @@ -753,7 +751,7 @@ j_backend_managed_object_migrate(JManagedBackends* this,
lock = TRUE;
if (generation != entry->generation)
{
EXE(kv_get(this, namespace, path, &entry), "failed open kvEntry again for migration");
EXE(kv_get(this, namespace, path, &entry), end, "failed open kvEntry again for migration");
}
from = this->object_backend[entry->backend_id]->orig;
if (this->log.filename)
Expand All @@ -780,32 +778,38 @@ j_backend_managed_object_migrate(JManagedBackends* this,
goto end;
}
EXE(j_backend_object_status(from, object_from, &mod_time, &size),
end,
"Failed to get object size");
data = malloc(MIN(size, max_chunk_size));

EXE(j_backend_object_create(to, namespace, path, &object_to),
end,
"Failed to create new object");
while (size > 0)
{
guint64 written, transfer = MIN(size, max_chunk_size);
EXE(j_backend_object_read(from, object_from, data, transfer, offset, &written) && transfer == written,
end,
"Failed to read object for transmission");
EXE(j_backend_object_write(to, object_to, data, transfer, offset, &written) && written == transfer,
end,
"Failed to write migrated object");
size -= written;
offset += written;
}

EXE(j_backend_object_delete(from, object_from),
end,
"Failed to delete original object");
EXE(j_backend_object_close(to, object_to),
end,
"Failed to close new object");
}

new_entry = *entry;
++new_entry.generation;
new_entry.backend_id = dest;
EXE(kv_put(this, namespace, path, &new_entry), "failed to store new migrated entry");
EXE(kv_put(this, namespace, path, &new_entry), end, "failed to store new migrated entry");
ret = TRUE;
end:

Expand Down
Loading

0 comments on commit 46934e2

Please sign in to comment.