diff --git a/ovn-fake-multinode-utils/translate_yaml.py b/ovn-fake-multinode-utils/translate_yaml.py index 56b0b312..a1fcdfa2 100755 --- a/ovn-fake-multinode-utils/translate_yaml.py +++ b/ovn-fake-multinode-utils/translate_yaml.py @@ -26,28 +26,6 @@ class GlobalConfig: cms_name: str = '' -def calculate_node_remotes( - node_net: str, clustered_db: bool, n_relays: int, enable_ssl: bool -) -> str: - net = netaddr.IPNetwork(node_net) - - ip_gen = net.iter_hosts() - # The first IP is assigned to the tester, skip it. - next(ip_gen) - if n_relays > 0: - skip = 3 if clustered_db else 1 - for _ in range(0, skip): - next(ip_gen) - ip_range = range(0, n_relays) - else: - ip_range = range(0, 3 if clustered_db else 1) - if enable_ssl: - remotes = ["ssl:" + str(next(ip_gen)) + ":6642" for _ in ip_range] - else: - remotes = ["tcp:" + str(next(ip_gen)) + ":6642" for _ in ip_range] - return ','.join(remotes) - - DEFAULT_N_VIPS = 2 DEFAULT_VIP_PORT = 80 @@ -121,7 +99,6 @@ class ClusterConfig: db_inactivity_probe: int = 60000 node_net: str = "192.16.0.0/16" enable_ssl: bool = True - node_remote: str = None node_timeout_s: int = 20 internal_net: str = "16.0.0.0/16" internal_net6: str = "16::/64" @@ -143,14 +120,6 @@ class ClusterConfig: def __post_init__(self, **kwargs): # Some defaults have to be calculated - if not self.node_remote: - self.node_remote = calculate_node_remotes( - self.node_net, - self.clustered_db, - self.n_relays, - self.enable_ssl, - ) - if self.vips is None: self.vips = calculate_vips(self.vip_subnet) diff --git a/ovn-tester/cms/ovn_kubernetes/ovn_kubernetes.py b/ovn-tester/cms/ovn_kubernetes/ovn_kubernetes.py index db672b55..cf623fd1 100644 --- a/ovn-tester/cms/ovn_kubernetes/ovn_kubernetes.py +++ b/ovn-tester/cms/ovn_kubernetes/ovn_kubernetes.py @@ -1,5 +1,5 @@ from ovn_context import Context -from ovn_workload import WorkerNode, Cluster +from ovn_workload import WorkerNode from ovn_utils import DualStackSubnet @@ -8,30 +8,38 @@ class OVNKubernetes: @staticmethod - def create_nodes(cluster_config, workers): - mgmt_net = cluster_config.node_net - mgmt_ip = mgmt_net.ip + 2 - internal_net = cluster_config.internal_net - external_net = cluster_config.external_net - gw_net = cluster_config.gw_net - worker_nodes = [ - WorkerNode( - workers[i % len(workers)], - f'ovn-scale-{i}', - mgmt_net, - mgmt_ip + i, - DualStackSubnet.next(internal_net, i), - DualStackSubnet.next(external_net, i), - gw_net, - i, - ) - for i in range(cluster_config.n_workers) - ] - return worker_nodes + def add_cluster_worker_nodes(cluster, workers): + cluster_cfg = cluster.cluster_cfg + + # Allocate worker IPs after central and relay IPs. + mgmt_ip = ( + cluster_cfg.node_net.ip + + 2 + + len(cluster.central_nodes) + + len(cluster.relay_nodes) + ) + + protocol = "ssl" if cluster_cfg.enable_ssl else "tcp" + internal_net = cluster_cfg.internal_net + external_net = cluster_cfg.external_net + gw_net = cluster_cfg.gw_net + cluster.add_workers( + [ + WorkerNode( + workers[i % len(workers)], + f'ovn-scale-{i}', + mgmt_ip + i, + protocol, + DualStackSubnet.next(internal_net, i), + DualStackSubnet.next(external_net, i), + gw_net, + i, + ) + for i in range(cluster_cfg.n_workers) + ] + ) @staticmethod - def prepare_test(central_node, worker_nodes, cluster_cfg, brex_cfg): - ovn = Cluster(central_node, worker_nodes, cluster_cfg, brex_cfg) - with Context(ovn, 'prepare_test'): - ovn.start() - return ovn + def prepare_test(cluster): + with Context(cluster, 'prepare_test'): + cluster.start() diff --git a/ovn-tester/cms/ovn_kubernetes/tests/base_cluster_bringup.py b/ovn-tester/cms/ovn_kubernetes/tests/base_cluster_bringup.py index c27d31de..7d6df2d3 100644 --- a/ovn-tester/cms/ovn_kubernetes/tests/base_cluster_bringup.py +++ b/ovn-tester/cms/ovn_kubernetes/tests/base_cluster_bringup.py @@ -7,8 +7,8 @@ class BaseClusterBringup(ExtCmd): - def __init__(self, config, central_node, worker_nodes, global_cfg): - super().__init__(config, central_node, worker_nodes) + def __init__(self, config, cluster, global_cfg): + super().__init__(config, cluster) test_config = config.get('base_cluster_bringup', dict()) self.config = ClusterBringupCfg( n_pods_per_node=test_config.get('n_pods_per_node', 0), diff --git a/ovn-tester/cms/ovn_kubernetes/tests/cluster_density.py b/ovn-tester/cms/ovn_kubernetes/tests/cluster_density.py index d77f879a..e6e312f1 100644 --- a/ovn-tester/cms/ovn_kubernetes/tests/cluster_density.py +++ b/ovn-tester/cms/ovn_kubernetes/tests/cluster_density.py @@ -17,8 +17,8 @@ class ClusterDensity(ExtCmd): - def __init__(self, config, central_node, worker_nodes, global_cfg): - super().__init__(config, central_node, worker_nodes) + def __init__(self, config, cluster, global_cfg): + super().__init__(config, cluster) test_config = config.get('cluster_density', dict()) self.config = ClusterDensityCfg( n_runs=test_config.get('n_runs', 0), diff --git a/ovn-tester/cms/ovn_kubernetes/tests/density_heavy.py b/ovn-tester/cms/ovn_kubernetes/tests/density_heavy.py index 6084c315..0bbab209 100644 --- a/ovn-tester/cms/ovn_kubernetes/tests/density_heavy.py +++ b/ovn-tester/cms/ovn_kubernetes/tests/density_heavy.py @@ -23,8 +23,8 @@ class DensityHeavy(ExtCmd): - def __init__(self, config, central_node, worker_nodes, global_cfg): - super().__init__(config, central_node, worker_nodes) + def __init__(self, config, cluster, global_cfg): + super().__init__(config, cluster) test_config = config.get('density_heavy', dict()) pods_vip_ratio = test_config.get( 'pods_vip_ratio', DENSITY_PODS_VIP_RATIO diff --git a/ovn-tester/cms/ovn_kubernetes/tests/density_light.py b/ovn-tester/cms/ovn_kubernetes/tests/density_light.py index a8400203..c2c160da 100644 --- a/ovn-tester/cms/ovn_kubernetes/tests/density_light.py +++ b/ovn-tester/cms/ovn_kubernetes/tests/density_light.py @@ -10,8 +10,8 @@ class DensityLight(ExtCmd): - def __init__(self, config, central_node, worker_nodes, global_cfg): - super().__init__(config, central_node, worker_nodes) + def __init__(self, config, cluster, global_cfg): + super().__init__(config, cluster) test_config = config.get('density_light', dict()) self.config = DensityCfg( n_pods=test_config.get('n_pods', 0), diff --git a/ovn-tester/cms/ovn_kubernetes/tests/netpol.py b/ovn-tester/cms/ovn_kubernetes/tests/netpol.py index bcb9222f..31e2a118 100644 --- a/ovn-tester/cms/ovn_kubernetes/tests/netpol.py +++ b/ovn-tester/cms/ovn_kubernetes/tests/netpol.py @@ -8,8 +8,8 @@ class NetPol(ExtCmd): - def __init__(self, name, config, central_node, worker_nodes): - super().__init__(config, central_node, worker_nodes) + def __init__(self, name, config, cluster): + super().__init__(config, cluster) test_config = config.get(name, dict()) self.config = NpCfg( n_ns=test_config.get('n_ns', 0), diff --git a/ovn-tester/cms/ovn_kubernetes/tests/netpol_cross_ns.py b/ovn-tester/cms/ovn_kubernetes/tests/netpol_cross_ns.py index 78c9a0ed..c1d7366c 100644 --- a/ovn-tester/cms/ovn_kubernetes/tests/netpol_cross_ns.py +++ b/ovn-tester/cms/ovn_kubernetes/tests/netpol_cross_ns.py @@ -7,8 +7,8 @@ class NetpolCrossNs(ExtCmd): - def __init__(self, config, central_node, worker_nodes, global_cfg): - super().__init__(config, central_node, worker_nodes) + def __init__(self, config, cluster, global_cfg): + super().__init__(config, cluster) test_config = config.get('netpol_cross', dict()) self.config = NpCrossNsCfg( n_ns=test_config.get('n_ns', 0), diff --git a/ovn-tester/cms/ovn_kubernetes/tests/netpol_large.py b/ovn-tester/cms/ovn_kubernetes/tests/netpol_large.py index eef03585..af8fcdba 100644 --- a/ovn-tester/cms/ovn_kubernetes/tests/netpol_large.py +++ b/ovn-tester/cms/ovn_kubernetes/tests/netpol_large.py @@ -2,8 +2,8 @@ class NetpolLarge(NetPol): - def __init__(self, config, central_node, worker_nodes, global_cfg): - super().__init__('netpol_large', config, central_node, worker_nodes) + def __init__(self, config, cluster, global_cfg): + super().__init__('netpol_large', config, cluster) def run(self, ovn, global_cfg): self.init(ovn, global_cfg) diff --git a/ovn-tester/cms/ovn_kubernetes/tests/netpol_multitenant.py b/ovn-tester/cms/ovn_kubernetes/tests/netpol_multitenant.py index 2a9a2eb8..474044c9 100644 --- a/ovn-tester/cms/ovn_kubernetes/tests/netpol_multitenant.py +++ b/ovn-tester/cms/ovn_kubernetes/tests/netpol_multitenant.py @@ -15,8 +15,8 @@ class NetpolMultitenant(ExtCmd): - def __init__(self, config, central_node, worker_nodes, global_cfg): - super().__init__(config, central_node, worker_nodes) + def __init__(self, config, cluster, global_cfg): + super().__init__(config, cluster) test_config = config.get('netpol_multitenant', dict()) ranges = [ NsRange( diff --git a/ovn-tester/cms/ovn_kubernetes/tests/netpol_small.py b/ovn-tester/cms/ovn_kubernetes/tests/netpol_small.py index 9f3a06d4..8944424a 100644 --- a/ovn-tester/cms/ovn_kubernetes/tests/netpol_small.py +++ b/ovn-tester/cms/ovn_kubernetes/tests/netpol_small.py @@ -2,8 +2,8 @@ class NetpolSmall(NetPol): - def __init__(self, config, central_node, worker_nodes, global_cfg): - super().__init__('netpol_small', config, central_node, worker_nodes) + def __init__(self, config, cluster, global_cfg): + super().__init__('netpol_small', config, cluster) def run(self, ovn, global_cfg): self.init(ovn, global_cfg) diff --git a/ovn-tester/cms/ovn_kubernetes/tests/service_route.py b/ovn-tester/cms/ovn_kubernetes/tests/service_route.py index f6f7e28a..68d38db4 100644 --- a/ovn-tester/cms/ovn_kubernetes/tests/service_route.py +++ b/ovn-tester/cms/ovn_kubernetes/tests/service_route.py @@ -16,8 +16,8 @@ class ServiceRoute(ExtCmd): - def __init__(self, config, central_node, worker_nodes, global_cfg): - super().__init__(config, central_node, worker_nodes) + def __init__(self, config, cluster, global_cfg): + super().__init__(config, cluster) test_config = config.get('service_route', dict()) self.config = ServiceRouteCfg( n_lb=test_config.get('n_lb', 16), diff --git a/ovn-tester/ovn_ext_cmd.py b/ovn-tester/ovn_ext_cmd.py index 2209a3dd..b50c3423 100644 --- a/ovn-tester/ovn_ext_cmd.py +++ b/ovn-tester/ovn_ext_cmd.py @@ -1,18 +1,10 @@ from collections import defaultdict from fnmatch import fnmatch from io import StringIO -from ovn_sandbox import Sandbox - - -# The wrapper allows us to execute the command on all -# matching central containers -class CentralNodeWrapper(Sandbox): - def __init__(self, central_node, container): - super().__init__(central_node.phys_node, container) class ExtCmdUnit: - def __init__(self, conf, central_node, worker_nodes): + def __init__(self, conf, cluster): self.iteration = conf.get('iteration') self.cmd = conf.get('cmd') self.test = conf.get('test') @@ -21,13 +13,11 @@ def __init__(self, conf, central_node, worker_nodes): self.pid_opt = conf.get('pid_opt', '') node = conf.get('node') - self.nodes = [n for n in worker_nodes if fnmatch(n.container, node)] + self.nodes = [ + n for n in cluster.worker_nodes if fnmatch(n.container, node) + ] self.nodes.extend( - [ - CentralNodeWrapper(central_node, c) - for c in central_node.central_containers() - if fnmatch(c, node) - ] + [n for n in cluster.central_nodes if fnmatch(n.container, node)] ) def is_valid(self): @@ -58,10 +48,10 @@ def _node_exec(self, node): class ExtCmd: - def __init__(self, config, central_node, worker_nodes): + def __init__(self, config, cluster): self.cmd_map = defaultdict(list) for ext_cmd in config.get('ext_cmd', list()): - cmd_unit = ExtCmdUnit(ext_cmd, central_node, worker_nodes) + cmd_unit = ExtCmdUnit(ext_cmd, cluster) if cmd_unit.is_valid(): self.cmd_map[(cmd_unit.iteration, cmd_unit.test)].append( cmd_unit diff --git a/ovn-tester/ovn_tester.py b/ovn-tester/ovn_tester.py index 17e355f9..d8133538 100644 --- a/ovn-tester/ovn_tester.py +++ b/ovn-tester/ovn_tester.py @@ -11,7 +11,13 @@ from collections import namedtuple from ovn_sandbox import PhysicalNode -from ovn_workload import BrExConfig, CentralNode, ClusterConfig +from ovn_workload import ( + BrExConfig, + CentralNode, + Cluster, + ClusterConfig, + RelayNode, +) from ovn_utils import DualStackSubnet from ovs.stream import Stream @@ -68,7 +74,6 @@ def read_config(config): node_net=netaddr.IPNetwork(cluster_args['node_net']), n_relays=cluster_args['n_relays'], enable_ssl=cluster_args['enable_ssl'], - node_remote=cluster_args['node_remote'], northd_probe_interval=cluster_args['northd_probe_interval'], db_inactivity_probe=cluster_args['db_inactivity_probe'], node_timeout_s=cluster_args['node_timeout_s'], @@ -167,7 +172,7 @@ def load_cms(cms_name): return cls() -def configure_tests(yaml, central_node, worker_nodes, global_cfg): +def configure_tests(yaml, cluster, global_cfg): tests = [] for section, cfg in yaml.items(): if section in RESERVED: @@ -178,25 +183,30 @@ def configure_tests(yaml, central_node, worker_nodes, global_cfg): ) class_name = ''.join(s.title() for s in section.split('_')) cls = getattr(mod, class_name) - tests.append(cls(yaml, central_node, worker_nodes, global_cfg)) + tests.append(cls(yaml, cluster, global_cfg)) return tests -def create_central_nodes(cluster_config, central): - mgmt_net = cluster_config.node_net - mgmt_ip = mgmt_net.ip + 2 +def create_cluster(cluster_cfg, central, brex_cfg): + protocol = "ssl" if cluster_cfg.enable_ssl else "tcp" + mgmt_ip = cluster_cfg.node_net.ip + 2 db_containers = ( ['ovn-central-1', 'ovn-central-2', 'ovn-central-3'] - if cluster_config.clustered_db + if cluster_cfg.clustered_db else ['ovn-central'] ) - relay_containers = [ - f'ovn-relay-{i + 1}' for i in range(cluster_config.n_relays) + + central_nodes = [ + CentralNode(central, c, mgmt_ip + i, protocol) + for i, c in enumerate(db_containers) ] - central_node = CentralNode( - central, db_containers, relay_containers, mgmt_net, mgmt_ip - ) - return central_node + mgmt_ip += len(central_nodes) + + relay_nodes = [ + RelayNode(central, f'ovn-relay-{i + 1}', mgmt_ip + i, protocol) + for i in range(cluster_cfg.n_relays) + ] + return Cluster(central_nodes, relay_nodes, cluster_cfg, brex_cfg) def set_ssl_keys(cluster_cfg): @@ -225,14 +235,14 @@ def set_ssl_keys(cluster_cfg): cms = load_cms(global_cfg.cms_name) central, workers = read_physical_deployment(sys.argv[1], global_cfg) - central_node = create_central_nodes(cluster_cfg, central) - worker_nodes = cms.create_nodes(cluster_cfg, workers) - tests = configure_tests(config, central_node, worker_nodes, global_cfg) + cluster = create_cluster(cluster_cfg, central, brex_cfg) + cms.add_cluster_worker_nodes(cluster, workers) + tests = configure_tests(config, cluster, global_cfg) if cluster_cfg.enable_ssl: set_ssl_keys(cluster_cfg) - ovn = cms.prepare_test(central_node, worker_nodes, cluster_cfg, brex_cfg) + cms.prepare_test(cluster) for test in tests: - test.run(ovn, global_cfg) + test.run(cluster, global_cfg) sys.exit(0) diff --git a/ovn-tester/ovn_workload.py b/ovn-tester/ovn_workload.py index 2a531e14..1a4ae3b2 100644 --- a/ovn-tester/ovn_workload.py +++ b/ovn-tester/ovn_workload.py @@ -28,7 +28,6 @@ 'db_inactivity_probe', 'node_net', 'enable_ssl', - 'node_remote', 'node_timeout_s', 'internal_net', 'external_net', @@ -53,24 +52,21 @@ class Node(ovn_sandbox.Sandbox): - def __init__(self, phys_node, container, mgmt_net, mgmt_ip): + def __init__(self, phys_node, container, mgmt_ip, protocol): super().__init__(phys_node, container) self.container = container - self.mgmt_net = mgmt_net - self.mgmt_ip = mgmt_ip + self.mgmt_ip = netaddr.IPAddress(mgmt_ip) + self.protocol = protocol class CentralNode(Node): - def __init__( - self, phys_node, db_containers, relay_containers, mgmt_net, mgmt_ip - ): - super().__init__(phys_node, db_containers[0], mgmt_net, mgmt_ip) - self.db_containers = db_containers - self.relay_containers = relay_containers + def __init__(self, phys_node, container, mgmt_ip, protocol): + super().__init__(phys_node, container, mgmt_ip, protocol) - def start(self, cluster_cfg): + def start(self, cluster_cfg, update_election_timeout=False): log.info('Configuring central node') - self.set_raft_election_timeout(cluster_cfg.raft_election_to) + if cluster_cfg.clustered_db and update_election_timeout: + self.set_raft_election_timeout(cluster_cfg.raft_election_to) self.enable_trim_on_compaction() self.set_northd_threads(cluster_cfg.northd_threads) if cluster_cfg.log_txns_db: @@ -78,12 +74,11 @@ def start(self, cluster_cfg): def set_northd_threads(self, n_threads): log.info(f'Configuring northd to use {n_threads} threads') - for container in self.db_containers: - self.phys_node.run( - f'podman exec {container} ovn-appctl -t ' - f'ovn-northd parallel-build/set-n-threads ' - f'{n_threads}' - ) + self.phys_node.run( + f'podman exec {self.container} ovn-appctl -t ' + f'ovn-northd parallel-build/set-n-threads ' + f'{n_threads}' + ) def set_raft_election_timeout(self, timeout_s): for timeout in range(1000, (timeout_s + 1) * 1000, 1000): @@ -102,23 +97,16 @@ def set_raft_election_timeout(self, timeout_s): def enable_trim_on_compaction(self): log.info('Setting DB trim-on-compaction') - for db_container in self.db_containers: - self.phys_node.run( - f'podman exec {db_container} ovs-appctl -t ' - f'/run/ovn/ovnnb_db.ctl ' - f'ovsdb-server/memory-trim-on-compaction on' - ) - self.phys_node.run( - f'podman exec {db_container} ovs-appctl -t ' - f'/run/ovn/ovnsb_db.ctl ' - f'ovsdb-server/memory-trim-on-compaction on' - ) - for relay_container in self.relay_containers: - self.phys_node.run( - f'podman exec {relay_container} ovs-appctl -t ' - f'/run/ovn/ovnsb_db.ctl ' - f'ovsdb-server/memory-trim-on-compaction on' - ) + self.phys_node.run( + f'podman exec {self.container} ovs-appctl -t ' + f'/run/ovn/ovnnb_db.ctl ' + f'ovsdb-server/memory-trim-on-compaction on' + ) + self.phys_node.run( + f'podman exec {self.container} ovs-appctl -t ' + f'/run/ovn/ovnsb_db.ctl ' + f'ovsdb-server/memory-trim-on-compaction on' + ) def enable_txns_db_logging(self): log.info('Enable DB txn logging') @@ -139,15 +127,28 @@ def enable_txns_db_logging(self): 'vlog/disable-rate-limit transaction' ) - def get_connection_string(self, cluster_cfg, port): - protocol = "ssl" if cluster_cfg.enable_ssl else "tcp" - ip = self.mgmt_ip - num_conns = 3 if cluster_cfg.clustered_db else 1 - conns = [f"{protocol}:{ip + idx}:{port}" for idx in range(num_conns)] - return ",".join(conns) + def get_connection_string(self, port): + return f'{self.protocol}:{self.mgmt_ip}:{port}' + - def central_containers(self): - return self.db_containers +class RelayNode(Node): + def __init__(self, phys_node, container, mgmt_ip, protocol): + super().__init__(phys_node, container, mgmt_ip, protocol) + + def start(self): + log.info(f'Configuring relay node {self.container}') + self.enable_trim_on_compaction() + + def get_connection_string(self, port): + return f'{self.protocol}:{self.mgmt_ip}:{port}' + + def enable_trim_on_compaction(self): + log.info('Setting DB trim-on-compaction') + self.phys_node.run( + f'podman exec {self.container} ovs-appctl -t ' + f'/run/ovn/ovnsb_db.ctl ' + f'ovsdb-server/memory-trim-on-compaction on' + ) class WorkerNode(Node): @@ -155,14 +156,14 @@ def __init__( self, phys_node, container, - mgmt_net, mgmt_ip, + protocol, int_net, ext_net, gw_net, unique_id, ): - super().__init__(phys_node, container, mgmt_net, mgmt_ip) + super().__init__(phys_node, container, mgmt_ip, protocol) self.int_net = int_net self.ext_net = ext_net self.gw_net = gw_net @@ -177,19 +178,16 @@ def __init__( def start(self, cluster_cfg): self.vsctl = ovn_utils.OvsVsctl( self, - self.get_connection_string(cluster_cfg, 6640), + self.get_connection_string(6640), cluster_cfg.db_inactivity_probe // 1000, ) @ovn_stats.timeit - def connect(self, cluster_cfg): + def connect(self, remote): log.info( - f'Connecting worker {self.container}: ' - f'ovn-remote = {cluster_cfg.node_remote}' - ) - self.vsctl.set_global_external_id( - 'ovn-remote', f'{cluster_cfg.node_remote}' + f'Connecting worker {self.container}: ' f'ovn-remote = {remote}' ) + self.vsctl.set_global_external_id('ovn-remote', f'{remote}') def configure_localnet(self, physical_net): log.info(f'Creating localnet on {self.container}') @@ -214,7 +212,7 @@ def wait(self, sbctl, timeout_s): @ovn_stats.timeit def provision(self, cluster): - self.connect(cluster.cluster_cfg) + self.connect(cluster.get_relay_connection_string()) self.wait(cluster.sbctl, cluster.cluster_cfg.node_timeout_s) # Create a node switch and connect it to the cluster router. @@ -435,12 +433,8 @@ def ping_ports(self, cluster, ports): if port.ip6: self.ping_port(cluster, port, dest=port.ext_gw6) - def get_connection_string(self, cluster_cfg, port): - protocol = "ssl" if cluster_cfg.enable_ssl else "tcp" - offset = 0 - offset += 3 if cluster_cfg.clustered_db else 1 - offset += cluster_cfg.n_relays - return f"{protocol}:{self.mgmt_ip + offset}:{port}" + def get_connection_string(self, port): + return f"{self.protocol}:{self.mgmt_ip}:{port}" ACL_DEFAULT_DENY_PRIO = 1 @@ -744,10 +738,11 @@ def provision_vips_to_load_balancers(self, backend_lists, version): class Cluster: - def __init__(self, central_node, worker_nodes, cluster_cfg, brex_cfg): + def __init__(self, central_nodes, relay_nodes, cluster_cfg, brex_cfg): # In clustered mode use the first node for provisioning. - self.central_node = central_node - self.worker_nodes = worker_nodes + self.central_nodes = central_nodes + self.relay_nodes = relay_nodes + self.worker_nodes = [] self.cluster_cfg = cluster_cfg self.brex_cfg = brex_cfg self.nbctl = None @@ -760,22 +755,29 @@ def __init__(self, central_node, worker_nodes, cluster_cfg, brex_cfg): self.last_selected_worker = 0 self.n_ns = 0 + def add_workers(self, worker_nodes): + self.worker_nodes.extend(worker_nodes) + def start(self): - self.central_node.start(self.cluster_cfg) - nb_conn = self.central_node.get_connection_string( - self.cluster_cfg, 6641 - ) + for c in self.central_nodes: + c.start( + self.cluster_cfg, + update_election_timeout=(c is self.central_nodes[0]), + ) + nb_conn = self.get_nb_connection_string() inactivity_probe = self.cluster_cfg.db_inactivity_probe // 1000 self.nbctl = ovn_utils.OvnNbctl( - self.central_node, nb_conn, inactivity_probe + self.central_nodes[0], nb_conn, inactivity_probe ) - sb_conn = self.central_node.get_connection_string( - self.cluster_cfg, 6642 - ) + sb_conn = self.get_sb_connection_string() self.sbctl = ovn_utils.OvnSbctl( - self.central_node, sb_conn, inactivity_probe + self.central_nodes[0], sb_conn, inactivity_probe ) + + for r in self.relay_nodes: + r.start() + for w in self.worker_nodes: w.start(self.cluster_cfg) w.configure(self.brex_cfg.physical_net) @@ -789,6 +791,23 @@ def start(self): self.nbctl.set_inactivity_probe(self.cluster_cfg.db_inactivity_probe) self.sbctl.set_inactivity_probe(self.cluster_cfg.db_inactivity_probe) + def get_nb_connection_string(self): + return ','.join( + [db.get_connection_string(6641) for db in self.central_nodes] + ) + + def get_sb_connection_string(self): + return ','.join( + [db.get_connection_string(6642) for db in self.central_nodes] + ) + + def get_relay_connection_string(self): + if len(self.relay_nodes) > 0: + return ','.join( + [db.get_connection_string(6642) for db in self.relay_nodes] + ) + return self.get_sb_connection_string() + def create_cluster_router(self, rtr_name): self.router = self.nbctl.lr_add(rtr_name) self.nbctl.lr_set_options(