Source code for gcmprocpy.data_density

"""Species-aware density unit conversions.

Port of tgcmproc ``denconv.F`` (B. Foster).  Converts atmospheric species
fields among the four density representations used by TIE-GCM post-
processing:

- **MMR**    — mass mixing ratio (dimensionless, mass fraction)
- **CM3**    — number density (molecules cm⁻³)
- **CM3-MR** — volume mixing ratio / mole fraction (dimensionless)
- **GM/CM3** — mass density (g cm⁻³)

The conversion requires the mean air molar mass (``BARM``) and the air
number density at each grid point (``pkt``), both of which depend on the
model's temperature and O / O₂ fields.

Reference formulas (tgcmproc ``denconv.F``)::

    BARM = 1 / (O₂/32 + O/16 + (1 − O₂ − O)/28)
    pkt  = p₀ · exp(−ζ) / (k_B · T)                [CGS]

    MMR → CM3     :   f · pkt · BARM / W
    MMR → CM3-MR  :   f · BARM / W
    MMR → GM/CM3  :   f · pkt · BARM · 1.66e-24

Currently TIE-GCM only.  WACCM-X uses a hybrid-pressure coordinate and
needs a different pressure expression; calling :func:`arr_density` with
a WACCM-X dataset raises :class:`NotImplementedError`.
"""
import logging
import numpy as np

from .containers import (
    PlotData,
    MODEL_DEFAULTS,
    get_species_names,
    cache_data_fn,
)
from .data_parse import get_mtime, level_log_transform

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Constants (CGS, matching tgcmproc denconv.F)
# ---------------------------------------------------------------------------
_BOLTZ_CGS = 1.3805e-16          # erg K⁻¹
_AMU_G = 1.66e-24                # g per atomic mass unit
_P0_TIEGCM_CGS = 5.0e-4          # dyn cm⁻²  (TIE-GCM log-pressure reference)

SUPPORTED_DENSITY_UNITS = ('MMR', 'CM3', 'CM3-MR', 'GM/CM3')

# Role → molar mass (g/mol).  Role keys match MODEL_DEFAULTS['species'].
_SPECIES_MOLAR_MASS = {
    'o':   16.00,
    'o2':  32.00,
    'n2':  28.00,
    'no':  30.00,
    'co2': 44.00,
    'h':    1.008,
    'o3':  48.00,
    'ho2': 33.00,
}


# Common unit-string aliases seen in model attrs.
_UNIT_ALIASES = {
    'cm-3': 'CM3',
    'cm^-3': 'CM3',
    '1/cm3': 'CM3',
    'molecules/cm3': 'CM3',
    'mmr': 'MMR',
    'kg/kg': 'MMR',
    'g/g': 'MMR',
    'vmr': 'CM3-MR',
    'mol/mol': 'CM3-MR',
    'mole fraction': 'CM3-MR',
    'g/cm3': 'GM/CM3',
    'g cm-3': 'GM/CM3',
}


def _normalize_unit(unit):
    """Canonicalise a density unit string to one of SUPPORTED_DENSITY_UNITS."""
    if unit is None:
        return None
    key = unit.strip().lower()
    if key in _UNIT_ALIASES:
        return _UNIT_ALIASES[key]
    upper = unit.strip().upper()
    if upper in SUPPORTED_DENSITY_UNITS:
        return upper
    return unit  # fall through; caller raises


