Source code for pyradtran.core.output_parser

"""Parse uvspec output files into xarray.Dataset.

Supports both ASCII (default DISORT 7-column and output_user custom)
and NetCDF output formats.
"""

from __future__ import annotations

from pathlib import Path

import numpy as np
import xarray as xr


[docs] def parse_output( output_path: str | Path, format: str = "netcdf", n_zout: int = 1, column_names: list[str] | None = None, ) -> xr.Dataset: """Parse uvspec output file into xarray.Dataset. Args: output_path: Path to uvspec output file. format: Output format — "netcdf" or "ascii". n_zout: Number of zout levels in output (for ASCII multi-level). column_names: Custom column names for output_user ASCII output. Returns: xarray.Dataset with wavelength (and optionally zout) as coordinates. """ output_path = Path(output_path) if format == "netcdf": return _parse_netcdf(output_path) else: return _parse_ascii(output_path, n_zout=n_zout, column_names=column_names)
def _parse_netcdf(path: Path) -> xr.Dataset: """Read NetCDF output file.""" ds = xr.open_dataset(path) ds.load() ds.close() return ds _STANDARD_COLUMNS = ["wavelength", "edir", "edn", "eup", "udir", "udn", "uup"] def _parse_ascii( path: Path, n_zout: int = 1, column_names: list[str] | None = None, ) -> xr.Dataset: """Read ASCII uvspec output. For multi-level output (n_zout > 1), uvspec interleaves rows by wavelength. """ lines = [] with open(path) as f: for line in f: stripped = line.strip() if stripped and not stripped.startswith("#"): lines.append(stripped) if not lines: raise ValueError(f"Empty output file: {path}") data = np.loadtxt(lines) if data.ndim == 1: data = data.reshape(1, -1) n_rows, n_cols = data.shape if column_names is None: if n_cols == 7: column_names = _STANDARD_COLUMNS elif n_cols == 2: column_names = ["wavelength", "value"] else: column_names = [f"col_{i}" for i in range(n_cols)] if n_zout == 1: data_vars = {} wavelength = None for i, name in enumerate(column_names): if name == "wavelength": wavelength = data[:, i] else: data_vars[name] = ("wavelength", data[:, i]) if wavelength is None: wavelength = data[:, 0] return xr.Dataset(data_vars, coords={"wavelength": wavelength}) else: n_wl = n_rows // n_zout if n_rows != n_wl * n_zout: raise ValueError( f"Expected {n_wl * n_zout} rows for {n_wl} wavelengths x {n_zout} zout levels, " f"got {n_rows}" ) reshaped = data.reshape(n_wl, n_zout, n_cols) data_vars = {} coords = {} for i, name in enumerate(column_names): if name == "wavelength": coords["wavelength"] = reshaped[:, 0, i] else: data_vars[name] = (("wavelength", "zout"), reshaped[:, :, i]) coords["zout"] = np.arange(n_zout) return xr.Dataset(data_vars, coords=coords)