#!/usr/bin/env python3
"""Implement helper routines to deal with dates and times."""
from datetime import datetime, timezone
from typing import List, Tuple, Union
import dateutil.parser
import isodate
import pandas as pd
from dateutil.utils import default_tzinfo
from .aux_types import QuasiConstant
[docs]
class DatetimeConstants(QuasiConstant):
"""Datetime-related constants."""
# The regex in a json schema's "pattern" must use JavaScript syntax (ECMA 262). See
# https://json-schema.org/understanding-json-schema/reference/regular_expressions.html
ISO_8601_TIME_DURATION_REGEX = (
"^P(?!$)(\\d+Y)?(\\d+M)?(\\d+W)?(\\d+D)?"
+ "(T(?=\\d+[HMS])(\\d+H)?(\\d+M)?(\\d+S)?)?$"
)
DEFAULT_SHIFT = pd.Timedelta(0)
[docs]
def as_datetime(obj):
"""Convert obj to string, parse into datetime and add UTC timezone iff naive."""
return default_tzinfo(dateutil.parser.parse(str(obj)), tzinfo=timezone.utc)
[docs]
def as_timedelta(obj):
"""Convert obj to string and parse into pd.Timedelta."""
return pd.Timedelta(str(obj))
[docs]
def dt2str(dt):
"""Convert timdelta object to file name suitable string.
Args:
dt (timedelta object): duration
Returns:
duration (str): string representation of duration
suitable for FA files
"""
h = int(dt.seconds / 3600) + int(dt.days * 24)
m = int((dt.seconds % 3600 - dt.seconds % 60) / 60)
s = int(dt.seconds % 60)
duration = f"{h:04d}:{m:02d}:{s:02d}"
return duration
[docs]
def check_syntax(output_settings: Union[Tuple[str], List[str]], length: int):
"""Check syntax of output_settings.
Args:
output_settings (Union[Tuple[str], List[str]]): Specifies the output steps
length (integer): length to check on
Raises:
SystemExit: General system handler
"""
for x in output_settings:
if x.count(":") != length:
raise SystemExit(
f"Invalid argument {output_settings} for output_settings.\n"
"Please provide single time increment as a string "
"or a list of 'starttime:endtime:interval' choices"
)
[docs]
def expand_output_settings(
output_settings: Union[str, Tuple[str], List[str]], forecast_range: str
) -> List[List[pd.Timedelta]]:
"""Expand the output_settings coming from config.
Args:
output_settings (Union[str, Tuple[str], List[str]]):
Specifies the output steps
forecast_range (str): Forecast range in duration syntax
Raises:
RuntimeError: Handle erroneous time increment
Returns:
sections (List[List[pd.Timedelta]]) : List of output subsections.
Can be empty in case of empty output_settings
"""
output_intervals = []
if isinstance(output_settings, str):
check_syntax([output_settings], 0)
# Infer the output intervals from the forecast range and output settings
if output_settings != "":
output_intervals = [":".join(["PT0H", forecast_range, output_settings])]
else:
return output_intervals
elif isinstance(output_settings, (tuple, list)):
check_syntax(output_settings, 2)
output_intervals = output_settings
# Check for zero size time increments
zero_increment = as_timedelta("PT0H")
for interval in output_intervals:
if as_timedelta(interval.split(":")[2]) == zero_increment:
raise RuntimeError(f"Zero size time increments not allowed:{interval}")
# Convert the output intervals to a list of lists of timedelta objects
sections = [
[as_timedelta(item) for item in interval.split(":")]
for interval in output_intervals
]
return sections
[docs]
def oi2dt_list(
output_settings: Union[str, Tuple[str], List[str]], forecast_range: str
) -> List[pd.Timedelta]:
"""Build list of output occurences.
Args:
output_settings (Union[str, Tuple[str], List[str]]):
Specifies the output steps
forecast_range (str): Forecast range in duration syntax
Returns:
dt (List[pd.Timedelta]) : Sorted list of output occurences
"""
sections = expand_output_settings(output_settings, forecast_range)
output_dt = set()
current_dt: pd.Timedelta
forecast_timedelta = as_timedelta(forecast_range)
for start, end, step in sections:
current_dt = start
while current_dt <= end and current_dt <= forecast_timedelta:
output_dt.add(current_dt)
current_dt += step
return sorted(output_dt)
[docs]
def cycle_offset(
basetime,
bdcycle,
bdcycle_start=DatetimeConstants.DEFAULT_SHIFT,
bdshift=DatetimeConstants.DEFAULT_SHIFT,
):
"""Calculcate offset from a reference time.
Args:
basetime (datetime): Reference time
bdcycle (timedelta): Interval between cycles
bdcycle_start (timedelta): Time of day when bdcycle starts
bdshift (timedelta): shift of boundary usage
Returns:
timedelta : a timdelta object of the offset
"""
reftime = basetime.hour * 3600 + basetime.minute * 60 + basetime.second
bdcycle_shift = (
reftime - bdcycle_start.total_seconds() % bdcycle.total_seconds()
) % bdcycle.total_seconds()
final_shift = bdcycle_shift + int(bdshift.total_seconds())
return pd.Timedelta(seconds=final_shift)
[docs]
def get_decade(dt) -> str:
"""Return the decade given a datetime object."""
# Extract month and day from datetime object
dtg_mm = int(dt.month)
dtg_dd = int(dt.day)
# Determine decades_mm and decades_dd based on dtg_dd
if dtg_dd < 9:
decades_mm = dtg_mm
decades_dd = 5
elif 8 < dtg_dd < 19:
decades_mm = dtg_mm
decades_dd = 15
elif 18 < dtg_dd < 29:
decades_mm = dtg_mm
decades_dd = 25
else:
decades_mm = dtg_mm + 1
if decades_mm == 13:
decades_mm = 1
decades_dd = 5
decades_mm = f"{decades_mm:02d}"
decades_dd = f"{decades_dd:02d}"
return f"{decades_mm}{decades_dd}"
[docs]
def get_decadal_list(dt_start, dt_end) -> list:
"""Return a list of dates for which decadal pgd files have to be created."""
# check decade of start and end of period
dt_start0 = dt_start.replace(hour=0, minute=0, second=0)
dt_end0 = dt_end.replace(hour=0, minute=0, second=0)
start_decade = get_decade(dt_start0)
end_decade = get_decade(dt_end0)
decades = {}
decades[start_decade] = dt_start0
if start_decade != end_decade:
# More than one decade is covered by period.
for x in range(1, (dt_end0 - dt_start0).days + 1, 1):
date = dt_start0 + as_timedelta(f"P{x}D")
decade = get_decade(date)
if decade not in decades:
decades[decade] = date
return list(decades.values())
[docs]
def get_month_list(start, end) -> list:
"""Get list of months between to given dates (input as string)."""
str_month_list = pd.date_range(start, end, freq="MS").strftime("%m").tolist()
month_list = [int(i) for i in str_month_list]
if len(month_list) == 0:
month_list = [int(as_datetime(start).month)]
else:
month_list.insert(0, int(as_datetime(start).month))
return month_list
[docs]
def evaluate_date(date: str) -> str:
"""Parses an ISO 8601 datetime and/or duration and returns the computed datetime.
- If the input is a datetime (e.g., "2025-03-19T00:00:00Z"), it returns it as-is.
- If the input includes a duration (e.g., "2025-03-19T00:00:00Z/-P1D"), it applies
the duration to the datetime.
- If the input is only a duration (e.g., "-P1D"), it applies it to today's midnight
(UTC).
Args:
date (str): An ISO 8601 datetime, duration, or both.
Returns:
str: The computed datetime in ISO format.
"""
if "/" in date:
datetime_part, duration_part = date.split("/")
return (
(
isodate.parse_datetime(datetime_part)
+ isodate.parse_duration(duration_part)
)
.isoformat(timespec="seconds")
.replace("+00:00", "Z")
)
if date.startswith(("P", "-P")):
today_midnight = datetime.now(timezone.utc).replace(
hour=0, minute=0, second=0, microsecond=0
)
return (
(today_midnight + isodate.parse_duration(date))
.isoformat(timespec="seconds")
.replace("+00:00", "Z")
)
return (
isodate.parse_datetime(date).isoformat(timespec="seconds").replace("+00:00", "Z")
)