Skip to content

Database Connections

Oxyde provides multiple ways to manage database connections.

Quick Start

from oxyde import db

# Initialize
await db.init(default="postgresql://localhost/mydb")

# Use models
users = await User.objects.all()

# Close
await db.close()

Connection URLs

Database URL Format
PostgreSQL postgresql://user:pass@host:5432/database
PostgreSQL postgres://user:pass@host:5432/database
SQLite (file) sqlite:///path/to/file.db
SQLite (memory) sqlite:///:memory:
MySQL mysql://user:pass@host:3306/database

High-Level API

db.init()

Initialize one or more database connections:

from oxyde import db, PoolSettings

# Single database
await db.init(default="postgresql://localhost/mydb")

# Multiple databases
await db.init(
    default="postgresql://localhost/main",
    analytics="postgresql://localhost/analytics",
    cache="sqlite:///:memory:",
)

# With custom settings
await db.init(
    default="postgresql://localhost/mydb",
    settings=PoolSettings(max_connections=20),
)

db.close()

Close all connections gracefully:

await db.close()

This rolls back any active transactions before closing.

db.connect()

Context manager for scripts and tests:

async with db.connect("sqlite:///:memory:") as conn:
    users = await User.objects.all()
# Connection closed automatically

With custom name:

async with db.connect("sqlite:///test.db", name="test") as conn:
    users = await User.objects.all(using="test")

db.lifespan()

FastAPI integration:

from fastapi import FastAPI
from oxyde import db

app = FastAPI(
    lifespan=db.lifespan(
        default="postgresql://localhost/mydb",
        settings=PoolSettings(max_connections=50),
    )
)

@app.get("/users")
async def get_users():
    return await User.objects.all()

AsyncDatabase

Low-level connection wrapper:

from oxyde import AsyncDatabase, PoolSettings

database = AsyncDatabase(
    "postgresql://localhost/mydb",
    name="default",
    settings=PoolSettings(max_connections=20),
)

# Manual lifecycle
await database.connect()
# ... use database ...
await database.disconnect()

# Or as context manager
async with database:
    users = await User.objects.all()

Pool Settings

Configure connection pool behavior:

from oxyde import PoolSettings
from datetime import timedelta

settings = PoolSettings(
    # Pool size
    max_connections=20,           # Maximum pool size
    min_connections=5,            # Minimum idle connections

    # Timeouts
    acquire_timeout=30.0,         # Max wait for connection (seconds)
    idle_timeout=600.0,           # Close idle connections after (seconds)
    max_lifetime=1800.0,          # Max connection age (seconds)

    # Health check
    test_before_acquire=True,     # Ping before using connection

    # Transaction cleanup (background task)
    transaction_timeout=300,      # Max transaction age (seconds)
    transaction_cleanup_interval=60,  # Cleanup check interval (seconds)
)

SQLite Settings

SQLite-specific PRAGMA settings (applied automatically):

settings = PoolSettings(
    # WAL mode for better concurrent writes (10-20x faster)
    sqlite_journal_mode="WAL",

    # Balance between speed and safety
    sqlite_synchronous="NORMAL",

    # Cache size in pages (~10MB)
    sqlite_cache_size=10000,

    # Lock timeout in milliseconds
    sqlite_busy_timeout=5000,
)

Default settings are optimized for most use cases.

Multiple Databases

Configuration

await db.init(
    default="postgresql://localhost/main",
    analytics="postgresql://localhost/analytics",
    legacy="mysql://localhost/old_system",
)

Using Specific Database

# Default database
users = await User.objects.all()

# Specific database
events = await Event.objects.all(using="analytics")
old_users = await LegacyUser.objects.all(using="legacy")

Transactions Across Databases

Each database has separate transactions:

from oxyde.db import transaction

# Transaction on default database
async with transaction.atomic():
    await User.objects.create(name="Alice")

# Transaction on analytics database
async with transaction.atomic(using="analytics"):
    await Event.objects.create(type="signup")

Connection Registry

Get Connection by Name

from oxyde import get_connection

conn = await get_connection("default")
print(conn.connected)  # True

Register Custom Connection

from oxyde import AsyncDatabase, register_connection

