From 1810e9c43a4097a5247be363d81e8c7da1ce7f8d Mon Sep 17 00:00:00 2001 From: Mehdi Ataei Date: Fri, 2 Aug 2024 14:25:50 -0400 Subject: [PATCH] Fixed distribute and post processing bugs --- examples/cfd/flow_past_sphere_3d.py | 55 ++++++++++------- examples/cfd/lid_driven_cavity_2d.py | 47 +++++++++------ .../cfd/lid_driven_cavity_2d_distributed.py | 3 +- examples/cfd/windtunnel_3d.py | 60 ++++++++++++------- xlb/distribute/distribute.py | 44 ++++++++++++-- xlb/operator/stepper/nse_stepper.py | 4 +- 6 files changed, 147 insertions(+), 66 deletions(-) diff --git a/examples/cfd/flow_past_sphere_3d.py b/examples/cfd/flow_past_sphere_3d.py index a1682b8..a7d9dac 100644 --- a/examples/cfd/flow_past_sphere_3d.py +++ b/examples/cfd/flow_past_sphere_3d.py @@ -15,44 +15,51 @@ import numpy as np import jax.numpy as jnp + class FlowOverSphere: def __init__(self, omega, grid_shape, velocity_set, backend, precision_policy): - # initialize backend xlb.init( velocity_set=velocity_set, default_backend=backend, default_precision_policy=precision_policy, ) - + self.grid_shape = grid_shape self.velocity_set = velocity_set self.backend = backend self.precision_policy = precision_policy - self.grid, self.f_0, self.f_1, self.missing_mask, self.boundary_mask = create_nse_fields(grid_shape) + self.grid, self.f_0, self.f_1, self.missing_mask, self.boundary_mask = ( + create_nse_fields(grid_shape) + ) self.stepper = None self.boundary_conditions = [] # Setup the simulation BC, its initial conditions, and the stepper self._setup(omega) - + def _setup(self, omega): self.setup_boundary_conditions() self.setup_boundary_masks() self.initialize_fields() self.setup_stepper(omega) - + def define_boundary_indices(self): - inlet = self.grid.boundingBoxIndices['left'] - outlet = self.grid.boundingBoxIndices['right'] - walls = [self.grid.boundingBoxIndices['bottom'][i] + self.grid.boundingBoxIndices['top'][i] + - self.grid.boundingBoxIndices['front'][i] + self.grid.boundingBoxIndices['back'][i] for i in range(self.velocity_set.d)] - + inlet = self.grid.boundingBoxIndices["left"] + outlet = self.grid.boundingBoxIndices["right"] + walls = [ + self.grid.boundingBoxIndices["bottom"][i] + + self.grid.boundingBoxIndices["top"][i] + + self.grid.boundingBoxIndices["front"][i] + + self.grid.boundingBoxIndices["back"][i] + for i in range(self.velocity_set.d) + ] + sphere_radius = self.grid_shape[1] // 12 x = np.arange(self.grid_shape[0]) y = np.arange(self.grid_shape[1]) z = np.arange(self.grid_shape[2]) - X, Y, Z = np.meshgrid(x, y, z, indexing='ij') + X, Y, Z = np.meshgrid(x, y, z, indexing="ij") indices = np.where( (X - self.grid_shape[0] // 6) ** 2 + (Y - self.grid_shape[1] // 2) ** 2 @@ -62,7 +69,7 @@ def define_boundary_indices(self): sphere = [tuple(indices[i]) for i in range(self.velocity_set.d)] return inlet, outlet, walls, sphere - + def setup_boundary_conditions(self): inlet, outlet, walls, sphere = self.define_boundary_indices() bc_left = EquilibriumBC(rho=1.0, u=(0.02, 0.0, 0.0), indices=inlet) @@ -83,24 +90,31 @@ def setup_boundary_masks(self): def initialize_fields(self): self.f_0 = initialize_eq(self.f_0, self.grid, self.velocity_set, self.backend) - + def setup_stepper(self, omega): self.stepper = IncompressibleNavierStokesStepper( omega, boundary_conditions=self.boundary_conditions ) - def run(self, num_steps): + def run(self, num_steps, post_process_interval=100): for i in range(num_steps): - self.f_1 = self.stepper(self.f_0, self.f_1, self.boundary_mask, self.missing_mask, i) + self.f_1 = self.stepper( + self.f_0, self.f_1, self.boundary_mask, self.missing_mask, i + ) self.f_0, self.f_1 = self.f_1, self.f_0 + if i % post_process_interval == 0 or i == num_steps - 1: + self.post_process(i) + def post_process(self, i): # Write the results. We'll use JAX backend for the post-processing if not isinstance(self.f_0, jnp.ndarray): - self.f_0 = wp.to_jax(self.f_0) + f_0 = wp.to_jax(self.f_0) + else: + f_0 = self.f_0 macro = Macroscopic(compute_backend=ComputeBackend.JAX) - rho, u = macro(self.f_0) + rho, u = macro(f_0) # remove boundary cells u = u[:, 1:-1, 1:-1, 1:-1] @@ -120,6 +134,7 @@ def post_process(self, i): precision_policy = PrecisionPolicy.FP32FP32 omega = 1.6 - simulation = FlowOverSphere(omega, grid_shape, velocity_set, backend, precision_policy) - simulation.run(num_steps=10000) - simulation.post_process(i=10000) + simulation = FlowOverSphere( + omega, grid_shape, velocity_set, backend, precision_policy + ) + simulation.run(num_steps=10000, post_process_interval=1000) diff --git a/examples/cfd/lid_driven_cavity_2d.py b/examples/cfd/lid_driven_cavity_2d.py index eac2909..ecc6f0b 100644 --- a/examples/cfd/lid_driven_cavity_2d.py +++ b/examples/cfd/lid_driven_cavity_2d.py @@ -13,22 +13,23 @@ class LidDrivenCavity2D: def __init__(self, omega, grid_shape, velocity_set, backend, precision_policy): - # initialize backend xlb.init( velocity_set=velocity_set, default_backend=backend, default_precision_policy=precision_policy, ) - + self.grid_shape = grid_shape self.velocity_set = velocity_set self.backend = backend self.precision_policy = precision_policy - self.grid, self.f_0, self.f_1, self.missing_mask, self.boundary_mask = create_nse_fields(grid_shape) + self.grid, self.f_0, self.f_1, self.missing_mask, self.boundary_mask = ( + create_nse_fields(grid_shape) + ) self.stepper = None self.boundary_conditions = [] - + # Setup the simulation BC, its initial conditions, and the stepper self._setup(omega) @@ -39,11 +40,15 @@ def _setup(self, omega): self.setup_stepper(omega) def define_boundary_indices(self): - lid = self.grid.boundingBoxIndices['top'] - walls = [self.grid.boundingBoxIndices['bottom'][i] + self.grid.boundingBoxIndices['left'][i] + - self.grid.boundingBoxIndices['right'][i] for i in range(self.velocity_set.d)] + lid = self.grid.boundingBoxIndices["top"] + walls = [ + self.grid.boundingBoxIndices["bottom"][i] + + self.grid.boundingBoxIndices["left"][i] + + self.grid.boundingBoxIndices["right"][i] + for i in range(self.velocity_set.d) + ] return lid, walls - + def setup_boundary_conditions(self): lid, walls = self.define_boundary_indices() bc_top = EquilibriumBC(rho=1.0, u=(0.02, 0.0), indices=lid) @@ -62,25 +67,32 @@ def setup_boundary_masks(self): def initialize_fields(self): self.f_0 = initialize_eq(self.f_0, self.grid, self.velocity_set, self.backend) - + def setup_stepper(self, omega): self.stepper = IncompressibleNavierStokesStepper( omega, boundary_conditions=self.boundary_conditions ) - def run(self, num_steps): + def run(self, num_steps, post_process_interval=100): for i in range(num_steps): - self.f_1 = self.stepper(self.f_0, self.f_1, self.boundary_mask, self.missing_mask, i) + self.f_1 = self.stepper( + self.f_0, self.f_1, self.boundary_mask, self.missing_mask, i + ) self.f_0, self.f_1 = self.f_1, self.f_0 + if i % post_process_interval == 0 or i == num_steps - 1: + self.post_process(i) + def post_process(self, i): # Write the results. We'll use JAX backend for the post-processing if not isinstance(self.f_0, jnp.ndarray): - self.f_0 = wp.to_jax(self.f_0) + f_0 = wp.to_jax(self.f_0) + else: + f_0 = self.f_0 macro = Macroscopic(compute_backend=ComputeBackend.JAX) - rho, u = macro(self.f_0) + rho, u = macro(f_0) # remove boundary cells rho = rho[:, 1:-1, 1:-1] @@ -95,13 +107,14 @@ def post_process(self, i): if __name__ == "__main__": # Running the simulation - grid_size = 128 + grid_size = 500 grid_shape = (grid_size, grid_size) backend = ComputeBackend.JAX velocity_set = xlb.velocity_set.D2Q9() precision_policy = PrecisionPolicy.FP32FP32 omega = 1.6 - simulation = LidDrivenCavity2D(omega, grid_shape, velocity_set, backend, precision_policy) - simulation.run(num_steps=500) - simulation.post_process(i=500) + simulation = LidDrivenCavity2D( + omega, grid_shape, velocity_set, backend, precision_policy + ) + simulation.run(num_steps=5000, post_process_interval=1000) diff --git a/examples/cfd/lid_driven_cavity_2d_distributed.py b/examples/cfd/lid_driven_cavity_2d_distributed.py index e71da52..7a43a14 100644 --- a/examples/cfd/lid_driven_cavity_2d_distributed.py +++ b/examples/cfd/lid_driven_cavity_2d_distributed.py @@ -31,5 +31,4 @@ def setup_stepper(self, omega): omega=1.6 simulation = LidDrivenCavity2D_distributed(omega, grid_shape, velocity_set, backend, precision_policy) - simulation.run(num_steps=5000) - simulation.post_process(i=5000) \ No newline at end of file + simulation.run(num_steps=5000, post_process_interval=1000) diff --git a/examples/cfd/windtunnel_3d.py b/examples/cfd/windtunnel_3d.py index 49e26f7..35140ee 100644 --- a/examples/cfd/windtunnel_3d.py +++ b/examples/cfd/windtunnel_3d.py @@ -19,23 +19,26 @@ class WindTunnel3D: - def __init__(self, omega, wind_speed, grid_shape, velocity_set, backend, precision_policy): - + def __init__( + self, omega, wind_speed, grid_shape, velocity_set, backend, precision_policy + ): # initialize backend xlb.init( velocity_set=velocity_set, default_backend=backend, default_precision_policy=precision_policy, ) - + self.grid_shape = grid_shape self.velocity_set = velocity_set self.backend = backend self.precision_policy = precision_policy - self.grid, self.f_0, self.f_1, self.missing_mask, self.boundary_mask = create_nse_fields(grid_shape) + self.grid, self.f_0, self.f_1, self.missing_mask, self.boundary_mask = ( + create_nse_fields(grid_shape) + ) self.stepper = None self.boundary_conditions = [] - + # Setup the simulation BC, its initial conditions, and the stepper self._setup(omega, wind_speed) @@ -54,31 +57,38 @@ def voxelize_stl(self, stl_filename, length_lbm_unit): return mesh_matrix, pitch def define_boundary_indices(self): - inlet = self.grid.boundingBoxIndices['left'] - outlet = self.grid.boundingBoxIndices['right'] - walls = [self.grid.boundingBoxIndices['bottom'][i] + self.grid.boundingBoxIndices['top'][i] + - self.grid.boundingBoxIndices['front'][i] + self.grid.boundingBoxIndices['back'][i] for i in range(self.velocity_set.d)] - + inlet = self.grid.boundingBoxIndices["left"] + outlet = self.grid.boundingBoxIndices["right"] + walls = [ + self.grid.boundingBoxIndices["bottom"][i] + + self.grid.boundingBoxIndices["top"][i] + + self.grid.boundingBoxIndices["front"][i] + + self.grid.boundingBoxIndices["back"][i] + for i in range(self.velocity_set.d) + ] + stl_filename = "examples/cfd/stl-files/DrivAer-Notchback.stl" grid_size_x = self.grid_shape[0] car_length_lbm_unit = grid_size_x / 4 car_voxelized, pitch = self.voxelize_stl(stl_filename, car_length_lbm_unit) car_area = np.prod(car_voxelized.shape[1:]) - tx, ty, tz = np.array([grid_size_x, grid_size_y, grid_size_z]) - car_voxelized.shape + tx, ty, tz = ( + np.array([grid_size_x, grid_size_y, grid_size_z]) - car_voxelized.shape + ) shift = [tx // 4, ty // 2, 0] car = np.argwhere(car_voxelized) + shift car = np.array(car).T car = [tuple(car[i]) for i in range(self.velocity_set.d)] return inlet, outlet, walls, car - + def setup_boundary_conditions(self, wind_speed): inlet, outlet, walls, car = self.define_boundary_indices() bc_left = EquilibriumBC(rho=1.0, u=(wind_speed, 0.0, 0.0), indices=inlet) bc_walls = FullwayBounceBackBC(indices=walls) bc_do_nothing = DoNothingBC(indices=outlet) - bc_car= FullwayBounceBackBC(indices=car) + bc_car = FullwayBounceBackBC(indices=car) self.boundary_conditions = [bc_left, bc_walls, bc_do_nothing, bc_car] def setup_boundary_masks(self): @@ -93,26 +103,35 @@ def setup_boundary_masks(self): def initialize_fields(self): self.f_0 = initialize_eq(self.f_0, self.grid, self.velocity_set, self.backend) - + def setup_stepper(self, omega): self.stepper = IncompressibleNavierStokesStepper( omega, boundary_conditions=self.boundary_conditions, collision_type="KBC" ) - def run(self, num_steps, print_interval): + def run(self, num_steps, print_interval, post_process_interval=100): start_time = time.time() for i in range(num_steps): - self.f_1 = self.stepper(self.f_0, self.f_1, self.boundary_mask, self.missing_mask, i) + self.f_1 = self.stepper( + self.f_0, self.f_1, self.boundary_mask, self.missing_mask, i + ) self.f_0, self.f_1 = self.f_1, self.f_0 if (i + 1) % print_interval == 0: elapsed_time = time.time() - start_time - print(f"Iteration: {i+1}/{num_steps} | Time elapsed: {elapsed_time:.2f}s") + print( + f"Iteration: {i+1}/{num_steps} | Time elapsed: {elapsed_time:.2f}s" + ) + + if i % post_process_interval == 0 or i == num_steps - 1: + self.post_process(i) def post_process(self, i): # Write the results. We'll use JAX backend for the post-processing if not isinstance(self.f_0, jnp.ndarray): f_0 = wp.to_jax(self.f_0) + else: + f_0 = self.f_0 macro = Macroscopic(compute_backend=ComputeBackend.JAX) @@ -159,6 +178,7 @@ def post_process(self, i): print(f"Max iterations: {num_steps}") print("\n" + "=" * 50 + "\n") - simulation = WindTunnel3D(omega, wind_speed, grid_shape, velocity_set, backend, precision_policy) - simulation.run(num_steps, print_interval) - simulation.post_process(i=num_steps) + simulation = WindTunnel3D( + omega, wind_speed, grid_shape, velocity_set, backend, precision_policy + ) + simulation.run(num_steps, print_interval, post_process_interval=1000) diff --git a/xlb/distribute/distribute.py b/xlb/distribute/distribute.py index ad07dd0..bcee7dd 100644 --- a/xlb/distribute/distribute.py +++ b/xlb/distribute/distribute.py @@ -1,16 +1,13 @@ from jax.sharding import PartitionSpec as P from xlb.operator import Operator -from xlb import DefaultConfig -from xlb import ComputeBackend +from xlb.operator.stepper import IncompressibleNavierStokesStepper +from xlb.operator.boundary_condition.boundary_condition import ImplementationStep from jax import lax from jax.experimental.shard_map import shard_map from jax import jit -import jax.numpy as jnp -import warp as wp -from typing import Tuple -def distribute( +def distribute_operator( operator: Operator, grid, velocity_set, @@ -85,3 +82,38 @@ def _wrapped_operator(*args): return distributed_operator(*args) return jit(_wrapped_operator) + + +def distribute(operator, grid, velocity_set, num_results=1, ops="permute"): + """ + Distribute an operator or a stepper. + If the operator is a stepper, check for post-streaming boundary conditions + before deciding how to distribute. + """ + if isinstance(operator, IncompressibleNavierStokesStepper): + # Check for post-streaming boundary conditions + has_post_streaming_bc = any( + bc.implementation_step == ImplementationStep.STREAMING + for bc in operator.boundary_conditions + ) + + if has_post_streaming_bc: + # If there are post-streaming BCs, only distribute the stream operator + distributed_stream = distribute_operator( + operator.stream, grid, velocity_set + ) + operator.stream = distributed_stream + else: + # If no post-streaming BCs, distribute the whole operator + distributed_op = distribute_operator( + operator, grid, velocity_set, num_results=num_results, ops=ops + ) + return distributed_op + + return operator + else: + # For other operators, apply the original distribution logic + distributed_op = distribute_operator( + operator, grid, velocity_set, num_results=num_results, ops=ops + ) + return distributed_op diff --git a/xlb/operator/stepper/nse_stepper.py b/xlb/operator/stepper/nse_stepper.py index 11b5615..3f986d3 100644 --- a/xlb/operator/stepper/nse_stepper.py +++ b/xlb/operator/stepper/nse_stepper.py @@ -220,7 +220,9 @@ def kernel3d( feq = self.equilibrium.warp_functional(rho, u) # Apply collision - f_post_collision = self.collision.warp_functional(f_post_stream, feq, rho, u) + f_post_collision = self.collision.warp_functional( + f_post_stream, feq, rho, u + ) # Apply collision type boundary conditions if _boundary_id == _fullway_bounce_back_bc: