import requests
from rclone_python import rclone
from datetime import datetime, timedelta
from aim.digifeeds.alma_client import AlmaClient
from aim.digifeeds.db_client import DBClient
from aim.services import S
from requests.exceptions import HTTPError
[docs]
class NotAddedToDigifeedsSetError(Exception):
pass
[docs]
class Item:
"""A Digifeeds Item
An item to be processed by the Digifeeds process.
Attributes:
data: The item
"""
def __init__(self, data: dict) -> None:
"""Initializes the instance with data argument.
Args:
data (dict): The item
"""
self.data = data
[docs]
def has_status(self, status: str) -> bool:
"""The status of this Digifeeds Item.
Args:
status (str): A Digifeeds status.
Returns:
bool: True if Digifeeds item has a status, Fales if Digifeeds item does not have a status.
"""
return any(s["name"] == status for s in self.data["statuses"])
[docs]
def add_to_digifeeds_set(self):
if self.has_status("added_to_digifeeds_set"):
return self
try:
AlmaClient().add_barcode_to_digifeeds_set(self.barcode)
except HTTPError as ext_inst:
errorList = ext_inst.response.json()["errorList"]["error"]
if any(e["errorCode"] == "60120" for e in errorList):
if not self.has_status("not_found_in_alma"):
item = Item(
DBClient().add_item_status(
barcode=self.barcode, status="not_found_in_alma"
)
)
return item
elif any(e["errorCode"] == "60115" for e in errorList):
# 60115 means the barcode is already in the set. That means the
# db entry from this barcdoe needs to have
# added_to_digifeeds_set
pass
else:
raise ext_inst
item = Item(
DBClient().add_item_status(
barcode=self.barcode, status="added_to_digifeeds_set"
)
)
return item
[docs]
def check_zephir(self):
if self.has_status("in_zephir"):
return self
response = requests.get(f"{S.zephir_bib_api_url}/mdp.{self.barcode}")
if response.status_code == 200:
db_resp = DBClient().add_item_status(
barcode=self.barcode, status="in_zephir"
)
return Item(db_resp)
else:
return None
[docs]
def move_to_pickup(self):
if not self.in_zephir_for_long_enough:
return None
DBClient().add_item_status(barcode=self.barcode, status="copying_start")
rclone.copyto(
f"{S.digifeeds_s3_rclone_remote}:{S.digifeeds_s3_input_path}/{self.barcode}.zip",
f"{S.digifeeds_pickup_rclone_remote}:{self.barcode}.zip",
)
DBClient().add_item_status(barcode=self.barcode, status="copying_end")
timestamp = datetime.now().strftime("%F_%H-%M-%S")
rclone.moveto(
f"{S.digifeeds_s3_rclone_remote}:{S.digifeeds_s3_input_path}/{self.barcode}.zip",
f"{S.digifeeds_s3_rclone_remote}:{S.digifeeds_s3_processed_path}/{timestamp}_{self.barcode}.zip",
)
db_resp = DBClient().add_item_status(
barcode=self.barcode, status="pending_deletion"
)
return Item(db_resp)
@property
def barcode(self) -> str:
"""The barcode of the Digifeeds item.
Returns:
str: The barcode.
"""
return self.data["barcode"]
@property
def in_zephir_for_long_enough(self) -> bool:
"""
Returns whether or not the item has had metadata in zephir for more than
14 days. The production database saves timestamps in Eastern Time. K8s
runs in UTC. Because this is checking days, this function doesn't set the
timezone because it's not close enough to matter.
Returns:
bool: whether or not the item's metadata has been in zephir for more than 14 days.
"""
waiting_period = 14 # days
in_zephir_status = next(
(
status
for status in self.data["statuses"]
if status["name"] == "in_zephir"
),
None,
)
if in_zephir_status is None:
return False
created_at = datetime.fromisoformat(in_zephir_status["created_at"])
if created_at < (datetime.now() - timedelta(days=waiting_period)):
return True
else:
return False
[docs]
def get_item(barcode: str) -> None:
return Item(DBClient().get_or_add_item(barcode))
[docs]
def process_item(item: Item) -> Item:
barcode = item.barcode
if item.has_status("pending_deletion"):
S.logger.info(
"already_processed",
message="item has already been moved so it does not need processing",
barcode=barcode,
)
return None
S.logger.info(
"add_to_digifeeds_set_start",
message="Start adding item to digifeeds set",
barcode=barcode,
)
add_to_set_item = item.add_to_digifeeds_set()
if add_to_set_item.has_status("not_found_in_alma"):
S.logger.info(
"not_found_in_alma", message="Item not found in alma.", barcode=barcode
)
if add_to_set_item.has_status("added_to_digifeeds_set"):
S.logger.info(
"added_to_digifeeds_set",
message="Item added to digifeeds set",
barcode=barcode,
)
else:
S.logger.error(
"not_added_to_digifeeds_set",
message="Item NOT added to digifeeds set",
barcode=barcode,
)
raise NotAddedToDigifeedsSetError()
check_zephir_item = add_to_set_item.check_zephir()
if check_zephir_item:
S.logger.info("in_zephir", message="Item is in zephir", barcode=barcode)
else:
S.logger.info("not_in_zephir", message="Item is NOT in zephir", barcode=barcode)
return check_zephir_item
move_to_pickup_item = check_zephir_item.move_to_pickup()
if move_to_pickup_item is None:
S.logger.info(
"not_in_zephir_long_enough",
message="Item has not been in zephir long enough",
barcode=barcode,
)
else:
S.logger.info(
"move_to_pickup_success",
message="Item has been successfully moved to pickup",
barcode=barcode,
)