Source code for gcmprocpy.data_epflux

"""Eliassen-Palm flux calculations.

Port of tgcmproc ``epflux.F`` (B. Foster and Hanli Liu, 2-4/98).
Computes three derived diagnostic fields from 3-D wind and temperature:

- **EPVY** — meridional EP flux component (m² s⁻²)
- **EPVZ** — vertical EP flux component (m² s⁻²)
- **EPVDIV** — EP flux divergence / wave forcing (m s⁻¹ day⁻¹)

Reference:
    Volland, Hans, *Atmospheric Tidal and Planetary Waves*,
    Kluwer Academic Publishers (Norwell, MA), Section 2.7.
"""
import logging
import numpy as np
from .containers import PlotData, MODEL_DEFAULTS, get_species_names, register_derived
from .data_parse import get_mtime, level_log_transform

logger = logging.getLogger(__name__)

# Physical constants (matching tgcmproc epflux.F)
_G = 9.8                             # gravity (m s⁻²)
_CP = 1004.0                         # specific heat at constant pressure (J kg⁻¹ K⁻¹)
_TS = 300.0                          # reference surface temperature (K)
_RE = 6.371e6                        # Earth radius (m)
_OMEGA = 2.0 * np.pi / 86400.0      # Earth rotation rate (rad s⁻¹)
_R = 287.0                           # gas constant for dry air (J kg⁻¹ K⁻¹)
_D2R = np.pi / 180.0                # degrees → radians
_H = _R * _TS / _G                  # reference scale height (m) ≈ 8786
_AMU = 1.66053907e-27               # atomic mass unit (kg)
_M_O, _M_O2, _M_N2 = 16.0, 32.0, 28.0  # atomic / molecular masses (u)


def _interp_ilev_to_lev(w_ilev, ilev_vals, lev_vals):
    """Linearly interpolate a 3-D field from *ilev* to *lev* grid.

    Args:
        w_ilev: Array on interface levels, shape ``(nilev, nlat, nlon)``.
        ilev_vals: Interface level coordinate values, shape ``(nilev,)``.
        lev_vals: Midpoint level coordinate values, shape ``(nlev,)``.

    Returns:
        Array on midpoint levels, shape ``(nlev, nlat, nlon)``.
    """
    nlev = len(lev_vals)
    w_lev = np.empty((nlev,) + w_ilev.shape[1:])
    for k in range(nlev):
        idx = np.searchsorted(ilev_vals, lev_vals[k]) - 1
        idx = max(0, min(idx, len(ilev_vals) - 2))
        denom = ilev_vals[idx + 1] - ilev_vals[idx]
        if abs(denom) < 1e-30:
            alpha = 0.0
        else:
            alpha = (lev_vals[k] - ilev_vals[idx]) / denom
        alpha = max(0.0, min(1.0, alpha))
        w_lev[k] = (1.0 - alpha) * w_ilev[idx] + alpha * w_ilev[idx + 1]
    return w_lev


