Commit
improvements and updates to doc
Summary: improvements and updates to doc (examples, etc)
improvements to logging and default behavior
some bug fixes (directory time, receiver start time)

Differential Revision: D2262254
ldemailly committed Jul 21, 2015
1 parent 28fb3d1 commit 626f64a
Showing 12 changed files with 107 additions and 41 deletions.
12 changes: 7 additions & 5 deletions BUILD.txt
@@ -13,11 +13,13 @@ Notes:
If using a vmware image/starting fresh
sudo vmware-config-tools.pl # to setup shared folders

# Cmake 3.1 or later
# Cmake 3.2 or later

wget http://www.cmake.org/files/v3.1/cmake-3.1.0.tar.gz
tar xvfz cmake-3.1.0.tar.gz
cd cmake-3.1.0
See below for mac (Cmake 3.3/head on a mac is recommended for Xcode support)

wget http://www.cmake.org/files/v3.2/cmake-3.2.3.tar.gz
tar xvfz cmake-3.2.3.tar.gz
cd cmake-3.2.3
./bootstrap --prefix=/usr --parallel=16 && make && sudo make install

# Get folly
@@ -64,7 +66,7 @@ Install Xcode 6 and the command line tools (so
exists)

## cmake
# Until the fix gets to cmake (3.1.1 ? 3.2) we need the latest on a Mac to
# Until 3.3 is out we need the latest on a Mac to
# get CXX 11 support: ( http://www.cmake.org/Bug/view.php?id=15355 )

git clone http://cmake.org/cmake.git
8 changes: 7 additions & 1 deletion CMakeLists.txt
@@ -22,7 +22,7 @@ cmake_minimum_required(VERSION 3.2)
# There is no C per se in WDT but if you use CXX only here many checks fail
# Version is Major.Minor.YYMMDDX for up to 10 releases per day
# Minor currently is also the protocol version - has to match with Protocol.cpp
project("WDT" LANGUAGES C CXX VERSION 1.14.1507170)
project("WDT" LANGUAGES C CXX VERSION 1.14.1507210)

# On MacOS this requires the latest (master) CMake (and/or CMake 3.1.1/3.2)
set(CMAKE_CXX_STANDARD 11)
@@ -31,6 +31,12 @@ set(CMAKE_CXX_STANDARD_REQUIRED on)
# somehow 'option' for this doesn't seem to work/I don't know how to make it
set(BUILD_SHARED_LIBS on CACHE Bool "build shared libs")

# CMake default behavior should be to set rpath when needed (non system install)
# it's not so let's set this for now:
set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib")


# Optimized by default
# TODO: This doesn't seem to work / sets default to "" instead of Release...
# set(CMAKE_BUILD_TYPE Release CACHE String "build type")
3 changes: 2 additions & 1 deletion CONTRIBUTING.md
@@ -4,6 +4,8 @@ possible.

## Our Development Process

See also README.md

## Pull Requests
We actively welcome your pull requests.
1. Fork the repo and create your branch from `master`.
@@ -30,7 +32,6 @@ outlined on that page and do not file a public issue.
## Coding Style
* run clangformat.sh


## License
By contributing to WDT, you agree that your contributions will be licensed
under its BSD license.
7 changes: 4 additions & 3 deletions DirectorySourceQueue.cpp
@@ -126,7 +126,8 @@ bool DirectorySourceQueue::buildQueueSynchronously() {
}
}
directoryTime_ = durationSeconds(Clock::now() - startTime);
VLOG(1) << "finished initialization of DirectorySourceQueue";
VLOG(1) << "finished initialization of DirectorySourceQueue in "
<< directoryTime_;
return res;
}
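
The new log line prints the elapsed time computed from `Clock::now() - startTime`. A minimal sketch of how such a helper could be written with `std::chrono` follows. The `Clock` alias as `steady_clock`, the `durationSeconds` body, and the `timeIt` wrapper are assumptions for illustration, not the WDT implementation.

```cpp
#include <chrono>

// Assumed clock type: the alias name mirrors the diff, but choosing
// steady_clock is this sketch's assumption.
using Clock = std::chrono::steady_clock;

// Hypothetical stand-in for durationSeconds(): converts any std::chrono
// duration to fractional seconds as a double.
template <typename Rep, typename Period>
double durationSeconds(std::chrono::duration<Rep, Period> d) {
  return std::chrono::duration_cast<std::chrono::duration<double>>(d).count();
}

// Hypothetical wrapper: runs a callable and returns the elapsed seconds,
// using the same start/stop pattern as buildQueueSynchronously().
template <typename Fn>
double timeIt(Fn &&fn) {
  const Clock::time_point startTime = Clock::now();
  fn();
  return durationSeconds(Clock::now() - startTime);
}
```

Returning a `double` keeps the value directly streamable into `VLOG`/`LOG` without further conversion.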

@@ -286,8 +287,8 @@ bool DirectorySourceQueue::explore() {
}
closedir(dirPtr);
}
LOG(INFO) << "Number of files explored " << numEntries_
<< " errors : " << std::boolalpha << hasError;
LOG(INFO) << "Number of files explored: " << numEntries_
<< ", errors: " << std::boolalpha << hasError;
return !hasError;
}

56 changes: 55 additions & 1 deletion README.md
@@ -9,6 +9,48 @@ resource limits to be only hardware limited (disc or network bandwidth
not latency) and as efficient as possible (low CPU/memory/resources
utilization)

We are trying to keep dependencies minimal in order to maximize portability
and ensure the smallest binary size. A side benefit is shorter compile time.

We aren't using exceptions, both for performance and because exceptions would
make it harder to reason about the control flow of the library.
We also believe the WDT library is easier to integrate as a result. To some
extent our philosophy is to write moderately structured and encapsulated C code
as opposed to using every feature of C++.

We try to minimize the number of system calls, which is one of the reasons
we are using blocking thread IOs. We can maximize system throughput because
at any given point some threads are reading while others are writing, and data
is buffered on both paths, keeping each subsystem busy while minimizing
kernel to user space switches.

## Example

While WDT is essentially a library, we also have a small command line tool
which we use for tests and is useful standalone - a more complete example
is in "wcp.sh" which installs as "wcp" but here is a quick example:

Receiver side: (starts the server indicating destination directory)

[ldemailly@devbig074]$ wdt -directory /data/users/ldemailly/transfer1

Sender side: (discovers and sends all files in a directory tree to destination)

[root@dev443]$ wdt -directory /usr/bin -destination devbig074.prn2

[=================================================] 100% 588.8 Mbytes/s
I0720 21:48:08.446014 3245296 Sender.cpp:314] Total sender time = 2.68699
seconds (0.00640992 dirTime). Transfer summary : Transfer status = OK. Number
of files transferred = 1887. Data Mbytes = 1582.08. Header kBytes = 62.083
(0.00383215% overhead). Total bytes = 1658999858. Wasted bytes due to
failure = 0 (0% overhead). Total sender throughput = 588.816 Mbytes/sec
(590.224 Mbytes/sec pure transf rate)
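
The figures in this summary can be cross-checked by hand. The sketch below assumes that a Mbyte is 1024 * 1024 bytes and a kByte is 1024 bytes, which is inferred from the numbers in the log rather than stated in it. The function names are hypothetical.

```cpp
#include <cstdint>

// Assumed conversion factor: bytes per Mbyte, taken as 1024 * 1024.
constexpr double kMbToB = 1024.0 * 1024.0;

// Total throughput in Mbytes/sec from a byte count and a wall time.
double throughputMbytesPerSec(int64_t totalBytes, double seconds) {
  return totalBytes / seconds / kMbToB;
}

// Header overhead as a percentage of the data volume
// (headers given in kBytes, data in Mbytes).
double headerOverheadPercent(double headerKBytes, double dataMbytes) {
  return 100.0 * headerKBytes / (dataMbytes * 1024.0);
}
```

With the values above, `throughputMbytesPerSec(1658999858, 2.68699)` comes out at about 588.8 and `headerOverheadPercent(62.083, 1582.08)` at about 0.0038, consistent with the logged summary.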

Note that in this simple example, with lots of small files (/usr/bin from
a Linux distribution) but not much total data (~1.5 Gbyte), the maximum
speed isn't as good as it would be with more data (there is still a TCP ramp
up time, even though it's faster because of parallelization), as when we use
it in our production use cases:

## Performance/Results

@@ -17,7 +59,11 @@ we are able to transfer data at a throttled 600 Mbytes/sec even across
long distance, high latency links (e.g. Sweden to Oregon). That's 3x the speed
of previous highly optimized http based solution and with less strain on the
system. When not throttling we are able to easily saturate a 40 Gbit/s NIC and
get near theoritical link speed (above 4 Gbytes/sec).
get near theoretical link speed (above 4 Gbytes/sec).

We have so far optimized WDT for servers with fast IOs - in particular flash
cards or in-memory reads/writes. If you use disks, throughput won't be as good,
but we do plan on optimizing for disks as well in the future.

## Dependencies

@@ -202,6 +248,14 @@ a 1 second delay before we send the first payload byte

## Submitting diffs/making changes

See CONTRIBUTING.md

Please run the tests (make test) and the manual tests (integration upcoming)
wdt_e2e_test.sh
wdt_download_resumption_test.sh
wdt_network_test.sh
wdt_max_send_test.sh

(facebook only:)
Make sure to do the following, before "arc diff":
```
6 changes: 4 additions & 2 deletions Receiver.cpp
@@ -206,12 +206,13 @@ void Receiver::markTransferFinished(bool isFinished) {
std::unique_ptr<TransferReport> Receiver::finish() {
std::unique_lock<std::mutex> instanceLock(instanceManagementMutex_);
if (areThreadsJoined_) {
LOG(INFO) << "Threads have already been joined. Returning the "
<< "transfer report";
VLOG(1) << "Threads have already been joined. Returning the "
<< "transfer report";
return getTransferReport();
}
const auto &options = WdtOptions::get();
if (!isJoinable_) {
// TODO: don't complain about this when coming from runForever()
LOG(WARNING) << "The receiver is not joinable. The threads will never"
<< " finish and this method will never return";
}
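
The early return on `areThreadsJoined_` is what makes `finish()` safe to call more than once. A minimal sketch of that pattern is shown below. The `Workers` class and `runDemo()` are hypothetical and are not part of WDT; only the flag name is borrowed from the diff.

```cpp
#include <atomic>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical worker pool illustrating an idempotent finish().
class Workers {
 public:
  ~Workers() { finish(); }  // never destroy joinable threads

  void start(int numThreads, std::function<void()> work) {
    std::lock_guard<std::mutex> lock(mutex_);
    for (int i = 0; i < numThreads; i++) {
      threads_.emplace_back(work);
    }
    areThreadsJoined_ = false;
  }

  // Joins the threads on the first call; later calls return immediately.
  // Returns true only when this call performed the join.
  bool finish() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (areThreadsJoined_) {
      return false;  // already joined: nothing left to do
    }
    for (auto &t : threads_) {
      t.join();
    }
    threads_.clear();
    areThreadsJoined_ = true;
    return true;
  }

 private:
  std::mutex mutex_;
  std::vector<std::thread> threads_;
  bool areThreadsJoined_{true};
};

// Usage: four threads each bump a counter; the second finish() is a no-op.
int runDemo() {
  std::atomic<int> counter{0};
  Workers workers;
  workers.start(4, [&counter] { counter++; });
  const bool firstJoined = workers.finish();   // joins all four threads
  const bool secondJoined = workers.finish();  // returns at once
  return (firstJoined && !secondJoined) ? counter.load() : -1;
}
```

Because repeated calls are an expected, harmless case in this pattern, logging them at a verbose level rather than at `INFO` keeps normal output quiet.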
@@ -374,6 +375,7 @@ void Receiver::progressTracker() {
}

void Receiver::start() {
startTime_ = Clock::now();
if (hasPendingTransfer()) {
LOG(WARNING) << "There is an existing transfer in progress on this object";
}
14 changes: 7 additions & 7 deletions Receiver.h
@@ -71,8 +71,8 @@ class Receiver : public WdtBase {
std::unique_ptr<TransferReport> finish() override;

/**
* Call this method when you don't want the wdt receiver
* to stop after one transfer.
* Call this method instead of transferAsync() when you don't
* want the wdt receiver to stop after one transfer.
*/
ErrorCode runForever();

@@ -106,17 +106,17 @@ class Receiver : public WdtBase {
*/
bool hasPendingTransfer();

/**
* @param isFinished Mark transfer active/inactive
*/
void markTransferFinished(bool isFinished);

/**
* Use the method to get the list of ports receiver is listening on
*/
std::vector<int32_t> getPorts() const;

protected:
/**
* @param isFinished Mark transfer active/inactive
*/
void markTransferFinished(bool isFinished);

/**
* Wdt receiver has logic to maintain the consistency of the
* transfers through connection errors. All threads are run by the logic
17 changes: 9 additions & 8 deletions Sender.cpp
@@ -235,17 +235,16 @@ Clock::time_point Sender::getEndTime() {

std::unique_ptr<TransferReport> Sender::finish() {
std::unique_lock<std::mutex> instanceLock(instanceManagementMutex_);
VLOG(1) << "Sender::finish()";
if (areThreadsJoined_) {
LOG(INFO) << "Threads have already been joined. Returning the "
<< "minimal transfer report";
VLOG(1) << "Threads have already been joined. Returning the"
<< " existing transfer report";
return getTransferReport();
}
const auto &options = WdtOptions::get();
const bool twoPhases = options.two_phases;
bool progressReportEnabled =
progressReporter_ && progressReportIntervalMillis_ > 0;
double directoryTime;
directoryTime = dirQueue_->getDirectoryTime();
const int64_t numPorts = ports_.size();
for (int64_t i = 0; i < numPorts; i++) {
senderThreads_[i].join();
@@ -311,6 +310,8 @@ std::unique_ptr<TransferReport> Sender::finish() {
}
LOG(INFO) << report;
}
double directoryTime;
directoryTime = dirQueue_->getDirectoryTime();
LOG(INFO) << "Total sender time = " << totalTime << " seconds ("
<< directoryTime << " dirTime)"
<< ". Transfer summary : " << *transferReport
@@ -962,8 +963,8 @@ void Sender::sendOne(int threadIndex) {
std::unique_lock<std::mutex> lock(mutex_);
numActiveThreads_--;
if (numActiveThreads_ == 0) {
LOG(WARNING) << "Last thread finished "
<< durationSeconds(Clock::now() - startTime_);
LOG(INFO) << "Last thread finished "
<< durationSeconds(Clock::now() - startTime_);
endTime_ = Clock::now();
transferFinished_ = true;
}
@@ -986,8 +987,8 @@ void Sender::sendOne(int threadIndex) {
}

double totalTime = durationSeconds(Clock::now() - startTime);
LOG(INFO) << "Got reply - all done for port :" << port << ". "
<< "Transfer stat : " << threadStats << " Total throughput = "
LOG(INFO) << "Port " << port << " done. " << threadStats
<< " Total throughput = "
<< threadStats.getEffectiveTotalBytes() / totalTime / kMbToB
<< " Mbytes/sec";
perfReports_[threadIndex] = *perfStatReport;
4 changes: 2 additions & 2 deletions WdtConfig.h
@@ -6,9 +6,9 @@

#define WDT_VERSION_MAJOR 1
#define WDT_VERSION_MINOR 14
#define WDT_VERSION_BUILD 1507170
#define WDT_VERSION_BUILD 1507210
// Add -fbcode to version str
#define WDT_VERSION_STR "1.14.1507170-fbcode"
#define WDT_VERSION_STR "1.14.1507210-fbcode"
// Tie minor and proto version
#define WDT_PROTOCOL_VERSION WDT_VERSION_MINOR
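
The build number in these macros follows the Major.Minor.YYMMDDX scheme described in the CMakeLists.txt comment, where X is a single digit for the release index within a day. A small sketch of how such a number and version string could be derived is shown below. Both function names are hypothetical and are not part of WDT.

```cpp
#include <string>

// Builds the YYMMDDX build number: two-digit year, month, day, then a
// single digit x (0-9) for the release index within that day.
constexpr int makeBuildNumber(int yy, int mm, int dd, int x) {
  return ((yy * 100 + mm) * 100 + dd) * 10 + x;
}

// Formats "Major.Minor.Build" as a plain version string.
std::string versionString(int major, int minor, int build) {
  return std::to_string(major) + "." + std::to_string(minor) + "." +
         std::to_string(build);
}
```

For example, `makeBuildNumber(15, 7, 21, 0)` gives 1507210, the build number this commit moves to, and `makeBuildNumber(15, 7, 17, 0)` gives the previous 1507170.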

10 changes: 6 additions & 4 deletions wdtCmdLine.cpp
@@ -29,7 +29,7 @@
#include FLAGS_INCLUDE_FILE

// Flags not already in WdtOptions.h/WdtFlags.cpp.inc
DEFINE_bool(run_as_daemon, true,
DEFINE_bool(run_as_daemon, false,
"If true, run the receiver as never ending process");

DEFINE_string(directory, ".", "Source/Destination directory");
@@ -134,13 +134,15 @@ int main(int argc, char *argv[]) {
<< " from port = " << FLAGS_start_port;
ErrorCode retCode = OK;
if (FLAGS_parse_transfer_log) {
// Log parsing mode
TransferLogManager transferLogManager;
transferLogManager.setRootDir(FLAGS_directory);
if (!transferLogManager.parseAndPrint()) {
LOG(ERROR) << "Transfer log parsing failed";
retCode = ERROR;
}
} else if (FLAGS_destination.empty()) {
// Receiver mode
Receiver receiver(FLAGS_start_port, FLAGS_num_ports, FLAGS_directory);
receiver.setTransferId(FLAGS_transfer_id);
if (FLAGS_protocol_version > 0) {
@@ -152,16 +154,16 @@ int main(int argc, char *argv[]) {
return 0;
}
setUpAbort(receiver);
// TODO fix this
if (!FLAGS_run_as_daemon) {
receiver.transferAsync();
std::unique_ptr<TransferReport> report = receiver.finish();
retCode = report->getSummary().getErrorCode();
} else {
receiver.runForever();
retCode = OK;
retCode = receiver.runForever();
// not reached
}
} else {
// Sender mode
std::vector<FileInfo> fileInfo;
if (FLAGS_files) {
// Each line should have the filename and optionally
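
The new comments label the three branches of `main()`. The selection logic can be summarized in a short sketch, shown below. The `Mode` enum and both functions are hypothetical names introduced for illustration; only the flag semantics are taken from the diff.

```cpp
#include <string>

// Hypothetical labels for the three branches in main().
enum class Mode { ParseTransferLog, Receiver, Sender };

// Mirrors the if / else-if chain: log parsing takes priority, an empty
// destination means receiver mode, anything else is sender mode.
Mode selectMode(bool parseTransferLog, const std::string &destination) {
  if (parseTransferLog) {
    return Mode::ParseTransferLog;
  }
  if (destination.empty()) {
    return Mode::Receiver;
  }
  return Mode::Sender;
}

// Only a receiver started with run_as_daemon=true keeps running after a
// transfer; with the new default (false) it exits after one transfer.
bool receiverRunsForever(Mode mode, bool runAsDaemon) {
  return mode == Mode::Receiver && runAsDaemon;
}
```

This is also why wdt_max_send_test.sh now passes `-run_as_daemon=true` explicitly: the test relies on the old always-on receiver behavior.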
9 changes: 3 additions & 6 deletions wdt_e2e_test.sh
@@ -49,7 +49,8 @@ echo "Run from ~/fbcode - or fbmake runtests"
# to 1 : slow/expensive but checks correctness
# to 0 : fast for repeated benchmarking not for correctness
DO_VERIFY=1
NC="nc -4" # ipv4 only
# echo e | $NC was used to stop daemon servers
#NC="nc -4" # ipv4 only
REALPATH=/mnt/vol/engshare/svnroot/tfb/trunk/www/scripts/bin/realpath

# Verbose:
@@ -105,7 +106,7 @@ mkdir $DIR/extsrc

#cp -R wdt folly /usr/bin /usr/lib /usr/lib64 /usr/libexec /usr/share $DIR/src
#cp -R wdt folly /usr/bin /usr/lib /usr/lib64 /usr/libexec $DIR/src
cp -R wdt folly /usr/share $DIR/src
cp -R wdt folly /usr/bin /usr/lib $DIR/src
#cp -R wdt folly $DIR/src
# Removing symlinks which point to the same source tree
for link in `find -L $DIR/src -xtype l`
@@ -161,14 +162,10 @@ fi
echo "$WDTBIN -directory $DIR/src -destination $HOSTNAME |& tee $DIR/client.log"
time $WDTBIN -directory $DIR/src -destination $HOSTNAME |& tee $DIR/client.log

echo -n e | $NC localhost 22356

$WDTBIN -directory $DIR/dst_symlinks >> $DIR/server.log 2>&1 &
echo "$WDTBIN -follow_symlinks -directory $DIR/src -destination $HOSTNAME |& tee -a $DIR/client.log"
time $WDTBIN -follow_symlinks -directory $DIR/src -destination $HOSTNAME |& tee -a $DIR/client.log

echo -n e | $NC localhost 22356

# rsync test:
#time rsync --stats -v -W -r $DIR/src/ $DIR/dst/

2 changes: 1 addition & 1 deletion wdt_max_send_test.sh
@@ -73,7 +73,7 @@ do
done
echo "Done with staging src test files"

/usr/bin/time -f "$SERVER_PROFILE_FORMAT" $WDTCMD -directory $DIR/dst -skip_writes=$SKIP_WRITES > \
/usr/bin/time -f "$SERVER_PROFILE_FORMAT" $WDTCMD -directory $DIR/dst -run_as_daemon=true -skip_writes=$SKIP_WRITES > \
$DIR/server.log 2>&1 &

# wait for server to be up