Skip to content

depsi.atmosphere_estimation

depsi.atmosphere_estimation.estimate_atmosphere_phase(stm, prediction_coords=None, psc_phase_residuals='psc_phase_residuals', atmosphere_mother='atmosphere_mother', unmodeled_displacement_args=None, kriging_args=None, stm_coords_metadata={'mode': 'euclidean', 'x_label': 'x', 'y_label': 'y'}, prediction_coords_metadata={'mode': 'euclidean', 'x_label': 'x', 'y_label': 'y'})

Estimate the atmosphere phase.

This function applies a temporal filter to extract the high-frequency atmospheric signal and then uses spatial kriging to predict the atmospheric phase per epoch.

Parameters:

Name Type Description Default
stm Dataset

The STM with unwrapped phases.

required
prediction_coords Dataset | DataArray

The prediction coordinates on which to interpolate the atmosphere phase. It should have coordinates 'x' and 'y'. If None, the coordinates from the stm Dataset will be used.

None
psc_phase_residuals

The name of the variable in the stm Dataset that contains the PSC phase residuals. Default is "psc_phase_residuals". This variable is used to estimate the unmodeled displacement and in kriging variogram.

'psc_phase_residuals'
atmosphere_mother int | str

A string indicating the name of the variable in the stm Dataset that contains the atmosphere mother or an integer indicating the time index of the atmosphere.

'atmosphere_mother'
unmodeled_displacement_args dict

Keyword arguments for the estimate_unmodeled_displacement function. The allowed keys are: filter_length, sampling_rate, filter_type. For example: {"filter_length": 1, "sampling_rate": 1000, "filter_type": "gaussian"}. See the documentation of estimate_unmodeled_displacement for more details. The argument unmodeled_displacement_args can be left empty. Then default parameters i.e. {"filter_length": 1, "sampling_rate": 1000, "filter_type": "gaussian"} will be used.

None
kriging_args dict

Additional keyword arguments to pass to the function solve_kriging_per_single_time. Allowed keys are: "n_nearest_neighbors", "kriging_backend", "empirical_variogram_args", "variogram_args", See the documentation of solve_kriging_per_single_time for more details.

None
stm_coords_metadata Mapping[str, str]

The metadata for the stm coordinates. The mode can be "euclidean" or "geographic". If "geographic", the coordinates are assumed to be in degrees (latitude and longitude), and we convert them to metric units using an appropriate projection. Default is "euclidean".

{'mode': 'euclidean', 'x_label': 'x', 'y_label': 'y'}
prediction_coords_metadata Mapping[str, str]

The metadata for the prediction coordinates. The mode can be "euclidean" or "geographic". If "geographic", the coordinates are assumed to be in degrees (latitude and longitude), and we convert them to metric units using an appropriate projection. Default is "euclidean".

{'mode': 'euclidean', 'x_label': 'x', 'y_label': 'y'}

Returns:

Type Description
Dataset

The predicted atmosphere phase and the associated variance (sigmasq) for each time step.

Source code in depsi/atmosphere_estimation.py
def estimate_atmosphere_phase(
    stm: xr.Dataset,
    prediction_coords: xr.Dataset | xr.DataArray = None,
    psc_phase_residuals="psc_phase_residuals",
    atmosphere_mother: int | str = "atmosphere_mother",
    unmodeled_displacement_args: dict = None,
    kriging_args: dict = None,
    stm_coords_metadata: Mapping[str, str] = {"mode": "euclidean", "x_label": "x", "y_label": "y"},
    prediction_coords_metadata: Mapping[str, str] = {"mode": "euclidean", "x_label": "x", "y_label": "y"},
) -> xr.Dataset:
    """Estimate the atmosphere phase.

    This function applies a temporal filter to extract the high-frequency
    atmospheric signal and then uses spatial kriging to predict the atmospheric
    phase per epoch.

    Parameters
    ----------
    stm: xr.Dataset
        The STM with unwrapped phases.
    prediction_coords: xr.Dataset | xr.DataArray
        The prediction coordinates on which to interpolate the atmosphere phase.
        It should have coordinates 'x' and 'y'. If None, the coordinates from
        the stm Dataset will be used.
    psc_phase_residuals: str
        The name of the variable in the stm Dataset that contains the PSC phase
        residuals. Default is "psc_phase_residuals". This variable is used to
        estimate the unmodeled displacement and in kriging variogram.
    atmosphere_mother: int | str
        A string indicating the name of the variable in the stm Dataset
        that contains the atmosphere mother or an integer indicating the time
        index of the atmosphere.
    unmodeled_displacement_args: dict
        Keyword arguments for the `estimate_unmodeled_displacement` function.
        The allowed keys are: `filter_length`, `sampling_rate`, `filter_type`.
        For example: {"filter_length": 1, "sampling_rate": 1000, "filter_type":
        "gaussian"}. See the documentation of `estimate_unmodeled_displacement`
        for more details. The argument `unmodeled_displacement_args` can be left
        empty. Then default parameters i.e. {"filter_length": 1,
        "sampling_rate": 1000, "filter_type": "gaussian"} will be used.
    kriging_args: dict
        Additional keyword arguments to pass to the function
        `solve_kriging_per_single_time`. Allowed keys are:
        "n_nearest_neighbors", "kriging_backend", "empirical_variogram_args",
        "variogram_args", See the documentation of
        `solve_kriging_per_single_time` for more details.
    stm_coords_metadata: dict = {"mode": "euclidean", "x_label": "x", "y_label": "y"},
        The metadata for the stm coordinates.
        The mode can be "euclidean" or "geographic". If "geographic",
        the coordinates are assumed to be in degrees (latitude and longitude),
        and we convert them to metric units using an appropriate projection.
        Default is "euclidean".
    prediction_coords_metadata: dict = {"mode": "euclidean", "x_label": "x", "y_label": "y"},
        The metadata for the prediction coordinates. The mode can be "euclidean"
        or "geographic". If "geographic", the coordinates are assumed to be in
        degrees (latitude and longitude), and we convert them to metric units
        using an appropriate projection. Default is "euclidean".

    Returns
    -------
    xr.Dataset
        The predicted atmosphere phase and the associated variance (sigmasq) for
        each time step.
    """
    # Check prediction coordinates
    if prediction_coords is None:
        prediction_coords = xr.Dataset(coords=stm.isel(time=0).coords)
        prediction_coords_metadata = stm_coords_metadata

    # Fix coordinates
    stm = _fix_coords(stm, stm_coords_metadata)
    prediction_coords = _fix_coords(prediction_coords, prediction_coords_metadata)

    # Step 1: Apply temporal filtering to extract high-frequency atmospheric signal
    if not unmodeled_displacement_args:
        unmodeled_displacement_args = {}
    unmodeled_disp = estimate_unmodeled_displacement(
        psc_phase_residuals=stm[psc_phase_residuals],
        baseline_years=stm["time"],
        **unmodeled_displacement_args,
    )

    # Add results to stm
    stm["unmodeled_disp"] = unmodeled_disp

    # Get atmosphere mother
    if isinstance(atmosphere_mother, str):
        if atmosphere_mother not in stm:
            raise ValueError(f"atmosphere_mother '{atmosphere_mother}' not found in stm Dataset.")
        atmosphere_mother = stm[atmosphere_mother]
    else:
        atmosphere_mother = stm.isel(time=atmosphere_mother)[psc_phase_residuals]

    # Estimate atmosphere phase
    stm["atmosphere_estimates"] = stm[psc_phase_residuals] - stm["unmodeled_disp"] + atmosphere_mother

    # Step 2: Apply spatial kriging to predict atmospheric phase per epoch
    if not kriging_args:
        kriging_args = {}
    predicted_atmosphere = solve_kriging(
        ps_atmosphere=stm["atmosphere_estimates"],
        prediction_coords=prediction_coords,
        **kriging_args,
    )
    # rename variables "predicted" to "atmosphere_predicted" and "sigmasq" to "atmosphere_sigmasq"
    predicted_atmosphere = predicted_atmosphere.rename(
        {"predicted": "atmosphere_predicted", "sigmasq": "atmosphere_sigmasq"}
    )
    return predicted_atmosphere

