Skip to content

Commit

Permalink
[Enhancement] (nereids)implement CreateRepositoryCommand in nereids
Browse files Browse the repository at this point in the history
  • Loading branch information
rijeshkp committed Jan 11, 2025
1 parent 9fc6b8d commit d8a317e
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ supportedCreateStatement
| CREATE SQL_BLOCK_RULE (IF NOT EXISTS)?
name=identifier properties=propertyClause? #createSqlBlockRule
| CREATE ENCRYPTKEY (IF NOT EXISTS)? multipartIdentifier AS STRING_LITERAL #createEncryptkey
| CREATE (READ ONLY)? REPOSITORY name=identifier WITH storageBackend #createRepository
;

supportedAlterStatement
Expand Down Expand Up @@ -765,7 +766,6 @@ unsupportedCreateStatement
| CREATE USER (IF NOT EXISTS)? grantUserIdentify
(SUPERUSER | DEFAULT ROLE role=STRING_LITERAL)?
passwordOption (COMMENT STRING_LITERAL)? #createUser
| CREATE (READ ONLY)? REPOSITORY name=identifier WITH storageBackend #createRepository
| CREATE INDEX (IF NOT EXISTS)? name=identifier
ON tableName=multipartIdentifier identifierList
(USING (BITMAP | NGRAM_BF | INVERTED))?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.doris.fs.remote.AzureFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.doris.nereids.DorisParser.StorageBackendContext;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
Expand Down Expand Up @@ -228,6 +229,32 @@ public void createRepository(CreateRepositoryStmt stmt) throws DdlException {
}
}

// handle create repository in nereids stmt
public void createRepositoryInNereids(boolean isReadOnly, String name, StorageBackendContext storage,
StorageBackend.StorageType type, Map<String, String> properties)
throws DdlException {
if (!env.getBrokerMgr().containsBroker(storage.brokerName.getText())
&& type == StorageBackend.StorageType.BROKER) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"broker does not exist: " + storage.brokerName.getText());
}

RemoteFileSystem fileSystem = FileSystemFactory.get(storage.brokerName.getText(), type,
properties);
long repoId = env.getNextId();
Repository repo = new Repository(repoId, name, isReadOnly, storage.LOCATION().getText(), fileSystem);

Status st = repoMgr.addAndInitRepoIfNotExist(repo, false);
if (!st.ok()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Failed to create repository: " + st.getErrMsg());
}
if (!repo.ping()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Failed to create repository: failed to connect to the repo");
}
}

