diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java index f5817d897eb..c50207817a5 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java @@ -107,7 +107,7 @@ public Boolean testConnection(DataNodeRequest request) { } @Override - public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator) { + public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity oldEntity, String operator) { LOGGER.info("do nothing for the data node type ={}", request.getType()); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java index 58a12b73218..1974b544aaa 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java @@ -83,9 +83,9 @@ public interface DataNodeOperator { * Update related stream source. * * @param request data node request - * @param entity data node entity + * @param oldEntity old data node entity * @param operator operator */ - void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator); + void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity oldEntity, String operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java index 145435fef12..c950937dfe5 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.TenantUserTypeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper; @@ -184,7 +185,7 @@ public List list(DataNodePageRequest request, UserInfo opInfo) { @Transactional(rollbackFor = Throwable.class) public Boolean update(DataNodeRequest request, String operator) { LOGGER.info("begin to update data node by id: {}", request); - // check whether record existed + // check whether the record existed DataNodeEntity curEntity = dataNodeMapper.selectById(request.getId()); if (curEntity == null) { throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND, @@ -192,25 +193,18 @@ public Boolean update(DataNodeRequest request, String operator) { } userService.checkUser(curEntity.getInCharges(), operator, "Current user does not have permission to update data node info"); + // check whether modify unmodifiable parameters chkUnmodifiableParams(curEntity, request); - // Check whether the data node name exists with the same name and type - if (request.getName() != null) { - if (StringUtils.isBlank(request.getName())) { - throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, - "the name changed of data node is blank!"); - } - DataNodeEntity existEntity = - dataNodeMapper.selectByUniqueKey(request.getName(), request.getType()); - if (existEntity != null && !existEntity.getId().equals(request.getId())) { - throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE, - String.format("data node already exist for name=%s, type=%s, required id=%s, exist id=%s", - request.getName(), request.getType(), request.getId(), existEntity.getId())); - } - } + + // after the update operation, `curEntity` will be updated to the latest info by the MyBatis cache mechanism, + // so we need to get an `oldEntity` by copying `curEntity` before the update operation. + DataNodeEntity oldEntity = CommonBeanUtils.copyProperties(curEntity, DataNodeEntity::new); DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType()); dataNodeOperator.updateOpt(request, operator); - dataNodeOperator.updateRelatedStreamSource(request, curEntity, operator); + + // update the related stream sources if the request and old entity ha + dataNodeOperator.updateRelatedStreamSource(request, oldEntity, operator); LOGGER.info("success to update data node={}", request); return true; } @@ -218,34 +212,8 @@ public Boolean update(DataNodeRequest request, String operator) { @Override @Transactional(rollbackFor = Throwable.class) public Boolean update(DataNodeRequest request, UserInfo opInfo) { - // check the record existed - DataNodeEntity curEntity = dataNodeMapper.selectById(request.getId()); - if (curEntity == null) { - throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND, - String.format("data node record not found by id=%d", request.getId())); - } - userService.checkUser(curEntity.getInCharges(), opInfo.getName(), - "Current user does not have permission to update data node info"); - // check whether modify unmodifiable parameters - chkUnmodifiableParams(curEntity, request); - // Check whether the data node name exists with the same name and type - if (request.getName() != null) { - if (StringUtils.isBlank(request.getName())) { - throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, - "the name changed of data node is blank!"); - } - DataNodeEntity existEntity = - dataNodeMapper.selectByUniqueKey(request.getName(), request.getType()); - if (existEntity != null && !existEntity.getId().equals(request.getId())) { - throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE, - String.format("data node already exist for name=%s, type=%s, required id=%s, exist id=%s", - request.getName(), request.getType(), request.getId(), existEntity.getId())); - } - } - DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType()); - dataNodeOperator.updateOpt(request, opInfo.getName()); - dataNodeOperator.updateRelatedStreamSource(request, curEntity, opInfo.getName()); - return true; + String operator = opInfo.getName(); + return this.update(request, operator); } @Override diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java index 8336bf6ed39..f55699febca 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java @@ -110,13 +110,13 @@ public Boolean testConnection(DataNodeRequest request) { } @Override - public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator) { - MySQLDataNodeRequest mySQLDataNodeRequest = (MySQLDataNodeRequest) request; - MySQLDataNodeInfo mySQLDataNodeInfo = (MySQLDataNodeInfo) this.getFromEntity(entity); - boolean changed = !Objects.equals(mySQLDataNodeRequest.getUrl(), mySQLDataNodeInfo.getUrl()) - || !Objects.equals(mySQLDataNodeRequest.getBackupUrl(), mySQLDataNodeInfo.getBackupUrl()) - || !Objects.equals(mySQLDataNodeRequest.getUsername(), mySQLDataNodeInfo.getUsername()) - || !Objects.equals(mySQLDataNodeRequest.getToken(), mySQLDataNodeInfo.getToken()); + public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity oldEntity, String operator) { + MySQLDataNodeRequest nodeRequest = (MySQLDataNodeRequest) request; + MySQLDataNodeInfo nodeInfo = (MySQLDataNodeInfo) this.getFromEntity(oldEntity); + boolean changed = !Objects.equals(nodeRequest.getUrl(), nodeInfo.getUrl()) + || !Objects.equals(nodeRequest.getBackupUrl(), nodeInfo.getBackupUrl()) + || !Objects.equals(nodeRequest.getUsername(), nodeInfo.getUsername()) + || !Objects.equals(nodeRequest.getToken(), nodeInfo.getToken()); if (changed) { retryStreamSourceByDataNodeNameAndType(request.getName(), SourceType.MYSQL_SQL, operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserServiceImpl.java index 450699746f6..a3f29d44942 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserServiceImpl.java @@ -357,9 +357,8 @@ public void login(UserLoginRequest req) { public void checkUser(String inCharges, String user, String errMsg) { UserEntity userEntity = userMapper.selectByName(user); boolean isInCharge = Preconditions.inSeparatedString(user, inCharges, InlongConstants.COMMA); - Preconditions.expectTrue( - isInCharge || TenantUserTypeEnum.TENANT_ADMIN.getCode().equals(userEntity.getAccountType()), - errMsg); + Preconditions.expectTrue(isInCharge + || TenantUserTypeEnum.TENANT_ADMIN.getCode().equals(userEntity.getAccountType()), errMsg); } public void removeInChargeForGroup(String user, String operator) {