depsi.atmosphere_estimation.estimate_unmodeled_displacement(psc_phase_residuals, baseline_years, filter_length=1, sampling_rate=1000, filter_type='gaussian')

Apply a low-pass filter to the time series to estimate the unmodeled deformation.

Parameters:

Name Type Description Default
psc_phase_residuals DataArray

The PSC time series residuals to apply the filter to.

required
baseline_years DataArray

The baseline years corresponding to the time series.

required
filter_length int

Length of the filter (year) to apply a low-pass filter to the time series. Default is 1.

1
sampling_rate int

Sampling rate of the time series, default is 1000.

1000
filter_type

Method to use for building the window , e.g. 'block', 'triangle', or 'gaussian', default is 'gaussian', see scipy.signal.windows for more.

'gaussian'

Returns:

Type Description
DataArray

The unmodeled deformation estimated from the time series.

Source code in depsi/atmosphere_estimation.py
def estimate_unmodeled_displacement(
    psc_phase_residuals: xr.DataArray,
    baseline_years: xr.DataArray,
    filter_length: int = 1,
    sampling_rate: int = 1000,
    filter_type="gaussian",
) -> xr.DataArray:
    """Apply a low-pass filter to the time series to estimate the unmodeled deformation.

    Parameters
    ----------
    psc_phase_residuals: xr.DataArray
        The PSC time series residuals to apply the filter to.
    baseline_years: xr.DataArray
        The baseline years corresponding to the time series.
    filter_length: int
        Length of the filter (year) to apply a low-pass filter to the time series. Default is 1.
    sampling_rate: int, optional.
        Sampling rate of the time series, default is 1000.
    filter_type: str, optional
        Method to use for building the window , e.g. 'block', 'triangle', or
        'gaussian', default is 'gaussian', see `scipy.signal.windows` for more.

    Returns
    -------
    xr.DataArray
        The unmodeled deformation estimated from the time series.
    """
    # Check if baseline_years size is equal to psc_phase_residuals size
    if baseline_years.size != psc_phase_residuals["time"].size:
        raise ValueError("The size of baseline_years must match the time dimension of psc_phase_residuals.")
    # Check baseline_years is monotonic
    is_monotonic_increasing = np.all(np.diff(baseline_years.values) >= 0)
    is_monotonic_decreasing = np.all(np.diff(baseline_years.values) <= 0)
    if not (is_monotonic_increasing or is_monotonic_decreasing):
        raise ValueError("baseline_years must be monotonic.")

    # Build the window for the low-pass filter
    baseline_scaled = baseline_years * sampling_rate
    timespan = baseline_scaled.max() - baseline_scaled.min() + 1

    if (filter_length * sampling_rate) > timespan:
        raise ValueError(
            "Filter length * sampling_rate is too large compared to the temporal span. "
            "Adjust the filter_length or sampling_rate."
        )

    window_size = int(timespan) | 1  # Ensure window size is an odd integer

    if filter_type == "block":
        window = _get_signal_window_with_zero_padding(
            type="boxcar", timespan=timespan, filter_length=filter_length, sampling_rate=sampling_rate
        )

    elif filter_type == "triangle":
        window = _get_signal_window_with_zero_padding(
            type="triang", timespan=timespan, filter_length=filter_length, sampling_rate=sampling_rate
        )
    elif filter_type == "gaussian":
        std_dev = filter_length * sampling_rate / 6  # ±3σ covers the window
        window = signal.windows.gaussian(window_size, std=std_dev)
    else:
        raise NotImplementedError(
            f"Filter type {filter_type} is not implemented. Available types are: 'block', 'triangle', 'gaussian'."
        )

    # normalize the window
    window = window / window.sum()

    # Extract weights for each time difference
    time_diffs = np.subtract.outer(baseline_scaled.values, baseline_scaled.values)
    center_index = window_size // 2
    weight_indices = np.clip(center_index + np.round(time_diffs).astype(int), 0, window_size - 1)
    weight_matrix = window[weight_indices]

    # Normalize weights along axis=1 (per row)
    weight_matrix /= np.sum(weight_matrix, axis=1, keepdims=True)

    # Apply the low-pass filter and return the non-linear deformation
    def _apply_filter(data):
        return np.einsum("ij,j->i", weight_matrix, data)

    unmodeled_disp = xr.apply_ufunc(
        _apply_filter,
        psc_phase_residuals,
        input_core_dims=[["time"]],
        output_core_dims=[["time"]],
        vectorize=True,
        dask="parallelized",
        output_dtypes=[psc_phase_residuals.dtype],
    )
    return unmodeled_disp.transpose(*psc_phase_residuals.dims)  # align dims order

