Skip to content

Commit

Permalink
Add pthreads xfer, axl_cp, and update AXL_Add for directories
Browse files Browse the repository at this point in the history
- Add a new AXL_XFER_PTHREAD transfer.  This uses multiple threads to transfer
  the files.

- Add an 'axl_cp' command.  axl_cp works like the standard 'cp' command, but
  allows you to specify the AXL transfer type (sync, pthreads, bbapi, etc...).
  It will also serve as good example code for AXL users.  Along with this,
  test_axl.c has been replaced with a 'test_axl' bash script.  This will make
  it a lot easier to write test cases.  The new test_axl script currently
  tests both sync and pthread transfers via calls to axl_cp.

- Allow passing directories to AXL_Add().  If a directory is passed, all
  files and subdirs within the dir are recursively copied.  Basically,
  AXL_Add() will follow the same semantics as the 'cp' command.  This is
  conceptually easier for the user to understand, and saves them from having to
  traverse their directories manually.  Note that the recursive directory
  copy will currently not copy empty subdirectories.

Signed-off-by: Tony Hutter <[email protected]>
  • Loading branch information
tonyhutter authored and gonsie committed Jun 1, 2019
1 parent 6e592c2 commit 42ff1dd
Show file tree
Hide file tree
Showing 20 changed files with 865 additions and 716 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ before_install:

script:
- cd $TRAVIS_BUILD_DIR
- cmake -DCMAKE_C_COMPILER=$CC -DCMAKE_CXX_COMPILER=$CXX -DCMAKE_C_FLAGS="-Wall -coverage" -DMPI=OFF -DAXL_ASYNC_DAEMON=ON -DWITH_KVTREE_PREFIX=$HOME/KVTree/kvtree-install
- cmake -DCMAKE_C_COMPILER=$CC -DCMAKE_CXX_COMPILER=$CXX -DCMAKE_C_FLAGS="-Wall -coverage" -DMPI=OFF -DWITH_KVTREE_PREFIX=$HOME/KVTree/kvtree-install
- make -k VERBOSE=1
- make check

Expand Down
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ IF(ZLIB_FOUND)
LIST(APPEND AXL_LINK_LINE "-lz")
ENDIF(ZLIB_FOUND)

# PTHREADS
FIND_PACKAGE(Threads REQUIRED)
IF(CMAKE_THREAD_LIBS_INIT)
LIST(APPEND AXL_EXTERNAL_LIBS "-lpthread")
ENDIF()

## CPPR
IF(${AXL_ASYNC_API} STREQUAL "INTEL_CPPR")
FIND_PACKAGE(CPPR REQUIRED)
Expand Down
2 changes: 2 additions & 0 deletions doc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ from the destination.

* AXL\_XFER\_SYNC - this is a synchronous transfer, which does not return until the files have been fully copied. It uses POSIX I/O to directly read/write files.

* AXL\_XFER\_PTHREADS - Like AXL\_XFER\_SYNC, but use multiple threads to do the copy.

* AXL\_XFER\_ASYNC\_DAEMON - this methods currently forks a process that sleeps in the background waiting for work to do. The library communicates with this daemon via a shared metadata file. When there are files to be copied, the daemon itself uses POSIX I/O to directly read/write files. This works, but dealing with the forked process is clunky. It needs to be rewritten to use a pthread instead of forking a process.

