# Source code for deode.tasks.sqlite
"""ExtractSQLite."""
import json
import os
import sqlite3
from pathlib import Path
from typing import Dict
import pandas as pd
from grib2sqlite import logger as sqlite_logger
from grib2sqlite import parse_grib_file
from deode.datetime_utils import as_datetime, oi2dt_list
from deode.eps.eps_setup import get_member_config
from deode.logs import LogDefaults, logger
from deode.tasks.base import Task
from deode.toolbox import Platform
# [docs]  (Sphinx HTML extraction artifact)
class ExtractSQLite(Task):
    """Extract sqlite point files."""

    def __init__(self, config):
        """Construct ExtractSQLite object.

        Args:
            config (deode.ParsedConfig): Configuration

        Raises:
            FileNotFoundError: Required file not found
        """
        Task.__init__(self, config, __class__.__name__)
        self.archive = self.platform.get_system_value("archive")
        self.basetime = as_datetime(self.config["general.times.basetime"])
        self.forecast_range = self.config["general.times.forecast_range"]
        try:
            # Optional explicit selection of extraction times; fall back to
            # the fullpos output interval when not configured.
            self.infile_dt = self.config["extractsqlite.selection"]
        except KeyError:
            self.infile_dt = self.config["general.output_settings.fullpos"]
        self.infile_template = self.config["file_templates.fullpos.archive"]
        self.sqlite_path = self.platform.substitute(
            self.config["extractsqlite.sqlite_path"]
        )
        self.sqlite_template = self.platform.substitute(
            self.config["extractsqlite.sqlite_template"]
        )
        self.model_name = self.platform.substitute(
            self.config["extractsqlite.sqlite_model_name"]
        )
        extraction_list = self.config["extractsqlite.parameter_list"]
        # NOTE: If you provide an empty list, it is replaced by the default!
        # If you don't want SQLite extraction, turn off the switch instead.
        # Not a perfect situation, but I think it's OK for now.
        if not extraction_list:
            is_ensemble = len(self.config["eps.general.members"]) > 1
            if is_ensemble:
                logger.info("Using default eps parameter extraction.")
                extraction_list = self.config["extractsqlite.parameter_list_default_eps"]
            else:
                logger.info("Using default deterministic parameter extraction.")
                extraction_list = self.config["extractsqlite.parameter_list_default_det"]
        self.n_list = len(extraction_list)
        logger.info("Found {} extraction lists.", self.n_list)
        # Resolve and validate all station/parameter files up front so a
        # missing file fails the task at construction time.
        self.param_file = []
        self.station_file = []
        for extraction in extraction_list:
            station_file = self.platform.substitute(extraction["location_file"])
            if not os.path.isfile(station_file):
                raise FileNotFoundError(f" missing {station_file}")
            self.station_file.append(station_file)
            param_file = self.platform.substitute(extraction["param_file"])
            if not os.path.isfile(param_file):
                raise FileNotFoundError(f" missing {param_file}")
            self.param_file.append(param_file)
        self.output_settings = self.config["general.output_settings"]

    def execute(self):
        """Execute ExtractSQLite on all output files.

        Raises:
            FileNotFoundError: If an expected model output file is missing.
        """
        # split into "combined" and "direct" parameters
        # loop over lead times
        # Also allow multiple extraction lists, typically UA and SFC parameters
        # with different station lists.
        dt_list = oi2dt_list(self.infile_dt, self.forecast_range)
        # The log level is loop-invariant: configure grib2sqlite logging once
        # instead of once per input file.
        loglevel = self.config.get("general.loglevel", LogDefaults.LEVEL).upper()
        sqlite_logger.setLevel(loglevel)
        for param_file, station_file in zip(self.param_file, self.station_file):
            logger.info("SQLITE: parameter file {}", param_file)
            logger.info("SQLITE: station file {}", station_file)
            station_list = pd.read_csv(station_file, skipinitialspace=True)
            with open(param_file, "r", encoding="utf-8") as pf:
                param_list = json.load(pf)
            for dt in dt_list:
                infile = self.platform.substitute(
                    os.path.join(self.archive, self.infile_template),
                    validtime=self.basetime + dt,
                )
                if not os.path.isfile(infile):
                    raise FileNotFoundError(f" missing {infile}")
                logger.info("SQLITE EXTRACTION: {}", infile)
                parse_grib_file(
                    infile=infile,
                    param_list=param_list,
                    station_list=station_list,
                    sqlite_template=self.sqlite_path + "/" + self.sqlite_template,
                    model_name=self.model_name,
                    weights=None,
                )