database = AsyncDatabase("postgresql://localhost/custom", name="custom")
register_connection(database)
await database.connect()

# Now available as "custom"
users = await User.objects.all(using="custom")

Disconnect All

from oxyde import disconnect_all

await disconnect_all()  # Close all registered connections

Error Handling

from oxyde import db
from oxyde.exceptions import ManagerError

try:
    await db.init(default="postgresql://invalid-host/db")
except Exception as e:
    print(f"Connection failed: {e}")

Best Practices

1. Use Context Managers for Scripts

async def main():
    async with db.connect("sqlite:///app.db"):
        # Automatic cleanup on exit
        await run_app()

2. Use Lifespan for Web Apps

app = FastAPI(lifespan=db.lifespan(default="postgresql://..."))

3. Configure Pool Size Based on Workload

# Web API: many short connections
PoolSettings(max_connections=50, min_connections=10)

# Background worker: fewer long connections
PoolSettings(max_connections=10, min_connections=2)

# SQLite: single connection is usually enough
PoolSettings(max_connections=1)

4. Set Appropriate Timeouts

# Production
PoolSettings(
    acquire_timeout=30,      # Don't wait forever
    idle_timeout=300,        # Close idle after 5 min
    max_lifetime=3600,       # Refresh connections hourly
)

Advanced: Multiple Databases

Per-Database Settings

from oxyde import AsyncDatabase, PoolSettings

# Main database: high concurrency
main_db = AsyncDatabase(
    "postgresql://localhost/main",
    name="default",
    settings=PoolSettings(
        max_connections=50,
        min_connections=10,
    ),
)

# Analytics: read-heavy, fewer connections
analytics_db = AsyncDatabase(
    "postgresql://localhost/analytics",
    name="analytics",
    settings=PoolSettings(
        max_connections=10,
        min_connections=2,
    ),
)

await main_db.connect()
await analytics_db.connect()

Read Replicas

await db.init(
    default="postgresql://primary/db",
    replica="postgresql://replica/db",
)

async def get_user(user_id: int):
    # Read from replica
    return await User.objects.get(id=user_id, using="replica")

async def update_user(user_id: int, **data):
    # Write to primary
    await User.objects.filter(id=user_id).update(**data)

Cross-Database Operations

async def sync_user_to_analytics(user_id: int):
    # Read from main
    user = await User.objects.get(id=user_id)

    # Write to analytics
    await UserProfile.objects.create(
        user_id=user.id,
        name=user.name,
        using="analytics"
    )

Dynamic Tenant Connections

from oxyde import AsyncDatabase

class TenantConnectionPool:
    def __init__(self):
        self._connections: dict[int, AsyncDatabase] = {}

    async def get_connection(self, tenant_id: int) -> AsyncDatabase:
        if tenant_id not in self._connections:
            tenant = await Tenant.objects.get(id=tenant_id)
            conn = AsyncDatabase(
                tenant.database_url,
                name=f"tenant_{tenant_id}",
            )
            await conn.connect()
            self._connections[tenant_id] = conn
        return self._connections[tenant_id]

    async def close_all(self):
        for conn in self._connections.values():
            await conn.disconnect()
        self._connections.clear()

tenant_pool = TenantConnectionPool()

FastAPI with Multiple Databases

from fastapi import FastAPI, Depends, Request
from oxyde import db, PoolSettings

app = FastAPI(
    lifespan=db.lifespan(
        default="postgresql://localhost/main",
        analytics="postgresql://localhost/analytics",
        settings=PoolSettings(max_connections=20),
    )
)

@app.get("/users")
async def get_users():
    return await User.objects.all()

@app.get("/events")
async def get_events():
    return await Event.objects.all(using="analytics")

Testing with Multiple Databases

import pytest

@pytest.fixture
async def test_dbs():
    await db.init(
        default="sqlite:///:memory:",
        analytics="sqlite:///:memory:",
    )
    yield
    await db.close()

@pytest.mark.asyncio
async def test_cross_db(test_dbs):
    user = await User.objects.create(name="Test")
    await Event.objects.create(
        type="test",
        user_id=user.id,
        using="analytics"
    )

    events = await Event.objects.filter(user_id=user.id).all(using="analytics")
    assert len(events) == 1

Next Steps