* AXL\_XFER\_ASYNC\_BBAPI - this method uses [IBM's Burst Buffer API](https://github.com/IBM/CAST) to transfer files. IBM's system software then takes over to move data in the background. It's actually using NVMeoF, reading data from the local SSD from a remote node, so that the compute node is not really bothered once started.
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ LIST(APPEND libaxl_srcs
axl_err.c
axl_io.c
axl_util.c
axl_pthread.c
)

ADD_LIBRARY(axl_o OBJECT ${libaxl_srcs})
Expand Down
181 changes: 167 additions & 14 deletions src/axl.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h>

/* PATH_MAX */
#include <limits.h>

/* dirname */
#include <libgen.h>
Expand All @@ -8,6 +13,9 @@
#include <sys/types.h>
#include <sys/stat.h>

/* opendir */
#include <dirent.h>

/* axl_xfer_t */
#include "axl.h"

Expand All @@ -24,6 +32,7 @@
/*#include "axl_async_cppr.h" */
#include "axl_async_daemon.h"
#include "axl_async_datawarp.h"
#include "axl_pthread.h"

/* define states for transfer handlesto help ensure
* users call AXL functions in the correct order */
Expand Down Expand Up @@ -241,7 +250,7 @@ int AXL_Create (axl_xfer_t xtype, const char* name)
}

/* Create an entry for this transfer handle
* record user string and transfer type
* record user string and transfer type
* UID
* id
* NAME
Expand All @@ -265,6 +274,7 @@ int AXL_Create (axl_xfer_t xtype, const char* name)
case AXL_XFER_ASYNC_DAEMON:
case AXL_XFER_ASYNC_DW:
case AXL_XFER_ASYNC_CPPR:
case AXL_XFER_PTHREAD:
break;
case AXL_XFER_ASYNC_BBAPI:
rc = axl_async_create_bbapi(id);
Expand All @@ -289,11 +299,33 @@ int AXL_Create (axl_xfer_t xtype, const char* name)
return id;
}

/* Add a file to an existing transfer handle */
int AXL_Add (int id, const char* source, const char* destination)
/* Is this path a file or a directory? Return the type. */
enum {PATH_UNKNOWN = 0, PATH_FILE, PATH_DIR};
static int path_type(const char *path) {
struct stat s;
if (stat(path, &s) != 0) {
return 0;
}
if (S_ISREG(s.st_mode)) {
return PATH_FILE;
}
if (S_ISDIR(s.st_mode)) {
return PATH_DIR;
}
return PATH_UNKNOWN;
}

