# psycopg3 Direct PostgreSQL Implementation
# SPDX-License-Identifier: Apache-2.0
"""
Direct PostgreSQL database implementation using psycopg3.
This module provides a high-performance PostgreSQL backend using psycopg3
(the modern successor to psycopg2). It offers better async support and
native connection pooling.
Example:
>>> from structum_lab.plugins.database import PostgresDatabase
>>>
>>> db = PostgresDatabase(url="postgresql://user:pass@localhost/mydb")
>>> with db.transaction() as conn:
... conn.execute("SELECT * FROM users")
... users = conn.fetchall()
"""
from __future__ import annotations
import time
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
from structum_lab.database.interfaces import (
ConnectionInterface,
HealthCheckResult,
HealthStatus,
)
from .base import BaseDatabase
if TYPE_CHECKING:
from psycopg_pool import ConnectionPool # type: ignore[import-not-found]
[docs]
class PostgresConnection:
"""psycopg3 connection wrapper implementing ConnectionInterface."""
[docs]
def __init__(self, conn: Any) -> None:
"""Initialize the connection wrapper.
Args:
conn: A psycopg3 connection object.
"""
self._conn = conn
self._cursor: Any = None
[docs]
def execute(
self,
query: str,
params: dict[str, Any] | tuple[Any, ...] | None = None,
) -> Any:
"""Execute SQL query."""
self._cursor = self._conn.cursor()
if params:
if isinstance(params, dict):
# Convert named params to psycopg format
self._cursor.execute(query, params)
else:
self._cursor.execute(query, params)
else:
self._cursor.execute(query)
return self._cursor
[docs]
def fetchone(self) -> dict[str, Any] | None:
"""Fetch one row as dict[str, Any]."""
if self._cursor is None:
return None
row = self._cursor.fetchone()
if row is None:
return None
# Get column names from cursor description
columns = [desc[0] for desc in self._cursor.description]
return dict(zip(columns, row, strict=False))
[docs]
def fetchall(self) -> list[dict[str, Any]]:
"""Fetch all rows as list[Any] of dicts."""
if self._cursor is None:
return []
rows = self._cursor.fetchall()
columns = [desc[0] for desc in self._cursor.description]
return [dict(zip(columns, row, strict=False)) for row in rows]
[docs]
def fetchmany(self, size: int) -> list[dict[str, Any]]:
"""Fetch up to size rows."""
if self._cursor is None:
return []
rows = self._cursor.fetchmany(size)
columns = [desc[0] for desc in self._cursor.description]
return [dict(zip(columns, row, strict=False)) for row in rows]
[docs]
class PostgresDatabase(BaseDatabase):
    """Direct PostgreSQL database implementation using psycopg3.

    High-performance PostgreSQL backend with:
        - Native connection pooling (psycopg_pool)
        - Better async support than psycopg2
        - Modern Python type support

    Example:
        >>> db = PostgresDatabase(url="postgresql://...")
        >>> db.connect()
        >>>
        >>> with db.transaction() as conn:
        ...     conn.execute("INSERT INTO users (name) VALUES (%(name)s)", {"name": "John"})
        >>>
        >>> db.close()

    Note:
        This backend only supports PostgreSQL. For other databases,
        use SQLAlchemyDatabase instead.
    """

    def __init__(
        self,
        url: str,
        *,
        pool_size: int = 5,
        pool_timeout: int = 30,
        echo: bool = False,
        min_size: int = 1,
        **kwargs: Any,
    ) -> None:
        """Initialize PostgreSQL database.

        No connection is opened here; the pool is created by :meth:`connect`.

        Args:
            url: PostgreSQL connection URL (``postgresql://`` or
                ``postgres://`` scheme).
            pool_size: Maximum pool size (used as the pool's ``max_size``).
            pool_timeout: Pool acquisition timeout (used as the pool's
                ``timeout``).
            echo: Forwarded to ``BaseDatabase``; this class does not read it
                itself (use logging to trace SQL).
            min_size: Minimum pool size.
            **kwargs: Extra keyword arguments, forwarded unchanged to
                ``BaseDatabase.__init__``.

        Raises:
            ValueError: If ``url`` does not use a PostgreSQL scheme.
        """
        super().__init__(
            url=url,
            pool_size=pool_size,
            pool_timeout=pool_timeout,
            echo=echo,
            **kwargs,
        )
        self._min_size = min_size
        self._pool: ConnectionPool | None = None

        # Validate URL is PostgreSQL
        parsed = urlparse(url)
        if parsed.scheme not in ("postgresql", "postgres"):
            raise ValueError(
                f"PostgresDatabase only supports PostgreSQL URLs, got: {parsed.scheme}"
            )

    def connect(self) -> None:
        """Create the connection pool; a no-op when already connected."""
        if self._connected:
            return

        # Imported lazily so the module can be imported without psycopg_pool.
        from psycopg_pool import ConnectionPool  # type: ignore[import-not-found]

        # NOTE(review): _url, _pool_size and _pool_timeout are presumably set
        # by BaseDatabase.__init__ from the arguments passed above - confirm.
        self._pool = ConnectionPool(
            conninfo=self._url,
            min_size=self._min_size,
            max_size=self._pool_size,
            timeout=self._pool_timeout,
            # Be explicit: psycopg_pool has announced that the implicit
            # default of ``open`` will change, and warns when it is omitted.
            # Requires psycopg_pool >= 3.1.
            open=True,
        )
        self._connected = True

    def close(self) -> None:
        """Close the connection pool and mark the database as disconnected."""
        if self._pool is not None:
            self._pool.close()
            self._pool = None
        self._connected = False

    def get_connection(self) -> ConnectionInterface:
        """Get a connection from the pool, connecting first if necessary.

        The connection is checked out with ``getconn()`` and stays checked
        out until it is handed back with :meth:`release_connection`.

        Note:
            Prefer using `transaction()` context manager for proper cleanup.
        """
        return PostgresConnection(self._ensure_pool().getconn())

    def release_connection(self, connection: ConnectionInterface) -> None:
        """Return a connection obtained from :meth:`get_connection`.

        Without this call every ``get_connection()`` permanently occupies a
        pool slot.

        Args:
            connection: A wrapper previously returned by ``get_connection()``.

        Raises:
            TypeError: If ``connection`` does not wrap a raw connection.
        """
        raw = getattr(connection, "_conn", None)
        if raw is None:
            raise TypeError(
                "release_connection() expects a connection returned by get_connection()"
            )
        if self._pool is None:
            # The pool reference was dropped by close(); there is nothing to
            # return the connection to, so close it directly.
            raw.close()
            return
        self._pool.putconn(raw)

    @contextmanager
    def transaction(self) -> Iterator[ConnectionInterface]:
        """Execute operations within a transaction.

        Automatically commits on success, rolls back on exception. The
        connection is borrowed from the pool for the duration of the block.

        Yields:
            Connection with active transaction
        """
        with self._ensure_pool().connection() as conn:
            pg_conn = PostgresConnection(conn)
            try:
                yield pg_conn
                conn.commit()
            except Exception:
                conn.rollback()
                raise

    def health_check(self) -> HealthCheckResult:
        """Check database connectivity.

        Runs ``SELECT 1`` inside a transaction. The reported latency covers
        the whole ``transaction()`` block: acquiring the connection, the
        query and the commit. This method never connects on its own and
        never raises; failures are reported as ``UNHEALTHY``.

        Returns:
            HealthCheckResult with status and latency
        """
        if not self._connected:
            return HealthCheckResult(
                status=HealthStatus.UNHEALTHY,
                message="Database not connected",
            )

        try:
            start = time.perf_counter()
            with self.transaction() as conn:
                conn.execute("SELECT 1")
            latency_ms = (time.perf_counter() - start) * 1000
            return HealthCheckResult(
                status=HealthStatus.HEALTHY,
                message="PostgreSQL connection OK",
                latency_ms=round(latency_ms, 2),
            )
        except Exception as e:
            # Deliberately broad: a health probe reports any failure as a
            # status instead of propagating it.
            return HealthCheckResult(
                status=HealthStatus.UNHEALTHY,
                message=f"PostgreSQL error: {e}",
            )

    def _ensure_pool(self) -> ConnectionPool:
        """Return the pool, creating it via connect() on first use."""
        if not self._connected:
            self.connect()
        if self._pool is None:
            # Explicit error instead of ``assert`` so the check survives -O.
            raise RuntimeError("PostgreSQL connection pool is not initialized")
        return self._pool