""" 2次元不定流+5区分河床変動モデルの実装テンプレート ====================================================== このスクリプトは、Webデモ画像生成ではなく、実際の解析プログラム化を 進めるためのテンプレートです。 想定入力: input/dem_1m.tif input/river_area.gpkg input/levee_area.gpkg 想定出力: output/bed_elevation_final.tif output/water_depth_final.tif output/velocity_final.tif output/bed_change_total.tif 注意: 実務適用には、境界条件、粗度、粒度分布、上流給砂量、観測値による キャリブレーションが必要です。 必要ライブラリ: pip install numpy rasterio geopandas landlab """ from __future__ import annotations import os from pathlib import Path import geopandas as gpd import numpy as np import rasterio from rasterio.features import rasterize try: from landlab import RasterModelGrid from landlab.components import RiverFlowDynamics except ImportError as exc: raise ImportError( "Landlab がインストールされていません。\n" "pip install landlab を実行してください。" ) from exc RHO_W = 1000.0 RHO_S = 2650.0 G = 9.80665 POROSITY = 0.40 THETA_CR = 0.047 MPM_A = 8.0 # 5区分の代表粒径 m GRAIN_D = np.array([0.0005, 0.0020, 0.0100, 0.0300, 0.0750], dtype=float) N_CLASS = len(GRAIN_D) def read_dem_as_landlab_grid(dem_path: str | Path): """1m DEM GeoTIFFをLandlab RasterModelGridへ変換する。""" with rasterio.open(dem_path) as src: dem = src.read(1).astype(float) profile = src.profile.copy() transform = src.transform nodata = src.nodata dx = abs(transform.a) dy = abs(transform.e) if not np.isclose(dx, dy): raise ValueError(f"正方格子ではありません: dx={dx}, dy={dy}") if nodata is not None: dem = np.where(dem == nodata, np.nan, dem) max_z = np.nanmax(dem) dem = np.where(np.isnan(dem), max_z + 100.0, dem) nrows, ncols = dem.shape grid = RasterModelGrid((nrows, ncols), xy_spacing=(dx, dy)) z = np.flipud(dem).ravel() grid.add_field("topographic__elevation", z.copy(), at="node", clobber=True) grid.add_zeros("surface_water__depth", at="node", clobber=True) grid.add_zeros("surface_water__elevation", at="node", clobber=True) grid.add_zeros("surface_water__velocity", at="link", clobber=True) grid.at_node["surface_water__elevation"][:] = ( grid.at_node["topographic__elevation"] + grid.at_node["surface_water__depth"] ) return grid, profile def rasterize_vector(vector_path: str | Path, reference_profile: dict, burn_value: int = 1) -> np.ndarray: """ベクターデータをDEMと同じ配列にラスタ化する。""" gdf = gpd.read_file(vector_path) if gdf.empty: raise ValueError(f"ベクターデータが空です: {vector_path}") shapes = [(geom, burn_value) for geom in gdf.geometry if geom is not None] return rasterize( shapes=shapes, out_shape=(reference_profile["height"], reference_profile["width"]), transform=reference_profile["transform"], fill=0, dtype="uint8", ).astype(bool) def apply_levee_to_dem(grid, levee_mask_top_origin: np.ndarray, levee_raise: float = 5.0) -> None: """堤防ポリゴンを簡易的に高い壁としてDEMへ反映する。""" mask = np.flipud(levee_mask_top_origin).ravel() z = grid.at_node["topographic__elevation"] z[mask] += levee_raise h = grid.at_node["surface_water__depth"] grid.at_node["surface_water__elevation"][:] = z + h def make_left_boundary_inlet(grid, inlet_depth: float = 1.0, inlet_velocity: float = 1.0): """左端から右向きに流入させる簡易境界条件。""" nrows, ncols = grid.shape fixed_entry_nodes = np.arange(0, nrows * ncols, ncols) links = [] nodes = [] for node in fixed_entry_nodes: node_links = grid.links_at_node[node] node_links = node_links[node_links != -1] if len(node_links) == 0: continue nodes.append(node) links.append(node_links[0]) fixed_entry_nodes = np.array(nodes, dtype=int) fixed_entry_links = np.array(links, dtype=int) entry_nodes_h_values = np.full(len(fixed_entry_nodes), inlet_depth) entry_links_vel_values = np.full(len(fixed_entry_links), inlet_velocity) return fixed_entry_nodes, fixed_entry_links, entry_nodes_h_values, entry_links_vel_values def compute_water_surface_slope(grid): """水面勾配と流下方向単位ベクトルを計算する。""" nrows, ncols = grid.shape dx = grid.dx eta = grid.at_node["surface_water__elevation"].reshape((nrows, ncols)) deta_dy, deta_dx = np.gradient(eta, dx, dx) sx = -deta_dx sy = -deta_dy slope_mag = np.sqrt(sx**2 + sy**2) + 1.0e-12 return slope_mag, sx / slope_mag, sy / slope_mag def sediment_transport_5class(grid, F: np.ndarray): """5区分の掃流砂量を簡易MPM型で計算する。""" nrows, ncols = grid.shape h = grid.at_node["surface_water__depth"].reshape((nrows, ncols)) slope_mag, dir_x, dir_y = compute_water_surface_slope(grid) tau = RHO_W * G * h * slope_mag qsx = np.zeros((nrows, ncols, N_CLASS), dtype=float) qsy = np.zeros((nrows, ncols, N_CLASS), dtype=float) s = RHO_S / RHO_W for k, d in enumerate(GRAIN_D): theta = tau / ((RHO_S - RHO_W) * G * d + 1.0e-12) excess = np.maximum(theta - THETA_CR, 0.0) qs = MPM_A * F[:, :, k] * np.sqrt((s - 1.0) * G * d**3) * excess**1.5 qsx[:, :, k] = qs * dir_x qsy[:, :, k] = qs * dir_y return qsx, qsy def divergence(qx: np.ndarray, qy: np.ndarray, dx: float) -> np.ndarray: """2次元フラックスの発散を計算する。""" dqy_dy, _ = np.gradient(qy, dx, dx) _, dqx_dx = np.gradient(qx, dx, dx) return dqx_dx + dqy_dy def update_bed_by_exner( grid, F: np.ndarray, dt_morph: float, river_mask: np.ndarray | None = None, active_layer_thickness: float = 0.2, ): """Exner式により河床高と表層粒度構成を更新する。""" nrows, ncols = grid.shape dx = grid.dx qsx, qsy = sediment_transport_5class(grid, F) dz_total = np.zeros((nrows, ncols), dtype=float) div_classes = np.zeros((nrows, ncols, N_CLASS), dtype=float) for k in range(N_CLASS): div_k = divergence(qsx[:, :, k], qsy[:, :, k], dx) div_classes[:, :, k] = div_k dz_total += -dt_morph * div_k / (1.0 - POROSITY) # 数値安定化のため、1ステップの河床変動量を制限 dz_total = np.clip(dz_total, -0.05, 0.05) if river_mask is not None: mask = np.flipud(river_mask).astype(bool) dz_total = np.where(mask, dz_total, 0.0) z = grid.at_node["topographic__elevation"].reshape((nrows, ncols)) z[:, :] = z + dz_total M = F * active_layer_thickness * (1.0 - POROSITY) for k in range(N_CLASS): dM = -div_classes[:, :, k] * dt_morph M[:, :, k] = np.maximum(M[:, :, k] + dM, 1.0e-9) F_new = M / np.sum(M, axis=2, keepdims=True) if river_mask is not None: mask = np.flipud(river_mask).astype(bool) F = np.where(mask[:, :, None], F_new, F) else: F = F_new h = grid.at_node["surface_water__depth"].reshape((nrows, ncols)) eta = grid.at_node["surface_water__elevation"].reshape((nrows, ncols)) eta[:, :] = z + h return F, dz_total def write_geotiff(path: str | Path, arr_landlab_origin: np.ndarray, profile: dict, nodata=None) -> None: """Landlab配列をGeoTIFFとして保存する。""" arr = np.flipud(arr_landlab_origin).astype("float32") out_profile = profile.copy() out_profile.update(dtype="float32", count=1, compress="deflate", nodata=nodata) with rasterio.open(path, "w", **out_profile) as dst: dst.write(arr, 1) def run_simulation( dem_path: str | Path, out_dir: str | Path, river_polygon_path: str | Path | None = None, levee_polygon_path: str | Path | None = None, total_time: float = 3600.0, dt_flow: float = 1.0, dt_morph: float = 1.0, inlet_depth: float = 1.0, inlet_velocity: float = 1.0, mannings_n: float = 0.035, ) -> None: """2次元流れと河床変動を実行するメイン関数。""" out_dir = Path(out_dir) out_dir.mkdir(parents=True, exist_ok=True) grid, profile = read_dem_as_landlab_grid(dem_path) river_mask = None if river_polygon_path is not None: river_mask = rasterize_vector(river_polygon_path, profile, burn_value=1) if levee_polygon_path is not None: levee_mask = rasterize_vector(levee_polygon_path, profile, burn_value=1) apply_levee_to_dem(grid, levee_mask, levee_raise=5.0) boundary = make_left_boundary_inlet(grid, inlet_depth=inlet_depth, inlet_velocity=inlet_velocity) fixed_entry_nodes, fixed_entry_links, entry_nodes_h_values, entry_links_vel_values = boundary rfd = RiverFlowDynamics( grid, dt=dt_flow, mannings_n=mannings_n, threshold_depth=0.01, fixed_entry_nodes=fixed_entry_nodes, fixed_entry_links=fixed_entry_links, entry_nodes_h_values=entry_nodes_h_values, entry_links_vel_values=entry_links_vel_values, ) nrows, ncols = grid.shape F = np.zeros((nrows, ncols, N_CLASS), dtype=float) F[:, :, :] = np.array([0.20, 0.20, 0.20, 0.20, 0.20]) dz_accum = np.zeros((nrows, ncols), dtype=float) nsteps = int(total_time / dt_flow) for step in range(nsteps): rfd.run_one_step() F, dz = update_bed_by_exner(grid, F, dt_morph=dt_morph, river_mask=river_mask) dz_accum += dz if step % 300 == 0: print(f"step={step}/{nsteps}") z = grid.at_node["topographic__elevation"].reshape((nrows, ncols)) h = grid.at_node["surface_water__depth"].reshape((nrows, ncols)) # リンク流速をノードへ簡易平均 v_node = np.zeros(grid.number_of_nodes) count = np.zeros(grid.number_of_nodes) v_link = np.abs(grid.at_link["surface_water__velocity"]) for link_id, nodes in enumerate(grid.nodes_at_link): n1, n2 = nodes if n1 >= 0: v_node[n1] += v_link[link_id] count[n1] += 1 if n2 >= 0: v_node[n2] += v_link[link_id] count[n2] += 1 v = (v_node / np.maximum(count, 1)).reshape((nrows, ncols)) write_geotiff(out_dir / "bed_elevation_final.tif", z, profile) write_geotiff(out_dir / "water_depth_final.tif", h, profile) write_geotiff(out_dir / "velocity_final.tif", v, profile) write_geotiff(out_dir / "bed_change_total.tif", dz_accum, profile) for k in range(N_CLASS): write_geotiff(out_dir / f"bed_material_fraction_class_{k + 1}.tif", F[:, :, k], profile) print(f"計算完了: {out_dir}") if __name__ == "__main__": run_simulation( dem_path="input/dem_1m.tif", out_dir="output", river_polygon_path="input/river_area.gpkg", levee_polygon_path="input/levee_area.gpkg", total_time=3600.0, dt_flow=1.0, dt_morph=1.0, inlet_depth=1.0, inlet_velocity=1.0, mannings_n=0.035, )