Source code for gcmprocpy.data_epflux

"""Eliassen-Palm flux calculations.

Port of tgcmproc ``epflux.F`` (B. Foster and Hanli Liu, 2-4/98).
Computes three derived diagnostic fields from 3-D wind and temperature:

- **EPVY** — meridional EP flux component (m² s⁻²)
- **EPVZ** — vertical EP flux component (m² s⁻²)
- **EPVDIV** — EP flux divergence / wave forcing (m s⁻¹ day⁻¹)

Reference:
    Volland, Hans, *Atmospheric Tidal and Planetary Waves*,
    Kluwer Academic Publishers (Norwell, MA), Section 2.7.
"""
import logging
import numpy as np
from .containers import PlotData, MODEL_DEFAULTS, get_species_names, register_derived
from .data_parse import get_mtime, level_log_transform
from .data_density import arr_density

logger = logging.getLogger(__name__)

# Physical constants (matching tgcmproc epflux.F)
_G = 9.8                             # gravity (m s⁻²)
_CP = 1004.0                         # specific heat at constant pressure (J kg⁻¹ K⁻¹)
_TS = 300.0                          # reference surface temperature (K)
_RE = 6.371e6                        # Earth radius (m)
_OMEGA = 2.0 * np.pi / 86400.0      # Earth rotation rate (rad s⁻¹)
_R = 287.0                           # gas constant for dry air (J kg⁻¹ K⁻¹)
_D2R = np.pi / 180.0                # degrees → radians
_H = _R * _TS / _G                  # reference scale height (m) ≈ 8786


def _interp_ilev_to_lev(w_ilev, ilev_vals, lev_vals):
    """Linearly interpolate a 3-D field from *ilev* to *lev* grid.

    Args:
        w_ilev: Array on interface levels, shape ``(nilev, nlat, nlon)``.
        ilev_vals: Interface level coordinate values, shape ``(nilev,)``.
        lev_vals: Midpoint level coordinate values, shape ``(nlev,)``.

    Returns:
        Array on midpoint levels, shape ``(nlev, nlat, nlon)``.
    """
    nlev = len(lev_vals)
    w_lev = np.empty((nlev,) + w_ilev.shape[1:])
    for k in range(nlev):
        idx = np.searchsorted(ilev_vals, lev_vals[k]) - 1
        idx = max(0, min(idx, len(ilev_vals) - 2))
        denom = ilev_vals[idx + 1] - ilev_vals[idx]
        if abs(denom) < 1e-30:
            alpha = 0.0
        else:
            alpha = (lev_vals[k] - ilev_vals[idx]) / denom
        alpha = max(0.0, min(1.0, alpha))
        w_lev[k] = (1.0 - alpha) * w_ilev[idx] + alpha * w_ilev[idx + 1]
    return w_lev


def _total_mass_density_kg_m3(mds, time):
    """Total air mass density (kg m⁻³) on the full grid for one dataset + time.

    Mirrors tgcmproc ``mkrhokg`` (``mkderived.F``): each major species (O, O₂,
    N₂) is converted to mass density with :func:`~gcmprocpy.data_density.arr_density`
    (``to_unit='GM/CM3'``) and summed, then g cm⁻³ → kg m⁻³ (×1000).  Routing
    through ``arr_density`` means the conversion is unit-aware (MMR / VMR / cm⁻³,
    read from each field's ``units`` attribute) and model-aware (TIE-GCM
    log-pressure, WACCM-X hybrid pressure), so the density correctly carries the
    ``pkt = p/(k_B·T)`` vertical falloff.  ``N2`` is auto-derived if absent.

    Raises:
        ValueError: if *time* is absent or a required species / hybrid-pressure
            coefficient is missing — the caller then falls back to the
            ideal-gas proxy.
    """
    sp = get_species_names(mds.model)
    rho_gcm3 = None
    for role in ('o', 'o2', 'n2'):
        pd = arr_density([mds], sp[role], time, to_unit='GM/CM3', log_level=False)
        if pd is None:
            raise ValueError(f"no data for species '{sp[role]}' at {time}")
        contrib = np.asarray(pd.values, dtype=float)
        rho_gcm3 = contrib if rho_gcm3 is None else rho_gcm3 + contrib
    return rho_gcm3 * 1.0e3  # g cm⁻³ → kg m⁻³