depsi.atmosphere_estimation.setup_kriging_system(da, method='universal', empirical_variogram_args=None, variogram_args=None)

Kriging in space for a single time step.

Make sure that coordinates 'x' and 'y' are present in the DataArray and they are in Euclidean mode.

Parameters:

Name Type Description Default
da DataArray

The DataArray containing the data to be interpolated. It must have coordinates 'x' and 'y' in Euclidean mode.

required
method

The kriging method to use, e.g. 'universal'. Default is 'universal'. Other methods are not implemented yet. See https://geostat-framework.readthedocs.io/projects/pykrige/en/stable/generated/pykrige.uk.UniversalKriging.html#pykrige.uk.UniversalKriging for more details.

'universal'
empirical_variogram_args dict

Additional keyword arguments to pass to the function calculate_empirical_variogram. Allowed keys are: 'method', 'nlags', 'cutoff'. If it left empty, default parameters will be used as {"method":"unbiased_robust", "nlags": 50, "cutoff"=10000.0}. The method argument can be one of 'standard', 'unbiased', 'unbiased_robust'. Default is 'unbiased_robust'. See the documentation of calculate_empirical_variogram for more details.

None
variogram_args dict

Additional keyword arguments to pass to the kriging method. Valid keys are: 'variogram_model', 'variogram_parameters', 'drift_terms'.

The variogram_model can be one of: 'linear', 'power', 'gaussian', 'spherical', 'exponential', 'hole-effect'. Default is 'gaussian'.

The variogram parameters can be provided as a dictionary, for example,

linear

{'slope': slope, 'nugget': nugget}

power

{'scale': scale, 'exponent': exponent, 'nugget': nugget}

gaussian, spherical, exponential and hole-effect:

{'sill': s, 'range': r, 'nugget': n}
# OR
{'psill': p, 'range': r, 'nugget': n}

If variogram_parameters are not provided, they will be estimated from the empirical variogram using the specified values in empirical_variogram_args.

The drift_terms argument is only used for universal kriging. Supported drift terms are currently 'regional_linear', 'point_log', 'external_Z', 'specified', and 'functional'. Default is 'regional_linear', which activates a first-order drift model.

None

Returns:

Name Type Description
kriging_obj Any

A kriging object that can be used to perform kriging interpolation.

