Skip to content

Commit

Permalink
Merge branch 'tickets/DM-29447'
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Oct 24, 2023
2 parents e4c4292 + e1291fb commit 434e406
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 92 deletions.
4 changes: 2 additions & 2 deletions src/replica/ConfigAppBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ void ConfigAppBase::dumpDatabasesAsTable(string const& indent, string const& cap
publishTime.push_back(to_string(database.publishTime));
tableName.push_back(table.name);
isPartitioned.push_back(table.isPartitioned ? "yes" : "no");
isDirector.push_back(table.isDirector ? "yes" : "no");
isRefMatch.push_back(table.isRefMatch ? "yes" : "no");
isDirector.push_back(table.isDirector() ? "yes" : "no");
isRefMatch.push_back(table.isRefMatch() ? "yes" : "no");
directorTable.push_back(table.directorTable.databaseTableName());
directorKey.push_back(table.directorTable.primaryKeyColumn());
directorTable2.push_back(table.directorTable2.databaseTableName());
Expand Down
38 changes: 14 additions & 24 deletions src/replica/ConfigDatabase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ DatabaseInfo DatabaseInfo::parse(json const& obj, map<string, DatabaseFamilyInfo
table.uniquePrimaryKey = tableJson.at("unique_primary_key").get<int>() != 0;
table.latitudeColName = tableJson.at("latitude_key").get<string>();
table.longitudeColName = tableJson.at("longitude_key").get<string>();
table.isDirector = table.isPartitioned && table.directorTable.tableName().empty();
table.isRefMatch = table.isPartitioned && !table.directorTable2.tableName().empty();
}
if (tableJson.count("columns") != 0) {
json const& columns = tableJson.at("columns");
Expand Down Expand Up @@ -156,16 +154,16 @@ vector<string> DatabaseInfo::partitionedTables() const {

// Return the names of the tables of this database that are classified as
// the "director" tables. The method walks over all entries of the '_tables'
// collection (table name -> table descriptor) and keeps the names of those
// entries whose descriptor passes the director test.
vector<string> DatabaseInfo::directorTables() const {
vector<string> result;
for (auto&& itr : _tables) {
if (itr.second.isDirector) result.push_back(itr.first);
for (auto&& [name, table] : _tables) {
if (table.isDirector()) result.push_back(name);
}
return result;
}

// Return the names of the tables of this database that are classified as
// the "RefMatch" tables. The method walks over all entries of the '_tables'
// collection (table name -> table descriptor) and keeps the names of those
// entries whose descriptor passes the RefMatch test.
vector<string> DatabaseInfo::refMatchTables() const {
vector<string> result;
for (auto&& itr : _tables) {
if (itr.second.isRefMatch) result.push_back(itr.first);
for (auto&& [name, table] : _tables) {
if (table.isRefMatch()) result.push_back(name);
}
return result;
}
Expand Down Expand Up @@ -210,16 +208,16 @@ TableInfo DatabaseInfo::validate(map<std::string, DatabaseInfo> const& databases
throwIf(!table.isPublished && (table.publishTime != 0),
"the publish timestamp of the non-published table is not 0");

bool const isRegularType = !table.isPartitioned && !(table.isDirector || table.isRefMatch);
bool const isPartitionedType = table.isPartitioned && !(table.isDirector && table.isRefMatch);
bool const isRegularType = !table.isPartitioned && !(table.isDirector() || table.isRefMatch());
bool const isPartitionedType = table.isPartitioned && !(table.isDirector() && table.isRefMatch());
throwIfNot(isRegularType || isPartitionedType, "ambiguous table type definition");

if (table.isPartitioned) {
// This collection will get populated with special columns required for
// the table, depending on its declared type.
map<string, string> colDefs;

if (table.isDirector) {
if (table.isDirector()) {
throwIfNot(table.directorTable.tableName().empty() && table.directorTable2.empty(),
"the director table can't be the dependant of other director(s)");

Expand All @@ -241,7 +239,7 @@ TableInfo DatabaseInfo::validate(map<std::string, DatabaseInfo> const& databases
// This column is required for the director tables to allow Qserv to materialize
// sub-chunks in the near-neighbour queries.
colDefs.insert({"subChunkIdColName", lsst::qserv::SUB_CHUNK_COLUMN});
} else if (table.isRefMatch) {
} else if (table.isRefMatch()) {
throwIf(table.directorTable.empty() || table.directorTable2.empty(),
"incomplete definition of the directors for the RefMatch table");
throwIf(table.directorTable == table.directorTable2,
Expand All @@ -262,7 +260,7 @@ TableInfo DatabaseInfo::validate(map<std::string, DatabaseInfo> const& databases
throwIfNot(database->tableExists(tableRef.tableName()),
"non-existing director '" + tableRef.tableName() +
"' referenced in the RefMatch definition");
throwIfNot(database->findTable(tableRef.tableName()).isDirector,
throwIfNot(database->findTable(tableRef.tableName()).isDirector(),
"table '" + tableRef.tableName() +
"' referenced in the RefMatch definition isn't the director");
}
Expand Down Expand Up @@ -294,7 +292,7 @@ TableInfo DatabaseInfo::validate(map<std::string, DatabaseInfo> const& databases
" table spec of the dependent tables");
throwIfNot(tableExists(table.directorTable.tableName()),
"non-existing director table referenced in the dependent table definition");
throwIfNot(findTable(table.directorTable.tableName()).isDirector,
throwIfNot(findTable(table.directorTable.tableName()).isDirector(),
"a table referenced in the dependent table definition isn't the director table");
throwIfNot(table.directorTable2.empty(), "the dependent table can't have the second director");

Expand Down Expand Up @@ -350,11 +348,11 @@ TableInfo DatabaseInfo::sanitize(TableInfo const& table_) const {
table.publishTime = 0;
}
if (table.isPartitioned) {
if (table.isDirector != table.isRefMatch) {
if (table.isDirector() != table.isRefMatch()) {
// For the known specialization of the partitioned table type sanitize
// other attributes depending on the type. Note that such explicit
// specialization always takes precedence.
if (table.isDirector) {
if (table.isDirector()) {
table.directorTable = DirectorTableRef("", table.directorTable.primaryKeyColumn());
table.directorTable2 = DirectorTableRef();
table.flagColName = string();
Expand All @@ -363,38 +361,30 @@ TableInfo DatabaseInfo::sanitize(TableInfo const& table_) const {
table.latitudeColName = string();
table.longitudeColName = string();
}
} else if (table.isDirector && table.isRefMatch) {
} else if (table.isDirector() && table.isRefMatch()) {
// It's impossible to do anything here due to the explicitly
// made table type ambiguity.
;
} else {
// If neither type flags were set then try deducing the table type based
// on the presence of the director table columns.
if (table.directorTable.tableName().empty()) {
table.isDirector = true;
table.isRefMatch = false;
table.directorTable = DirectorTableRef("", table.directorTable.primaryKeyColumn());
table.directorTable2 = DirectorTableRef();
table.flagColName = string();
table.angSep = 0;
} else {
if (table.directorTable2.tableName().empty()) {
table.isDirector = false;
table.isRefMatch = false;
table.directorTable2 = DirectorTableRef();
table.flagColName = string();
table.angSep = 0;
} else {
table.isDirector = false;
table.isRefMatch = true;
table.latitudeColName = string();
table.longitudeColName = string();
}
}
}
} else {
table.isDirector = false;
table.isRefMatch = false;
table.directorTable = DirectorTableRef();
table.directorTable2 = DirectorTableRef();
table.latitudeColName = string();
Expand All @@ -414,7 +404,7 @@ void DatabaseInfo::removeTable(string const& tableName) {
"'.");
}
TableInfo& thisTableInfo = thisTableItr->second;
if (thisTableInfo.isDirector) {
if (thisTableInfo.isDirector()) {
// Make sure no dependent tables exist for this director
// among other partitioned tables.
for (auto&& itr : _tables) {
Expand Down
8 changes: 2 additions & 6 deletions src/replica/ConfigParserMySQL.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ void ConfigParserMySQL::_parseDatabases() {
table.latitudeColName = _parseParam<string>("latitude_key");
table.longitudeColName = _parseParam<string>("longitude_key");
table.isPartitioned = _parseParam<int>("is_partitioned") != 0;
table.isDirector = table.isPartitioned && table.directorTable.tableName().empty() &&
!table.directorTable.primaryKeyColumn().empty() && table.directorTable2.empty();
table.isRefMatch =
table.isPartitioned && !table.directorTable.empty() && !table.directorTable2.empty();
tables.emplace_back(table);
}

Expand All @@ -163,10 +159,10 @@ void ConfigParserMySQL::_parseDatabases() {
// This algorithm will enforce the referential integrity between the partitioned tables.
// Pushing partitioned tables in the wrong order will fail the registration.
for (auto&& table : tables) {
if (table.isDirector) _databases[table.database].addTable(_databases, table);
if (table.isDirector()) _databases[table.database].addTable(_databases, table);
}
for (auto&& table : tables) {
if (!table.isDirector) _databases[table.database].addTable(_databases, table);
if (!table.isDirector()) _databases[table.database].addTable(_databases, table);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/replica/ConfigTable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ json TableInfo::toJson() const {
result["create_time"] = createTime;
result["publish_time"] = publishTime;
result["is_partitioned"] = isPartitioned ? 1 : 0;
result["is_director"] = isDirector ? 1 : 0;
result["is_ref_match"] = isRefMatch ? 1 : 0;
result["is_director"] = isDirector() ? 1 : 0;
result["is_ref_match"] = isRefMatch() ? 1 : 0;
result["director_table"] = directorTable.databaseTableName();
result["director_database_name"] = directorTable.databaseName();
result["director_table_name"] = directorTable.tableName();
Expand Down
8 changes: 5 additions & 3 deletions src/replica/ConfigTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,13 @@ class TableInfo {
uint64_t createTime = 0;
uint64_t publishTime = 0;

// The type of the table is determined by these flags.
// The type of the table is determined by these attributes.

bool isPartitioned = false;
bool isDirector = false;
bool isRefMatch = false;
/// @return 'true' if the table is partitioned and the table name of its
///   'directorTable' reference is empty (the table doesn't depend on any director).
bool isDirector() const { return isPartitioned && directorTable.tableName().empty(); }
/// @return 'true' if the table is partitioned, its 'directorTable' reference
///   is not empty, and the table name of its 'directorTable2' reference is not empty.
bool isRefMatch() const {
return isPartitioned && !directorTable.empty() && !directorTable2.tableName().empty();
}

/**
* @brief The "director" table (if any).
Expand Down
2 changes: 0 additions & 2 deletions src/replica/ConfigTestApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,6 @@ bool ConfigTestApp::_testTables() {
table1.name = "director-1";
table1.database = database;
table1.isPartitioned = true;
table1.isDirector = true;
table1.directorTable = DirectorTableRef("", "objectId");
table1.uniquePrimaryKey = false;
table1.latitudeColName = "decl";
Expand All @@ -905,7 +904,6 @@ bool ConfigTestApp::_testTables() {
table2.name = "director-2";
table2.database = database;
table2.isPartitioned = true;
table2.isDirector = true;
table2.directorTable = DirectorTableRef("", "id");
table2.latitudeColName = "coord_decl";
table2.longitudeColName = "coord_ra";
Expand Down
2 changes: 1 addition & 1 deletion src/replica/DirectorIndexJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ DirectorIndexJob::DirectorIndexJob(string const& databaseName, string const& dir
_onFinish(onFinish) {
try {
_database = controller->serviceProvider()->config()->databaseInfo(databaseName);
if (!_database.findTable(directorTableName).isDirector) {
if (!_database.findTable(directorTableName).isDirector()) {
throw runtime_error(context() + "::" + string(__func__) + " no such director table '" +
directorTableName + "' in the database: '" + _database.name + "'.");
}
Expand Down
2 changes: 1 addition & 1 deletion src/replica/HttpDirectorIndexModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ json HttpDirectorIndexModule::_buildDirectorIndex() {

auto const database = config->databaseInfo(databaseName);
auto const table = database.findTable(directorTableName);
if (!table.isDirector) {
if (!table.isDirector()) {
string const msg = "table '" + table.name + "' is not configured as a director table in database '" +
database.name + "'";
throw HttpError(__func__, msg);
Expand Down
40 changes: 32 additions & 8 deletions src/replica/HttpIngestModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

// Third party headers
#include "boost/filesystem.hpp"
#include "boost/algorithm/string.hpp"

// Qserv headers
#include "css/CssAccess.h"
Expand Down Expand Up @@ -97,6 +98,23 @@ string jobCompletionErrorIfAny(SqlJob::Ptr const& job, string const& prefix) {
return error;
}

/**
 * Reject column types that are prohibited in schemas of the 'director' tables.
 *
 * The type definition is screened case-insensitively: it's converted to the upper
 * case and then searched for the substrings "BLOB" and "TEXT". Finding either of
 * them anywhere in the definition is reported as an error.
 *
 * @param func A context for error reporting (forwarded to the exception).
 * @param colName The name of a column (used for error reporting only).
 * @param colType The column type definition to be evaluated.
 * @throw lsst::qserv::replica::HttpError if the type validation failed.
 */
void validateColumnType(string const& func, string const& colName, string const& colType) {
    string const normalizedType = boost::algorithm::to_upper_copy(colType);
    auto const mentions = [&normalizedType](char const* typeName) {
        return normalizedType.find(typeName) != string::npos;
    };
    if (mentions("BLOB") || mentions("TEXT")) {
        // The message quotes the type exactly as it was provided by a caller,
        // not the normalized (upper case) version of it.
        throw HttpError(func, "the prohibited type '" + colType +
                                      "' detected in a definition of the column '" + colName +
                                      "' of the director table");
    }
}

} // namespace

namespace lsst::qserv::replica {
Expand Down Expand Up @@ -565,7 +583,13 @@ json HttpIngestModule::_addTable() {
throw HttpError(__func__, msg);
}
string colType = column["type"];

if (table.isDirector()) {
// Schemas of the director tables require reinforced screening of the column
// types to prevent large variable size types from being used in column
// definitions. Such types will prevent Qserv from materializing sub-chunks
// as the MEMORY tables.
::validateColumnType(__func__, colName, colType);
}
if (_partitionByColumn == colName) {
string const msg = "reserved column '" + _partitionByColumn + "' is not allowed";
throw HttpError(__func__, msg);
Expand Down Expand Up @@ -621,7 +645,7 @@ json HttpIngestModule::_addTable() {
// This operation can be vetoed by a catalog ingest workflow at the database
// registration time.
if (autoBuildDirectorIndex(database.name)) {
if (table.isDirector) {
if (table.isDirector()) {
_createDirectorIndex(config->databaseInfo(database.name), table.name);
}
}
Expand Down Expand Up @@ -678,7 +702,7 @@ json HttpIngestModule::_deleteTable() {
bool const ifExists = true;
conn->execute(g.dropTable(g.id(database.name, table.name), ifExists));
// Remove the director index (if any)
if (table.isDirector) {
if (table.isDirector()) {
string const query = g.dropTable(
g.id("qservMeta", directorIndexTableName(database.name, table.name)), ifExists);
conn->execute(query);
Expand Down Expand Up @@ -1180,7 +1204,7 @@ void HttpIngestModule::_publishDatabaseInMaster(DatabaseInfo const& database) co
// Skip tables that have been published.
if (table.isPublished) continue;
if (!cssAccess->containsTable(database.name, table.name)) {
if (table.isRefMatch) {
if (table.isRefMatch()) {
css::MatchTableParams const matchParams(
table.directorTable.databaseTableName(), table.directorTable.primaryKeyColumn(),
table.directorTable2.databaseTableName(), table.directorTable2.primaryKeyColumn(),
Expand All @@ -1190,9 +1214,9 @@ void HttpIngestModule::_publishDatabaseInMaster(DatabaseInfo const& database) co
// These parameters need to be set correctly for the 'director' and dependent
// tables to avoid confusing Qserv query analyzer. Also note, that the 'overlap'
// is set to be the same for all 'director' tables of the database family.
double const overlap = table.isDirector ? databaseFamilyInfo.overlap : 0;
double const overlap = table.isDirector() ? databaseFamilyInfo.overlap : 0;
bool const isPartitioned = true;
bool const hasSubChunks = table.isDirector;
bool const hasSubChunks = table.isDirector();
css::PartTableParams const partParams(database.name, table.directorTable.tableName(),
table.directorTable.primaryKeyColumn(),
table.latitudeColName, table.longitudeColName, overlap,
Expand Down Expand Up @@ -1297,7 +1321,7 @@ json HttpIngestModule::_buildEmptyChunksListImpl(string const& databaseName, boo
void HttpIngestModule::_createDirectorIndex(DatabaseInfo const& database,
string const& directorTableName) const {
auto const& table = database.findTable(directorTableName);
if (!table.isDirector) {
if (!table.isDirector()) {
throw logic_error("table '" + table.name + "' is not configured in database '" + database.name +
"' as the director table");
}
Expand Down Expand Up @@ -1355,7 +1379,7 @@ void HttpIngestModule::_createDirectorIndex(DatabaseInfo const& database,
void HttpIngestModule::_consolidateDirectorIndex(DatabaseInfo const& database,
string const& directorTableName) const {
auto const table = database.findTable(directorTableName);
if (!table.isDirector) {
if (!table.isDirector()) {
throw logic_error("table '" + table.name + "' is not configured in database '" + database.name +
"' as the director table");
}
Expand Down
4 changes: 2 additions & 2 deletions src/replica/HttpIngestTransModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ void HttpIngestTransModule::_addPartitionToDirectorIndex(DatabaseInfo const& dat
TransactionId transactionId,
string const& directorTableName) const {
auto const table = database.findTable(directorTableName);
if (!table.isDirector) {
if (!table.isDirector()) {
throw logic_error("table '" + table.name + "' is not configured in database '" + database.name +
"' as the director table");
}
Expand All @@ -505,7 +505,7 @@ void HttpIngestTransModule::_removePartitionFromDirectorIndex(DatabaseInfo const
TransactionId transactionId,
string const& directorTableName) const {
auto const table = database.findTable(directorTableName);
if (!table.isDirector) {
if (!table.isDirector()) {
throw logic_error("table '" + table.name + "' is not configured in database '" + database.name +
"' as the director table");
}
Expand Down
2 changes: 1 addition & 1 deletion src/replica/HttpQservMonitorModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ json HttpQservMonitorModule::_css() {
vector<string> sharedScanTables;
for (string const& tableName : database.tables()) {
auto const table = database.findTable(tableName);
if (table.isPartitioned && !table.isRefMatch) {
if (table.isPartitioned && !table.isRefMatch()) {
sharedScanTables.emplace_back(table.name);
// Set the empty object as the default result for each table.
resultSharedScan[familyName][database.name][table.name] = json::object();
Expand Down
Loading

0 comments on commit 434e406

Please sign in to comment.