# -*- coding: utf-8 -*-
"""
Data loading tools.
Classes for reading rain gauge network data at bottom of file.
"""
import datetime
import glob
import os.path
import zipfile
from abc import ABC, abstractmethod
from importlib import resources
from typing import List
import numpy as np
import pandas as pd
import polars as pl
import xarray as xr
from rainfallqc.utils import data_utils, neighbourhood_utils
DAILY_MULTIPLYING_FACTORS = {"15m": 96, "hourly": 24, "daily": 1}
MONTHLY_MULTIPLYING_FACTORS = {"15m": 96 * 24, "hourly": 30 * 24, "daily": 30, "monthly": 1}
GSDR_TIME_RES_CONVERSION = {"1hr": "hourly", "1d": "daily", "1mo": "monthly"}
GPCC_TIME_RES_CONVERSION = {"tw": "daily", "mw": "monthly"}
GPCC_HOUR_OFFSET = 7 # Apparently the GSDR data runs from 7am to 7am, so this converts it for comparison
[docs]
def read_gsdr_data_from_file(
data_path: str,
raw_data_time_res: str,
rain_col_prefix: str = None,
rain_col_suffix: str = None,
suffix_only: bool = False,
gsdr_header_rows: int = 20,
) -> pl.DataFrame:
"""
Read GSDR data from file.
Note: this was developed on the GSDR data available from IntenseQC. So it needs a number of header rows in data.
Parameters
----------
data_path :
Path to GSDR data file
raw_data_time_res :
Time resolution of data record i.e. 'hourly' or 'daily'
rain_col_prefix :
Prefix for column for target_gauge_col (set as None by default)
rain_col_suffix :
Suffix for column name for target_gauge_col (set as None by default)
suffix_only :
Override to only include the suffix e.g. if the column name is the ID)
gsdr_header_rows :
Number of rows to skip in the header of the GSDR data (default=20)
Returns
-------
gsdr_data :
GSDR data as Pandas DataFrame
"""
# 1. read in metadata of gauge
gsdr_metadata = read_gsdr_metadata(data_path)
if suffix_only:
rain_col_name = f"{rain_col_suffix}"
else:
rain_col_name = f"{gsdr_metadata['original_units']}"
if rain_col_prefix:
rain_col_name = f"{rain_col_prefix}_" + rain_col_name
if rain_col_suffix:
rain_col_name += f"_{rain_col_suffix}"
# 2. read in gauge data
gsdr_data = pl.read_csv(
data_path,
skip_rows=gsdr_header_rows,
new_columns=[rain_col_name],
schema_overrides={rain_col_name: pl.Float64},
)
# 3. add datetime column to data
gsdr_data = add_datetime_to_gsdr_data(
gsdr_data, gsdr_metadata, multiplying_factor=DAILY_MULTIPLYING_FACTORS[raw_data_time_res]
)
gsdr_data = data_utils.replace_missing_vals_with_nan(
gsdr_data, target_gauge_col=rain_col_name, missing_val=int(gsdr_metadata["no_data_value"])
)
# 4. Select time and rain col
gsdr_data = gsdr_data.select(["time", rain_col_name]) # Reorder (to look nice)
return gsdr_data
[docs]
def read_gpcc_data_from_zip(
data_path: str,
gpcc_file_name: str,
target_gauge_col: str,
time_res: str,
hour_offset: int = GPCC_HOUR_OFFSET,
missing_val: int | float = -999,
) -> pl.DataFrame:
"""
Read the specific format and header of Global Precipitation Climatology Centre (GPCC) files.
Parameters
----------
data_path :
path to GPCC zip file
gpcc_file_name :
Name of GPCC file within zip
target_gauge_col :
Name of rainfall column
time_res :
'daily' or 'monthly'
hour_offset :
Hours to offset grouped data by (default is 7)
missing_val :
Missing value (default: -999)
Returns
-------
gpcc_data : dict
Data from GPCC file
"""
assert ".zip" in data_path, "Data needs to be a zip file"
# 0. Load GPCC data
f = zipfile.ZipFile(data_path).open(gpcc_file_name)
gpcc_data = pl.from_pandas(pd.read_csv(f, skiprows=1, header=None, sep=r"\s+"))
if time_res == "daily":
# 1. drop unnecessary columns
gpcc_data = gpcc_data.drop([str(i) for i in range(4, len(gpcc_data.columns))])
# 2. make daily datetime column (apparently it's 7am-7pm)
gpcc_data = gpcc_data.with_columns(
pl.datetime(pl.col("2"), pl.col("1"), pl.col("0"), hour_offset).alias("time")
).drop(["0", "1", "2"])
# 3. rename and reorder
gpcc_data = gpcc_data.rename({"3": target_gauge_col})
elif time_res == "monthly":
# 1. drop unnecessary columns
gpcc_data = gpcc_data.drop([str(i) for i in range(3, len(gpcc_data.columns))])
# 2. make monthly datetime column
gpcc_data = gpcc_data.with_columns(pl.datetime(year=pl.col("1"), month=pl.col("0"), day=1).alias("time")).drop(
["0", "1"]
)
# 3. rename and reorder
gpcc_data = gpcc_data.rename({"2": target_gauge_col})
else:
raise ValueError(f"Time resolution={time_res} not recognized. Please use 'daily' or 'monthly'")
# 4. Check data is specific format
try:
data_utils.check_data_is_specific_time_res(gpcc_data, time_res)
except ValueError as ve:
print(ve)
print(f"Attempting to resample into {time_res}")
gpcc_data = gpcc_data.group_by_dynamic(
"time", every=data_utils.TEMPORAL_CONVERSIONS[time_res], offset=str(hour_offset) + "h"
).agg(pl.col(target_gauge_col).first())
# 5. Select time and rain col
gpcc_data = gpcc_data.select(["time", target_gauge_col]) # Reorder (to look nice)
# 6. Replace missing value
gpcc_data = data_utils.replace_missing_vals_with_nan(
gpcc_data, target_gauge_col=target_gauge_col, missing_val=missing_val
)
return gpcc_data
[docs]
def add_datetime_to_gsdr_data(
gsdr_data: pl.DataFrame, gsdr_metadata: dict, multiplying_factor: int | float
) -> pl.DataFrame:
"""
Add datetime column to GSDR gauge data using metadata from that gauge.
NOTE: Could maybe extend so can find metadata if not provided?
Parameters
----------
gsdr_data :
GSDR data
gsdr_metadata :
Metadata from GSDR file
multiplying_factor : int or float
Factor to multiply the data by.
Returns
-------
gsdr_data
GSDR data with datetime column added
"""
start_date = gsdr_metadata["start_datetime"]
end_date = gsdr_metadata["end_datetime"]
assert isinstance(start_date, datetime.datetime), (
"Please convert start_ and end_datetime to datetime.datetime objects"
)
date_interval = []
delta_days = ((end_date + datetime.timedelta(days=1)) - start_date).days
for i in range(delta_days * multiplying_factor):
date_interval.append(start_date + datetime.timedelta(hours=i))
# add time column
assert len(gsdr_data) == len(date_interval)
gsdr_data = gsdr_data.with_columns(time=pl.Series(date_interval))
return gsdr_data
[docs]
def load_etccdi_data(etccdi_var: str, path_to_etccdi: str = None) -> xr.Dataset:
"""
Load ETCCDI data.
Parameters
----------
etccdi_var :
variable to load from ETCCDI
path_to_etccdi :
path to ETCCDI data (default is location of data in tests)
Returns
-------
etccdi_data :
Loaded data
"""
if not path_to_etccdi:
netcdf_file = f"RawData_HADEX2_{etccdi_var}_1951-2010_ANN_from-90to90_from-180to180.nc"
path_to_etccdi_data = resources.files("rainfallqc.data.ETCCDI").joinpath(netcdf_file)
etccdi_data = xr.open_dataset(str(path_to_etccdi_data), decode_timedelta=True, engine="netcdf4")
else:
print(f"User defined path to ETCCDI being used: {path_to_etccdi}")
etccdi_data = xr.open_dataset(
f"{path_to_etccdi}RawData_HADEX2_{etccdi_var}_1951-2010_ANN_from-90to90_from-180to180.nc",
decode_timedelta=True,
engine="netcdf4",
)
etccdi_data.load()
return etccdi_data
[docs]
def get_paths_using_gauge_ids(
gauge_ids: List[str] | np.ndarray[str], dir_path: str, file_format: str, time_res: str = None
) -> dict:
"""
Get data path of Gauge IDs.
Parameters
----------
gauge_ids :
Array of gauge IDs
dir_path :
Path to data directory
file_format :
Format of files in directory.
time_res :
Time resolution (e.g. 'mw' or 'tw')
Returns
-------
gauge_paths :
Dictionary of gauge ID and path
"""
all_data_paths = {}
for g_id in gauge_ids:
if time_res:
g_id_path = glob.glob(f"{dir_path}*{time_res}*{g_id}*{file_format}")
else:
time_res = ""
g_id_path = glob.glob(f"{dir_path}*{g_id}*{file_format}")
try:
all_data_paths[g_id] = g_id_path[0]
except IndexError:
print(f"Cannot find data for {time_res} {g_id} in directory {dir_path} with file format {file_format}.")
all_data_paths[g_id] = g_id_path[0]
return all_data_paths
[docs]
class GaugeNetworkReader(ABC):
"""Base class for reading rain gauge networks."""
def __init__(self, path_to_gauge_network: str):
"""Load network reader."""
self.path_to_gauge_network = path_to_gauge_network
self.metadata = self._load_metadata()
@abstractmethod
def _load_metadata(self) -> dict:
"""Must be implemented by subclasses to load gauge network metadata."""
# @abstractmethod
# def load_network_data(self) -> pl.DataFrame:
# """Must be implemented by subclasses to load gauge network data."""
# pass
[docs]
def get_nearest_overlapping_neighbours_to_target(
self, target_id: str, distance_threshold: int | float, n_closest: int, min_overlap_days: int
) -> set:
"""
Get IDs of the nearest neighbours to a target whilst checking that there is at least a minimum time overlap.
Parameters
----------
target_id :
Target gauge to get neighbour IDs of
distance_threshold :
Distance threshold to check for neighbours
n_closest :
Number of nearest neighbours to return
min_overlap_days :
Minimum time overlap between neighbours to return
Returns
-------
neighbouring_gauge_id :
IDs of neighbouring gauges within a given distance to target and min overlapping days
"""
all_neighbour_ids = neighbourhood_utils.get_ids_of_n_nearest_overlapping_neighbouring_gauges(
self.metadata, target_id, distance_threshold, n_closest, min_overlap_days
)
return all_neighbour_ids
[docs]
class GSDRNetworkReader(GaugeNetworkReader):
"""GSDR rain gauge network reader."""
def __init__(self, path_to_gsdr_dir: str, file_format: str = ".txt"):
"""Load network reader."""
self.path_to_gsdr_dir = path_to_gsdr_dir
self.file_format = file_format
super().__init__(path_to_gsdr_dir)
self.data_paths = self._get_data_paths()
self.metadata = self._add_paths_to_metadata()
self.time_res = self.metadata["new_timestep"][0]
def _load_metadata(self) -> pl.DataFrame:
"""
Load metadata from GSDR gauge metadata path.
Returns
-------
metadata :
Metadata of GSDR gauges.
"""
metadata = load_gsdr_gauge_network_metadata(self.path_to_gsdr_dir, self.file_format)
return metadata
def _get_data_paths(self) -> dict:
"""
Get paths to gauge network of GSDR gauges.
Returns
-------
gauge_paths :
Dataframe of all GSDR gauges rain record.
"""
gauge_paths = get_paths_using_gauge_ids(
self.metadata["station_id"], self.path_to_gsdr_dir, file_format=self.file_format
)
return gauge_paths
def _add_paths_to_metadata(self) -> pl.DataFrame:
return self.metadata.with_columns(
pl.col("station_id").map_elements(self.data_paths.get, return_dtype=pl.Utf8).alias("path")
)
[docs]
def load_network_data(
self,
rain_col_prefix: str,
data_paths: List[str] | np.ndarray[str],
suffix_only: bool = False,
gsdr_header_rows: int = 20,
) -> pl.DataFrame:
"""
Load GSDR network data based on file paths.
Parameters
----------
data_paths :
Paths to load network data from.
rain_col_prefix :
Prefix for rain column name (default is 'rain')
suffix_only :
Override to only include the suffix e.g. if the column name is the ID)
gsdr_header_rows :
Number of rows to skip in the header of the GSDR data (default=20)
Returns
-------
network_data :
Dataframe of GSDR gauges.
"""
for ind, path in enumerate(data_paths):
# 1. get gauge_id name
gsdr_file_name = path.split("/")[-1]
gsdr_name = gsdr_file_name.split(".")[0]
# 2. Read in one gauge
one_gauge = read_gsdr_data_from_file(
data_path=path,
raw_data_time_res=GSDR_TIME_RES_CONVERSION[self.time_res],
rain_col_prefix=rain_col_prefix,
rain_col_suffix=gsdr_name,
suffix_only=suffix_only,
gsdr_header_rows=gsdr_header_rows,
)
# 3. Join data together
if ind == 0:
all_data = one_gauge
else:
all_data = all_data.join(one_gauge, on="time", how="full", coalesce=True)
all_data = all_data.sort("time")
return all_data
[docs]
class GPCCNetworkReader(GaugeNetworkReader):
"""GPCC rain gauge network reader."""
def __init__(
self, path_to_gpcc_dir: str, time_res: str, file_format: str = ".zip", unzipped_file_format: str = ".dat"
):
"""Load network reader."""
self.path_to_gpcc_dir = path_to_gpcc_dir
self.file_format = file_format
self.unzipped_file_format = unzipped_file_format
self.time_res = time_res
super().__init__(path_to_gpcc_dir)
self.data_paths = self._get_data_paths()
self.metadata = self._add_paths_to_metadata()
def _load_metadata(self) -> pl.DataFrame:
"""
Load metadata from GPCC gauge metadata path.
Returns
-------
metadata :
Metadata of GPCC gauges.
"""
metadata = load_gpcc_gauge_network_metadata(self.path_to_gpcc_dir, self.time_res)
return metadata
def _get_data_paths(self) -> dict:
"""
Get paths to gauge network of GPCC gauges.
Returns
-------
gauge_paths :
Dataframe of all GSDR gauges rain record.
"""
gauge_paths = get_paths_using_gauge_ids(
self.metadata["station_id"], self.path_to_gpcc_dir, file_format=self.file_format, time_res=self.time_res
)
return gauge_paths
def _add_paths_to_metadata(self) -> pl.DataFrame:
return self.metadata.with_columns(
pl.col("station_id").map_elements(self.data_paths.get, return_dtype=pl.Utf8).alias("path")
)
[docs]
def load_network_data(
self, data_paths: List[str] | np.ndarray[str], target_gauge_col: str, missing_val: int | float = -999.9
) -> pl.DataFrame:
"""
Load GPCC network data based on file paths.
Parameters
----------
data_paths :
Paths to load network data from.
target_gauge_col :
Rainfall data column
missing_val :
Missing value (default: -999)
Returns
-------
network_data :
Dataframe of GPCC gauges.
"""
for ind, zip_path in enumerate(data_paths):
# 1. Split file name and check time res is correct
gpcc_zip_file_name = zip_path.split("/")[-1]
assert self.time_res in gpcc_zip_file_name, (
f"Wrong time resolution for metadata: {self.time_res} & {gpcc_zip_file_name}"
)
gpcc_file_name = gpcc_zip_file_name.split(".zip")[0]
gpcc_unzipped_file_name = gpcc_file_name + self.unzipped_file_format
# 2. Read in one gauge
one_gauge = read_gpcc_data_from_zip(
data_path=zip_path,
target_gauge_col=f"{target_gauge_col}_{gpcc_file_name}",
gpcc_file_name=gpcc_unzipped_file_name,
time_res=GPCC_TIME_RES_CONVERSION[self.time_res],
missing_val=missing_val,
)
# 2. Join data together
if ind == 0:
all_data = one_gauge
else:
all_data = all_data.join(one_gauge, on="time", how="full", coalesce=True)
all_data = all_data.sort("time")
return all_data