Source code in depsi/atmosphere_estimation.py
def setup_kriging_system(
    da: xr.DataArray,
    method="universal",
    empirical_variogram_args: dict = None,
    variogram_args: dict = None,
):
    """Kriging in space for a single time step.

    Make sure that coordinates 'x' and 'y' are present in the DataArray and they
    are in Euclidean mode.

    Parameters
    ----------
    da: xr.DataArray
        The DataArray containing the data to be interpolated. It must have
        coordinates 'x' and 'y' in Euclidean mode.
    method: str
        The kriging method to use, e.g. 'universal'. Default is 'universal'.
        Other methods are not implemented yet.
        See https://geostat-framework.readthedocs.io/projects/pykrige/en/stable/generated/pykrige.uk.UniversalKriging.html#pykrige.uk.UniversalKriging
        for more details.
    empirical_variogram_args: dict
        Additional keyword arguments to pass to the function
        `calculate_empirical_variogram`. Allowed keys are: 'method', 'nlags',
        'cutoff'. If it left empty, default parameters will be used as
        {"method":"unbiased_robust", "nlags": 50, "cutoff"=10000.0}. The `method`
        argument can be one of 'standard', 'unbiased', 'unbiased_robust'.
        Default is 'unbiased_robust'. See the documentation of
        `calculate_empirical_variogram` for more details.
    variogram_args: dict
        Additional keyword arguments to pass to the kriging method. Valid keys are:
        'variogram_model', 'variogram_parameters', 'drift_terms'.

        The `variogram_model` can be one of: 'linear', 'power', 'gaussian',
        'spherical', 'exponential', 'hole-effect'. Default is 'gaussian'.

        The variogram parameters can be provided as a dictionary, for example,
        # linear
            {'slope': slope, 'nugget': nugget}
        # power
            {'scale': scale, 'exponent': exponent, 'nugget': nugget}
        # gaussian, spherical, exponential and hole-effect:
            {'sill': s, 'range': r, 'nugget': n}
            # OR
            {'psill': p, 'range': r, 'nugget': n}
        If `variogram_parameters` are not provided, they will be estimated from
        the empirical variogram using the specified values in `empirical_variogram_args`.

        The `drift_terms` argument is only used for universal kriging. Supported drift
        terms are currently 'regional_linear', 'point_log', 'external_Z',
        'specified', and 'functional'. Default is 'regional_linear', which activates
        a first-order drift model.

    Returns
    -------
    kriging_obj: Any
        A kriging object that can be used to perform kriging interpolation.
    """
    # Check that da.data shape is 2d
    if len(da.data.shape) > 2:
        raise ValueError("DataArray must be 2D with coordinates 'x' and 'y' in Euclidean mode.")

    # Check if there x, y coords
    if "x" not in da.coords or "y" not in da.coords:
        raise ValueError("DataArray must have coordinates 'x' and 'y' in Euclidean mode.")
    # Calculate empirical variogram
    if not empirical_variogram_args:
        logger.info("Estimating variogram with default parameters.")
        empirical_variogram_args = {}

    _check_empirical_variogram_args(empirical_variogram_args)

    lags, semivariances = calculate_empirical_variogram(da, **empirical_variogram_args)

    # Check if variogram parameters are provided
    # if not, estimate them
    if not variogram_args:
        variogram_args = {}

    _check_variogram_args(variogram_args)

    variogram_model = variogram_args.get("variogram_model", "gaussian")
    variogram_parameters = variogram_args.get("variogram_parameters", None)
    if variogram_parameters is None:
        variogram_parameters, _ = fit_variogram(da, lags, semivariances, variogram_model)

    # Create a kriging instance
    if method == "universal":
        # see input arguments in
        # https://github.com/GeoStat-Framework/PyKrige/blob/e02baad442ac99b22f038b09b6290e7abacc17ae/src/pykrige/uk.py#L220
        kriging_obj = pykrige.uk.UniversalKriging(
            da.coords["x"],
            da.coords["y"],
            da,
            variogram_model=variogram_model,
            variogram_parameters=variogram_parameters,
            exact_values=False,  #  If True, results would be input values at input locations
            drift_terms=variogram_args.get("drift_terms", "regional_linear"),  # this activates drift of order 1
        )

        # Adjust some variables
        kriging_obj.lags = lags
        kriging_obj.semivariance = semivariances

        return kriging_obj
    else:
        raise NotImplementedError(f"{method} is not implemented yet.")

