API Reference
Azure Blob Storage
Utilities for working with Azure Blob Storage.
Container Operations
- ocha_stratus.get_container_client(container_name='projects', stage='dev', write=False)
Get an Azure Blob Storage container client.
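- Parameters:
container_name (str, optional) – Name of the container, by default “projects”
stage (Literal["prod", "dev"], optional) – Environment stage, by default “dev”
write (bool, optional) – Whether write access is required, by default False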
- Returns:
Azure storage container client object
- Return type:
ContainerClient
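As a rough illustration of how the returned client might be used, the sketch below lists blob names under a prefix. It assumes the return value behaves like a standard `azure.storage.blob.ContainerClient` (as the return type suggests) and that credentials for the chosen stage are already configured. The helpers `list_blob_names` and `list_dev_project_blobs` are hypothetical and not part of `ocha_stratus`.

```python
def list_blob_names(container_client, prefix=""):
    """Return the sorted names of all blobs whose name starts with `prefix`.

    `container_client` is expected to behave like an
    azure.storage.blob.ContainerClient, i.e. to expose `list_blobs`.
    """
    blobs = container_client.list_blobs(name_starts_with=prefix)
    return sorted(blob.name for blob in blobs)


def list_dev_project_blobs(prefix):
    """Hypothetical wrapper: list blobs in the dev "projects" container."""
    # Imported lazily so this module still loads where ocha_stratus
    # (and Azure credentials) are not available.
    import ocha_stratus as stratus

    client = stratus.get_container_client(
        container_name="projects", stage="dev", write=False
    )
    return list_blob_names(client, prefix)
```

Keeping the listing logic separate from the client creation makes it possible to exercise it with a stand-in object that only implements `list_blobs`.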
File Operations
CSV Files
- ocha_stratus.upload_csv_to_blob(df, blob_name, stage='dev', container_name='projects', **kwargs)
Upload a pandas DataFrame to Azure Blob Storage in CSV format.
- Parameters:
df (pandas.DataFrame) – DataFrame to upload
blob_name (str) – Name of the blob to create/update
stage (Literal["prod", "dev"], optional) – Environment stage to upload to, by default “dev”
container_name (str, optional) – Name of the container to upload to, by default “projects”
**kwargs (dict) – Additional arguments passed to pandas.DataFrame.to_csv()
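The following sketch shows one way the `**kwargs` forwarding could be used, here to drop the index column via `index=False` (a standard `pandas.DataFrame.to_csv()` argument). The wrapper `upload_table` and its `uploader` parameter are hypothetical; `uploader` exists only so the sketch can be tried without Azure access.

```python
def upload_table(df, blob_name, stage="dev", uploader=None):
    """Upload `df` as CSV without its index column and return the blob name.

    `uploader` defaults to ocha_stratus.upload_csv_to_blob. It is a
    parameter only so that a stand-in can be supplied for local testing.
    """
    if uploader is None:
        # Lazy import: only needed when actually talking to Azure.
        import ocha_stratus as stratus

        uploader = stratus.upload_csv_to_blob

    # index=False is not consumed by the uploader itself; per the docs
    # above it is passed through to pandas.DataFrame.to_csv().
    uploader(
        df,
        blob_name,
        stage=stage,
        container_name="projects",
        index=False,
    )
    return blob_name
```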
Parquet Files
- ocha_stratus.upload_parquet_to_blob(df, blob_name, stage='dev', container_name='projects', **kwargs)
Upload a pandas DataFrame or GeoDataFrame to Azure Blob Storage in parquet format.
- Parameters:
df (pandas.DataFrame or geopandas.GeoDataFrame) – DataFrame to upload
blob_name (str) – Name of the blob to create/update
stage (Literal["prod", "dev"], optional) – Environment stage to upload to, by default “dev”
container_name (str, optional) – Name of the container to upload to, by default “projects”
**kwargs (dict) – Additional arguments passed to DataFrame.to_parquet()
- ocha_stratus.load_parquet_from_blob(blob_name, stage='dev', container_name='projects')
Load a parquet file from Azure Blob Storage into a pandas DataFrame.
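- Parameters:
blob_name (str) – Name of the blob to load
stage (Literal["prod", "dev"], optional) – Environment stage to load from, by default “dev”
container_name (str, optional) – Name of the container to load from, by default “projects”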
- Returns:
DataFrame containing the loaded data
- Return type:
pandas.DataFrame
Shapefiles
- ocha_stratus.upload_shp_to_blob(gdf, blob_name, stage='dev', container_name='projects')
Upload a GeoDataFrame to Azure Blob Storage as a zipped shapefile.
- Parameters:
gdf (geopandas.GeoDataFrame) – GeoDataFrame to upload
blob_name (str) – Name of the blob to create/update
stage (Literal["prod", "dev"], optional) – Environment stage to upload to, by default “dev”
container_name (str, optional) – Name of the container to upload to, by default “projects”
- ocha_stratus.load_shp_from_blob(blob_name, shapefile=None, stage='dev', container_name='projects')
Load a zipped shapefile from Azure Blob Storage into a GeoDataFrame. Downloads the shapefile locally to a temporary directory.
- Parameters:
blob_name (str) – Name of the blob to load
shapefile (str, optional) – Name of the specific shapefile within the zip to load, by default None
stage (Literal["prod", "dev"], optional) – Environment stage to load from, by default “dev”
container_name (str, optional) – Name of the container to load from, by default “projects”
- Returns:
GeoDataFrame containing the loaded spatial data
- Return type:
geopandas.GeoDataFrame
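A zip archive can hold more than one shapefile layer, which is presumably why the `shapefile` argument exists. The sketch below uses only the standard library to list the `.shp` members of an archive, which may help when deciding what to pass. It does not reproduce how `load_shp_from_blob` selects a layer, and whether `shapefile` expects the name with or without the `.shp` extension is not stated here. The helper names and the `adm0`/`adm1` layers are made up for the demonstration.

```python
import io
import zipfile


def list_shapefiles(zip_bytes):
    """Return the sorted names of all .shp members in a zip given as bytes."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        return sorted(
            name for name in archive.namelist() if name.lower().endswith(".shp")
        )


def build_demo_zip():
    """Build an in-memory zip with two placeholder (empty) shapefile layers."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        for stem in ("adm0", "adm1"):
            # A shapefile is a bundle of sidecar files sharing one stem.
            for ext in (".shp", ".shx", ".dbf", ".prj"):
                archive.writestr(stem + ext, b"")
    return buffer.getvalue()


print(list_shapefiles(build_demo_zip()))  # ['adm0.shp', 'adm1.shp']
```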
Cloud Optimized GeoTIFFs
- ocha_stratus.upload_cog_to_blob(da, blob_name, stage='dev', container_name='projects')
Upload an xarray DataArray as a Cloud Optimized GeoTIFF (COG) to Azure Blob Storage.
- Parameters:
da (xarray.DataArray) – DataArray containing the raster data to upload
blob_name (str) – Name of the blob to create/update
stage (Literal["prod", "dev"], optional) – Environment stage to upload to, by default “dev”
container_name (str, optional) – Name of the container to upload to, by default “projects”
- ocha_stratus.open_blob_cog(blob_name, stage='dev', container_name='projects', chunks=None, container_client=None)
Open a Cloud Optimized GeoTIFF (COG) from Azure Blob Storage.
- Parameters:
blob_name (str) – Name of the COG blob
stage (Literal["prod", "dev"], optional) – Environment stage, by default “dev”
container_name (str, optional) – Name of the container, by default “projects”
chunks (bool or dict, optional) – Chunk size for dask array, by default None
container_client (ContainerClient, optional) – Azure ContainerClient in which the blob is located
- Returns:
DataArray containing the raster data
- Return type:
xarray.DataArray
Generic data
- ocha_stratus.upload_blob_data(data, blob_name, stage='dev', container_name='projects', content_type=None)
Internal function to upload raw data to Azure Blob Storage.
- Parameters:
data (bytes or BinaryIO) – Data to upload
blob_name (str) – Name of the blob to create/update
stage (Literal["prod", "dev"], optional) – Environment stage to upload to, by default “dev”
container_name (str, optional) – Name of the container to upload to, by default “projects”
content_type (str, optional) – MIME type of the content, by default None
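As a minimal sketch of the `data` and `content_type` arguments, the example below serialises a dictionary to JSON bytes and pairs it with the `application/json` MIME type. The helpers `to_json_payload` and `upload_json` are hypothetical, and the call to `upload_blob_data` assumes `ocha_stratus` is installed and credentials are configured.

```python
import json


def to_json_payload(obj):
    """Serialise `obj` to UTF-8 JSON bytes; return (data, content_type)."""
    data = json.dumps(obj, sort_keys=True).encode("utf-8")
    return data, "application/json"


def upload_json(obj, blob_name, stage="dev"):
    """Hypothetical wrapper: upload `obj` as a JSON blob."""
    # Lazy import so the serialisation helper works without ocha_stratus.
    import ocha_stratus as stratus

    data, content_type = to_json_payload(obj)
    stratus.upload_blob_data(
        data,
        blob_name,
        stage=stage,
        container_name="projects",
        content_type=content_type,
    )
```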
Database Operations
Utilities for working with Azure PostgreSQL databases.
- ocha_stratus.get_engine(stage='dev', write=False)
Create a SQLAlchemy engine for connecting to the Azure PostgreSQL database.
- Parameters:
stage (Literal["dev", "prod"], optional) – Environment stage to connect to, by default “dev”
write (bool, optional) – Whether write access is required
- Returns:
SQLAlchemy engine configured with the appropriate connection URL
- Return type:
sqlalchemy.engine.Engine
- Raises:
ValueError – If the provided stage is neither “dev” nor “prod”
- ocha_stratus.postgres_upsert(table, conn, keys, data_iter, constraint=None)
Perform an upsert (insert or update) operation on a PostgreSQL table. Adapted from: https://stackoverflow.com/questions/55187884/insert-into-postgresql-table-from-pandas-with-on-conflict-update
- Parameters:
table (sqlalchemy.sql.schema.Table) – The SQLAlchemy Table object where the data will be inserted or updated.
conn (sqlalchemy.engine.Connection) – The SQLAlchemy connection object used to execute the upsert operation.
keys (list of str) – The list of column names used as keys for the upsert operation.
data_iter (iterable) – An iterable of tuples or lists containing the data to be inserted or updated.
constraint (str, optional) – Name of the uniqueness constraint, by default None
- Return type:
None
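To make the upsert semantics concrete, the sketch below runs the same kind of `INSERT ... ON CONFLICT ... DO UPDATE` statement against an in-memory SQLite database from the standard library. This is a stand-in technique so the example runs without a server; it is not the implementation of `postgres_upsert`. The table and column names are invented. Note that PostgreSQL can additionally target a named constraint with `ON CONFLICT ON CONSTRAINT`, which SQLite does not support, so the conflict target here is a column list.

```python
import sqlite3

# Rows that collide on (iso3, valid_date) are updated; others are inserted.
UPSERT_SQL = """
INSERT INTO rainfall (iso3, valid_date, value) VALUES (?, ?, ?)
ON CONFLICT (iso3, valid_date) DO UPDATE SET value = excluded.value
"""


def demo_upsert():
    """Run two overlapping batches through an upsert and return the table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE rainfall ("
        "iso3 TEXT, valid_date TEXT, value REAL, "
        "PRIMARY KEY (iso3, valid_date))"
    )
    first_batch = [("AFG", "2024-01-01", 1.5), ("AFG", "2024-01-02", 0.0)]
    # The second batch repeats 2024-01-02 (update) and adds 2024-01-03 (insert).
    second_batch = [("AFG", "2024-01-02", 3.2), ("AFG", "2024-01-03", 4.0)]
    conn.executemany(UPSERT_SQL, first_batch)
    conn.executemany(UPSERT_SQL, second_batch)
    rows = conn.execute(
        "SELECT iso3, valid_date, value FROM rainfall ORDER BY valid_date"
    ).fetchall()
    conn.close()
    return rows


rows = demo_upsert()
```

After both batches the table holds three rows, with the value for 2024-01-02 overwritten by the second batch. The `(table, conn, keys, data_iter)` signature resembles the callable that pandas accepts through `DataFrame.to_sql(method=...)`, so `postgres_upsert` is presumably intended to be passed there, although this page does not say so.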
Cloud-Optimized GeoTIFF (COG) Operations
Utilities for working with standard COG datasets.
- ocha_stratus.stack_cogs(dataset, dates, stage='prod', clip_gdf=None, mode='interactive')
Stack Cloud Optimized GeoTIFFs (COGs) from Azure Blob Storage into a single xarray Dataset.
Retrieves and combines multiple COG files for a specified dataset and date range from Azure Blob Storage, returning a unified xarray Dataset with temporal and optional leadtime dimensions.
- Parameters:
dataset ({"imerg", "seas5", "era5", "floodscan"}) – Name of the dataset to retrieve COGs for. Used as prefix for blob name filtering.
dates (List[str] or List) – Collection of dates to filter COGs by. Should match ‘YYYY-MM-DD’ format. Will reference the issued date of the dataset (although for non-forecast datasets this is equivalent to the valid date).
clip_gdf (GeoDataFrame, optional) – GeoPandas DataFrame containing geometries to clip the COGs to. If provided, each COG will be clipped to the union of all geometries in the DataFrame before stacking. The GeoDataFrame should be in the same CRS as the COGs.
stage (str, optional) – Deployment stage for the container client, by default “prod”. Determines which Azure storage environment to connect to.
mode ({"interactive", "pipeline"}, optional) – Processing mode, by default “interactive”. If “interactive”, displays a progress bar using tqdm during processing.
- Returns:
Combined dataset with all COGs stacked along temporal dimensions. Contains ‘date’ dimension and optional ‘leadtime’ dimension if present in the source data. Attributes from individual COGs are dropped during combination. If clip_gdf is provided, data will be clipped to the specified geometries.
- Return type:
xarray.Dataset
- Raises:
Exception – If no COGs are found matching the specified dataset and dates.
Warning
Logs a warning if the number of found COGs doesn’t match the number of input dates, indicating some requested dates may not have available data.
Notes
- Only processes COGs containing “processed” in their filename.
- Handles both issued and valid date types based on COG metadata.
- Automatically expands dimensions to include ‘date’ and ‘leadtime’ (if present).
- Uses xr.combine_by_coords to merge datasets, which requires consistent coordinate systems across all input COGs.
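Since `dates` should contain ‘YYYY-MM-DD’ strings, a small helper can generate them for a date range. The sketch below builds an inclusive daily list with the standard library and passes it to `stack_cogs`. Both `daily_dates` and `stack_imerg` are hypothetical helpers, and the assumption that "imerg" COGs exist for every day in the range is mine; per the warning above, missing dates would only be logged.

```python
from datetime import date, timedelta


def daily_dates(start, end):
    """Return every day from `start` to `end` inclusive as 'YYYY-MM-DD' strings."""
    first = date.fromisoformat(start)
    last = date.fromisoformat(end)
    n_days = (last - first).days
    return [(first + timedelta(days=i)).isoformat() for i in range(n_days + 1)]


def stack_imerg(start, end, clip_gdf=None):
    """Hypothetical wrapper: stack daily IMERG COGs for a date range."""
    # Lazy import so daily_dates can be used without ocha_stratus installed.
    import ocha_stratus as stratus

    return stratus.stack_cogs(
        "imerg",
        daily_dates(start, end),
        stage="prod",
        clip_gdf=clip_gdf,
        mode="pipeline",  # no tqdm progress bar outside interactive use
    )
```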
Datasets
Dataset-specific loading functions.
Administrative boundaries
- ocha_stratus.codab.load_codab_from_blob(iso3, admin_level=0, stage='prod')
Load COD-AB boundaries from FieldMaps cached in Azure Blob Storage. Data downloaded from https://fieldmaps.io/data/cod.
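- Parameters:
iso3 (str) – ISO3 country code
admin_level (int, optional) – Administrative level to load, by default 0
stage (Literal["dev", "prod"], optional) – Environment stage to load from, by default “prod”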
- Returns:
GeoDataFrame containing administrative boundaries for the specified country and level
- Return type:
geopandas.GeoDataFrame
- ocha_stratus.codab.load_codab_from_fieldmaps(iso3, admin_level=0)
Load COD-AB boundaries directly into memory from FieldMaps GeoParquet files. Data is from the global edge-matched subnational boundary layers here: https://fieldmaps.io/data.
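- Parameters:
iso3 (str) – ISO3 country code
admin_level (int, optional) – Administrative level to load, by default 0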
- Returns:
GeoDataFrame containing administrative boundaries for the specified country and level
- Return type:
geopandas.GeoDataFrame
CERF funding allocations
- ocha_stratus.cerf.load_cerf_from_blob(iso3=None, stage='dev')
Load CERF funding data from Azure blob storage.
Retrieves CERF (Central Emergency Response Fund) data stored as a Parquet file in Azure blob storage, with optional filtering by country ISO3 code. Data downloaded from https://data.humdata.org/dataset/cerf-allocations and manually transformed to parquet and uploaded to blob.
- Parameters:
iso3 (str or None, optional) – ISO3 country code to filter results. If None, returns all records. Default is None.
stage (Literal["dev", "prod"], optional) – Environment stage to load from, by default “dev”
- Returns:
DataFrame containing CERF funding data, optionally filtered by country.
- Return type:
pandas.DataFrame
EM-DAT disaster records
- ocha_stratus.emdat.load_emdat_from_blob(iso3=None, include_historic=False, stage='dev')
Load EM-DAT disaster data from Azure blob storage. See here for a description of columns: https://doc.emdat.be/docs/data-structure-and-content/emdat-public-table/#column-description
- Parameters:
iso3 (str or None, optional) – ISO3 country code to filter results. If None, returns all records. Default is None.
include_historic (bool, optional) – Whether to include historic disaster data (pre-2000). Default is False.
stage (Literal["dev", "prod"], optional) – Environment stage to load from, by default “dev”
- Returns:
DataFrame containing EM-DAT disaster data, optionally filtered by country.
- Return type:
pandas.DataFrame