Skip to content

Commit

Permalink
[feat](kerberos) Iceberg catalog supports Kerberos authentication
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs committed Nov 7, 2024
1 parent 67ee7f5 commit 54c9695
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.security.authentication;

import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;

/**
 * Runs arbitrary tasks inside an optional Hadoop (e.g. Kerberos) authentication
 * context. When no {@link HadoopAuthenticator} is configured, tasks are executed
 * directly on the calling thread with no extra ceremony.
 */
public class PreExecutionAuthenticator {

    private HadoopAuthenticator hadoopAuthenticator;

    /**
     * @param authenticator the Hadoop authenticator to wrap task execution with;
     *                      may be null, in which case tasks run unauthenticated
     */
    public PreExecutionAuthenticator(HadoopAuthenticator authenticator) {
        this.hadoopAuthenticator = authenticator;
    }

    /** Creates an instance with no authenticator; tasks run directly until one is set. */
    public PreExecutionAuthenticator() {
    }

    /**
     * Executes the given task, under the configured Hadoop identity when an
     * authenticator is present, otherwise directly.
     *
     * @param task the work to perform
     * @return the task's result
     * @throws Exception any exception thrown by the task or the authentication layer
     */
    public <T> T execute(Callable<T> task) throws Exception {
        if (hadoopAuthenticator == null) {
            return task.call();
        }
        PrivilegedExceptionAction<T> action = new CallableToPrivilegedExceptionActionAdapter<>(task);
        return hadoopAuthenticator.doAs(action);
    }

    public HadoopAuthenticator getHadoopAuthenticator() {
        return hadoopAuthenticator;
    }

