Source code for pyradtran.utils
# pyradtran/utils.py
"""
Utility helpers for pyRadtran.

Currently provides :class:`RadiosondeFinder`, which scans a directory tree
for radiosonde ``.dat`` files and returns the one closest in time to a
given target datetime.

See Also
--------
pyradtran.io.ERA5AtmosphereGenerator : Alternative atmosphere source.
pyradtran.io.RadiosondeAtmosphereGenerator : Online radiosonde retrieval.
"""
import logging
import re
from bisect import bisect_left
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import List, Optional, Tuple
logger = logging.getLogger(__name__)
[docs]
class RadiosondeFinder:
    """Locate the radiosonde file closest in time to a target datetime.

    On construction the *base_path* directory tree is scanned for files
    matching the pattern ``YYYYMMDD_SSSSSSOD.dat`` (date + seconds-of-day).
    Subsequent calls to :meth:`find_closest` perform a fast binary search
    on the pre-sorted file list.

    Parameters
    ----------
    base_path : pathlib.Path or None
        Root directory to scan.  If *None*, no scanning is performed and
        all look-ups return *None*.

    Examples
    --------
    >>> finder = RadiosondeFinder(Path("/data/radiosondes"))
    >>> finder.find_closest(datetime(2022, 3, 28, 12, 0))
    PosixPath('/data/radiosondes/2022/20220328_43200SOD.dat')

    See Also
    --------
    pyradtran.io.RadiosondeAtmosphereGenerator : Fetch soundings from IGRA.
    """

    # Eight date digits, an underscore, five seconds-of-day digits, "SOD.dat".
    _SONDE_FILENAME_PATTERN = re.compile(r"(\d{8})_(\d{5})SOD\.dat")

    def __init__(self, base_path: Optional[Path]):
        """Store *base_path* and index the radiosonde files below it.

        Parameters
        ----------
        base_path : pathlib.Path or None
            Root directory to scan.  A falsy value skips the scan and
            leaves the index empty.
        """
        self.base_path = base_path
        # (timestamp, path) pairs, kept sorted by timestamp once scanned.
        self._sonde_data: List[Tuple[datetime, Path]] = []
        if self.base_path:
            self._scan_sondes()
        else:
            logger.info("No radiosonde base path provided, skipping sonde scan.")

    def _scan_sondes(self):
        """Scan *base_path* recursively for radiosonde files.

        Every ``*.dat`` file whose name matches the filename pattern is
        converted to a UTC timestamp and stored, sorted by time, in
        ``self._sonde_data``.  Files whose timestamp cannot be built are
        logged with a warning and skipped.  If *base_path* is missing or
        not a directory the index is left untouched.
        """
        if not self.base_path or not self.base_path.is_dir():
            logger.debug(
                "Radiosonde base path does not exist or not provided: %s",
                self.base_path,
            )
            return

        logger.info("Scanning for radiosondes under: %s", self.base_path)
        sonde_files = []
        for sonde_path in self.base_path.rglob("*.dat"):
            match = self._SONDE_FILENAME_PATTERN.search(sonde_path.name)
            if not match:
                continue
            date_str, sod_str = match.groups()
            try:
                # Assume sonde filenames are UTC
                base_date = datetime.strptime(date_str, "%Y%m%d").replace(
                    tzinfo=timezone.utc
                )
                # SOD seems to be seconds of day
                file_datetime = base_date + timedelta(seconds=int(sod_str))
            except (ValueError, OverflowError):
                # ValueError: the digits do not form a valid calendar date.
                # OverflowError: date + offset exceeds datetime's range.
                # Either way one bad filename must not abort the whole scan.
                logger.warning(
                    "Could not parse timestamp from sonde file: %s",
                    sonde_path.name,
                )
            else:
                sonde_files.append((file_datetime, sonde_path))

        # Sort on the timestamp only, so equal times keep discovery order
        # and Path objects are never compared against each other.
        self._sonde_data = sorted(sonde_files, key=lambda item: item[0])
        logger.info("Found and parsed %d radiosonde files.", len(self._sonde_data))
        if not self._sonde_data:
            logger.warning("No valid radiosonde files found in the specified path.")

    def find_closest(self, target_dt: datetime) -> Optional[Path]:
        """Return the radiosonde file closest in time to *target_dt*.

        Parameters
        ----------
        target_dt : datetime
            Target time (UTC assumed if timezone-naive; converted to UTC
            if it carries another timezone).

        Returns
        -------
        pathlib.Path or None
            Path of the best-matching file exactly as produced by the
            directory scan, or *None* when no files have been indexed.
            When *target_dt* is equally far from two files, the earlier
            one is returned.
        """
        entries = self._sonde_data
        if not entries:
            return None

        # Ensure target_dt is timezone-aware UTC (assume UTC if naive).
        if target_dt.tzinfo is None:
            target_dt = target_dt.replace(tzinfo=timezone.utc)
        else:
            target_dt = target_dt.astimezone(timezone.utc)

        # Bisect the sorted (timestamp, path) list directly instead of
        # copying every timestamp into a new list on each call.  A 1-tuple
        # probe compares on the timestamp first and, being shorter, sorts
        # before any entry with an equal timestamp.  The result is the index
        # of the first entry at or after target_dt, and no Path is compared.
        pos = bisect_left(entries, (target_dt,))

        if pos == 0:
            # Target time is at or before the first sonde
            return entries[0][1]
        if pos == len(entries):
            # Target time is after the last sonde
            return entries[-1][1]

        # Target lies between entries[pos - 1] and entries[pos].
        earlier_time, earlier_path = entries[pos - 1]
        later_time, later_path = entries[pos]
        if target_dt - earlier_time <= later_time - target_dt:
            return earlier_path
        return later_path

    def find_radiosonde_file(
        self, dt: datetime, latitude: float, longitude: float
    ) -> Optional[Path]:
        """Find the radiosonde file closest to *dt*.

        Thin wrapper around :meth:`find_closest`.

        Parameters
        ----------
        dt : datetime
            Target time.
        latitude, longitude : float
            Reserved for future spatial matching; currently unused.

        Returns
        -------
        pathlib.Path or None
        """
        return self.find_closest(dt)
# Add other general utility functions here if needed