diff --git a/python/cloudtik/core/_private/providers.py b/python/cloudtik/core/_private/providers.py
index a7a81899c..3dd007d8c 100644
--- a/python/cloudtik/core/_private/providers.py
+++ b/python/cloudtik/core/_private/providers.py
@@ -51,6 +51,13 @@ def _import_kubernetes(provider_config):
     return KubernetesNodeProvider
 
 
+# TODO(ChenRui): implement NodeProvider
+def _import_huaweicloud(provider_config):
+    from cloudtik.providers._private.huaweicloud.node_provider import \
+        HUAWEICLOUDNodeProvider
+    return HUAWEICLOUDNodeProvider
+
+
 def _load_local_provider_home():
     import cloudtik.providers.local as local_provider
     return os.path.dirname(local_provider.__file__)
@@ -76,6 +83,11 @@ def _load_azure_provider_home():
     return os.path.dirname(azure_provider.__file__)
 
 
+def _load_huaweicloud_provider_home():
+    import cloudtik.providers.huaweicloud as huaweicloud_provider
+    return os.path.dirname(huaweicloud_provider.__file__)
+
+
 def _load_local_defaults_config():
     return os.path.join(_load_local_provider_home(), "defaults.yaml")
@@ -96,6 +108,10 @@ def _load_azure_defaults_config():
     return os.path.join(_load_azure_provider_home(), "defaults.yaml")
 
 
+def _load_huaweicloud_defaults_config():
+    return os.path.join(_load_huaweicloud_provider_home(), "defaults.yaml")
+
+
 def _import_external(provider_config):
     provider_cls = _load_class(path=provider_config["provider_class"])
     return provider_cls
@@ -107,6 +123,7 @@ def _import_external(provider_config):
     "gcp": _import_gcp,
     "azure": _import_azure,
     "kubernetes": _import_kubernetes,
+    "huaweicloud": _import_huaweicloud,
     "external": _import_external  # Import an external module
 }
 
@@ -116,6 +133,7 @@ def _import_external(provider_config):
     "gcp": "GCP",
     "azure": "Azure",
     "kubernetes": "Kubernetes",
+    "huaweicloud": "HUAWEICLOUD",
     "external": "External"
 }
 
@@ -125,6 +143,7 @@ def _import_external(provider_config):
     "gcp": _load_gcp_provider_home,
     "azure": _load_azure_provider_home,
     "kubernetes": _load_kubernetes_provider_home,
+    "huaweicloud": _load_huaweicloud_provider_home,
 }
 
 _DEFAULT_CONFIGS = {
@@ -133,6 +152,7 @@ def _import_external(provider_config):
     "gcp": _load_gcp_defaults_config,
     "azure": _load_azure_defaults_config,
     "kubernetes": _load_kubernetes_defaults_config,
+    "huaweicloud": _load_huaweicloud_defaults_config,
 }
 
 # For caching workspace provider instantiations across API calls of one python session
@@ -166,12 +186,19 @@ def _import_kubernetes_workspace(provider_config):
     return KubernetesWorkspaceProvider
 
 
+def _import_huaweicloud_workspace(provider_config):
+    from cloudtik.providers._private.huaweicloud.workspace_provider import \
+        HUAWEICLOUDWorkspaceProvider
+    return HUAWEICLOUDWorkspaceProvider
+
+
 _WORKSPACE_PROVIDERS = {
     "local": _import_local_workspace,
     "aws": _import_aws_workspace,
     "gcp": _import_gcp_workspace,
     "azure": _import_azure_workspace,
     "kubernetes": _import_kubernetes_workspace,
+    "huaweicloud": _import_huaweicloud_workspace,
     "external": _import_external  # Import an external module
 }
 
diff --git a/python/cloudtik/core/workspace-schema.json b/python/cloudtik/core/workspace-schema.json
index c19e36d87..d3842077b 100644
--- a/python/cloudtik/core/workspace-schema.json
+++ b/python/cloudtik/core/workspace-schema.json
@@ -26,7 +26,7 @@
     "properties": {
         "type": {
             "type": "string",
-            "description": "e.g. aws, azure, gcp,..."
+            "description": "e.g. aws, azure, gcp, huaweicloud,..."
}, "region": { "type": "string", @@ -159,6 +159,22 @@ } } }, + "huaweicloud_credentials": { + "type": "object", + "description": "Credentials for authenticating with HUAWEI CLOUD. If not specified, will use environment default.", + "required": [ "huaweicloud_access_key", "huaweicloud_secret_key" ], + "additionalProperties": false, + "properties": { + "huaweicloud_access_key": { + "type": "string", + "description": "The access key to use when creating the client." + }, + "huaweicloud_secret_key": { + "type": "string", + "description": "The secret key to use when creating the client." + } + } + }, "managed_cloud_storage": { "type": "boolean", "description": "Whether to create managed cloud storage of workspace.", diff --git a/python/cloudtik/providers/_private/huaweicloud/__init__.py b/python/cloudtik/providers/_private/huaweicloud/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/cloudtik/providers/_private/huaweicloud/config.py b/python/cloudtik/providers/_private/huaweicloud/config.py new file mode 100644 index 000000000..6afdefc1c --- /dev/null +++ b/python/cloudtik/providers/_private/huaweicloud/config.py @@ -0,0 +1,811 @@ +import copy +import logging +import time + +import requests +from huaweicloudsdkecs.v2 import ListServersDetailsRequest +from huaweicloudsdkeip.v2 import CreatePublicipBandwidthOption, \ + CreatePublicipOption, CreatePublicipRequest, \ + CreatePublicipRequestBody, DeletePublicipRequest, ListPublicipsRequest +from huaweicloudsdknat.v2 import CreateNatGatewayOption, \ + CreateNatGatewayRequest, \ + CreateNatGatewayRequestBody, CreateNatGatewaySnatRuleOption, \ + CreateNatGatewaySnatRuleRequest, \ + CreateNatGatewaySnatRuleRequestOption, DeleteNatGatewayRequest, \ + DeleteNatGatewaySnatRuleRequest, \ + ListNatGatewaySnatRulesRequest, \ + ListNatGatewaysRequest +from huaweicloudsdkvpc.v2 import AcceptVpcPeeringRequest, \ + CreateSecurityGroupOption, \ + CreateSecurityGroupRequest, \ + CreateSecurityGroupRequestBody, CreateSecurityGroupRuleOption, \ + CreateSecurityGroupRuleRequest, \ + CreateSecurityGroupRuleRequestBody, CreateSubnetOption, \ + CreateSubnetRequest, \ + CreateSubnetRequestBody, \ + CreateVpcOption, \ + CreateVpcPeeringOption, CreateVpcPeeringRequest, \ + CreateVpcPeeringRequestBody, CreateVpcRequest, \ + CreateVpcRequestBody, DeleteSecurityGroupRequest, \ + DeleteSecurityGroupRuleRequest, DeleteSubnetRequest, \ + DeleteVpcPeeringRequest, \ + DeleteVpcRequest, ListRouteTablesRequest, \ + ListSecurityGroupRulesRequest, ListSecurityGroupsRequest, \ + ListSubnetsRequest, \ + ListVpcPeeringsRequest, ListVpcsRequest, RouteTableRoute, ShowVpcRequest, \ + UpdateRouteTableReq, UpdateRoutetableReqBody, UpdateRouteTableRequest, \ + VpcInfo + +from cloudtik.core._private.cli_logger import cf, cli_logger +from cloudtik.core._private.utils import check_cidr_conflict, \ + is_managed_cloud_storage, \ + is_peering_firewall_allow_ssh_only, \ + is_peering_firewall_allow_working_subnet, is_use_peering_vpc, \ + is_use_working_vpc +from cloudtik.core.workspace_provider import Existence +from cloudtik.providers._private.huaweicloud.utils import make_ecs_client, \ + make_eip_client, make_nat_client, make_vpc_client + +logger = logging.getLogger(__name__) + +HWC_WORKSPACE_NUM_CREATION_STEPS = 6 +HWC_WORKSPACE_NUM_DELETION_STEPS = 6 +HWC_WORKSPACE_TARGET_RESOURCES = 5 +HWC_RESOURCE_NAME_PREFIX = 'cloudtik-{}' +HWC_WORKSPACE_VPC_NAME = HWC_RESOURCE_NAME_PREFIX + '-vpc' +HWC_WORKSPACE_DEFAULT_CIDR_PREFIX = '192.168.' 
+HWC_WORKSPACE_VPC_DEFAULT_CIDR = HWC_WORKSPACE_DEFAULT_CIDR_PREFIX + '0.0/16'
+HWC_WORKSPACE_SUBNET_NAME = HWC_RESOURCE_NAME_PREFIX + '-{}-subnet'
+HWC_WORKSPACE_NAT_NAME = HWC_RESOURCE_NAME_PREFIX + '-nat'
+HWC_WORKSPACE_SG_NAME = HWC_RESOURCE_NAME_PREFIX + '-sg'
+HWC_WORKSPACE_VPC_PEERING_NAME = HWC_RESOURCE_NAME_PREFIX + '-peering'
+HWC_WORKSPACE_EIP_BANDWIDTH_NAME = HWC_RESOURCE_NAME_PREFIX + '-bandwidth'
+HWC_WORKSPACE_VPC_SUBNETS_COUNT = 2
+HWC_VM_METADATA_URL = 'http://169.254.169.254/latest/metadata/'
+
+
+def create_huaweicloud_workspace(config):
+    workspace_name = config['workspace_name']
+    # TODO(ChenRui): implement managed cloud storage.
+    managed_cloud_storage = is_managed_cloud_storage(config)
+    use_peering_vpc = is_use_peering_vpc(config)
+    use_working_vpc = is_use_working_vpc(config)
+
+    current_step = 1
+    total_steps = HWC_WORKSPACE_NUM_CREATION_STEPS
+
+    try:
+        with cli_logger.group("Creating workspace: {}",
+                              cf.bold(workspace_name)):
+            # Step1: create vpc
+            with cli_logger.group("Creating VPC",
+                                  _numbered=("[]", current_step, total_steps)):
+                current_step += 1
+                vpc_client = make_vpc_client(config)
+                if use_working_vpc:
+                    vpc = _get_current_vpc(config, vpc_client)
+                    cli_logger.print("Using working VPC: {}...",
+                                     vpc.name)
+                else:
+                    vpc = _check_and_create_vpc(vpc_client, workspace_name)
+                    cli_logger.print("Successfully created workspace VPC: {}.",
+                                     vpc.name)
+            # Step2: create subnets
+            with cli_logger.group("Creating subnets",
+                                  _numbered=("[]", current_step, total_steps)):
+                current_step += 1
+                subnets = _check_and_create_subnets(vpc, vpc_client,
+                                                    workspace_name)
+            # Step3: create NAT and SNAT rules
+            with cli_logger.group("Creating NAT Gateway for subnets",
+                                  _numbered=("[]", current_step, total_steps)):
+                current_step += 1
+                _check_and_create_nat(config, subnets, vpc, workspace_name)
+            # Step4: create security group
+            with cli_logger.group("Creating security group",
+                                  _numbered=("[]", current_step, total_steps)):
+                current_step += 1
+                _check_and_create_security_group(config, vpc, vpc_client,
+                                                 workspace_name)
+            # Step5: create peering VPC
+            with cli_logger.group("Creating VPC peering connection",
+                                  _numbered=("[]", current_step, total_steps)):
+                current_step += 1
+                if use_peering_vpc:
+                    _create_and_accept_vpc_peering(config, vpc, vpc_client,
+                                                   workspace_name)
+            # TODO(ChenRui): implement managed cloud storage
+            # Step6: create OBS(Object Storage Service) bucket
+            pass
+    except Exception as e:
+        cli_logger.error("Failed to create workspace with the name {} at step "
+                         "{}. \n{}", workspace_name, current_step, str(e))
+        raise e
+
+    cli_logger.print("Successfully created workspace: {}.",
+                     cf.bold(workspace_name))
+
+
+def _create_and_accept_vpc_peering(config, _workspace_vpc, vpc_client,
+                                   workspace_name):
+    current_step = 1
+    total_steps = 3
+    with cli_logger.group("Creating VPC peering connection",
+                          _numbered=("()", current_step, total_steps)):
+        current_step += 1
+        _current_vpc = _get_current_vpc(config, vpc_client)
+        _vpc_peering_name = HWC_WORKSPACE_VPC_PEERING_NAME.format(
+            workspace_name)
+        vpc_peering = vpc_client.create_vpc_peering(
+            CreateVpcPeeringRequest(
+                CreateVpcPeeringRequestBody(
+                    CreateVpcPeeringOption(name=_vpc_peering_name,
+                                           request_vpc_info=VpcInfo(
+                                               _current_vpc.id),
+                                           accept_vpc_info=VpcInfo(
+                                               _workspace_vpc.id))))
+        ).peering
+
+    with cli_logger.group("Accepting VPC peering connection",
+                          _numbered=("()", current_step, total_steps)):
+        current_step += 1
+        # If the VPC peering is created between different tenants, it must
+        # be accepted explicitly.
+        if _current_vpc.tenant_id != _workspace_vpc.tenant_id:
+            vpc_peering_status = vpc_client.accept_vpc_peering(
+                AcceptVpcPeeringRequest(vpc_peering.id)).status
+            cli_logger.print(
+                "VPC peering {} status is {}".format(vpc_peering.id,
+                                                     vpc_peering_status))
+
+    with cli_logger.group("Updating route table for peering connection",
+                          _numbered=("()", current_step, total_steps)):
+        current_step += 1
+        _current_vpc_rts = vpc_client.list_route_tables(
+            ListRouteTablesRequest(vpc_id=_current_vpc.id)
+        ).routetables
+        _workspace_vpc_rts = vpc_client.list_route_tables(
+            ListRouteTablesRequest(vpc_id=_workspace_vpc.id)
+        ).routetables
+
+        for _current_vpc_rt in _current_vpc_rts:
+            vpc_client.update_route_table(
+                UpdateRouteTableRequest(
+                    routetable_id=_current_vpc_rt.id,
+                    body=UpdateRoutetableReqBody(
+                        UpdateRouteTableReq(routes={
+                            'add': [RouteTableRoute(
+                                type='peering',
+                                destination=_workspace_vpc.cidr,
+                                nexthop=vpc_peering.id)]}
+                        )
+                    )
+                )
+            )
+            cli_logger.print("Successfully added a route to current VPC "
+                             "route table {} for the workspace VPC CIDR "
+                             "block.".format(_current_vpc_rt.id))
+
+        for _workspace_vpc_rt in _workspace_vpc_rts:
+            vpc_client.update_route_table(
+                UpdateRouteTableRequest(
+                    routetable_id=_workspace_vpc_rt.id,
+                    body=UpdateRoutetableReqBody(
+                        UpdateRouteTableReq(routes={
+                            'add': [RouteTableRoute(
+                                type='peering',
+                                destination=_current_vpc.cidr,
+                                nexthop=vpc_peering.id)]}
+                        )
+                    )
+                )
+            )
+            cli_logger.print("Successfully added a route to workspace VPC "
+                             "route table {} for the current VPC CIDR "
+                             "block.".format(_workspace_vpc_rt.id))
+
+
+def _check_and_create_security_group(config, vpc, vpc_client, workspace_name):
+    # Create the security group
+    sg_name = HWC_WORKSPACE_SG_NAME.format(workspace_name)
+    sg = vpc_client.create_security_group(
+        CreateSecurityGroupRequest(
+            CreateSecurityGroupRequestBody(
+                CreateSecurityGroupOption(name=sg_name)))
+    ).security_group
+
+    # Create the security group rules from config
+    _update_security_group_rules(config, sg, vpc, vpc_client)
+
+
+def _update_security_group_rules(config, sg, vpc, vpc_client):
+    # Clean old rules if they exist
+    _clean_security_group_rules(sg, vpc_client)
+    # Add new rules
+    extended_rules = config["provider"].get("security_group", {}) \
+        .get("IpPermissions", [])
+    for _ext_rule in extended_rules:
+        vpc_client.create_security_group_rule(
+            CreateSecurityGroupRuleRequest(
+                CreateSecurityGroupRuleRequestBody(
+                    CreateSecurityGroupRuleOption(sg.id,
+                                                  direction='ingress',
+                                                  remote_ip_prefix=_ext_rule)))
+        )
+    # Create SSH rule
+    vpc_client.create_security_group_rule(
+        CreateSecurityGroupRuleRequest(
+            CreateSecurityGroupRuleRequestBody(
+                CreateSecurityGroupRuleOption(sg.id,
+                                              direction='ingress',
+                                              port_range_min=22,
+                                              port_range_max=22,
+                                              protocol='tcp',
+                                              remote_ip_prefix=vpc.cidr)))
+    )
+    # Create peering VPC rule
+    if is_use_peering_vpc(config) and \
+            is_peering_firewall_allow_working_subnet(config):
+        allow_ssh_only = is_peering_firewall_allow_ssh_only(config)
+        _current_vpc = _get_current_vpc(config, vpc_client)
+        _vpc_cidr = _current_vpc.cidr
+        _port_min = 22 if allow_ssh_only else 1
+        _port_max = 22 if allow_ssh_only else 65535
+        vpc_client.create_security_group_rule(
+            CreateSecurityGroupRuleRequest(
+                CreateSecurityGroupRuleRequestBody(
+                    CreateSecurityGroupRuleOption(sg.id,
+                                                  direction='ingress',
+                                                  port_range_min=_port_min,
+                                                  port_range_max=_port_max,
+                                                  protocol='tcp',
+                                                  remote_ip_prefix=_vpc_cidr)))
+        )
+
+
+def _clean_security_group_rules(sg, vpc_client):
+    rule_list = vpc_client.list_security_group_rules(
+        ListSecurityGroupRulesRequest(security_group_id=sg.id)
+    ).security_group_rules
+    for _rule in rule_list:
+        vpc_client.delete_security_group_rule(
+            DeleteSecurityGroupRuleRequest(_rule.id))
+
+
+def _check_and_create_nat(config, subnets, vpc, workspace_name):
+    current_step = 1
+    total_steps = 3
+    with cli_logger.group("Creating NAT Gateway",
+                          _numbered=("()", current_step, total_steps)):
+        current_step += 1
+        nat_client = make_nat_client(config)
+        nat_name = HWC_WORKSPACE_NAT_NAME.format(workspace_name)
+        pub_net = subnets[0].id
+        nat_gateway = nat_client.create_nat_gateway(
+            CreateNatGatewayRequest(
+                CreateNatGatewayRequestBody(
+                    CreateNatGatewayOption(name=nat_name,
+                                           router_id=vpc.id,
+                                           internal_network_id=pub_net,
+                                           spec='1')))
+        ).nat_gateway
+    with cli_logger.group("Creating NAT EIP",
+                          _numbered=("()", current_step, total_steps)):
+        current_step += 1
+        # Create EIP
+        _bw_name = HWC_WORKSPACE_EIP_BANDWIDTH_NAME.format(workspace_name)
+        eip_client = make_eip_client(config)
+        eip = eip_client.create_publicip(
+            CreatePublicipRequest(
+                CreatePublicipRequestBody(
+                    # Dedicated bandwidth of 5 Mbit/s
+                    bandwidth=CreatePublicipBandwidthOption(name=_bw_name,
+                                                            share_type='PER',
+                                                            size=5),
+                    publicip=CreatePublicipOption(type='5_bgp')))
+        ).publicip
+        cli_logger.print("Successfully created workspace EIP: {}.",
+                         eip.public_ip_address)
+    with cli_logger.group("Creating SNAT Rules",
+                          _numbered=("()", current_step, total_steps)):
+        current_step += 1
+        # Create SNAT rules for the public and private subnets
+        for _subnet in subnets:
+            nat_client.create_nat_gateway_snat_rule(
+                CreateNatGatewaySnatRuleRequest(
+                    CreateNatGatewaySnatRuleRequestOption(
+                        CreateNatGatewaySnatRuleOption(
+                            nat_gateway_id=nat_gateway.id,
+                            network_id=_subnet.id,
+                            floating_ip_id=eip.id)))
+            )
+
+    return nat_gateway
+
+
+def _get_available_subnet_cidr(vpc, vpc_client, workspace_name):
+    cidr_list = []
+    current_vpc_subnets = vpc_client.list_subnets(
+        ListSubnetsRequest(vpc_id=vpc.id)).subnets
+    current_subnet_cidr = [_subnet.cidr for _subnet in
+                           current_vpc_subnets]
+    vpc_cidr_block = vpc.cidr
+    ip_range = vpc_cidr_block.split('/')[0].split('.')
+    for i in range(0, 256):
+        tmp_cidr_block = '{}.{}.{}.0/24'.format(ip_range[0],
+                                                ip_range[1],
+                                                i)
+        if check_cidr_conflict(tmp_cidr_block,
+                               current_subnet_cidr):
+            cidr_list.append(tmp_cidr_block)
+        if len(cidr_list) >= HWC_WORKSPACE_VPC_SUBNETS_COUNT:
+            break
+    if len(cidr_list) < HWC_WORKSPACE_VPC_SUBNETS_COUNT:
+        raise RuntimeError(
+            "Not enough available subnets in VPC {} "
+            "for workspace {}".format(vpc.name, workspace_name)
+        )
+    return cidr_list
+
+
+def _check_and_create_vpc(vpc_client, workspace_name):
+    # Check the VPC name
+    vpc_name = HWC_WORKSPACE_VPC_NAME.format(workspace_name)
+    response = vpc_client.list_vpcs(ListVpcsRequest())
+    for _vpc in response.vpcs:
+        if _vpc.name == vpc_name:
+            raise RuntimeError("A VPC with the same name already exists for "
+                               "workspace: {}. To create a new workspace "
+                               "with the same name, delete the existing "
+                               "workspace first!".format(workspace_name))
+    # Create a new VPC
+    default_cidr = HWC_WORKSPACE_VPC_DEFAULT_CIDR
+    request = CreateVpcRequest(
+        CreateVpcRequestBody(
+            vpc=CreateVpcOption(name=vpc_name,
+                                cidr=default_cidr)))
+    vpc = vpc_client.create_vpc(request).vpc
+    return vpc
+
+
+def _check_and_create_subnets(vpc, vpc_client, workspace_name):
+    subnets = []
+    subnet_cidr_list = _get_available_subnet_cidr(vpc, vpc_client,
+                                                  workspace_name)
+    for i, _cidr in enumerate(subnet_cidr_list, start=1):
+        subnet_type = 'public' if i == 1 else 'private'
+        subnet_name = HWC_WORKSPACE_SUBNET_NAME.format(
+            workspace_name, subnet_type)
+        _gateway_ip = _cidr.replace('.0/24', '.1')
+        with cli_logger.group("Creating {} subnet", subnet_type,
+                              _numbered=("()", i,
+                                         len(subnet_cidr_list))):
+            try:
+                _subnet = vpc_client.create_subnet(
+                    CreateSubnetRequest(
+                        CreateSubnetRequestBody(
+                            CreateSubnetOption(name=subnet_name,
+                                               cidr=_cidr,
+                                               gateway_ip=_gateway_ip,
+                                               vpc_id=vpc.id)))
+                ).subnet
+            except Exception as e:
+                cli_logger.error("Failed to create {} subnet. {}",
+                                 subnet_type, str(e))
+                raise e
+            subnets.append(_subnet)
+    return subnets
+
+
+def _get_workspace_vpc(config, vpc_client=None):
+    vpc_client = vpc_client or make_vpc_client(config)
+    _vpcs = vpc_client.list_vpcs(ListVpcsRequest()).vpcs
+    _workspace_vpc_name = HWC_WORKSPACE_VPC_NAME.format(
+        config['workspace_name'])
+    for _vpc in _vpcs:
+        if _workspace_vpc_name == _vpc.name:
+            workspace_vpc = _vpc
+            break
+    else:
+        workspace_vpc = None
+    return workspace_vpc
+
+
+def _get_current_vpc(config, vpc_client=None):
+    vm_local_ip_url = HWC_VM_METADATA_URL + 'local-ipv4'
+    response = requests.get(vm_local_ip_url)
+    vm_local_ip = response.text
+    ecs_client = make_ecs_client(config)
+    response = ecs_client.list_servers_details(
+        ListServersDetailsRequest(ip=vm_local_ip))
+    if response.servers:
+        vpc_id = response.servers[0].metadata['vpc_id']
+        vpc_client = vpc_client or make_vpc_client(config)
+        vpc = vpc_client.show_vpc(ShowVpcRequest(vpc_id=vpc_id)).vpc
+    else:
+        raise RuntimeError("Failed to get the VPC for the current machine. "
+                           "Please make sure your current machine is "
+                           "a HUAWEICLOUD virtual machine.")
+    return vpc
+
+
+def delete_huaweicloud_workspace(config, delete_managed_storage):
+    workspace_name = config['workspace_name']
+    # TODO(ChenRui): implement managed cloud storage.
+    managed_cloud_storage = is_managed_cloud_storage(config)
+    use_peering_vpc = is_use_peering_vpc(config)
+    use_working_vpc = is_use_working_vpc(config)
+
+    current_step = 1
+    total_steps = HWC_WORKSPACE_NUM_DELETION_STEPS
+    try:
+        with cli_logger.group("Deleting workspace: {}",
+                              cf.bold(workspace_name)):
+            # Step1: delete peering vpc connection
+            with cli_logger.group("Deleting peering VPC connection",
+                                  _numbered=("[]", current_step, total_steps)):
+                current_step += 1
+                vpc_client = make_vpc_client(config)
+                if use_peering_vpc:
+                    _check_and_delete_vpc_peering_connection(config,
+                                                             vpc_client,
+                                                             workspace_name)
+
+            # Step2: delete security group
+            with cli_logger.group("Deleting Security group",
+                                  _numbered=("[]", current_step, total_steps)):
+                current_step += 1
+                _check_and_delete_security_group(config, vpc_client,
+                                                 workspace_name)
+
+            # Step3: delete NAT and SNAT rules
+            with cli_logger.group("Deleting NAT, SNAT rules and EIP",
+                                  _numbered=("[]", current_step, total_steps)):
+                current_step += 1
+                _check_and_delete_nat_gateway(config, workspace_name)
+
+            # Step4: delete subnets
+            with cli_logger.group("Deleting private and public subnets",
+                                  _numbered=("[]", current_step, total_steps)):
+                current_step += 1
+                _workspace_vpc = _check_and_delete_subnets(config, vpc_client,
+                                                           workspace_name)
+            # Step5: delete VPC
+            with cli_logger.group("Deleting VPC",
+                                  _numbered=("[]", current_step, total_steps)):
+                current_step += 1
+                _check_and_delete_vpc(config, use_working_vpc, vpc_client,
+                                      _workspace_vpc)
+
+            # TODO(ChenRui): implement managed cloud storage
+            # Step6: delete OBS(Object Storage Service) bucket
+            pass
+    except Exception as e:
+        cli_logger.error("Failed to delete workspace with the name {} at step "
+                         "{}. \n{}", workspace_name, current_step, str(e))
+        raise e
+
+    cli_logger.print("Successfully deleted workspace: {}.",
+                     cf.bold(workspace_name))
+
+
+def _check_and_delete_vpc(config, use_working_vpc, vpc_client, _workspace_vpc):
+    if not use_working_vpc:
+        vpc_client.delete_vpc(
+            DeleteVpcRequest(vpc_id=_workspace_vpc.id))
+    else:
+        _current_vpc = _get_current_vpc(config, vpc_client)
+        cli_logger.print("Skip deleting the working VPC {}".format(
+            _current_vpc.name))
+
+
+def _check_and_delete_subnets(config, vpc_client, workspace_name):
+    _workspace_vpc = _get_workspace_vpc(config, vpc_client)
+    subnets = _get_workspace_vpc_subnets(_workspace_vpc, vpc_client,
+                                         workspace_name)
+    # Delete target subnets
+    for _subnet in subnets:
+        vpc_client.delete_subnet(
+            DeleteSubnetRequest(vpc_id=_workspace_vpc.id,
+                                subnet_id=_subnet.id))
+    # Wait until the deletion finishes
+    while subnets:
+        time.sleep(3)
+        subnets = vpc_client.list_subnets(
+            ListSubnetsRequest(vpc_id=_workspace_vpc.id)
+        ).subnets
+    return _workspace_vpc
+
+
+def _get_workspace_vpc_subnets(_workspace_vpc, vpc_client, workspace_name,
+                               category=None):
+    subnets = vpc_client.list_subnets(
+        ListSubnetsRequest(vpc_id=_workspace_vpc.id)
+    ).subnets
+    if category:
+        _prefix = HWC_WORKSPACE_SUBNET_NAME.format(workspace_name, category)
+        _suffix = ''
+    else:
+        # Get all public and private subnets
+        _prefix = HWC_RESOURCE_NAME_PREFIX.format(workspace_name)
+        _suffix = '-subnet'
+
+    target_subnets = []
+    for _subnet in subnets:
+        if _subnet.name.startswith(_prefix) and \
+                _subnet.name.endswith(_suffix):
+            target_subnets.append(_subnet)
+    return target_subnets
+
+
+def _check_and_delete_security_group(config, vpc_client, workspace_name):
+    workspace_vpc = _get_workspace_vpc(config, vpc_client)
+    if workspace_vpc:
+        target_sgs = _get_workspace_security_group(vpc_client, workspace_name)
+        for _sg in target_sgs:
+            vpc_client.delete_security_group(
+                DeleteSecurityGroupRequest(_sg.id))
+
+
+def _get_workspace_security_group(vpc_client, workspace_name):
+    _sgs = vpc_client.list_security_groups(
+        ListSecurityGroupsRequest()).security_groups
+    _sg_name = HWC_WORKSPACE_SG_NAME.format(workspace_name)
+    target_sgs = []
+    for _sg in _sgs:
+        if _sg_name == _sg.name:
+            target_sgs.append(_sg)
+    return target_sgs
+
+
+def _check_and_delete_eip(eip_client, eip_id):
+    if eip_id:
+        public_ips = eip_client.list_publicips(
+            ListPublicipsRequest(id=[eip_id])).publicips
+        if public_ips:
+            eip_client.delete_publicip(
+                DeletePublicipRequest(publicip_id=eip_id))
+        else:
+            cli_logger.print("Can't find EIP {}".format(eip_id))
+    else:
+        cli_logger.print("No EIP to delete")
+
+
+def _check_and_delete_vpc_peering_connection(config, vpc_client,
+                                             workspace_name):
+    peerings = _get_vpc_peering_conn(vpc_client, workspace_name)
+    if not peerings:
+        cli_logger.print("Can't find VPC peering connection")
+    for _peering_conn in peerings:
+        vpc_client.delete_vpc_peering(
+            DeleteVpcPeeringRequest(
+                peering_id=_peering_conn.id
+            )
+        )
+        cli_logger.print(
+            "Deleted VPC peering connection {}".format(
+                _peering_conn.name))
+
+    # Delete the route table rules between the current VPC and workspace VPC
+    _current_vpc = _get_current_vpc(config, vpc_client)
+    _workspace_vpc = _get_workspace_vpc(config, vpc_client)
+    if _workspace_vpc and _current_vpc:
+        # Delete the route from the current VPC to the workspace VPC
+        _current_vpc_rts = vpc_client.list_route_tables(
+            ListRouteTablesRequest(vpc_id=_current_vpc.id)
+        ).routetables
+        for _current_vpc_rt in _current_vpc_rts:
+            vpc_client.update_route_table(
+                UpdateRouteTableRequest(
+                    routetable_id=_current_vpc_rt.id,
+                    body=UpdateRoutetableReqBody(
+                        UpdateRouteTableReq(routes={
+                            'del': [RouteTableRoute(
+                                destination=_workspace_vpc.cidr)]}
+                        )
+                    )
+                )
+            )
+        # Delete the route from the workspace VPC to the current VPC
+        _workspace_vpc_rts = vpc_client.list_route_tables(
+            ListRouteTablesRequest(vpc_id=_workspace_vpc.id)
+        ).routetables
+        for _workspace_vpc_rt in _workspace_vpc_rts:
+            vpc_client.update_route_table(
+                UpdateRouteTableRequest(
+                    routetable_id=_workspace_vpc_rt.id,
+                    body=UpdateRoutetableReqBody(
+                        UpdateRouteTableReq(routes={
+                            'del': [RouteTableRoute(
+                                destination=_current_vpc.cidr)]}
+                        )
+                    )
+                )
+            )
+
+
+def _get_vpc_peering_conn(vpc_client, workspace_name):
+    _peering_name = HWC_WORKSPACE_VPC_PEERING_NAME.format(
+        workspace_name)
+    peerings = vpc_client.list_vpc_peerings(
+        ListVpcPeeringsRequest(name=_peering_name)
+    ).peerings
+    return peerings
+
+
+def _check_and_delete_nat_gateway(config, workspace_name):
+    nat_client = make_nat_client(config)
+    _eip_id = None
+    nat_gateways = _get_workspace_nat(nat_client, workspace_name)
+    if not nat_gateways:
+        cli_logger.print("Can't find workspace NAT gateway")
+    for _nat_gateway in nat_gateways:
+        _snat_rules = nat_client.list_nat_gateway_snat_rules(
+            ListNatGatewaySnatRulesRequest(
+                nat_gateway_id=[_nat_gateway.id])
+        ).snat_rules
+        # Delete SNAT rules
+        for _rule in _snat_rules:
+            if _rule.floating_ip_id:
+                _eip_id = _rule.floating_ip_id
+            nat_client.delete_nat_gateway_snat_rule(
+                DeleteNatGatewaySnatRuleRequest(
+                    nat_gateway_id=_nat_gateway.id,
+                    snat_rule_id=_rule.id)
+            )
+        # Delete NAT gateway
+        nat_client.delete_nat_gateway(DeleteNatGatewayRequest(
+            nat_gateway_id=_nat_gateway.id))
+        # Delete EIP
+        if _eip_id:
+            eip_client = make_eip_client(config)
+            _check_and_delete_eip(eip_client, eip_id=_eip_id)
+
+
+def _get_workspace_nat(nat_client, workspace_name):
+    _nat_name = HWC_WORKSPACE_NAT_NAME.format(workspace_name)
+    nat_gateways = nat_client.list_nat_gateways(
+        ListNatGatewaysRequest(name=_nat_name)
+    ).nat_gateways
+    return nat_gateways
+
+
+def update_huaweicloud_workspace_firewalls(config):
+    vpc_client = make_vpc_client(config)
+    workspace_name = config["workspace_name"]
+    workspace_vpc = _get_workspace_vpc(config, vpc_client)
+    if not workspace_vpc:
+        cli_logger.print("The workspace: {} doesn't exist!".format(
+            workspace_name))
+        return
+    current_step = 1
+    total_steps = 1
+    try:
+        with cli_logger.group("Updating workspace firewalls",
+                              _numbered=("[]", current_step, total_steps)):
+            current_step += 1
+            _sgs = vpc_client.list_security_groups(
+                ListSecurityGroupsRequest()).security_groups
+            _sg_name = HWC_WORKSPACE_SG_NAME.format(workspace_name)
+            for _sg in _sgs:
+                if _sg.name == _sg_name:
+                    _update_security_group_rules(config, _sg, workspace_vpc,
+                                                 vpc_client)
+    except Exception as e:
+        cli_logger.error("Failed to update the firewalls of workspace {}. {}",
+                         workspace_name, str(e))
+        raise e
+
+    cli_logger.print("Successfully updated the firewalls of workspace: {}.",
+                     cf.bold(workspace_name))
+
+
+def get_huaweicloud_workspace_info(config):
+    # TODO(ChenRui): implement managed cloud storage
+    return None
+
+
+def check_huaweicloud_workspace_existence(config):
+    workspace_name = config["workspace_name"]
+    managed_cloud_storage = is_managed_cloud_storage(config)
+    use_peering_vpc = is_use_peering_vpc(config)
+    use_working_vpc = is_use_working_vpc(config)
+
+    existing_resources = 0
+    target_resources = HWC_WORKSPACE_TARGET_RESOURCES
+    if managed_cloud_storage:
+        target_resources += 1
+    if use_peering_vpc:
+        target_resources += 1
+
+    vpc_client = make_vpc_client(config)
+    if use_working_vpc:
+        workspace_vpc = _get_current_vpc(config, vpc_client)
+    else:
+        workspace_vpc = _get_workspace_vpc(config, vpc_client)
+    # Workspace VPC check
+    if workspace_vpc:
+        existing_resources += 1
+
+        # Private subnets check
+        _private_subnets_count = len(
+            _get_workspace_vpc_subnets(workspace_vpc, vpc_client,
+                                       workspace_name,
+                                       'private')
+        )
+        if _private_subnets_count >= HWC_WORKSPACE_VPC_SUBNETS_COUNT - 1:
+            existing_resources += 1
+
+        # Public subnet check
+        _public_subnets_count = len(
+            _get_workspace_vpc_subnets(workspace_vpc, vpc_client,
+                                       workspace_name,
+                                       'public')
+        )
+        if _public_subnets_count > 0:
+            existing_resources += 1
+
+    # NAT gateway check
+    nat_client = make_nat_client(config)
+    if len(_get_workspace_nat(nat_client, workspace_name)) > 0:
+        existing_resources += 1
+    # Security group check
+    if len(_get_workspace_security_group(vpc_client, workspace_name)) > 0:
+        existing_resources += 1
+    # VPC peering connection check
+    if use_peering_vpc:
+        if len(_get_vpc_peering_conn(vpc_client, workspace_name)) > 0:
+            existing_resources += 1
+
+    # Managed cloud storage check
+    cloud_storage_existence = False
+    if managed_cloud_storage:
+        # TODO(ChenRui): implement managed cloud storage
+        existing_resources += 1
+        cloud_storage_existence = True
+
+    if existing_resources == 0:
+        return Existence.NOT_EXIST
+    elif existing_resources == target_resources:
+        return Existence.COMPLETED
+    else:
+        if existing_resources == 1 and cloud_storage_existence:
+            return Existence.STORAGE_ONLY
+        return Existence.IN_COMPLETED
+
+
+def list_huaweicloud_clusters(config):
+    # TODO(ChenRui): implement node provider
+    return None
+
+
+def bootstrap_huaweicloud_workspace(config):
+    # Create a copy of the input config to modify
+    config = copy.deepcopy(config)
+    _configure_allowed_ssh_sources(config)
+    return config
+
+
+def _configure_allowed_ssh_sources(config):
+    provider_config = config["provider"]
+    if "allowed_ssh_sources" not in provider_config:
+        return
+
+    allowed_ssh_sources = provider_config["allowed_ssh_sources"]
+    if len(allowed_ssh_sources) == 0:
+        return
+
+    if "security_group" not in provider_config:
+        provider_config["security_group"] = {}
+    security_group_config = provider_config["security_group"]
+
+    if "IpPermissions" not in security_group_config:
+        security_group_config["IpPermissions"] = []
+    ip_permissions = security_group_config["IpPermissions"]
+    ip_permission = {
+        "IpProtocol": "tcp",
+        "FromPort": 22,
+        "ToPort": 22,
+        "IpRanges": [{"CidrIp": allowed_ssh_source} for allowed_ssh_source in
+                     allowed_ssh_sources]
+    }
+    ip_permissions.append(ip_permission)
diff --git a/python/cloudtik/providers/_private/huaweicloud/node_provider.py b/python/cloudtik/providers/_private/huaweicloud/node_provider.py
new file mode 100644
index 000000000..daaaa6182
--- /dev/null
+++ b/python/cloudtik/providers/_private/huaweicloud/node_provider.py
@@ -0,0 +1,5 @@
+from cloudtik.core.node_provider import NodeProvider
+
+
+class HUAWEICLOUDNodeProvider(NodeProvider):
+    pass
diff --git a/python/cloudtik/providers/_private/huaweicloud/utils.py b/python/cloudtik/providers/_private/huaweicloud/utils.py
new file mode 100644
index 000000000..1b9c9f86d
--- /dev/null
+++ b/python/cloudtik/providers/_private/huaweicloud/utils.py
@@ -0,0 +1,96 @@
+import logging
+from functools import lru_cache
+from typing import Any, Dict
+
+from huaweicloudsdkcore.auth.credentials import BasicCredentials
+from huaweicloudsdkcore.http.http_config import HttpConfig
+from huaweicloudsdkecs.v2 import EcsClient
+from huaweicloudsdkecs.v2.region.ecs_region import EcsRegion
+from huaweicloudsdkeip.v2 import EipClient
+from huaweicloudsdkeip.v2.region.eip_region import EipRegion
+from huaweicloudsdknat.v2 import NatClient
+from huaweicloudsdknat.v2.region.nat_region import NatRegion
+from huaweicloudsdkvpc.v2 import VpcClient
+from huaweicloudsdkvpc.v2.region.vpc_region import VpcRegion
+
+from cloudtik.core._private.constants import env_bool
+
+logger = logging.getLogger(__name__)
+
+
+@lru_cache()
+def _client_cache(ak: str, sk: str, region: str) -> Dict[str, Any]:
+    client_map = {}
+    credentials = BasicCredentials(ak, sk)
+
+    # If $HWC_IGNORE_SSL_VERIFICATION is explicitly set to true, skip SSL
+    # verification; otherwise keep SSL verification enabled.
+    http_config = HttpConfig.get_default_config()
+    http_config.ignore_ssl_verification = env_bool(
+        'HWC_IGNORE_SSL_VERIFICATION', False)
+
+    ecs_client = EcsClient.new_builder() \
+        .with_http_config(http_config) \
+        .with_credentials(credentials) \
+        .with_region(EcsRegion.value_of(region)) \
+        .build()
+    client_map['ecs'] = ecs_client
+
+    vpc_client = VpcClient.new_builder() \
+        .with_http_config(http_config) \
+        .with_credentials(credentials) \
+        .with_region(VpcRegion.value_of(region)) \
+        .build()
+    client_map['vpc'] = vpc_client
+
+    nat_client = NatClient.new_builder() \
+        .with_http_config(http_config) \
+        .with_credentials(credentials) \
+        .with_region(NatRegion.value_of(region)) \
+        .build()
+    client_map['nat'] = nat_client
+
+    eip_client = EipClient.new_builder() \
+        .with_http_config(http_config) \
+        .with_credentials(credentials) \
+        .with_region(EipRegion.value_of(region)) \
+        .build()
+    client_map['eip'] = eip_client
+
+    return client_map
+
+
+def make_ecs_client(config: Dict[str, Any]) -> Any:
+    config_provider = config['provider']
+    credentials = config_provider['huaweicloud_credentials']
+    ak = credentials['huaweicloud_access_key']
+    sk = credentials['huaweicloud_secret_key']
+    region = config_provider['region']
+    return _client_cache(ak, sk, region)['ecs']
+
+
+def make_vpc_client(config: Dict[str, Any]) -> Any:
+    config_provider = config['provider']
+    credentials = config_provider['huaweicloud_credentials']
+    ak = credentials['huaweicloud_access_key']
+    sk = credentials['huaweicloud_secret_key']
+    region = config_provider['region']
+    return _client_cache(ak, sk, region)['vpc']
+
+
+def make_nat_client(config: Dict[str, Any]) -> Any:
+    config_provider = config['provider']
+    credentials = config_provider['huaweicloud_credentials']
+    ak = credentials['huaweicloud_access_key']
+    sk = credentials['huaweicloud_secret_key']
+    region = config_provider['region']
+    return _client_cache(ak, sk, region)['nat']
+
+
+def make_eip_client(config: Dict[str, Any]) -> Any:
+    config_provider = config['provider']
+    credentials = config_provider['huaweicloud_credentials']
+    ak = credentials['huaweicloud_access_key']
+    sk = credentials['huaweicloud_secret_key']
+    region = config_provider['region']
+    return _client_cache(ak, sk, region)['eip']
diff --git a/python/cloudtik/providers/_private/huaweicloud/workspace_provider.py b/python/cloudtik/providers/_private/huaweicloud/workspace_provider.py
new file mode 100644
index 000000000..e001d7021
--- /dev/null
+++ b/python/cloudtik/providers/_private/huaweicloud/workspace_provider.py
@@ -0,0 +1,66 @@
+from typing import Any, Dict, Optional
+
+from cloudtik.core._private.utils import check_workspace_name_format
+from cloudtik.core.workspace_provider import Existence, WorkspaceProvider
+from cloudtik.providers._private.huaweicloud.config import \
+    bootstrap_huaweicloud_workspace, check_huaweicloud_workspace_existence, \
+    create_huaweicloud_workspace, \
+    delete_huaweicloud_workspace, get_huaweicloud_workspace_info, \
+    list_huaweicloud_clusters, update_huaweicloud_workspace_firewalls
+
+HUAWEICLOUD_WORKSPACE_NAME_MAX_LEN = 32
+
+
+class HUAWEICLOUDWorkspaceProvider(WorkspaceProvider):
+
+    def __init__(self, provider_config: Dict[str, Any],
+                 workspace_name: str) -> None:
+        WorkspaceProvider.__init__(self, provider_config, workspace_name)
+
+    def create_workspace(self, config: Dict[str, Any]):
+        create_huaweicloud_workspace(config)
+
+    def delete_workspace(self, config: Dict[str, Any],
+                         delete_managed_storage: bool = False):
+        delete_huaweicloud_workspace(config, delete_managed_storage)
+
+    def update_workspace_firewalls(self, config: Dict[str, Any]):
+        update_huaweicloud_workspace_firewalls(config)
+
+    def check_workspace_existence(self, config: Dict[str, Any]) -> Existence:
+        return check_huaweicloud_workspace_existence(config)
+
+    def check_workspace_integrity(self, config: Dict[str, Any]) -> bool:
+        existence = check_huaweicloud_workspace_existence(config)
+        return existence == Existence.COMPLETED
+
+    def list_clusters(self, config: Dict[str, Any]) -> Optional[
+            Dict[str, Any]]:
+        return list_huaweicloud_clusters(config)
+
+    def publish_global_variables(self, cluster_config: Dict[str, Any],
+                                 global_variables: Dict[str, Any]):
+        # TODO(ChenRui): implement node provider
+        pass
+
+    def subscribe_global_variables(self, cluster_config: Dict[str, Any]):
+        # TODO(ChenRui): implement node provider
+        pass
+
+    def validate_config(self, provider_config: Dict[str, Any]):
+        if len(self.workspace_name) > HUAWEICLOUD_WORKSPACE_NAME_MAX_LEN or \
+                not check_workspace_name_format(self.workspace_name):
+            raise RuntimeError(
+                "{} workspace name must be between 1 and {} characters, "
+                "and can only contain lowercase alphanumeric "
+                "characters and dashes".format(
+                    provider_config["type"],
+                    HUAWEICLOUD_WORKSPACE_NAME_MAX_LEN)
+            )
+
+    def get_workspace_info(self, config: Dict[str, Any]):
+        return get_huaweicloud_workspace_info(config)
+
+    @staticmethod
+    def bootstrap_workspace_config(config: Dict[str, Any]) -> Dict[str, Any]:
+        return bootstrap_huaweicloud_workspace(config)
diff --git a/python/cloudtik/providers/huaweicloud/__init__.py b/python/cloudtik/providers/huaweicloud/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/python/cloudtik/providers/huaweicloud/commands.yaml b/python/cloudtik/providers/huaweicloud/commands.yaml
new file mode 100644
index 000000000..48e79d53f
--- /dev/null
+++ b/python/cloudtik/providers/huaweicloud/commands.yaml
@@ -0,0 +1,2 @@
+# Include the common built-in commands
+from: commands
diff --git a/python/cloudtik/providers/huaweicloud/defaults.yaml b/python/cloudtik/providers/huaweicloud/defaults.yaml
new file mode 100644
index 000000000..81da31a46
--- /dev/null
+++ b/python/cloudtik/providers/huaweicloud/defaults.yaml
@@ -0,0 +1,45 @@
+# Include the common defaults
+from: defaults
+
+# Cloud-provider specific configuration.
+provider:
+    type: huaweicloud
+    region: cn-east-2
+    # Whether to allow node reuse. If set to False, nodes will be terminated
+    # instead of stopped.
+    cache_stopped_nodes: False  # If not present, the default is False.
+    # Whether to use managed cloud storage of workspace.
+    use_managed_cloud_storage: False
+
+# How we will authenticate with newly launched nodes.
+auth:
+    ssh_user: ubuntu
+# By default, we create a new private keypair, but you can also use your own.
+# If you do so, make sure to also set "KeyName" in the head and worker node
+# configurations below.
+#    ssh_private_key: /path/to/your/key.pem
+
+# Tell the cluster scaler the allowed node types and the resources they provide.
+# The key is the name of the node type, which is just for debugging purposes.
+# The node config specifies the launch config and physical instance type.
+available_node_types:
+    head.default:
+        # The node type's CPU and GPU resources are auto-detected based on HUAWEICLOUD instance type.
+        # If desired, you can override the autodetected CPU and GPU resources advertised to the cluster scaler.
+        resources: {}
+        # Provider-specific config for this node type, e.g. instance type.
+        node_config:
+            InstanceType: kc1.xlarge.4
+    worker.default:
+        # The minimum number of nodes of this type to launch.
+        # This number should be >= 0.
+        min_workers: 1
+        # The node type's CPU and GPU resources are auto-detected based on HUAWEICLOUD instance type.
+        # If desired, you can override the autodetected CPU and GPU resources advertised to the cluster scaler.
+        resources: {}
+        # Provider-specific config for this node type, e.g. instance type.
+        node_config:
+            InstanceType: kc1.xlarge.4
+
+# Specify the node type of the head node (as configured above).
+head_node_type: head.default
diff --git a/python/cloudtik/providers/huaweicloud/workspace-defaults.yaml b/python/cloudtik/providers/huaweicloud/workspace-defaults.yaml
new file mode 100644
index 000000000..9b2671003
--- /dev/null
+++ b/python/cloudtik/providers/huaweicloud/workspace-defaults.yaml
@@ -0,0 +1,13 @@
+# Include the common workspace defaults
+from: workspace-defaults
+
+# Cloud-provider specific configuration.
+provider:
+    type: huaweicloud
+    region: cn-east-2
+    # Whether the nodes should use only internal IPs.
+    # When set to False, the head node will be assigned a public IP.
+    # Defaults to False.
+    use_internal_ips: False
+    # Whether to create managed cloud storage of workspace.
+    managed_cloud_storage: False
diff --git a/python/setup.py b/python/setup.py
index 4800b228b..0acfd5241 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -85,6 +85,12 @@ def get_packages(self):
         "urllib3",
         "kopf",
     ],
+    "huaweicloud": [
+        "huaweicloudsdkecs == 3.1.24",
+        "huaweicloudsdkvpc == 3.1.24",
+        "huaweicloudsdknat == 3.1.24",
+        "huaweicloudsdkeip == 3.1.24",
+    ],
 }
 
 setup_spec.extras["all"] = list(
diff --git a/requirements.txt b/requirements.txt
index c330ca4fc..41310c949 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -40,4 +40,9 @@ google-cloud-storage == 2.3.0
 gcsfs == 2022.10.0
 ## for kubernetes
 kubernetes
-urllib3
\ No newline at end of file
+urllib3
+## for huaweicloud
+huaweicloudsdkecs == 3.1.24
+huaweicloudsdkvpc == 3.1.24
+huaweicloudsdknat == 3.1.24
+huaweicloudsdkeip == 3.1.24
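
Example workspace config (sketch, not part of the diff): the YAML below shows how the new provider type, region, and the huaweicloud_credentials block from workspace-schema.json would fit together in a workspace config. The file name, workspace name, and credential values are placeholders, and the optional flags simply restate the values from workspace-defaults.yaml above.

# example-workspace.yaml (hypothetical file name)
workspace_name: example-workspace
provider:
    type: huaweicloud
    region: cn-east-2
    # Placeholder credentials; per workspace-schema.json, if omitted the
    # environment default is used.
    huaweicloud_credentials:
        huaweicloud_access_key: YOUR_ACCESS_KEY
        huaweicloud_secret_key: YOUR_SECRET_KEY
    # Optional flags, shown with the defaults from workspace-defaults.yaml.
    use_internal_ips: False
    managed_cloud_storage: False

A config like this would be passed to CloudTik's workspace create command, which drives create_huaweicloud_workspace() in config.py above.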