Source code for ocha_stratus.azure_database
import os
from typing import Literal
from urllib.parse import quote

from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import insert
[docs]
def _build_postgres_url(uid: str, pw: str, host: str) -> str:
    """Return a psycopg2 URL for the ``postgres`` database on ``host``.

    The user name and password are percent-encoded (``safe=""``) so that
    characters with special meaning in URLs (``@``, ``:``, ``/``, ``%``,
    ``+``, spaces, ...) cannot corrupt the URL. The host is left as-is so an
    optional ``host:port`` form keeps working.
    """
    return "postgresql+psycopg2://{uid}:{pw}@{host}/postgres".format(
        uid=quote(uid, safe=""),
        pw=quote(pw, safe=""),
        host=host,
    )


def get_engine(stage: Literal["dev", "prod"] = "dev", write: bool = False):
    """
    Create a SQLAlchemy engine for the Azure-hosted PostgreSQL database.

    Connection settings are read from environment variables named
    ``DSCI_AZ_DB_<STAGE>_HOST``, ``DSCI_AZ_DB_<STAGE>_UID[_WRITE]`` and
    ``DSCI_AZ_DB_<STAGE>_PW[_WRITE]``, where ``<STAGE>`` is ``DEV`` or
    ``PROD`` and the ``_WRITE`` suffix is used when ``write`` is true.

    Parameters
    ----------
    stage : Literal["dev", "prod"], optional
        Environment stage to connect to, by default "dev".
    write : bool, optional
        Whether to use the write-access credentials, by default False.

    Returns
    -------
    sqlalchemy.engine.Engine
        Engine connected (lazily) to the ``postgres`` database on the
        stage's host via the ``psycopg2`` driver.

    Raises
    ------
    ValueError
        If ``stage`` is neither "dev" nor "prod", or if any of the required
        environment variables is not set.
    """
    if stage not in ("dev", "prod"):
        raise ValueError(f"Invalid stage: {stage}")

    prefix = f"DSCI_AZ_DB_{stage.upper()}"
    suffix = "_WRITE" if write else ""
    # The host is shared between read and write credentials.
    env_names = {
        "uid": f"{prefix}_UID{suffix}",
        "pw": f"{prefix}_PW{suffix}",
        "host": f"{prefix}_HOST",
    }
    settings = {key: os.getenv(name) for key, name in env_names.items()}

    # Fail fast: an unset variable would otherwise be rendered as the
    # literal string "None" and only surface as a confusing connect error.
    missing = [env_names[key] for key, value in settings.items() if value is None]
    if missing:
        raise ValueError(
            f"Missing environment variable(s) for stage {stage!r}: "
            f"{', '.join(missing)}"
        )

    return create_engine(_build_postgres_url(**settings))
[docs]
def postgres_upsert(table, conn, keys, data_iter, constraint=None):
    """
    Perform an upsert (insert or update) operation on a PostgreSQL table.

    The first four parameters follow the ``method`` callback signature of
    ``pandas.DataFrame.to_sql``. Adapted from:
    https://stackoverflow.com/questions/55187884/insert-into-postgresql-table-from-pandas-with-on-conflict-update # noqa: E501

    Parameters
    ----------
    table : object
        Wrapper exposing the target SQLAlchemy ``Table`` as ``table.table``
        (e.g. the ``SQLTable`` pandas passes to a ``to_sql`` method).
    conn : sqlalchemy.engine.Connection
        Connection used to execute the upsert statement.
    keys : list of str
        Column names, in the same order as the values in each row of
        ``data_iter``.
    data_iter : iterable
        Iterable of tuples or lists, one per row to insert or update.
    constraint : str, optional
        Name of the uniqueness constraint whose violation triggers the
        update. Defaults to ``"<table name>_unique"``.

    Returns
    -------
    None

    Notes
    -----
    On conflict, every inserted column is overwritten with the incoming
    value (via ``EXCLUDED``). An empty ``data_iter`` executes nothing.
    """
    data = [dict(zip(keys, row)) for row in data_iter]
    if not data:
        # Nothing to write. ``insert(...).values([])`` would not be a
        # multi-row insert, so skip building/executing a statement at all.
        return

    if not constraint:
        constraint = f"{table.table.name}_unique"

    insert_statement = insert(table.table).values(data)
    upsert_statement = insert_statement.on_conflict_do_update(
        constraint=constraint,
        set_={c.key: c for c in insert_statement.excluded},
    )
    conn.execute(upsert_statement)