Source code for rainfallqc.checks.pypwsqc_filters

# -*- coding: utf-8 -*-
"""
Quality control checks translated from the pyPWSQC framework (https://pypwsqc.readthedocs.io/en/latest/).

The PWSQC framework includes filters originally develop for automated PWS within the COST Action OPENSENSE.

'run_' and 'check_' relate to the algorithms from pyPWSQC.

Functions are ordered alphabetically.
"""

from typing import List

import numpy as np
import polars as pl
import poligrain as plg
import pypwsqc.flagging
import xarray as xr

from rainfallqc.utils import data_utils

MAX_DISTANCE_M = 10e3

DEFAULT_TIME_UNITS = "seconds since 1970-01-01 00:00:00"
DEFAULT_RAINFALL_ATTRIBUTES = {
    "name": "rainfall",
    "long_name": "rainfall amount per time unit",
    "units": "mm",
    "coverage_contant_type": "physicalMeasurement",
}
DEFAULT_LAT_LON_ATTRIBUTES = {"unit": "degrees in WGS84 projection"}
DEFAULT_ELEVATION_ATTRIBUTES = {"units": "meters", "longname": "meters_above_sea"}


[docs] def run_bias_correction(neighbour_data: pl.DataFrame) -> None: """ Bias correction. Parameters ---------- neighbour_data : Rainfall data of neighbouring gauges with time col Returns ------- neighbour_data : todo """ pass
[docs] def run_event_based_filter(neighbour_data: pl.DataFrame) -> None: """ Event based filter (EBF). Parameters ---------- neighbour_data : Rainfall data of neighbouring gauges with time col Returns ------- neighbour_data : todo """ pass
[docs] def check_faulty_zeros( neighbour_data: pl.DataFrame, neighbour_metadata: pl.DataFrame, neighbouring_gauge_ids: List[str], # TODO: consider if this would break if integers neighbour_metadata_gauge_id_col: str, time_res: str, projection: str, nint: int, n_stat: int, max_distance_for_neighbours: int | float = MAX_DISTANCE_M, time_units: str = DEFAULT_TIME_UNITS, rainfall_attributes: dict = DEFAULT_RAINFALL_ATTRIBUTES, lat_lon_attributes: dict = DEFAULT_LAT_LON_ATTRIBUTES, global_attributes: dict = None, ) -> xr.Dataset: """ Will flag faulty zeros based on neighbours ... Parameters ---------- neighbour_data : Rainfall data of neighbouring gauges with time col neighbour_metadata : Metadata for the rainfall data with 'latitude' and 'longitude' neighbour_metadata_gauge_id_col : Column with the gauge ID target_gauge_col : Target gauge column neighbouring_gauge_ids: List of ids with neighbouring gauges time_res : Time resolution of data projection : cartesian/metric coordinate system nint : Number of intervals n_stat : Number of stations max_distance_for_neighbours : Maximum distance to consider for neighbours time_units : Units and encoding of the 'time' column rainfall_attributes : Attributes for rainfall in the xarray Dataset lat_lon_attributes : Attributes for lat and lon in the xarray Dataset global_attributes : Global attributes for xarray Dataset Returns ------- neighbour_data_ds_filtered : Data with flags for faulty zeros Examples -------- available at: https://pypwsqc.readthedocs.io/en/latest/notebooks/merged_filters.html """ # 0. Initial checks data_utils.check_data_is_specific_time_res(neighbour_data, time_res) for gauge_id in neighbouring_gauge_ids: assert gauge_id in neighbour_metadata[neighbour_metadata_gauge_id_col], ( f"ID: '{gauge_id}' needs to be a value in the metadata." ) assert gauge_id in neighbour_data.columns, f"ID: '{gauge_id}' needs to be a column be in data." # 1. filter metadata to only be nearby neighbour_metadata = neighbour_metadata.filter( pl.col(neighbour_metadata_gauge_id_col).is_in(neighbouring_gauge_ids) ) # 2. convert to xarray neighbour_data_ds = convert_neighbour_data_to_xarray( neighbour_data=neighbour_data, neighbour_metadata=neighbour_metadata, time_units=time_units, projection=projection, rainfall_attributes=rainfall_attributes, lat_lon_attributes=lat_lon_attributes, global_attributes=global_attributes, ) # 3. compute distance matrix (if not already exists) distance_matrix = compute_distance_matrix(neighbour_data_ds) # 4. filter distance matrix neighbour_data_ds = subset_distance_matrix( neighbour_data_ds=neighbour_data_ds, distance_matrix=distance_matrix, max_distance_for_neighbours=max_distance_for_neighbours, ) # 5. run FZ filter neighbour_data_ds_filtered = pypwsqc.flagging.fz_filter(neighbour_data_ds, nint=nint, n_stat=n_stat) return neighbour_data_ds_filtered
[docs] def check_high_influx_filter(neighbour_data: pl.DataFrame) -> None: """ High influx filter. Parameters ---------- neighbour_data : Rainfall data of neighbouring gauges with time col Returns ------- neighbour_data : todo """ pass
[docs] def run_indicator_correlation(neighbour_data: pl.DataFrame) -> None: """ Run indicator correlation. Parameters ---------- neighbour_data : Rainfall data of neighbouring gauges with time col Returns ------- neighbour_data : todo """ pass
[docs] def run_peak_removal(neighbour_data: pl.DataFrame) -> None: """ Peak removal. Parameters ---------- neighbour_data : Rainfall data of neighbouring gauges with time col Returns ------- neighbour_data : todo """ pass
[docs] def check_station_outlier( neighbour_data: pl.DataFrame, neighbour_metadata: pl.DataFrame, neighbouring_gauge_ids: List[str], # TODO: consider if this would break if integers neighbour_metadata_gauge_id_col: str, time_res: str, projection: str, evaluation_period: int, mmatch: int, gamma: float, n_stat: int, max_distance_for_neighbours: int | float = MAX_DISTANCE_M, time_units: str = DEFAULT_TIME_UNITS, rainfall_attributes: dict = DEFAULT_RAINFALL_ATTRIBUTES, lat_lon_attributes: dict = DEFAULT_LAT_LON_ATTRIBUTES, global_attributes: dict = None, ) -> xr.Dataset: """ Station outlier. Parameters ---------- neighbour_data : Rainfall data of neighbouring gauges with time col neighbour_metadata : Metadata for the rainfall data with 'latitude' and 'longitude' neighbour_metadata_gauge_id_col : Column with the gauge ID target_gauge_col : Target gauge column neighbouring_gauge_ids: List of ids with neighbouring gauges time_res : Time resolution of data projection : cartesian/metric coordinate system evaluation_period length of (rolling) window for correlation calculation mmatch threshold for number of matching rainy intervals in evaluation period gamma threshold for rolling median pearson correlation n_stat : Number of stations max_distance_for_neighbours : Maximum distance to consider for neighbours time_units : Units and encoding of the 'time' column rainfall_attributes : Attributes for rainfall in the xarray Dataset lat_lon_attributes : Attributes for lat and lon in the xarray Dataset global_attributes : Global attributes for xarray Dataset Returns ------- neighbour_data_ds_filtered : Data with flags for station outliers Examples -------- available at: https://pypwsqc.readthedocs.io/en/latest/notebooks/merged_filters.html """ # 0. Initial checks data_utils.check_data_is_specific_time_res(neighbour_data, time_res) for gauge_id in neighbouring_gauge_ids: assert gauge_id in neighbour_metadata[neighbour_metadata_gauge_id_col], ( f"ID: '{gauge_id}' needs to be a value in the metadata." ) assert gauge_id in neighbour_data.columns, f"ID: '{gauge_id}' needs to be a column be in data." # 1. filter metadata to only be nearby neighbour_metadata = neighbour_metadata.filter( pl.col(neighbour_metadata_gauge_id_col).is_in(neighbouring_gauge_ids) ) # 2. convert to xarray neighbour_data_ds = convert_neighbour_data_to_xarray( neighbour_data=neighbour_data, neighbour_metadata=neighbour_metadata, time_units=time_units, projection=projection, rainfall_attributes=rainfall_attributes, lat_lon_attributes=lat_lon_attributes, global_attributes=global_attributes, ) # 3. compute distance matrix (if not already exists) distance_matrix = compute_distance_matrix(neighbour_data_ds) # 4. filter distance matrix neighbour_data_ds = subset_distance_matrix( neighbour_data_ds=neighbour_data_ds, distance_matrix=distance_matrix, max_distance_for_neighbours=max_distance_for_neighbours, ) # 5. Initialise SO flags neighbour_data_ds["so_flag"] = xr.DataArray( np.ones((len(neighbour_data_ds.id), len(neighbour_data_ds.time))) * -999, dims=("id", "time") ) neighbour_data_ds["median_corr_nbrs"] = xr.DataArray( np.ones((len(neighbour_data_ds.id), len(neighbour_data_ds.time))) * -999, dims=("id", "time") ) neighbour_data_ds["gamma"] = xr.DataArray( np.ones((len(neighbour_data_ds.id), len(neighbour_data_ds.time))) * gamma, dims=("id", "time") ) # 6. run SO filter neighbour_data_ds_filtered = pypwsqc.flagging.so_filter( neighbour_data_ds, distance_matrix=distance_matrix, evaluation_period=evaluation_period, mmatch=mmatch, gamma=gamma, n_stat=n_stat, max_distance=max_distance_for_neighbours, ) return neighbour_data_ds_filtered
[docs] def convert_neighbour_data_to_xarray( neighbour_data: pl.DataFrame, neighbour_metadata: pl.DataFrame, projection: str, time_units: str = DEFAULT_TIME_UNITS, rainfall_attributes: dict = DEFAULT_RAINFALL_ATTRIBUTES, lat_lon_attributes: dict = DEFAULT_LAT_LON_ATTRIBUTES, global_attributes: dict = None, ) -> xr.Dataset: """ Convert neighbour data in polars format to xarray dataset. Parameters ---------- neighbour_data : Rainfall data of neighbouring gauges with time col neighbour_metadata : Metadata for the rainfall data with 'latitude' and 'longitude' projection : cartesian/metric coordinate system time_units : Units and encoding of the 'time' column rainfall_attributes : Attributes for rainfall in the xarray Dataset lat_lon_attributes : Attributes for lat and lon in the xarray Dataset global_attributes : Global attributes for xarray Dataset Returns ------- neighbour_data_ds : xarray dataset with assigned attributes """ # 1. convert to xarray via pandas neighbour_data_ds = neighbour_data.to_pandas().set_index("time").to_xarray().to_array(dim="id") neighbour_data_ds = neighbour_data_ds.to_dataset(name="rainfall") # 2. assign coords neighbour_data_ds = neighbour_data_ds.assign_coords( longitude=("id", neighbour_metadata["longitude"].to_numpy()), latitude=("id", neighbour_metadata["latitude"].to_numpy()), ) # 3. set encoding attribute for time neighbour_data_ds.time.encoding["units"] = time_units neighbour_data_ds["time"] = neighbour_data_ds["time"].assign_attrs({"unit": time_units}) # 4. Assign variable attributes neighbour_data_ds["rainfall"] = neighbour_data_ds["rainfall"].assign_attrs(rainfall_attributes) neighbour_data_ds["longitude"] = neighbour_data_ds["longitude"].assign_attrs(lat_lon_attributes) neighbour_data_ds["latitude"] = neighbour_data_ds["latitude"].assign_attrs(lat_lon_attributes) # 5. Assign global attributes neighbour_data_ds = neighbour_data_ds.assign_attrs(global_attributes) # 6. reproject xarray neighbour_data_ds.coords["x"], neighbour_data_ds.coords["y"] = plg.spatial.project_point_coordinates( x=neighbour_data_ds.longitude, y=neighbour_data_ds.latitude, target_projection=projection ) return neighbour_data_ds
[docs] def compute_distance_matrix(neighbour_data_ds: xr.Dataset) -> xr.Dataset: """ Compute a distance matrix. Parameters ---------- neighbour_data_ds : xarray dataset of neighbour data Returns ------- distance_matrix : A distance matrix of all neighbouring gauges """ distance_matrix = plg.spatial.calc_point_to_point_distances(neighbour_data_ds, neighbour_data_ds) return distance_matrix
[docs] def subset_distance_matrix( neighbour_data_ds: xr.Dataset, distance_matrix: xr.Dataset, max_distance_for_neighbours: int | float ) -> xr.Dataset: """ Compute a distance matrix. Parameters ---------- neighbour_data_ds : xarray dataset of neighbour data distance_matrix : A distance matrix of all neighbouring gauges max_distance_for_neighbours : Maximum distance to consider for neighbours Returns ------- neighbour_data_ds : A distance matrix of all neighbouring gauges """ # 1. to remove distance_matrix.load() # 2. select nearest neighbours with max_distance buffer nbrs_not_nan = [] reference = [] for pws_id in neighbour_data_ds.id.data: neighbor_ids = distance_matrix.id.data[ (distance_matrix.sel(id=pws_id) < max_distance_for_neighbours) & (distance_matrix.sel(id=pws_id) > 0) ] N = neighbour_data_ds.rainfall.sel(id=neighbor_ids).notnull().sum(dim="id") # noqa: PD004 nbrs_not_nan.append(N) median = neighbour_data_ds.sel(id=neighbor_ids).rainfall.median(dim="id") reference.append(median) neighbour_data_ds["nbrs_not_nan"] = xr.concat(nbrs_not_nan, dim="id") neighbour_data_ds["reference"] = xr.concat(reference, dim="id") return neighbour_data_ds