[docs] def epflux(temp, u, v, lats, levs, w=None, rho=None): """Compute Eliassen-Palm flux components. All inputs must be in SI units (K, m s⁻¹, degrees, dimensionless log-pressure levels). The calculation follows the quasi-geostrophic EP flux formulation: .. math:: S_y = -\\overline{u'v'} + \\frac{\\overline{v'T'}}{\\gamma} \\frac{\\partial \\bar{u}}{\\partial z} S_z = -\\left[\\overline{u'w'} + \\left(\\frac{1}{\\cos\\phi} \\frac{\\partial(\\bar{u}\\cos\\phi)}{\\partial y} - f\\right) \\frac{\\overline{v'T'}}{\\gamma}\\right] D = \\frac{1}{\\cos^2\\phi} \\frac{\\partial(\\cos^2\\phi \\, S_y)}{\\partial y} + \\frac{1}{\\bar{\\rho}} \\frac{\\partial(\\bar{\\rho} \\, S_z)}{\\partial z} where primes denote deviations from the zonal mean, overbars are zonal means, *γ* is the static stability, and *f* the Coriolis parameter. Args: temp: Temperature (K), shape ``(nlev, nlat, nlon)``. u: Zonal wind (m s⁻¹), same shape. v: Meridional wind (m s⁻¹), same shape. lats: Latitude array (degrees), shape ``(nlat,)``. levs: Log-pressure levels (dimensionless), shape ``(nlev,)``. w: Vertical wind (m s⁻¹), same shape as *temp*. Optional — required for EPVZ and EPVDIV. rho: Total mass density (kg m⁻³), same shape as *temp*. Optional — used as zonal mean for EPVDIV. If omitted, falls back to the ideal-gas proxy ρ ∝ exp(-lev)/T (constant mean molecular mass), which deviates from tgcmproc when M̄ varies with altitude (notably in the thermosphere). Returns: dict: Keys ``'EPVY'``, ``'EPVZ'``, ``'EPVDIV'``, each a ``(nlev, nlat)`` ndarray. EPVZ and EPVDIV are *None* if *w* is not provided. """ temp = np.asarray(temp, dtype=float) u = np.asarray(u, dtype=float) v = np.asarray(v, dtype=float) lats = np.asarray(lats, dtype=float) levs = np.asarray(levs, dtype=float) nlev, nlat, nlon = temp.shape # Heights from log-pressure (m) zpht = levs * _H # Latitude geometry lat_rad = lats * _D2R cos_lat = np.cos(lat_rad) # (nlat,) dlat = np.abs(lat_rad[1] - lat_rad[0]) # assume uniform spacing dy = _RE * dlat # meridional grid spacing (m) # ---- Zonal means (nlev, nlat) ---- tzm = temp.mean(axis=2) uzm = u.mean(axis=2) vzm = v.mean(axis=2) # ---- Eddy fields ---- t_prime = temp - tzm[:, :, np.newaxis] u_prime = u - uzm[:, :, np.newaxis] v_prime = v - vzm[:, :, np.newaxis] # ---- Static stability gamma (K m⁻¹) ---- # gamma = dTzm/dz + g*Tzm/(Ts*cp) gamma = np.zeros((nlev, nlat)) for k in range(1, nlev - 1): dz = zpht[k + 1] - zpht[k - 1] gamma[k, :] = ((tzm[k + 1, :] - tzm[k - 1, :]) / dz + _G * tzm[k, :] / (_TS * _CP)) gamma[0, :] = 2.0 * gamma[1, :] - gamma[2, :] gamma[-1, :] = 2.0 * gamma[-2, :] - gamma[-3, :] # ---- du_zm/dz (s⁻¹) ---- dudz = np.zeros((nlev, nlat)) for k in range(1, nlev - 1): dz = zpht[k + 1] - zpht[k - 1] dudz[k, :] = (uzm[k + 1, :] - uzm[k - 1, :]) / dz dudz[0, :] = 2.0 * dudz[1, :] - dudz[2, :] dudz[-1, :] = 2.0 * dudz[-2, :] - dudz[-3, :] # ---- Zonal-mean eddy covariances (nlev, nlat) ---- uv_bar = (u_prime * v_prime).mean(axis=2) vt_bar = (v_prime * t_prime).mean(axis=2) # ---- EPVY: meridional component (m² s⁻²) ---- epvy = -uv_bar + (vt_bar / gamma) * dudz epvz = None epvdiv = None if w is not None: w = np.asarray(w, dtype=float) wzm = w.mean(axis=2) w_prime = w - wzm[:, :, np.newaxis] uw_bar = (u_prime * w_prime).mean(axis=2) # ---- EPVZ: vertical component (m² s⁻²) ---- # Coriolis parameter f_cor = 2.0 * _OMEGA * np.sin(lat_rad) # (nlat,) # d(uzm*cos(lat))/dy / cos(lat) — centered differences over lat dudy = np.zeros((nlev, nlat)) for j in range(1, nlat - 1): dudy[:, j] = ((uzm[:, j + 1] * cos_lat[j + 1] - uzm[:, j - 1] * cos_lat[j - 1]) / (cos_lat[j] * 2.0 * dy)) # Extrapolate lat boundaries dudy[:, 0] = 2.0 * dudy[:, 1] - dudy[:, 2] dudy[:, -1] = 2.0 * dudy[:, -2] - dudy[:, -3] epvz = -(uw_bar + (dudy - f_cor[np.newaxis, :]) * vt_bar / gamma) # ---- EPVDIV: divergence / wave forcing (m s⁻¹ day⁻¹) ---- # Use provided mass density (matches tgcmproc mkrhokg) when # available; otherwise fall back to the ideal-gas proxy with # constant mean molecular mass. if rho is not None: rhozm = np.asarray(rho, dtype=float).mean(axis=2) # (nlev, nlat) else: rhozm = np.exp(-levs[:, np.newaxis]) / tzm epvdiv = np.zeros((nlev, nlat)) # Interior points only (j=1..nlat-2, k=1..nlev-2) for j in range(1, nlat - 1): # d(cos²(lat)*Sy)/dy / cos²(lat) depydy = ((cos_lat[j + 1]**2 * epvy[:, j + 1] - cos_lat[j - 1]**2 * epvy[:, j - 1]) / (cos_lat[j]**2 * 2.0 * dy)) # d(rho*Sz)/dz / rho — computed at interior levels depzdz = np.zeros(nlev) for k in range(1, nlev - 1): dz = zpht[k + 1] - zpht[k - 1] depzdz[k] = ((rhozm[k + 1, j] * epvz[k + 1, j] - rhozm[k - 1, j] * epvz[k - 1, j]) / (rhozm[k, j] * dz)) # Extrapolate vertical boundaries depzdz[0] = 2.0 * depzdz[1] - depzdz[2] depzdz[-1] = 2.0 * depzdz[-2] - depzdz[-3] epvdiv[:, j] = (depydy + depzdz) * 86400.0 # → m/s/day # Extrapolate latitude boundaries epvdiv[:, 0] = 2.0 * epvdiv[:, 1] - epvdiv[:, 2] epvdiv[:, -1] = 2.0 * epvdiv[:, -2] - epvdiv[:, -3] return {'EPVY': epvy, 'EPVZ': epvz, 'EPVDIV': epvdiv}
# Variable-name metadata for each component _COMPONENT_META = { 'EPVY': ('m² s⁻²', 'EP Flux (meridional)'), 'EPVZ': ('m² s⁻²', 'EP Flux (vertical)'), 'EPVDIV': ('m s⁻¹ day⁻¹', 'EP Flux Divergence'), }
[docs] def arr_epflux(datasets, component, time, log_level=True): """Compute an EP flux component from model datasets. Extracts temperature and wind fields at the requested time, converts units to SI, and calls :func:`epflux`. Args: datasets (list): List of :class:`ModelDataset` objects. component (str): ``'EPVY'``, ``'EPVZ'``, or ``'EPVDIV'``. time: Timestamp (``numpy.datetime64`` or ISO string). log_level (bool): Whether to log-transform level coordinates for the output PlotData. Returns: PlotData: Result with shape ``(nlev, nlat)`` suitable for :func:`plt_lev_lat`. Raises: ValueError: If *component* is invalid or required variables are missing from the datasets. """ component = component.upper() if component not in _COMPONENT_META: raise ValueError( f"component must be one of {list(_COMPONENT_META)}, got '{component}'" ) if isinstance(time, str): time = np.datetime64(time, 'ns') for mds in datasets: if not mds.has_time(time): continue ds = mds.ds # ---- Model variable names from MODEL_DEFAULTS ---- defaults = MODEL_DEFAULTS[mds.model] sp = get_species_names(mds.model) t_name = sp['temp'] u_name = defaults['wind_u'] v_name = defaults['wind_v'] wind_scale = defaults['wind_scale'] w_name = 'W' # EP flux uses log-pressure W, not wind_w (WN/OMEGA) # Check required variables exist required = [t_name, u_name, v_name] missing = [r for r in required if r not in ds.variables] if missing: raise ValueError( f"EP flux requires {required} but dataset is missing: {missing}" ) selected_mtime = get_mtime(ds, time) ds_t = ds.sel(time=time) # ---- Extract 3-D fields on lev grid ---- temp = ds_t[t_name].values.astype(float) # (nlev, nlat, nlon) u_raw = ds_t[u_name].values.astype(float) v_raw = ds_t[v_name].values.astype(float) lats = ds_t.lat.values.astype(float) try: levs = ds_t[t_name].lev.values.astype(float) except AttributeError: levs = ds_t[t_name].ilev.values.astype(float) # Remove all-NaN levels not_nan = ~np.isnan(temp).all(axis=(1, 2)) temp = temp[not_nan] u_raw = u_raw[not_nan] v_raw = v_raw[not_nan] levs_raw = levs[not_nan] # Convert horizontal winds to m/s u_si = u_raw * wind_scale v_si = v_raw * wind_scale # ---- Extract W if needed ---- w_si = None need_w = component in ('EPVZ', 'EPVDIV') if need_w: # TIE-GCM 3.0 renamed W → OMEGA; check both if w_name not in ds.variables: if 'OMEGA' in ds.variables: w_name = 'OMEGA' else: raise ValueError( f"Cannot compute {component}: vertical wind " f"(W or OMEGA) not in dataset. Only EPVY is available." ) w_data = ds_t[w_name] w_raw = w_data.values.astype(float) # W may be on ilev (interface levels) — interpolate to lev if 'ilev' in w_data.dims: ilev_vals = ds_t.ilev.values.astype(float) w_raw = _interp_ilev_to_lev(w_raw, ilev_vals, levs) # Apply same NaN mask w_raw = w_raw[not_nan] # TIE-GCM W is d(zp)/dt in s⁻¹, so w_geom ≈ H * W # WACCM-X W is already in m/s if mds.model == 'TIE-GCM': w_si = w_raw * _H else: w_si = w_raw # ---- Mass density (kg m⁻³) for EPVDIV (matches tgcmproc mkrhokg) ---- rho_si = None if component == 'EPVDIV': o_name, o2_name, n2_name = sp['o'], sp['o2'], sp['n2'] if all(n in ds.variables for n in (o_name, o2_name, n2_name)): n_o = ds_t[o_name].values.astype(float) n_o2 = ds_t[o2_name].values.astype(float) n_n2 = ds_t[n2_name].values.astype(float) # Number density (cm⁻³) → mass density (kg m⁻³): # ρ = Σ Mᵢ · nᵢ · m_u · 1e6 (cm³→m³) rho_full = (_M_O * n_o + _M_O2 * n_o2 + _M_N2 * n_n2) * _AMU * 1.0e6 rho_si = rho_full[not_nan] else: logger.warning( "EPVDIV: %s missing one of (%s, %s, %s); using ideal-gas " "ρ ∝ exp(-lev)/T proxy (results may differ from tgcmproc).", mds.filename, o_name, o2_name, n2_name, ) # ---- Compute EP flux ---- result = epflux(temp, u_si, v_si, lats, levs_raw, w=w_si, rho=rho_si) values = result[component] if values is None: raise ValueError( f"Cannot compute {component}: vertical wind not available" ) # Transform levels for display levs_display = level_log_transform(levs_raw.copy(), mds.model, log_level) unit, long_name = _COMPONENT_META[component] return PlotData( values=values, lats=lats, levs=levs_display, variable_unit=unit, variable_long_name=long_name, mtime=selected_mtime, model=mds.model, filename=mds.filename, ) logger.warning(f"{time} not found.") return None
register_derived('EPVY', arr_epflux, plot_types={'lev_lat'}) register_derived('EPVZ', arr_epflux, plot_types={'lev_lat'}) register_derived('EPVDIV', arr_epflux, plot_types={'lev_lat'})