public void alterRepository(AlterRepositoryStmt stmt) throws DdlException {
tryLock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import org.apache.doris.nereids.DorisParser.CreateFileContext;
import org.apache.doris.nereids.DorisParser.CreateMTMVContext;
import org.apache.doris.nereids.DorisParser.CreateProcedureContext;
import org.apache.doris.nereids.DorisParser.CreateRepositoryContext;
import org.apache.doris.nereids.DorisParser.CreateRoleContext;
import org.apache.doris.nereids.DorisParser.CreateRoutineLoadContext;
import org.apache.doris.nereids.DorisParser.CreateRowPolicyContext;
Expand Down Expand Up @@ -515,6 +516,7 @@
import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateProcedureCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateRepositoryCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateRoleCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateSqlBlockRuleCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
Expand Down Expand Up @@ -1728,7 +1730,7 @@ public LogicalPlan visitCreateRoutineLoad(CreateRoutineLoadContext ctx) {
}
String comment = visitCommentSpec(ctx.commentSpec());
Map<String, LoadProperty> loadPropertyMap = new HashMap<>();
for (DorisParser.LoadPropertyContext oneLoadPropertyCOntext : ctx.loadProperty()) {
for (LoadPropertyContext oneLoadPropertyCOntext : ctx.loadProperty()) {
LoadProperty loadProperty = visitLoadProperty(oneLoadPropertyCOntext);
if (loadProperty == null) {
throw new AnalysisException("invalid clause of routine load");
Expand Down Expand Up @@ -5453,5 +5455,29 @@ public LogicalPlan visitShowConvertLsc(ShowConvertLscContext ctx) {
}
return new ShowConvertLSCCommand(databaseName);
}

@Override
public LogicalPlan visitCreateRepository(CreateRepositoryContext ctx) {

boolean isReadOnly;

isReadOnly = (ctx.READ() != null && ctx.ONLY() != null);

Map<String, String> properties = ctx.storageBackend().propertyClause() != null
? Maps.newHashMap(visitPropertyClause(ctx.storageBackend().propertyClause())) : Maps.newHashMap();

StorageBackend.StorageType type = null;

if (ctx.storageBackend().S3() != null) {
type = StorageBackend.StorageType.S3;
} else if (ctx.storageBackend().HDFS() != null) {
type = StorageBackend.StorageType.HDFS;
} else if (ctx.storageBackend().LOCAL() != null) {
type = StorageBackend.StorageType.LOCAL;
} else if (ctx.storageBackend().BROKER() != null) {
type = StorageBackend.StorageType.BROKER;
}
return new CreateRepositoryCommand(isReadOnly, ctx.name.getText(), ctx.storageBackend(), type, properties);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -279,5 +279,6 @@ public enum PlanType {
SHOW_QUERY_PROFILE_COMMAND,
SWITCH_COMMAND,
HELP_COMMAND,
USE_COMMAND
USE_COMMAND,
CREATE_REPOSITORY_COMMAND
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.StmtType;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.DorisParser.StorageBackendContext;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;

import java.util.Map;

/** CreateViewCommand */
public class CreateRepositoryCommand extends Command implements ForwardWithSync {
private final boolean isReadOnly;
private final String name;
private final StorageBackendContext storage;
private final StorageBackend.StorageType type;
private final Map<String, String> properties;

/** CreateRepositoryCommand */
public CreateRepositoryCommand(boolean isReadOnly, String name, StorageBackendContext storage,
StorageBackend.StorageType type, Map<String, String> properties) {
super(PlanType.CREATE_REPOSITORY_COMMAND);
this.isReadOnly = isReadOnly;
this.name = name;
this.storage = storage;
this.type = type;
this.properties = properties;
}

@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
// check auth
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
FeNameFormat.checkCommonName("repository", name);

Env.getCurrentEnv().getBackupHandler().createRepositoryInNereids(isReadOnly, name, storage, type, properties);
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitCreateRepositoryCommand(this, context);
}

@Override
public StmtType stmtType() {
return StmtType.CREATE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateProcedureCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateRepositoryCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateRoleCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateSqlBlockRuleCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
Expand Down Expand Up @@ -757,4 +758,9 @@ default R visitUseCommand(UseCommand useCommand, C context) {
default R visitAlterDatabaseRenameCommand(AlterDatabaseRenameCommand alterDatabaseRenameCommand, C context) {
return visitCommand(alterDatabaseRenameCommand, context);
}

default R visitCreateRepositoryCommand(CreateRepositoryCommand createRepositoryCommand,
C context) {
return visitCommand(createRepositoryCommand, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package ddl.repository
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.junit.Assert;

suite("create_repository_command") {
String ak = getS3AK()
String sk = getS3SK()
String endpoint = getS3Endpoint()
String region = getS3Region()
String bucket = context.config.otherConfigs.get("s3BucketName");
String repoName = "test_create_repository"
String readOnlyRepoName = "test_create_read_only_repository"

//cloud-mode
if (isCloudMode()) {
return
}

try_sql "DROP REPOSITORY `${repoName}`"
try_sql "DROP REPOSITORY `${readOnlyRepoName}`"

def create_read_only_repo = checkNereidsExecute("""CREATE READ ONLY REPOSITORY `${readOnlyRepoName}`
WITH S3
ON LOCATION "s3://${bucket}/${readOnlyRepoName}"
PROPERTIES
(
"s3.endpoint" = "http://${endpoint}",
"s3.region" = "${region}",
"s3.access_key" = "${ak}",
"s3.secret_key" = "${sk}"
)""");

def create_repo = checkNereidsExecute("""CREATE REPOSITORY `${repoName}`
WITH S3
ON LOCATION "s3://${bucket}/${repoName}"
PROPERTIES
(
"s3.endpoint" = "http://${endpoint}",
"s3.region" = "${region}",
"s3.access_key" = "${ak}",
"s3.secret_key" = "${sk}"
)""");

def test_create_repo = checkNereidsExecuteWithResult("""SHOW CREATE REPOSITORY for `${repoName}`;""").toString();
assertTrue(test_create_repo.contains("${repoName}"))
assertTrue(test_create_repo.contains("s3://${bucket}/${repoName}"))

assertTrue(test_create_repo.contains("${repoName}"))
assertTrue(test_create_repo.contains("s3://${bucket}/${repoName}"))

def test_read_only_repo = checkNereidsExecuteWithResult("""SHOW CREATE REPOSITORY for `${readOnlyRepoName}`;""").toString();
assertTrue(test_read_only_repo.contains("${readOnlyRepoName}"))
assertTrue(test_read_only_repo.contains("READ ONLY"))

sql """DROP REPOSITORY `${repoName}`;"""
sql """DROP REPOSITORY `${readOnlyRepoName}`;"""

try {
sql """SHOW CREATE REPOSITORY for `${repoName}`;"""
} catch (Exception e) {
assertTrue(e.getMessage().contains("repository not exist."))
}
}

0 comments on commit d8a317e

Please sign in to comment.