Source code for deode.namelist

#!/usr/bin/env python3
"""Namelist handling for MASTERODB w/SURFEX."""
import ast
import copy
import os
import re
import subprocess
from collections import OrderedDict
from pathlib import Path

import f90nml
import yaml
from omegaconf import OmegaConf
from omegaconf.listconfig import ListConfig

from .config_parser import ConfigPaths
from .datetime_utils import as_timedelta, oi2dt_list
from .logs import logger
from .toolbox import Platform


[docs] def flatten_cn(li): """Recursively flatten a list of lists (of lists).""" for x in li: if isinstance(x, (list, ListConfig)): yield from flatten_cn(x) else: yield x
[docs] def to_dict(x): """Recursively modify OrderedDict etc. to normal dict (required for OmegaConf).""" # Namelist and OrderedDict both inherit from dict if isinstance(x, dict): return {k: to_dict(v) for k, v in x.items()} if isinstance(x, list): return [to_dict(k) for k in x] return x
[docs] def find_value(s): """Purpose: un-quote (list of) numbers and booleans.""" if isinstance(s, list): return [find_value(x) for x in s] if isinstance(s, dict): # This happens if the namelist is read with f90nml # any name with "%" is parsed as a sub-namelist # so an extra level in the structure logger.debug("EVALUATE: SUB-DICT") return {k: find_value(v) for k, v in s.items()} if not isinstance(s, str): return s if s.lower() == "true": logger.debug("EVALUATE BOOLEAN: {} -> {}", s, True) result = True elif s.lower() == "false": logger.debug("EVALUATE BOOLEAN: {} -> {}", s, False) result = False else: try: result = ast.literal_eval(s) logger.debug("EVALUATE: {} -> {}", s, result) except (ValueError, SyntaxError): logger.debug("UN-EVALUATED: {}", s) result = s if isinstance(result, tuple): result = list(result) if isinstance(result, list): logger.debug("EVALUATE NESTED: {} -> {}") result = [find_value(x) for x in result] return result
[docs] def list_set_at_index(li, ix, val): """Handle 'sparse' lists (that may contain null values).""" try: li[ix] = val except IndexError: for _ in range(ix - len(li) + 1): li.append(None) li[ix] = val
[docs] def represent_ordereddict(dumper, data): """YAML representer, simplifies working with namelists read by f90nml.""" my_od = [] for k, v in data.items(): # Convert plain keys to upper case, but leave the internal ones in lower ku = k if k.startswith("_") else k.upper() km = dumper.represent_data(ku) vm = dumper.represent_data(v) my_od.append((km, vm)) return yaml.nodes.MappingNode("tag:yaml.org,2002:map", my_od)
[docs] def write_namelist(nml, output_file): """Write namelist using f90nml. Args: nml (f90nml.Namelist): namelist to write output_file (str) : namelist file name """ if isinstance(nml, dict): nml = f90nml.Namelist(nml) # Write result. nml.uppercase = True nml.true_repr = ".TRUE." nml.false_repr = ".FALSE." nml.end_comma = True # AD: temp fix for IO_SERVER bug nml.write(output_file, force=True) logger.debug("Wrote: {}", output_file)
[docs] class InvalidNamelistKindError(ValueError): """Custom exception."""
[docs] class InvalidNamelistTargetError(ValueError): """Custom exception."""
[docs] class NamelistComparator: """Helper class for namelist generation and integration.""" def __init__(self, config): """Construct the comparator. Args: config (deode.ParsedConfig): Configuration Raises: SystemExit """ self.config = config self.platform = Platform(config)
[docs] def compare_dicts(self, dbase, dcomp, action): """Compare two dictionaries, recursively if needed. The dict must only consist of dict, list, str, float, int, bool Args: dbase: base dict dcomp: comparison dict action(str): one of 'intersection', 'diff' or 'union' Returns: dout: result dict from the specified comparison action Raises: SystemExit # noqa: DAR401 """ if action not in ("intersection", "diff", "union"): raise SystemExit(f"Unknown action {action}") dout = {} dout["_start_index"] = {} for key in dbase: valb = dbase[key] if key in dcomp: valc = dcomp[key] if isinstance(valb, dict) and isinstance(valc, dict): if not key.startswith("_"): # Invoke ourselves recursively msg = f"recursive dict comp for {key}" logger.debug(msg) dout[key] = self.compare_dicts(dbase[key], dcomp[key], action) elif isinstance(valb, list) and isinstance(valc, list): kl = key.lower() try: sib = dbase["_start_index"][kl] except KeyError: sib = [1] try: sic = dcomp["_start_index"][kl] except KeyError: sic = [1] # Start index in output depends on the action, thus: sio = [-999 for _ in range(len(sib))] msg = f"Compare lists {key}, start indices {sib}, {sic}" logger.debug(msg) dout[key] = self.compare_lists( dbase[key], dcomp[key], sib, sic, sio, action ) dout["_start_index"][kl] = sio elif valc == valb: # This should be a scalar, with same value msg = f"{key} : {valb} == {valc}" logger.debug(msg) if action != "diff": dout[key] = valc else: # also scalar, but values differ msg = f"{key} : {valb} /= {valc}" logger.debug(msg) if action != "intersection" and valc is not None: dout[key] = valc elif action == "union": # Key only found in base dout[key] = valb elif action == "diff": # Mark for deletion in case of 'union' dout[key] = None if action != "intersection": # Add keys not in base for key in dcomp: if key not in dbase: dout[key] = dcomp[key] # Remove empty namelists in diffs if action == "diff": todel = [] for key in dout: if isinstance(dout[key], dict) and len(dout[key]) == 0: todel.append(key) # noqa: PERF401 # Delayed deletion to avoid "dictionary changed size during iteration" for key in todel: del dout[key] # Avoid empty _start_index if "_start_index" in dout and not dout["_start_index"]: del dout["_start_index"] return dout
[docs] def compare_lists(self, libase, licomp, sib, sic, sio, action): """Compare two lists, recursively if needed. The list must only consist of dict, list, str, float, int, bool Args: libase: base list licomp: comparison list sib: list of start indices for libase sic: list of start indices for licomp sio: list of start indices for liout (modified) action(str): one of 'intersection', 'diff' or 'union' Returns: liout: result list from the specified comparison action Raises: SystemExit # noqa: DAR401 """ if action not in ("intersection", "diff", "union"): raise SystemExit(f"Unknown action {action}") liout = [] if action == "intersection": sio[0] = max(sib[0], sic[0]) else: sio[0] = min(sib[0], sic[0]) for ib in range(len(libase)): ic = ib + sib[0] - sic[0] io = ib + sib[0] - sio[0] valb = libase[ib] if ic in range(len(licomp)): valc = licomp[ic] if isinstance(valb, dict) and isinstance(valc, dict): list_set_at_index( liout, io, self.compare_dicts(libase[ib], licomp[ic], action) ) elif isinstance(valb, list) and isinstance(valc, list): # Invoke ourselves recursively list_set_at_index( liout, io, self.compare_lists( libase[ib], licomp[ic], sib[1:], sic[1:], sio[1:], action ), ) elif valc == valb: # Scalar, same value if action != "diff": list_set_at_index(liout, io, valc) elif action != "intersection": # Scalar, different value list_set_at_index(liout, io, valc) elif action == "union": list_set_at_index(liout, io, valb) elif action == "diff": # Mark for deletion in case of 'union' list_set_at_index(liout, io, None) if action != "intersection": # Add elements not in base for ic in range(len(licomp)): ib = ic + sic[0] - sib[0] io = ic + sic[0] - sio[0] if ib not in range(len(libase)): list_set_at_index(liout, io, licomp[ic]) return liout
[docs] class NamelistGenerator: """Fortran namelist generator based on hierarchical merging of (yaml) dicts.""" def __init__(self, config, kind, substitute=True): """Construct the generator. Args: config (deode.ParsedConfig): Configuration kind (str): one of 'master' or 'surfex' substitute (boolean): flag for substitution Raises: InvalidNamelistKindError # noqa: DAR401 """ if kind not in ("master", "surfex"): raise InvalidNamelistKindError(kind) self.config = config self.platform = Platform(config) self.kind = kind self.substitute = substitute self.nlcomp = NamelistComparator(config) self.cycle = self.config["general.cycle"] self.cnfile = ConfigPaths.path_from_subpath(f"{self.cycle}/assemble_{kind}.yml") self.nlfile = ConfigPaths.path_from_subpath(f"{self.cycle}/{kind}_namelists.yml") self.domain_name = self.config["domain.name"] self.accept_static_namelist = self.config["general.accept_static_namelists"]
[docs] def load_user_namelist(self): """Read user provided namelist. Returns: found (boolean) : Logics if namelist is found or not nldict : Namelist as dictionary cndict : Rules for dictionary """ namelists = self.platform.get_system_value("namelists") ref_namelist = f"{namelists}/namelist_{self.kind}_{self.target}" logger.debug("Check if reference namelist {} exists", ref_namelist) if os.path.isfile(ref_namelist): logger.info("Use reference namelist {}", ref_namelist) nl = f90nml.read(ref_namelist) target = "user_namelist" # NOTE: f90nml.todict() returns OrderedDict # which makes OmegaConf fail. # but maybe we should consider to_dict(nl.todict()) ??? nldict = {target: to_dict(nl)} cndict = {self.target: [target]} found = False else: logger.warning("No reference namelist {} exists.", ref_namelist) logger.warning("Fallback to yaml files") found = True nldict = {} cndict = {} return found, nldict, cndict
[docs] def fn_stepfreq(self, arg): """Resolve namelist function stepfreq.""" tstep = int(self.config["domain.tstep"]) freq = as_timedelta(arg) result = int(freq.seconds / tstep) return f"{result}"
[docs] def fn_steplist(self, time_intervals): """Resolve namelist function steplist.""" forecast_range = self.config["general.times.forecast_range"] tstep = int(self.config["domain.tstep"]) # default value: output_timesteps = [1, -1] # decode string into list time_intervals = find_value(time_intervals) dtlist = oi2dt_list(time_intervals, forecast_range) logger.debug("steplist: {} // {}", time_intervals, forecast_range) logger.debug("dtlist: {}", dtlist) output_timesteps = [ int((dt.days * 24 * 3600 + dt.seconds) / tstep) for dt in dtlist ] output_timesteps.insert(0, len(output_timesteps)) logger.debug("result: {}", output_timesteps) # NOTE: a resolver can not return a list # so turn into a string return f"{output_timesteps}"
[docs] def fn_config(self, arg, default=None): """Resolve namelist function cfg.""" try: _result = self.platform.config[arg] result = self.platform.substitute(_result) logger.debug("CFG INSERT: {} -> {}", arg, result) except KeyError: result = default if default is not None else arg logger.debug("CFG UNKNOWN: {} default {}", arg, default) # NOTE: all values are returned as STRINGS # which means you must re-interpret with find_val() # this is only really necessary for e.g. lists return f"{result}"
[docs] def resolve_macros(self, cn): """Resolve all macros in a nested dict (both keys and values!).""" if isinstance(cn, list): result = [self.platform.substitute(x) for x in cn] elif isinstance(cn, dict): result = { self.platform.substitute(x): self.resolve_macros(y) for x, y in cn.items() } else: result = self.platform.substitute(cn) return result
[docs] def load(self, target): """Generate the namelists for 'target'. Args: target (str): task to generate namelists for Raises: InvalidNamelistTargetError # noqa: DAR401 Returns: nlres (dict): Assembled namelist """ self.target = target # define OmegaConf resolvers OmegaConf.clear_resolvers() OmegaConf.register_new_resolver("stepfreq", lambda arg: self.fn_stepfreq(arg)) OmegaConf.register_new_resolver("steplist", lambda arg: self.fn_steplist(arg)) OmegaConf.register_new_resolver( "cfg", lambda arg, default=None: self.fn_config(arg, default) ) # Use static namelist if given use_yaml = True if self.accept_static_namelist: use_yaml, nldict0, cndict0 = self.load_user_namelist() nldict = OmegaConf.create(nldict0) cndict = OmegaConf.create(cndict0) if use_yaml: logger.info("Namelist generation input:") logger.info(" namelists: {}", self.nlfile) logger.info(" rules: {}", self.cnfile) # Read namelist file with all the categories with open(self.nlfile, mode="rt", encoding="utf-8") as file: nldict = OmegaConf.load(file) # Read file that describes assembly category order # for the various targets (tasks) with open(self.cnfile, mode="rt", encoding="utf-8") as file: cndict_m1 = yaml.safe_load(file) cndict_m2 = self.resolve_macros(cndict_m1) cndict = OmegaConf.create(cndict_m2) OmegaConf.resolve(cndict) # Check target is valid if target not in cndict: logger.warning( "Could not find target namelist '{}' in {}", target, str(self.cnfile) ) msg = "Available namelist targets:" for key in cndict: if not re.match(r"_.+", key): msg += " " + key + "," logger.warning(msg[:-1]) raise InvalidNamelistTargetError(target) self.nldict = nldict self.cndict = cndict logger.info("Namelist updating") self.update_from_config(target) return self.nldict, self.cndict
[docs] def assemble_namelist(self, target): """Generate the namelists for 'target'. Args: target (str): task to generate namelists for Returns: nlres (f90nml.Namelist): Assembled namelist """ nldict = self.nldict # Assemble the target namelists based on the given category order # also replace all macro's ("@XXX@") cnlist = [self.platform.substitute(x) for x in flatten_cn(self.cndict[target])] # Merge all the partial namelists logger.info("Namelist assembly sections {}", cnlist) nl_merged = OmegaConf.to_container( OmegaConf.merge(*[nldict[i] for i in cnlist if i in nldict]), resolve=self.substitute, ) # make sure that booleans, integers etc. are not represented as strings! if self.substitute: nlres = { n: {v: find_value(vx) for v, vx in nx.items()} for n, nx in nl_merged.items() } else: nlres = nl_merged logger.debug("FINAL NAMELIST: {}", nlres) return f90nml.Namelist(nlres)
[docs] def write_namelist(self, nml, output_file): """Write namelist using f90nml. Args: nml (f90nml.Namelist): namelist to write output_file (str) : namelist file name """ logger.info("Writing main namelist to {}", output_file) write_namelist(nml, output_file)
# NOTE: should also work with OmegaConf objects
[docs] def update_from_config(self, target): """Update with additional namelist dict from config. Args: target (str): task to generate namelists for """ # Try to update potential global settings first if target != "all_targets": self.update_from_config("all_targets") try: _update = self.config["namelist_update"][self.kind][target].dict() # Make sure everything is in upper case update = {} for namelist, keyval in _update.items(): nu = namelist.upper() update[nu] = {} for key, val in keyval.items(): update[nu][key.upper()] = val self.update(update, f"namelist_update_{target}") logger.info("Namelist update found for {} {}", self.kind, target) except KeyError: pass
# NOTE: should also work with OmegaConf objects
[docs] def update(self, nldict, cndict_tag): """Update with additional namelist dict. Args: nldict (dict): additional namelist dict cndict_tag: name to be used for recognition """ self.cndict[self.target].append(cndict_tag) self.nldict[cndict_tag] = nldict # maybe OmegaConf.create(nldict)
[docs] def generate_namelist(self, target, output_file): """Generate the namelists for 'target'. Args: target (str): task to generate namelists for output_file: where to write the result (fort.4 or EXSEG1.nam typically) """ logger.info("Generate namelist for: {}", target) self.load(target) nml = self.assemble_namelist(target) self.write_namelist(nml, output_file)
[docs] class NamelistIntegrator: """Helper class to read fortran namelists and store as yaml, in a more compact form. Reduces duplication if several namelists have similar settings. """ def __init__(self, config): """Construct the integrator. Args: config (deode.ParsedConfig): Configuration Raises: SystemExit # noqa: DAR401 """ self.config = config self.platform = Platform(config) yaml.add_representer(OrderedDict, represent_ordereddict)
[docs] def ftn2dict(self, ftnfile): """Read fortran namelist file with f90nml and return as plain dict.""" fnml = f90nml.read(ftnfile) # The internal representation used in f90nml is not easy to work with, thus ynml = yaml.safe_load(yaml.dump(fnml.todict(complex_tuple=True))) # Check if there are duplicated namelists # - ignore empty ones if they don't come first onml = {} dupl = {} for key in ynml: # Should not need to know the internal coding of f90nml :( if key.startswith("_grp_"): a = key[5:].rsplit("_", 1) namu = a[0].upper() nseq = int(a[1]) if nseq == 0: onml[namu] = ynml[key] else: lk = len(ynml[key]) if lk == 0: msg = f"Ignoring empty duplicate namelist {namu}" logger.warning(msg) else: dupl[namu] = lk msg = f"Found duplicate namelist {namu} with {lk} extra line(s)!" logger.warning(msg) else: onml[key] = ynml[key] if len(dupl) > 0: which = ", ".join(dupl) msg = ( f"The following namelists in {ftnfile} have non-empty duplicates: {which}" ) logger.warning(msg) return onml
[docs] @staticmethod def yml2dict(ymlfile): """Read yaml namelist file and return as dict.""" with open(ymlfile, mode="rt", encoding="utf-8") as file: ynml = yaml.safe_load(file) return ynml
[docs] @staticmethod def dict2yml(nmldict, ymlfile, ordered_sections=None): """Write dict as yaml file.""" with open(ymlfile, mode="wb") as file: if ordered_sections: for section in ordered_sections: if section in nmldict: output_dict = {} output_dict[section] = nmldict[section] yaml.dump( output_dict, file, encoding="utf-8", default_flow_style=False ) else: yaml.dump(nmldict, file, encoding="utf-8", default_flow_style=False)
[docs] class NamelistConverter: """Helper class to convert namelists between cycles, based on thenamelisttool."""
[docs] @staticmethod def get_known_cycles(): """Return the cycles handled by the converter.""" return ["CY48t2", "CY48t3", "CY49", "CY49t1", "CY49t2"]
[docs] @staticmethod def get_to_next_version_tnt_filenames(): """Return the tnt file names between get_known_cycles().""" return [ None, # CY48t2 to CY48t3 "cy48t2_to_cy49.yaml", # CY48t3 to CY49 "cy49_to_cy49t1.yaml", # CY49 to CY49t1 None, # CY49t1 to CY49t2 ]
[docs] @staticmethod def get_tnt_files_list(from_cycle, to_cycle): """Return the list of tnt directive files required for the conversion.""" # definitions of the conversion to apply between cycles tnt_directives_folder = ConfigPaths.path_from_subpath("tnt_directives") if from_cycle and to_cycle: known_cycles = NamelistConverter.get_known_cycles() to_next_version_tnt_filenames = ( NamelistConverter.get_to_next_version_tnt_filenames() ) try: start_index = known_cycles.index(from_cycle) except ValueError: raise SystemExit( f"ERROR: from-cycle {from_cycle} unknown" ) from ValueError try: target_index = known_cycles.index(to_cycle) except ValueError: raise SystemExit(f"ERROR: to-cycle {to_cycle} unknown") from ValueError # Verify that to_cycle is older than from_cycle if start_index > target_index: raise SystemExit( f"ERROR: No conversion possible between {from_cycle} and {to_cycle}" ) else: start_index = 0 target_index = 0 if start_index == target_index: # Apply empty conversion tnt_files = [tnt_directives_folder / "empty.yaml"] else: # Apply all the intermediate conversions tnt_files = [ tnt_directives_folder / to_next_version_tnt_filenames[index] for index in range(start_index, target_index) if to_next_version_tnt_filenames[index] ] return tnt_files
[docs] @staticmethod def convert_yml(input_yml, output_yml, from_cycle, to_cycle): """Convert a namelist in yml file between two cycles. Args: input_yml: the input yaml filename output_yml: the output yaml filename from_cycle: the input cycle to_cycle: the target cycle Raises: SystemExit: when conversion failed """ tnt_files = NamelistConverter.get_tnt_files_list(from_cycle, to_cycle) # Read the input namelist file (yaml) logger.info(f"Read {input_yml}") nmldict = NamelistIntegrator.yml2dict(Path(input_yml)) with open(Path(input_yml), mode="rt", encoding="utf-8") as file: ordered_sections = [ line.split(":")[0] for line in file.readlines() if ":" in line and line.split(":")[0] in nmldict ] for tnt_file in tnt_files: nmldict = NamelistConverter.apply_tnt_directives_to_namelist_dict( tnt_file, nmldict ) if not nmldict: raise SystemExit("Name list conversion failed. ") # Write the output namelist file (yaml) logger.info(f"Write {output_yml}") if "empty" in nmldict and "empty" not in ordered_sections: ordered_sections.append("empty") NamelistIntegrator.dict2yml(nmldict, Path(output_yml), ordered_sections)
[docs] @staticmethod def convert_ftn(input_ftn, output_ftn, from_cycle, to_cycle): """Convert a namelist in fortran file between two cycles. Args: input_ftn: the input fortran filename output_ftn: the output fortran filename from_cycle: the input cycle to_cycle: the target cycle """ tnt_files = NamelistConverter.get_tnt_files_list(from_cycle, to_cycle) logger.info(f"Read {input_ftn}") ftn_file = input_ftn temporary_files = [] for tnt_file in tnt_files: NamelistConverter.apply_tnt_directives_to_ftn_namelist(tnt_file, ftn_file) ftn_file = ftn_file + ".tnt" temporary_files.append(ftn_file) nl = f90nml.read(ftn_file) logger.info(f"Write {output_ftn}") write_namelist(nl, output_ftn) for file in temporary_files: os.remove(file)
[docs] @staticmethod def apply_tnt_directives_to_namelist_dict(tnt_directive_filename, namelist_dict): """Apply the tnt directives to a namelist as dictionary. Args: tnt_directive_filename: the tnt directive filename namelist_dict: the namelist dictionary Returns: new_namelist: the converted namelist dictionary Raises: SystemExit: when conversion failed """ logger.info(f"Apply {tnt_directive_filename}") # Open the directive file with open(tnt_directive_filename, mode="rt", encoding="utf-8") as file: tnt_directives = yaml.safe_load(file) file.close() # Use a copy to be able to modify dictionaries during iterations new_namelist = copy.deepcopy(namelist_dict) # Move keys from one section to another if "keys_to_move" in tnt_directives: for old_block in tnt_directives["keys_to_move"]: for old_key in tnt_directives["keys_to_move"][old_block]: for new_block in tnt_directives["keys_to_move"][old_block][old_key]: new_key = tnt_directives["keys_to_move"][old_block][old_key][ new_block ] for namelists_section in namelist_dict: for namelist_block in namelist_dict[namelists_section]: if ( old_block in namelist_block and old_key in namelist_dict[namelists_section][namelist_block] ): if new_block not in new_namelist[namelists_section]: new_namelist[namelists_section][new_block] = {} new_namelist[namelists_section][new_block][ new_key ] = namelist_dict[namelists_section][old_block][ old_key ] del new_namelist[namelists_section][old_block][ old_key ] if ( len(new_namelist[namelists_section][old_block]) == 0 ): del new_namelist[namelists_section][old_block] if "keys_to_set" in tnt_directives: for block_to_set in tnt_directives["keys_to_set"]: for namelists_section in namelist_dict: if namelists_section != "empty": for keys_to_set in tnt_directives["keys_to_set"][block_to_set]: if block_to_set in namelist_dict[namelists_section]: key_value = tnt_directives["keys_to_set"][block_to_set][ keys_to_set ] if key_value in [".T.", ".TRUE."]: key_value = True if key_value in [".F.", ".FALSE."]: key_value = False new_namelist[namelists_section][block_to_set][ keys_to_set ] = key_value else: logger.warning( f"'No {block_to_set} in {namelists_section}: \ skip insertion of {keys_to_set}'" ) # Creation of new blocks if "new_blocks" in tnt_directives: for new_block in tnt_directives["new_blocks"]: if "empty" not in new_namelist: new_namelist["empty"] = {} new_block_upper = new_block.upper() if new_block_upper not in new_namelist["empty"]: new_namelist["empty"][new_block_upper] = {} # Move of blocks(Not implemented) if "blocks_to_move" in tnt_directives: for blocks in tnt_directives["blocks_to_move"]: if blocks in namelist_block: raise SystemExit("conversion FAILED: blocks_to_move not implemented") # Delete keys if "keys_to_remove" in tnt_directives: for block_to_remove in tnt_directives["keys_to_remove"]: for namelists_section in namelist_dict: for namelist_block in namelist_dict[namelists_section]: if block_to_remove in namelist_block: for key_to_remove in tnt_directives["keys_to_remove"][ block_to_remove ]: if ( key_to_remove in namelist_dict[namelists_section][block_to_remove] ): del new_namelist[namelists_section][block_to_remove][ key_to_remove ] if ( len( new_namelist[namelists_section][ block_to_remove ] ) == 0 ): del new_namelist[namelists_section][ block_to_remove ] return new_namelist
[docs] @staticmethod def apply_tnt_directives_to_ftn_namelist(tnt_directive_filename, input_ftn): """Apply the tnt directives to a fotran namelist using tnt. Args: tnt_directive_filename: the tnt directive filename input_ftn: the namelist fortran file Raises: SystemExit: when conversion failed """ logger.info(f"Apply {tnt_directive_filename}") tnt_directives_folder = ConfigPaths.path_from_subpath( "tnt_directives", ) command = [ "tnt.py", "-d", tnt_directives_folder / tnt_directive_filename, input_ftn, ] try: subprocess.check_call(command) # noqa S603 except subprocess.CalledProcessError as exception: raise SystemExit(f"tnt failed with {exception!r}") from exception