Skip to content

Commit

Permalink
HiveCatalog supports client pool
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 25, 2024
1 parent d6d2929 commit 3a05ea3
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
Expand Down Expand Up @@ -114,7 +113,7 @@ public class HiveCatalog extends AbstractCatalog {

private final HiveConf hiveConf;
private final String clientClassName;
private final IMetaStoreClient client;
private final HiveClientPool clients;
private final String warehouse;

private final LocationHelper locationHelper;
Expand Down Expand Up @@ -145,22 +144,22 @@ public HiveCatalog(
locationHelper = new StorageLocationHelper();
}

this.client = createClient(hiveConf, clientClassName);
int clientPoolSize = options.get(CatalogOptions.CLIENT_POOL_SIZE);
this.clients = new HiveClientPool(clientPoolSize, hiveConf, clientClassName);
}

@Override
public Optional<CatalogLock.LockContext> lockContext() {
return Optional.of(
new HiveCatalogLock.HiveLockContext(
new SerializableHiveConf(hiveConf), clientClassName));
new HiveCatalogLock.HiveLockContext(clients, new SerializableHiveConf(hiveConf)));
}

@Override
public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
try {
return Optional.of(
new HiveMetastoreClient.Factory(
identifier, getDataTableSchema(identifier), hiveConf, clientClassName));
identifier, getDataTableSchema(identifier), clients));
} catch (TableNotExistException e) {
throw new RuntimeException(
"Table " + identifier + " does not exist. This is unexpected.", e);
Expand All @@ -172,17 +171,19 @@ public Path getDataTableLocation(Identifier identifier) {
try {
String databaseName = identifier.getDatabaseName();
String tableName = identifier.getObjectName();
if (client.tableExists(databaseName, tableName)) {
if (clients.run(client -> client.tableExists(databaseName, tableName))) {
String location =
locationHelper.getTableLocation(client.getTable(databaseName, tableName));
locationHelper.getTableLocation(
clients.run(client -> client.getTable(databaseName, tableName)));
if (location != null) {
return new Path(location);
}
} else {
// If the table does not exist,
// we should use the database path to generate the table path.
String dbLocation =
locationHelper.getDatabaseLocation(client.getDatabase(databaseName));
locationHelper.getDatabaseLocation(
clients.run(client -> client.getDatabase(databaseName)));
if (dbLocation != null) {
return new Path(dbLocation, tableName);
}
Expand All @@ -191,28 +192,35 @@ public Path getDataTableLocation(Identifier identifier) {
return super.getDataTableLocation(identifier);
} catch (TException e) {
throw new RuntimeException("Can not get table " + identifier + " from metastore.", e);
} catch (InterruptedException e) {
throw convertedInterruptedException(
"Interrupted in call to get data table location", e);
}
}

/**
 * Lists the names of all databases in the Hive metastore.
 *
 * @return all database names known to the metastore
 * @throws RuntimeException if the metastore call fails or the thread is interrupted
 */
@Override
public List<String> listDatabases() {
    try {
        // Borrow a pooled metastore client only for the duration of this call.
        return clients.run(client -> client.getAllDatabases());
    } catch (TException e) {
        throw new RuntimeException("Failed to list all databases", e);
    } catch (InterruptedException e) {
        // Helper re-asserts the interrupt flag before wrapping.
        throw convertedInterruptedException("Interrupted in call to list all database", e);
    }
}

/**
 * Checks whether the given database exists in the Hive metastore.
 *
 * @param databaseName name of the database to probe
 * @return true if the database exists; false if the metastore reports no such object
 * @throws RuntimeException on any other metastore failure or on interruption
 */
@Override
protected boolean databaseExistsImpl(String databaseName) {
    try {
        clients.run(client -> client.getDatabase(databaseName));
        return true;
    } catch (NoSuchObjectException e) {
        // A missing database is an expected outcome here, not an error.
        return false;
    } catch (TException e) {
        throw new RuntimeException(
                "Failed to determine if database " + databaseName + " exists", e);
    } catch (InterruptedException e) {
        throw convertedInterruptedException("Interrupted in call to get database", e);
    }
}

Expand All @@ -226,9 +234,15 @@ protected void createDatabaseImpl(String name, Map<String, String> properties) {
: new Path(database.getLocationUri());
locationHelper.createPathIfRequired(databasePath, fileIO);
locationHelper.specifyDatabaseLocation(databasePath, database);
client.createDatabase(database);
clients.run(
client -> {
client.createDatabase(database);
return null;
});
} catch (TException | IOException e) {
throw new RuntimeException("Failed to create database " + name, e);
} catch (InterruptedException e) {
throw convertedInterruptedException("Interrupted in call to create database", e);
}
}

Expand All @@ -253,10 +267,13 @@ private Database convertToHiveDatabase(String name, Map<String, String> properti
/**
 * Loads the properties of the given database from the Hive metastore.
 *
 * @param name database name
 * @return the database's properties converted to a string map
 * @throws RuntimeException if the metastore lookup fails or the thread is interrupted
 */
@Override
public Map<String, String> loadDatabasePropertiesImpl(String name) {
    try {
        return convertToProperties(clients.run(client -> client.getDatabase(name)));
    } catch (TException e) {
        throw new RuntimeException(
                String.format("Failed to get database %s properties", name), e);
    } catch (InterruptedException e) {
        throw convertedInterruptedException(
                "Interrupted in call to load database properties", e);
    }
}

Expand All @@ -274,19 +291,25 @@ private Map<String, String> convertToProperties(Database database) {
/**
 * Drops the given database from the Hive metastore and removes its storage path.
 *
 * <p>The filesystem location is deleted first (when the location helper requires
 * it), then the metastore entry is dropped.
 *
 * @param name database to drop
 * @throws RuntimeException if the metastore or filesystem operation fails, or the
 *     thread is interrupted
 */
@Override
protected void dropDatabaseImpl(String name) {
    try {
        Database database = clients.run(client -> client.getDatabase(name));
        String location = locationHelper.getDatabaseLocation(database);
        locationHelper.dropPathIfRequired(new Path(location), fileIO);
        clients.run(
                client -> {
                    // Flags mirror the original call: (deleteData=true,
                    // ignoreUnknownDb=false, cascade=true) — TODO confirm against
                    // the IMetaStoreClient#dropDatabase signature.
                    client.dropDatabase(name, true, false, true);
                    return null;
                });
    } catch (TException | IOException e) {
        throw new RuntimeException("Failed to drop database " + name, e);
    } catch (InterruptedException e) {
        throw convertedInterruptedException("Interrupted in call to drop database", e);
    }
}

@Override
protected List<String> listTablesImpl(String databaseName) {
try {
return client.getAllTables(databaseName).stream()
return clients.run(client -> client.getAllTables(databaseName)).stream()
.filter(
tableName -> {
Identifier identifier = new Identifier(databaseName, tableName);
Expand All @@ -295,6 +318,8 @@ protected List<String> listTablesImpl(String databaseName) {
.collect(Collectors.toList());
} catch (TException e) {
throw new RuntimeException("Failed to list all tables in database " + databaseName, e);
} catch (InterruptedException e) {
throw convertedInterruptedException("Interrupted in call to list tables", e);
}
}

Expand All @@ -306,13 +331,20 @@ public boolean tableExists(Identifier identifier) {

Table table;
try {
table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
table =
clients.run(
client ->
client.getTable(
identifier.getDatabaseName(),
identifier.getObjectName()));
} catch (NoSuchObjectException e) {
return false;
} catch (TException e) {
throw new RuntimeException(
"Cannot determine if table " + identifier.getFullName() + " is a paimon table.",
e);
} catch (InterruptedException e) {
throw convertedInterruptedException("Interrupted in call to get table", e);
}

return isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table);
Expand All @@ -338,8 +370,16 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis
@Override
protected void dropTableImpl(Identifier identifier) {
try {
client.dropTable(
identifier.getDatabaseName(), identifier.getObjectName(), true, false, true);
clients.run(
client -> {
client.dropTable(
identifier.getDatabaseName(),
identifier.getObjectName(),
true,
false,
true);
return null;
});

// When drop a Hive external table, only the hive metadata is deleted and the data files
// are not deleted.
Expand All @@ -364,6 +404,8 @@ protected void dropTableImpl(Identifier identifier) {
}
} catch (TException e) {
throw new RuntimeException("Failed to drop table " + identifier.getFullName(), e);
} catch (InterruptedException e) {
throw convertedInterruptedException("Interrupted in call to drop table", e);
}
}

Expand All @@ -388,7 +430,11 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX));
try {
updateHmsTable(table, identifier, tableSchema);
client.createTable(table);
clients.run(
client -> {
client.createTable(table);
return null;
});
} catch (Exception e) {
Path path = getDataTableLocation(identifier);
try {
Expand All @@ -405,10 +451,14 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
try {
String fromDB = fromTable.getDatabaseName();
String fromTableName = fromTable.getObjectName();
Table table = client.getTable(fromDB, fromTableName);
Table table = clients.run(client -> client.getTable(fromDB, fromTableName));
table.setDbName(toTable.getDatabaseName());
table.setTableName(toTable.getObjectName());
client.alter_table(fromDB, fromTableName, table);
clients.run(
client -> {
client.alter_table(fromDB, fromTableName, table);
return null;
});

Path fromPath = getDataTableLocation(fromTable);
if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) {
Expand All @@ -427,10 +477,17 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {

// update location
locationHelper.specifyTableLocation(table, toPath.toString());
client.alter_table(toTable.getDatabaseName(), toTable.getObjectName(), table);
clients.run(
client -> {
client.alter_table(
toTable.getDatabaseName(), toTable.getObjectName(), table);
return null;
});
}
} catch (TException e) {
throw new RuntimeException("Failed to rename table " + fromTable.getFullName(), e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

Expand All @@ -444,11 +501,23 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)

try {
// sync to hive hms
Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
Table table =
clients.run(
client ->
client.getTable(
identifier.getDatabaseName(),
identifier.getObjectName()));
updateHmsTablePars(table, schema);
updateHmsTable(table, identifier, schema);
client.alter_table(
identifier.getDatabaseName(), identifier.getObjectName(), table, true);
clients.run(
client -> {
client.alter_table(
identifier.getDatabaseName(),
identifier.getObjectName(),
table,
true);
return null;
});
} catch (Exception te) {
schemaManager.deleteSchema(schema.id());
throw new RuntimeException(te);
Expand All @@ -462,7 +531,9 @@ public boolean caseSensitive() {

/**
 * Releases the pooled Hive metastore clients held by this catalog.
 *
 * @throws Exception if closing the client pool fails
 */
@Override
public void close() throws Exception {
    // Guard against double-close; the pool may already have been shut down.
    if (!clients.isClosed()) {
        clients.close();
    }
}

@Override
Expand Down Expand Up @@ -565,8 +636,8 @@ private void updateHmsTablePars(Table table, TableSchema schema) {
}

@VisibleForTesting
public IMetaStoreClient getHmsClient() {
return client;
public HiveClientPool getHmsClient() {
return clients;
}

private FieldSchema convertToFieldSchema(DataField dataField) {
Expand All @@ -587,14 +658,10 @@ private Lock lock(Identifier identifier) {
}

HiveCatalogLock lock =
new HiveCatalogLock(client, checkMaxSleep(hiveConf), acquireTimeout(hiveConf));
new HiveCatalogLock(clients, checkMaxSleep(hiveConf), acquireTimeout(hiveConf));
return Lock.fromCatalog(lock, identifier);
}

static IMetaStoreClient createClient(HiveConf hiveConf, String clientClassName) {
return new RetryingMetaStoreClientFactory().createClient(hiveConf, clientClassName);
}

public static HiveConf createHiveConf(
@Nullable String hiveConfDir,
@Nullable String hadoopConfDir,
Expand Down Expand Up @@ -766,4 +833,9 @@ public static Configuration getHadoopConfiguration(String hadoopConfDir) {
/**
 * Returns the Hive configuration directory taken from the {@code HIVE_CONF_DIR}
 * environment variable, or {@code null} when the variable is not set.
 */
public static String possibleHiveConfPath() {
    String confDir = System.getenv("HIVE_CONF_DIR");
    return confDir;
}

/**
 * Wraps an {@link InterruptedException} thrown by a pooled metastore call into an
 * unchecked exception while preserving the thread's interrupt status.
 *
 * @param message context describing which metastore call was interrupted
 * @param e the interruption that aborted the call; kept as the cause
 * @return a RuntimeException for the caller to rethrow
 */
private RuntimeException convertedInterruptedException(String message, InterruptedException e) {
    // Re-assert the interrupt flag so code higher up the stack can still observe it.
    Thread.currentThread().interrupt();
    return new RuntimeException(message, e);
}
}
Loading

0 comments on commit 3a05ea3

Please sign in to comment.