Source code for ocha_stratus.datasources.emdat
import logging
from typing import Literal
import duckdb
import pandas as pd
from ..azure_blob import get_container_client
logger = logging.getLogger(__name__)
EMDAT_FNAME = "emdat/processed/emdat_all.parquet"
[docs]
def load_emdat_from_blob(
    iso3: str | None = None,
    include_historic: bool = False,
    stage: Literal["dev", "prod"] = "dev",
) -> pd.DataFrame:
    """
    Load EM-DAT disaster data from Azure blob storage.

    The processed parquet file (``EMDAT_FNAME`` in the ``global`` container)
    is queried in place with DuckDB, so the country / historic filters are
    applied by the query rather than on a fully loaded DataFrame.

    See here for a description of columns:
    https://doc.emdat.be/docs/data-structure-and-content/emdat-public-table/#column-description

    Parameters
    ----------
    iso3 : str or None, optional
        ISO3 country code to filter results. The code is upper-cased before
        being compared against the ``ISO`` column. If None, returns all
        records. Default is None.
    include_historic : bool, optional
        Whether to include historic disaster data (pre-2000). When False,
        only rows with ``Historic = 'No'`` are returned. Default is False.
    stage : Literal["dev", "prod"], optional
        Environment stage to load from, by default "dev"

    Returns
    -------
    pd.DataFrame
        DataFrame containing EM-DAT disaster data, optionally filtered by
        country.
    """
    iso3 = iso3.upper() if iso3 else iso3

    blob_client = get_container_client(
        container_name="global", stage=stage
    ).get_blob_client(EMDAT_FNAME)
    url = blob_client.url

    # Log the blob's modification time so callers can tell how fresh the
    # data is.
    last_modified = blob_client.get_blob_properties().last_modified
    logger.info("EMDAT data last updated: %s", last_modified)

    conditions = []
    params = []
    if iso3 is not None:
        # Caller-supplied value: always bound as a positional parameter
        # ($1, $2, ...) and never interpolated into the SQL text.
        conditions.append(f"ISO = ${len(params) + 1}")
        params.append(iso3)
    if not include_historic:
        conditions.append("Historic = 'No'")

    # The URL is built from the container client and the fixed EMDAT_FNAME,
    # not from the iso3 argument, so it is placed directly in the query.
    query = f"SELECT * FROM read_parquet('{url}')"
    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    # Use the connection as a context manager so it is closed even when the
    # query raises; the result is materialised to a DataFrame before exit.
    with duckdb.connect() as con:
        return con.execute(query, params).df()