depsi.atmosphere_estimation.fit_variogram(da, lags=None, semivariances=None, variogram_model='gaussian', empirical_variogram_method='unbiased_robust', empirical_variogram_nlags=50, empirical_variogram_cutoff=10000.0)

Fit a variogram model to the empirical variogram.

If the arguments lags or semivariances are None, the empirical variogram can be calculated using arguments empirical_variogram_method, empirical_variogram_nlags, and empirical_variogram_cutoff.

Parameters:

Name Type Description Default
da DataArray

The DataArray containing the variable of interest (including 'x' and 'y' coordinates in Euclidean mode) to fit the variogram model.

required
lags ndarray

The lags for the empirical variogram. If None, the empirical variogram will be calculated.

None
semivariances ndarray

The semivariances for the empirical variogram. If None, the empirical variogram will be calculated.

None
variogram_model str

The variogram model to fit. Options are: 'linear', 'power', 'gaussian', 'spherical', 'exponential', 'hole-effect'. Default is 'gaussian'.

'gaussian'
empirical_variogram_method str

The method argument can be one of 'standard', 'unbiased', 'unbiased_robust'. Default is 'unbiased_robust'. See the documentation of calculate_empirical_variogram.

'unbiased_robust'
empirical_variogram_nlags int

The number of lags to use for the empirical variogram. Default is 50.

50
empirical_variogram_cutoff float

The maximum distance to consider for the empirical variogram. Default is 10000.0.

10000.0

Returns:

Name Type Description
variogram_parameters dict

The fitted variogram parameters.

(lags, estimated_semivariances, semivariances): tuple

A tuple containing the lags, the estimated semivariances from the fitted model, and the empirical semivariances.

