Source code for gcmprocpy.imfgen.core

"""Top-level orchestration: fetch -> process -> Dataset."""

from datetime import datetime

import numpy as np

from .dataset import build_dataset
from .dates import coerce_datetime, date_value, iso_timestamp, yesterday
from .processing import process_channels
from .sources import bcwind_samples, omni_samples

DEFAULT_OMNI_START = "1982-01-01"


[docs] def generate_imf( start=None, end=None, source="omni", window=10, cache_dir=None, bcwind_path=None, download=True, verbose=False, ): """Generate an IMF ``xarray.Dataset``. Parameters ---------- start, end : date-like or None Inclusive bounds (``YYYY-MM-DD``, ``YYYYDDD``, ISO, or ``datetime``). For ``omni`` they default to ``1982-01-01`` and yesterday. For ``bcwind`` they optionally filter the file (default: the file's full span). source : {"omni", "bcwind"} OMNI 1-minute ASCII (default) or a BCWIND HDF5 file. window : int Trailing-average window in minutes for the OMNI pipeline (default 10). Ignored for ``bcwind`` (raw pass-through). cache_dir : str or None Directory holding / receiving ``omni_min<year>.asc`` files (OMNI only). bcwind_path : str or None Path to the BCWIND HDF5 file (required when ``source="bcwind"``). download : bool Fetch missing OMNI year files over FTP (default True). verbose : bool Print progress. Returns ------- xarray.Dataset Use :func:`imfgen.save_imf` to write it to NetCDF. """ if source == "omni": start_dt = coerce_datetime(start if start is not None else DEFAULT_OMNI_START) end_dt = (coerce_datetime(end, end_of_day=True) if end is not None else yesterday()) if verbose: print(f"Generating OMNI IMF {start_dt.date()} -> {end_dt.date()} " f"(window={window})") samples = omni_samples(start_dt, end_dt, window=window, cache_dir=cache_dir, download=download, verbose=verbose) source_path = None elif source == "bcwind": if not bcwind_path: raise ValueError("source='bcwind' requires bcwind_path=<file.h5>.") start_dt = coerce_datetime(start) if start is not None else None end_dt = (coerce_datetime(end, end_of_day=True) if end is not None else None) if verbose: print(f"Converting BCWIND file {bcwind_path}") samples = bcwind_samples(bcwind_path, start_dt=start_dt, end_dt=end_dt) source_path = bcwind_path else: raise ValueError(f"Unknown source {source!r}; expected 'omni' or 'bcwind'.") timestamps = samples["timestamps"] n_out = len(timestamps) processed = process_channels( samples["channels"], samples["window"], n_out, samples["interpolate"] ) dates = np.array([date_value(t) for t in timestamps]) iso = np.array([iso_timestamp(t) for t in timestamps]) ds = build_dataset(processed, dates, iso, source=source, source_path=source_path) if verbose: print(f"Built IMF dataset: {ds.attrs['yearday_beg']} -> " f"{ds.attrs['yearday_end']} ({n_out} minutes)") return ds
[docs] def generate_imf_years(start=None, end=None, window=10, cache_dir=None, download=True, verbose=False): """Yield one OMNI ``Dataset`` per calendar year in ``[start, end]``. Each year is generated **independently** (its own within-year interpolation), so the per-year files reproduce the legacy ``imf_OMNI_YYYY001-YYYYddd.nc`` files exactly. This is what ``imfgen --split-years`` writes. (BCWIND files are a single span and are not split.) """ start_dt = coerce_datetime(start if start is not None else DEFAULT_OMNI_START) end_dt = (coerce_datetime(end, end_of_day=True) if end is not None else yesterday()) for year in range(start_dt.year, end_dt.year + 1): y_start = max(start_dt, datetime(year, 1, 1)) y_end = min(end_dt, datetime(year, 12, 31, 23, 59)) yield generate_imf( start=y_start, end=y_end, source="omni", window=window, cache_dir=cache_dir, download=download, verbose=verbose, )