Source code for ocha_stratus.datasources.cerf

import logging
from typing import Literal

import duckdb
import pandas as pd

from ..azure_blob import get_container_client

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Blob name of the CERF Parquet file; load_cerf_from_blob passes it to
# get_blob_client() on the "global" container.
CERF_FNAME = "cerf/cerf_hdx_download.parquet"


def load_cerf_from_blob(
    iso3: str | None = None, stage: Literal["dev", "prod"] = "dev"
) -> pd.DataFrame:
    """
    Load CERF funding data from Azure blob storage.

    Retrieves CERF (Central Emergency Response Fund) data stored as a
    Parquet file in Azure blob storage, with optional filtering by country
    ISO3 code. Data downloaded from
    https://data.humdata.org/dataset/cerf-allocations and manually
    transformed to parquet and uploaded to blob.

    Parameters
    ----------
    iso3 : str or None, optional
        ISO3 country code to filter results. The value is upper-cased
        before being matched against the ``countryCode`` column. If None,
        returns all records. Note that an empty string is *not* treated as
        None: it is passed through unchanged and used as the filter value.
        Default is None.
    stage : Literal["dev", "prod"], optional
        Environment stage to load from, by default "dev"

    Returns
    -------
    pd.DataFrame
        DataFrame containing CERF funding data, optionally filtered by
        country.
    """
    # Normalise case only for non-empty strings; None (and "") pass through.
    iso3 = iso3.upper() if iso3 else iso3

    blob_client = get_container_client(
        container_name="global", stage=stage
    ).get_blob_client(CERF_FNAME)
    url = blob_client.url

    # Log the blob's last-modified timestamp so callers can judge freshness.
    last_modified = blob_client.get_blob_properties().last_modified
    logger.info("CERF data last updated: %s", last_modified)

    # The URL comes from the blob client (not from the caller) and is
    # interpolated as before; the caller-supplied country code is always
    # bound as a prepared-statement parameter.
    query = f"SELECT * FROM read_parquet('{url}')"
    params = None
    if iso3 is not None:
        query += " WHERE countryCode = $1"
        params = [iso3]

    # Scope the connection so it is closed even if the query raises; the
    # DataFrame is fully materialised before the connection goes away.
    with duckdb.connect() as con:
        return con.execute(query, params).df()