Files
forms/app/repositories/base.py
2026-04-09 19:28:41 +03:00

243 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from sqlalchemy.orm import Session
from sqlalchemy import func, and_, or_, desc, asc
from app.models.base import BaseModel
# Тип для модели SQLAlchemy
ModelType = TypeVar("ModelType", bound=BaseModel)
class BaseRepository(Generic[ModelType]):
"""
Базовый репозиторий с CRUD операциями.
Использует Generic для поддержки разных моделей.
"""
def __init__(self, db: Session, model: Type[ModelType]):
"""
Инициализация репозитория
Args:
db: Сессия SQLAlchemy
model: Класс модели
"""
self.db = db
self.model = model
def get_by_id(self, id: int) -> Optional[ModelType]:
"""Получить запись по ID"""
return self.db.query(self.model).filter(self.model.id == id).first()
def get_by_uuid(self, uuid: str) -> Optional[ModelType]:
"""Получить запись по UUID (если модель имеет поле uuid)"""
if hasattr(self.model, 'uuid'):
return self.db.query(self.model).filter(self.model.uuid == uuid).first()
raise AttributeError(f"Model {self.model.__name__} does not have 'uuid' field")
def get_all(
self,
skip: int = 0,
limit: int = 100,
order_by: Optional[str] = None,
descending: bool = False
) -> List[ModelType]:
"""Получить все записи с пагинацией"""
query = self.db.query(self.model)
if order_by and hasattr(self.model, order_by):
order_column = getattr(self.model, order_by)
if descending:
query = query.order_by(desc(order_column))
else:
query = query.order_by(asc(order_column))
elif hasattr(self.model, 'created_at'):
query = query.order_by(desc(self.model.created_at))
return query.offset(skip).limit(limit).all()
def create(self, obj_in: Dict[str, Any]) -> ModelType:
"""Создать новую запись"""
db_obj = self.model(**obj_in)
self.db.add(db_obj)
self.db.commit()
self.db.refresh(db_obj)
return db_obj
def create_many(self, objects_in: List[Dict[str, Any]]) -> List[ModelType]:
"""Создать несколько записей"""
db_objects = [self.model(**obj_in) for obj_in in objects_in]
self.db.add_all(db_objects)
self.db.commit()
for obj in db_objects:
self.db.refresh(obj)
return db_objects
def update(self, id: int, obj_in: Dict[str, Any]) -> Optional[ModelType]:
"""Обновить запись"""
db_obj = self.get_by_id(id)
if not db_obj:
return None
for field, value in obj_in.items():
if hasattr(db_obj, field):
setattr(db_obj, field, value)
self.db.commit()
self.db.refresh(db_obj)
return db_obj
def update_by_uuid(self, uuid: str, obj_in: Dict[str, Any]) -> Optional[ModelType]:
"""Обновить запись по UUID"""
db_obj = self.get_by_uuid(uuid)
if not db_obj:
return None
for field, value in obj_in.items():
if hasattr(db_obj, field):
setattr(db_obj, field, value)
self.db.commit()
self.db.refresh(db_obj)
return db_obj
def delete(self, id: int) -> bool:
"""Удалить запись"""
db_obj = self.get_by_id(id)
if not db_obj:
return False
self.db.delete(db_obj)
self.db.commit()
return True
def delete_by_uuid(self, uuid: str) -> bool:
"""Удалить запись по UUID"""
db_obj = self.get_by_uuid(uuid)
if not db_obj:
return False
self.db.delete(db_obj)
self.db.commit()
return True
def count(self, **filters) -> int:
"""Подсчитать количество записей с фильтрами"""
query = self.db.query(self.model)
for field, value in filters.items():
if hasattr(self.model, field):
query = query.filter(getattr(self.model, field) == value)
return query.count()
def exists(self, **filters) -> bool:
"""Проверить существование записи"""
return self.count(**filters) > 0
def filter_by(
self,
skip: int = 0,
limit: int = 100,
order_by: Optional[str] = None,
descending: bool = False,
**filters
) -> List[ModelType]:
"""Фильтрация записей по параметрам"""
query = self.db.query(self.model)
for field, value in filters.items():
if hasattr(self.model, field):
query = query.filter(getattr(self.model, field) == value)
if order_by and hasattr(self.model, order_by):
order_column = getattr(self.model, order_by)
if descending:
query = query.order_by(desc(order_column))
else:
query = query.order_by(asc(order_column))
return query.offset(skip).limit(limit).all()
def find_first(self, **filters) -> Optional[ModelType]:
"""Найти первую запись по фильтрам"""
query = self.db.query(self.model)
for field, value in filters.items():
if hasattr(self.model, field):
query = query.filter(getattr(self.model, field) == value)
return query.first()
def bulk_update(self, ids: List[int], obj_in: Dict[str, Any]) -> int:
"""Массовое обновление записей"""
updated_count = self.db.query(self.model).filter(
self.model.id.in_(ids)
).update(obj_in, synchronize_session=False)
self.db.commit()
return updated_count
def bulk_delete(self, ids: List[int]) -> int:
"""Массовое удаление записей"""
deleted_count = self.db.query(self.model).filter(
self.model.id.in_(ids)
).delete(synchronize_session=False)
self.db.commit()
return deleted_count
def get_or_create(self, defaults: Optional[Dict] = None, **filters) -> tuple[ModelType, bool]:
"""
Получить запись или создать новую
Returns:
tuple: (объект, создан_ли_новый)
"""
instance = self.find_first(**filters)
if instance:
return instance, False
if defaults:
filters.update(defaults)
return self.create(filters), True
def paginate(
self,
page: int = 1,
per_page: int = 20,
order_by: Optional[str] = None,
descending: bool = False,
**filters
) -> Dict[str, Any]:
"""
Пагинированный список записей
Returns:
Dict с ключами: items, total, page, per_page, pages
"""
query = self.db.query(self.model)
for field, value in filters.items():
if hasattr(self.model, field):
query = query.filter(getattr(self.model, field) == value)
total = query.count()
if order_by and hasattr(self.model, order_by):
order_column = getattr(self.model, order_by)
if descending:
query = query.order_by(desc(order_column))
else:
query = query.order_by(asc(order_column))
items = query.offset((page - 1) * per_page).limit(per_page).all()
return {
"items": items,
"total": total,
"page": page,
"per_page": per_page,
"pages": (total + per_page - 1) // per_page
}