Skip to content
This repository has been archived by the owner on Oct 5, 2024. It is now read-only.

Commit

Permalink
Merge pull request #483 from hotosm/fix/replicator
Browse files Browse the repository at this point in the history
Fix/replicator
  • Loading branch information
emi420 authored Mar 11, 2024
2 parents 39bc906 + 4b06c4a commit a0c78da
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 57 deletions.
28 changes: 13 additions & 15 deletions src/replicator/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -675,18 +675,19 @@ RemoteURL::updatePath(int _major, int _minor, int _index)
}

void
RemoteURL::Increment(void)
RemoteURL::increment(void)
{

boost::format majorfmt("%03d");
boost::format minorfmt("%03d");
boost::format indexfmt("%03d");
std::string newpath;
if (minor == 999) {

if (minor == 999 && index == 999) {
major++;
minor = 0;
index = 0;
}
if (index == 999) {
} else if (index == 999) {
minor++;
index = 0;
} else {
Expand All @@ -701,20 +702,20 @@ RemoteURL::Increment(void)
}

void
RemoteURL::Decrement(void)
RemoteURL::decrement(void)
{
boost::format majorfmt("%03d");
boost::format minorfmt("%03d");
boost::format indexfmt("%03d");
std::string newpath;
if (minor == 0) {

if (minor == 000 && index == 000) {
major--;
minor = 0;
index = 0;
}
if (index == 0) {
minor = 999;
index = 999;
} else if (index == 000) {
minor--;
index = 0;
index = 999;
} else {
index--;
}
Expand All @@ -723,10 +724,7 @@ RemoteURL::Decrement(void)
minorfmt % (minor);
indexfmt % (index);

newpath = majorfmt.str() + "/" + minorfmt.str() + "/" + indexfmt.str();
boost::algorithm::replace_all(destdir, subpath, newpath);
boost::algorithm::replace_all(filespec, subpath, newpath);
subpath = newpath;
updatePath(major, minor, index);
}

RemoteURL &
Expand Down
4 changes: 2 additions & 2 deletions src/replicator/replication.hh
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ class RemoteURL {
/// Dump internal data for debugging
void dump(void);
/// Increment the numerical part of the path by one file
void Increment(void);
void increment(void);
/// Decrement the numerical part of the path by one file
void Decrement(void);
void decrement(void);
/// Copy one remote object to another
RemoteURL &operator=(const RemoteURL &inr);
long sequence() const; ///< The sequence number of this path
Expand Down
7 changes: 4 additions & 3 deletions src/replicator/threads.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ getClosest(std::shared_ptr<std::vector<ReplicationTask>> tasks, ptime now) {
void
startMonitorChangesets(std::shared_ptr<replication::RemoteURL> &remote,
const multipolygon_t &poly,
const UnderpassConfig &config)
const UnderpassConfig config)
{
#ifdef TIMING_DEBUG
boost::timer::auto_cpu_timer timer("startMonitorChangesets: took %w seconds\n");
Expand Down Expand Up @@ -172,6 +172,7 @@ startMonitorChangesets(std::shared_ptr<replication::RemoteURL> &remote,
auto last_task = std::make_shared<ReplicationTask>();
bool caughtUpWithNow = false;
bool monitoring = true;

while (monitoring) {
auto tasks = std::make_shared<std::vector<ReplicationTask>>();
i = cores*2;
Expand All @@ -180,7 +181,7 @@ startMonitorChangesets(std::shared_ptr<replication::RemoteURL> &remote,
std::this_thread::sleep_for(delay);
if (last_task->status == reqfile_t::success ||
(last_task->status == reqfile_t::remoteNotFound && !caughtUpWithNow)) {
remote->Increment();
remote->increment();
if (!config.silent) {
remote->dump();
}
Expand Down Expand Up @@ -314,7 +315,7 @@ startMonitorChanges(std::shared_ptr<replication::RemoteURL> &remote,
std::this_thread::sleep_for(delay);
if (last_task->status == reqfile_t::success ||
(last_task->status == reqfile_t::remoteNotFound && !caughtUpWithNow)) {
remote->Increment();
remote->increment();
if (!config.silent) {
remote->dump();
}
Expand Down
2 changes: 1 addition & 1 deletion src/replicator/threads.hh
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ struct ReplicationTask {
extern void
startMonitorChangesets(std::shared_ptr<replication::RemoteURL> &remote,
const multipolygon_t &poly,
const underpassconfig::UnderpassConfig &config
const underpassconfig::UnderpassConfig config
);

/// This updates several fields in the changesets table, which are part of
Expand Down
2 changes: 1 addition & 1 deletion src/testsuite/libunderpass.all/stats-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class TestStats {
auto stats = change.collectStats(boundary);
jsonstr += statsToJSON(stats, osmchange->filespec);

osmchange->Increment();
osmchange->increment();
}

jsonstr.erase(jsonstr.size() - 2);
Expand Down
68 changes: 33 additions & 35 deletions src/underpass.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ int
main(int argc, char *argv[])
{

// The changesets URL path (e.g. "/001/001/999")
std::string starting_url_path;

std::string datadir = "replication/";
std::string boundary = "/etc/underpass/priority.geojson";

Expand Down Expand Up @@ -153,19 +150,20 @@ main(int argc, char *argv[])
if (vm.count("debug")) {
dbglogfile.setVerbosity();
}
if (vm.count("silent")) {
config.silent = true;
}

// Database
if (vm.count("server")) {
config.underpass_db_url = vm["server"].as<std::string>();
}

// Local cache
if (vm.count("destdir_base")) {
config.destdir_base = vm["destdir_base"].as<std::string>();
}

if (vm.count("silent")) {
config.silent = true;
}

// Concurrency
if (vm.count("concurrency")) {
const auto concurrency = vm["concurrency"].as<std::string>();
Expand All @@ -184,6 +182,7 @@ main(int argc, char *argv[])
}

if (vm.count("timestamp") || vm.count("url")) {

// Planet server
if (vm.count("planet")) {
config.planet_server = vm["planet"].as<std::string>();
Expand All @@ -199,15 +198,19 @@ main(int argc, char *argv[])
config.planet_server = config.planet_servers[0].domain;
}

// Priority boundary
multipolygon_t poly;
if (vm.count("boundary")) {
boundary = vm["boundary"].as<std::string>();
}

// Boundary
geoutil::GeoUtil geou;
if (!geou.readFile(boundary)) {
log_debug("Could not find '%1%' area file!", boundary);
}
multipolygon_t * oscboundary = &poly;
if (!vm.count("oscnoboundary")) {
oscboundary = &geou.boundary;
}

// Features
if (vm.count("disable-validation")) {
Expand All @@ -222,26 +225,23 @@ main(int argc, char *argv[])

// Replication
planetreplicator::PlanetReplicator replicator;

std::shared_ptr<std::vector<unsigned char>> data;

if (!starting_url_path.empty() && vm.count("timestamp")) {
if (vm.count("url") && vm.count("timestamp")) {
log_debug("ERROR: 'url' takes precedence over 'timestamp' arguments are mutually exclusive!");
exit(-1);
}

// This is the default data directory on that server
// Default data directory on the server
if (vm.count("datadir")) {
datadir = vm["datadir"].as<std::string>();
}
const char *tmp = std::getenv("DATADIR");
if (tmp != 0) {
datadir = tmp;
}

// Add datadir to config
config.datadir = datadir;

// Frequency: minutely, hourly, daily
if (vm.count("frequency")) {
const auto strfreq = vm["frequency"].as<std::string>();
if (strfreq[0] == 'm') {
Expand Down Expand Up @@ -276,55 +276,52 @@ main(int argc, char *argv[])
}
} else if (vm.count("url")) {
replicator.connectServer("https://" + config.planet_server);
// This is the changesets path part (ex. 000/075/000), takes precedence over 'timestamp'
// option. This only applies to the osm change files, as it's timestamp is used to
// start the changesets.
std::string fullurl = "https://" + config.planet_server + "/replication/" + StateFile::freq_to_string(config.frequency);
std::vector<std::string> parts;
boost::split(parts, vm["url"].as<std::string>(), boost::is_any_of("/"));
// fullurl += "/" + vm["url"].as<std::string>() + "/" + parts[2] + ".state.txt";
fullurl += "/" + vm["url"].as<std::string>() + ".state.txt";

osmchange->parse(fullurl);
osmchange->destdir_base = config.destdir_base;
auto data = replicator.downloadFile(*osmchange).data;
StateFile start(osmchange->filespec, false);
//start.dump();
config.start_time = start.timestamp;
boost::algorithm::replace_all(osmchange->filespec, ".state.txt", ".osc.gz");
}

std::thread changesetThread;
// OsmChanges
std::thread osmChangeThread;

multipolygon_t poly;
if (!vm.count("changesets")) {
multipolygon_t * osmboundary = &poly;
if (!vm.count("osmnoboundary")) {
osmboundary = &geou.boundary;
}
osmchange->destdir_base = config.destdir_base;
osmchange->dump();
osmChangeThread = std::thread(replicatorthreads::startMonitorChanges, std::ref(osmchange),
std::ref(*osmboundary), std::ref(config));
std::ref(*osmboundary), config);
}
config.frequency = replication::changeset;
auto changeset = replicator.findRemotePath(config, config.start_time);
changeset->destdir_base = config.destdir_base;
if (vm.count("changeseturl")) {

// Changesets
std::thread changesetThread;
if (vm.count("changeseturl") || vm.count("timestamp")) {
config.frequency = replication::changeset;
auto changeset = replicator.findRemotePath(config, config.start_time);
changeset->destdir_base = config.destdir_base;
std::vector<std::string> parts;
boost::split(parts, vm["changeseturl"].as<std::string>(), boost::is_any_of("/"));
changeset->updatePath(stoi(parts[0]),stoi(parts[1]),stoi(parts[2]));
if (!config.silent) {
changeset->dump();
}
}
if (!vm.count("osmchanges")) {
multipolygon_t * oscboundary = &poly;
if (!vm.count("oscnoboundary")) {
oscboundary = &geou.boundary;
if (!vm.count("osmchanges")) {
changesetThread = std::thread(replicatorthreads::startMonitorChangesets, std::ref(changeset),
std::ref(*oscboundary), std::ref(config));
}
changesetThread = std::thread(replicatorthreads::startMonitorChangesets, std::ref(changeset),
std::ref(*oscboundary), std::ref(config));
}

// Start processing

if (changesetThread.joinable()) {
changesetThread.join();
}
Expand All @@ -336,6 +333,7 @@ main(int argc, char *argv[])

}

// Bootstrapping
if (vm.count("bootstrap")){
std::thread bootstrapThread;
std::cout << "Starting bootstrapping process ..." << std::endl;
Expand Down

0 comments on commit a0c78da

Please sign in to comment.