Source code for deode.tasks.extractsqlite

"""ExtractSQLite."""

import json
import os

import pandas
from grib2sqlite import logger as sqlite_logger
from grib2sqlite import parse_grib_file

from ..datetime_utils import as_datetime, oi2dt_list
from ..logs import LogDefaults, logger
from .base import Task


class ExtractSQLite(Task):
    """Extract sqlite point files."""

    def __init__(self, config):
        """Construct ExtractSQLite object.

        Reads the settings needed for the extraction from the configuration,
        verifies that the station list and parameter list files exist, and
        loads the parameter list from its JSON file.

        Args:
            config (deode.ParsedConfig): Configuration

        Raises:
            FileNotFoundError: Required file not found
        """
        Task.__init__(self, config, __class__.__name__)

        self.archive = self.platform.get_system_value("archive")
        self.basetime = as_datetime(self.config["general.times.basetime"])
        self.forecast_range = self.config["general.times.forecast_range"]

        # Use the task-specific selection when configured, otherwise fall back
        # to the general fullpos output settings.
        try:
            self.infile_dt = self.config["extractsqlite.selection"]
        except KeyError:
            self.infile_dt = self.config["general.output_settings.fullpos"]
        self.infile_template = self.config["file_templates.fullpos.archive"]

        self.sqlite_path = self.platform.substitute(
            self.config["extractsqlite.sqlite_path"]
        )
        self.sqlite_template = self.platform.substitute(
            self.config["extractsqlite.sqlite_template"]
        )
        self.model_name = self.platform.substitute(
            self.config["extractsqlite.sqlite_model_name"]
        )

        self.stationfile = self.platform.substitute(
            self.config["extractsqlite.station_list"]
        )
        self._require_file(self.stationfile)
        logger.info("Station list: {}", self.stationfile)

        paramfile = self.platform.substitute(self.config["extractsqlite.parameter_list"])
        self._require_file(paramfile)
        logger.info("Parameter list: {}", paramfile)

        # The context manager closes the file; no explicit close() is needed.
        with open(paramfile, encoding="utf-8") as pf:
            self.parameter_list = json.load(pf)

        self.output_settings = self.config["general.output_settings"]

    @staticmethod
    def _require_file(path):
        """Raise if the given path is not an existing regular file.

        Args:
            path (str): Path to check

        Raises:
            FileNotFoundError: The path is not an existing file
        """
        if not os.path.isfile(path):
            raise FileNotFoundError(f" missing {path}")

    def execute(self):
        """Execute ExtractSQLite on all output files.

        Raises:
            FileNotFoundError: An expected input file is missing
        """
        dt_list = oi2dt_list(self.infile_dt, self.forecast_range)
        station_list = pandas.read_csv(self.stationfile, skipinitialspace=True)

        # These values are the same for every input file, so compute them once.
        loglevel = self.config.get("general.loglevel", LogDefaults.LEVEL).upper()
        sqlite_logger.setLevel(loglevel)
        sqlite_template = self.sqlite_path + "/" + self.sqlite_template
        infile_pattern = os.path.join(self.archive, self.infile_template)

        # loop over lead times
        for dt in dt_list:
            infile = self.platform.substitute(
                infile_pattern,
                validtime=self.basetime + dt,
            )
            self._require_file(infile)

            logger.info("SQLITE EXTRACTION: {}", infile)
            parse_grib_file(
                infile=infile,
                param_list=self.parameter_list,
                station_list=station_list,
                sqlite_template=sqlite_template,
                model_name=self.model_name,
                weights=None,
            )