""" 河川氾濫2次元シミュレーション サンプル ====================================== このスクリプトは、demo.godo-tys.jp/software/floodstream/ に掲載するための 軽量なPythonサンプルです。 想定 ---- - 1m標高メッシュDEMを使う - 河川、堤防、氾濫原の地形を格子上で扱う - 2次元浅水流モデルの考え方に基づき、簡易な拡散波近似で浸水深を更新する - 最大浸水深、時刻別浸水範囲、流速・流向、浸水面積・ボリューム・評価地点水位を出力する 注意 ---- このコードはWeb掲載用の教育・デモ用テンプレートです。 実務では、境界条件、河道断面、堤防高、破堤条件、建物・道路、粗度、観測水位、 CFL条件、保存性を確認した有限体積法などが必要です。 """ from __future__ import annotations from dataclasses import dataclass from pathlib import Path import numpy as np try: import matplotlib.pyplot as plt except ImportError: # pragma: no cover - matplotlib未導入環境向け plt = None @dataclass class FloodConfig: """計算条件。実務では外部設定ファイルやAPI入力に置き換える。""" nx: int = 140 ny: int = 90 dx: float = 5.0 # デモは5m格子。実務では1m DEMに置換可能 dt: float = 1.0 total_time: float = 1800.0 manning_n: float = 0.04 diffusion_factor: float = 0.16 inflow_m3_s: float = 22.0 breach_start_s: float = 300.0 breach_width_cells: int = 8 inundation_threshold_m: float = 0.05 output_dir: str = "output_floodstream" @property def cell_area(self) -> float: return self.dx * self.dx def make_synthetic_terrain(cfg: FloodConfig) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ デモ用の河道・堤防・氾濫原DEMを作成する。 Returns ------- dem : ndarray 地盤高。 river_mask : ndarray of bool 河道セル。 levee_mask : ndarray of bool 堤防セル。 basin_mask : ndarray of bool 氾濫原の低地セル。図示・評価地点の補助に使う。 """ y, x = np.mgrid[0 : cfg.ny, 0 : cfg.nx] # 全体として下流方向に低くなる地形を作る。 dem = 28.0 - 0.022 * x + 0.004 * y # 蛇行した河道中心線。 center = cfg.ny * 0.42 + 7.0 * np.sin(x / 18.0) dist = y - center river_mask = np.abs(dist) < 3.2 levee_mask = (np.abs(dist) >= 3.2) & (np.abs(dist) < 5.8) # 河道を掘り下げ、堤防を高くする。 dem[river_mask] -= 2.0 dem[levee_mask] += 1.2 # 右岸側の氾濫原に浅い窪地を置く。 basin = np.exp(-(((x - 85.0) / 26.0) ** 2 + ((y - 62.0) / 16.0) ** 2)) basin_mask = basin > 0.18 dem -= 1.05 * basin # 道路盛土のような線状の微高地を入れ、流向が単調になりすぎないようにする。 road = np.exp(-((y - (70 - 0.08 * x)) / 2.2) ** 2) dem += 0.25 * road return dem, river_mask, levee_mask, basin_mask def apply_breach(levee_mask: np.ndarray, dem: np.ndarray, cfg: FloodConfig) -> np.ndarray: """堤防の一部を切り下げ、越水・破堤箇所を作る。""" dem_breached = dem.copy() ny, nx = dem.shape cx = int(nx * 0.52) cols = slice(cx - cfg.breach_width_cells // 2, cx + cfg.breach_width_cells // 2) # 該当列の堤防セルを周辺氾濫原程度まで下げる。 rows = np.where(levee_mask[:, cx])[0] if len(rows) > 0: for r in rows: dem_breached[max(r - 1, 0) : min(r + 2, ny), cols] -= 1.45 return dem_breached def add_upstream_inflow(depth: np.ndarray, river_mask: np.ndarray, cfg: FloodConfig, t: float) -> None: """上流端の河道セルに流入量を水深として加える。""" upstream_cols = slice(0, 2) cells = river_mask[:, upstream_cols] n = max(int(cells.sum()), 1) # ピークを持つ簡易ハイドログラフ。 hydrograph_factor = 0.50 + 0.70 * np.exp(-((t - 820.0) / 430.0) ** 2) added_volume = cfg.inflow_m3_s * hydrograph_factor * cfg.dt depth[:, upstream_cols][cells] += added_volume / (n * cfg.cell_area) def compute_flux(depth: np.ndarray, water_level: np.ndarray, cfg: FloodConfig) -> tuple[np.ndarray, np.ndarray]: """ 水位勾配に比例する簡易拡散波フラックスを計算する。 qx, qyはセル界面を厳密に持つ形式ではなく、デモ用にセル中心勾配から求める簡易量です。 """ deta_dy, deta_dx = np.gradient(water_level, cfg.dx, cfg.dx) h_eff = np.maximum(depth, 0.0) conveyance = cfg.diffusion_factor * (1.0 / cfg.manning_n) * h_eff ** (5.0 / 3.0) qx = -conveyance * deta_dx qy = -conveyance * deta_dy return qx, qy def divergence(qx: np.ndarray, qy: np.ndarray, dx: float) -> np.ndarray: """2次元フラックスの発散を計算する。""" dqy_dy, _ = np.gradient(qy, dx, dx) _, dqx_dx = np.gradient(qx, dx, dx) return dqx_dx + dqy_dy def _inflow_factor(t: float) -> float: """ピークを持つ簡易ハイドログラフ係数。""" return 0.35 + 0.85 * np.exp(-((t - 900.0) / 430.0) ** 2) def synthetic_depth_at_time( t: float, dem: np.ndarray, river_mask: np.ndarray, levee_mask: np.ndarray, cfg: FloodConfig, ) -> np.ndarray: """ デモ用の時刻別浸水深を作成する。 本関数は、Webページで結果図の見方を説明するための軽量サンプルです。 実務版では、ここを有限体積法・浅水流方程式・不等流/不定流境界条件に置き換えます。 """ y, x = np.mgrid[0 : cfg.ny, 0 : cfg.nx] depth = np.zeros_like(dem) # 河道内の水深。上流から下流へわずかに減衰させ、ピーク時間で増えるようにする。 river_depth = (0.55 + 0.95 * _inflow_factor(t)) * (1.0 - 0.18 * x / max(cfg.nx - 1, 1)) depth[river_mask] = river_depth[river_mask] # 破堤箇所の概略位置を決める。 breach_col = int(cfg.nx * 0.52) breach_rows = np.where(levee_mask[:, breach_col])[0] breach_row = int(breach_rows.max()) if len(breach_rows) else int(cfg.ny * 0.48) # 破堤開始前は河道内水深のみ。 if t < cfg.breach_start_s: return depth progress = np.clip((t - cfg.breach_start_s) / (cfg.total_time - cfg.breach_start_s), 0.0, 1.0) # 右岸側へ流出し、下流方向に広がる氾濫水を楕円状の波面として表現する。 center_x = breach_col + 20.0 + 38.0 * progress center_y = breach_row + 14.0 + 12.0 * progress spread_x = 12.0 + 44.0 * progress spread_y = 7.0 + 24.0 * progress plume = np.exp(-(((x - center_x) / spread_x) ** 2 + ((y - center_y) / spread_y) ** 2)) # 地盤が低いほど水がたまりやすい、という簡易な地形補正。 floodplain = (y > breach_row - 2) & (x > breach_col - 8) local_ref = np.percentile(dem[floodplain], 68) if np.any(floodplain) else np.percentile(dem, 55) lowness = np.clip((local_ref - dem + 0.35) / 1.4, 0.0, 1.0) # 道路盛土・微高地で分断される雰囲気を残すため、地形補正を乗じる。 flood_depth = (0.15 + 1.65 * progress) * plume * (0.30 + 0.95 * lowness) flood_depth *= floodplain # 破堤直下には局所的に深い領域を作る。 breach_pool = np.exp(-(((x - (breach_col + 7.0)) / 9.0) ** 2 + ((y - (breach_row + 5.0)) / 6.0) ** 2)) flood_depth += (0.35 + 0.55 * progress) * breach_pool * floodplain # 水深がごく小さい場所は未浸水として扱う。 flood_depth = np.where(flood_depth > cfg.inundation_threshold_m, flood_depth, 0.0) depth = np.maximum(depth, flood_depth) return depth def estimate_velocity( depth: np.ndarray, dem: np.ndarray, river_mask: np.ndarray, levee_mask: np.ndarray, cfg: FloodConfig, ) -> tuple[np.ndarray, np.ndarray]: """ 流速・流向の表示用ベクトルを推定する。 厳密な流速ではなく、Webデモの「流速分布・流向分布」の見せ方を説明するための 簡易ベクトルです。河道内は下流方向、氾濫原は破堤箇所から低地へ広がる方向を与えます。 """ y, x = np.mgrid[0 : cfg.ny, 0 : cfg.nx] qx = np.zeros_like(depth) qy = np.zeros_like(depth) wetted = depth > cfg.inundation_threshold_m speed = 0.06 + 0.18 * np.sqrt(np.maximum(depth, 0.0)) # 河道内は概ね下流方向へ流れる。 qx[river_mask & wetted] = speed[river_mask & wetted] qy[river_mask & wetted] = 0.12 * speed[river_mask & wetted] * np.sin(x[river_mask & wetted] / 14.0) # 破堤箇所から右岸側低地へ広がる方向。 breach_col = int(cfg.nx * 0.52) breach_rows = np.where(levee_mask[:, breach_col])[0] breach_row = int(breach_rows.max()) if len(breach_rows) else int(cfg.ny * 0.48) flood_mask = wetted & (~river_mask) vx = x - breach_col + 10.0 vy = y - breach_row + 4.0 norm = np.sqrt(vx * vx + vy * vy) + 1.0e-6 qx[flood_mask] = speed[flood_mask] * vx[flood_mask] / norm[flood_mask] qy[flood_mask] = speed[flood_mask] * vy[flood_mask] / norm[flood_mask] return qx, qy def run_flood_simulation(cfg: FloodConfig) -> dict: """簡易2次元氾濫デモ計算を実行する。""" dem, river_mask, levee_mask, basin_mask = make_synthetic_terrain(cfg) breached_dem = apply_breach(levee_mask, dem, cfg) snapshot_times = [300, 600, 900, 1200, 1800] snapshots: dict[int, np.ndarray] = {} max_depth = np.zeros_like(dem) for sec in snapshot_times: depth = synthetic_depth_at_time(sec, breached_dem, river_mask, levee_mask, cfg) snapshots[sec] = depth max_depth = np.maximum(max_depth, depth) final_depth = synthetic_depth_at_time(cfg.total_time, breached_dem, river_mask, levee_mask, cfg) max_depth = np.maximum(max_depth, final_depth) qx, qy = estimate_velocity(final_depth, breached_dem, river_mask, levee_mask, cfg) # 評価地点は、破堤箇所の下流側・右岸側低地に置く。 eval_row = int(cfg.ny * 0.68) eval_col = int(cfg.nx * 0.62) metrics = [] for t in np.arange(0.0, cfg.total_time + 1.0, 30.0): depth = synthetic_depth_at_time(t, breached_dem, river_mask, levee_mask, cfg) inundated = depth > cfg.inundation_threshold_m area_m2 = float(inundated.sum() * cfg.cell_area) volume_m3 = float(np.maximum(depth - cfg.inundation_threshold_m, 0.0).sum() * cfg.cell_area) eval_water_level = float(breached_dem[eval_row, eval_col] + depth[eval_row, eval_col]) metrics.append((t / 60.0, area_m2 / 10000.0, volume_m3 / 1000.0, eval_water_level)) return { "dem": dem, "breached_dem": breached_dem, "river_mask": river_mask, "levee_mask": levee_mask, "basin_mask": basin_mask, "final_depth": final_depth, "max_depth": max_depth, "snapshots": snapshots, "qx": qx, "qy": qy, "metrics": np.array(metrics), "eval_point": (eval_row, eval_col), } def _axis_in_meters(ax, cfg: FloodConfig) -> None: """格子番号ではなく概略距離で軸を表示する。""" ax.set_xlabel("x distance (m)") ax.set_ylabel("y distance (m)") ax.set_xticks(np.linspace(0, cfg.nx - 1, 5)) ax.set_xticklabels([f"{v:.0f}" for v in np.linspace(0, (cfg.nx - 1) * cfg.dx, 5)]) ax.set_yticks(np.linspace(0, cfg.ny - 1, 5)) ax.set_yticklabels([f"{v:.0f}" for v in np.linspace(0, (cfg.ny - 1) * cfg.dx, 5)]) def save_figures(result: dict, cfg: FloodConfig) -> None: """Webページ掲載用のサンプル図を保存する。""" if plt is None: print("matplotlibがないため、図の保存はスキップします。") return out = Path(cfg.output_dir) out.mkdir(parents=True, exist_ok=True) # 1. 最大浸水深分布 fig, ax = plt.subplots(figsize=(9.5, 5.8)) depth_plot = np.ma.masked_where(result["max_depth"] <= cfg.inundation_threshold_m, result["max_depth"]) ax.imshow(result["dem"], origin="upper", cmap="terrain", alpha=0.45) im = ax.imshow(depth_plot, origin="upper", cmap="Blues", vmin=0.0, vmax=max(0.6, float(result["max_depth"].max()))) ax.contour(result["river_mask"], levels=[0.5], colors="navy", linewidths=0.7) ax.contour(result["levee_mask"], levels=[0.5], colors="saddlebrown", linewidths=0.6) ax.scatter(result["eval_point"][1], result["eval_point"][0], marker="x", s=70, c="red", label="evaluation point") ax.set_title("Maximum inundation depth distribution") ax.legend(loc="lower left", fontsize=8) _axis_in_meters(ax, cfg) fig.colorbar(im, ax=ax, label="depth (m)") fig.tight_layout() fig.savefig(out / "floodstream_01_max_depth_distribution.png", dpi=180) plt.close(fig) # 2. 時刻別浸水範囲・水深 snapshot_items = sorted(result["snapshots"].items())[:4] fig, axes = plt.subplots(2, 2, figsize=(10.5, 7.4), sharex=True, sharey=True) for ax, (sec, depth) in zip(axes.ravel(), snapshot_items): depth_plot = np.ma.masked_where(depth <= cfg.inundation_threshold_m, depth) ax.imshow(result["dem"], origin="upper", cmap="terrain", alpha=0.42) im = ax.imshow(depth_plot, origin="upper", cmap="Blues", vmin=0.0, vmax=max(0.6, float(result["max_depth"].max()))) ax.contour(depth > cfg.inundation_threshold_m, levels=[0.5], colors="black", linewidths=0.5) ax.set_title(f"t = {sec // 60} min") _axis_in_meters(ax, cfg) fig.suptitle("Inundation extent and depth by time", y=0.98) fig.colorbar(im, ax=axes.ravel().tolist(), shrink=0.92, label="depth (m)") fig.savefig(out / "floodstream_02_time_depth_snapshots.png", dpi=180, bbox_inches="tight") plt.close(fig) # 3. 流速分布・流向分布 qx = result["qx"] qy = result["qy"] speed = np.sqrt(qx * qx + qy * qy) positive_speed = speed[speed > 0.0] velocity_threshold = float(np.percentile(positive_speed, 45)) if positive_speed.size else 0.0 speed_plot = np.ma.masked_where( (result["final_depth"] <= cfg.inundation_threshold_m) | (speed <= velocity_threshold), speed, ) fig, ax = plt.subplots(figsize=(9.5, 5.8)) ax.imshow(result["dem"], origin="upper", cmap="terrain", alpha=0.38) im = ax.imshow(speed_plot, origin="upper", cmap="viridis") step = 8 yy, xx = np.mgrid[0 : cfg.ny : step, 0 : cfg.nx : step] qx_s = qx[::step, ::step] qy_s = qy[::step, ::step] mag_s = np.sqrt(qx_s * qx_s + qy_s * qy_s) arrow_threshold = float(np.percentile(positive_speed, 78)) if positive_speed.size else 0.0 mask = mag_s > arrow_threshold ax.quiver(xx[mask], yy[mask], qx_s[mask], qy_s[mask], angles="xy", scale_units="xy", scale=0.035, width=0.0022) ax.set_title("Flow velocity and direction") _axis_in_meters(ax, cfg) fig.colorbar(im, ax=ax, label="relative velocity") fig.tight_layout() fig.savefig(out / "floodstream_03_velocity_flow_direction.png", dpi=180) plt.close(fig) # 4. 浸水面積、浸水ボリューム、評価地点水位 metrics = result["metrics"] fig, ax1 = plt.subplots(figsize=(9.5, 5.6)) ax1.plot(metrics[:, 0], metrics[:, 1], label="inundated area (ha)", linewidth=2.0) ax1.plot(metrics[:, 0], metrics[:, 2], label="inundation volume (10^3 m3)", linewidth=2.0) ax1.set_xlabel("time (min)") ax1.set_ylabel("area / volume") ax1.grid(True, alpha=0.25) ax2 = ax1.twinx() ax2.plot(metrics[:, 0], metrics[:, 3], label="water level at evaluation point (m)", linestyle="--", linewidth=2.0) ax2.set_ylabel("water level (m)") lines1, labels1 = ax1.get_legend_handles_labels() lines2, labels2 = ax2.get_legend_handles_labels() ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left", fontsize=8) ax1.set_title("Inundated area, volume and evaluation-point water level") fig.tight_layout() fig.savefig(out / "floodstream_04_area_volume_waterlevel.png", dpi=180) plt.close(fig) # 既存ページとの互換用。従来のSVG図とは別にPNGも保存する。 fig, ax = plt.subplots(figsize=(9, 5)) ax.imshow(result["dem"], origin="upper", cmap="terrain") ax.set_title("Synthetic DEM with river and levee") _axis_in_meters(ax, cfg) fig.colorbar(ax.images[0], ax=ax, label="elevation (m)") fig.tight_layout() fig.savefig(out / "floodstream_dem.png", dpi=160) plt.close(fig) fig, ax = plt.subplots(figsize=(9, 5)) ax.imshow(result["max_depth"], origin="upper", cmap="Blues", vmin=0.0) ax.set_title("Maximum flood depth") _axis_in_meters(ax, cfg) fig.colorbar(ax.images[0], ax=ax, label="maximum inundation depth (m)") fig.tight_layout() fig.savefig(out / "floodstream_max_depth.png", dpi=160) plt.close(fig) def main() -> None: cfg = FloodConfig() result = run_flood_simulation(cfg) save_figures(result, cfg) print("計算完了") print(f"最大浸水深: {result['max_depth'].max():.3f} m") print(f"最終浸水セル数: {(result['final_depth'] > cfg.inundation_threshold_m).sum()} cells") print(f"出力先: {cfg.output_dir}") if __name__ == "__main__": main()