# Source code for deode.tasks.sqlite (title line from Sphinx-rendered docs)
"""ExtractSQLite."""
import json
import os
import sqlite3
from pathlib import Path
from typing import Dict
import pandas as pd
from grib2sqlite import logger as sqlite_logger
from grib2sqlite import parse_grib_file
from deode.datetime_utils import as_datetime, oi2dt_list
from deode.eps.eps_setup import get_member_config
from deode.logs import LogDefaults, logger
from deode.tasks.base import Task
from deode.toolbox import Platform
# [docs]  (Sphinx cross-reference artifact)
class ExtractSQLite(Task):
    """Extract sqlite point files."""

    def __init__(self, config):
        """Construct ExtractSQLite object.

        Args:
            config (deode.ParsedConfig): Configuration

        Raises:
            FileNotFoundError: Required file not found
        """
        Task.__init__(self, config, __class__.__name__)
        self.archive = self.platform.get_system_value("archive")
        self.basetime = as_datetime(self.config["general.times.basetime"])
        self.forecast_range = self.config["general.times.forecast_range"]
        # Fall back to the fullpos output interval when no explicit
        # extraction selection is configured.
        try:
            self.infile_dt = self.config["extractsqlite.selection"]
        except KeyError:
            self.infile_dt = self.config["general.output_settings.fullpos"]
        self.infile_template = self.config["file_templates.fullpos.archive"]
        self.sqlite_path = self.platform.substitute(
            self.config["extractsqlite.sqlite_path"]
        )
        self.sqlite_template = self.platform.substitute(
            self.config["extractsqlite.sqlite_template"]
        )
        self.model_name = self.platform.substitute(
            self.config["extractsqlite.sqlite_model_name"]
        )
        self.stationfile = self.platform.substitute(
            self.config["extractsqlite.station_list"]
        )
        if not os.path.isfile(self.stationfile):
            raise FileNotFoundError(f" missing {self.stationfile}")
        logger.info("Station list: {}", self.stationfile)

        paramfile = self.platform.substitute(self.config["extractsqlite.parameter_list"])
        if not os.path.isfile(paramfile):
            raise FileNotFoundError(f" missing {paramfile}")
        logger.info("Parameter list: {}", paramfile)
        with open(paramfile, "r", encoding="utf-8") as pf:
            self.parameter_list = json.load(pf)
        self.output_settings = self.config["general.output_settings"]

    def execute(self):
        """Execute ExtractSQLite on all output files.

        Raises:
            FileNotFoundError: If an expected model output file is missing.
        """
        # split into "combined" and "direct" parameters
        # loop over lead times
        dt_list = oi2dt_list(self.infile_dt, self.forecast_range)
        station_list = pd.read_csv(self.stationfile, skipinitialspace=True)
        # Loop invariants hoisted out of the lead-time loop: the log level
        # and the output template do not depend on the lead time.
        loglevel = self.config.get("general.loglevel", LogDefaults.LEVEL).upper()
        sqlite_logger.setLevel(loglevel)
        sqlite_template = self.sqlite_path + "/" + self.sqlite_template
        for dt in dt_list:
            infile = self.platform.substitute(
                os.path.join(self.archive, self.infile_template),
                validtime=self.basetime + dt,
            )
            if not os.path.isfile(infile):
                raise FileNotFoundError(f" missing {infile}")
            logger.info("SQLITE EXTRACTION: {}", infile)
            parse_grib_file(
                infile=infile,
                param_list=self.parameter_list,
                station_list=station_list,
                sqlite_template=sqlite_template,
                model_name=self.model_name,
                weights=None,
            )
