Source code for rainfallqc.checks.gauge_checks

# -*- coding: utf-8 -*-
"""
Quality control checks examining suspicious rain gauges.

Gauge checks are defined as QC checks that: "detect abnormalities in summary and descriptive statistics of rain gauges."

Classes and functions ordered by appearance in IntenseQC framework.
"""

import polars as pl
import scipy.stats

from rainfallqc.core.all_qc_checks import qc_check
from rainfallqc.utils import data_utils, stats


[docs] @qc_check("check_years_where_nth_percentile_is_zero", require_non_negative=True) def check_years_where_nth_percentile_is_zero(data: pl.DataFrame, target_gauge_col: str, quantile: float) -> list: """ Return years where the n-th percentiles is zero. This is QC1 from the IntenseQC framework Parameters ---------- data : Rainfall data target_gauge_col : Column with rainfall data quantile : Between 0 & 1 Returns ------- year_list : List of years where n-th percentile is zero. """ nth_perc = data.group_by_dynamic("time", every="1y").agg(pl.quantile(target_gauge_col, quantile)) return nth_perc.filter(pl.col(target_gauge_col) == 0)["time"].dt.year().to_list()
[docs] @qc_check("check_years_where_annual_mean_k_top_rows_are_zero", require_non_negative=True) def check_years_where_annual_mean_k_top_rows_are_zero(data: pl.DataFrame, target_gauge_col: str, k: int) -> list: """ Return year list where the annual mean top-K rows are zero. This is QC2 from the IntenseQC framework Parameters ---------- data : Rainfall data target_gauge_col : Column with rainfall data k : Number of top values check i.e. k==5 is top 5 Returns ------- year_list : List of years where k-largest are zero. """ data_top_k = data.group_by_dynamic("time", every="1y").agg(pl.col(target_gauge_col).top_k(k).min()) return data_top_k.filter(pl.col(target_gauge_col) == 0)["time"].dt.year().to_list()
[docs] @qc_check("check_temporal_bias", require_non_negative=True) def check_temporal_bias( data: pl.DataFrame, target_gauge_col: str, time_granularity: str, p_threshold: float = 0.01, ) -> int: """ Perform a two-sided t-test on the distribution of mean rainfall over time slices. This is QC3 (day of week bias) and QC4 (hour-of-day bias) from the IntenseQC framework. Parameters ---------- data : Rainfall data target_gauge_col : Column with rainfall data time_granularity : Temporal grouping, either 'weekday' or 'hour' p_threshold : Significance level for the test Returns ------- flag : int 1 if bias is detected (p < threshold), 0 otherwise """ if time_granularity == "weekday": time_group = pl.col("time").dt.weekday() elif time_granularity == "hour": time_group = pl.col("time").dt.hour() else: raise ValueError("time_granularity must be either 'weekday' or 'hour'") # 1. Get time-average mean grouped_means = data.group_by(time_group).agg(pl.col(target_gauge_col).drop_nans().mean())[target_gauge_col] # 2. Get data mean overall_mean = data[target_gauge_col].drop_nans().mean() # 3. Compute 1-sample t-test _, p_val = scipy.stats.ttest_1samp(grouped_means, overall_mean) return int(p_val < p_threshold)
[docs] @qc_check("check_intermittency", require_non_negative=True) def check_intermittency( data: pl.DataFrame, target_gauge_col: str, no_data_threshold: int = 2, annual_count_threshold: int = 5 ) -> list: """ Return years where more than five periods of missing data are bounded by zeros. TODO: split into multiple sub-functions and write more tests! This is QC5 from the IntenseQC framework. Parameters ---------- data : Rainfall data target_gauge_col : Column with rainfall data no_data_threshold : Number of missing values needed to be counted as a no data period (default: 2 (days)) annual_count_threshold : Number of missing data periods above no_data_threshold per year (default: 5) Returns ------- years_w_intermittency : List of years with intermittency issues. """ # 1. Check data has consistent time step data_utils.check_data_has_consistent_time_step(data) # 2. Replace missing values with NaN data = data_utils.replace_missing_vals_with_nan(data, target_gauge_col) # 3. Mark missing values data = data.with_columns(pl.col(target_gauge_col).is_nan().alias("is_missing")) # 4. Assign group numbers to consecutive missing values grouped = data.with_columns( pl.when(pl.col("is_missing")) .then((~pl.col("is_missing")).cum_sum()) # Only number missing stretches .otherwise(None) .alias("group") ) # 5. Count size of each missing group group_counts = ( grouped.filter(pl.col("is_missing")) .group_by("group") .agg(pl.len().alias("count")) .filter(pl.col("count") >= no_data_threshold) ) # 6. Keep only valid groups valid_missing_groups = group_counts["group"].to_list() valid_missing = grouped.filter(pl.col("group").is_in(valid_missing_groups)) # 7. Get first and last time for each group first_last = ( valid_missing.sort("time") .group_by("group") .agg([pl.first("time").alias("start_time"), pl.last("time").alias("end_time")]) ) # 8. Add stable row index to full sorted data df_sorted = data.sort("time").with_row_index("row_idx") # 9. Lookup row indices of group bounds first_last_with_idx = ( first_last.join(df_sorted.select(["time", "row_idx"]), left_on="start_time", right_on="time") .rename({"row_idx": "start_idx"}) .join(df_sorted.select(["time", "row_idx"]), left_on="end_time", right_on="time") .rename({"row_idx": "end_idx"}) ) # 10. Compute prev/next indices and fetch values row_lookup = df_sorted.select([pl.col("row_idx"), pl.col(target_gauge_col)]) first_last_with_vals = ( first_last_with_idx.with_columns( [(pl.col("start_idx") - 1).alias("prev_idx"), (pl.col("end_idx") + 1).alias("next_idx")] ) .join(row_lookup.rename({target_gauge_col: "prev_val"}), left_on="prev_idx", right_on="row_idx") .join(row_lookup.rename({target_gauge_col: "next_val"}), left_on="next_idx", right_on="row_idx") ) # 11. Filter to groups bounded by zero bounded_by_zero = first_last_with_vals.filter((pl.col("prev_val") == 0) & (pl.col("next_val") == 0)) # 12. Extract year from each bounded group's start time bounded_years = bounded_by_zero.select(pl.col("start_time").dt.year().alias("year")) # 13. Count how many bounded groups occur per year year_counts = bounded_years.group_by("year").len() # 14. Get years exceeding threshold years_w_intermittency = year_counts.filter(pl.col("len") >= annual_count_threshold)["year"].to_list() return years_w_intermittency
[docs] @qc_check("check_breakpoints", require_non_negative=True) def check_breakpoints( data: pl.DataFrame, target_gauge_col: str, p_threshold: float = 0.01, ) -> int: """ Use a Pettitt test rainfall data to check for breakpoints. This is QC6 from the IntenseQC framework. Parameters ---------- data : Rainfall data. target_gauge_col : Column with rainfall data. p_threshold : Significance level for the test. Returns ------- flag : int 1 if breakpoint is detected (p < p_threshold), 0 otherwise """ # 1. Upsample data to daily data_upsampled = data.upsample("time", every="1d") # 2. Compute Pettitt test for breakpoints _, p_val = stats.pettitt_test(data_upsampled[target_gauge_col].fill_nan(0.0)) if p_val < p_threshold: return 1 else: return 0
[docs] @qc_check("check_min_val_change", require_non_negative=True) def check_min_val_change(data: pl.DataFrame, target_gauge_col: str, expected_min_val: float) -> list: """ Return years when the minimum recorded value changes. Used to determine whether there are possible changes to the measuring equipment. This is QC7 from the IntenseQC framework. Parameters ---------- data : Rainfall data target_gauge_col : Column with rainfall data. expected_min_val : Expected value of rainfall i.e. basically the resolution of data. Returns ------- yr_list : List of years with minimum value changes. """ # 1. Filter out non-zero years data_non_zero = data.filter(pl.col(target_gauge_col) > 0) # 2. Get minimum value each year data_min_by_year = data_non_zero.group_by_dynamic(pl.col("time"), every="1y").agg(pl.col(target_gauge_col).min()) non_res_years = data_min_by_year.filter(pl.col(target_gauge_col) != expected_min_val) return non_res_years["time"].dt.year().to_list()