Source code for ocha_stratus.cogs

import logging
import re
from typing import List, Literal, Optional, Union

import geopandas as gpd
import pandas as pd
import rioxarray  # noqa
import tqdm
import xarray as xr

from .azure_blob import get_container_client, open_blob_cog

logger = logging.getLogger(__name__)


def _parse_date(filename):
    """
    Parses the date based on a COG filename.
    """
    res = re.search("([0-9]{4}-[0-9]{2}-[0-9]{2})", filename)
    try:
        output_date = pd.to_datetime(res[0]).strftime("%Y-%m-%d")
        return output_date
    except Exception:
        return None


[docs] def stack_cogs( dataset: Literal["imerg", "seas5", "era5", "floodscan"], dates: Union[List[str], List], stage: str = "prod", clip_gdf: Optional[gpd.GeoDataFrame] = None, mode: Literal["interactive", "pipeline"] = "interactive", ) -> xr.Dataset: """ Stack Cloud Optimized GeoTIFFs (COGs) from Azure Blob Storage into a single xarray Dataset. Retrieves and combines multiple COG files for a specified dataset and date range from Azure Blob Storage, returning a unified xarray Dataset with temporal and optional leadtime dimensions. Parameters ---------- dataset : {"imerg", "seas5", "era5", "floodscan"} Name of the dataset to retrieve COGs for. Used as prefix for blob name filtering. dates : List[str] or List Collection of dates to filter COGs by. Should match 'YYYY-MM-DD' format. Will reference the issued date of the dataset (although for non-forecast datasets this is equivalent to the valid date). clip_gdf : GeoDataFrame, optional GeoPandas DataFrame containing geometries to clip the COGs to. If provided, each COG will be clipped to the union of all geometries in the DataFrame before stacking. The GeoDataFrame should be in the same CRS as the COGs. stage : str, optional Deployment stage for the container client, by default "prod". Determines which Azure storage environment to connect to. mode : {"interactive", "pipeline"}, optional Processing mode, by default "interactive". If "interactive", displays a progress bar using tqdm during processing. Returns ------- xarray.Dataset Combined dataset with all COGs stacked along temporal dimensions. Contains 'date' dimension and optional 'leadtime' dimension if present in the source data. Attributes from individual COGs are dropped during combination. If clip_gdf is provided, data will be clipped to the specified geometries. Raises ------ Exception If no COGs are found matching the specified dataset and dates. Warnings -------- Logs a warning if the number of found COGs doesn't match the number of input dates, indicating some requested dates may not have available data. Notes ----- - Only processes COGs containing "processed" in their filename - Handles both issued and valid date types based on COG metadata - Automatically expands dimensions to include 'date' and 'leadtime' (if present) - Uses `xr.combine_by_coords` to merge datasets, which requires consistent coordinate systems across all input COGs """ container = get_container_client("raster", stage=stage) clip_geometry = None if clip_gdf is not None: # Union all geometries in the GeoDataFrame clip_geometry = clip_gdf.geometry.unary_union logger.info(f"Clipping enabled with {len(clip_gdf)} geometries") cogs_list = [ x.name for x in container.list_blobs(name_starts_with=f"{dataset}/") if (_parse_date(x.name) in (dates)) & ("processed" in x.name) ] das = [] cogs_list = tqdm.tqdm(cogs_list) if mode == "interactive" else cogs_list logger.info(f"Stacking {len(cogs_list)} cogs...") if len(cogs_list) != len(dates): logger.warning("Not all COGs available, given input dates") if len(cogs_list) == 0: raise Exception(f"No COGs found to process for dates: {dates}") for cog in cogs_list: da_in = open_blob_cog( cog, container_name="raster", container_client=container ) if clip_geometry is not None: da_in = da_in.rio.clip([clip_geometry], drop=True) date_suffix = ( "valid" if da_in.attrs["month_issued"] == "None" else "issued" ) year_ = da_in.attrs[f"year_{date_suffix}"] month_ = str(da_in.attrs[f"month_{date_suffix}"]).zfill(2) day_ = ( "01" if da_in.attrs["date_valid"] == "None" else da_in.attrs["date_valid"] ) date_in = f"{year_}-{month_}-{day_}" da_in = da_in.squeeze(drop=True) da_in["date"] = date_in expand_dims = ["date"] if da_in.attrs["leadtime"] != "None": da_in["leadtime"] = da_in.attrs["leadtime"] expand_dims.append("leadtime") da_in = da_in.expand_dims(expand_dims) da_in = da_in.persist() das.append(da_in) return xr.combine_by_coords(das, combine_attrs="drop")