# [docs]  (Sphinx HTML extraction artifact)
class MergeSQLites(Task):
    """Merge per-member sqlite point files into one file per parameter."""

    def __init__(self, config):
        """Construct MergeSQLites object.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        Task.__init__(self, config, __class__.__name__)

    def execute(self):
        """Execute MergeSQLites on all member FC files.

        For every parameter of the extraction list(s), read the "FC" table
        of each ensemble member's sqlite file, rename the model value column
        to "<model>_mbrNNN", inner-join the members on the remaining columns
        and write the merged table to the merged sqlite path.

        Raises:
            FileNotFoundError: If a configured parameter file is missing.
            ValueError: If a member file holds several model value columns.
        """
        # The file-name template keeps grib2sqlite-style placeholders; turn
        # "{PP}" etc. into "@PP@" so that platform.substitute leaves them
        # alone and they can be filled in per parameter below.
        sqlite_template = (
            self.platform.substitute(self.config["extractsqlite.sqlite_template"])
            .replace("{", "@")
            .replace("}", "@")
        )
        merged_sqlite_path = Path(
            self.platform.substitute(self.config["extractsqlite.merged_sqlite_path"])
        )
        model_name = self.platform.substitute(
            self.config["extractsqlite.sqlite_model_name"]
        )
        extraction_list = self.config["extractsqlite.parameter_list"]
        if not extraction_list:
            is_ensemble = len(self.config["eps.general.members"]) > 1
            if is_ensemble:
                # NOTE: in this context, we expect eps data, of course.
                logger.info("Using default eps parameter extraction.")
                extraction_list = self.config["extractsqlite.parameter_list_default_eps"]
            else:
                logger.info("Using default deterministic parameter extraction.")
                extraction_list = self.config["extractsqlite.parameter_list_default_det"]
        n_list = len(extraction_list)
        logger.info("Found {} extraction lists.", n_list)
        members = self.config["eps.general.members"]
        # The member column suffixes are loop-invariant; build them once.
        mbr_suffixes = tuple(f"_mbr{mbr:03d}" for mbr in members)
        for i in range(n_list):
            paramfile = self.platform.substitute(extraction_list[i]["param_file"])
            if not os.path.isfile(paramfile):
                raise FileNotFoundError(f"Missing parameter file: {paramfile}")
            logger.info("Parameter list {}: {}", i + 1, paramfile)
            with open(paramfile, "r", encoding="utf-8") as file_:
                parameter_list = json.load(file_)
            for param in parameter_list:
                harp_param = param["harp_param"]
                # Get param specific sqlite template and path
                sqlite_param_template = sqlite_template.replace("@PP@", harp_param)
                sqlite_file = Path(self.platform.substitute(sqlite_param_template))
                merged_sqlite_file_path = merged_sqlite_path / sqlite_file
                # Resolve the sqlite file path of every member.
                sqlite_file_dict: Dict[int, str] = {}
                for member in members:
                    member_config = get_member_config(self.config, member)
                    member_path = Platform(member_config).substitute(
                        member_config["extractsqlite.sqlite_path"]
                    )
                    sqlite_file_dict[member] = str(Path(member_path) / sqlite_file)
                logger.info(
                    "Files to merge for parameter {} are:\n{}",
                    harp_param,
                    json.dumps(sqlite_file_dict, indent=4),
                )
                logger.info("Merged filepath: {}", merged_sqlite_file_path)
                df_merged = self._merge_members(sqlite_file_dict, model_name)
                if df_merged is None:
                    logger.warning("No data merged for parameter {}", harp_param)
                    continue
                self._write_merged(df_merged, merged_sqlite_file_path, mbr_suffixes)

    def _merge_members(self, sqlite_file_dict, model_name):
        """Read and inner-join the FC tables of all member sqlite files.

        Missing files and files without a recognizable value column are
        skipped with a warning (best effort).

        Args:
            sqlite_file_dict (Dict[int, str]): Member number -> sqlite path
            model_name (str): Model name identifying the value column

        Returns:
            pandas.DataFrame | None: Merged frame, or None when no member
                file could be used.

        Raises:
            ValueError: If a member file holds several model value columns.
        """
        df_merged: pd.DataFrame | None = None
        for member, file_path in sqlite_file_dict.items():
            if not os.path.exists(file_path):
                logger.warning(
                    "SQLite file not found for member {}: {}", member, file_path
                )
                continue
            # NB: "with sqlite3.connect(...)" only manages the transaction,
            # not the connection itself, so close it explicitly.
            connection = sqlite3.connect(file_path)
            try:
                df_member = pd.read_sql_query("SELECT * FROM FC", connection)
            finally:
                connection.close()
            # Find column that contains model_name
            value_cols = [col for col in df_member.columns if model_name in col]
            if not value_cols:
                logger.warning(
                    "No value column containing '{}' found in {}",
                    model_name,
                    file_path,
                )
                continue
            if len(value_cols) > 1:
                raise ValueError(f"Multiple value columns found in {file_path}")
            df_member = df_member.rename(
                columns={value_cols[0]: f"{model_name}_mbr{member:03d}"}
            )
            # Merge dataframes by keys that do not contain model_name
            merge_keys = [col for col in df_member.columns if model_name not in col]
            if df_merged is None:
                df_merged = df_member
            else:
                df_merged = df_merged.merge(df_member, on=merge_keys, how="inner")
        return df_merged

    def _write_merged(self, df_merged, merged_sqlite_file_path, mbr_suffixes):
        """Reorder columns and write the merged dataframe to sqlite.

        Args:
            df_merged (pandas.DataFrame): Merged member data
            merged_sqlite_file_path (Path): Output sqlite file
            mbr_suffixes (tuple): "_mbrNNN" suffixes marking value columns
        """
        # Reorder columns: keys first, *_mbrXXX columns at the end
        key_cols = [
            col for col in df_merged.columns if not col.endswith(mbr_suffixes)
        ]
        mbr_cols = [col for col in df_merged.columns if col not in key_cols]
        df_merged = df_merged[key_cols + mbr_cols]
        merged_sqlite_file_path.parent.mkdir(parents=True, exist_ok=True)
        connection = sqlite3.connect(merged_sqlite_file_path)
        try:
            with connection:  # commits on success, rolls back on error
                df_merged.to_sql("FC", connection, if_exists="replace", index=False)
        finally:
            connection.close()
        logger.info("Merged file written to: {}", merged_sqlite_file_path)