# [docs]  (Sphinx cross-reference artifact)
class MergeSQLites(Task):
    """Merge per-member sqlite point files into combined ensemble files."""

    def __init__(self, config):
        """Construct MergeSQLites object.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        Task.__init__(self, config, __class__.__name__)

    def _read_member_frame(self, file_path, model_name, member):
        """Read the FC table of one member file and rename its value column.

        Args:
            file_path (str): Path to the member sqlite file.
            model_name (str): Model name identifying the value column.
            member (int): Member number used in the renamed column suffix.

        Returns:
            pd.DataFrame | None: Frame with value column renamed to
                ``<model_name>_mbrNNN``, or None if no value column found.

        Raises:
            ValueError: If more than one value column matches model_name.
        """
        # sqlite3's context manager only manages the transaction; close the
        # connection explicitly to avoid leaking file handles.
        connection = sqlite3.connect(file_path)
        try:
            df_member = pd.read_sql_query("SELECT * FROM FC", connection)
        finally:
            connection.close()
        # Find column that contains model_name
        value_cols = [col for col in df_member.columns if model_name in col]
        if not value_cols:
            logger.warning(
                f"No value column containing '{model_name}' found in {file_path}"
            )
            return None
        if len(value_cols) > 1:
            raise ValueError(f"Multiple value columns found in {file_path}")
        return df_member.rename(
            columns={value_cols[0]: f"{model_name}_mbr{member:03d}"}
        )

    def _merge_members(self, sqlite_file_dict, model_name):
        """Merge all member FC tables into one wide DataFrame.

        Args:
            sqlite_file_dict (Dict[int, str]): Member number -> sqlite path.
            model_name (str): Model name identifying the value columns.

        Returns:
            pd.DataFrame | None: Merged frame, or None if no file was usable.
        """
        df_merged: pd.DataFrame | None = None
        for member, file_path in sqlite_file_dict.items():
            if not os.path.exists(file_path):
                logger.warning(
                    f"SQLite file not found for member {member}: {file_path}"
                )
                continue
            df_member = self._read_member_frame(file_path, model_name, member)
            if df_member is None:
                continue
            # Merge dataframes by keys that do not contain model_name
            merge_keys = [col for col in df_member.columns if model_name not in col]
            if df_merged is None:
                df_merged = df_member
            else:
                df_merged = df_merged.merge(df_member, on=merge_keys, how="inner")
        return df_merged

    def execute(self):
        """Execute MergeSQLite on all member FC files.

        Raises:
            FileNotFoundError: Required parameter file not found.
            ValueError: If a member file has more than one value column.
        """
        # harp-style templates use {placeholders}; translate braces to the
        # @PLACEHOLDER@ form the platform substitution understands.
        sqlite_template = (
            self.platform.substitute(self.config["extractsqlite.sqlite_template"])
            .replace("{", "@")
            .replace("}", "@")
        )
        merged_sqlite_path = Path(
            self.platform.substitute(self.config["extractsqlite.merged_sqlite_path"])
        )
        model_name = self.platform.substitute(
            self.config["extractsqlite.sqlite_model_name"]
        )
        paramfile = self.platform.substitute(self.config["extractsqlite.parameter_list"])
        if not os.path.isfile(paramfile):
            raise FileNotFoundError(f"Missing parameter file: {paramfile}")
        logger.info("Parameter list: {}", paramfile)
        with open(paramfile, "r", encoding="utf-8") as file_:
            parameter_list = json.load(file_)

        for param in parameter_list:
            harp_param = param["harp_param"]
            # Get param specific sqlite template and path
            sqlite_param_template = sqlite_template.replace("@PP@", harp_param)
            sqlite_file = Path(self.platform.substitute(sqlite_param_template))
            merged_sqlite_file_path = merged_sqlite_path / sqlite_file

            # Prepare sqlite file paths for every member
            sqlite_file_dict: Dict[int, str] = {}
            for member in self.config["eps.general.members"]:
                member_config = get_member_config(self.config, member)
                member_path = Platform(member_config).substitute(
                    member_config["extractsqlite.sqlite_path"]
                )
                sqlite_file_dict[member] = str(Path(member_path) / sqlite_file)
            logger.info(
                f"Files to merge for parameter {harp_param} are:\n"
                + json.dumps(sqlite_file_dict, indent=4)
            )
            logger.info(f"Merged filepath: {merged_sqlite_file_path}")

            df_merged = self._merge_members(sqlite_file_dict, model_name)
            if df_merged is None:
                logger.warning(f"No data merged for parameter {harp_param}")
                continue

            # Reorder columns: keys first, *_mbrXXX columns at the end
            mbr_suffixes = tuple(f"_mbr{mbr:03d}" for mbr in sqlite_file_dict)
            key_cols = [
                col for col in df_merged.columns if not col.endswith(mbr_suffixes)
            ]
            mbr_cols = [col for col in df_merged.columns if col not in key_cols]
            df_merged = df_merged[key_cols + mbr_cols]

            # Write merged dataframe to file; commit then close explicitly
            # ("with" on a sqlite3 connection does not close it).
            os.makedirs(merged_sqlite_file_path.parent, exist_ok=True)
            connection = sqlite3.connect(merged_sqlite_file_path)
            try:
                df_merged.to_sql("FC", connection, if_exists="replace", index=False)
                connection.commit()
            finally:
                connection.close()
            logger.info(f"Merged file written to: {merged_sqlite_file_path}")