    public void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) {
        this.hadoopAuthenticator = hadoopAuthenticator;
    }

    /**
     * Adapts a {@link Callable} to a {@link PrivilegedExceptionAction} so it can be
     * handed to {@code doAs}. Declared {@code static}: the adapter uses no state from
     * the enclosing instance, and a non-static inner class would otherwise pin a
     * hidden reference to it for the adapter's lifetime.
     */
    public static class CallableToPrivilegedExceptionActionAdapter<T> implements PrivilegedExceptionAction<T> {
        private final Callable<T> callable;

        public CallableToPrivilegedExceptionActionAdapter(Callable<T> callable) {
            this.callable = callable;
        }

        @Override
        public T run() throws Exception {
            return callable.call();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.datasource.iceberg;

import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
Expand All @@ -42,6 +43,8 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
protected String icebergCatalogType;
protected Catalog catalog;

protected PreExecutionAuthenticator preExecutionAuthenticator;

public IcebergExternalCatalog(long catalogId, String name, String comment) {
super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
}
Expand All @@ -51,6 +54,7 @@ public IcebergExternalCatalog(long catalogId, String name, String comment) {

@Override
protected void initLocalObjectsImpl() {
preExecutionAuthenticator = new PreExecutionAuthenticator();
initCatalog();
IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.datasource.iceberg;

import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;
Expand Down Expand Up @@ -46,6 +48,12 @@ protected void initCatalog() {
catalogProperties.put(CatalogProperties.URI, metastoreUris);
hiveCatalog.initialize(getName(), catalogProperties);
catalog = hiveCatalog;
if (preExecutionAuthenticator.getHadoopAuthenticator() == null) {
AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
preExecutionAuthenticator.setHadoopAuthenticator(authenticator);
}
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.datasource.DorisTypeVisitor;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
Expand All @@ -53,11 +54,14 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
protected Catalog catalog;
protected IcebergExternalCatalog dorisCatalog;
protected SupportsNamespaces nsCatalog;
private PreExecutionAuthenticator preExecutionAuthenticator;

/**
 * Binds this metadata-ops instance to its owning Doris catalog and the backing
 * Iceberg catalog, sharing the catalog's authenticator for privileged execution.
 */
public IcebergMetadataOps(IcebergExternalCatalog dorisCatalog, Catalog catalog) {
    this.dorisCatalog = dorisCatalog;
    this.catalog = catalog;
    this.nsCatalog = (SupportsNamespaces) catalog;
    this.preExecutionAuthenticator = dorisCatalog.preExecutionAuthenticator;
}

public Catalog getCatalog() {
Expand Down Expand Up @@ -95,6 +99,18 @@ public List<String> listTableNames(String dbName) {

@Override
public void createDb(CreateDbStmt stmt) throws DdlException {
    try {
        preExecutionAuthenticator.execute(() -> {
            performCreateDb(stmt);
            return null;
        });
    } catch (DdlException e) {
        // Preserve the specific error raised by performCreateDb instead of
        // burying it inside a second, generic DdlException wrapper.
        throw e;
    } catch (Exception e) {
        throw new DdlException("Failed to create database: " + stmt.getFullDbName(), e);
    }
}

private void performCreateDb(CreateDbStmt stmt) throws DdlException {
SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
String dbName = stmt.getFullDbName();
Map<String, String> properties = stmt.getProperties();
Expand All @@ -109,14 +125,25 @@ public void createDb(CreateDbStmt stmt) throws DdlException {
String icebergCatalogType = dorisCatalog.getIcebergCatalogType();
if (!properties.isEmpty() && !IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) {
throw new DdlException(
"Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType);
"Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType);
}
nsCatalog.createNamespace(Namespace.of(dbName), properties);
dorisCatalog.onRefreshCache(true);
}

@Override
public void dropDb(DropDbStmt stmt) throws DdlException {
    try {
        preExecutionAuthenticator.execute(() -> {
            preformDropDb(stmt);
            return null;
        });
    } catch (DdlException e) {
        // Keep the specific failure reported by preformDropDb rather than
        // double-wrapping it in a generic DdlException.
        throw e;
    } catch (Exception e) {
        throw new DdlException("Failed to drop database: " + stmt.getDbName(), e);
    }
}

private void preformDropDb(DropDbStmt stmt) throws DdlException {
String dbName = stmt.getDbName();
if (!databaseExist(dbName)) {
if (stmt.isSetIfExists()) {
Expand All @@ -133,6 +160,15 @@ public void dropDb(DropDbStmt stmt) throws DdlException {

@Override
public boolean createTable(CreateTableStmt stmt) throws UserException {
    try {
        // Propagate the boolean returned by performCreateTable. The previous
        // code discarded it and always returned false, losing whatever signal
        // the return value carries to callers.
        return preExecutionAuthenticator.execute(() -> performCreateTable(stmt));
    } catch (UserException e) {
        // Preserve the caller-visible exception type instead of wrapping it.
        throw e;
    } catch (Exception e) {
        throw new DdlException("Failed to create table: " + stmt.getTableName(), e);
    }
}

public boolean performCreateTable(CreateTableStmt stmt) throws UserException {
String dbName = stmt.getDbName();
ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
if (db == null) {
Expand Down Expand Up @@ -165,6 +201,17 @@ public boolean createTable(CreateTableStmt stmt) throws UserException {

@Override
public void dropTable(DropTableStmt stmt) throws DdlException {
    try {
        preExecutionAuthenticator.execute(() -> {
            performDropTable(stmt);
            return null;
        });
    } catch (DdlException e) {
        // Re-throw the precise error from performDropTable rather than
        // wrapping it in a second, less informative DdlException.
        throw e;
    } catch (Exception e) {
        throw new DdlException("Failed to drop table: " + stmt.getTableName(), e);
    }
}

private void performDropTable(DropTableStmt stmt) throws DdlException {
String dbName = stmt.getDbName();
ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
if (db == null) {
Expand All @@ -187,4 +234,8 @@ public void dropTable(DropTableStmt stmt) throws DdlException {
/** Always fails: truncating Iceberg tables is not supported by this catalog implementation. */
public void truncateTable(String dbName, String tblName, List<String> partitions) {
    throw new UnsupportedOperationException("Truncate Iceberg table is not supported.");
}

/** Returns the authenticator this instance uses to run metadata operations (shared with the owning catalog). */
public PreExecutionAuthenticator getPreExecutionAuthenticator() {
    return preExecutionAuthenticator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,23 @@ public void finishInsert(SimpleTableInfo tableInfo, Optional<InsertCommandContex
if (LOG.isDebugEnabled()) {
LOG.info("iceberg table {} insert table finished!", tableInfo);
}

//create and start the iceberg transaction
TUpdateMode updateMode = TUpdateMode.APPEND;
if (insertCtx.isPresent()) {
updateMode = ((BaseExternalTableInsertCommandContext) insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE
: TUpdateMode.APPEND;
try {
ops.getPreExecutionAuthenticator().execute(() -> {
//create and start the iceberg transaction
TUpdateMode updateMode = TUpdateMode.APPEND;
if (insertCtx.isPresent()) {
updateMode = ((BaseExternalTableInsertCommandContext) insertCtx.get()).isOverwrite()
? TUpdateMode.OVERWRITE
: TUpdateMode.APPEND;
}
updateManifestAfterInsert(updateMode);
return null;
});
} catch (Exception e) {
LOG.warn("Failed to finish insert for iceberg table {}.", tableInfo, e);
throw new RuntimeException(e);
}
updateManifestAfterInsert(updateMode);

}

private void updateManifestAfterInsert(TUpdateMode updateMode) {
Expand Down

0 comments on commit 54c9695

Please sign in to comment.