"""Species-aware density unit conversions.
Port of tgcmproc ``denconv.F`` (B. Foster). Converts atmospheric species
fields among the four density representations used by TIE-GCM post-
processing:
- **MMR** — mass mixing ratio (dimensionless, mass fraction)
- **CM3** — number density (molecules cm⁻³)
- **CM3-MR** — volume mixing ratio / mole fraction (dimensionless)
- **GM/CM3** — mass density (g cm⁻³)
The conversion requires the mean air molar mass (``BARM``) and the air
number density at each grid point (``pkt``), both of which depend on the
model's temperature and O / O₂ fields.
Reference formulas (tgcmproc ``denconv.F``)::
BARM = 1 / (O₂/32 + O/16 + (1 − O₂ − O)/28)
pkt = pressure / (k_B · T) [CGS]
MMR → CM3 : f · pkt · BARM / W
MMR → CM3-MR : f · BARM / W
MMR → GM/CM3 : f · pkt · BARM · 1.66e-24
The only model-specific piece is how *pressure* is recovered from the
vertical coordinate (see :func:`compute_pkt`): TIE-GCM uses log-pressure
``p = p₀·exp(−ζ)``; WACCM-X uses the CAM hybrid sigma-pressure
``p = hyam·P0 + hybm·PS``. Both are supported.
"""
import logging
import numpy as np
from .containers import (
PlotData,
MODEL_DEFAULTS,
get_species_names,
cache_data_fn,
)
from .data_parse import get_mtime, level_log_transform, derive_aware
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants (CGS, matching tgcmproc denconv.F)
# ---------------------------------------------------------------------------
_BOLTZ_CGS = 1.3805e-16 # erg K⁻¹
_AMU_G = 1.66e-24 # g per atomic mass unit
_P0_TIEGCM_CGS = 5.0e-4 # dyn cm⁻² (TIE-GCM log-pressure reference)
SUPPORTED_DENSITY_UNITS = ('MMR', 'CM3', 'CM3-MR', 'GM/CM3')
# Role → molar mass (g/mol). Role keys match MODEL_DEFAULTS['species'].
# Species molar masses (g/mol). Ported from tgcmproc ``fset_known.F``
# (subroutine ``fset_known``), which registers each species' ``flds_known(n)%wt``:
# O2=32, O (O1)=16, NO=30, CO2=44, O3=48, HO2=33, NO2=46, OH=17, and N2=28 (a real
# table row, flagged derived from O2/O1). H is the standard atomic weight 1.008
# here vs the Fortran's rounded 1. These weights are the MMR -> number-density
# divisors. OX/NOZ/HOX are tgcmproc's effective *group* weights (fset_known.F
# %wt) for unit-converting the composition groups; they are NOT member-mass sums
# (e.g. NOZ=30 is NO's mass, not NO+NO2).
_SPECIES_MOLAR_MASS = {
'o': 16.00,
'o2': 32.00,
'n2': 28.00,
'no': 30.00,
'co2': 44.00,
'h': 1.008,
'o3': 48.00,
'ho2': 33.00,
'no2': 46.00,
'oh': 17.00,
'ox': 16.00, # group weight (tgcmproc fset_known.F)
'noz': 30.00, # group weight (NO's mass, not NO + NO2)
'hox': 17.00, # group weight
}
# Common unit-string aliases seen in model attrs.
_UNIT_ALIASES = {
'cm-3': 'CM3',
'cm^-3': 'CM3',
'1/cm3': 'CM3',
'molecules/cm3': 'CM3',
'mmr': 'MMR',
'kg/kg': 'MMR',
'g/g': 'MMR',
'vmr': 'CM3-MR',
'mol/mol': 'CM3-MR',
'mole fraction': 'CM3-MR',
'g/cm3': 'GM/CM3',
'g cm-3': 'GM/CM3',
}
def _normalize_unit(unit):
"""Canonicalise a density unit string to one of SUPPORTED_DENSITY_UNITS."""
if unit is None:
return None
key = unit.strip().lower()
if key in _UNIT_ALIASES:
return _UNIT_ALIASES[key]
upper = unit.strip().upper()
if upper in SUPPORTED_DENSITY_UNITS:
return upper
return unit # fall through; caller raises
[docs]
def get_species_molar_mass(model, variable_name):
"""Molar mass (g/mol) for a species variable name in the given model.
Resolves through :func:`get_species_names` so that model-specific
naming (e.g. TIE-GCM ``O1`` vs WACCM-X ``O``) is handled uniformly.
Raises:
ValueError: If the variable is not a known species in the model,
or no molar mass is registered for that species role.
"""
sp = get_species_names(model)
inverse = {v: k for k, v in sp.items()}
role = inverse.get(variable_name)
if role is None:
raise ValueError(
f"'{variable_name}' is not a recognised species for {model}. "
f"Known: {sorted(sp.values())}"
)
if role not in _SPECIES_MOLAR_MASS:
raise ValueError(
f"No molar mass registered for species role '{role}' "
f"({variable_name} in {model}). Add it to _SPECIES_MOLAR_MASS."
)
return _SPECIES_MOLAR_MASS[role]
[docs]
def compute_barm(o_mmr, o2_mmr):
"""Mean air molar mass (g/mol) from atomic-O and molecular-O₂ MMRs.
Ported from tgcmproc ``denconv.F`` (subroutine ``mkdenparms``, module
``den_convert``, standard/TIE-GCM branch)::
barm = 1 / (o2/32. + o1/16. + max(.00001, 1.-o2-o1) / 28.)
The residual ``(1 − O₂ − O)`` is treated as N₂ (molar mass 28); the
``.00001`` floor is the original Fortran guard for the upper thermosphere
where O₂ + O ≈ 1. (The planetary jtgcm/mtgcm/vtgcm barm variants are not
ported.)
"""
o_mmr = np.asarray(o_mmr, dtype=float)
o2_mmr = np.asarray(o2_mmr, dtype=float)
residual = np.maximum(1.0e-5, 1.0 - o2_mmr - o_mmr)
return 1.0 / (o2_mmr / 32.0 + o_mmr / 16.0 + residual / 28.0)
[docs]
def compute_pkt(levs, temperature, model='TIE-GCM', *,
hyam=None, hybm=None, p0=None, ps=None):
"""Air number density (cm⁻³) = pressure / (k_B · T), in CGS.
Ported from tgcmproc ``denconv.F`` (subroutine ``mkdenparms``; pkt formula
at line 68)::
pkt(:,k) = p0 * exp(-(zpb + (k-1)*dz)) / (boltz * TN), boltz = 1.3805e-16
The TIE-GCM log-pressure coordinate gives ``p = p0·exp(-ζ)`` with
``p0 = 5e-4`` (cgs; ``proc.F`` labels it "mb" but the value matches the
microbar = dyn cm⁻² convention). The WACCM-X hybrid-pressure branch below
is a gcmprocpy extension with no tgcmproc counterpart.
The pressure is recovered from the model's vertical coordinate, which
differs by model:
- **TIE-GCM** (log-pressure ``lev = ζ = ln(p₀/p)``)::
p = p₀ · exp(−ζ), p₀ = 5 × 10⁻⁴ dyn cm⁻²
``temperature`` lev axis must be axis 0 when multi-dimensional;
``lev`` broadcasts across the remaining axes.
- **WACCM-X** (CAM hybrid sigma-pressure)::
p(k,j,i) = hyam(k) · P0 + hybm(k) · PS(j,i) [Pa]
converted to dyn cm⁻² (×10). Requires ``hyam``/``hybm`` (on the
field's vertical coordinate) and surface pressure ``ps`` (lat, lon);
``p0`` defaults to 1 × 10⁵ Pa if not supplied. Returns a full
``(nlev, nlat, nlon)`` field since pressure varies horizontally.
Args:
levs: Vertical coordinate values, shape ``(nlev,)``.
temperature: Temperature in K.
model: ``'TIE-GCM'`` or ``'WACCM-X'``.
hyam, hybm: Hybrid A/B coefficients on the field's level coordinate
(WACCM-X only), shape ``(nlev,)``.
p0: Reference pressure in Pa (WACCM-X only; default 1e5).
ps: Surface pressure in Pa, shape ``(nlat, nlon)`` (WACCM-X only).
"""
t = np.asarray(temperature, dtype=float)
if model == 'TIE-GCM':
levs = np.asarray(levs, dtype=float)
# Broadcast lev (axis 0) across temperature's remaining axes.
shape = [1] * max(t.ndim, 1)
if t.ndim >= 1:
shape[0] = len(levs)
p = _P0_TIEGCM_CGS * np.exp(-levs.reshape(shape))
elif model == 'WACCM-X':
if hyam is None or hybm is None or ps is None:
raise ValueError(
"WACCM-X compute_pkt needs hyam, hybm, and ps (surface "
"pressure); pass them from the dataset."
)
hyam = np.asarray(hyam, dtype=float)
hybm = np.asarray(hybm, dtype=float)
ps = np.asarray(ps, dtype=float)
p0 = 1.0e5 if p0 is None else float(p0)
# p(lev, lat, lon) in Pa, then Pa -> dyn cm⁻² (1 Pa = 10 dyn cm⁻²).
p_pa = hyam[:, None, None] * p0 + hybm[:, None, None] * ps[None, :, :]
p = p_pa * 10.0
else:
raise NotImplementedError(
f"compute_pkt supports TIE-GCM and WACCM-X (got '{model}')."
)
return p / (_BOLTZ_CGS * t)
[docs]
def convert_density_units(values, from_unit, to_unit, *,
barm, pkt, molar_mass):
"""Convert a density field between MMR / CM3 / CM3-MR / GM/CM3.
Uses MMR as the pivot. Ported from tgcmproc ``denconv.F`` (subroutine
``denconv``, module ``den_convert``)::
MMR→CM3 = mmr * pkt * barm / w
MMR→CM3-MR = mmr * barm / w
MMR→GM/CM3 = mmr * pkt * barm * 1.66e-24
CM3→MMR = f * w / (pkt * barm)
where ``barm`` is the mean air molar mass and ``pkt`` the air number
density (:func:`compute_barm` / :func:`compute_pkt`), and ``w`` the species
molar mass. The reverse (non-MMR → MMR) directions are algebraic
inversions of the above (not written verbatim in the Fortran).
Args:
values: Array-like field to convert.
from_unit, to_unit: Source / target unit strings. Accepts
common aliases (``'cm-3'``, ``'kg/kg'``, ``'g/cm3'``...).
barm: Mean air molar mass (g/mol) broadcast-compatible with
*values*.
pkt: Air number density (cm⁻³), same broadcast shape.
molar_mass: Species molar mass (g/mol), scalar.
Returns:
ndarray with the same shape as *values*, in *to_unit*.
"""
fu = _normalize_unit(from_unit)
tu = _normalize_unit(to_unit)
if fu not in SUPPORTED_DENSITY_UNITS:
raise ValueError(
f"Unsupported from_unit '{from_unit}'. "
f"Supported: {SUPPORTED_DENSITY_UNITS} (aliases accepted)."
)
if tu not in SUPPORTED_DENSITY_UNITS:
raise ValueError(
f"Unsupported to_unit '{to_unit}'. "
f"Supported: {SUPPORTED_DENSITY_UNITS} (aliases accepted)."
)
values = np.asarray(values, dtype=float)
if fu == tu:
return values
w = float(molar_mass)
# Step 1: from_unit → MMR
if fu == 'MMR':
mmr = values
elif fu == 'CM3':
mmr = values * w / (pkt * barm)
elif fu == 'CM3-MR':
mmr = values * w / barm
elif fu == 'GM/CM3':
rho_air = pkt * barm * _AMU_G
mmr = values / rho_air
# Step 2: MMR → to_unit (formulas straight from denconv.F)
if tu == 'MMR':
return mmr
if tu == 'CM3':
return mmr * pkt * barm / w
if tu == 'CM3-MR':
return mmr * barm / w
if tu == 'GM/CM3':
return mmr * pkt * barm * _AMU_G
def _select_level(values, coord_vals, selected_lev_ilev):
"""Select a level from a ``(nlev, ...)`` array, mirroring batch_arr_lat_lon.
``'mean'`` averages over the level axis; an exact coordinate value selects
it; an out-of-range value clamps to the nearest end; otherwise the two
nearest levels are averaged. Done *after* unit conversion so a level mean
is the mean of the converted field (not a converted mean).
"""
if selected_lev_ilev is None:
return values
if selected_lev_ilev == 'mean':
return values.mean(axis=0)
sel = float(selected_lev_ilev)
coord_vals = np.asarray(coord_vals, dtype=float)
exact = np.where(coord_vals == sel)[0]
if exact.size:
return values[int(exact[0])]
if sel >= coord_vals.max():
return values[int(np.argmax(coord_vals))]
if sel <= coord_vals.min():
return values[int(np.argmin(coord_vals))]
order = np.argsort(np.abs(coord_vals - sel))
return (values[int(order[0])] + values[int(order[1])]) / 2.0
@derive_aware
@cache_data_fn
def arr_density(datasets, variable_name, time, *,
to_unit='CM3', from_unit=None, selected_lev_ilev=None,
log_level=True, **kwargs):
"""Extract a species field and convert it to the requested density unit.
Reads the species, temperature, and O / O₂ fields from the first
dataset that has *time*, computes BARM and pkt, and returns the
converted field.
Args:
datasets (list): :class:`ModelDataset` objects.
variable_name (str): Species variable name (e.g. ``'O2'``,
``'NO'``, ``'CO2'``).
time: ``numpy.datetime64`` or ISO-8601 string.
to_unit (str): Target unit. One of :data:`SUPPORTED_DENSITY_UNITS`
(aliases accepted). Default ``'CM3'``.
from_unit (str): Source unit. If ``None`` (default), reads it
from the dataset variable's ``units`` attribute.
selected_lev_ilev (float | str, optional): If given, return only this
level (or ``'mean'`` over levels), selected *after* conversion.
``None`` (default) returns the full vertical grid.
log_level (bool): Whether to log-transform level coordinates for
the returned :class:`PlotData`.
Returns:
PlotData: full ``(nlev, nlat, nlon)`` field in *to_unit*, or a
``(nlat, nlon)`` slice if *selected_lev_ilev* is given. ``None``
if no dataset contains *time*.
Raises:
ValueError: If required species/temperature/O/O₂ fields are missing,
the source unit cannot be determined, or (WACCM-X) the hybrid
pressure coefficients (hyam/hybm/PS) are absent.
"""
if isinstance(time, str):
time = np.datetime64(time, 'ns')
for mds in datasets:
if not mds.has_time(time):
continue
ds = mds.ds
sp = get_species_names(mds.model)
t_name, o_name, o2_name = sp['temp'], sp['o'], sp['o2']
required = [variable_name, t_name, o_name, o2_name]
missing = [r for r in required if r not in ds.variables]
if missing:
raise ValueError(
f"Density conversion requires {required} but dataset "
f"{mds.filename!r} is missing: {missing}"
)
molar_mass = get_species_molar_mass(mds.model, variable_name)
src_unit = from_unit if from_unit is not None else \
ds[variable_name].attrs.get('units')
if src_unit is None:
raise ValueError(
f"No 'units' attribute on {variable_name!r}; pass from_unit= "
f"explicitly."
)
selected_mtime = get_mtime(ds, time)
ds_t = ds.sel(time=time)
field = ds_t[variable_name].values.astype(float)
temp = ds_t[t_name].values.astype(float)
o_mmr = ds_t[o_name].values.astype(float)
o2_mmr = ds_t[o2_name].values.astype(float)
da = ds_t[variable_name]
if 'lev' in da.dims:
levs = ds_t.lev.values.astype(float)
elif 'ilev' in da.dims:
levs = ds_t.ilev.values.astype(float)
else:
raise ValueError(
f"'{variable_name}' has no lev/ilev coordinate for density "
f"conversion."
)
# BARM/pkt are only needed when the unit actually changes. An identity
# conversion (e.g. an already-cm-3 field requested as CM3) skips them —
# so it never requires WACCM-X hybrid coefficients for a no-op.
if _normalize_unit(src_unit) == _normalize_unit(to_unit):
barm = pkt = None
else:
barm = compute_barm(o_mmr, o2_mmr)
if mds.model == 'WACCM-X':
# Hybrid sigma-pressure: pressure (hence pkt) varies horizontally.
vdim = 'lev' if 'lev' in da.dims else 'ilev'
am, bm = ('hyam', 'hybm') if vdim == 'lev' else ('hyai', 'hybi')
for req in (am, bm, 'PS'):
if req not in ds.variables:
raise ValueError(
f"WACCM-X density conversion needs '{req}' in "
f"{mds.filename!r} (hybrid-pressure coordinate)."
)
p0 = float(ds_t['P0'].values) if 'P0' in ds.variables else None
pkt = compute_pkt(
levs, temp, model='WACCM-X',
hyam=ds_t[am].values.astype(float),
hybm=ds_t[bm].values.astype(float),
p0=p0, ps=ds_t['PS'].values.astype(float),
)
else:
pkt = compute_pkt(levs, temp, model=mds.model)
converted = convert_density_units(
field, src_unit, to_unit,
barm=barm, pkt=pkt, molar_mass=molar_mass,
)
long_name = ds[variable_name].attrs.get('long_name', variable_name)
if selected_lev_ilev is not None:
# Select the level AFTER conversion (so a 'mean' is the mean of the
# converted field, not a conversion of the mean).
sliced = _select_level(converted, levs, selected_lev_ilev)
return PlotData(
values=sliced,
variable_unit=_normalize_unit(to_unit),
variable_long_name=long_name,
model=mds.model,
filename=mds.filename,
lats=ds_t.lat.values.astype(float),
lons=ds_t.lon.values.astype(float),
selected_lev=selected_lev_ilev,
mtime=selected_mtime,
)
levs_display = level_log_transform(levs.copy(), mds.model, log_level)
return PlotData(
values=converted,
variable_unit=_normalize_unit(to_unit),
variable_long_name=long_name,
model=mds.model,
filename=mds.filename,
levs=levs_display,
lats=ds_t.lat.values.astype(float),
lons=ds_t.lon.values.astype(float),
mtime=selected_mtime,
)
logger.warning("%s not found in any dataset.", time)
return None