Source code for fastapi_mongo_admin.services.repository

"""Collection repository for CRUD operations."""

from __future__ import annotations

from typing import Any, Union

from bson import ObjectId
from bson.errors import InvalidId

from fastapi_mongo_admin.admin.model import ModelAdmin
from fastapi_mongo_admin.db.async_backend import AsyncMotorBackend
from fastapi_mongo_admin.db.sync_backend import SyncPyMongoBackend
from fastapi_mongo_admin.exceptions import DocumentNotFoundError
from fastapi_mongo_admin.schemas.forms import parse_form_to_model
from fastapi_mongo_admin.schemas.inference import prepare_for_mongodb, serialize_document
from fastapi_mongo_admin.services.mapping import translate_from_db, translate_to_db
from fastapi_mongo_admin.services.queryset import (
    build_changelist_query,
    build_related_search_query,
    parse_ordering,
    resolve_changelist_ordering,
)

RELATED_LOOKUP_MIN_CHARS = 2
RELATED_LOOKUP_LIMIT = 25

Backend = Union[AsyncMotorBackend, SyncPyMongoBackend]






class CollectionRepository:
    """Repository wrapping sync or async MongoDB backends for one collection.

    Every public method is a coroutine. When the wrapped backend is an
    ``AsyncMotorBackend`` its calls are awaited; otherwise the backend is
    called directly from inside the coroutine.
    """

    def __init__(self, backend: Backend, model_admin: ModelAdmin) -> None:
        """Initialize the repository.

        Args:
            backend: Sync or async collection backend.
            model_admin: ModelAdmin configuration for validation and hooks.
        """
        self.backend = backend
        self.model_admin = model_admin
        self.collection_name = model_admin.collection_name or ""
        # Decided once here; every method branches on this flag to know
        # whether backend calls must be awaited.
        self._is_async = isinstance(backend, AsyncMotorBackend)

    def _id_query(self, doc_id: str) -> dict[str, Any]:
        """Build a MongoDB ``_id`` query from a string id.

        Args:
            doc_id: Document id string.

        Returns:
            Query dict holding an ``ObjectId`` when ``doc_id`` parses as one,
            otherwise the raw value unchanged.
        """
        try:
            return {"_id": ObjectId(doc_id)}
        except (InvalidId, TypeError):
            # Not a valid ObjectId: fall back to matching the raw id value.
            return {"_id": doc_id}

    async def list_documents(
        self,
        *,
        page: int = 1,
        search: str = "",
        filter_params: dict[str, str] | None = None,
        date_hierarchy_params: dict[str, str] | None = None,
        show_all: bool = False,
        request: Any = None,
    ) -> dict[str, Any]:
        """List documents with pagination, search, and filters.

        Args:
            page: Page number (1-based). Values below 1 are treated as 1.
            search: Changelist search string.
            filter_params: Active list-filter query parameters.
            date_hierarchy_params: Optional date hierarchy drill-down params.
            show_all: When ``True``, use ``list_max_show_all`` as page size.
            request: Current request passed to queryset hooks.

        Returns:
            Dict with ``results``, ``total``, ``page``, ``per_page``, and
            ``num_pages`` keys. ``page`` is the page actually served, so it
            is never lower than 1.
        """
        per_page = (
            self.model_admin.list_max_show_all
            if show_all
            else self.model_admin.list_per_page
        )
        # Clamp once and reuse, so the reported page always matches the
        # offset used to fetch the rows.
        current_page = max(page, 1)
        skip = (current_page - 1) * per_page

        base = self.model_admin.get_queryset(request, {})
        query = build_changelist_query(
            self.model_admin,
            search=search,
            filter_params=filter_params,
            date_hierarchy_params=date_hierarchy_params,
            base_query=base,
        )
        ordering = resolve_changelist_ordering(self.model_admin, filter_params)
        sort = parse_ordering(ordering, self.model_admin.field_mapping)

        if self._is_async:
            total = await self.backend.count(query)  # type: ignore[union-attr]
            docs = await self.backend.find(  # type: ignore[union-attr]
                query, skip=skip, limit=per_page, sort=sort
            )
        else:
            total = self.backend.count(query)  # type: ignore[union-attr]
            docs = self.backend.find(  # type: ignore[union-attr]
                query, skip=skip, limit=per_page, sort=sort
            )

        mapping = self.model_admin.field_mapping
        results = [serialize_document(translate_from_db(d, mapping)) for d in docs]
        results = await self._apply_select_related(results)
        return {
            "results": results,
            "total": total,
            "page": current_page,
            "per_page": per_page,
            # Ceiling division; an empty collection still reports one page.
            "num_pages": max(1, (total + per_page - 1) // per_page),
        }

    async def get_document(self, doc_id: str) -> dict[str, Any]:
        """Fetch a single serialized document by id.

        Args:
            doc_id: Document id string.

        Returns:
            Document translated through the admin's field mapping and
            passed through ``serialize_document``.

        Raises:
            DocumentNotFoundError: When no document matches the id.
        """
        query = self._id_query(doc_id)
        if self._is_async:
            doc = await self.backend.find_one(query)  # type: ignore[union-attr]
        else:
            doc = self.backend.find_one(query)  # type: ignore[union-attr]
        if not doc:
            raise DocumentNotFoundError(doc_id, self.collection_name)
        return serialize_document(translate_from_db(doc, self.model_admin.field_mapping))

    async def create_document(self, form_data: dict[str, Any], request: Any = None) -> str:
        """Create a document from form or JSON data.

        Args:
            form_data: Raw field values to validate and persist.
            request: Current request passed to hooks.

        Returns:
            The value returned by the backend's ``insert_one`` (annotated as
            the new document id string).
        """
        validated = parse_form_to_model(
            self.model_admin.model,
            form_data,
            readonly_fields=self.model_admin.get_readonly_fields(request),
        )
        # No existing document on create, so the hook receives an empty dict.
        data = await self.model_admin.save_model(request, {}, validated, is_new=True)
        db_doc = prepare_for_mongodb(translate_to_db(data, self.model_admin.field_mapping))
        if self._is_async:
            return await self.backend.insert_one(db_doc)  # type: ignore[union-attr]
        return self.backend.insert_one(db_doc)  # type: ignore[union-attr]

    async def update_document(
        self, doc_id: str, form_data: dict[str, Any], request: Any = None
    ) -> bool:
        """Update a document from form or JSON data.

        Args:
            doc_id: Document id string.
            form_data: Raw field values to validate and persist.
            request: Current request passed to hooks.

        Returns:
            ``True`` when the backend reports a successful update.

        Raises:
            DocumentNotFoundError: When no document matches the id.
        """
        # Load first: this raises for unknown ids and supplies the current
        # values to validation and to the save hook.
        existing = await self.get_document(doc_id)
        validated = parse_form_to_model(
            self.model_admin.model,
            form_data,
            existing=existing,
            readonly_fields=self.model_admin.get_readonly_fields(request, existing),
        )
        data = await self.model_admin.save_model(request, existing, validated, is_new=False)
        db_doc = prepare_for_mongodb(translate_to_db(data, self.model_admin.field_mapping))
        query = self._id_query(doc_id)
        if self._is_async:
            return await self.backend.update_one(query, db_doc)  # type: ignore[union-attr]
        return self.backend.update_one(query, db_doc)  # type: ignore[union-attr]

    async def delete_document(self, doc_id: str, request: Any = None) -> bool:
        """Delete a document by id.

        Args:
            doc_id: Document id string.
            request: Current request passed to ``delete_model`` hook.

        Returns:
            ``True`` when the backend reports a successful delete.

        Raises:
            DocumentNotFoundError: When no document matches the id.
        """
        # The hook runs before the delete and receives the loaded document.
        doc = await self.get_document(doc_id)
        await self.model_admin.delete_model(request, doc)
        query = self._id_query(doc_id)
        if self._is_async:
            return await self.backend.delete_one(query)  # type: ignore[union-attr]
        return self.backend.delete_one(query)  # type: ignore[union-attr]

    async def delete_many(self, doc_ids: list[str]) -> int:
        """Bulk delete documents by id.

        The ``delete_model`` hook is not invoked by this method.

        Args:
            doc_ids: Document id strings.

        Returns:
            Number of deleted documents.
        """
        # Reuse the single-id coercion so bulk and single deletes always
        # agree on how an id string maps to an ``_id`` value.
        oids = [self._id_query(doc_id)["_id"] for doc_id in doc_ids]
        query: dict[str, Any] = {"_id": {"$in": oids}}
        if self._is_async:
            return await self.backend.delete_many(query)  # type: ignore[union-attr]
        return self.backend.delete_many(query)  # type: ignore[union-attr]

    async def _apply_select_related(self, results: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Batch-fetch related documents for ``list_select_related``.

        Related backends are read from an optional ``_related_backends``
        attribute (collection name -> backend). Fields whose collection has
        no backend there are skipped.

        Args:
            results: Changelist result rows; enriched in place.

        Returns:
            The same list, with ``_{field}_related`` set on every row whose
            reference value is not ``None`` (``None`` when no related
            document matched).
        """
        related = self.model_admin.list_select_related
        if not related or not results:
            return results

        backends = getattr(self, "_related_backends", {})
        for field_name, related_collection in related.items():
            related_backend = backends.get(related_collection)
            if related_backend is None:
                continue

            # Many rows usually share a reference. De-duplicate on the
            # string form, which is also the key used for the lookup below,
            # keeping the first value seen and the original order.
            unique_ids: dict[str, Any] = {}
            for row in results:
                ref_id = row.get(field_name)
                if ref_id:
                    unique_ids.setdefault(str(ref_id), ref_id)
            if not unique_ids:
                continue
            ids = list(unique_ids.values())

            if self._is_async:
                related_docs = await related_backend.find_by_ids(ids)  # type: ignore[union-attr]
            else:
                related_docs = related_backend.find_by_ids(ids)  # type: ignore[union-attr]

            lookup = {str(d.get("_id")): d for d in related_docs}
            for row in results:
                ref_id = row.get(field_name)
                if ref_id is not None:
                    row[f"_{field_name}_related"] = lookup.get(str(ref_id))
        return results