""" 斜面の雨水流出解析サンプル: 分布型キネマティックウェーブ法 =============================================================== このスクリプトは、demo.godo-tys.jp/software/kinema/ に掲載するための 軽量なPythonサンプルです。 目的 ---- - DEM格子上で、降雨から斜面流出を概略計算する流れを示す - 斜面流を Manning 型のキネマティックウェーブ式で近似する - 各セルから下流セルへ水量をルーティングし、流域出口のハイドログラフを得る 注意 ---- このコードは教育・デモ用の簡易版です。実務適用では、 河道追跡、損失雨量、飽和・不飽和浸透、河道断面、境界条件、観測流量による検証が必要です。 """ from __future__ import annotations from dataclasses import dataclass from pathlib import Path import numpy as np try: import matplotlib.pyplot as plt except ImportError: # matplotlibがない環境でも計算部分は読めるようにする plt = None @dataclass class KinematicWaveConfig: nx: int = 50 # x方向セル数 ny: int = 40 # y方向セル数 dx: float = 10.0 # セルサイズ m dt: float = 10.0 # 時間刻み s total_time: float = 1.5 * 3600 # 計算時間 s manning_n: float = 0.08 # 斜面流の粗度係数 min_slope: float = 1.0e-4 # 最小勾配 infiltration_mm_h: float = 5.0 output_dir: str = "output_kinema" def make_synthetic_dem(cfg: KinematicWaveConfig) -> np.ndarray: """下流側へ傾く斜面と浅い谷を持つデモ用DEMを作成する。""" y, x = np.mgrid[0 : cfg.ny, 0 : cfg.nx] # 左上から右下へ流れるように全体勾配を与える dem = 120.0 - 0.55 * x - 0.28 * y # 中央付近に浅い谷を作る valley_center = cfg.ny * 0.52 + 8.0 * np.sin(x / 12.0) valley = np.exp(-((y - valley_center) ** 2) / (2 * 6.0**2)) dem -= 2.5 * valley # 微地形の凹凸 dem += 0.25 * np.sin(x / 7.0) * np.cos(y / 9.0) return dem def rainfall_mm_h(t: float) -> float: """三角形に近い簡易降雨ハイエトグラフを返す。""" hour = t / 3600.0 if hour < 0.25: return 10.0 if hour < 0.75: return 35.0 if hour < 1.25: return 75.0 if hour < 1.75: return 45.0 if hour < 2.25: return 20.0 return 0.0 def compute_flow_receiver(dem: np.ndarray, dx: float, min_slope: float) -> tuple[np.ndarray, np.ndarray]: """ D8近傍のうち最も低いセルを流下先として求める。 Returns ------- receiver : ndarray of int 各セルの流下先セル番号。出口セルは自分自身を指す。 slope : ndarray of float 各セルから流下先への勾配。 """ ny, nx = dem.shape receiver = np.arange(nx * ny, dtype=int).reshape(ny, nx) slope = np.full((ny, nx), min_slope, dtype=float) directions = [ (-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1), ] for r in range(ny): for c in range(nx): z0 = dem[r, c] best_drop = 0.0 best_rc = (r, c) best_dist = dx for dr, dc in directions: rr = r + dr cc = c + dc if rr < 0 or rr >= ny or cc < 0 or cc >= nx: continue dist = dx * np.sqrt(2.0) if dr != 0 and dc != 0 else dx drop = z0 - dem[rr, cc] if drop > best_drop: best_drop = drop best_rc = (rr, cc) best_dist = dist receiver[r, c] = best_rc[0] * nx + best_rc[1] slope[r, c] = max(best_drop / best_dist, min_slope) return receiver.ravel(), slope def run_kinematic_wave(cfg: KinematicWaveConfig): """分布型キネマティックウェーブ法の簡易計算を実行する。""" dem = make_synthetic_dem(cfg) receiver, slope = compute_flow_receiver(dem, cfg.dx, cfg.min_slope) ny, nx = dem.shape area = cfg.dx * cfg.dx n_nodes = nx * ny # 表面水深 m h = np.zeros(n_nodes, dtype=float) # 流域出口は最も標高が低いセルとする outlet = int(np.argmin(dem)) times = [] outlet_q = [] rain_series = [] n_steps = int(cfg.total_time / cfg.dt) for step in range(n_steps): t = step * cfg.dt rain = rainfall_mm_h(t) loss = cfg.infiltration_mm_h effective_rain = max(rain - loss, 0.0) / 1000.0 / 3600.0 # m/s # 有効降雨を各セルに加える h += effective_rain * cfg.dt # Manning型の単位幅流量 q = 1/n * h^(5/3) * sqrt(S) # セル幅を掛けて流量Qに換算する q_unit = (1.0 / cfg.manning_n) * np.maximum(h, 0.0) ** (5.0 / 3.0) * np.sqrt(slope.ravel()) q = q_unit * cfg.dx # m3/s # 1ステップでセルの水量以上を流出させない storage = h * area outflow_volume = np.minimum(q * cfg.dt, storage * 0.75) h -= outflow_volume / area inflow_volume = np.zeros(n_nodes, dtype=float) movable = receiver != np.arange(n_nodes) np.add.at(inflow_volume, receiver[movable], outflow_volume[movable]) h += inflow_volume / area times.append(t / 3600.0) outlet_q.append(outflow_volume[outlet] / cfg.dt) rain_series.append(rain) return { "dem": dem, "final_depth": h.reshape(ny, nx), "time_h": np.asarray(times), "outlet_q": np.asarray(outlet_q), "rain_mm_h": np.asarray(rain_series), "outlet_index": outlet, } def save_figures(result: dict, cfg: KinematicWaveConfig) -> None: """計算結果をPNGとして保存する。""" if plt is None: print("matplotlibがないため、図の保存はスキップします。") return out = Path(cfg.output_dir) out.mkdir(parents=True, exist_ok=True) plt.figure(figsize=(8, 5)) plt.imshow(result["final_depth"], origin="upper") plt.colorbar(label="surface water depth (m)") plt.title("Final surface water depth") plt.tight_layout() plt.savefig(out / "kinema_final_depth.png", dpi=160) plt.close() fig, ax1 = plt.subplots(figsize=(8, 5)) ax1.plot(result["time_h"], result["outlet_q"], linewidth=2) ax1.set_xlabel("time (hour)") ax1.set_ylabel("outlet discharge (m3/s)") ax2 = ax1.twinx() ax2.bar(result["time_h"], result["rain_mm_h"], width=0.03, alpha=0.35) ax2.set_ylabel("rainfall (mm/h)") ax2.invert_yaxis() plt.title("Rainfall and outlet hydrograph") fig.tight_layout() fig.savefig(out / "kinema_hydrograph.png", dpi=160) plt.close(fig) def main() -> None: cfg = KinematicWaveConfig() result = run_kinematic_wave(cfg) save_figures(result, cfg) print("計算完了") print(f"最大水深: {result['final_depth'].max():.3f} m") print(f"最大出口流量: {result['outlet_q'].max():.3f} m3/s") print(f"出力先: {cfg.output_dir}") if __name__ == "__main__": main()