Source code in depsi/atmosphere_estimation.py
def fit_variogram(
    da: xr.DataArray,
    lags: np.ndarray = None,
    semivariances: np.ndarray = None,
    variogram_model: str = "gaussian",
    empirical_variogram_method: str = "unbiased_robust",
    empirical_variogram_nlags: int = 50,
    empirical_variogram_cutoff: float = 10000.0,
):
    """Fit a variogram model to the empirical variogram.

    If the arguments `lags` or `semivariances` are None, the empirical variogram
    can be calculated using arguments `empirical_variogram_method`,
    `empirical_variogram_nlags`, and `empirical_variogram_cutoff`.

    Parameters
    ----------
    da: xr.DataArray
        The DataArray containing the variable of interest (including 'x' and 'y'
        coordinates in Euclidean mode) to fit the variogram model.
    lags: np.ndarray, optional
        The lags for the empirical variogram. If None, the empirical variogram
        will be calculated.
    semivariances: np.ndarray, optional
        The semivariances for the empirical variogram. If None, the empirical
        variogram will be calculated.
    variogram_model: str
        The variogram model to fit. Options are: 'linear', 'power', 'gaussian',
        'spherical', 'exponential', 'hole-effect'. Default is 'gaussian'.
    empirical_variogram_method: str
        The `method` argument can be one of 'standard', 'unbiased',
        'unbiased_robust'. Default is 'unbiased_robust'. See the documentation
        of `calculate_empirical_variogram`.
    empirical_variogram_nlags: int
        The number of lags to use for the empirical variogram. Default is 50.
    empirical_variogram_cutoff: float
        The maximum distance to consider for the empirical variogram. Default is 10000.0.

    Returns
    -------
    variogram_parameters: dict
        The fitted variogram parameters.
    (lags, estimated_semivariances, semivariances): tuple
        A tuple containing the lags, the estimated semivariances from the fitted
        model, and the empirical semivariances.
    """
    if lags is None or semivariances is None:
        lags, semivariances = calculate_empirical_variogram(
            da, empirical_variogram_method, empirical_variogram_nlags, empirical_variogram_cutoff
        )

    # see equations and reference in
    # https://geostat-framework.readthedocs.io/projects/pykrige/en/stable/variogram_models.html
    # Gaussian model uses "effective range" introduced in  Pebesma, E.J. &
    # Wesseling, C.G. (1998). "Gstat: a program for geostatistical modelling,
    # prediction and simulation." Computers & Geosciences, 24(1), 17–31
    variogram_dict = {
        "linear": pykrige.variogram_models.linear_variogram_model,
        "power": pykrige.variogram_models.power_variogram_model,
        "gaussian": pykrige.variogram_models.gaussian_variogram_model,
        "spherical": pykrige.variogram_models.spherical_variogram_model,
        "exponential": pykrige.variogram_models.exponential_variogram_model,
        "hole-effect": pykrige.variogram_models.hole_effect_variogram_model,
    }

    variogram_function = variogram_dict.get(variogram_model)

    # see reference in
    # https://github.com/GeoStat-Framework/PyKrige/blob/e02baad442ac99b22f038b09b6290e7abacc17ae/src/pykrige/core.py#L582
    estimated_model_parameters = pykrige.core._calculate_variogram_model(
        lags, semivariances, variogram_model, variogram_function, weight=False
    )

    # Prepare the parameters
    variogram_parameters = {}
    if variogram_model == "linear":
        variogram_parameters["slope"] = estimated_model_parameters[0]
        variogram_parameters["nugget"] = estimated_model_parameters[1]
    elif variogram_model == "power":
        variogram_parameters["scale"] = estimated_model_parameters[0]
        variogram_parameters["exponent"] = estimated_model_parameters[1]
        variogram_parameters["nugget"] = estimated_model_parameters[2]
    else:
        variogram_parameters["sill"] = estimated_model_parameters[0] + estimated_model_parameters[2]
        variogram_parameters["range"] = estimated_model_parameters[1]
        variogram_parameters["nugget"] = estimated_model_parameters[2]

    estimated_semivariances = variogram_function(estimated_model_parameters, lags)
    return variogram_parameters, (lags, estimated_semivariances, semivariances)

depsi.atmosphere_estimation.solve_kriging(ps_atmosphere, prediction_coords, method='universal', **kwargs)

Kriging in space to estimate the atmosphere signal time series.

Parameters:

Name Type Description Default
ps_atmosphere DataArray

The DataArray containing the atmosphere signal with coordinates 'x' and 'y' in Euclidean mode. It must have a time dimension.

required
prediction_coords Dataset | DataArray

The prediction coordinates on which to interpolate the atmosphere signal. It should have coordinates 'x' and 'y' in Euclidean mode.

required
method

The kriging method to use, e.g. 'universal'. Default is 'universal'. Other methods are not implemented yet. see https://geostat-framework.readthedocs.io/projects/pykrige/en/stable/generated/pykrige.uk.UniversalKriging.html#pykrige.uk.UniversalKriging for more details.

'universal'
kwargs

Additional keyword arguments to pass to the function solve_kriging_per_single_time. Allowed keys are: "n_nearest_neighbors", "kriging_backend", "empirical_variogram_args", "variogram_args", See the documentation of solve_kriging_per_single_time for more details.

{}

Returns:

Type Description
Dataset

A dataset containing the interpolated atmosphere signal and the associated variance (sigmasq) for each time step.

