# pyradtran/core.py
"""
Simulation engine for pyRadtran.
This module contains the :class:`Simulation` class, which is the
low-level workhorse behind every ``uvspec`` invocation. It is
responsible for:
* Generating a complete ``uvspec`` input file from a
:class:`~pyradtran.config.SimulationConfig` and per-point overrides.
* Spawning the ``uvspec`` subprocess and capturing its stdout.
* Cleaning up temporary files.
Most users should interact with the higher-level
:class:`~pyradtran.interface.PyRadtranAccessor` (``ds.pyradtran.run()``)
rather than calling :class:`Simulation` directly.
See Also
--------
pyradtran.interface : High-level batch interface and xarray accessor.
pyradtran.io.OutputParser : Parse ``uvspec`` output files.
"""
import logging
import os
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
from .config import SimulationConfig
from .exceptions import UvspecExecutionError
from .utils import RadiosondeFinder
logger = logging.getLogger(__name__)
[docs]
class Simulation:
"""Low-level wrapper around a single ``uvspec`` execution.
Parameters
----------
config : SimulationConfig
Fully merged configuration object.
See Also
--------
pyradtran.interface.execute_simulation_batch : Parallel driver.
"""
[docs]
def __init__(self, config: SimulationConfig):
"""Initialise with a merged :class:`SimulationConfig`."""
self.config = config
self.radiosonde_finder = (
RadiosondeFinder(config.paths.radiosonde_base)
if config.paths.radiosonde_base
else None
)
[docs]
def run_simulation(
self,
dt: datetime,
latitude: float,
longitude: float,
override_albedo: Optional[float] = None,
override_surface_temperature: Optional[float] = None,
override_altitude_km: Optional[float] = None,
override_surface_type: Optional[int] = None,
era5_atmosphere_file: Optional[Path] = None,
parameter_overrides: Dict[str, Any] = None,
) -> Optional[Path]:
"""
Run a single ``uvspec`` simulation.
Parameters
----------
dt : datetime
Simulation date/time (UTC).
latitude, longitude : float
Location in degrees.
override_albedo : float, optional
Per-point surface albedo (overrides config value).
override_surface_temperature : float, optional
Per-point surface temperature in K.
override_altitude_km : float, optional
Per-point observation altitude in km.
override_surface_type : int, optional
IGBP surface-type code (1–20) for BRDF look-up.
era5_atmosphere_file : pathlib.Path, optional
Custom ERA5 atmosphere file (radiosonde format).
parameter_overrides : dict, optional
Extra ``key: value`` pairs appended to the input file.
Returns
-------
pathlib.Path or None
Path to the ``uvspec`` output file, or *None* on failure.
Raises
------
UvspecExecutionError
If the subprocess returns a non-zero exit code.
"""
temp_cloud_files = []
try:
# Handle dynamic cloud profiles in overrides
# If wc_file or ic_file is passed as a dict/dataframe, convert to file
if parameter_overrides and (
"wc_file" in parameter_overrides or "ic_file" in parameter_overrides
):
# We need to modify parameter_overrides, so use a copy to avoid side effects on the passed dict
parameter_overrides = parameter_overrides.copy()
new_overrides, files_to_clean = self._handle_dynamic_clouds(
parameter_overrides
)
parameter_overrides.update(new_overrides)
temp_cloud_files.extend(files_to_clean)
# Find radiosonde file if using radiosonde data
radiosonde_path = None
if (
self.config.simulation_defaults.h2o_source == "radiosonde"
and self.radiosonde_finder
):
radiosonde_path = self.radiosonde_finder.find_radiosonde_file(
dt, latitude, longitude
)
# Generate input file content
input_content = self._generate_input_content(
dt=dt,
latitude=latitude,
longitude=longitude,
radiosonde_path=radiosonde_path,
override_albedo=override_albedo,
override_surface_temperature=override_surface_temperature,
override_altitude_km=override_altitude_km,
override_surface_type=override_surface_type,
era5_atmosphere_file=era5_atmosphere_file,
parameter_overrides=parameter_overrides,
)
# Create temporary input file
with tempfile.NamedTemporaryFile(
mode="w", suffix=".inp", delete=False, dir=self.config.paths.working_dir
) as inp_file:
inp_file.write(input_content)
input_path = Path(inp_file.name)
# Create output file path
output_path = input_path.with_suffix(".out")
# Run LibRadtran
success = self._run_uvspec(input_path, output_path)
if success and output_path.exists():
logger.debug(f"Simulation completed successfully: {output_path}")
# Clean up input file if requested
if self.config.execution.cleanup_temp_files:
input_path.unlink(missing_ok=True)
return output_path
else:
logger.error("Simulation failed or produced no output")
return None
except Exception as e:
logger.error(f"Simulation failed: {str(e)}")
raise UvspecExecutionError(f"Simulation failed: {str(e)}")
finally:
# Cleanup temp cloud files
for p in temp_cloud_files:
try:
if os.path.exists(p):
os.unlink(p)
except Exception as e:
logger.warning(f"Failed to cleanup temp file {p}: {e}")
def _generate_input_content(
self,
dt: datetime,
latitude: float,
longitude: float,
radiosonde_path: Optional[Path] = None,
override_albedo: Optional[float] = None,
override_surface_temperature: Optional[float] = None,
override_altitude_km: Optional[float] = None,
override_surface_type: Optional[int] = None,
era5_atmosphere_file: Optional[Path] = None,
parameter_overrides: Dict[str, Any] = None,
) -> str:
"""Build the full ``uvspec`` input-file content as a string."""
lines = []
# Basic LibRadtran settings
lines.append(f"rte_solver {self.config.simulation_defaults.rte_solver}")
if self.config.simulation_defaults.mol_abs_param:
lines.append(
f"mol_abs_param {self.config.simulation_defaults.mol_abs_param}"
)
# Data files path
lines.append(f"data_files_path {self.config.paths.libradtran_data}")
lines.append(f"atmosphere_file {self.config.paths.atmosphere_profile}")
# Atmosphere settings
if era5_atmosphere_file is not None:
# Use custom ERA5 atmosphere file
era5_abs_path = Path(era5_atmosphere_file).resolve()
# Read the first few lines to determine H2O format
# The file should be in libradtran radiosonde format and looks like this:
# # ERA5 atmosphere profile in libradtran radiosonde style
# # p(hPa) T(K) h2o(cm-3)
# 50.00 214.95 2.868e-06
h2o_unit = "MMR" # Default to MMR (mass mixing ratio)
with open(era5_abs_path, "r") as f:
second_line = f.readlines()[1].strip()
if "kg kg**-1" in second_line:
h2o_unit = "MMR"
elif "%" in second_line:
h2o_unit = "RH"
elif "kg/kg" in second_line:
h2o_unit = "MMR"
# else: keep the default MMR
lines.append(f"radiosonde {era5_abs_path} H2O {h2o_unit}")
logger.debug(
f"Using ERA5 atmosphere file as radiosonde: {era5_abs_path} with H2O unit: {h2o_unit}"
)
# Radiosonde data
elif (
radiosonde_path
and self.config.simulation_defaults.h2o_source == "radiosonde"
):
lines.append(f"radiosonde {radiosonde_path} H2O RH")
# Molecular modifications
# if self.config.simulation_defaults.ozone_du is not None:
# lines.append(f"mol_modify O3 {self.config.simulation_defaults.ozone_du} DU")
# if (self.config.simulation_defaults.h2o_mm is not None and
# self.config.simulation_defaults.h2o_source == 'fixed'):
# lines.append(f"mol_modify H2O {self.config.simulation_defaults.h2o_mm} MM")
# Solar/thermal source
if self.config.simulation_defaults.source == "solar":
if self.config.simulation_defaults.integrate_wavelength:
lines.append(f"source solar {self.config.paths.solar_spectrum} per_nm")
else:
lines.append(f"source solar {self.config.paths.solar_spectrum}")
# Solar geometry
if self.config.simulation_defaults.sza is not None:
lines.append(f"sza {self.config.simulation_defaults.sza}")
else:
# Calculate solar zenith angle from time and location
sza = self._calculate_solar_zenith_angle(dt, latitude, longitude)
lines.append(f"sza {sza}")
# Day of year for solar spectrum adjustment
day_of_year = dt.timetuple().tm_yday
lines.append(f"day_of_year {day_of_year}")
elif self.config.simulation_defaults.source == "thermal":
lines.append("source thermal")
# Surface temperature for thermal calculations
surf_temp = (
override_surface_temperature
or self.config.simulation_defaults.surface_temperature_k
)
if surf_temp is not None:
lines.append(f"sur_temperature {surf_temp}")
# Spectral settings
if (
self.config.simulation_defaults.wavelength_nm
and len(self.config.simulation_defaults.wavelength_nm) == 2
):
wl_min, wl_max = self.config.simulation_defaults.wavelength_nm
lines.append(f"wavelength {wl_min} {wl_max}")
if self.config.simulation_defaults.integrate_wavelength:
lines.append("output_process integrate")
# Surface properties
if override_surface_type is not None:
# Use IGBP surface type library
lines.append("brdf_rpv_library IGBP")
lines.append(f"brdf_rpv_type {int(override_surface_type)}")
elif override_albedo is not None:
albedo = override_albedo
lines.append(f"albedo {albedo}")
elif self.config.simulation_defaults.albedo_value is not None:
lines.append(f"albedo {self.config.simulation_defaults.albedo_value}")
# Output settings
output_columns = " ".join(self.config.simulation_defaults.output_columns)
if output_columns:
lines.append(f"output_user {output_columns}")
# Output altitudes
if override_altitude_km is not None:
# Single altitude override
lines.append(f"zout {override_altitude_km}")
else:
# Use configured altitudes
if self.config.simulation_defaults.output_altitudes_km:
alt_str = " ".join(
f"{alt:.4f}"
for alt in self.config.simulation_defaults.output_altitudes_km
)
lines.append(f"zout {alt_str}")
# Viewing geometry
if self.config.simulation_defaults.viewing_geometry == "nadir":
lines.append("umu 1.0") # Nadir viewing
# Cloud settings (simplified)
if self.config.simulation_defaults.clouds.enabled:
self._add_cloud_settings(lines)
# Apply parameter overrides
# Merge config-based overrides with runtime overrides
combined_overrides = self.config.simulation_defaults.parameter_overrides.copy()
if parameter_overrides:
combined_overrides.update(parameter_overrides)
if combined_overrides:
for key, value in combined_overrides.items():
# Remove any existing lines that start with the key followed by a space
lines = [line for line in lines if not line.startswith(f"{key} ")]
# Append the override line
lines.append(f"{key} {value}")
# Add quiet mode to reduce output
lines.append("quiet")
return "\n".join(lines) + "\n"
def _handle_dynamic_clouds(
self, overrides: Dict[str, Any]
) -> tuple[Dict[str, Any], list[str]]:
"""Convert dict-valued ``wc_file`` / ``ic_file`` overrides to temp files."""
new_updates = {}
cleanup_list = []
# Helper to write profile
def write_profile_file(key_prefix, data):
content = self.format_cloud_profile(data)
# Create temp file
output_dir = self.config.paths.working_dir
if not output_dir.exists():
output_dir.mkdir(parents=True, exist_ok=True)
fd, path = tempfile.mkstemp(
prefix=f"pyradtran_{key_prefix}_", suffix=".dat", dir=output_dir
)
with os.fdopen(fd, "w") as f:
f.write(content)
return path
if "wc_file" in overrides and isinstance(overrides["wc_file"], (dict, list)):
# It's not a path, it's data
# Check if it's not already a string
if not isinstance(overrides["wc_file"], str):
path = write_profile_file("wc", overrides["wc_file"])
new_updates["wc_file"] = f"1D {path}"
cleanup_list.append(path)
if "ic_file" in overrides and isinstance(overrides["ic_file"], (dict, list)):
if not isinstance(overrides["ic_file"], str):
path = write_profile_file("ic", overrides["ic_file"])
new_updates["ic_file"] = f"1D {path}"
cleanup_list.append(path)
return new_updates, cleanup_list
def _add_cloud_settings(self, lines: list):
"""Append cloud-related lines to the input file."""
clouds = self.config.simulation_defaults.clouds
if clouds.cloud_source == "file":
# File-based clouds
if clouds.cloud_type in ["wc", "mixed"] and clouds.wc_file:
lines.append(f"wc_file {clouds.wc_file}")
if clouds.cloud_type in ["ic", "mixed"] and clouds.ic_file:
lines.append(f"ic_file {clouds.ic_file}")
elif clouds.cloud_source == "parametric":
# Simple parametric cloud
if clouds.cloud_type in ["wc", "mixed"]:
lines.append(
f"wc_layer {clouds.layer_bottom_km} {clouds.layer_top_km} {clouds.water_content_g_m3} {clouds.effective_radius_um}"
)
if clouds.cloud_type in ["ic", "mixed"]:
lines.append(
f"ic_layer {clouds.layer_bottom_km} {clouds.layer_top_km} {clouds.ice_content_g_m3} {clouds.effective_radius_um}"
)
def _calculate_solar_zenith_angle(
self, dt: datetime, latitude: float, longitude: float
) -> float:
"""Approximate solar zenith angle (degrees) from time and location."""
try:
import numpy as np
# Day of year
day_of_year = dt.timetuple().tm_yday
# Solar declination angle
declination = 23.45 * np.sin(np.radians((360 * (284 + day_of_year)) / 365))
# Hour angle
time_decimal = dt.hour + dt.minute / 60.0 + dt.second / 3600.0
hour_angle = 15 * (time_decimal - 12) + longitude
# Solar zenith angle
lat_rad = np.radians(latitude)
decl_rad = np.radians(declination)
hour_rad = np.radians(hour_angle)
cos_sza = np.sin(lat_rad) * np.sin(decl_rad) + np.cos(lat_rad) * np.cos(
decl_rad
) * np.cos(hour_rad)
sza = np.degrees(np.arccos(np.clip(cos_sza, -1, 1)))
return round(sza, 2)
except Exception as e:
logger.warning(
f"Failed to calculate solar zenith angle: {e}, using default 30°"
)
return 30.0
def _run_uvspec(self, input_path: Path, output_path: Path) -> bool:
"""Spawn ``uvspec``, piping *input_path* to stdin and writing stdout to *output_path*."""
try:
cmd = [str(self.config.paths.libradtran_bin)]
logger.debug(f"Running LibRadtran: {' '.join(cmd)}")
logger.debug(f"Input file: {input_path}")
logger.debug(f"Output file: {output_path}")
with open(input_path, "r") as inp_file, open(output_path, "w") as out_file:
result = subprocess.run(
cmd,
stdin=inp_file,
stdout=out_file,
stderr=subprocess.PIPE,
timeout=self.config.execution.timeout_seconds,
text=True,
)
if result.returncode != 0:
error_msg = result.stderr.strip() if result.stderr else "Unknown error"
logger.error(
f"LibRadtran failed with return code {result.returncode}: {error_msg}"
)
return False
return True
except subprocess.TimeoutExpired:
logger.error(
f"LibRadtran execution timed out after {self.config.execution.timeout_seconds} seconds"
)
return False
except Exception as e:
logger.error(f"Failed to execute LibRadtran: {str(e)}")
return False
# Expose main class
__all__ = ["Simulation"]