diff --git a/.travis.yml b/.travis.yml index 549aacd..b396b64 100644 --- a/.travis.yml +++ b/.travis.yml @@ -80,7 +80,7 @@ before_install: script: - cd $TRAVIS_BUILD_DIR - - cmake -DCMAKE_C_COMPILER=$CC -DCMAKE_CXX_COMPILER=$CXX -DCMAKE_C_FLAGS="-Wall -coverage" -DMPI=OFF -DAXL_ASYNC_DAEMON=ON -DWITH_KVTREE_PREFIX=$HOME/KVTree/kvtree-install + - cmake -DCMAKE_C_COMPILER=$CC -DCMAKE_CXX_COMPILER=$CXX -DCMAKE_C_FLAGS="-Wall -coverage" -DMPI=OFF -DWITH_KVTREE_PREFIX=$HOME/KVTree/kvtree-install - make -k VERBOSE=1 - make check diff --git a/CMakeLists.txt b/CMakeLists.txt index 7c04195..af63d94 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,6 +43,12 @@ IF(ZLIB_FOUND) LIST(APPEND AXL_LINK_LINE "-lz") ENDIF(ZLIB_FOUND) +# PTHREADS +FIND_PACKAGE(Threads REQUIRED) +IF(CMAKE_THREAD_LIBS_INIT) + LIST(APPEND AXL_EXTERNAL_LIBS "-lpthread") +ENDIF() + ## CPPR IF(${AXL_ASYNC_API} STREQUAL "INTEL_CPPR") FIND_PACKAGE(CPPR REQUIRED) diff --git a/doc/README.md b/doc/README.md index ddfcecf..c0b3667 100644 --- a/doc/README.md +++ b/doc/README.md @@ -60,6 +60,8 @@ from the destination. * AXL\_XFER\_SYNC - this is a synchronous transfer, which does not return until the files have been fully copied. It uses POSIX I/O to directly read/write files. +* AXL\_XFER\_PTHREADS - Like AXL\_XFER\_SYNC, but use multiple threads to do the copy. + * AXL\_XFER\_ASYNC\_DAEMON - this methods currently forks a process that sleeps in the background waiting for work to do. The library communicates with this daemon via a shared metadata file. When there are files to be copied, the daemon itself uses POSIX I/O to directly read/write files. This works, but dealing with the forked process is clunky. It needs to be rewritten to use a pthread instead of forking a process. * AXL\_XFER\_ASYNC\_BBAPI - this method uses [IBM's Burst Buffer API](https://github.com/IBM/CAST) to transfer files. IBM's system software then takes over to move data in the background. It's actually using NVMeoF, reading data from the local SSD from a remote node, so that the compute node is not really bothered once started. diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b364e1e..090dcfa 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -21,6 +21,7 @@ LIST(APPEND libaxl_srcs axl_err.c axl_io.c axl_util.c + axl_pthread.c ) ADD_LIBRARY(axl_o OBJECT ${libaxl_srcs}) diff --git a/src/axl.c b/src/axl.c index b95461f..6bceb40 100644 --- a/src/axl.c +++ b/src/axl.c @@ -1,5 +1,10 @@ #include #include +#include +#include + +/* PATH_MAX */ +#include /* dirname */ #include @@ -8,6 +13,9 @@ #include #include +/* opendir */ +#include + /* axl_xfer_t */ #include "axl.h" @@ -24,6 +32,7 @@ /*#include "axl_async_cppr.h" */ #include "axl_async_daemon.h" #include "axl_async_datawarp.h" +#include "axl_pthread.h" /* define states for transfer handlesto help ensure * users call AXL functions in the correct order */ @@ -241,7 +250,7 @@ int AXL_Create (axl_xfer_t xtype, const char* name) } /* Create an entry for this transfer handle - * record user string and transfer type + * record user string and transfer type * UID * id * NAME @@ -265,6 +274,7 @@ int AXL_Create (axl_xfer_t xtype, const char* name) case AXL_XFER_ASYNC_DAEMON: case AXL_XFER_ASYNC_DW: case AXL_XFER_ASYNC_CPPR: + case AXL_XFER_PTHREAD: break; case AXL_XFER_ASYNC_BBAPI: rc = axl_async_create_bbapi(id); @@ -289,11 +299,33 @@ int AXL_Create (axl_xfer_t xtype, const char* name) return id; } -/* Add a file to an existing transfer handle */ -int AXL_Add (int id, const char* source, const char* destination) +/* Is this path a file or a directory? Return the type. */ +enum {PATH_UNKNOWN = 0, PATH_FILE, PATH_DIR}; +static int path_type(const char *path) { + struct stat s; + if (stat(path, &s) != 0) { + return 0; + } + if (S_ISREG(s.st_mode)) { + return PATH_FILE; + } + if (S_ISDIR(s.st_mode)) { + return PATH_DIR; + } + return PATH_UNKNOWN; +} + +/* + * Add a file to an existing transfer handle. No directories. + * + * If the file's destination path doesn't exist, then automatically create the + * needed directories. + */ +static int +__AXL_Add (int id, const char* src, const char* dest) { - /* lookup transfer info for the given id */ kvtree* file_list = NULL; + axl_xfer_t xtype = AXL_XFER_NULL; axl_xfer_state_t xstate = AXL_XFER_STATE_NULL; if (axl_get_info(id, &file_list, &xtype, &xstate) != AXL_SUCCESS) { @@ -316,21 +348,22 @@ int AXL_Add (int id, const char* source, const char* destination) * /path/to/dest/file * STATUS * SOURCE */ - kvtree* src_hash = kvtree_set_kv(file_list, AXL_KEY_FILES, source); - kvtree_util_set_str(src_hash, AXL_KEY_FILE_DEST, destination); + kvtree* src_hash = kvtree_set_kv(file_list, AXL_KEY_FILES, src); + kvtree_util_set_str(src_hash, AXL_KEY_FILE_DEST, dest); kvtree_util_set_int(src_hash, AXL_KEY_STATUS, AXL_STATUS_SOURCE); /* add file to transfer data structure, depending on its type */ int rc = AXL_SUCCESS; switch (xtype) { case AXL_XFER_SYNC: + case AXL_XFER_PTHREAD: break; case AXL_XFER_ASYNC_DAEMON: break; case AXL_XFER_ASYNC_DW: break; case AXL_XFER_ASYNC_BBAPI: - rc = axl_async_add_bbapi(id, source, destination); + rc = axl_async_add_bbapi(id, src, dest); break; case AXL_XFER_ASYNC_CPPR: break; @@ -348,6 +381,114 @@ int AXL_Add (int id, const char* source, const char* destination) return rc; } +/* + * Add a file or directory to the transfer handle. If the src is a + * directory, recursively add all the files and directories in that + * directory. + */ +int AXL_Add (int id, const char *src, const char *dest) +{ + int rc; + DIR *dir; + struct dirent *de; + char *src_copy, *dest_copy; + char *src_basename; + unsigned int src_path_type, dest_path_type; + char *new_dest, *new_src, *final_dest; + + new_dest = calloc(PATH_MAX, 1); + if (!new_dest) { + return ENOMEM; + } + new_src = calloc(PATH_MAX, 1); + if (!new_src) { + free(new_dest); + return ENOMEM; + } + final_dest = calloc(PATH_MAX, 1); + if (!final_dest) { + free(new_dest); + free(new_src); + return ENOMEM; + } + + src_copy = strdup(src); + dest_copy = strdup(dest); + + src_path_type = path_type(src); + dest_path_type = path_type(dest); + src_basename = basename(src_copy); + + switch (src_path_type) { + case PATH_FILE: + if (dest_path_type == PATH_DIR) { + /* + * They passed a source file, with dest directory. Append the + * filename to dest. + * + * Before: + * src dest + * /tmp/file1 /tmp/mydir + * + * After: + * /tmp/file1 /tmp/mydir/file1 + */ + + snprintf(new_dest, PATH_MAX, "%s/%s", dest, src_basename); + rc = __AXL_Add(id, src, new_dest); + } else { + /* The destination is a filename */ + rc = __AXL_Add(id, src, dest); + } + break; + case PATH_DIR: + /* Add the directory itself first... */ + if (dest_path_type == PATH_FILE) { + /* We can't copy a directory onto a file */ + rc = EINVAL; + break; + } else if (dest_path_type == PATH_DIR) { + snprintf(new_dest, PATH_MAX, "%s/%s", dest, src_basename); + } else { + /* Our destination doesn't exist */ + snprintf(new_dest, PATH_MAX, "%s", dest); + } + + /* Traverse all files/dirs in the directory. */ + dir = opendir(src); + if (!dir) { + rc = ENOENT; + break; + } + while ((de = readdir(dir)) != NULL) { + /* Skip '.' and '..' directories */ + if ((strcmp(de->d_name, ".") == 0) || (strcmp(de->d_name, "..") == 0)) { + continue; + } + snprintf(new_src, PATH_MAX, "%s/%s", src, de->d_name); + snprintf(final_dest, PATH_MAX, "%s/%s", new_dest, de->d_name); + + rc = AXL_Add(id, new_src, final_dest); + if (rc != AXL_SUCCESS) { + rc = EINVAL; + break; + } + } + break; + + default: + rc = EINVAL; + break; + } + + free(dest_copy); + free(src_copy); + free(final_dest); + free(new_src); + free(new_dest); + return rc; +} + /* Initiate a transfer for all files in handle ID */ int AXL_Dispatch (int id) { @@ -355,6 +496,7 @@ int AXL_Dispatch (int id) kvtree* file_list = NULL; axl_xfer_t xtype = AXL_XFER_NULL; axl_xfer_state_t xstate = AXL_XFER_STATE_NULL; + if (axl_get_info(id, &file_list, &xtype, &xstate) != AXL_SUCCESS) { AXL_ERR("Could not find transfer info for UID %d", id); return AXL_FAILURE; @@ -371,19 +513,16 @@ int AXL_Dispatch (int id) kvtree_elem* elem; kvtree* files = kvtree_get(file_list, AXL_KEY_FILES); for (elem = kvtree_elem_first(files); elem != NULL; elem = kvtree_elem_next(elem)) { - /* get path to source file */ - char* source = kvtree_elem_key(elem); - /* get hash for this file */ kvtree* elem_hash = kvtree_elem_hash(elem); /* get destination for this file */ - char* destination; - kvtree_util_get_str(elem_hash, AXL_KEY_FILE_DEST, &destination); + char* dest; + kvtree_util_get_str(elem_hash, AXL_KEY_FILE_DEST, &dest); /* figure out and create dirs that should exist */ /* TODO: vendors may implement smarter functions for mkdir */ - char* dest_path = strdup(destination); + char* dest_path = strdup(dest); char* dest_dir = dirname(dest_path); mode_t mode_dir = axl_getmode(1, 1, 1); axl_mkdir(dest_dir, mode_dir); @@ -398,6 +537,9 @@ int AXL_Dispatch (int id) case AXL_XFER_SYNC: rc = axl_sync_start(id); break; + case AXL_XFER_PTHREAD: + rc = axl_pthread_start(id); + break; case AXL_XFER_ASYNC_DAEMON: rc = axl_async_start_daemon(id); break; @@ -462,6 +604,9 @@ int AXL_Test (int id) case AXL_XFER_SYNC: rc = axl_sync_test(id); break; + case AXL_XFER_PTHREAD: + rc = axl_pthread_test(id); + break; case AXL_XFER_ASYNC_DAEMON: rc = axl_async_test_daemon(id, &bytes_total, &bytes_written); break; @@ -495,7 +640,6 @@ int AXL_Wait (int id) AXL_ERR("Could not find transfer info for UID %d", id); return AXL_FAILURE; } - /* check that handle is in correct state to wait */ if (xstate != AXL_XFER_STATE_DISPATCHED) { AXL_ERR("Invalid state to wait UID %d", id); @@ -506,6 +650,7 @@ int AXL_Wait (int id) /* lookup status for the transfer, return if done */ int status; kvtree_util_get_int(file_list, AXL_KEY_STATUS, &status); + if (status == AXL_STATUS_DEST) { return AXL_SUCCESS; } else if (status == AXL_STATUS_ERROR) { @@ -521,6 +666,9 @@ int AXL_Wait (int id) case AXL_XFER_SYNC: rc = axl_sync_wait(id); break; + case AXL_XFER_PTHREAD: + rc = axl_pthread_wait(id); + break; case AXL_XFER_ASYNC_DAEMON: rc = axl_async_wait_daemon(id); break; @@ -625,11 +773,16 @@ int AXL_Free (int id) kvtree* file_list = NULL; axl_xfer_t xtype = AXL_XFER_NULL; axl_xfer_state_t xstate = AXL_XFER_STATE_NULL; + if (axl_get_info(id, &file_list, &xtype, &xstate) != AXL_SUCCESS) { AXL_ERR("Could not find transfer info for UID %d", id); return AXL_FAILURE; } + if (xtype == AXL_XFER_PTHREAD) { + axl_pthread_free(id); + } + /* check that handle is in correct state to free */ if (xstate != AXL_XFER_STATE_CREATED && xstate != AXL_XFER_STATE_COMPLETED) diff --git a/src/axl.h b/src/axl.h index fb17dff..f4b6b7e 100644 --- a/src/axl.h +++ b/src/axl.h @@ -28,6 +28,7 @@ typedef enum { * shmem). If there is no native API, fall back * to AXL_XFER_DEFAULT. */ + AXL_XFER_PTHREAD, /* parallel copy using pthreads */ } axl_xfer_t; /* Read configuration from non-AXL-specific file diff --git a/src/axl_internal.h b/src/axl_internal.h index 10904cd..b808124 100644 --- a/src/axl_internal.h +++ b/src/axl_internal.h @@ -26,6 +26,8 @@ extern kvtree* axl_file_lists; #define AXL_KEY_FILE_DEST ("DEST") #define AXL_KEY_FILE_STATUS ("STATUS") #define AXL_KEY_FILE_CRC ("CRC") +#define AXL_KEY_PTHREAD_DATA ("PTHREAD_DATA") + /* TRANSFER STATUS */ #define AXL_STATUS_SOURCE (1) diff --git a/src/axl_pthread.c b/src/axl_pthread.c new file mode 100644 index 0000000..95d25eb --- /dev/null +++ b/src/axl_pthread.c @@ -0,0 +1,394 @@ +#include +#include +#include +#include /* get_nprocs() */ +#include "axl_internal.h" +#include "kvtree_util.h" +#include "axl_sync.h" + +/* get_nprocs() */ +#if defined(__APPLE__) +#include +#else +#include +#endif + +#define MIN(a,b) (a < b ? a : b) + +/* + * We default our number of threads to the lesser of: + * + * - The number of CPU threads + * - MAX_THREADS + * - The number of files being transfered + */ +#define MAX_THREADS 16 /* We don't see much scaling past 16 threads */ + +struct axl_work +{ + struct axl_work *next; + /* The file element in the kvtree */ + kvtree_elem* elem; +}; + +struct axl_pthread_data +{ + /* Lock to protect workqueue */ + pthread_mutex_t lock; + + /* Our workqueue */ + struct axl_work *head; + struct axl_work *tail; + + /* Number of threads */ + unsigned int threads; + + /* Array of our thread IDs */ + pthread_t *tid; +}; + +/* Linux and OSX compatible 'get number of hardware threads' */ +unsigned int axl_get_nprocs(void) +{ + unsigned int cpu_threads; +#if defined(__APPLE__) + int count; + size_t size = sizeof(count); + if (sysctlbyname("hw.ncpu", &count, &size, NULL, 0)) { + cpu_threads = 1; + } else { + cpu_threads = count; + } +#else + cpu_threads = get_nprocs(); +#endif + return cpu_threads; +} + +/* The actual pthread function */ +static void * +axl_pthread_func(void *arg) +{ + struct axl_pthread_data *pdata = arg; + struct axl_work *work; + int rc; + char *src, *dst; + kvtree_elem *elem; + kvtree *elem_hash; + + while (1) { + pthread_mutex_lock(&pdata->lock); + + /* Get a file to transfer */ + work = pdata->head; + if (!work) { + /* No more work to do */ + pthread_mutex_unlock(&pdata->lock); + pthread_exit((void *) AXL_SUCCESS); + } else { + /* Take our work out of the queue */ + pdata->head = pdata->head->next; + pthread_mutex_unlock(&pdata->lock); + } + + /* Lookup our filename */ + elem = work->elem; + elem_hash = kvtree_elem_hash(elem); + src = kvtree_elem_key(elem); + kvtree_util_get_str(elem_hash, AXL_KEY_FILE_DEST, &dst); + + rc = axl_file_copy(src, dst, axl_file_buf_size, NULL); + AXL_DBG(2, "%s: Read and copied %s to %s, rc %d", + __func__, src, dst, rc); + + /* Record the success/failure of the individual file transfer */ + if (rc == AXL_SUCCESS) { + kvtree_util_set_int(elem_hash, AXL_KEY_FILE_STATUS, AXL_STATUS_DEST); + } else { + kvtree_util_set_int(elem_hash, AXL_KEY_FILE_STATUS, AXL_STATUS_ERROR); + } + + free(work); + + if (rc != AXL_SUCCESS) + pthread_exit((void *) AXL_FAILURE); + } + return NULL; +} + +static struct axl_pthread_data * +axl_pthread_create(unsigned int threads) +{ + struct axl_pthread_data *pdata; + + pdata = calloc(1, sizeof(*pdata)); + if (!pdata) + return NULL; + + if (pthread_mutex_init(&pdata->lock, NULL) != 0) { + free(pdata); + return NULL; + } + + pdata->tid = calloc(threads, sizeof(pdata->tid[0])); + if (!pdata->tid) { + free(pdata); + return NULL; + } + pdata->threads = threads; + + pdata->head = pdata->tail = NULL; + + return pdata; +} + +/* Free up our axl_pthread_data. We assume no one is competing for the lock. */ +static void +axl_pthread_free_pdata(struct axl_pthread_data *pdata) +{ + struct axl_work *work; + + /* + * Free our workqueue item first (if any). If our transfer completed + * successfully, all workqueue entries would have already been freed. + */ + work = pdata->head; + while (pdata->head) { + work = pdata->head->next; + free(pdata->head); + pdata->head = work; + } + + free(pdata->tid); + free(pdata); +} + +/* Add a file to our workqueue. We assume the lock is already held */ +static int +axl_pthread_add_work(struct axl_pthread_data *pdata, kvtree_elem* elem) +{ + struct axl_work *work; + + work = calloc(1, sizeof(*work)); + if (!work) + return AXL_FAILURE; + + work->elem = elem; + + if (!pdata->head) { + /* First time we inserted into the workqueue */ + pdata->head = work; + pdata->tail = work; + } else { + /* Insert it at the end of the list */ + pdata->tail->next = work; + pdata->tail = work; + } + return AXL_SUCCESS; +} + +/* + * Create and wakeup all our threads to start transferring the files in + * the workqueue. + */ +static int +axl_pthread_run(struct axl_pthread_data *pdata) +{ + int i; + int rc; + int final_rc = AXL_SUCCESS; + + for (i = 0; i < pdata->threads; i++) { + rc = pthread_create(&pdata->tid[i], NULL, &axl_pthread_func, pdata); + if (rc != 0) + return rc; + } + + return final_rc; +} + +int axl_pthread_start (int id) +{ + /* assume we'll succeed */ + int rc = AXL_SUCCESS; + struct axl_pthread_data *pdata; + unsigned int file_count, threads, cpu_threads; + + /* get pointer to file list for this dataset */ + kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id); + + /* mark dataset as in progress */ + kvtree_util_set_int(file_list, AXL_KEY_STATUS, AXL_STATUS_INPROG); + + kvtree_elem* elem = NULL; + kvtree* files = kvtree_get(file_list, AXL_KEY_FILES); + file_count = kvtree_size(files); + + /* Get number of hardware threads */ + cpu_threads = axl_get_nprocs(); + + threads = MIN(cpu_threads, MIN(file_count, MAX_THREADS)); + + /* Create the data structure for our threads */ + pdata = axl_pthread_create(threads); + if (!pdata) + return AXL_FAILURE; + + for (elem = kvtree_elem_first(files); elem != NULL; elem = kvtree_elem_next(elem)) { + /* get the hash for this file */ + kvtree* elem_hash = kvtree_elem_hash(elem); + + /* WEIRD case: we've restarted a sync transfer that was going */ + int status; + kvtree_util_get_int(elem_hash, AXL_KEY_FILE_STATUS, &status); + if (status == AXL_STATUS_DEST) { + /* this file was already transfered */ + continue; + } + + rc = axl_pthread_add_work(pdata, elem); + if (rc != AXL_SUCCESS) { + printf("something bad happened\n"); + /* + * Something bad happened. Break here instead of returning so + * that pdata gets freed (at the end of this function). + */ + break; + } + } + + /* + * Store our pointer to our pthread data in our kvtree. KVTree doesn't + * doesn't support pointers, so we store it as a int64_t. */ + rc = kvtree_util_set_int64(file_list, AXL_KEY_PTHREAD_DATA, (int64_t) pdata); + if (rc != AXL_SUCCESS) { + AXL_ERR("Couldn't set AXL_KEY_PTHREAD_DATA"); + return rc; + } + + /* + * At this point, all our files are queued in the workqueue. Start the + * transfers. + */ + rc = axl_pthread_run(pdata); + if (rc == AXL_SUCCESS) { + kvtree_util_set_int(file_list, AXL_KEY_STATUS, AXL_STATUS_INPROG); + } else { + AXL_ERR("Couldn't spawn all threads"); + kvtree_util_set_int(file_list, AXL_KEY_STATUS, AXL_STATUS_ERROR); + } + + return rc; +} + +int axl_pthread_test (int id) +{ + kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id); + int64_t ptr; + struct axl_pthread_data *pdata; + + kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr); + pdata = (struct axl_pthread_data *) ptr; + + /* Is there still work in the workqueue? */ + if (pdata->head) { + /* Yes there is, we need to wait */ + return AXL_FAILURE; + } + + return AXL_SUCCESS; +} + +int axl_pthread_wait (int id) +{ + /* get pointer to file list for this dataset */ + kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id); + int64_t ptr; + struct axl_pthread_data *pdata; + int rc; + int final_rc = AXL_SUCCESS; + int i; + void *rc_ptr; + + kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr); + pdata = (struct axl_pthread_data *) ptr; + if (!pdata) { + /* Did they call AXL_Cancel() and then AXL_Wait()? */ + AXL_ERR("No pthread data\n"); + return AXL_FAILURE; + } + + /* All our threads are now started. Wait for them to finish */ + for (i = 0; i < pdata->threads; i++) { + rc = pthread_join(pdata->tid[i], &rc_ptr); + + /* + * Thread either finished successfully, or was canceled by + * AXL_Cancel(). Both statuses are valid. + */ + if (rc != 0 && rc_ptr != PTHREAD_CANCELED) { + AXL_ERR("pthread_join(%d) failed (%d)", i, rc); + return AXL_FAILURE; + } + + /* + * Check the rc that the thread actually reported. The thread + * returns a void * that we encode our rc value in. + */ + rc = (int) ((unsigned long) rc_ptr); + if (rc) { + final_rc |= AXL_FAILURE; + } + } + if (final_rc != AXL_SUCCESS) { + AXL_ERR("Couldn't join all threads"); + return AXL_FAILURE; + } + + kvtree_util_set_int64(file_list, AXL_KEY_PTHREAD_DATA, 0); + axl_pthread_free_pdata(pdata); + + /* All threads are now successfully finished */ + kvtree_util_set_int(file_list, AXL_KEY_STATUS, AXL_STATUS_DEST); + + return axl_sync_wait(id); +} + +int axl_pthread_cancel (int id) +{ + /* get pointer to file list for this dataset */ + kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id); + int64_t ptr; + struct axl_pthread_data *pdata; + int rc = 0; + int i; + + kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr); + pdata = (struct axl_pthread_data *) ptr; + + for (i = 0; i < pdata->threads; i++) { + rc |= pthread_cancel(pdata->tid[i]); + } + if (rc != 0) { + AXL_ERR("Bad return code from pthread_cancel()\n"); + } + return AXL_FAILURE; +} + +void axl_pthread_free (int id) +{ + kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id); + int64_t ptr; + struct axl_pthread_data *pdata; + + /* + * pdata should have been freed in AXL_Wait(), but maybe they just did + * an AXL_Cancel() and then an AXL_Free(). If so, pdata will be set + * and we should free it. + */ + kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr); + pdata = (struct axl_pthread_data *) ptr; + if (pdata) { + axl_pthread_free_pdata(pdata); + } +} diff --git a/src/axl_pthread.h b/src/axl_pthread.h new file mode 100644 index 0000000..75f061d --- /dev/null +++ b/src/axl_pthread.h @@ -0,0 +1,10 @@ +#ifndef AXL_PTHREAD_H +#define AXL_PTHREAD_H + +int axl_pthread_start(int id); +int axl_pthread_test(int id); +int axl_pthread_wait(int id); +int axl_pthread_cancel(int id); +void axl_pthread_free(int id); + +#endif //AXL_SYNC_H diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d06aae0..b9cff16 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -3,21 +3,22 @@ ############### LIST(APPEND axl_test_srcs - test_axl.c - test_axl_sync.c - test_axl_async_daemon.c - test_axl_async_ibmbb.c + axl_cp.c ) -ADD_EXECUTABLE(test_axl ${axl_test_srcs}) -TARGET_LINK_LIBRARIES(test_axl axl) +ADD_EXECUTABLE(axl_cp ${axl_test_srcs}) + +TARGET_LINK_LIBRARIES(axl_cp axl) ################ # Add tests to ctest ################ +CONFIGURE_FILE(test_axl.sh ${CMAKE_CURRENT_BINARY_DIR} COPYONLY) + # rountdrip tests -ADD_TEST(transfer_tests test_axl) +ADD_TEST(transfer_tests test_axl.sh) + #################### # make a verbose "test" target named "check" diff --git a/test/axl_cp.c b/test/axl_cp.c new file mode 100644 index 0000000..c093828 --- /dev/null +++ b/test/axl_cp.c @@ -0,0 +1,195 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include "axl.h" + +int id = -1; + +/* Return 1 if path is a directory */ +static int +is_dir(const char *path) { + struct stat s; + if (stat(path, &s) != 0) + return 0; + return S_ISDIR(s.st_mode); +} + +/* Translate a string like "bbapi" to its corresponding axl_xfer_t */ +static axl_xfer_t +axl_xfer_str_to_xfer(const char *xfer_str) +{ + struct { + const char *xfer_str; + axl_xfer_t xfer; + } xfer_strs[] = { + {"default", AXL_XFER_DEFAULT}, + {"native", AXL_XFER_NATIVE}, + {"sync", AXL_XFER_SYNC}, + {"dw", AXL_XFER_ASYNC_DW}, + {"bbapi", AXL_XFER_ASYNC_BBAPI}, + {"cppr", AXL_XFER_ASYNC_CPPR}, + {"pthread", AXL_XFER_PTHREAD}, + {NULL, AXL_XFER_NULL}, /* must always be last element in array */ + }; + int i; + + for (i = 0; xfer_strs[i].xfer_str; i++) { + if (strcmp(xfer_strs[i].xfer_str, xfer_str) == 0) { + /* Match */ + return xfer_strs[i].xfer; + } + } + /* No match */ + return AXL_XFER_NULL; +} + +static void +usage(void) +{ + printf("Usage: axl_cp [-r|-R] [-X xfer_type] SOURCE DEST\n"); + printf(" axl_cp [-r|-R] [-X xfer_type] SOURCE... DIRECTORY\n"); + printf("\n"); + printf("-r|-R: Copy directories recursively\n"); + printf("-X xfer_type: AXL transfer type: default native pthread sync dw bbapi cppr\n"); + printf("\n"); + +} + +void sig_func(int signum) +{ + if (id == -1) { + /* AXL not initialized yet */ + return; + } + + printf("axl_cp: canceling\n"); + AXL_Cancel(id); + AXL_Wait(id); + AXL_Free(id); + AXL_Finalize(); +} + +int +main(int argc, char **argv) { + int rc; + int opt; + int args_left; + const char *xfer_str = NULL; + char **src = NULL; + char *dest = NULL; + axl_xfer_t xfer; + unsigned int src_count; + int i; + int recursive = 0; + struct sigaction action = {0}; + action.sa_handler = sig_func; + sigaction(SIGTERM, &action, NULL); + + while ((opt = getopt(argc, argv, "rRX:")) != -1) { + switch (opt) { + case 'X': + xfer_str = optarg; + break; + case 'r': + case 'R': + recursive = 1; + break; + default: /* '?' */ + usage(); + exit(1); + } + } + args_left = argc - optind; + + /* Did they specify source(s) and destination? */ + if (args_left == 0) { + printf("Missing source and destination\n"); + usage(); + exit(1); + } else if (args_left == 1) { + printf("Missing destination"); + usage(); + exit(1); + } else { + src = &argv[optind]; + src_count = args_left - 1; + dest = argv[optind + args_left - 1]; + } + + /* + * They're doing a copy where the final arg is a destination directory, + * not a file. + */ + if (args_left > 2 && !is_dir(dest)) { + printf("axl_cp: target '%s' is not a directory\n", dest); + exit(1); + } + + if (xfer_str) { + xfer = axl_xfer_str_to_xfer(xfer_str); + if (xfer == AXL_XFER_NULL) { + printf("Error: Invalid AXL transfer type '%s'\n", xfer_str); + printf("\n"); + usage(); + exit(1); + } + } else { + xfer = AXL_XFER_SYNC; + } + + rc = AXL_Init(NULL); + if (rc != AXL_SUCCESS) { + printf("AXL_Init() failed (error %d)\n", rc); + return rc; + } + + id = AXL_Create(xfer, "axl_cp"); + if (id == -1) { + printf("AXL_Create() failed (error %d)\n", id); + return id; + } + + for (i = 0; i < src_count; i++) { + if (!recursive && is_dir(src[i])) { + printf("axl_cp: omitting directory '%s'\n", src[i]); + continue; + } + rc = AXL_Add(id, src[i], dest); + if (rc != AXL_SUCCESS) { + printf("AXL_Add(..., %s, %s) failed (error %d)\n", src[i], dest, rc); + return rc; + } + } + + rc = AXL_Dispatch(id); + if (rc != AXL_SUCCESS) { + printf("AXL_Dispatch() failed (error %d)\n", rc); + return rc; + } + + /* Wait for transfer to complete and finalize axl */ + rc = AXL_Wait(id); + if (rc != AXL_SUCCESS) { + printf("AXL_Wait() failed (error %d)\n", rc); + return rc; + } + + rc = AXL_Free(id); + if (rc != AXL_SUCCESS) { + printf("AXL_Free() failed (error %d)\n", rc); + return rc; + } + + rc = AXL_Finalize(); + if (rc != AXL_SUCCESS) { + printf("AXL_Finalize() failed (error %d)\n", rc); + return rc; + } + + return AXL_SUCCESS; +} diff --git a/test/test_axl.c b/test/test_axl.c deleted file mode 100644 index 9422716..0000000 --- a/test/test_axl.c +++ /dev/null @@ -1,328 +0,0 @@ -#include -#include -#include -#include -#include -#include - -#include "test_axl.h" -#include "test_axl_sync.h" -#include "test_axl_async_daemon.h" -#include "test_axl_async_ibmbb.h" - -#define SYNC 0x1 -#define ASYNC 0x2 -#define IBMBB 0x4 -#define DATAWARP 0x8 - -/* MAX_TESTS can be changed arbitrarily as long as - * it exceeds the number of tests - */ -#define MAX_TESTS (1024) - -static unsigned int num_tests = 0; - - -/* - * Diff the contents of two files. - * - * Return 0 if the file contents are the same, 1 if they differ. - */ -static int axl_compare_file_contents(const char *file1, const char *file2) -{ - FILE *fp1 = NULL, *fp2 = NULL; - char buf1[4096], buf2[4096]; - size_t rc1, rc2; - int rc = 1; - - fp1 = fopen(file1, "r"); - if (!fp1) { - goto end; - } - - fp2 = fopen(file2, "r"); - if (!fp2) { - goto end; - } - - while (1) { - rc1 = fread(buf1, sizeof(*buf1), 1, fp1); - if (rc1 < 0) { - break; - } - - rc2 = fread(buf2, sizeof(*buf2), 1, fp2); - if (rc2 < 0) { - break; - } - - if (rc1 != rc2) { - break; - } - - /* Are we done reading the files? */ - if (rc1 == 0) { - /* All done, success */ - rc = 0; - break; - } - - /* Did the file contents match? */ - if (memcmp(buf1, buf2, rc1) != 0) { - break; /* nope */ - } - } - -end: - if (fp1) { - fclose(fp1); - } - - if (fp2) { - fclose(fp2); - } - - return rc; -} - -/* - * Compare two files or two directories. - * - * 1. Given two file paths, compare the file permissions and file contents to - * make sure they're the same. - * - * 2. Given two directory paths, compare the directory permissions. - * - * Returns 0 if everything is the same, non-zero otherwise. - */ -int axl_compare_files_or_dirs(const char *path1, const char *path2) -{ - struct stat stat1, stat2; - if (stat(path1, &stat1) != 0) { - fprintf(stderr, "Couldn't stat %s: %s\n", path1, strerror(errno)); - return errno; - } - if (stat(path1, &stat2) != 0) { - fprintf(stderr, "Couldn't stat %s: %s\n", path2, strerror(errno)); - return errno; - } - if (!((S_ISREG(stat1.st_mode) && S_ISREG(stat2.st_mode)) || - (S_ISDIR(stat1.st_mode) && S_ISDIR(stat2.st_mode)))) { - fprintf(stderr, "%s and %s should be either both files, or both dirs\n", - path1, path2); - return EINVAL; - } - - if (stat1.st_size != stat2.st_size) { - fprintf(stderr, "%s and %s have different sizes (%zu and %zu)\n", - path1, path2, stat1.st_size, stat2.st_size); - return EINVAL; - } - - /* We were passed two files, compare their contents. */ - if (S_ISREG(stat1.st_mode) && S_ISREG(stat2.st_mode)) { - if (axl_compare_file_contents(path1, path2)) { - fprintf(stderr, "%s and %s file contents differed\n", - path1, path2); - return EINVAL; - } - } - - return 0; -} - -struct test { - char *name; - test_ptr_t func; -} tests[MAX_TESTS]; - -void register_test(test_ptr_t test_func, char* test_name) { - if (num_tests >= MAX_TESTS) { - fprintf(stderr, "Ran out of test space\n"); - exit(1); - } - - tests[num_tests].name = strdup(test_name); - tests[num_tests].func = test_func; - num_tests++; -} - -static int -do_tests(unsigned int flags, struct test_args *test_args) { - int i; - int test_rc[MAX_TESTS] = {0}; - int num_failed = 0; - - /* Initialize each test type */ - if (flags & ASYNC) { - test_axl_async_daemon_init(); - } - if (flags & IBMBB) { - test_axl_async_ibmbb_init(); - } - if (flags & SYNC) { - test_axl_sync_init(); - } - - /* Run the tests */ - for(i = 0; i < num_tests; i++) { - test_rc[i] = tests[i].func(test_args); - if (test_rc[i] != TEST_PASS) { - num_failed++; - } - } - - /* Print results */ - printf("Ran %d tests: %d pass, %d fail.\n", num_tests, num_tests - num_failed, num_failed); - for(i = 0; i < num_tests; i++) { - printf(" %s %s\n", tests[i].name, - test_rc[i] == 0 ? "PASSED" : "FAILED", test_rc[i]); - } - - /* Free all allocated memory */ - for(i = 0; i < num_tests; i++) { - free(tests[i].name); - } - - /* Return number of tests failed as return code */ - return num_failed; -} - - -/* - * Create a file with `size` bytes of random text data in it. - * - * Returns 0 on success, errno value on failure. - * */ -int create_source_file(const char *source_file, size_t size) -{ - FILE * fp; - char buf[1024]; - int chunk_size = sizeof(buf); - int chunks; - int rc = 0; - int chunk, i; - - fp = fopen(source_file, "w+"); - if (!fp) { - return errno; - } - - chunks = (size / chunk_size); - if (size % chunk_size > 0) - chunks++; /* The last chunk isn't totally full */ - - /* Write the random data out */ - for (chunk = 0; chunk < chunks; chunk++) { - int this_chunk_size; - - for (i = 0; i < chunk_size; i++) { - if (i % 80 == 0) { - buf[i] = '\n'; - } else { - buf[i] = 'a' + (rand() % 26); /* random printable chars */ - } - } - - if (size - (chunk * chunk_size) >= chunk_size ) { - /* Full chunk */ - this_chunk_size = chunk_size; - } else { - /* Last leftover data */ - this_chunk_size = size % chunk_size; - } - - rc = fwrite(buf, this_chunk_size, 1, fp); - if (rc != 1) { - printf("%d fwrite rc %d\n", chunk, rc); - fprintf(stderr, "Error writing %s\n", strerror(errno)); - return errno; - } - } - - fclose(fp); - return 0; -} - -static void usage(char *argv0) { - printf( "Usage: %s -abds [[-S size]|[src [dst]]]\n", - argv0); - printf("\n"); - printf("Tests to run:\n"); - printf("\t-a:\tAsync tests\n"); - printf("\t-b:\tIBM Burst Buffer tests\n"); - printf("\t-d:\tCray DataWarp tests\n"); - printf("\t-s:\tSync tests\n"); - printf("\t-S size:\tSize in bytes of test file to create"); - printf("\n"); - printf("\tsrc:\tsource file path (can be existing file)\n"); - printf("\tdst:\tdestination file path"); - printf("\n"); -} - - -int main(int argc, char** argv){ - unsigned int flags = 0; - int opt; - struct test_args test_args; - int rc; - char dst_path[1024]; - int args_left; - unsigned long size = 1024*1024;; - - while ((opt = getopt(argc, argv, "abdhsS:")) != -1) { - switch (opt) { - case 'a': - flags |= ASYNC; - break; - case 'b': - flags |= IBMBB; - break; - case 'd': - flags |= DATAWARP; - break; - case 'h': - usage(argv[0]); - exit(0); - case 's': - flags |= SYNC; - break; - case 'S': - size = atol(optarg); - break; - default: /* '?' */ - usage(argv[0]); - exit(EXIT_FAILURE); - } - } - - args_left = argc - optind; - - /* Did they specify a source and destination paths? */ - if (args_left >= 1) { - test_args.src_path = argv[optind]; - if (args_left >= 2) { - /* Source and destination path */ - test_args.dst_path = argv[optind + 1]; - } - } else { - /* No source or destination paths, use defaults. */ - test_args.src_path = "/tmp/testfile"; - sprintf(dst_path,"%s/%s", getenv("BBPATH"), "testfile"); - test_args.dst_path = dst_path; - } - - if (access(test_args.src_path, F_OK ) != -1 ) { - printf("Using existing source file %s\n", test_args.src_path); - } else { - printf("Creating %lu byte source file %s\n", size, test_args.src_path); - rc = create_source_file(test_args.src_path, size); - if (rc != 0) { - fprintf(stderr, "Error opening source test file %s for writing: %s\n", test_args.src_path, - strerror(rc)); - return rc; - } - } - - return do_tests(flags, &test_args); -} diff --git a/test/test_axl.h b/test/test_axl.h deleted file mode 100644 index 2405fd0..0000000 --- a/test/test_axl.h +++ /dev/null @@ -1,19 +0,0 @@ -#ifndef TEST_AXL_H -#define TEST_AXL_H - -#include "axl.h" - -#define TEST_PASS (0) -#define TEST_FAIL (1) - -struct test_args { - /* Source and destination file paths */ - const char *src_path; - const char *dst_path; -}; - -typedef int (*test_ptr_t)(struct test_args *); - -void register_test(test_ptr_t test, char* test_name); -int axl_compare_files_or_dirs(const char *path1, const char *path2); -#endif //TEST_AXL_H diff --git a/test/test_axl.sh b/test/test_axl.sh new file mode 100755 index 0000000..7193cd8 --- /dev/null +++ b/test/test_axl.sh @@ -0,0 +1,78 @@ +#!/bin/bash +# +# Basic AXL sanity test. +# +# Make some files in /tmp and copy them using AXL. +# + +src=$(mktemp -d) +dest=$(mktemp -d) + +trap ctrl_c INT + +function cleanup +{ + rm -fr "$src" "$dest" +} + +function ctrl_c() { + cleanup +} + +function create_files { + + # Create a directory structure + dirs=(./ home project/dir1 project/dir2 docs/c1/data docs/c2/data docs/c2) + for i in "${dirs[@]}" ; do + mkdir -p "$src/$i" + done + num_dirs=${#dirs[@]} + + # Make 50 files and put them in our directories + for i in {1..50} ; do + dirnum=$(($i % $num_dirs)) + tmp="$src/${dirs[$dirnum]}" + dd if=/dev/zero of="$tmp/$i.file" bs=1k count=$i &>/dev/null + done +} + +function run_test +{ + xfer=$1 + rc=0; + echo -en "Testing $xfer transfer...\t" + if ! out1="$(./axl_cp -X $xfer -r $src/* $dest)" ; then + echo "failed copy" + echo "$out1" + rc=1 + else + # Compare the two directories + if ! out2="$(diff -qr $src $dest)" ; then + echo "failed diff" + echo "$out1" + echo "---" + echo "$out2" + rc=2 + else + echo "success" + fi + fi + rm -fr "$dest"/* + if [ "$rc" != "0" ] ; then + false + else + true + fi +} + +create_files + +# Run our tests +xfers="sync pthread" +for i in $xfers ; do + if ! run_test $i ; then + cleanup + exit 1 + fi +done +cleanup diff --git a/test/test_axl_async_daemon.c b/test/test_axl_async_daemon.c deleted file mode 100644 index 2c52e3c..0000000 --- a/test/test_axl_async_daemon.c +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright (c) 2009, Lawrence Livermore National Security, LLC. - * Produced at the Lawrence Livermore National Laboratory. - * Written by Adam Moody . - * LLNL-CODE-411039. - * All rights reserved. - * KVTree was originally part of The Scalable Checkpoint / Restart (SCR) library. - * For details, see https://sourceforge.net/projects/scalablecr/ - * Please also read this file: LICENSE.TXT. -*/ - -#include "test_axl.h" -#include "test_axl_async_daemon.h" - -#include -#include -#include -#include -#include -#include - -#define TEST_PASS (0) -#define TEST_FAIL (1) - -#define TEST_FILE "test_file" -#define TEST_STRING "I am a file" - -#define TEST_NAME "test transfer" -#define TEST_DEST "test_file_moved" - -int test_axl_async_daemon(){ - int rc = TEST_PASS; - - /* Create a file */ - unlink(TEST_FILE); - FILE * fp = fopen(TEST_FILE, "w"); - fputs(TEST_STRING, fp); - fclose(fp); - - /* Create dest path */ - char pwd[PATH_MAX]; - if (getcwd(pwd, sizeof(pwd)) == NULL) { - printf("getcwd() failed: errno=%d (%s)\n", errno, strerror(errno)); - } - char* dest_path = malloc(strlen(pwd) + strlen(TEST_DEST) + 2); - strcpy(dest_path, pwd); - strcat(dest_path, "/"); - strcat(dest_path, TEST_DEST); - unlink(dest_path); - - /* Launch axl, reate a transfer, add test file, dispatch */ - if (AXL_Init(NULL) != AXL_SUCCESS) { - rc = TEST_FAIL; - goto cleanup; - } - - int id = AXL_Create(AXL_XFER_ASYNC_DAEMON, TEST_NAME); - if (id < 0) { - rc = TEST_FAIL; - goto cleanup; - } - - if (AXL_Add(id, TEST_FILE, dest_path) != AXL_SUCCESS) { - rc = TEST_FAIL; - goto cleanup; - } - - if (AXL_Dispatch(id) != AXL_SUCCESS) { - rc = TEST_FAIL; - goto cleanup; - } - -#if 0 - if (AXL_Cancel(id) != AXL_SUCCESS) { - rc = TEST_FAIL; - goto cleanup; - } - - /* will return AXL_SUCCESS if not cancelled, - * and !AXL_SUCCESS if cancelled (or error) */ - AXL_Wait(id); -#else - /* Wait for transfer to complete and finalize axl */ - if (AXL_Wait(id) != AXL_SUCCESS) { - rc = TEST_FAIL; - goto cleanup; - } -#endif - - if (AXL_Free(id) != AXL_SUCCESS) { - rc = TEST_FAIL; - goto cleanup; - } - - if (AXL_Finalize() != AXL_SUCCESS) { - rc = TEST_FAIL; - goto cleanup; - } - - /* Check that file arrived properly */ - FILE* dfp = fopen(dest_path, "r"); - if(!dfp) { - rc = TEST_FAIL; - } else { - char* read_str = malloc(strlen(TEST_STRING) + 1); - if (fgets(read_str, strlen(TEST_STRING) + 1, dfp) == NULL) { - printf("fgets() returned NULL\n"); - } - if(strcmp(read_str, TEST_STRING)) rc = TEST_FAIL; - free(read_str); - fclose(dfp); - } - -cleanup: - /* Unlink test files and return rc */ - unlink(TEST_FILE); - unlink(dest_path); - free(dest_path); - return rc; -} - -void test_axl_async_daemon_init(){ - register_test(test_axl_async_daemon, "test_axl_async_daemon"); -} diff --git a/test/test_axl_async_daemon.h b/test/test_axl_async_daemon.h deleted file mode 100644 index 16d7698..0000000 --- a/test/test_axl_async_daemon.h +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2009, Lawrence Livermore National Security, LLC. - * Produced at the Lawrence Livermore National Laboratory. - * Written by Adam Moody . - * LLNL-CODE-411039. - * All rights reserved. - * KVTree was originally part of The Scalable Checkpoint / Restart (SCR) library. - * For details, see https://sourceforge.net/projects/scalablecr/ - * Please also read this file: LICENSE.TXT. -*/ - -#ifndef TEST_AXL_ASYNC_DAEMON_H -#define TEST_AXL_ASYNC_DAEMON_H - -#include "test_axl.h" - -int test_axl_async_daemon(); -void test_axl_async_daemon_init(); - -#endif //TEST_AXL_ASYNC_DAEMON_H diff --git a/test/test_axl_async_ibmbb.c b/test/test_axl_async_ibmbb.c deleted file mode 100644 index a11b873..0000000 --- a/test/test_axl_async_ibmbb.c +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) 2009, Lawrence Livermore National Security, LLC. - * Produced at the Lawrence Livermore National Laboratory. - * Written by Adam Moody . - * LLNL-CODE-411039. - * All rights reserved. - * KVTree was originally part of The Scalable Checkpoint / Restart (SCR) library. - * For details, see https://sourceforge.net/projects/scalablecr/ - * Please also read this file: LICENSE.TXT. -*/ - -#include "test_axl.h" -#include "test_axl_async_ibmbb.h" - -#include -#include -#include -#include -#include -#include - -#define TEST_PASS (0) -#define TEST_FAIL (1) - -#define TEST_FILE "test_file" -#define TEST_STRING "I am a file" - -#define TEST_NAME "test transfer" -#define TEST_DEST "test_file_moved" - -int test_axl_async_ibmbb(struct test_args *test_args){ - int rc = TEST_PASS; - const char *src_path = test_args->src_path; - const char *dst_path = test_args->dst_path; - printf("Copying %s to %s\n", src_path, dst_path); - - /* Launch axl, reate a transfer, add test file, dispatch */ - if (AXL_Init(NULL) != AXL_SUCCESS) { - rc = TEST_FAIL; - goto cleanup; - } - - int id = AXL_Create(AXL_XFER_ASYNC_BBAPI, TEST_NAME); - if (id < 0) { - rc = TEST_FAIL; - goto cleanup; - } - - if (AXL_Add(id, src_path, dst_path) != AXL_SUCCESS) { - rc = TEST_FAIL; - goto cleanup; - } - - if (AXL_Dispatch(id) != AXL_SUCCESS) { - rc = TEST_FAIL; - goto cleanup; - } - -#if 0 - if (AXL_Cancel(id) != AXL_SUCCESS) { - rc = TEST_FAIL; - goto cleanup; - } - - /* will return AXL_SUCCESS if not cancelled, - * and !AXL_SUCCESS if cancelled (or error) */ - AXL_Wait(id); -#else - /* Wait for transfer to complete and finalize axl */ - if (AXL_Wait(id) != AXL_SUCCESS) { - rc = TEST_FAIL; - goto cleanup; - } -#endif - if (AXL_Free(id) != AXL_SUCCESS) { - rc = TEST_FAIL; - goto cleanup; - } - - if (AXL_Finalize() != AXL_SUCCESS) { - rc = TEST_FAIL; - goto cleanup; - } - - if (axl_compare_files_or_dirs(src_path, dst_path) != 0) { - rc = TEST_FAIL; - goto cleanup; - } - -cleanup: - /* Unlink test files and return rc */ - printf("Removing %s and %s\n", src_path, dst_path); - unlink(src_path); - unlink(dst_path); - return rc; -} - -void test_axl_async_ibmbb_init(){ - register_test(test_axl_async_ibmbb, "test_axl_async_ibmbb"); -} diff --git a/test/test_axl_async_ibmbb.h b/test/test_axl_async_ibmbb.h deleted file mode 100644 index 0ca00e5..0000000 --- a/test/test_axl_async_ibmbb.h +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2009, Lawrence Livermore National Security, LLC. - * Produced at the Lawrence Livermore National Laboratory. - * Written by Adam Moody . - * LLNL-CODE-411039. - * All rights reserved. - * KVTree was originally part of The Scalable Checkpoint / Restart (SCR) library. - * For details, see https://sourceforge.net/projects/scalablecr/ - * Please also read this file: LICENSE.TXT. -*/ - -#ifndef TEST_AXL_ASYNC_IBMBB_H -#define TEST_AXL_ASYNC_IBMBB_H - -#include "test_axl.h" - -int test_axl_async_ibmbb(); -void test_axl_async_ibmbb_init(); - -#endif //TEST_AXL_ASYNC_IBMBB_H diff --git a/test/test_axl_sync.c b/test/test_axl_sync.c deleted file mode 100644 index 98ebfc2..0000000 --- a/test/test_axl_sync.c +++ /dev/null @@ -1,74 +0,0 @@ -#include "test_axl.h" -#include "test_axl_sync.h" - -#include -#include -#include -#include -#include -#include - -#define TEST_PASS (0) -#define TEST_FAIL (1) - -#define TEST_FILE "test_file" -#define TEST_STRING "I am a file" - -#define TEST_NAME "test transfer" -#define TEST_DEST "test_file_moved" - -int test_axl_sync(){ - int rc = TEST_PASS; - - /* Create a file */ - unlink(TEST_FILE); - FILE * fp = fopen(TEST_FILE, "w"); - fputs(TEST_STRING, fp); - fclose(fp); - - /* Create dest path */ - char pwd[PATH_MAX]; - if (getcwd(pwd, sizeof(pwd)) == NULL) { - printf("getcwd() failed: errno=%d (%s)\n", errno, strerror(errno)); - } - char* dest_path = malloc(strlen(pwd) + strlen(TEST_DEST) + 2); - strcpy(dest_path, pwd); - strcat(dest_path, "/"); - strcat(dest_path, TEST_DEST); - unlink(dest_path); - - /* Launch axl, reate a transfer, add test file, dispatch */ - if (AXL_Init(NULL) != AXL_SUCCESS) rc = TEST_FAIL; - int id = AXL_Create(AXL_XFER_SYNC, TEST_NAME); - if (AXL_Add(id, TEST_FILE, dest_path) != AXL_SUCCESS) rc = TEST_FAIL; - if (AXL_Dispatch(id) != AXL_SUCCESS) rc = TEST_FAIL; - - /* Wait for transfer to complete and finalize axl */ - if (AXL_Wait(id) != AXL_SUCCESS) rc = TEST_FAIL; - if (AXL_Free(id) != AXL_SUCCESS) rc = TEST_FAIL; - if (AXL_Finalize() != AXL_SUCCESS) rc = TEST_FAIL; - - /* Check that file arrived properly */ - FILE* dfp = fopen(dest_path, "r"); - if(!dfp) { - rc = TEST_FAIL; - } else { - char* read_str = malloc(strlen(TEST_STRING) + 1); - if (fgets(read_str, strlen(TEST_STRING) + 1, dfp) == NULL) { - printf("fgets() returned NULL\n"); - } - if(strcmp(read_str, TEST_STRING)) rc = TEST_FAIL; - free(read_str); - fclose(dfp); - } - - /* Unlink test files and return rc */ - unlink(TEST_FILE); - unlink(dest_path); - free(dest_path); - return rc; -} - -void test_axl_sync_init(){ - register_test(test_axl_sync, "test_axl_sync"); -} diff --git a/test/test_axl_sync.h b/test/test_axl_sync.h deleted file mode 100644 index fbb9ef6..0000000 --- a/test/test_axl_sync.h +++ /dev/null @@ -1,9 +0,0 @@ -#ifndef TEST_AXL_SYNC_H -#define TEST_AXL_SYNC_H - -#include "test_axl.h" - -int test_axl_sync(); -void test_axl_sync_init(); - -#endif //TEST_KVTREE_ALLOCATE_DELETE_H