Skip to content
This repository has been archived by the owner on Oct 5, 2024. It is now read-only.

Commit

Permalink
Merge pull request #501 from hotosm/dev
Browse files Browse the repository at this point in the history
Handle multiple databases, improvements for data management
  • Loading branch information
emi420 authored May 23, 2024
2 parents ad61a24 + 4f7f9e6 commit b581efe
Show file tree
Hide file tree
Showing 14 changed files with 336 additions and 196 deletions.
4 changes: 0 additions & 4 deletions setup/db/indexes.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ CREATE UNIQUE INDEX nodes_id_idx ON public.nodes (osm_id DESC);
CREATE UNIQUE INDEX ways_poly_id_idx ON public.ways_poly (osm_id DESC);
CREATE UNIQUE INDEX ways_line_id_idx ON public.ways_line(osm_id DESC);
CREATE UNIQUE INDEX relations_id_idx ON public.relations(osm_id DESC);
CREATE INDEX way_refs_node_id_idx ON public.way_refs (node_id);
CREATE INDEX way_refs_way_id_idx ON public.way_refs (way_id);
CREATE INDEX rel_refs_rel_id_idx ON public.rel_refs (rel_id);
CREATE INDEX rel_refs_way_id_idx ON public.rel_refs (way_id);

CREATE INDEX nodes_version_idx ON public.nodes (version);
CREATE INDEX ways_poly_version_idx ON public.ways_poly (version);
Expand Down
15 changes: 0 additions & 15 deletions setup/db/underpass.sql
Original file line number Diff line number Diff line change
Expand Up @@ -109,25 +109,10 @@ CREATE TABLE IF NOT EXISTS public.relations (
ALTER TABLE ONLY public.relations
ADD CONSTRAINT relations_pkey PRIMARY KEY (osm_id);

CREATE TABLE IF NOT EXISTS public.way_refs (
way_id int8,
node_id int8
);

CREATE TABLE IF NOT EXISTS public.rel_refs (
rel_id int8,
way_id int8
);

CREATE UNIQUE INDEX nodes_id_idx ON public.nodes (osm_id DESC);
CREATE UNIQUE INDEX ways_poly_id_idx ON public.ways_poly (osm_id DESC);
CREATE UNIQUE INDEX ways_line_id_idx ON public.ways_line(osm_id DESC);
CREATE UNIQUE INDEX relations_id_idx ON public.relations(osm_id DESC);
CREATE INDEX way_refs_node_id_idx ON public.way_refs (node_id);
CREATE INDEX way_refs_way_id_idx ON public.way_refs (way_id);

CREATE INDEX rel_refs_rel_id_idx ON public.rel_refs (rel_id);
CREATE INDEX rel_refs_way_id_idx ON public.rel_refs (way_id);

CREATE INDEX nodes_version_idx ON public.nodes (version);
CREATE INDEX ways_poly_version_idx ON public.ways_poly (version);
Expand Down
55 changes: 37 additions & 18 deletions src/bootstrap/bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ BootstrapQueries
Bootstrap::allTasksQueries(std::shared_ptr<std::vector<BootstrapTask>> tasks) {
BootstrapQueries queries;
for (auto it = tasks->begin(); it != tasks->end(); ++it) {
queries.underpass += it->query ;
queries.osm += it->osmquery ;
for (auto itt = it->query.begin(); itt != it->query.end(); ++itt) {
queries.underpass.push_back(*itt);
}
for (auto itt = it->osmquery.begin(); itt != it->osmquery.end(); ++itt) {
queries.osm.push_back(*itt);
}
}
return queries;
}
Expand Down Expand Up @@ -154,8 +158,12 @@ Bootstrap::processWays() {

auto queries = allTasksQueries(tasks);

db->query(queries.underpass);
osmdb->query(queries.osm);
for (auto it = queries.underpass.begin(); it != queries.underpass.end(); ++it) {
db->query(*it);
}
for (auto it = queries.osm.begin(); it != queries.osm.end(); ++it) {
osmdb->query(*it);
}

lastid = ways->back().id;
for (auto it = tasks->begin(); it != tasks->end(); ++it) {
Expand Down Expand Up @@ -206,8 +214,12 @@ Bootstrap::processNodes() {
pool.join();

auto queries = allTasksQueries(tasks);
db->query(queries.underpass);
osmdb->query(queries.osm);
for (auto it = queries.underpass.begin(); it != queries.underpass.end(); ++it) {
db->query(*it);
}
for (auto it = queries.osm.begin(); it != queries.osm.end(); ++it) {
osmdb->query(*it);
}
lastid = nodes->back().id;
for (auto it = tasks->begin(); it != tasks->end(); ++it) {
count += it->processed;
Expand Down Expand Up @@ -256,9 +268,12 @@ Bootstrap::processRelations() {
pool.join();

auto queries = allTasksQueries(tasks);
db->query(queries.underpass);
osmdb->query(queries.osm);

for (auto it = queries.underpass.begin(); it != queries.underpass.end(); ++it) {
db->query(*it);
}
for (auto it = queries.osm.begin(); it != queries.osm.end(); ++it) {
osmdb->query(*it);
}
lastid = relations->back().id;
for (auto it = tasks->begin(); it != tasks->end(); ++it) {
count += it->processed;
Expand Down Expand Up @@ -292,16 +307,15 @@ Bootstrap::threadBootstrapWayTask(WayTask wayTask)
if (i < ways->size()) {
auto way = ways->at(i);
wayval->push_back(validator->checkWay(way, "building"));
// Fill the way_refs table
if (!norefs) {
for (auto ref = way.refs.begin(); ref != way.refs.end(); ++ref) {
task.osmquery += "INSERT INTO way_refs (way_id, node_id) VALUES (" + std::to_string(way.id) + "," + std::to_string(*ref) + "); ";
}
}
++processed;
}
}
queryvalidate->ways(wayval, task.query);

auto result = queryvalidate->ways(wayval);
for (auto it = result->begin(); it != result->end(); ++it) {
task.query.push_back(*it);
}

task.processed = processed;
const std::lock_guard<std::mutex> lock(tasks_change_mutex);
(*tasks)[taskIndex] = task;
Expand Down Expand Up @@ -337,7 +351,12 @@ Bootstrap::threadBootstrapNodeTask(NodeTask nodeTask)
++processed;
}
}
queryvalidate->nodes(nodeval, task.query);

auto result = queryvalidate->nodes(nodeval);
for (auto it = result->begin(); it != result->end(); ++it) {
task.query.push_back(*it);
}

task.processed = processed;
const std::lock_guard<std::mutex> lock(tasks_change_mutex);
(*tasks)[taskIndex] = task;
Expand Down Expand Up @@ -367,7 +386,7 @@ Bootstrap::threadBootstrapRelationTask(RelationTask relationTask)
// relationval->push_back(validator->checkRelation(way, "building"));
// Fill the rel_refs table
for (auto mit = relation.members.begin(); mit != relation.members.end(); ++mit) {
task.osmquery += "INSERT INTO rel_refs (rel_id, way_id) VALUES (" + std::to_string(relation.id) + "," + std::to_string(mit->ref) + "); ";
task.osmquery.push_back("INSERT INTO rel_refs (rel_id, way_id) VALUES (" + std::to_string(relation.id) + "," + std::to_string(mit->ref) + "); ");
}
++processed;
}
Expand Down
9 changes: 4 additions & 5 deletions src/bootstrap/bootstrap.hh
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,18 @@ namespace bootstrap {
/// \struct BootstrapTask
/// \brief Represents a bootstrap task
struct BootstrapTask {
std::string query = "";
std::string osmquery = "";
std::vector<std::string> query;
std::vector<std::string> osmquery;
int processed = 0;
};

/// \struct BootstrapQueries
/// \brief Represents a bootstrap queries list
struct BootstrapQueries {
std::string underpass = "";
std::string osm = "";
std::vector<std::string> underpass;
std::vector<std::string> osm;
};


struct WayTask {
int taskIndex;
std::shared_ptr<std::vector<BootstrapTask>> tasks;
Expand Down
11 changes: 9 additions & 2 deletions src/data/pq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,15 @@ Pq::query(const std::string &query)
{
std::scoped_lock write_lock{pqxx_mutex};
pqxx::work worker(*sdb);
auto result = worker.exec(query);
worker.commit();
pqxx::result result;
try {
result = worker.exec(query);
worker.commit();
} catch (std::exception &e) {
log_error("ERROR executing query %1%", e.what());
// Return an empty result so higher level code can handle the error
return result;
}
return result;
}

Expand Down
2 changes: 1 addition & 1 deletion src/osm/osmchange.cc
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ OsmChange::dump(void)
// rel->dump();
// }
// }
std::cerr << "Final timestamp: " << to_simple_string(final_entry) << std::endl;
// std::cerr << "Final timestamp: " << to_simple_string(final_entry) << std::endl;

}

Expand Down
Loading

0 comments on commit b581efe

Please sign in to comment.