Source code for aim.digifeeds.database.crud

"""Digifeeds Crud operations
============================

Operations that act on the digifeeds database
"""

from sqlalchemy import select, func
from sqlalchemy.orm import Session, joinedload
from aim.digifeeds.database import schemas
from aim.digifeeds.database import models


[docs] class NotFoundError(Exception): pass
[docs] class AlreadyExistsError(Exception): pass
[docs] def get_item(db: Session, barcode: str): """ Get item from the database Args: db (sqlalchemy.orm.Session): Digifeeds database session barcode (str): Barcode of the item Returns: aim.digifeeds.database.models.Item: Item object """ stmnt = ( select(models.Item) .filter_by(barcode=barcode) .options( # this is here so when we delete the barcode the statuses show up in the output joinedload(models.Item.statuses) ) ) item = db.scalars(stmnt).first() if item is None: raise NotFoundError() else: return item
[docs] def get_items_total(db: Session, filter: schemas.ItemFilters = None): stmnt = get_items_statement(filter=filter) return db.execute(select(func.count()).select_from(stmnt.subquery())).scalar_one()
[docs] def get_items( db: Session, limit: int, offset: int, filter: schemas.ItemFilters = None, ): """ Get Digifeed items from the database Args: db (sqlalchemy.orm.Session): Digifeeds database session filter (schemas.ItemFilters | None): filter to apply to the list of items. Returns: aim.digifeeds.database.models.Item: Item object """ stmnt = get_items_statement(filter=filter).offset(offset).limit(limit) return db.scalars(stmnt).all()
[docs] def get_items_statement(filter: schemas.ItemFilters = None): stmnt = select(models.Item) match filter: case "in_zephir": stmnt = stmnt.where( models.Item.statuses.any(models.ItemStatus.status_name == "in_zephir") ) case "not_in_zephir": stmnt = stmnt.where( ~models.Item.statuses.any(models.ItemStatus.status_name == "in_zephir") ) case "pending_deletion": stmnt = stmnt.where( models.Item.statuses.any( models.ItemStatus.status_name == "pending_deletion" ) ) case "not_pending_deletion": stmnt = stmnt.where( ~models.Item.statuses.any( models.ItemStatus.status_name == "pending_deletion" ) ) case "not_found_in_alma": stmnt = stmnt.where( models.Item.statuses.any( models.ItemStatus.status_name == "not_found_in_alma" ) ) return stmnt
[docs] def add_item(db: Session, item: schemas.ItemCreate): """Add an item to the database. All you need is a barcode. Args: db (sqlalchemy.orm.Session): Digifeeds database session item (schemas.ItemCreate): Item object with a barcode Returns: aim.digifeeds.database.models.Item: Item object """ db_item = models.Item(barcode=item.barcode) stmnt = select(models.Item).filter_by(barcode=item.barcode) already_in_db_item = db.scalars(stmnt).first() if already_in_db_item is not None: raise AlreadyExistsError() db.add(db_item) db.commit() db.refresh(db_item) return db_item
[docs] def get_status(db: Session, name: str): """Gets a given status from the database based on the name Args: db (sqlalchemy.orm.Session): Digifeeds database session name (str): Name of the status Returns: aim.digifeeds.database.models.Status: Status object """ stmnt = select(models.Status).filter_by(name=name) status = db.scalars(stmnt).first() if status is None: raise NotFoundError() else: return status
[docs] def get_statuses(db: Session): """Gets statuses from the database Args: db (sqlalchemy.orm.Session): Digifeeds database session Returns: aim.digifeeds.database.models.Status: Status object """ return db.scalars(select(models.Status)).all()
[docs] def add_item_status(db: Session, item: models.Item, status: models.Status): """Add a status to an item in the database Args: db (sqlalchemy.orm.Session): Digifeeds database session item (models.Item): Item object status (models.Status): Status Returns: aim.digifeeds.database.models.Item: Item object """ db_item_status = models.ItemStatus(item=item, status=status) db.add(db_item_status) db.commit() db.refresh(item) return item
[docs] def delete_item(db: Session, barcode: str): db_item = get_item(db=db, barcode=barcode) # need to load this now so the statuses show up in the return item = schemas.Item(**db_item.__dict__) db.delete(db_item) db.commit() return item