#!/usr/bin/env python # -*- coding: utf-8 -*- """ 1次元・混合粒径5区分・実務拡張版 河床変動シミュレーション 追加機能: 1. 等流 / 簡易不等流(標準逐次法)の切替 2. 流量ハイドログラフCSV入力 3. 実測横断風の任意断面CSV入力 4. MPM式 / 芦田・道上型掃流砂量式の切替 5. 交換層・下層材料・岩盤高・最大洗掘深の考慮 このコードはデモ・技術検討用です。 実務適用では観測データによる検証とキャリブレーションを行ってください。 """ from __future__ import annotations from dataclasses import dataclass from pathlib import Path import argparse import numpy as np import pandas as pd import matplotlib.pyplot as plt from scipy.optimize import brentq @dataclass class ModelParams: dx_m: float = 100.0 rho_w: float = 1000.0 rho_s: float = 2650.0 g: float = 9.80665 porosity: float = 0.40 # 5粒径区分[m]: 0.5mm, 2mm, 5mm, 20mm, 50mm grain_d_m: tuple[float, ...] = (0.0005, 0.002, 0.005, 0.020, 0.050) # "ashida_michiue" or "mpm" bedload_formula: str = "ashida_michiue" theta_cr: float = 0.047 mpm_alpha: float = 8.0 ashida_alpha: float = 17.0 active_layer_m: float = 0.30 dt_s: float = 1800.0 total_time_s: float = 48.0 * 3600.0 morph_factor: float = 3.0 min_slope: float = 1.0e-5 max_slope: float = 0.04 max_dz_per_step_m: float = 0.04 upstream_supply_factor: float = 0.85 # "normal" or "gvf" hydraulic_mode: str = "gvf" downstream_depth_factor: float = 1.0 def normalize_fractions(F: np.ndarray) -> np.ndarray: F = np.maximum(F, 0.0) s = F.sum(axis=1, keepdims=True) s[s <= 0.0] = 1.0 return F / s # ============================================================ # サンプル入力作成 # ============================================================ def make_sample_channel(path: Path, n_points: int = 61, dx_m: float = 100.0) -> None: x = np.arange(n_points, dtype=float) * dx_m bed_z = 100.0 - x / 850.0 bed_z += 0.45 * np.exp(-((x - 2200.0) / 700.0) ** 2) bed_z -= 0.30 * np.exp(-((x - 4600.0) / 650.0) ** 2) manning_n = 0.034 + 0.003 * np.sin(2.0 * np.pi * x / 3200.0) f1 = 0.08 + 0.14 * (x / x.max()) f2 = 0.22 + 0.08 * (x / x.max()) f3 = 0.34 - 0.03 * (x / x.max()) f4 = 0.24 - 0.11 * (x / x.max()) f5 = 1.0 - (f1 + f2 + f3 + f4) F = normalize_fractions(np.vstack([f1, f2, f3, f4, f5]).T) # 下層は表層より粗い想定 SF = normalize_fractions( np.column_stack([ F[:, 0] * 0.65, F[:, 1] * 0.85, F[:, 2] * 1.05, F[:, 3] * 1.20, F[:, 4] * 1.45, ]) ) bedrock_z = bed_z - (1.0 + 0.5 * np.sin(2.0 * np.pi * x / 3800.0)) max_scour = np.maximum(0.45, 0.8 + 0.25 * np.cos(2.0 * np.pi * x / 3300.0)) df = pd.DataFrame({ "x_m": x, "bed_z_m": bed_z, "manning_n": manning_n, "f1": F[:, 0], "f2": F[:, 1], "f3": F[:, 2], "f4": F[:, 3], "f5": F[:, 4], "sf1": SF[:, 0], "sf2": SF[:, 1], "sf3": SF[:, 2], "sf4": SF[:, 3], "sf5": SF[:, 4], "bedrock_z_m": bedrock_z, "max_scour_m": max_scour, }) df.to_csv(path, index=False, encoding="utf-8-sig") def make_sample_hydrograph(path: Path) -> None: time_h = np.arange(0.0, 48.0 + 1.0, 1.0) q = 70.0 + 500.0 * np.exp(-0.5 * ((time_h - 18.0) / 6.0) ** 2) q += 110.0 * np.exp(-0.5 * ((time_h - 34.0) / 4.5) ** 2) q = np.maximum(q, 50.0) pd.DataFrame({"time_h": time_h, "Q_m3s": q}).to_csv(path, index=False, encoding="utf-8-sig") def make_sample_cross_sections(path: Path, channel_csv: Path) -> None: ch = pd.read_csv(channel_csv) rows = [] for _, r in ch.iterrows(): x = float(r["x_m"]) low_width = 24.0 + 4.0 * np.sin(2.0 * np.pi * x / 3000.0) flood_width = 66.0 + 7.0 * np.sin(2.0 * np.pi * x / 4400.0) bank_h = 2.2 + 0.25 * np.cos(2.0 * np.pi * x / 3600.0) pts = [ (-flood_width / 2.0, bank_h + 2.2), (-flood_width / 2.0 + 5.0, bank_h + 0.6), (-low_width / 2.0 - 6.0, bank_h), (-low_width / 2.0, 0.35), (-low_width / 5.0, 0.05), (0.0, 0.0), (low_width / 5.0, 0.06), (low_width / 2.0, 0.38), (low_width / 2.0 + 6.0, bank_h), (flood_width / 2.0 - 5.0, bank_h + 0.7), (flood_width / 2.0, bank_h + 2.1), ] for offset, rel in pts: rows.append({"x_m": x, "offset_m": offset, "rel_elev_m": rel}) pd.DataFrame(rows).to_csv(path, index=False, encoding="utf-8-sig") # ============================================================ # 入力読み込み # ============================================================ def load_hydrograph(path: Path): df = pd.read_csv(path) if "time_h" not in df.columns or "Q_m3s" not in df.columns: raise ValueError("hydrograph.csv needs columns: time_h, Q_m3s") t_s = df["time_h"].to_numpy(float) * 3600.0 q = df["Q_m3s"].to_numpy(float) def q_at(t: float) -> float: return float(np.interp(t, t_s, q, left=q[0], right=q[-1])) return q_at, float(t_s[-1]) def load_cross_sections(path: Path) -> dict[float, tuple[np.ndarray, np.ndarray]]: df = pd.read_csv(path) required = {"x_m", "offset_m", "rel_elev_m"} if not required.issubset(df.columns): raise ValueError("cross_sections.csv needs columns: x_m, offset_m, rel_elev_m") sections = {} for x, g in df.groupby("x_m"): gg = g.sort_values("offset_m") sections[float(x)] = ( gg["offset_m"].to_numpy(float), gg["rel_elev_m"].to_numpy(float), ) return sections # ============================================================ # 横断面水理 # ============================================================ def section_properties(offset: np.ndarray, rel_z: np.ndarray, depth: float) -> dict[str, float]: """ 任意横断形状に対して、水面高depth[m]での断面特性を求める。 depthは中央河床を0とした相対水位。 """ hwater = float(depth) area = 0.0 wetted_perimeter = 0.0 wet_segments = [] for i in range(len(offset) - 1): y1, y2 = float(offset[i]), float(offset[i + 1]) z1, z2 = float(rel_z[i]), float(rel_z[i + 1]) dy = y2 - y1 dz = z2 - z1 d1 = hwater - z1 d2 = hwater - z2 if d1 <= 0.0 and d2 <= 0.0: continue if d1 > 0.0 and d2 > 0.0: area += 0.5 * (d1 + d2) * abs(dy) wetted_perimeter += float(np.hypot(dy, dz)) wet_segments.append((min(y1, y2), max(y1, y2))) else: # 水面との交点 if abs(d2 - d1) < 1.0e-12: continue r = d1 / (d1 - d2) r = min(max(r, 0.0), 1.0) yc = y1 + r * dy zc = z1 + r * dz if d1 > 0.0: area += 0.5 * d1 * abs(yc - y1) wetted_perimeter += float(np.hypot(yc - y1, zc - z1)) wet_segments.append((min(y1, yc), max(y1, yc))) else: area += 0.5 * d2 * abs(y2 - yc) wetted_perimeter += float(np.hypot(y2 - yc, z2 - zc)) wet_segments.append((min(yc, y2), max(yc, y2))) if wet_segments: top_width = max(b for _, b in wet_segments) - min(a for a, _ in wet_segments) else: top_width = 0.0 radius = area / wetted_perimeter if wetted_perimeter > 0.0 else 0.0 return { "area_m2": area, "wetted_perimeter_m": wetted_perimeter, "hydraulic_radius_m": radius, "top_width_m": top_width, } def conveyance_q(depth: float, offset: np.ndarray, rel_z: np.ndarray, n: float, slope: float) -> float: prop = section_properties(offset, rel_z, depth) A = prop["area_m2"] R = prop["hydraulic_radius_m"] if A <= 0.0 or R <= 0.0: return 0.0 return (1.0 / n) * A * (R ** (2.0 / 3.0)) * np.sqrt(max(slope, 1.0e-9)) def normal_depth(Q: float, offset: np.ndarray, rel_z: np.ndarray, n: float, slope: float) -> float: Q = max(float(Q), 1.0e-9) low = max(1.0e-4, -float(np.min(rel_z)) + 1.0e-4) high = max(3.0, float(np.max(rel_z)) + 1.0) def f(h: float) -> float: return conveyance_q(h, offset, rel_z, n, slope) - Q while f(high) < 0.0: high *= 1.4 if high > 120.0: raise RuntimeError("normal depth search failed") return float(brentq(f, low, high, maxiter=80)) def bed_slope(x: np.ndarray, z: np.ndarray, p: ModelParams) -> np.ndarray: dzdx = np.gradient(z, x) S0 = -dzdx return np.clip(S0, p.min_slope, p.max_slope) def values_for_depth(Q: float, depth: float, offset: np.ndarray, rel_z: np.ndarray, n: float, p: ModelParams) -> dict[str, float]: prop = section_properties(offset, rel_z, depth) A = max(prop["area_m2"], 1.0e-9) R = max(prop["hydraulic_radius_m"], 1.0e-9) V = Q / A Sf = (Q * n / (A * (R ** (2.0 / 3.0)))) ** 2 Fr = V / np.sqrt(p.g * max(depth, 1.0e-9)) return { **prop, "depth_m": depth, "velocity_mps": V, "fr": Fr, "friction_slope": Sf, } def arrays_from_depths(x, bed_z, sections, manning_n, Q, depth, p): nsta = len(x) area = np.zeros(nsta) radius = np.zeros(nsta) top_width = np.zeros(nsta) velocity = np.zeros(nsta) Sf = np.zeros(nsta) Fr = np.zeros(nsta) for i in range(nsta): off, rz = sections[float(x[i])] val = values_for_depth(Q, depth[i], off, rz, manning_n[i], p) area[i] = val["area_m2"] radius[i] = val["hydraulic_radius_m"] top_width[i] = val["top_width_m"] velocity[i] = val["velocity_mps"] Sf[i] = val["friction_slope"] Fr[i] = val["fr"] tau = p.rho_w * p.g * radius * Sf water_z = bed_z + depth return { "depth_m": depth, "water_z_m": water_z, "area_m2": area, "hydraulic_radius_m": radius, "top_width_m": top_width, "velocity_mps": velocity, "friction_slope": Sf, "fr": Fr, "tau_pa": tau, } def solve_normal_profile(x, bed_z, sections, manning_n, Q, p): S0 = bed_slope(x, bed_z, p) depth = np.zeros_like(x, dtype=float) for i in range(len(x)): off, rz = sections[float(x[i])] depth[i] = normal_depth(Q, off, rz, manning_n[i], S0[i]) return arrays_from_depths(x, bed_z, sections, manning_n, Q, depth, p) def solve_gvf_profile(x, bed_z, sections, manning_n, Q, p): """ 簡易不等流。 下流端は等流水深、上流側は標準逐次法で解く。 解が不安定な場合は当該断面のみ等流水深へフォールバックする。 """ nsta = len(x) S0 = bed_slope(x, bed_z, p) h_normal = np.zeros(nsta) for i in range(nsta): off, rz = sections[float(x[i])] h_normal[i] = normal_depth(Q, off, rz, manning_n[i], S0[i]) depth = np.zeros(nsta) depth[-1] = h_normal[-1] * p.downstream_depth_factor off_d, rz_d = sections[float(x[-1])] val_down = values_for_depth(Q, depth[-1], off_d, rz_d, manning_n[-1], p) for i in range(nsta - 2, -1, -1): dx = abs(x[i + 1] - x[i]) off, rz = sections[float(x[i])] z_down = bed_z[i + 1] z_here = bed_z[i] H_down = z_down + depth[i + 1] + val_down["velocity_mps"] ** 2 / (2.0 * p.g) Sf_down = val_down["friction_slope"] def residual(h): val = values_for_depth(Q, h, off, rz, manning_n[i], p) H_here = z_here + h + val["velocity_mps"] ** 2 / (2.0 * p.g) loss = 0.5 * (val["friction_slope"] + Sf_down) * dx return H_here - H_down - loss hmin = max(1.0e-4, -float(np.min(rz)) + 1.0e-4) hmax = max(8.0, float(np.max(rz)) + 3.0, h_normal[i] * 2.5) grid = np.geomspace(hmin, hmax, 24) vals = [residual(h) for h in grid] root = None for j in range(len(grid) - 1): if np.isfinite(vals[j]) and np.isfinite(vals[j + 1]) and vals[j] * vals[j + 1] < 0.0: try: root = brentq(residual, grid[j], grid[j + 1], maxiter=60) break except ValueError: pass if root is None: root = h_normal[i] depth[i] = float(root) val_down = values_for_depth(Q, depth[i], off, rz, manning_n[i], p) return arrays_from_depths(x, bed_z, sections, manning_n, Q, depth, p) def solve_hydraulics(x, bed_z, sections, manning_n, Q, p): if p.hydraulic_mode == "normal": return solve_normal_profile(x, bed_z, sections, manning_n, Q, p) if p.hydraulic_mode == "gvf": return solve_gvf_profile(x, bed_z, sections, manning_n, Q, p) raise ValueError("hydraulic_mode must be normal or gvf") # ============================================================ # 流砂量 # ============================================================ def bedload_by_class(tau: np.ndarray, F: np.ndarray, p: ModelParams) -> np.ndarray: D = np.array(p.grain_d_m, dtype=float) s_rel = p.rho_s / p.rho_w theta = tau[:, None] / ((p.rho_s - p.rho_w) * p.g * D[None, :]) theta_c = p.theta_cr if p.bedload_formula == "mpm": excess = np.maximum(theta - theta_c, 0.0) phi = p.mpm_alpha * excess ** 1.5 elif p.bedload_formula == "ashida_michiue": phi = np.zeros_like(theta) mask = theta > theta_c ratio = np.zeros_like(theta) ratio[mask] = theta_c / theta[mask] phi[mask] = ( p.ashida_alpha * theta[mask] ** 1.5 * (1.0 - ratio[mask]) * (1.0 - np.sqrt(ratio[mask])) ) else: raise ValueError("bedload_formula must be mpm or ashida_michiue") qb_unit = F * phi * np.sqrt((s_rel - 1.0) * p.g * D[None, :] ** 3) return np.maximum(qb_unit, 0.0) def d50_from_fraction(F: np.ndarray, D: np.ndarray) -> np.ndarray: order = np.argsort(D) Ds = D[order] out = np.zeros(F.shape[0]) for i in range(F.shape[0]): Fs = F[i, order] cum = np.cumsum(Fs) if cum[0] >= 0.5: out[i] = Ds[0] else: out[i] = np.interp(0.5, cum, Ds) return out def update_active_layer(F, dz, class_flux_faces, substrate_F, p): Fnew = F.copy() La = max(p.active_layer_m, 1.0e-6) for i in range(F.shape[0]): if dz[i] > 0.0: r = min(dz[i] / La, 1.0) incoming = class_flux_faces[i, :] s = incoming.sum() dep_comp = incoming / s if s > 0.0 else F[i] Fnew[i] = (1.0 - r) * F[i] + r * dep_comp elif dz[i] < 0.0: r = min((-dz[i]) / La, 1.0) Fnew[i] = (1.0 - r) * F[i] + r * substrate_F[i] return normalize_fractions(Fnew) # ============================================================ # メイン計算 # ============================================================ def run_simulation(channel_csv, hydrograph_csv, cross_section_csv, out_dir, p): out_dir.mkdir(parents=True, exist_ok=True) ch = pd.read_csv(channel_csv) required = ["x_m", "bed_z_m", "manning_n", "f1", "f2", "f3", "f4", "f5"] missing = [c for c in required if c not in ch.columns] if missing: raise ValueError(f"channel CSV missing columns: {missing}") x = ch["x_m"].to_numpy(float) z0 = ch["bed_z_m"].to_numpy(float) z = z0.copy() manning_n = ch["manning_n"].to_numpy(float) F = normalize_fractions(ch[["f1", "f2", "f3", "f4", "f5"]].to_numpy(float)) if all(c in ch.columns for c in ["sf1", "sf2", "sf3", "sf4", "sf5"]): substrate_F = normalize_fractions(ch[["sf1", "sf2", "sf3", "sf4", "sf5"]].to_numpy(float)) else: substrate_F = F.copy() bedrock_z = ch["bedrock_z_m"].to_numpy(float) if "bedrock_z_m" in ch.columns else np.full_like(z, -1e9) max_scour = ch["max_scour_m"].to_numpy(float) if "max_scour_m" in ch.columns else np.full_like(z, 1e9) min_bed_z = np.maximum(bedrock_z, z0 - max_scour) sections = load_cross_sections(cross_section_csv) q_at, hydro_end_s = load_hydrograph(hydrograph_csv) if p.total_time_s <= 0.0: p.total_time_s = hydro_end_s nsteps = int(np.ceil(p.total_time_s / p.dt_s)) D = np.array(p.grain_d_m, dtype=float) summary = [] snapshots = {} for step in range(nsteps + 1): t = min(step * p.dt_s, p.total_time_s) Q = q_at(t) hyd = solve_hydraulics(x, z, sections, manning_n, Q, p) qb_unit = bedload_by_class(hyd["tau_pa"], F, p) width_eff = np.maximum(hyd["top_width_m"], 1.0) class_Qb = qb_unit * width_eff[:, None] total_Qb = class_Qb.sum(axis=1) if step < nsteps: flux_faces = np.zeros(len(x) + 1) class_flux_faces = np.zeros((len(x) + 1, F.shape[1])) flux_faces[0] = p.upstream_supply_factor * total_Qb[0] class_flux_faces[0, :] = p.upstream_supply_factor * class_Qb[0, :] flux_faces[1:] = total_Qb class_flux_faces[1:, :] = class_Qb dQbdx = (flux_faces[1:] - flux_faces[:-1]) / p.dx_m dzdt = -(1.0 / (1.0 - p.porosity)) * dQbdx / width_eff dz = dzdt * p.dt_s * p.morph_factor dz = np.clip(dz, -p.max_dz_per_step_m, p.max_dz_per_step_m) z_trial = z + dz z_limited = np.maximum(z_trial, min_bed_z) dz_effective = z_limited - z z = z_limited F = update_active_layer(F, dz_effective, class_flux_faces, substrate_F, p) if abs(t / 3600.0 - 18.0) <= p.dt_s / 7200.0: snapshots["peak"] = { "time_h": t / 3600.0, "Q_m3s": Q, "bed_z_m": z.copy(), "water_z_m": hyd["water_z_m"].copy(), "depth_m": hyd["depth_m"].copy(), "velocity_mps": hyd["velocity_mps"].copy(), } if step % max(1, nsteps // 24) == 0 or step == nsteps: d50 = d50_from_fraction(F, D) summary.append({ "time_h": t / 3600.0, "Q_m3s": Q, "mean_depth_m": float(np.mean(hyd["depth_m"])), "mean_velocity_mps": float(np.mean(hyd["velocity_mps"])), "mean_friction_slope": float(np.mean(hyd["friction_slope"])), "mean_bed_change_m": float(np.mean(z - z0)), "min_bed_change_m": float(np.min(z - z0)), "max_bed_change_m": float(np.max(z - z0)), "mean_D50_mm": float(np.mean(d50 * 1000.0)), "total_bedload_m3s": float(np.sum(total_Qb)), }) Q_final = q_at(p.total_time_s) hyd_final = solve_hydraulics(x, z, sections, manning_n, Q_final, p) d50 = d50_from_fraction(F, D) pd.DataFrame({ "x_m": x, "initial_bed_z_m": z0, "final_bed_z_m": z, "bed_change_m": z - z0, "bedrock_z_m": bedrock_z, "min_allowed_bed_z_m": min_bed_z, "final_water_z_m": hyd_final["water_z_m"], "final_depth_m": hyd_final["depth_m"], "final_velocity_mps": hyd_final["velocity_mps"], "final_friction_slope": hyd_final["friction_slope"], "final_tau_pa": hyd_final["tau_pa"], "final_D50_mm": d50 * 1000.0, "f1_final": F[:, 0], "f2_final": F[:, 1], "f3_final": F[:, 2], "f4_final": F[:, 3], "f5_final": F[:, 4], }).to_csv(out_dir / "final_profile.csv", index=False, encoding="utf-8-sig") pd.DataFrame(summary).to_csv(out_dir / "time_series_summary.csv", index=False, encoding="utf-8-sig") if "peak" in snapshots: pd.DataFrame({ "x_m": x, "peak_bed_z_m": snapshots["peak"]["bed_z_m"], "peak_water_z_m": snapshots["peak"]["water_z_m"], "peak_depth_m": snapshots["peak"]["depth_m"], "peak_velocity_mps": snapshots["peak"]["velocity_mps"], }).to_csv(out_dir / "peak_water_profile.csv", index=False, encoding="utf-8-sig") # 図 plt.figure(figsize=(10, 5)) plt.plot(x / 1000.0, z0, label="Initial bed") plt.plot(x / 1000.0, z, label="Final bed") plt.plot(x / 1000.0, bedrock_z, linestyle="--", label="Bedrock") if "peak" in snapshots: plt.plot(x / 1000.0, snapshots["peak"]["water_z_m"], label="Water surface at peak") plt.xlabel("Distance downstream [km]") plt.ylabel("Elevation [m]") plt.title("1D bed and water-surface profile") plt.grid(True) plt.legend() plt.tight_layout() plt.savefig(out_dir / "01_bed_and_water_profile.png", dpi=170) plt.close() plt.figure(figsize=(10, 4)) plt.plot(x / 1000.0, z - z0) plt.axhline(0.0, linewidth=0.8) plt.xlabel("Distance downstream [km]") plt.ylabel("Bed change [m]") plt.title("Bed aggradation / degradation") plt.grid(True) plt.tight_layout() plt.savefig(out_dir / "02_bed_change.png", dpi=170) plt.close() plt.figure(figsize=(10, 5)) labels = ["0.5 mm", "2 mm", "5 mm", "20 mm", "50 mm"] for j in range(5): plt.plot(x / 1000.0, F[:, j], label=labels[j]) plt.xlabel("Distance downstream [km]") plt.ylabel("Fraction [-]") plt.title("Final active-layer grain fractions") plt.grid(True) plt.legend(ncol=3) plt.tight_layout() plt.savefig(out_dir / "03_grain_fraction_final.png", dpi=170) plt.close() plt.figure(figsize=(10, 4)) plt.plot(x / 1000.0, d50 * 1000.0) plt.xlabel("Distance downstream [km]") plt.ylabel("D50 [mm]") plt.title("Final D50 profile") plt.grid(True) plt.tight_layout() plt.savefig(out_dir / "04_d50_profile.png", dpi=170) plt.close() hdf = pd.read_csv(hydrograph_csv) plt.figure(figsize=(10, 4)) plt.plot(hdf["time_h"], hdf["Q_m3s"]) plt.xlabel("Time [h]") plt.ylabel("Discharge [m3/s]") plt.title("Input hydrograph") plt.grid(True) plt.tight_layout() plt.savefig(out_dir / "05_hydrograph.png", dpi=170) plt.close() sample_x = float(x[len(x) // 2]) off, rz = sections[sample_x] plt.figure(figsize=(8, 4)) plt.plot(off, rz) plt.xlabel("Offset [m]") plt.ylabel("Relative elevation [m]") plt.title(f"Sample cross section at x = {sample_x:.0f} m") plt.grid(True) plt.tight_layout() plt.savefig(out_dir / "06_sample_cross_section.png", dpi=170) plt.close() print(f"Done. Outputs: {out_dir}") def main(): parser = argparse.ArgumentParser() parser.add_argument("--channel", default="channel_100m.csv") parser.add_argument("--hydrograph", default="hydrograph.csv") parser.add_argument("--cross-sections", default="cross_sections.csv") parser.add_argument("--out", default="outputs") parser.add_argument("--make-sample", action="store_true") parser.add_argument("--hydraulic-mode", choices=["normal", "gvf"], default="gvf") parser.add_argument("--bedload-formula", choices=["mpm", "ashida_michiue"], default="ashida_michiue") parser.add_argument("--hours", type=float, default=48.0) parser.add_argument("--dt", type=float, default=1800.0) parser.add_argument("--morph-factor", type=float, default=3.0) args = parser.parse_args() p = ModelParams( hydraulic_mode=args.hydraulic_mode, bedload_formula=args.bedload_formula, total_time_s=args.hours * 3600.0, dt_s=args.dt, morph_factor=args.morph_factor, ) channel = Path(args.channel) hydrograph = Path(args.hydrograph) cross_sections = Path(args.cross_sections) if args.make_sample or not channel.exists(): make_sample_channel(channel, dx_m=p.dx_m) print(f"created: {channel}") if args.make_sample or not hydrograph.exists(): make_sample_hydrograph(hydrograph) print(f"created: {hydrograph}") if args.make_sample or not cross_sections.exists(): make_sample_cross_sections(cross_sections, channel) print(f"created: {cross_sections}") run_simulation(channel, hydrograph, cross_sections, Path(args.out), p) if __name__ == "__main__": main()