#!/usr/bin/env python # -*- coding: utf-8 -*- """ cequalw2_python_workflow_sample.py CE-QUAL-W2 + Python によるダム貯水池 鉛直2次元富栄養化モデルの 前処理・実行管理・後処理を行うためのサンプルです。 このコードはデモサイト掲載用のテンプレートです。 CE-QUAL-W2本体の実行ファイルや実ダムの入力ファイルは同梱していません。 想定する処理: 1. 流入量・水温・水質・気象データをPythonで整形 2. CE-QUAL-W2入力ファイルのテンプレートをケース別に生成 3. CE-QUAL-W2実行ファイルをsubprocessで呼び出し 4. 出力結果をPythonで読み込み、観測値との比較・図化・シナリオ集計を実施 """ from __future__ import annotations import json import subprocess from dataclasses import dataclass from pathlib import Path import numpy as np import pandas as pd @dataclass class W2CaseConfig: """CE-QUAL-W2計算ケースの基本設定。""" case_name: str start_date: str end_date: str tp_load_factor: float = 1.0 tn_load_factor: float = 1.0 selective_withdrawal: bool = False aeration: bool = False def prepare_timeseries(raw_csv: Path, output_csv: Path) -> pd.DataFrame: """ 流入・気象・水質時系列をCE-QUAL-W2用に整形する。 実務では、欠測補間、単位変換、日平均/時間平均化、 流入負荷量の計算などをここで行います。 """ df = pd.read_csv(raw_csv, parse_dates=["datetime"]) df = df.sort_values("datetime").set_index("datetime") df = df.interpolate(limit_direction="both") # 例: T-P濃度 mg/L と流量 m3/s から日負荷 kg/day を概算 if {"flow_m3s", "tp_mgL"}.issubset(df.columns): df["tp_load_kg_day"] = df["flow_m3s"] * df["tp_mgL"] * 86.4 output_csv.parent.mkdir(parents=True, exist_ok=True) df.to_csv(output_csv) return df def render_template(template_path: Path, output_path: Path, values: dict) -> None: """ CE-QUAL-W2入力ファイルをテンプレートから生成する。 テンプレート内の {{ KEY }} を values[KEY] で置換します。 """ text = template_path.read_text(encoding="utf-8") for key, value in values.items(): text = text.replace(f"{{{{ {key} }}}}", str(value)) output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(text, encoding="utf-8") def build_case(case: W2CaseConfig, templates_dir: Path, run_root: Path) -> Path: """ケースフォルダを作成し、CE-QUAL-W2入力ファイルを生成する。""" case_dir = run_root / case.case_name case_dir.mkdir(parents=True, exist_ok=True) values = { "CASE_NAME": case.case_name, "START_DATE": case.start_date, "END_DATE": case.end_date, "TP_LOAD_FACTOR": case.tp_load_factor, "TN_LOAD_FACTOR": case.tn_load_factor, "SELECTIVE_WITHDRAWAL": int(case.selective_withdrawal), "AERATION": int(case.aeration), } # 実務では control.npt, w2_con.npt, bathymetry, inflow, meteorology 等を生成 for template in templates_dir.glob("*.template"): output_name = template.name.replace(".template", "") render_template(template, case_dir / output_name, values) (case_dir / "case_config.json").write_text( json.dumps(values, indent=2, ensure_ascii=False), encoding="utf-8", ) return case_dir def run_cequalw2(case_dir: Path, exe_path: Path) -> None: """ CE-QUAL-W2を実行する。 Windowsの場合は w2_v45.exe など、Linuxの場合はビルド済み実行ファイルを指定します。 """ result = subprocess.run( [str(exe_path)], cwd=case_dir, text=True, capture_output=True, check=False, ) (case_dir / "stdout.log").write_text(result.stdout, encoding="utf-8") (case_dir / "stderr.log").write_text(result.stderr, encoding="utf-8") if result.returncode != 0: raise RuntimeError(f"CE-QUAL-W2 failed in {case_dir}: returncode={result.returncode}") def read_demo_output(case_dir: Path) -> pd.DataFrame: """ CE-QUAL-W2出力を読み込む処理のダミー例。 実務では、CE-QUAL-W2のCSV/ASCII/NetCDF相当の出力形式に合わせて 専用パーサーを作成します。 """ days = np.arange(1, 181) cfg = json.loads((case_dir / "case_config.json").read_text(encoding="utf-8")) base_chla = 8 + 28 * np.exp(-((days - 84) / 22) ** 2) + 18 * np.exp(-((days - 132) / 18) ** 2) chla = base_chla * (0.65 + 0.35 * float(cfg["TP_LOAD_FACTOR"])) if cfg["SELECTIVE_WITHDRAWAL"]: chla *= 0.90 bottom_do = 6.5 - 5.1 / (1 + np.exp(-(days - 82) / 15)) + 2.2 / (1 + np.exp(-(days - 155) / 9)) if cfg["AERATION"]: bottom_do += 1.7 return pd.DataFrame( { "day": days, "surface_chla_ugL": chla, "bottom_do_mgL": bottom_do, } ) def summarize_cases(case_dirs: list[Path], output_csv: Path) -> pd.DataFrame: """複数ケースのピークChl-a、底層DO最小値などを集計する。""" records = [] for case_dir in case_dirs: df = read_demo_output(case_dir) records.append( { "case": case_dir.name, "peak_chla_ugL": df["surface_chla_ugL"].max(), "min_bottom_do_mgL": df["bottom_do_mgL"].min(), "hypoxia_days_do_lt_2": int((df["bottom_do_mgL"] < 2.0).sum()), } ) summary = pd.DataFrame(records) output_csv.parent.mkdir(parents=True, exist_ok=True) summary.to_csv(output_csv, index=False) return summary def main() -> None: templates_dir = Path("templates/cequalw2") run_root = Path("runs/cequalw2_demo") cases = [ W2CaseConfig("base", "2020-04-01", "2020-09-30"), W2CaseConfig("tp_load_30pct_reduction", "2020-04-01", "2020-09-30", tp_load_factor=0.70), W2CaseConfig("selective_withdrawal", "2020-04-01", "2020-09-30", selective_withdrawal=True), W2CaseConfig("hypolimnetic_aeration", "2020-04-01", "2020-09-30", aeration=True), ] case_dirs = [build_case(case, templates_dir, run_root) for case in cases] # 実務では以下を有効化: # for case_dir in case_dirs: # run_cequalw2(case_dir, exe_path=Path("w2_bin/w2_v45.exe")) summary = summarize_cases(case_dirs, Path("reports/cequalw2_case_summary.csv")) print(summary) if __name__ == "__main__": main()