[docs] def get_species_molar_mass(model, variable_name): """Molar mass (g/mol) for a species variable name in the given model. Resolves through :func:`get_species_names` so that model-specific naming (e.g. TIE-GCM ``O1`` vs WACCM-X ``O``) is handled uniformly. Raises: ValueError: If the variable is not a known species in the model, or no molar mass is registered for that species role. """ sp = get_species_names(model) inverse = {v: k for k, v in sp.items()} role = inverse.get(variable_name) if role is None: raise ValueError( f"'{variable_name}' is not a recognised species for {model}. " f"Known: {sorted(sp.values())}" ) if role not in _SPECIES_MOLAR_MASS: raise ValueError( f"No molar mass registered for species role '{role}' " f"({variable_name} in {model}). Add it to _SPECIES_MOLAR_MASS." ) return _SPECIES_MOLAR_MASS[role]
[docs] def compute_barm(o_mmr, o2_mmr): """Mean air molar mass (g/mol) from atomic-O and molecular-O₂ MMRs. Mirrors ``denconv.F``:: BARM = 1 / (O₂/32 + O/16 + max(1e-5, 1 − O₂ − O) / 28) The residual ``(1 − O₂ − O)`` is treated as N₂. A 1e-5 floor guards against the pathological case where O₂ + O ≈ 1 in the upper thermosphere. """ o_mmr = np.asarray(o_mmr, dtype=float) o2_mmr = np.asarray(o2_mmr, dtype=float) residual = np.maximum(1.0e-5, 1.0 - o2_mmr - o_mmr) return 1.0 / (o2_mmr / 32.0 + o_mmr / 16.0 + residual / 28.0)
[docs] def compute_pkt(levs, temperature, model='TIE-GCM'): """Air number density (cm⁻³) on TIE-GCM log-pressure levels. ``pkt = p₀ · exp(−ζ) / (k_B · T)`` in CGS, where *ζ* is the log- pressure coordinate (``lev``) and ``p₀ = 5 × 10⁻⁴`` dyn cm⁻². Args: levs: Log-pressure level values, shape ``(nlev,)``. temperature: Temperature in K. Lev axis must be axis 0 when the array is multi-dimensional. model: Currently ``'TIE-GCM'`` only. """ if model != 'TIE-GCM': raise NotImplementedError( f"compute_pkt supports TIE-GCM only (got '{model}')." ) levs = np.asarray(levs, dtype=float) t = np.asarray(temperature, dtype=float) # Broadcast lev (axis 0) across temperature's remaining axes. shape = [1] * max(t.ndim, 1) if t.ndim >= 1: shape[0] = len(levs) p = _P0_TIEGCM_CGS * np.exp(-levs.reshape(shape)) return p / (_BOLTZ_CGS * t)
[docs] def convert_density_units(values, from_unit, to_unit, *, barm, pkt, molar_mass): """Convert a density field between MMR / CM3 / CM3-MR / GM/CM3. Uses MMR as the pivot. All formulas mirror ``denconv.F``; the reverse (non-MMR → MMR) direction is algebraic inversion. Args: values: Array-like field to convert. from_unit, to_unit: Source / target unit strings. Accepts common aliases (``'cm-3'``, ``'kg/kg'``, ``'g/cm3'``...). barm: Mean air molar mass (g/mol) broadcast-compatible with *values*. pkt: Air number density (cm⁻³), same broadcast shape. molar_mass: Species molar mass (g/mol), scalar. Returns: ndarray with the same shape as *values*, in *to_unit*. """ fu = _normalize_unit(from_unit) tu = _normalize_unit(to_unit) if fu not in SUPPORTED_DENSITY_UNITS: raise ValueError( f"Unsupported from_unit '{from_unit}'. " f"Supported: {SUPPORTED_DENSITY_UNITS} (aliases accepted)." ) if tu not in SUPPORTED_DENSITY_UNITS: raise ValueError( f"Unsupported to_unit '{to_unit}'. " f"Supported: {SUPPORTED_DENSITY_UNITS} (aliases accepted)." ) values = np.asarray(values, dtype=float) if fu == tu: return values w = float(molar_mass) # Step 1: from_unit → MMR if fu == 'MMR': mmr = values elif fu == 'CM3': mmr = values * w / (pkt * barm) elif fu == 'CM3-MR': mmr = values * w / barm elif fu == 'GM/CM3': rho_air = pkt * barm * _AMU_G mmr = values / rho_air # Step 2: MMR → to_unit (formulas straight from denconv.F) if tu == 'MMR': return mmr if tu == 'CM3': return mmr * pkt * barm / w if tu == 'CM3-MR': return mmr * barm / w if tu == 'GM/CM3': return mmr * pkt * barm * _AMU_G
@cache_data_fn def arr_density(datasets, variable_name, time, *, to_unit='CM3', from_unit=None, log_level=True, **kwargs): """Extract a species field and convert it to the requested density unit. Reads the species, temperature, and O / O₂ fields from the first dataset that has *time*, computes BARM and pkt, and returns the converted field. Args: datasets (list): :class:`ModelDataset` objects. variable_name (str): Species variable name (e.g. ``'O2'``, ``'NO'``, ``'CO2'``). time: ``numpy.datetime64`` or ISO-8601 string. to_unit (str): Target unit. One of :data:`SUPPORTED_DENSITY_UNITS` (aliases accepted). Default ``'CM3'``. from_unit (str): Source unit. If ``None`` (default), reads it from the dataset variable's ``units`` attribute. log_level (bool): Whether to log-transform level coordinates for the returned :class:`PlotData`. Returns: PlotData: Shape ``(nlev, nlat, nlon)`` in *to_unit*. ``None`` if no dataset contains *time*. Raises: NotImplementedError: For non-TIE-GCM datasets (WACCM-X pending). ValueError: If the species / temperature / O / O₂ fields are missing, or the source unit cannot be determined. """ if isinstance(time, str): time = np.datetime64(time, 'ns') for mds in datasets: if not mds.has_time(time): continue ds = mds.ds if mds.model != 'TIE-GCM': raise NotImplementedError( f"arr_density currently supports TIE-GCM only " f"(got '{mds.model}'). WACCM-X pressure-coordinate port " f"pending." ) sp = get_species_names(mds.model) t_name, o_name, o2_name = sp['temp'], sp['o'], sp['o2'] required = [variable_name, t_name, o_name, o2_name] missing = [r for r in required if r not in ds.variables] if missing: raise ValueError( f"Density conversion requires {required} but dataset " f"{mds.filename!r} is missing: {missing}" ) molar_mass = get_species_molar_mass(mds.model, variable_name) src_unit = from_unit if from_unit is not None else \ ds[variable_name].attrs.get('units') if src_unit is None: raise ValueError( f"No 'units' attribute on {variable_name!r}; pass from_unit= " f"explicitly." ) selected_mtime = get_mtime(ds, time) ds_t = ds.sel(time=time) field = ds_t[variable_name].values.astype(float) temp = ds_t[t_name].values.astype(float) o_mmr = ds_t[o_name].values.astype(float) o2_mmr = ds_t[o2_name].values.astype(float) da = ds_t[variable_name] if 'lev' in da.dims: levs = ds_t.lev.values.astype(float) elif 'ilev' in da.dims: levs = ds_t.ilev.values.astype(float) else: raise ValueError( f"'{variable_name}' has no lev/ilev coordinate for density " f"conversion." ) barm = compute_barm(o_mmr, o2_mmr) pkt = compute_pkt(levs, temp, model=mds.model) converted = convert_density_units( field, src_unit, to_unit, barm=barm, pkt=pkt, molar_mass=molar_mass, ) levs_display = level_log_transform(levs.copy(), mds.model, log_level) long_name = ds[variable_name].attrs.get('long_name', variable_name) return PlotData( values=converted, variable_unit=_normalize_unit(to_unit), variable_long_name=long_name, model=mds.model, filename=mds.filename, levs=levs_display, lats=ds_t.lat.values.astype(float), lons=ds_t.lon.values.astype(float), mtime=selected_mtime, ) logger.warning("%s not found in any dataset.", time) return None