/*
* Add a file to an existing transfer handle. No directories.
*
* If the file's destination path doesn't exist, then automatically create the
* needed directories.
*/
static int
__AXL_Add (int id, const char* src, const char* dest)
{
/* lookup transfer info for the given id */
kvtree* file_list = NULL;

axl_xfer_t xtype = AXL_XFER_NULL;
axl_xfer_state_t xstate = AXL_XFER_STATE_NULL;
if (axl_get_info(id, &file_list, &xtype, &xstate) != AXL_SUCCESS) {
Expand All @@ -316,21 +348,22 @@ int AXL_Add (int id, const char* source, const char* destination)
* /path/to/dest/file
* STATUS
* SOURCE */
kvtree* src_hash = kvtree_set_kv(file_list, AXL_KEY_FILES, source);
kvtree_util_set_str(src_hash, AXL_KEY_FILE_DEST, destination);
kvtree* src_hash = kvtree_set_kv(file_list, AXL_KEY_FILES, src);
kvtree_util_set_str(src_hash, AXL_KEY_FILE_DEST, dest);
kvtree_util_set_int(src_hash, AXL_KEY_STATUS, AXL_STATUS_SOURCE);

/* add file to transfer data structure, depending on its type */
int rc = AXL_SUCCESS;
switch (xtype) {
case AXL_XFER_SYNC:
case AXL_XFER_PTHREAD:
break;
case AXL_XFER_ASYNC_DAEMON:
break;
case AXL_XFER_ASYNC_DW:
break;
case AXL_XFER_ASYNC_BBAPI:
rc = axl_async_add_bbapi(id, source, destination);
rc = axl_async_add_bbapi(id, src, dest);
break;
case AXL_XFER_ASYNC_CPPR:
break;
Expand All @@ -348,13 +381,122 @@ int AXL_Add (int id, const char* source, const char* destination)
return rc;
}

/*
* Add a file or directory to the transfer handle. If the src is a
* directory, recursively add all the files and directories in that
* directory.
*/
int AXL_Add (int id, const char *src, const char *dest)
{
int rc;
DIR *dir;
struct dirent *de;
char *src_copy, *dest_copy;
char *src_basename;
unsigned int src_path_type, dest_path_type;
char *new_dest, *new_src, *final_dest;

new_dest = calloc(PATH_MAX, 1);
if (!new_dest) {
return ENOMEM;
}
new_src = calloc(PATH_MAX, 1);
if (!new_src) {
free(new_dest);
return ENOMEM;
}
final_dest = calloc(PATH_MAX, 1);
if (!final_dest) {
free(new_dest);
free(new_src);
return ENOMEM;
}

src_copy = strdup(src);
dest_copy = strdup(dest);

src_path_type = path_type(src);
dest_path_type = path_type(dest);
src_basename = basename(src_copy);

switch (src_path_type) {
case PATH_FILE:
if (dest_path_type == PATH_DIR) {
/*
* They passed a source file, with dest directory. Append the
* filename to dest.
*
* Before:
* src dest
* /tmp/file1 /tmp/mydir
*
* After:
* /tmp/file1 /tmp/mydir/file1
*/

snprintf(new_dest, PATH_MAX, "%s/%s", dest, src_basename);
rc = __AXL_Add(id, src, new_dest);
} else {
/* The destination is a filename */
rc = __AXL_Add(id, src, dest);
}
break;
case PATH_DIR:
/* Add the directory itself first... */
if (dest_path_type == PATH_FILE) {
/* We can't copy a directory onto a file */
rc = EINVAL;
break;
} else if (dest_path_type == PATH_DIR) {
snprintf(new_dest, PATH_MAX, "%s/%s", dest, src_basename);
} else {
/* Our destination doesn't exist */
snprintf(new_dest, PATH_MAX, "%s", dest);
}

/* Traverse all files/dirs in the directory. */
dir = opendir(src);
if (!dir) {
rc = ENOENT;
break;
}
while ((de = readdir(dir)) != NULL) {
/* Skip '.' and '..' directories */
if ((strcmp(de->d_name, ".") == 0) || (strcmp(de->d_name, "..") == 0)) {
continue;
}
snprintf(new_src, PATH_MAX, "%s/%s", src, de->d_name);
snprintf(final_dest, PATH_MAX, "%s/%s", new_dest, de->d_name);

rc = AXL_Add(id, new_src, final_dest);
if (rc != AXL_SUCCESS) {
rc = EINVAL;
break;
}
}
break;

default:
rc = EINVAL;
break;
}

free(dest_copy);
free(src_copy);
free(final_dest);
free(new_src);
free(new_dest);
return rc;
}

/* Initiate a transfer for all files in handle ID */
int AXL_Dispatch (int id)
{
/* lookup transfer info for the given id */
kvtree* file_list = NULL;
axl_xfer_t xtype = AXL_XFER_NULL;
axl_xfer_state_t xstate = AXL_XFER_STATE_NULL;

if (axl_get_info(id, &file_list, &xtype, &xstate) != AXL_SUCCESS) {
AXL_ERR("Could not find transfer info for UID %d", id);
return AXL_FAILURE;
Expand All @@ -371,19 +513,16 @@ int AXL_Dispatch (int id)
kvtree_elem* elem;
kvtree* files = kvtree_get(file_list, AXL_KEY_FILES);
for (elem = kvtree_elem_first(files); elem != NULL; elem = kvtree_elem_next(elem)) {
/* get path to source file */
char* source = kvtree_elem_key(elem);

/* get hash for this file */
kvtree* elem_hash = kvtree_elem_hash(elem);

/* get destination for this file */
char* destination;
kvtree_util_get_str(elem_hash, AXL_KEY_FILE_DEST, &destination);
char* dest;
kvtree_util_get_str(elem_hash, AXL_KEY_FILE_DEST, &dest);

/* figure out and create dirs that should exist */
/* TODO: vendors may implement smarter functions for mkdir */
char* dest_path = strdup(destination);
char* dest_path = strdup(dest);
char* dest_dir = dirname(dest_path);
mode_t mode_dir = axl_getmode(1, 1, 1);
axl_mkdir(dest_dir, mode_dir);
Expand All @@ -398,6 +537,9 @@ int AXL_Dispatch (int id)
case AXL_XFER_SYNC:
rc = axl_sync_start(id);
break;
case AXL_XFER_PTHREAD:
rc = axl_pthread_start(id);
break;
case AXL_XFER_ASYNC_DAEMON:
rc = axl_async_start_daemon(id);
break;
Expand Down Expand Up @@ -462,6 +604,9 @@ int AXL_Test (int id)
case AXL_XFER_SYNC:
rc = axl_sync_test(id);
break;
case AXL_XFER_PTHREAD:
rc = axl_pthread_test(id);
break;
case AXL_XFER_ASYNC_DAEMON:
rc = axl_async_test_daemon(id, &bytes_total, &bytes_written);
break;
Expand Down Expand Up @@ -495,7 +640,6 @@ int AXL_Wait (int id)
AXL_ERR("Could not find transfer info for UID %d", id);
return AXL_FAILURE;
}

/* check that handle is in correct state to wait */
if (xstate != AXL_XFER_STATE_DISPATCHED) {
AXL_ERR("Invalid state to wait UID %d", id);
Expand All @@ -506,6 +650,7 @@ int AXL_Wait (int id)
/* lookup status for the transfer, return if done */
int status;
kvtree_util_get_int(file_list, AXL_KEY_STATUS, &status);

if (status == AXL_STATUS_DEST) {
return AXL_SUCCESS;
} else if (status == AXL_STATUS_ERROR) {
Expand All @@ -521,6 +666,9 @@ int AXL_Wait (int id)
case AXL_XFER_SYNC:
rc = axl_sync_wait(id);
break;
case AXL_XFER_PTHREAD:
rc = axl_pthread_wait(id);
break;
case AXL_XFER_ASYNC_DAEMON:
rc = axl_async_wait_daemon(id);
break;
Expand Down Expand Up @@ -625,11 +773,16 @@ int AXL_Free (int id)
kvtree* file_list = NULL;
axl_xfer_t xtype = AXL_XFER_NULL;
axl_xfer_state_t xstate = AXL_XFER_STATE_NULL;

if (axl_get_info(id, &file_list, &xtype, &xstate) != AXL_SUCCESS) {
AXL_ERR("Could not find transfer info for UID %d", id);
return AXL_FAILURE;
}

if (xtype == AXL_XFER_PTHREAD) {
axl_pthread_free(id);
}

/* check that handle is in correct state to free */
if (xstate != AXL_XFER_STATE_CREATED &&
xstate != AXL_XFER_STATE_COMPLETED)
Expand Down
1 change: 1 addition & 0 deletions src/axl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ typedef enum {
* shmem). If there is no native API, fall back
* to AXL_XFER_DEFAULT.
*/
AXL_XFER_PTHREAD, /* parallel copy using pthreads */
} axl_xfer_t;

/* Read configuration from non-AXL-specific file
Expand Down
2 changes: 2 additions & 0 deletions src/axl_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ extern kvtree* axl_file_lists;
#define AXL_KEY_FILE_DEST ("DEST")
#define AXL_KEY_FILE_STATUS ("STATUS")
#define AXL_KEY_FILE_CRC ("CRC")
#define AXL_KEY_PTHREAD_DATA ("PTHREAD_DATA")


/* TRANSFER STATUS */
#define AXL_STATUS_SOURCE (1)
Expand Down
Loading

0 comments on commit 42ff1dd

Please sign in to comment.