# -*- coding: utf-8 -*-
"""
All data operations for polars including datetime and calendar functionality.
Classes and functions ordered alphabetically.
"""
import calendar
import datetime
from collections.abc import Sequence
from typing import List
import numpy as np
import polars as pl
import xarray as xr
SECONDS_IN_DAY = 86400.0
TEMPORAL_CONVERSIONS = {"hourly": "1h", "daily": "1d", "monthly": "1mo"}
MONTHLY_TIME_STEPS = ["28d", "29d", "30d", "31d"]
[docs]
def back_propagate_daily_data_flags(data: pl.DataFrame, flag_column: str, num_days: int) -> pl.DataFrame:
"""
Back fill-in flags a number of days.
This will prioritise higher flag values.
Parameters
----------
data :
Daily data with `flag_column`
flag_column :
column with flags
num_days:
Number of days to back-propagate
Returns
-------
data :
Data with flags back-propogated
"""
data = data.clone()
# Extract flag values
flag_values = (flag for flag in data[flag_column].unique() if (np.isfinite(flag) & (flag > 0)))
for flag, data_filtered in [(flag, data.filter(pl.col(flag_column) == flag)) for flag in flag_values]:
for time_to_use in data_filtered["time"]:
assert isinstance(time_to_use, datetime.datetime), "time_to_use must be datetime.datetime"
start_time = time_to_use - datetime.timedelta(days=num_days)
data = data.with_columns(
pl.when((pl.col("time") >= start_time) & (pl.col("time") <= time_to_use))
.then(flag)
.otherwise(pl.col(flag_column))
.alias(flag_column)
)
return data
[docs]
def calculate_dry_spell_fraction(data: pl.DataFrame, target_gauge_col: str, dry_period_days: int) -> pl.Series:
"""
Calculate dry spell fraction.
Parameters
----------
data :
Data with time column
target_gauge_col :
Column with rainfall data
dry_period_days :
Length for of a "dry_spell"
Returns
-------
rain_daily_dry_day :
Data with dry spell fraction
"""
if not isinstance(data, pl.DataFrame):
data = data.to_frame()
# 1. make dry day column
data_dry_days = get_dry_spells(data, target_gauge_col)
# 2. Calculate late dry spell fraction
data_dry_days = data_dry_days.with_columns(
dry_spell_fraction=pl.col("is_dry").rolling_sum(window_size=dry_period_days, min_samples=dry_period_days)
/ dry_period_days
)
return data_dry_days["dry_spell_fraction"]
[docs]
def check_data_has_consistent_time_step(data: pl.DataFrame) -> None:
"""
Check data has a consistent time step i.e. '1h'.
Parameters
----------
data :
Data with time column
Raises
------
ValueError :
If data has more than one time steps
"""
unique_timesteps = get_data_timesteps(data)
if unique_timesteps.len() != 1:
timestep_strings = [format_timedelta_duration(td) for td in unique_timesteps]
raise ValueError(
f"""Data has a inconsistent time step. Data has following time steps: {timestep_strings}.
One potential fix is to resample the data using: 'data.upsample(\"time\", every=\"[insert_timestep]\")'"""
)
[docs]
def check_data_is_monthly(data: pl.DataFrame) -> None:
"""
Check data is monthly.
Parameters
----------
data :
Data with time column
Raises
------
ValueError :
If data has a no monthly time steps
"""
unique_timesteps = get_data_timesteps(data)
timestep_strings = [format_timedelta_duration(td) for td in unique_timesteps]
if not all(ts in MONTHLY_TIME_STEPS for ts in timestep_strings):
raise ValueError(
f"Data contains non-monthly timesteps not like '29d', '30d', etc. Timesteps found are {timestep_strings}"
)
if not timestep_strings:
raise ValueError("No timesteps found in data.")
[docs]
def check_data_is_specific_time_res(data: pl.DataFrame, time_res: str | list) -> None:
"""
Check data has a hourly or daily time step.
Does not work for monthly data, please use 'check_data_is_monthly'.
Parameters
----------
data :
Data with time column.
time_res :
Time resolutions either a single string or list of strings
Raises
------
ValueError :
If data is not hourly or daily.
"""
# Normalize to list
if isinstance(time_res, str):
allowed_res = [time_res]
elif isinstance(time_res, Sequence):
allowed_res = list(time_res)
else:
raise TypeError("time_res must be a string or list of strings")
# add terms like 'hourly', 'daily' or 'monthly'
for time_conv in TEMPORAL_CONVERSIONS:
if time_res == time_conv:
allowed_res.append(TEMPORAL_CONVERSIONS[time_res])
# Get actual time step as a string like "1h"
time_step = get_data_timestep_as_str(data)
if time_step not in allowed_res:
raise ValueError(f"Invalid time step. Expected one of {allowed_res}, but data has time-step(s): {time_step}")
[docs]
def check_for_negative_values(df: pl.DataFrame, target_gauge_col: str) -> bool:
"""
Check if the target column contains any negative values.
Parameters
----------
df :
DataFrame to check.
target_gauge_col :
Column to check for negative values.
Raises
------
ValueError
If negative values are found in the target column.
"""
return (df[target_gauge_col] < 0).any()
[docs]
def convert_datarray_seconds_to_days(series_seconds: xr.DataArray) -> np.ndarray:
"""
Convert xarray series from seconds to days. For some reason the CDD data from ETCCDI is in seconds.
Parameters
----------
series_seconds :
Data in series to convert to days.
Returns
-------
series_days :
Data array converted to days.
"""
return series_seconds.values.astype("timedelta64[s]").astype("float32") / SECONDS_IN_DAY
[docs]
def convert_daily_data_to_monthly(
daily_data: pl.DataFrame, rain_cols: list, perc_for_valid_month: int | float = 95
) -> pl.DataFrame:
"""
Convert daily data to monthly whilst setting month to NaN if less than a given percentage of days is missing.
Parameters
----------
daily_data :
Daily data to convert to monthly
rain_cols :
Columns with rainfall data
perc_for_valid_month :
Percentage of month needed to be classed as a valid month for the monthly group by
Returns
-------
monthly_data :
Monthly data
"""
check_data_is_specific_time_res(daily_data, "daily")
# 1. Make month and year column
daily_data = make_month_and_year_col(daily_data)
# 2. Calculate expected days in month
daily_data = get_expected_days_in_month(daily_data)
agg_expressions = [pl.len().alias("n_days"), pl.col("expected_days_in_month").first()]
agg_expressions += [pl.col(col).sum().alias(col) for col in rain_cols]
# 3. Group data into monthly
monthly_data = (
daily_data.group_by_dynamic("time", every="1mo", closed="right")
.agg(agg_expressions)
.filter(
pl.col("n_days")
>= (
pl.col("expected_days_in_month") * perc_for_valid_month / 100
) # Ensure at least n% values for month are available
)
.drop("n_days", "expected_days_in_month")
)
return monthly_data
[docs]
def downsample_and_fill_columns(
high_res_data: pl.DataFrame,
low_res_data: pl.DataFrame,
data_cols: str | list[str],
fill_limit: int,
fill_method: str = "backward",
time_col: str = "time",
) -> pl.DataFrame:
"""
Join columns from lower resolution data to higher resolution data and fill gaps.
Parameters
----------
high_res_data :
Higher resolution data (e.g., 15-min)
low_res_data :
Lower resolution data with columns to join (e.g., hourly)
data_cols :
Column name(s) to join and fill. Can be:
- Single column name: "rainfall"
- List of columns: ["rain1", "rain2"]
- Regex pattern: "^rain.*$"
fill_limit :
Maximum number of intervals to fill
fill_method :
"forward", "backward", or "none"
time_col :
Name of time column (default: 'time')
Returns
-------
high_res_data_filled :
High resolution data with filled columns
"""
# Normalize data_cols to ensure it works with pl.col()
if isinstance(data_cols, str):
# Single column or regex pattern
cols_to_join = [time_col, pl.col(data_cols)]
else:
# List of column names
cols_to_join = [time_col] + [pl.col(col) for col in data_cols]
# Select time and all data columns to join
cols_to_join_df = low_res_data.select(cols_to_join)
# Join columns to high resolution data
result = high_res_data.join(cols_to_join_df, on=time_col, how="left")
# Apply fill method
if fill_method == "backward":
result = result.with_columns(pl.col(data_cols).backward_fill(limit=fill_limit))
elif fill_method == "forward":
result = result.with_columns(pl.col(data_cols).forward_fill(limit=fill_limit))
elif fill_method == "none":
pass # No filling
else:
raise ValueError(f"fill_method must be 'forward', 'backward', or 'none', got '{fill_method}'")
return result
[docs]
def downsample_monthly_data(
sub_monthly_data: pl.DataFrame,
monthly_data: pl.DataFrame,
data_cols: str | list[str],
time_col: str = "time",
) -> pl.DataFrame:
"""
Join monthly data to hourly and fill only within same month.
Parameters
----------
sub_monthly_data :
Sub-monthly data (e.g., hourly)
monthly_data :
Monthly data with columns to join
data_cols :
Column name(s) to join and fill. Can be:
- Single column name: "rainfall"
- List of columns: ["rain1", "rain2"]
time_col :
Name of time column (default: 'time')
Returns
-------
result :
Sub-monthly data with monthly columns joined and filled within month
"""
# Add month start column to both dataframes
data_with_month = sub_monthly_data.with_columns(pl.col(time_col).dt.truncate("1mo").alias("_month_start"))
monthly_with_month = monthly_data.with_columns(pl.col(time_col).dt.truncate("1mo").alias("_month_start"))
# Join on month start instead of exact time
result = data_with_month.join(
monthly_with_month.select(["_month_start", pl.col(data_cols)]), on="_month_start", how="left"
)
# backward fill within each month
result = result.with_columns(pl.col(data_cols).backward_fill().over("_month_start"))
# Drop temporary column
result = result.drop("_month_start")
return result
[docs]
def get_data_timestep_as_str(data: pl.DataFrame) -> str:
"""
Get time step of data.
Parameters
----------
data :
Data with time column
Returns
-------
time_step :
Time step of data i.e. '1h', '1d', '15m'.
"""
check_data_has_consistent_time_step(data)
unique_timestep = get_data_timesteps(data)
return format_timedelta_duration(unique_timestep[0])
[docs]
def get_data_timesteps(data: pl.DataFrame) -> pl.Series:
"""
Get data timesteps. Ideally the data should have 1.
Parameters
----------
data :
Data with time column.
Returns
-------
unique_timesteps :
All unique time steps in data (timedelta).
"""
data_timesteps = data.with_columns([pl.col("time").diff().alias("time_step")])
unique_timesteps = data_timesteps["time_step"].drop_nulls().unique()
return unique_timesteps
[docs]
def get_dry_period_proportions(dry_period_days: int) -> dict:
"""
Get dry period proportions.
Parameters
----------
dry_period_days :
Length for of a "dry_spell" (default: 15 days)
Returns
-------
fraction_dry_days :
Dictionary with keys "1", "2", "3" with dry spell fractions
"""
fraction_dry_days = {}
for d in range(1, 3 + 1):
fraction_dry_days[str(d)] = np.trunc((1.0 - (float(d) / dry_period_days)) * 10**2) / (10**2)
return fraction_dry_days
[docs]
def get_dry_spells(data: pl.DataFrame, target_gauge_col: str) -> pl.DataFrame:
"""
Get dry spell column.
Parameters
----------
data :
Rainfall data
target_gauge_col :
Column with rainfall data
Returns
-------
data_w_dry_spells :
Data with is_dry binary column
"""
return data.with_columns(
(pl.col(target_gauge_col) == 0).cast(pl.Int8()).alias("is_dry"),
)
[docs]
def get_expected_days_in_month(data: pl.DataFrame) -> pl.DataFrame:
"""
Get expected number of days in a months within the data.
Parameters
----------
data :
Data with 'year' and 'month' columns
Returns
-------
data:
Data with 'expected_days_in_month" column
"""
# Use map_elements + calendar.monthrange to compute days in each month
return data.with_columns(
[
pl.struct(["year", "month"])
.map_elements(
lambda x: calendar.monthrange(x["year"], x["month"])[1],
return_dtype=pl.Int64,
)
.alias("expected_days_in_month")
]
)
[docs]
def get_normalised_diff(data: pl.DataFrame, target_col: str, other_col: str, diff_col_name: str) -> pl.DataFrame:
"""
Ger normalised difference between two columns in data.
Parameters
----------
data :
Data with columns
target_col :
Target column
other_col :
Other column.
diff_col_name :
New column name for difference column
Returns
-------
data_w_norm_diff :
Data with normalised diff
"""
return data.with_columns(
(normalise_data(pl.col(target_col)) - normalise_data(pl.col(other_col))).alias(diff_col_name)
)
[docs]
def make_month_and_year_col(data: pl.DataFrame) -> pl.DataFrame:
"""
Make year and month columns for polars dataframe.
Parameters
----------
data :
Data with time column
Returns
-------
data :
Data with year and month columns
"""
return data.with_columns(
[
pl.col("time").dt.year().alias("year"),
pl.col("time").dt.month().alias("month"),
]
)
[docs]
def normalise_data(data: pl.Series | pl.expr.Expr) -> pl.Series:
"""
Normalise data to [0, 1].
Parameters
----------
data :
Data with time column.
Returns
-------
norm_data :
Normalised data.
"""
return (data - data.min()) / (data.max() - data.min())
[docs]
def offset_data_by_time(data: pl.DataFrame, target_col: str, offset_in_time: int, time_res: str) -> pl.DataFrame:
"""
Shift/offset data either backwards or forwards in time.
Parameters
----------
data :
Data with column to offset in 'time'
target_col :
Column of data to offset
offset_in_time :
Amount to offset data by i.e. 1 for 1 day if time_res set to '1d'
time_res :
Time resolution like 'hourly', 'daily', '1h' or '1d'
Returns
-------
data :
Offset data by 'offset_in_time' amount
"""
# 0. Check data is specific time_res
check_data_is_specific_time_res(data, time_res=time_res)
# 1. If time_res is like 'hourly' then get time res in a format that works with upsample i.e. '1d'
time_res = TEMPORAL_CONVERSIONS.get(time_res, time_res)
# 2. Upsample data to time_res to fill in gaps
data = data.upsample("time", every=time_res)
# 3. Shift data in time
return data.with_columns(
pl.col(target_col).last().over(pl.col("time").dt.truncate(time_res)).shift(offset_in_time),
)
[docs]
def replace_missing_vals_with_nan(
data: pl.DataFrame,
target_gauge_col: str,
missing_val: int = None,
) -> pl.DataFrame:
"""
Replace no data value with numpy.nan.
Parameters
----------
data :
Rainfall data
target_gauge_col :
Column of rainfall
missing_val :
Missing value identifier
Returns
-------
gsdr_data
GSDR data with missing values replaced
"""
if missing_val is None:
return data.with_columns(
pl.when(pl.col(target_gauge_col).is_null())
.then(np.nan)
.otherwise(pl.col(target_gauge_col))
.alias(target_gauge_col)
)
else:
return data.with_columns(
pl.when((pl.col(target_gauge_col).is_null()) | (pl.col(target_gauge_col) == missing_val))
.then(np.nan)
.otherwise(pl.col(target_gauge_col))
.alias(target_gauge_col)
)
[docs]
def resample_data_by_time_step(
data: pl.DataFrame, rain_cols: List[str], time_col: str, time_step: str, min_count: int, hour_offset: int
) -> pl.DataFrame:
"""
Group hourly data into daily and check for at least 24 daily time steps per day.
Parameters
----------
data :
Rainfall data to resample
rain_cols :
List of column with rainfall data
time_col :
Name of time column
time_step :
Time step to resample into (e.g. '1d' for daily, '1h' for hourly, '15m' for 15 minute)
min_count :
Minimum number of time steps needed per time period
hour_offset :
Time offset in hours (needed if data is not aligned to midnight)
Returns
-------
resampled_data :
Rainfall data grouped into a given time step
"""
# resample into daily (also round to 1 decimal place)
return data.group_by_dynamic(time_col, every=time_step, closed="left", label="left", offset=f"{hour_offset}h").agg(
[
pl.when(pl.col(col).count() >= min_count).then(pl.col(col).sum()).otherwise(None).alias(col)
for col in rain_cols
]
)