[docs] def epflux(temp, u, v, lats, levs, w=None, rho=None): """Compute Eliassen-Palm flux components. All inputs must be in SI units (K, m s⁻¹, degrees, dimensionless log-pressure levels). The calculation follows the quasi-geostrophic EP flux formulation: .. math:: S_y = -\\overline{u'v'} + \\frac{\\overline{v'T'}}{\\gamma} \\frac{\\partial \\bar{u}}{\\partial z} S_z = -\\left[\\overline{u'w'} + \\left(\\frac{1}{\\cos\\phi} \\frac{\\partial(\\bar{u}\\cos\\phi)}{\\partial y} - f\\right) \\frac{\\overline{v'T'}}{\\gamma}\\right] D = \\frac{1}{\\cos^2\\phi} \\frac{\\partial(\\cos^2\\phi \\, S_y)}{\\partial y} + \\frac{1}{\\bar{\\rho}} \\frac{\\partial(\\bar{\\rho} \\, S_z)}{\\partial z} where primes denote deviations from the zonal mean, overbars are zonal means, *γ* is the static stability, and *f* the Coriolis parameter. Ported from tgcmproc ``epflux.F`` (module ``epflux``; subroutines ``epfluxy``/``epfluxz``/``epfluxdiv`` with zonal-mean, ``gamma`` and ``zpht`` setup in ``save_epv``; B. Foster & H. Liu, 2-4/98). EPVY, EPVZ, γ, the reference scale height ``H = r·ts/g``, and the 3/12/98 sign fix (``Sy = -u'v' + (v'T'/γ)·∂u/∂z``, **plus** sign) follow the Fortran. The EPVDIV mass density is supplied by :func:`arr_epflux` (see its note). Args: temp: Temperature (K), shape ``(nlev, nlat, nlon)``. u: Zonal wind (m s⁻¹), same shape. v: Meridional wind (m s⁻¹), same shape. lats: Latitude array (degrees), shape ``(nlat,)``. levs: Log-pressure levels (dimensionless), shape ``(nlev,)``. w: Vertical wind (m s⁻¹), same shape as *temp*. Optional — required for EPVZ and EPVDIV. rho: Total mass density (kg m⁻³), same shape as *temp*. Optional — used as zonal mean for EPVDIV. If omitted, falls back to the ideal-gas proxy ρ ∝ exp(-lev)/T (constant mean molecular mass), which deviates from tgcmproc when M̄ varies with altitude (notably in the thermosphere). Returns: dict: Keys ``'EPVY'``, ``'EPVZ'``, ``'EPVDIV'``, each a ``(nlev, nlat)`` ndarray. EPVZ and EPVDIV are *None* if *w* is not provided. """ temp = np.asarray(temp, dtype=float) u = np.asarray(u, dtype=float) v = np.asarray(v, dtype=float) lats = np.asarray(lats, dtype=float) levs = np.asarray(levs, dtype=float) nlev, nlat, nlon = temp.shape # Heights from log-pressure (m) zpht = levs * _H # Latitude geometry lat_rad = lats * _D2R cos_lat = np.cos(lat_rad) # (nlat,) dlat = np.abs(lat_rad[1] - lat_rad[0]) # assume uniform spacing dy = _RE * dlat # meridional grid spacing (m) # ---- Zonal means (nlev, nlat) ---- tzm = temp.mean(axis=2) uzm = u.mean(axis=2) vzm = v.mean(axis=2) # ---- Eddy fields ---- t_prime = temp - tzm[:, :, np.newaxis] u_prime = u - uzm[:, :, np.newaxis] v_prime = v - vzm[:, :, np.newaxis] # ---- Static stability gamma (K m⁻¹) ---- # gamma = dTzm/dz + g*Tzm/(Ts*cp) gamma = np.zeros((nlev, nlat)) for k in range(1, nlev - 1): dz = zpht[k + 1] - zpht[k - 1] gamma[k, :] = ((tzm[k + 1, :] - tzm[k - 1, :]) / dz + _G * tzm[k, :] / (_TS * _CP)) gamma[0, :] = 2.0 * gamma[1, :] - gamma[2, :] gamma[-1, :] = 2.0 * gamma[-2, :] - gamma[-3, :] # ---- du_zm/dz (s⁻¹) ---- dudz = np.zeros((nlev, nlat)) for k in range(1, nlev - 1): dz = zpht[k + 1] - zpht[k - 1] dudz[k, :] = (uzm[k + 1, :] - uzm[k - 1, :]) / dz dudz[0, :] = 2.0 * dudz[1, :] - dudz[2, :] dudz[-1, :] = 2.0 * dudz[-2, :] - dudz[-3, :] # ---- Zonal-mean eddy covariances (nlev, nlat) ---- uv_bar = (u_prime * v_prime).mean(axis=2) vt_bar = (v_prime * t_prime).mean(axis=2) # ---- EPVY: meridional component (m² s⁻²) ---- epvy = -uv_bar + (vt_bar / gamma) * dudz epvz = None epvdiv = None if w is not None: w = np.asarray(w, dtype=float) wzm = w.mean(axis=2) w_prime = w - wzm[:, :, np.newaxis] uw_bar = (u_prime * w_prime).mean(axis=2) # ---- EPVZ: vertical component (m² s⁻²) ---- # Coriolis parameter f_cor = 2.0 * _OMEGA * np.sin(lat_rad) # (nlat,) # d(uzm*cos(lat))/dy / cos(lat) — centered differences over lat dudy = np.zeros((nlev, nlat)) for j in range(1, nlat - 1): dudy[:, j] = ((uzm[:, j + 1] * cos_lat[j + 1] - uzm[:, j - 1] * cos_lat[j - 1]) / (cos_lat[j] * 2.0 * dy)) # Extrapolate lat boundaries dudy[:, 0] = 2.0 * dudy[:, 1] - dudy[:, 2] dudy[:, -1] = 2.0 * dudy[:, -2] - dudy[:, -3] epvz = -(uw_bar + (dudy - f_cor[np.newaxis, :]) * vt_bar / gamma) # ---- EPVDIV: divergence / wave forcing (m s⁻¹ day⁻¹) ---- # Use provided mass density (matches tgcmproc mkrhokg) when # available; otherwise fall back to the ideal-gas proxy with # constant mean molecular mass. if rho is not None: rhozm = np.asarray(rho, dtype=float).mean(axis=2) # (nlev, nlat) else: rhozm = np.exp(-levs[:, np.newaxis]) / tzm epvdiv = np.zeros((nlev, nlat)) # Interior points only (j=1..nlat-2, k=1..nlev-2) for j in range(1, nlat - 1): # d(cos²(lat)*Sy)/dy / cos²(lat) depydy = ((cos_lat[j + 1]**2 * epvy[:, j + 1] - cos_lat[j - 1]**2 * epvy[:, j - 1]) / (cos_lat[j]**2 * 2.0 * dy)) # d(rho*Sz)/dz / rho — computed at interior levels depzdz = np.zeros(nlev) for k in range(1, nlev - 1): dz = zpht[k + 1] - zpht[k - 1] depzdz[k] = ((rhozm[k + 1, j] * epvz[k + 1, j] - rhozm[k - 1, j] * epvz[k - 1, j]) / (rhozm[k, j] * dz)) # Extrapolate vertical boundaries depzdz[0] = 2.0 * depzdz[1] - depzdz[2] depzdz[-1] = 2.0 * depzdz[-2] - depzdz[-3] epvdiv[:, j] = (depydy + depzdz) * 86400.0 # → m/s/day # Extrapolate latitude boundaries epvdiv[:, 0] = 2.0 * epvdiv[:, 1] - epvdiv[:, 2] epvdiv[:, -1] = 2.0 * epvdiv[:, -2] - epvdiv[:, -3] return {'EPVY': epvy, 'EPVZ': epvz, 'EPVDIV': epvdiv}
# Variable-name metadata for each component _COMPONENT_META = { 'EPVY': ('m² s⁻²', 'EP Flux (meridional)'), 'EPVZ': ('m² s⁻²', 'EP Flux (vertical)'), 'EPVDIV': ('m s⁻¹ day⁻¹', 'EP Flux Divergence'), }
[docs] def arr_epflux(datasets, component, time, log_level=True): """Compute an EP flux component from model datasets. Extracts temperature and wind fields at the requested time, converts units to SI, and calls :func:`epflux`. Args: datasets (list): List of :class:`ModelDataset` objects. component (str): ``'EPVY'``, ``'EPVZ'``, or ``'EPVDIV'``. time: Timestamp (``numpy.datetime64`` or ISO string). log_level (bool): Whether to log-transform level coordinates for the output PlotData. Returns: PlotData: Result with shape ``(nlev, nlat)`` suitable for :func:`plt_lev_lat`. Note: EPVDIV needs the zonal-mean total mass density ``rhozm``. Following tgcmproc ``mkrhokg`` (``mkderived.F``), it is built by summing the major species (O, O₂, N₂) converted to mass density via :func:`arr_density` (``to_unit='GM/CM3'``, ×1000 → kg m⁻³), so it carries the ``pkt = p/(k_B·T)`` vertical falloff — for both TIE-GCM and WACCM-X and for any source unit (MMR / VMR / cm⁻³). If the required species (or, for WACCM-X, the hybrid-pressure coefficients ``hyam``/``hybm``/``PS``) are unavailable, it falls back to the ideal-gas proxy ``ρ ∝ exp(-lev)/T``. Raises: ValueError: If *component* is invalid or required variables are missing from the datasets. """ component = component.upper() if component not in _COMPONENT_META: raise ValueError( f"component must be one of {list(_COMPONENT_META)}, got '{component}'" ) if isinstance(time, str): time = np.datetime64(time, 'ns') for mds in datasets: if not mds.has_time(time): continue ds = mds.ds # ---- Model variable names from MODEL_DEFAULTS ---- defaults = MODEL_DEFAULTS[mds.model] sp = get_species_names(mds.model) t_name = sp['temp'] u_name = defaults['wind_u'] v_name = defaults['wind_v'] wind_scale = defaults['wind_scale'] w_name = 'W' # EP flux uses log-pressure W, not wind_w (WN/OMEGA) # Check required variables exist required = [t_name, u_name, v_name] missing = [r for r in required if r not in ds.variables] if missing: raise ValueError( f"EP flux requires {required} but dataset is missing: {missing}" ) selected_mtime = get_mtime(ds, time) ds_t = ds.sel(time=time) # ---- Extract 3-D fields on lev grid ---- temp = ds_t[t_name].values.astype(float) # (nlev, nlat, nlon) u_raw = ds_t[u_name].values.astype(float) v_raw = ds_t[v_name].values.astype(float) lats = ds_t.lat.values.astype(float) try: levs = ds_t[t_name].lev.values.astype(float) except AttributeError: levs = ds_t[t_name].ilev.values.astype(float) # Remove all-NaN levels not_nan = ~np.isnan(temp).all(axis=(1, 2)) temp = temp[not_nan] u_raw = u_raw[not_nan] v_raw = v_raw[not_nan] levs_raw = levs[not_nan] # Convert horizontal winds to m/s u_si = u_raw * wind_scale v_si = v_raw * wind_scale # ---- Extract W if needed ---- w_si = None need_w = component in ('EPVZ', 'EPVDIV') if need_w: # TIE-GCM 3.0 renamed W → OMEGA; check both if w_name not in ds.variables: if 'OMEGA' in ds.variables: w_name = 'OMEGA' else: raise ValueError( f"Cannot compute {component}: vertical wind " f"(W or OMEGA) not in dataset. Only EPVY is available." ) w_data = ds_t[w_name] w_raw = w_data.values.astype(float) # W may be on ilev (interface levels) — interpolate to lev if 'ilev' in w_data.dims: ilev_vals = ds_t.ilev.values.astype(float) w_raw = _interp_ilev_to_lev(w_raw, ilev_vals, levs) # Apply same NaN mask w_raw = w_raw[not_nan] # TIE-GCM W is d(zp)/dt in s⁻¹, so w_geom ≈ H * W # WACCM-X W is already in m/s if mds.model == 'TIE-GCM': w_si = w_raw * _H else: w_si = w_raw # ---- Mass density (kg m⁻³) for EPVDIV (matches tgcmproc mkrhokg) ---- # Sum the major species converted to GM/CM3 via arr_density, which is # unit-aware (MMR / VMR / cm⁻³) and model-aware (TIE-GCM log-pressure, # WACCM-X hybrid pressure), so rho carries the pkt = p/(k_B·T) vertical # falloff. Fall back to epflux's ideal-gas proxy if it cannot be built. rho_si = None if component == 'EPVDIV': try: rho_full = _total_mass_density_kg_m3(mds, time) rho_si = rho_full[not_nan] except (ValueError, KeyError) as exc: logger.warning( "EPVDIV: could not build mass density for %s (%s); using " "ideal-gas ρ ∝ exp(-lev)/T proxy (may differ from tgcmproc).", mds.filename, exc, ) # ---- Compute EP flux ---- result = epflux(temp, u_si, v_si, lats, levs_raw, w=w_si, rho=rho_si) values = result[component] if values is None: raise ValueError( f"Cannot compute {component}: vertical wind not available" ) # Transform levels for display levs_display = level_log_transform(levs_raw.copy(), mds.model, log_level) unit, long_name = _COMPONENT_META[component] return PlotData( values=values, lats=lats, levs=levs_display, variable_unit=unit, variable_long_name=long_name, mtime=selected_mtime, model=mds.model, filename=mds.filename, ) logger.warning(f"{time} not found.") return None
register_derived('EPVY', arr_epflux, plot_types={'lev_lat'}) register_derived('EPVZ', arr_epflux, plot_types={'lev_lat'}) register_derived('EPVDIV', arr_epflux, plot_types={'lev_lat'})