Source code in depsi/atmosphere_estimation.py
def solve_kriging(
    ps_atmosphere: xr.DataArray, prediction_coords: xr.Dataset | xr.DataArray, method="universal", **kwargs
):
    """Kriging in space to estimate the atmosphere signal time series.

    Parameters
    ----------
    ps_atmosphere: xr.DataArray
        The DataArray containing the atmosphere signal with coordinates 'x' and
        'y' in Euclidean mode. It must have a time dimension.
    prediction_coords: xr.DataArray
        The prediction coordinates on which to interpolate the atmosphere
        signal. It should have coordinates 'x' and 'y' in Euclidean mode.
    method: str
        The kriging method to use, e.g. 'universal'. Default is 'universal'.
        Other methods are not implemented yet.
        see https://geostat-framework.readthedocs.io/projects/pykrige/en/stable/generated/pykrige.uk.UniversalKriging.html#pykrige.uk.UniversalKriging
        for more details.
    kwargs: dict
        Additional keyword arguments to pass to the function
        `solve_kriging_per_single_time`. Allowed keys are:
        "n_nearest_neighbors", "kriging_backend", "empirical_variogram_args",
        "variogram_args", See the documentation of
        `solve_kriging_per_single_time` for more details.

    Returns
    -------
    xr.Dataset
        A dataset containing the interpolated atmosphere signal and the associated
        variance (sigmasq) for each time step.
    """
    # Check if ps_atmosphere has time dimension
    if "time" not in ps_atmosphere.dims:
        raise ValueError(
            "ps_atmosphere must have a 'time' dimension. Otherwise, use `krige_per_single_time` function directly."
        )

    # Check if ps_atmosphere has 'x' and 'y' coordinates
    if "x" not in ps_atmosphere.coords or "y" not in ps_atmosphere.coords:
        raise ValueError("ps_atmosphere must have coordinates 'x' and 'y' in Euclidean mode.")

    # Remove "time" because we will apply kriging per time step
    input_core_dims = list(ps_atmosphere.sizes)
    if "time" in input_core_dims:
        input_core_dims.remove("time")

    coords_no_time = {k: v for k, v in ps_atmosphere.coords.items() if "time" not in v.dims}

    # Check if prediction_coords has "x" and "y" coordinates
    if "x" not in prediction_coords.coords or "y" not in prediction_coords.coords:
        raise ValueError("Prediction coordinates must have coordinates 'x' and 'y' in Euclidean mode.")

    # Check if 'time' in prediction_coords dims
    if "time" in prediction_coords.dims:
        raise ValueError("Prediction coordinates must not have 'time' dimension.")

    # Check if `input_core_dims` are not chunked
    if ps_atmosphere.chunks is not None:
        chunk_sizes = dict(zip(list(ps_atmosphere.sizes), ps_atmosphere.chunks, strict=False))
        if any(len(chunk_sizes[dim]) != 1 for dim in input_core_dims):
            logger.warning(
                "Rechunking ps_atmosphere to have non-chunked core dimensions for kriging. "
                f"Current chunks: {ps_atmosphere.chunks}"
            )
            ps_atmosphere = ps_atmosphere.chunk({dim: -1 for dim in input_core_dims})

    def apply_kriging_per_single_time(data: np.ndarray):
        """Apply kriging for a single time step."""
        da = xr.DataArray(data=data, coords=coords_no_time, dims=input_core_dims)
        predicted, sigmasq = solve_kriging_per_single_time(da, prediction_coords, method=method, **kwargs)
        return predicted, sigmasq

    predicted, sigmasq = xr.apply_ufunc(
        apply_kriging_per_single_time,
        ps_atmosphere,
        input_core_dims=[input_core_dims],
        output_core_dims=[
            list(prediction_coords.sizes)[::-1],
            list(prediction_coords.sizes)[::-1],
        ],  # result has shape (y, x)
        dask="parallelized",
        vectorize=True,
        output_dtypes=[ps_atmosphere.dtype, ps_atmosphere.dtype],
        dask_gufunc_kwargs={
            "output_sizes": dict(prediction_coords.sizes)
        },  # this is needed when prediction_coords has "x" and "y" coordinates
    )

    # Update time values
    predicted = predicted.assign_coords(time=ps_atmosphere["time"].data)
    sigmasq = sigmasq.assign_coords(time=ps_atmosphere["time"].data)

    return xr.Dataset({"predicted": predicted, "sigmasq": sigmasq})