"""Topography and SOILGRID data preparation tasks."""
import os
import shutil
import sys
from ..geo_utils import Projection, Projstring
from ..logs import logger
from ..os_utils import Search, deodemakedirs
from .base import Task
def _import_gdal():
"""Return imported gdal from osgeo. Utility func useful for debugging and testing."""
try:
from osgeo import gdal
return gdal
except ImportError as error:
msg = "Cannot use the installed gdal library, "
msg += "or there is no gdal library installed. "
msg += "If you have not installed it, you may want to try running"
msg += " 'pip install pygdal==\"`gdal-config --version`.*\"' "
msg += "or, if you use conda,"
msg += " 'conda install -c conda-forge gdal'."
raise ImportError(msg) from error
[docs]
class Topography(Task):
"""Topography data preparation."""
def __init__(self, config):
"""Init Topography.
Args:
config (Config): Config object
"""
self.domain = self.get_domain_properties(config)
Task.__init__(self, config, __class__.__name__)
self.topo_source = self.fmanager.platform.get_platform_value(
"topo_source", alt="gmted2010"
)
self.topo_data_path = self.fmanager.platform.get_platform_value("topo_data_path")
[docs]
def get_domain_properties(self, config) -> dict:
"""Get domain properties.
Args:
config (Config): Config object
Returns:
dict: Domain properties
"""
domain = {
"nlon": config["domain.njmax"],
"nlat": config["domain.nimax"],
"latc": config["domain.xlatcen"],
"lonc": config["domain.xloncen"],
"lat0": config["domain.xlat0"],
"lon0": config["domain.xlon0"],
"gsize": config["domain.xdx"],
}
return domain
[docs]
@staticmethod
def tif2bin(gd, bin_file) -> None:
"""Convert tif file to binary file used by surfex.
Args:
gd: gdal dataset
bin_file (str): Binary file
"""
band = gd.GetRasterBand(1)
with open(bin_file, "wb") as f:
for iy in range(gd.RasterYSize):
data = band.ReadAsArray(0, iy, gd.RasterXSize, 1)
sel = data == -32768
data[sel] = 0
data.byteswap().astype("int16").tofile(f)
[docs]
def execute(self):
"""Run task.
Define run sequence.
"""
climdir = self.platform.get_system_value("climdir")
unix_group = self.platform.get_platform_value("unix_group")
deodemakedirs(climdir, unixgroup=unix_group)
if self.topo_source == "gmted2010":
# Process GMTED2010 geotif files
projstr = Projstring().get_projstring(
lon0=self.domain["lon0"], lat0=self.domain["lat0"]
)
proj = Projection(projstr)
domain_properties = proj.get_domain_properties(self.domain)
tif_files, hdr_east, hdr_west, hdr_south, hdr_north = self.define_gmted_input(
domain_properties
)
# Output merged GMTED file to working directory as file gmted_mea075.tif
gdal = _import_gdal()
gd = gdal.Warp(
"gmted_mea075.tif",
tif_files,
format="GTiff",
options=["COMPRESS=LZW", "TILED=YES"],
)
Topography.tif2bin(gd, "gmted_mea075.bin")
shutil.move("gmted_mea075.bin", f"{climdir}/gmted2010.dir")
# Get number of rows and columns
hdr_rows = gd.RasterYSize
hdr_cols = gd.RasterXSize
gd = None
# Create the header file
header_file = f"{climdir}/gmted2010.hdr"
logger.debug("Write header file {}", header_file)
Topography.write_gmted_header_file(
header_file, hdr_north, hdr_south, hdr_west, hdr_east, hdr_rows, hdr_cols
)
# Create symlinks to generic topo files
topo_dir_target = f"{climdir}/gmted2010.dir"
topo_hdr_target = f"{climdir}/gmted2010.hdr"
else:
# Use custom topography files provided by user
# topo_source is the base filename (without extension)
topo_dir_target = f"{self.topo_data_path}/{self.topo_source}.dir"
topo_hdr_target = f"{self.topo_data_path}/{self.topo_source}.hdr"
if not os.path.isfile(topo_dir_target):
logger.error("Custom topography .dir file not found: {}", topo_dir_target)
sys.exit(1)
if not os.path.isfile(topo_hdr_target):
logger.error("Custom topography .hdr file not found: {}", topo_hdr_target)
sys.exit(1)
logger.info(
"Using custom topography files: {} and {}",
topo_dir_target,
topo_hdr_target,
)
# Create symlinks topo.dir and topo.hdr pointing to the appropriate files
# using filemanager for consistency with other tasks
self.fmanager.input(
topo_dir_target,
f"{climdir}/topo.dir",
check_archive=False,
provider_id="symlink",
)
self.fmanager.input(
topo_hdr_target,
f"{climdir}/topo.hdr",
check_archive=False,
provider_id="symlink",
)
[docs]
class Soil(Task):
"""Prepare soil data task for PGD."""
def __init__(self, config):
"""Construct soil data object.
Args:
config (deode.ParsedConfig): Configuration
"""
self.domain = self.get_domain_properties(config)
Task.__init__(self, config, __class__.__name__)
logger.debug("Constructed Soil task")
[docs]
def get_domain_properties(self, config) -> dict:
"""Get domain properties.
Args:
config (deode.ParsedConfig): Configuration
Returns:
dict: Domain properties
"""
domain = {
"nlon": config["domain.njmax"],
"nlat": config["domain.nimax"],
"latc": config["domain.xlatcen"],
"lonc": config["domain.xloncen"],
"lat0": config["domain.xlat0"],
"lon0": config["domain.xlon0"],
"gsize": config["domain.xdx"],
}
return domain
[docs]
@staticmethod
def check_domain_validity(domain_properties: dict) -> None:
"""Check if domain is valid.
Args:
domain_properties (dict): Dict with domain properties
Raises:
ValueError: If domain is outside soilgrid data area
"""
# Area available from soilgrid data
glo_north = 84.0
glo_south = -56.0
glo_east = 180.0
glo_west = -180.0
is_outside = bool(
domain_properties["minlon"] < glo_west
or domain_properties["minlon"] > glo_east
or domain_properties["maxlon"] < glo_west
or domain_properties["maxlon"] > glo_east
or domain_properties["minlat"] < glo_south
or domain_properties["minlat"] > glo_north
or domain_properties["maxlat"] < glo_south
or domain_properties["maxlat"] > glo_north
)
if is_outside:
raise ValueError("Domain outside soilgrid data area")
[docs]
@staticmethod
def coordinates_for_cutting_dataset(
domain_properties: dict, halo: float = 5.0
) -> tuple:
"""Get coordinates for cutting dataset.
Args:
domain_properties (dict): Dict with domain properties
halo (float): Halo. Defaults to 5.0.
Returns:
tuple: Coordinates for cutting dataset
"""
cut_west = domain_properties["minlon"] - halo
cut_east = domain_properties["maxlon"] + halo
cut_south = domain_properties["minlat"] - halo
cut_north = domain_properties["maxlat"] + halo
return cut_west, cut_east, cut_south, cut_north
[docs]
def execute(self):
"""Run task.
Define run sequence.
Raises:
FileNotFoundError: If no tif files are found.
"""
logger.debug("Running soil task")
soilgrid_path = self.fmanager.platform.get_platform_value("SOILGRID_DATA_PATH")
soilgrid_tifs = Search.find_files(soilgrid_path, postfix=".tif", fullpath=True)
if len(soilgrid_tifs) == 0:
raise FileNotFoundError(f"No soilgrid tifs found under {soilgrid_path}")
# symlink with filemanager from toolbox
for soilgrid_tif in soilgrid_tifs:
soilgrid_tif_basename = os.path.basename(soilgrid_tif)
self.fmanager.input(
soilgrid_tif,
soilgrid_tif_basename,
check_archive=False,
provider_id="symlink",
)
projstr = Projstring().get_projstring(
lon0=self.domain["lon0"], lat0=self.domain["lat0"]
)
proj = Projection(projstr)
domain_properties = proj.get_domain_properties(self.domain)
self.check_domain_validity(domain_properties)
# Get coordinates for cutting dataset
cut_lon, cut_east, cut_south, cut_north = self.coordinates_for_cutting_dataset(
domain_properties
)
# Cut soilgrid tifs
soilgrid_tif_subarea_files = []
find_size_and_corners = True
gdal = _import_gdal()
for soilgrid_tif in soilgrid_tifs:
soilgrid_tif_basename = os.path.basename(soilgrid_tif)
soilgrid_tif_subarea = soilgrid_tif_basename.replace(".tif", "_subarea.tif")
soilgrid_tif_subarea_files.append(soilgrid_tif_subarea)
ds = gdal.Open(soilgrid_tif_basename)
ds = gdal.Translate(
soilgrid_tif_subarea,
ds,
projWin=[cut_lon, cut_north, cut_east, cut_south],
)
if find_size_and_corners:
# Get number of rows and columns
hdr_rows = ds.RasterYSize
hdr_cols = ds.RasterXSize
# Get corners
gt = ds.GetGeoTransform()
hdr_west = gt[0]
hdr_north = gt[3]
hdr_east = hdr_west + hdr_cols * gt[1]
hdr_south = hdr_north + hdr_rows * gt[5]
find_size_and_corners = False
ds = None
climdir = self.platform.get_system_value("climdir")
unix_group = self.platform.get_platform_value("unix_group")
deodemakedirs(climdir, unixgroup=unix_group)
for subarea_file in soilgrid_tif_subarea_files:
if subarea_file.startswith("SNDPPT"):
ds = gdal.Open(subarea_file)
ds = gdal.Translate(
climdir + "/SAND_SOILGRID.dir",
ds,
format="EHdr",
outputType=gdal.GDT_Byte,
)
ds = None
elif subarea_file.startswith("CLYPPT"):
ds = gdal.Open(subarea_file)
ds = gdal.Translate(
climdir + "/CLAY_SOILGRID.dir",
ds,
format="EHdr",
outputType=gdal.GDT_Byte,
)
ds = None
elif subarea_file.startswith("SOC_TOP"):
ds = gdal.Open(subarea_file)
ds = gdal.Translate(
climdir + "/soc_top.dir", ds, format="EHdr", outputType=gdal.GDT_Int16
)
ds = None
elif subarea_file.startswith("SOC_SUB"):
ds = gdal.Open(subarea_file)
ds = gdal.Translate(
climdir + "/soc_sub.dir", ds, format="EHdr", outputType=gdal.GDT_Int16
)
ds = None
else:
logger.warning("Unknown soilgrid tif file: {}", subarea_file)
# Compose headers in surfex/pgd format
self.write_soil_header_file(
climdir + "/CLAY_SOILGRID.hdr",
"Clay",
hdr_north,
hdr_south,
hdr_west,
hdr_east,
hdr_rows,
hdr_cols,
nodata=0,
bits=8,
write_fact=False,
)
self.write_soil_header_file(
climdir + "/SAND_SOILGRID.hdr",
"Sand",
hdr_north,
hdr_south,
hdr_west,
hdr_east,
hdr_rows,
hdr_cols,
nodata=0,
bits=8,
write_fact=False,
)
self.write_soil_header_file(
climdir + "/soc_top.hdr",
"soc_top",
hdr_north,
hdr_south,
hdr_west,
hdr_east,
hdr_rows,
hdr_cols,
nodata=-9999,
bits=16,
write_fact=True,
)
self.write_soil_header_file(
climdir + "/soc_sub.hdr",
"soc_sub",
hdr_north,
hdr_south,
hdr_west,
hdr_east,
hdr_rows,
hdr_cols,
nodata=-9999,
bits=16,
write_fact=True,
)
logger.debug("Finished soil task")