Queries¶
Oxyde uses a Django-style QuerySet API for database queries.
Query Builder Pattern¶
Queries are built by chaining methods. The query executes when you call a terminal method like all() or get().
# Build query (not executed yet)
query = User.objects.filter(status="active").order_by("-created_at").limit(10)
# Execute query
users = await query.all()
The Manager: Model.objects¶
Every model has an objects manager:
class User(OxydeModel):
class Meta:
is_table = True
# ...
# Access via Model.objects
users = await User.objects.all()
Query Methods¶
Retrieving Objects¶
all()¶
Get all records:
With filters:
get()¶
Get exactly one record (raises exception if not found or multiple found):
from oxyde.exceptions import NotFoundError, MultipleObjectsReturned
try:
user = await User.objects.get(id=1)
except NotFoundError:
print("User not found")
except MultipleObjectsReturned:
print("Multiple users found")
get_or_none()¶
Get one record or None:
user = await User.objects.get_or_none(email="alice@example.com")
if user:
print(f"Found: {user.name}")
first() / last()¶
Get first or last record by primary key:
With filters (use limit):
users = await User.objects.filter(status="active").order_by("-created_at").limit(1).all()
newest_active = users[0] if users else None
Filtering¶
filter()¶
Add WHERE conditions (AND):
# Simple equality
users = await User.objects.filter(status="active").all()
# Multiple conditions (AND)
users = await User.objects.filter(status="active", age__gte=18).all()
# Chained filters (also AND)
users = await User.objects.filter(status="active").filter(age__gte=18).all()
exclude()¶
Exclude matching records:
# NOT status = 'banned'
users = await User.objects.exclude(status="banned").all()
# Combine with filter
users = await User.objects.filter(age__gte=18).exclude(role="bot").all()
See Filtering for complete lookup reference.
Ordering¶
order_by()¶
Sort results:
# Ascending
users = await User.objects.order_by("name").all()
# Descending (prefix with -)
users = await User.objects.order_by("-created_at").all()
# Multiple columns
users = await User.objects.order_by("status", "-created_at").all()
Pagination¶
limit() / offset()¶
# First 10 records
users = await User.objects.limit(10).all()
# Skip 20, take 10 (pagination)
users = await User.objects.offset(20).limit(10).all()
Selecting Fields¶
values()¶
Return dictionaries instead of model instances:
# Select specific fields
users = await User.objects.values("id", "email").all()
# [{"id": 1, "email": "alice@example.com"}, ...]
values_list()¶
Return tuples:
# As tuples
users = await User.objects.values_list("id", "email").all()
# [(1, "alice@example.com"), ...]
# Flat list (single field only)
ids = await User.objects.values_list("id", flat=True).all()
# [1, 2, 3, ...]
distinct()¶
Remove duplicates:
Aggregation¶
count()¶
sum() / avg() / max() / min()¶
total = await Order.objects.sum("amount")
average = await User.objects.avg("age")
highest = await Product.objects.max("price")
lowest = await Product.objects.min("price")
See Aggregation for GROUP BY and HAVING.
Existence Check¶
exists()¶
has_admins = await User.objects.filter(role="admin").exists()
if has_admins:
print("At least one admin exists")
Creating Records¶
create()¶
user = await User.objects.create(
name="Alice",
email="alice@example.com",
age=30
)
print(user.id) # Auto-generated
bulk_create()¶
Insert multiple records efficiently:
users = [
User(name="Alice", email="alice@example.com"),
User(name="Bob", email="bob@example.com"),
User(name="Carol", email="carol@example.com"),
]
created = await User.objects.bulk_create(users)
With batching:
get_or_create()¶
Get existing or create new:
user, created = await User.objects.get_or_create(
email="alice@example.com",
defaults={"name": "Alice", "age": 30}
)
if created:
print("Created new user")
else:
print("Found existing user")
Updating Records¶
update()¶
Bulk update matching records:
# Returns count of affected rows
count = await User.objects.filter(status="pending").update(status="active")
With F expressions:
bulk_update()¶
Update multiple model instances:
users = await User.objects.filter(status="pending").all()
for user in users:
user.status = "active"
count = await User.objects.bulk_update(users, ["status"])
increment()¶
Atomic field increment:
await Post.objects.filter(id=1).increment("views", by=1)
await Product.objects.filter(id=1).increment("stock", by=-1) # Decrement
Deleting Records¶
delete()¶
Bulk delete matching records:
Joins¶
join()¶
Eager load related models:
# Load author with each post
posts = await Post.objects.join("author").all()
for post in posts:
print(f"{post.title} by {post.author.name}")
prefetch()¶
Load reverse relations:
# Load posts for each author
authors = await Author.objects.prefetch("posts").all()
for author in authors:
print(f"{author.name} has {len(author.posts)} posts")
See Relations for more.
Locking¶
for_update()¶
Lock rows for update (PostgreSQL/MySQL):
from oxyde.db import transaction
async with transaction.atomic():
user = await User.objects.filter(id=1).for_update().first()
user.balance -= 100
await user.save()
for_share()¶
Lock rows for reading:
async with transaction.atomic():
users = await User.objects.filter(status="active").for_share().all()
Union¶
Combine query results:
admins = User.objects.filter(role="admin")
moderators = User.objects.filter(role="moderator")
# UNION (distinct)
staff = await admins.union(moderators).all()
# UNION ALL (keep duplicates)
all_staff = await admins.union_all(moderators).all()
Debugging¶
sql()¶
Get generated SQL:
query = User.objects.filter(age__gte=18).limit(10)
sql, params = query.sql()
print(f"SQL: {sql}")
print(f"Params: {params}")
With specific dialect:
explain()¶
Get query plan:
plan = await User.objects.filter(age__gte=18).explain()
print(plan)
# With actual execution times
plan = await User.objects.filter(age__gte=18).explain(analyze=True)
Query Immutability¶
Queries are immutable. Each method returns a new query:
base = User.objects.filter(status="active")
admins = base.filter(role="admin") # base is unchanged
users = base.filter(role="user") # base is unchanged
# base, admins, and users are different queries
Specifying Database¶
Use using parameter to specify which database:
# Default database
users = await User.objects.all()
# Specific database
users = await User.objects.all(using="analytics")
Next Steps¶
- Filtering — Complete lookup reference
- Expressions — F expressions for database operations
- Aggregation — GROUP BY and aggregate functions