# -*- coding: utf-8 -*-
"""
Statistical tests and other indices for rainfall data quality control.
Classes and functions ordered alphabetically.
"""
import numpy as np
import polars as pl
import scipy.stats
from rainfallqc.utils import data_utils
RAINFALL_WORLD_RECORDS = {"15m": 198.0, "1h": 401.0, "1d": 1825.0, "hourly": 401.0, "daily": 1825.0} # mm
[docs]
def affinity_index(data: pl.DataFrame, binary_col: str, return_match_and_diff: bool = False) -> tuple | float:
"""
Calculate affinity index from binary column.
Parameters
----------
data :
Rainfall data
binary_col :
Column with binary data
return_match_and_diff :
Whether to return count of matching and difference columns as well as affinity index.
Returns
-------
affinity :
Affinity index.
"""
match = data[binary_col].value_counts().filter(pl.col(binary_col) == 1)["count"]
match = match.item() if match.len() == 1 else 0
diff = data[binary_col].value_counts().filter(pl.col(binary_col) == 0)["count"]
diff = diff.item() if diff.len() == 1 else 0
affinity = match / (match + diff)
if return_match_and_diff:
return match, diff, affinity
return affinity
[docs]
def dry_spell_fraction(rain_daily: pl.DataFrame, target_gauge_col: str, dry_period_days: int) -> pl.Series:
"""
Make dry spell fraction column.
Parameters
----------
rain_daily :
Single time-step of rainfall data with 'dry_day' column
target_gauge_col :
Column with Rainfall data
dry_period_days :
Dry periods window in days
Returns
-------
rain_daily_w_dry_spell_fraction :
Single row with dry spell fraction column
"""
assert "is_dry" in rain_daily, "No dry_day column found, please run rainfallqc.utils.data_utils.get_dry_spells()"
# 1. Get dry spells
rain_daily_dry_day = data_utils.get_dry_spells(rain_daily, target_gauge_col)
# 2. Get dry spell fraction
rain_daily_dry_day = rain_daily_dry_day.with_columns(
dry_spell_fraction=pl.col("is_dry").rolling_sum(window_size=dry_period_days, min_samples=dry_period_days)
/ dry_period_days
)
return rain_daily_dry_day["dry_spell_fraction"]
[docs]
def factor_diff(data: pl.DataFrame, target_col: str, other_col: str) -> pl.DataFrame:
"""
Compute factor diff for polars.
Parameters
----------
data :
Rainfall data
target_col :
Target column to compute factor diff for
other_col :
Other column to compute factor diff for
Returns
-------
data_w_factor_diff :
Data with factor diff
"""
return data.with_columns(
pl.when((pl.col(target_col) > 0) & (pl.col(other_col) > 0))
.then(pl.col(target_col) / pl.col(other_col))
.otherwise(np.nan)
.alias("factor_diff")
)
[docs]
def filter_out_rain_world_records(data: pl.DataFrame, target_gauge_col: str, time_res: str) -> pl.DataFrame:
"""
Filter out rain world records based on time resolution.
Parameters
----------
data :
Rainfall data
target_gauge_col :
Column with rainfall data
time_res :
Temporal resolution of the time series either 'daily' or 'hourly'
Returns
-------
data_not_wr :
Data without rain world records
"""
# 1. Get world records
rainfall_world_records = get_rainfall_world_records()
# 2. Filter out world records
data_not_wr = data.with_columns(
pl.when(pl.col(target_gauge_col).is_null() | pl.col(target_gauge_col).is_nan())
.then(np.nan)
.when(pl.col(target_gauge_col) > rainfall_world_records[time_res])
.then(np.nan)
.otherwise(pl.col(target_gauge_col))
.alias(target_gauge_col)
)
return data_not_wr
[docs]
def fit_expon_and_get_percentile(series: pl.Series, percentiles: list[float]) -> dict[float, float]:
"""
Fit exponential to data series and then get percentile using PPF.
Parameters
----------
series :
Data series to fit exponential distribution.
percentiles :
Percentiles (between 0-1) to evaluate on the fitted exponential distribution
Returns
-------
expon_percentiles :
Threshold at percentile of fitted distribution
"""
# 1. Fit exponential distribution of normalised diff
expon_params = scipy.stats.expon.fit(series)
# 2. Calculate thresholds at percentiles of fitted distribution
return {p: scipy.stats.expon.ppf(p, expon_params[0], expon_params[1]) for p in percentiles}
[docs]
def gauge_correlation(data: pl.DataFrame, target_col: str, other_col: str) -> float:
"""
Calculate correlation between rain gauge data columns.
Parameters
----------
data :
Rainfall data
target_col :
Target rainfall column
other_col :
Other rainfall column
Returns
-------
corr_coef :
Correlation coefficient.
"""
return np.ma.corrcoef(np.ma.masked_invalid(data[target_col]), np.ma.masked_invalid(data[other_col]))[0, 1]
[docs]
def get_rainfall_world_records() -> dict[str, float]:
"""
Return rainfall world record as of 29/04/25.
See:
- http://www.nws.noaa.gov/oh/hdsc/record_precip/record_precip_world.html
- http://www.bom.gov.au/water/designRainfalls/rainfallEvents/worldRecRainfall.shtml
- https://wmo.asu.edu/content/world-meteorological-organization-global-weather-climate-extremes-archive
Returns
-------
rwr :
rainfall world records set in stats.py
"""
return RAINFALL_WORLD_RECORDS
[docs]
def percentage_diff(target: pl.Expr, other: pl.Expr) -> pl.Series:
"""
Percentage difference between target and other column.
Parameters
----------
target:
Target data to compare other too
other:
Other data
Returns
-------
perc_diff:
Percentage difference
"""
return (target - other) * 100 / other
[docs]
def pettitt_test(arr: pl.Series | np.ndarray) -> (int | float, int | float):
"""
Pettitt test for detecting a change point in a time series.
Calculated following Pettitt (1979): https://www.jstor.org/stable/2346729?seq=4#metadata_info_tab_contents.
TAKEN FROM: https://stackoverflow.com/questions/58537876/how-to-run-standard-normal-homogeneity-test-for-a-time-series-data.
Parameters
----------
arr : pl.Series or np.ndarray
The input time series data.
Returns
-------
tau : int
Index of the change point (first point of the second segment).
p : float
p-value for the test statistic.
"""
if isinstance(arr, pl.Series):
arr = arr.to_numpy()
n = len(arr)
K = np.zeros(n)
# Compute rank matrix difference in a vectorized way
for t in range(n):
left = arr[:t]
right = arr[t:]
if left.size > 0 and right.size > 0:
K[t] = np.sum(np.sign(left[:, None] - right[None, :]))
tau = int(np.argmax(np.abs(K)))
U = np.max(np.abs(K))
p = 2 * np.exp((-6 * U**2) / (n**3 + n**2))
return tau, p
[docs]
def simple_precip_intensity_index(data: pl.DataFrame, target_gauge_col: str, wet_threshold: int | float) -> float:
"""
Calculate simple precipitation intensity index.
Parameters
----------
data :
Rainfall data
target_gauge_col :
Column with rainfall data
wet_threshold :
Threshold for rainfall intensity in given time period
Returns
-------
sdii_val :
Simple precipitation intensity index
"""
data_rain_sum = data.filter(pl.col(target_gauge_col) >= wet_threshold).fill_nan(0.0).sum()[target_gauge_col][0]
data_wet_day_count = data.filter(pl.col(target_gauge_col) >= wet_threshold).drop_nans().count()[target_gauge_col][0]
return data_rain_sum / float(data_wet_day_count)