Skip to content

Commit

Permalink
Rebase to master, set libfabric version to 1.6
Browse files Browse the repository at this point in the history
  • Loading branch information
JBenda committed Oct 31, 2021
1 parent a15b56b commit 919ff5c
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 6 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,12 @@ jobs:
exclude:
# GLib is too old for TAP on Ubuntu 18.04
- os:
dist: ubuntu-18.04
dist: ubuntu-18.04
dependencies: system
# libfaibrc 1.11 is not available on Ubuntu 20.04
# - os:
# dist: ubuntu-20.04
# depednencies: system
steps:
- name: Checkout
uses: actions/checkout@v2
Expand Down
3 changes: 0 additions & 3 deletions include/core/jconfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ guint64 j_configuration_get_message_inject_size(JConfiguration*);
guint32 j_configuration_get_max_connections(JConfiguration*);
guint64 j_configuration_get_stripe_size(JConfiguration*);

/// network port to communicate
guint16 j_configuration_get_port(JConfiguration*);

gint64 j_configuration_get_libfabric_version(JConfiguration*);
struct fi_info* j_configuration_get_libfabric_hints(JConfiguration*);

Expand Down
2 changes: 1 addition & 1 deletion lib/core/jconfiguration.c
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ j_configuration_new_for_data(GKeyFile* key_file)
configuration->max_connections = max_connections;
configuration->stripe_size = stripe_size;
configuration->ref_count = 1;
configuration->libfabric.version = FI_VERSION(1, 11);
configuration->libfabric.version = FI_VERSION(1, 6);
configuration->libfabric.hints = fi_allocinfo();
configuration->libfabric.hints->caps =
FI_MSG | FI_SEND | FI_RECV | FI_READ | FI_RMA | FI_REMOTE_READ;
Expand Down
22 changes: 21 additions & 1 deletion lib/object/jdistributed-object.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ struct JDistributedObjectBackgroundData
struct
{
JList* bytes_written;
guint64 total_bytes_written;
} write;
};
};
Expand Down Expand Up @@ -440,6 +441,7 @@ j_distributed_object_write_background_operation(gpointer data)
guint64* bytes_written = j_list_iterator_get(it);

nbytes = j_message_get_8(reply);
background_data->write.total_bytes_written += nbytes;
j_helper_atomic_add(bytes_written, nbytes);
}
}
Expand Down Expand Up @@ -983,13 +985,17 @@ j_distributed_object_write_exec(JList* operations, JSemantics* semantics)
gsize name_len = 0;
gsize namespace_len = 0;
guint32 server_count = 0;
guint64 total_data_length = 0;
JSemanticsSafety safety;

/// \todo
//JLock* lock = NULL;

g_return_val_if_fail(operations != NULL, FALSE);
g_return_val_if_fail(semantics != NULL, FALSE);

safety = j_semantics_get(semantics, J_SEMANTICS_SAFETY);

{
JDistributedObjectOperation* operation = j_list_get_first(operations);
g_assert(operation != NULL);
Expand Down Expand Up @@ -1075,10 +1081,14 @@ j_distributed_object_write_exec(JList* operations, JSemantics* semantics)
new_data += new_length;

// Fake bytes_written here instead of doing another loop further down
if (j_semantics_get(semantics, J_SEMANTICS_SAFETY) == J_SEMANTICS_SAFETY_NONE)
if (safety == J_SEMANTICS_SAFETY_NONE)
{
j_helper_atomic_add(bytes_written, new_length);
}
else
{
total_data_length += new_length;
}
}
}
else
Expand Down Expand Up @@ -1114,13 +1124,15 @@ j_distributed_object_write_exec(JList* operations, JSemantics* semantics)
data->operations = NULL;
data->semantics = semantics;
data->write.bytes_written = bw_lists[i];
data->write.total_bytes_written = 0;
data->ret = TRUE;

background_data[i] = data;
}

j_helper_execute_parallel(j_distributed_object_write_background_operation, background_data, server_count);

guint64 total_written= 0;
for (guint i = 0; i < server_count; i++)
{
JDistributedObjectBackgroundData* data;
Expand All @@ -1132,9 +1144,17 @@ j_distributed_object_write_exec(JList* operations, JSemantics* semantics)

data = background_data[i];
ret = data->ret && ret;
total_written += data->write.total_bytes_written;

g_slice_free(JDistributedObjectBackgroundData, data);
}
if(safety == J_SEMANTICS_SAFETY_STORAGE || safety == J_SEMANTICS_SAFETY_NETWORK)
{
if(total_written != total_data_length)
{
ret = FALSE;
}
}
}
else
{
Expand Down
11 changes: 11 additions & 0 deletions lib/object/jobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ j_object_write_exec(JList* operations, JSemantics* semantics)
g_autoptr(JMessage) message = NULL;
JObject* object;
gpointer object_handle;
guint64 total_data_length = 0;

/// \todo
//JLock* lock = NULL;
Expand Down Expand Up @@ -677,6 +678,10 @@ j_object_write_exec(JList* operations, JSemantics* semantics)
{
j_helper_atomic_add(bytes_written, length);
}
else
{
total_data_length += length;
}
}
else
{
Expand Down Expand Up @@ -710,6 +715,7 @@ j_object_write_exec(JList* operations, JSemantics* semantics)

if (j_message_get_count(reply) > 0)
{
guint64 total_received_length = 0;
it = j_list_iterator_new(operations);

while (j_list_iterator_next(it))
Expand All @@ -718,10 +724,15 @@ j_object_write_exec(JList* operations, JSemantics* semantics)
guint64* bytes_written = operation->write.bytes_written;

nbytes = j_message_get_8(reply);
total_received_length += nbytes;
j_helper_atomic_add(bytes_written, nbytes);
}

j_list_iterator_free(it);

if(total_data_length != total_received_length) {
ret = FALSE;
}
}
else
{
Expand Down

0 comments on commit 919ff5c

Please sign in to comment.