Source code for rainfallqc.checks.timeseries_checks

# -*- coding: utf-8 -*-
"""
Quality control checks based on suspicious time-series artefacts.

Time-series checks are defined as QC checks that: "detect abnormalities in patterns of the data record."

Classes and functions ordered by appearance in IntenseQC framework.
"""

import numpy as np
import polars as pl
import xarray as xr

from rainfallqc.core.all_qc_checks import qc_check
from rainfallqc.utils import data_readers, data_utils, neighbourhood_utils, spatial_utils, stats

DAILY_DIVIDING_FACTOR = {"15m": 96, "1h": 24, "1d": 1, "hourly": 24, "daily": 1}


[docs] @qc_check("check_dry_period_cdd", require_non_negative=True) def check_dry_period_cdd( data: pl.DataFrame, target_gauge_col: str, time_res: str, gauge_lat: int | float, gauge_lon: int | float ) -> pl.DataFrame: """ Identify suspiciously long dry periods in time-series using the ETCCDI Consecutive Dry Days (CDD) index. This is QC12 from the IntenseQC framework. Parameters ---------- data : Rainfall data target_gauge_col : Column with rainfall data time_res : Temporal resolution of the time series either '15m', 'daily' or 'hourly' gauge_lat : latitude of the rain gauge gauge_lon : longitude of the rain gauge Returns ------- data_w_dry_spell_flags : Data with dry spell flags """ if time_res != "15m" and time_res != "daily" and time_res != "hourly": raise ValueError("time_res must be 'daily' or 'hourly'") # 1. Load CDD data etccdi_cdd = data_readers.load_etccdi_data(etccdi_var="CDD") # 2. Make dry spell days column from ETCCDI data etccdi_cdd_days = compute_dry_spell_days(etccdi_cdd) # 3. Get nearest local CDD value to the gauge coordinates nearby_etccdi_cdd_days = neighbourhood_utils.get_nearest_non_nan_etccdi_val_to_gauge( etccdi_cdd_days, etccdi_name="CDD", gauge_lat=gauge_lat, gauge_lon=gauge_lon ) # 4. Get local maximum CDD_days value max_etccdi_cdd_days = np.max(nearby_etccdi_cdd_days["CDD_days"]) # 5. Get dry spell durations (with start and end dates) gauge_dry_spell_lengths = get_dry_spell_duration(data, target_gauge_col) # 6. Flag dry spells gauge_dry_spell_lengths_flags = flag_dry_spell_duration(gauge_dry_spell_lengths, max_etccdi_cdd_days, time_res) # 7. Join data back to main data and flag data_w_dry_spell_flags = join_dry_spell_data_back_to_original(data, gauge_dry_spell_lengths_flags) # 8. Join rain col back data_w_dry_spell_flags = data_w_dry_spell_flags.with_columns(pl.lit(data[target_gauge_col]).alias(target_gauge_col)) # 9. Remove unnecessary columns return data_w_dry_spell_flags.select(["time", "dry_spell_flag"])
[docs] @qc_check("check_daily_accumulations", require_non_negative=True) def check_daily_accumulations( data: pl.DataFrame, target_gauge_col: str, gauge_lat: int | float, gauge_lon: int | float, wet_day_threshold: int | float = 1.0, accumulation_multiplying_factor: int | float = 2.0, accumulation_threshold: float = None, ) -> pl.DataFrame: """ Identify suspicious periods where an hour of rainfall is preceded by 23 hours with no rain. Uses a simple precipitation intensity index (SDII) from ETCCDI. This is QC13 from the IntenseQC framework. Please see 'Notes' below for any additional information about the implementation of this method. Parameters ---------- data : Hourly or 15-min rainfall data target_gauge_col : Column with rainfall data gauge_lat : latitude of the rain gauge gauge_lon : longitude of the rain gauge wet_day_threshold : Threshold for rainfall intensity in one day (default is 1 mm) accumulation_multiplying_factor : Factor to multiply SDII value for to identify an accumulation of rain recordings accumulation_threshold : Rain accumulation for detecting possible daily accumulations Returns ------- data_w_daily_accumulation_flags : Data with daily accumulation flags Notes ----- This method returns only 0 and 1 flags. This differs from the description of the daily accumulation check from IntenseQC. This decision was taken as the IntenseQC python package only returns 0 and 1 flags. """ # 0. Check data is 15 min or hourly data_utils.check_data_is_specific_time_res(data, time_res=["15m", "1h"]) time_step = data_utils.get_data_timestep_as_str(data) if time_step == "15m": original_data = data.clone() data = data_utils.resample_data_by_time_step( data, rain_cols=[target_gauge_col], time_col="time", time_step="1h", min_count=2, hour_offset=0 ) # 1. Get accumulation threshold from ETCCDI SDII value, if not given if not accumulation_threshold: accumulation_threshold = get_accumulation_threshold_from_etccdi( data, target_gauge_col, time_res=time_step, gauge_lat=gauge_lat, gauge_lon=gauge_lon, wet_day_threshold=wet_day_threshold, accumulation_multiplying_factor=accumulation_multiplying_factor, ) # 2. Flag daily (24 hour) accumulations in hourly data based on SDII threshold da_flags = flag_accumulation_periods( data, target_gauge_col, accumulation_threshold=accumulation_threshold, accumulation_period_in_hours=24 ) # 3. Add daily_accumulation column data = data.with_columns(daily_accumulation=pl.Series(da_flags)) # 4. Convert back to 15-min data if needed if time_step == "15m": data = data_utils.downsample_and_fill_columns( high_res_data=original_data, low_res_data=data, data_cols="daily_accumulation", fill_limit=3, fill_method="backward", ) # 5. Remove unnecessary columns return data.select(["time", "daily_accumulation"])
[docs] @qc_check("check_monthly_accumulations", require_non_negative=True) def check_monthly_accumulations( data: pl.DataFrame, target_gauge_col: str, gauge_lat: int | float, gauge_lon: int | float, min_dry_spell_duration_in_days: int = 28, wet_day_threshold: int | float = 1.0, accumulation_multiplying_factor: int | float = 2.0, accumulation_threshold: float = None, ) -> pl.DataFrame: """ Identify suspicious periods when an hour of rainfall is preceded by 1 month with no rain. Flags two different types of accumulations: 1) dry, when the isolated high value 2) wet, when the isolated value is followed by a few more wet values Uses a simple precipitation intensity index (SDII) from ETCCDI. This is QC14 from the IntenseQC framework. Parameters ---------- data : Daily or Hourly or 15 min rainfall data target_gauge_col : Column with rainfall data gauge_lat : latitude of the rain gauge gauge_lon : longitude of the rain gauge min_dry_spell_duration_in_days : Minimum number of days in dry spell preceeding monthly accumulation (default is 28 i.e. Feb) wet_day_threshold : Threshold for rainfall intensity in one day (default is 1 mm) accumulation_multiplying_factor : Factor to multiply SDII value for to identify an accumulation of rain recordings (default is 2) accumulation_threshold : Rain accumulation for detecting possible monthly accumulations Returns ------- data_w_monthly_accumulation_flags : Data with monthly accumulation flags Notes ----- The original method filters out dry spells less than """ # 0. Check time step of data data_utils.check_data_is_specific_time_res(data, time_res=["15m", "1h", "1d"]) time_step = data_utils.get_data_timestep_as_str(data) # Set min and max amount of time steps to be a 'monthly' accumulation (-1 to remove rainfall accumulation) min_dry_spell_duration = min_dry_spell_duration_in_days * DAILY_DIVIDING_FACTOR[time_step] - 1 max_dry_spell_duration = 31 * DAILY_DIVIDING_FACTOR[time_step] - 1 # 1. Get accumulation threshold from ETCCDI SDII value, if not given if not accumulation_threshold: accumulation_threshold = get_accumulation_threshold_from_etccdi( data, target_gauge_col, time_res=time_step, gauge_lat=gauge_lat, gauge_lon=gauge_lon, wet_day_threshold=wet_day_threshold, accumulation_multiplying_factor=accumulation_multiplying_factor, ) # 2. Get info about dry spells in rainfall record gauge_dry_spell_info = get_dry_spell_info(data, target_gauge_col) # 3. Get possible accumulations gauge_data_possible_accumulations = get_possible_accumulations( gauge_dry_spell_info, target_gauge_col, accumulation_threshold ) # 4. Flag monthly (720 h) accumulations gauge_data_monthly_accumulations = flag_accumulation_based_on_next_dry_spell_duration( gauge_data_possible_accumulations, min_dry_spell_duration=min_dry_spell_duration, accumulation_col_name="monthly_accumulation", ) # 5. Fill in monthly accumulation flags gauge_data_monthly_accumulations = fill_in_monthly_accumulation_flags( gauge_data_monthly_accumulations, time_step=time_step, min_dry_spell_duration=min_dry_spell_duration, max_dry_spell_duration=max_dry_spell_duration, ) # 6. Remove unnecessary columns return gauge_data_monthly_accumulations.select(["time", "monthly_accumulation"])
[docs] @qc_check("check_streaks", require_non_negative=True) def check_streaks( data: pl.DataFrame, target_gauge_col: str, gauge_lat: int | float, gauge_lon: int | float, smallest_measurable_rainfall_amount: float, accumulation_threshold: float = None, ) -> pl.DataFrame: """ Check for suspected repeated values. Flags (TODO: could change numbers as original includes unhelpful 2): 1, if streaks of 2 or more repeated values exceeding 2* mean wet day rainfall 3, if streaks of 12 or more greater than smallest measurable rainfall amount 4, if streaks of 24 or more greater than zero 5, if period of zeros bounded by streaks of >= 24 This is QC15 from the IntenseQC framework. Parameters ---------- data : Hourly or 15-min data with rainfall. target_gauge_col : Column with rainfall data. gauge_lat : latitude of the rain gauge. gauge_lon : longitude of the rain gauge. smallest_measurable_rainfall_amount : Resolution of rainfall data (i.e. minimum rainfall recording). accumulation_threshold : Rain accumulation for detecting possible monthly accumulations Returns ------- data_w_streak_flags : Data with streak flags. """ # 0. Check data is 15 min or hourly data_utils.check_data_is_specific_time_res(data, time_res=["15m", "1h"]) time_step = data_utils.get_data_timestep_as_str(data) if time_step == "15m": original_data = data.clone() data = data.group_by_dynamic("time", every="1h").agg(pl.col(target_gauge_col).sum()) time_multiplier = 4 # 4x 15-min periods per hour else: time_multiplier = 1 # 1. Get accumulation threshold from ETCCDI SDII value, if not given if not accumulation_threshold: hourly_accumulation_threshold = get_accumulation_threshold_from_etccdi( data, target_gauge_col, time_res=time_step, gauge_lat=gauge_lat, gauge_lon=gauge_lon, wet_day_threshold=1.0, accumulation_multiplying_factor=2.0, ) accumulation_threshold = hourly_accumulation_threshold / time_multiplier # 2. Get streaks of repeated values streak_data = get_streaks_of_repeated_values(data, target_gauge_col) # 3. Flag streaks of 2 or more repeated large values exceeding 2 * mean wet day rainfall (from ETCCDI SDII) streak_flag1 = flag_streaks_exceeding_wet_day_rainfall_threshold( streak_data, target_gauge_col, streak_length=2, accumulation_threshold=accumulation_threshold ) # 4. Flag streaks of 12 or more greater than smallest measurable rainfall amount streak_flag3 = flag_streaks_exceeding_smallest_measurable_rainfall_amount( streak_data, target_gauge_col, streak_length=12 * time_multiplier, smallest_measurable_rainfall_amount=smallest_measurable_rainfall_amount, ) # 5. Flag streaks of 24 or more greater than zero streak_flag4 = flag_streaks_exceeding_zero(streak_data, target_gauge_col, streak_length=24 * time_multiplier) # 6. Flag periods of zeros bounded by streaks of multiples of 24 streak_flag5 = flag_streaks_of_zero_bounded_by_days(streak_data, target_gauge_col, time_res=time_step) # 7. Join flags together data_w_streak_flags = data.with_columns( streak_flag1=streak_flag1["streak_flag1"], streak_flag3=streak_flag3["streak_flag3"], streak_flag4=streak_flag4["streak_flag4"], streak_flag5=streak_flag5["streak_flag5"], ) # 8. Convert back to 15-min data if needed if time_step == "15m": data_w_streak_flags = data_utils.downsample_and_fill_columns( high_res_data=original_data, low_res_data=data_w_streak_flags, data_cols="^streak_flag.*$", fill_limit=3, fill_method="backward", ) return data_w_streak_flags.select(["time", "streak_flag1", "streak_flag3", "streak_flag4", "streak_flag5"])
[docs] def flag_streaks_of_zero_bounded_by_days(data: pl.DataFrame, target_gauge_col: str, time_res: str) -> pl.DataFrame: """ Flag streak of zeros bounded by record that are a multiple of 24 hours. Parameters ---------- data : Hourly, 15-min or daily data with rainfall. target_gauge_col : Column with rainfall data. time_res : Time resolution: "1h", "15m", "1d", or "hourly", "daily" Returns ------- streaks_w_flag5 : Data with streak flag 5. """ # 0. Check time resolution is expected if time_res not in DAILY_DIVIDING_FACTOR: raise ValueError(f"Unsupported time resolution: {time_res}. Use one of {list(DAILY_DIVIDING_FACTOR.keys())}") intervals_per_day = DAILY_DIVIDING_FACTOR[time_res] # 1. group of streaks data_streak_groups = ( data.group_by("streak_id") .agg(streak_len=pl.len(), rain_amount=pl.col(target_gauge_col).first()) .sort(by="streak_id") ) # 2. get dry spells streak_w_dry_spells = data_utils.get_dry_spells(data_streak_groups, target_gauge_col="rain_amount") # 3. Flag streaks of multiples of 1 day streaks_w_flag5 = streak_w_dry_spells.with_columns( ( pl.when( (pl.col("is_dry") == 1) & (pl.col("streak_len") % intervals_per_day == 0) & (pl.col("streak_len").shift(-1) >= intervals_per_day) ) .then(1) .otherwise(0) .alias("streak_flag5_next") ), ( pl.when( (pl.col("is_dry") == 1) & (pl.col("streak_len") % intervals_per_day == 0) & (pl.col("streak_len").shift(1) >= intervals_per_day) ) .then(1) .otherwise(0) .alias("streak_flag5_prev") ), ) # 4. Filter out only streaks where next or previous are multiples of 24 streaks_w_flag5 = streaks_w_flag5.filter((pl.col("streak_flag5_next") == 1) | (pl.col("streak_flag5_prev") == 1)) # 2. Label original data data_w_flags = data.with_columns( pl.when(pl.col("streak_id").is_in(streaks_w_flag5["streak_id"].unique().to_list())) .then(5) .otherwise(0) .alias("streak_flag5") ) return data_w_flags
[docs] def flag_streaks_exceeding_zero(data: pl.DataFrame, target_gauge_col: str, streak_length: int) -> pl.DataFrame: """ Flag values exceeding wet day rainfall accumulation threshold. Parameters ---------- data : Rainfall data with streak_id. target_gauge_col : Column with rainfall data. streak_length : Only streaks longer than this will be considered. Returns ------- data_w_flags : Data with streak flag 4 """ # 1. Get streak above length and exceeding zero streaks_exceeding_zero = get_streaks_above_threshold(data, target_gauge_col, streak_length, 0.0) # 2. Label original data data_w_flags = data.with_columns( pl.when(pl.col("streak_id").is_in(streaks_exceeding_zero["streak_id"].unique().to_list())) .then(4) .otherwise(0) .alias("streak_flag4") ) return data_w_flags
[docs] def flag_streaks_exceeding_smallest_measurable_rainfall_amount( data: pl.DataFrame, target_gauge_col: str, streak_length: int, smallest_measurable_rainfall_amount: float ) -> pl.DataFrame: """ Flag streaks exceeding smallest measurable rainfall amount in data. Parameters ---------- data: Rainfall data with streak_id.. target_gauge_col: Column with rainfall data. streak_length : Only streaks longer than this will be considered smallest_measurable_rainfall_amount: Resolution of rainfall data (i.e. minimum rainfall recording). Returns ------- data_w_flags : Data with streak flag 3 """ # 1. Get streak above length and smallest measurable rainfall amount streaks_above_smallest_measurable_rainfall_amount = get_streaks_above_threshold( data, target_gauge_col, streak_length, smallest_measurable_rainfall_amount ) # 2. Label original data data_w_flags = data.with_columns( pl.when( pl.col("streak_id").is_in(streaks_above_smallest_measurable_rainfall_amount["streak_id"].unique().to_list()) ) .then(3) .otherwise(0) .alias("streak_flag3") ) return data_w_flags
[docs] def flag_streaks_exceeding_wet_day_rainfall_threshold( data: pl.DataFrame, target_gauge_col: str, streak_length: int, accumulation_threshold: float ) -> pl.DataFrame: """ Flag values exceeding wet day rainfall accumulation threshold. Parameters ---------- data : Rainfall data with streak_id.. target_gauge_col : Column with rainfall data. streak_length : Only streaks longer than this will be considered accumulation_threshold : Threshold for rain accumulation. Returns ------- data_w_flags : Data with streak flag 1 """ # 1. Get streak above length and accumulation threshold streaks_above_accumulation = get_streaks_above_threshold( data, target_gauge_col, streak_length, accumulation_threshold ) # 2. Label original data data_w_flags = data.with_columns( pl.when(pl.col("streak_id").is_in(streaks_above_accumulation["streak_id"].unique().to_list())) .then(1) .otherwise(0) .alias("streak_flag1") ) return data_w_flags
[docs] def get_streaks_above_threshold( data: pl.DataFrame, target_gauge_col: str, streak_length: int, value_threshold: int | float ) -> pl.DataFrame: """ Get streak groups above given threshold. Parameters ---------- data : Rainfall data with streak_id.. target_gauge_col : Column with rainfall data. streak_length : Minimum length of streaks. value_threshold : Threshold to check . Returns ------- streaks_above_accumulation : Get all streaks above given value """ # 0. Cast threshold to float value_threshold = float(value_threshold) # 1. group of streaks data_streak_groups = ( data.group_by("streak_id") .agg(streak_len=pl.len(), rain_amount=pl.col(target_gauge_col).first()) .sort(by="streak_id") ) # 2. Get streaks above streak length and threshold streaks_above_accumulation = data_streak_groups.drop_nans().filter( (pl.col("streak_len") >= streak_length) & (pl.col("rain_amount") > value_threshold) ) return streaks_above_accumulation
[docs] def get_streaks_of_repeated_values(data: pl.DataFrame, data_col: str) -> pl.DataFrame: """ Get streaks of repeated values in time series. Parameters ---------- data : Data with time column. data_col : Column with values to check streaks in. Returns ------- streak_data : Data with streak column. """ # Step 1. get streaks columns streak_data = data.with_columns( (pl.when(pl.col(data_col) == pl.col(data_col).shift(1)).then(1).otherwise(0).alias("same_as_prev")) ) # Step 2. Label groups of streaks return streak_data.with_columns( streak_id=(1 - pl.col("same_as_prev")).cum_sum(), )
[docs] def flag_accumulation_based_on_next_dry_spell_duration( data: pl.DataFrame, min_dry_spell_duration: int | float, accumulation_col_name: str ) -> pl.DataFrame: """ Flag possible accumulation based on subsequent minimum dry spell duration. Flags: 3, if dry spell followed with high value then wet period (wet) 1, if dry spell followed with high value then no rain for next 23 hours (dry) 0, if neither Parameters ---------- data : Rainfall data with dry spell info and possible accumulation label min_dry_spell_duration : Minimum dry spell duration accumulation_col_name : Name for accumulation column Returns ------- data_w_flag : Data with accumulation flag """ return data.with_columns( pl.when( (pl.col("possible_accumulation") == 1) & (pl.col("dry_spell_length").fill_null(0.0) >= min_dry_spell_duration) & (pl.col("next_dry_spell").is_not_null()) ) .then(3) .when( (pl.col("possible_accumulation") == 1) & (pl.col("dry_spell_length").fill_null(0.0) >= min_dry_spell_duration) ) .then(1) .otherwise(0) .alias(accumulation_col_name) )
[docs] def fill_in_monthly_accumulation_flags( monthly_accumulation_flags: pl.DataFrame, time_step: str, min_dry_spell_duration: int | float, max_dry_spell_duration: int | float, ) -> pl.DataFrame: """ Fill in flags preceeding monthly accumulation. Parameters ---------- monthly_accumulation_flags : Rainfall data with monthly accumulation flag and dry spell info time_step : Time step of data i.e. '1h', '1d', '15m'. min_dry_spell_duration : Minimum dry spell duration max_dry_spell_duration : Maximum dry spell duration Returns ------- monthly_accumulation_flags : Data with accumulation flag filled in """ data_utils.check_data_is_specific_time_res(monthly_accumulation_flags, time_res=["15m", "1h", "1d"]) # 1. Set duration for month, if the preceeding dry spell is longer than a month if time_step == "15m": duration_to_remove = pl.duration(hours=max_dry_spell_duration / 4) elif time_step == "1h": duration_to_remove = pl.duration(hours=max_dry_spell_duration) else: duration_to_remove = pl.duration(days=max_dry_spell_duration) # 2. get monthly flag rows flagged_rows = monthly_accumulation_flags.filter(pl.col("monthly_accumulation") > 0) # 3. Fill in rows preceeding for row in flagged_rows.iter_rows(named=True): # Check dry spell is at least minimum for a month if row["dry_spell_length"] >= min_dry_spell_duration: # Check dry spell is at not over maximum for a month if row["dry_spell_length"] <= max_dry_spell_duration: dry_spell_start = row["dry_spell_start"] else: # fill in up to the maximum amount for the month dry_spell_start = pl.select(row["dry_spell_end"] - duration_to_remove).item() # Fill in values preceeding monthly_accumulation_flags = monthly_accumulation_flags.with_columns( pl.when((pl.col("time") <= row["dry_spell_end"]) & (pl.col("time") >= dry_spell_start)) .then(row["monthly_accumulation"]) .otherwise(pl.col("monthly_accumulation")) .alias("monthly_accumulation") ) return monthly_accumulation_flags
[docs] def get_surrounding_dry_spell_lengths(data: pl.DataFrame) -> pl.DataFrame: """ Make prev_dry_spell and next_dry_spell columns from dry_spell_lengths. Parameters ---------- data : Data with dry_spell_lengths Returns ------- data : Data with columns of previous and next dry spell durations """ return data.with_columns( prev_dry_spell=pl.col("dry_spell_length").shift(1), next_dry_spell=pl.col("dry_spell_length").shift(-1), )
[docs] def get_possible_accumulations( gauge_dry_spell_info: pl.DataFrame, target_gauge_col: str, accumulation_threshold: float ) -> pl.DataFrame: """ Get possible accumulations as 0 or 1 based on dry spell info. Parameters ---------- gauge_dry_spell_info : Rainfall data with columns with dry spell info (durations, first_wet_after_dry, etc.) target_gauge_col : Column with rainfall data accumulation_threshold : Threshold of rainfall intensity Returns ------- gauge_data_possible_accumulations : Data with 1 is possible accumulation, otherwise 0. """ # 1. Get values above daily accumulation threshold in one hour gauge_data_possible_accumulations = gauge_dry_spell_info.with_columns( pl.when(pl.col("dry_spell_end") == pl.col("time")) .then(pl.col(target_gauge_col).shift(-1).fill_nan(0.0) > accumulation_threshold) .otherwise(np.nan) .alias("possible_accumulation") ) # 2. Shift the value along gauge_data_possible_accumulations = gauge_data_possible_accumulations.with_columns( possible_accumulation=pl.col("possible_accumulation").shift(1) ) return gauge_data_possible_accumulations
[docs] def get_daily_non_wr_data(data: pl.DataFrame, target_gauge_col: str, time_res: str) -> pl.DataFrame: """ Get daily non-world record data. Parameters ---------- data : Hourly rainfall data target_gauge_col : Column with rainfall data time_res : Temporal resolution of the time series either '15m', 'daily' or 'hourly Returns ------- daily_data_not_wr : Daily rainfall data with world records filtered out """ # 1. Filter out hourly world records data_not_wr = stats.filter_out_rain_world_records(data, target_gauge_col, time_res=time_res) # 2. Group into daily resolution daily_data = data_utils.resample_data_by_time_step( data_not_wr, rain_cols=[target_gauge_col], time_col="time", time_step="1d", min_count=0, hour_offset=0 ) # 3. Filter out daily world records daily_data_not_wr = stats.filter_out_rain_world_records(daily_data, target_gauge_col, time_res="daily") return daily_data_not_wr
[docs] def get_local_etccdi_sdii_mean(gauge_lat: int | float, gauge_lon: int | float) -> float: """ Get the nearby ETCCDI Standard Precipitation Index mean SDII. Parameters ---------- gauge_lat : latitude of the rain gauge gauge_lon : longitude of the rain gauge Returns ------- nearby_etccdi_sdii_mean : Local mean SDII value """ # 1. Load SDII data etccdi_sdii = data_readers.load_etccdi_data(etccdi_var="SDII") # 2. Compute spatial mean etccdi_sdii = spatial_utils.compute_spatial_mean_xr(etccdi_sdii, var_name="SDII") # 3. Get nearest local CDD value to the gauge coordinates nearby_etccdi_sdii = neighbourhood_utils.get_nearest_non_nan_etccdi_val_to_gauge( etccdi_sdii, etccdi_name="SDII", gauge_lat=gauge_lat, gauge_lon=gauge_lon ) # 4. Get local maximum CDD_days value nearby_etccdi_sdii_mean = np.max(nearby_etccdi_sdii["SDII_mean"]) return nearby_etccdi_sdii_mean
[docs] def flag_accumulation_periods( data: pl.DataFrame, target_gauge_col: str, accumulation_threshold: float, accumulation_period_in_hours: int ) -> np.ndarray: """ Flag accumulation in a given period of hourly data. TODO: make work for daily using: DAILY_DIVIDING_FACTOR Parameters ---------- data : Hourly rainfall data target_gauge_col : Column with rainfall data accumulation_threshold : Rain accumulation for detecting possible period accumulations accumulation_period_in_hours : Accumulation period in hours Returns ------- pa_flags : Accumulation flags """ # Note uses n-hour moving window rain_vals = data[target_gauge_col] pa_flags = np.zeros_like(rain_vals) for i in range(len(rain_vals) - accumulation_period_in_hours): period_rain_vals = rain_vals[i : i + accumulation_period_in_hours] pa_flag = flag_n_hours_accumulation_based_on_threshold( period_rain_vals, accumulation_threshold, n_hours=accumulation_period_in_hours ) if pa_flag > max(pa_flags[i : i + accumulation_period_in_hours]): pa_flags[i : i + accumulation_period_in_hours] = np.full(accumulation_period_in_hours, pa_flag) return pa_flags
[docs] def flag_n_hours_accumulation_based_on_threshold( period_rain_vals: pl.Series, accumulation_threshold: float, n_hours: int ) -> int | float: """ Flag a period as accumulation if a value is preceded by n hourly recordings of 0. Parameters ---------- period_rain_vals : One period of rain values accumulation_threshold : Reference SDII threshold n_hours : Number of hours in reference period Returns ------- flag : 1 if period accumulation, otherwise 0 """ flag = 0 if period_rain_vals.is_nan().all(): return np.nan elif period_rain_vals[n_hours - 1]: if period_rain_vals[n_hours - 1] > 0: dry_hours = 0 for h in range(n_hours - 1): if period_rain_vals[h] is None: continue elif period_rain_vals[h] <= 0: dry_hours += 1 if dry_hours == n_hours - 1: if period_rain_vals[n_hours - 1] > accumulation_threshold: flag = 1 return flag
[docs] def get_accumulation_threshold( etccdi_sdii: float, gauge_sdii: float, accumulation_multiplying_factor: int | float ) -> float: """ Get rainfall accumulation threshold based on ETCCDI or rain gauge Standard Precipitation Intensity Index (index). Parameters ---------- etccdi_sdii : SDII value from ETCCDI gauge_sdii : SDII value from rain gauge accumulation_multiplying_factor : Factor to multiply to SDII value for to identify an accumulation of rain recordings Returns ------- accumulation_threshold : Reference SDII threshold """ if np.isnan(etccdi_sdii): accumulation_threshold = gauge_sdii * accumulation_multiplying_factor else: accumulation_threshold = etccdi_sdii * accumulation_multiplying_factor return accumulation_threshold
[docs] def get_accumulation_threshold_from_etccdi( data: pl.DataFrame, target_gauge_col: str, time_res: str, gauge_lat: int | float, gauge_lon: int | float, wet_day_threshold: float, accumulation_multiplying_factor: float, ) -> float: """ Get rain accumulation threshold from ETCCDI data. Parameters ---------- data : Rainfall data. target_gauge_col : Column with rainfall data. time_res : Temporal resolution of the time series either '15m', 'daily' or 'hourly' gauge_lat : latitude of the rain gauge. gauge_lon : longitude of the rain gauge. wet_day_threshold : Threshold for rainfall intensity in one day (whether it is a wet day or not) accumulation_multiplying_factor : Factor to multiply SDII value for to identify an accumulation of rain recordings Returns ------- accumulation_threshold : Rain accumulation threshold that is e.g. 2*standard precipitation intensity threshold """ # 1. Get local mean ETCCDI SDII value (this is the default for SDII in this method) etccdi_sdii = get_local_etccdi_sdii_mean(gauge_lat, gauge_lon) # 2. Filter out world records daily_data_non_wr = get_daily_non_wr_data(data, target_gauge_col, time_res) # 3. Calculate simple precipitation intensity index from daily data gauge_sdii = stats.simple_precip_intensity_index(daily_data_non_wr, target_gauge_col, wet_day_threshold) # 4. Get rain gauge accumulation threshold return get_accumulation_threshold(etccdi_sdii, gauge_sdii, accumulation_multiplying_factor)
[docs] def join_dry_spell_data_back_to_original(data: pl.DataFrame, dry_spell_lengths_flags: pl.DataFrame) -> pl.DataFrame: """ Flag dry spell data using dry spell lengths. Parameters ---------- data : Rainfall data dry_spell_lengths_flags : Data with dry spell flags Returns ------- dry_spell_flag_data : Data with dry spell flags """ # 1. Make template of new data dry_spell_flag_data = pl.DataFrame({"time": data["time"], "dry_spell_flag": np.zeros(data["time"].shape)}) # 2. Get all non-0 flags (i.e. suspicious dry spells) dry_spell_non_zero = dry_spell_lengths_flags.filter(pl.col("dry_spell_flag") > 0) # 3. Loop through problematic flags and label the original data based on duration of dry spell for non_zero_data_row in dry_spell_non_zero.iter_rows(): # overwrite flag dry_spell_flag_data = dry_spell_flag_data.with_columns( pl.when((pl.col("time") >= non_zero_data_row[1]) & (pl.col("time") <= non_zero_data_row[2])) .then(non_zero_data_row[4]) .otherwise(pl.col("dry_spell_flag")) .alias("dry_spell_flag") ) return dry_spell_flag_data
[docs] def flag_dry_spell_duration( dry_spell_lengths: pl.DataFrame, ref_dry_spell_length: int | float, time_res: str ) -> pl.DataFrame: """ Flag the dry spell duration using reference local dry spell length. Parameters ---------- dry_spell_lengths : Data with dry spell lengths ref_dry_spell_length : Reference dry spell length time_res : Temporal resolution of the time series either 'daily' or 'hourly' Returns ------- dry_spell_lengths_flags : Data with dry spell flags """ # May need to rethink how this is done uniformly (as could use day check) dry_spell_lengths_flags = dry_spell_lengths.with_columns( pl.when(pl.col("dry_spell_length").is_null() | pl.col("dry_spell_length").is_nan()) .then(np.nan) .when(pl.col("dry_spell_length") / DAILY_DIVIDING_FACTOR[time_res] >= ref_dry_spell_length * 1.5) .then(4) .when(pl.col("dry_spell_length") / DAILY_DIVIDING_FACTOR[time_res] >= ref_dry_spell_length * 1.33) .then(3) .when(pl.col("dry_spell_length") / DAILY_DIVIDING_FACTOR[time_res] >= ref_dry_spell_length * 1.2) .then(2) .when(pl.col("dry_spell_length") / DAILY_DIVIDING_FACTOR[time_res] >= ref_dry_spell_length) .then(1) .otherwise(0) .alias("dry_spell_flag") ) return dry_spell_lengths_flags
[docs] def get_dry_spell_duration(data: pl.DataFrame, target_gauge_col: str) -> pl.DataFrame: """ Get consecutive dry spell duration. Parameters ---------- data : Rainfall data target_gauge_col : Column with rainfall data Returns ------- gauge_dry_spell_lengths : Data with dry spell start, end and duration """ # 1. Get dry spells gauge_dry_spells = data_utils.get_dry_spells(data, target_gauge_col) # 2. Get consecutive groups of dry spells gauge_dry_spell_groups = get_consecutive_dry_days(gauge_dry_spells) # 3. Get dry spell lengths gauge_dry_spell_lengths = ( gauge_dry_spell_groups.filter(pl.col("is_dry") == 1) .group_by("dry_group_id") .agg( pl.first("time").alias("dry_spell_start"), pl.last("time").alias("dry_spell_end"), pl.col("is_dry").sum().alias("dry_spell_length"), ) .sort("dry_group_id") ) return gauge_dry_spell_lengths
[docs] def get_first_wet_after_dry_spell(data: pl.DataFrame, target_gauge_col: str) -> pl.DataFrame: """ Get first non-zero rainfall value after dry spell. Parameters ---------- data : Rainfall data target_gauge_col : Column with rainfall data Returns ------- data_w_first_wet : Data with binary column denoting first wet after dry spell """ # 1. Get dry spells gauge_dry_spells = data_utils.get_dry_spells(data, target_gauge_col) # 2. Get consecutive groups of dry spells gauge_dry_spell_groups = get_consecutive_dry_days(gauge_dry_spells) return gauge_dry_spell_groups.with_columns( pl.when((pl.col("is_dry") == 0) & (pl.col("dry_group_id").diff().fill_null(0) == 1)) .then(pl.col("time")) .otherwise(None) .alias("first_wet_after_dry") )
[docs] def get_dry_spell_info(data: pl.DataFrame, target_gauge_col: str) -> pl.DataFrame: """ Get summary of dry spells (i.e. duration and first wet value after dry and previous and next dry spells duration). Parameters ---------- data : Hourly rainfall data target_gauge_col : Column with rainfall data Returns ------- gauge_dry_spell_info : Data with dry spell information """ # 1. Get dry spell durations (with start and end dates) gauge_dry_spell_lengths = get_dry_spell_duration(data, target_gauge_col) # 2. Get first wet value after consecutive dry spell gauge_first_wet_after_dry = get_first_wet_after_dry_spell(data, target_gauge_col) # 3. Join data together gauge_dry_spell_info = gauge_first_wet_after_dry.join(gauge_dry_spell_lengths, on="dry_group_id", how="left") # 4. Get previous and next dry spell durations for flagging return get_surrounding_dry_spell_lengths(gauge_dry_spell_info)
[docs] def get_consecutive_dry_days(gauge_dry_spells: pl.DataFrame) -> pl.DataFrame: """ Get consecutive groups of 0 rainfall days. Parameters ---------- gauge_dry_spells : Data with 'is_dry' column Returns ------- gauge_dry_spell_groups : Data with group ids for consecutive dry days """ return gauge_dry_spells.with_columns(((pl.col("is_dry").diff().fill_null(0) == 1).cum_sum()).alias("dry_group_id"))
[docs] def compute_dry_spell_days(dry_spell_data: xr.Dataset) -> xr.Dataset: """ Compute dry spells in days from ETCCDI Consecutive Dry Days data. Parameters ---------- dry_spell_data : ETCCDI CDD index data Returns ------- dry_spell_days : ETCCDI CDD index data with `CDD_days` variable """ # Convert CDD from seconds to days dry_spell_days = data_utils.convert_datarray_seconds_to_days(dry_spell_data["CDD"]) # Mask out non-land data dry_spell_days[dry_spell_days < 0.0] = np.nan # Remove errors from data where more than 366 days are dry dry_spell_days[dry_spell_days > 366] = np.nan # remove errors # Remove invalid data dry_spell_days = np.ma.masked_invalid(dry_spell_days) # Make CDD days variable dry_spell_data["CDD_days"] = (("lat", "lon"), np.ma.max(dry_spell_days, axis=0)) return dry_spell_data