"""ACRO: Output storage and serialization."""
from __future__ import annotations
import datetime
import hashlib
import json
import logging
import os
import shutil
from pathlib import Path
from typing import Any
import pandas as pd
from pandas import DataFrame
from .version import __version__
logger = logging.getLogger("acro:records")


def load_outcome(outcome: dict) -> DataFrame:
    """Return a DataFrame from an outcome dictionary.

    Parameters
    ----------
outcome : dict
The outcome to load as a DataFrame.
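
    Returns
    -------
    DataFrame
        The outcome as a DataFrame.

    Examples
    --------
    A minimal sketch with hypothetical data:

    >>> df = load_outcome({"mean": {"row0": "ok"}})
    >>> list(df.columns)
    ['mean']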
"""
return pd.DataFrame.from_dict(outcome)


def load_output(path: str, output: list[str]) -> list[str] | list[DataFrame]:
    """Return a loaded output.

    Parameters
    ----------
path : str
The path to the output folder (with results.json).
output : list[str]
The output to load.

    Returns
-------
list[str] | list[DataFrame]
The loaded output field.
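
    Examples
    --------
    A minimal sketch: filenames without a ``.csv`` extension are returned
    as-is, so no files need to exist for this (hypothetical) call:

    >>> load_output("outputs", ["notes.txt"])
    ['notes.txt']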
"""
    if len(output) < 1:
        raise ValueError("error loading output: no files were specified")
loaded: list[DataFrame] = []
for filename in output:
_, ext = os.path.splitext(filename)
if ext == ".csv":
filename = os.path.normpath(f"{path}/{filename}")
loaded.append(pd.read_csv(filename))
if len(loaded) < 1: # output is path(s) to custom file(s)
return output
return loaded


class Record:  # pylint: disable=too-many-instance-attributes
    """Stores data related to a single output record.

    Attributes
    ----------
uid : str
Unique identifier.
status : str
SDC status: {"pass", "fail", "review"}
output_type : str
Type of output, e.g., "regression"
properties : dict
Dictionary containing structured output data.
sdc : dict
Dictionary containing SDC results.
command : str
String representation of the operation performed.
summary : str
String summarising the ACRO checks.
outcome : DataFrame
DataFrame describing the details of ACRO checks.
    output : Any
        List of output DataFrames, or file paths for custom outputs.
comments : list[str]
List of strings entered by the user to add comments to the output.
exception : str
Description of why an exception to fail/review should be granted.
timestamp : str
Time the record was created in ISO format.
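
    Examples
    --------
    A minimal sketch with hypothetical values:

    >>> record = Record(
    ...     uid="output_0",
    ...     status="review",
    ...     output_type="custom",
    ...     properties={},
    ...     sdc={},
    ...     command="custom",
    ...     summary="review",
    ...     outcome=DataFrame(),
    ...     output=["results.txt"],
    ... )
    >>> record.status
    'review'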
"""

    def __init__(  # pylint: disable=too-many-arguments
self,
uid: str,
status: str,
output_type: str,
properties: dict,
sdc: dict,
command: str,
summary: str,
outcome: DataFrame,
output: list[str] | list[DataFrame],
comments: list[str] | None = None,
) -> None:
"""Construct a new output record.

        Parameters
----------
uid : str
Unique identifier.
status : str
SDC status: {"pass", "fail", "review"}
output_type : str
Type of output, e.g., "regression"
properties : dict
Dictionary containing structured output data.
sdc : dict
Dictionary containing SDC results.
command : str
String representation of the operation performed.
summary : str
String summarising the ACRO checks.
outcome : DataFrame
DataFrame describing the details of ACRO checks.
output : list[str] | list[DataFrame]
            List of output DataFrames, or file paths for custom outputs.
comments : list[str] | None, default None
List of strings entered by the user to add comments to the output.
"""
self.uid: str = uid
self.status: str = status
self.output_type: str = output_type
self.properties: dict = properties
self.sdc: dict = sdc
self.command: str = command
self.summary: str = summary
self.outcome: DataFrame = outcome
self.output: Any = output
self.comments: list[str] = [] if comments is None else comments
self.exception: str = ""
now = datetime.datetime.now()
self.timestamp: str = now.isoformat()

    def serialize_output(self, path: str = "outputs") -> list[str]:
        """Serialize outputs.

        Parameters
        ----------
path : str, default 'outputs'
            Name of the folder to which outputs are to be written.

        Returns
-------
list[str]
List of filepaths of the written outputs.
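
        Examples
        --------
        A hypothetical call; it writes files to disk, so it is skipped
        under doctest:

        >>> record.serialize_output("outputs")  # doctest: +SKIP
        ['output_0_0.csv']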
"""
output: list[str] = []
# check if the outputs directory was already created
        try:  # pragma: no cover
os.makedirs(path)
logger.debug("Directory %s created successfully", path)
except FileExistsError:
logger.debug("Directory %s already exists", path)
# save each output DataFrame to a different csv
if all(isinstance(obj, DataFrame) for obj in self.output):
for i, data in enumerate(self.output):
filename = f"{self.uid}_{i}.csv"
output.append(filename)
filename = os.path.normpath(f"{path}/{filename}")
with open(filename, mode="w", newline="", encoding="utf-8") as file:
file.write(data.to_csv())
# move custom files to the output folder
if self.output_type == "custom":
for filename in self.output:
if os.path.exists(filename):
shutil.copy(filename, path)
output.append(Path(filename).name)
if self.output_type in ["survival plot", "histogram"]:
for filename in self.output:
if os.path.exists(filename):
output.append(Path(filename).name)
shutil.copy(filename, path)
return output

    def __str__(self) -> str:
"""Return a string representation of a record.

        Returns
-------
str
The record.
"""
return (
f"uid: {self.uid}\n"
f"status: {self.status}\n"
f"type: {self.output_type}\n"
f"properties: {self.properties}\n"
f"sdc: {self.sdc}\n"
f"command: {self.command}\n"
f"summary: {self.summary}\n"
f"outcome: {self.outcome}\n"
f"output: {self.output}\n"
f"timestamp: {self.timestamp}\n"
f"comments: {self.comments}\n"
f"exception: {self.exception}\n"
)


class Records:
    """Stores data related to a collection of output records."""

    def __init__(self) -> None:
"""Construct a new object for storing multiple records."""
self.results: dict[str, Record] = {}
self.output_id: int = 0

    def add(  # pylint: disable=too-many-arguments
self,
status: str,
output_type: str,
properties: dict,
sdc: dict,
command: str,
summary: str,
outcome: DataFrame,
output: list[str] | list[DataFrame],
comments: list[str] | None = None,
) -> None:
"""Add an output to the results.

        Parameters
----------
status : str
SDC status: {"pass", "fail", "review"}
output_type : str
Type of output, e.g., "regression"
properties : dict
Dictionary containing structured output data.
sdc : dict
Dictionary containing SDC results.
command : str
String representation of the operation performed.
summary : str
String summarising the ACRO checks.
outcome : DataFrame
DataFrame describing the details of ACRO checks.
        output : list[str] | list[DataFrame]
            List of output DataFrames, or file paths for custom outputs.
comments : list[str] | None, default None
List of strings entered by the user to add comments to the output.
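
        Examples
        --------
        A minimal sketch with hypothetical values:

        >>> records = Records()
        >>> records.add(
        ...     status="review",
        ...     output_type="table",
        ...     properties={},
        ...     sdc={},
        ...     command="crosstab()",
        ...     summary="review",
        ...     outcome=DataFrame(),
        ...     output=[DataFrame()],
        ... )
        >>> records.get_keys()
        ['output_0']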
"""
new = Record(
uid=f"output_{self.output_id}",
status=status,
output_type=output_type,
properties=properties,
sdc=sdc,
command=command,
summary=summary,
outcome=outcome,
output=output,
comments=comments,
)
self.results[new.uid] = new
self.output_id += 1
logger.info("add(): %s", new.uid)

    def remove(self, key: str) -> None:
        """Remove an output from the results.

        Parameters
        ----------
key : str
Key specifying which output to remove, e.g., 'output_0'.
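
        Examples
        --------
        A hypothetical call, assuming ``records`` contains ``output_0``:

        >>> records.remove("output_0")  # doctest: +SKIP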
"""
if key not in self.results:
raise ValueError(f"unable to remove {key}, key not found")
del self.results[key]
logger.info("remove(): %s removed", key)

    def get(self, key: str) -> Record:
        """Return a specified output from the results.

        Parameters
        ----------
        key : str
            Key specifying which output to return, e.g., 'output_0'.

        Returns
        -------
Record
The requested output.
"""
logger.debug("get(): %s ", key)
return self.results[key]

    def get_keys(self) -> list[str]:
        """Return the list of available output keys.

        Returns
        -------
list[str]
List of output names.
"""
logger.debug("get_keys()")
return list(self.results.keys())

    def get_index(self, index: int) -> Record:
        """Return the output at the specified position.

        Parameters
        ----------
        index : int
            Position of the output to return.

        Returns
        -------
Record
The requested output.
"""
logger.debug("get_index(): %s", index)
key = list(self.results.keys())[index]
return self.results[key]

    def add_custom(self, filename: str, comment: str | None = None) -> None:
        """Add an unsupported output to the results dictionary.

        Parameters
        ----------
filename : str
The name of the file that will be added to the list of the outputs.
comment : str | None, default None
An optional comment.
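
        Examples
        --------
        A hypothetical call; the file must exist for the output to be added:

        >>> records.add_custom("plot.png")  # doctest: +SKIP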
"""
if os.path.exists(filename):
output = Record(
uid=f"output_{self.output_id}",
status="review",
output_type="custom",
properties={},
sdc={},
command="custom",
summary="review",
outcome=DataFrame(),
output=[os.path.normpath(filename)],
comments=None if comment is None else [comment],
)
self.results[output.uid] = output
self.output_id += 1
logger.info("add_custom(): %s", output.uid)
        else:
            logger.warning(
                "Unable to add %s because the file does not exist", filename
            )  # pragma: no cover

    def rename(self, old: str, new: str) -> None:
        """Rename an output.

        Parameters
        ----------
old : str
The old name of the output.
new : str
The new name of the output.
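
        Examples
        --------
        A hypothetical call, assuming ``records`` contains ``output_0``:

        >>> records.rename("output_0", "my_table")  # doctest: +SKIP
        >>> records.get_keys()  # doctest: +SKIP
        ['my_table']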
"""
if old not in self.results:
raise ValueError(f"unable to rename {old}, key not found")
if new in self.results:
raise ValueError(f"unable to rename, {new} already exists")
self.results[new] = self.results[old]
self.results[new].uid = new
del self.results[old]
logger.info("rename_output(): %s renamed to %s", old, new)

    def add_exception(self, output: str, reason: str) -> None:
        """Add an exception request to an output.

        Parameters
        ----------
output : str
The name of the output.
reason : str
The reason the output should be released.
"""
if output not in self.results:
raise ValueError(f"unable to add exception: {output} not found")
self.results[output].exception = reason
logger.info("exception request was added to %s", output)

    def print(self) -> str:
        """Print the current results.

        Returns
        -------
str
String representation of all outputs.
"""
logger.debug("print()")
outputs: str = ""
        for record in self.results.values():
outputs += str(record) + "\n"
print(outputs)
return outputs

    def validate_outputs(self) -> None:
        """Prompt researcher to complete any required fields."""
        for record in self.results.values():
if record.status != "pass" and record.exception == "":
logger.info(
"\n%s\n"
"The status of the record above is: %s.\n"
"Please explain why an exception should be granted.\n",
str(record),
record.status,
)
record.exception = input("")

    def finalise(self, path: str, ext: str) -> None:
        """Create a results file for checking.

        Parameters
        ----------
path : str
Name of a folder to save outputs.
ext : str
Extension of the results file. Valid extensions: {json, xlsx}.
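
        Examples
        --------
        A hypothetical call; it prompts for exceptions and writes files,
        so it is skipped under doctest:

        >>> records.finalise("outputs", "json")  # doctest: +SKIP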
"""
logger.debug("finalise()")
self.validate_outputs()
if ext == "json":
self.finalise_json(path)
elif ext == "xlsx":
self.finalise_excel(path)
else:
raise ValueError("Invalid file extension. Options: {json, xlsx}")
self.write_checksums(path)
# check if the directory acro_artifacts exists and delete it
if os.path.exists("acro_artifacts"):
shutil.rmtree("acro_artifacts")
logger.info("outputs written to: %s", path)

    def finalise_json(self, path: str) -> None:
        """Write outputs to a JSON file.

        Parameters
        ----------
path : str
Name of a folder to save outputs.
"""
outputs: dict = {}
for key, val in self.results.items():
outputs[key] = {
"uid": val.uid,
"status": val.status,
"type": val.output_type,
"properties": val.properties,
"files": [],
"outcome": json.loads(val.outcome.to_json()),
"command": val.command,
"summary": val.summary,
"timestamp": val.timestamp,
"comments": val.comments,
"exception": val.exception,
}
files: list[str] = val.serialize_output(path)
for file in files:
outputs[key]["files"].append({"name": file, "sdc": val.sdc})
results: dict = {"version": __version__, "results": outputs}
filename: str = os.path.normpath(f"{path}/results.json")
try:
with open(filename, "w", newline="", encoding="utf-8") as handle:
json.dump(results, handle, indent=4, sort_keys=False)
except FileNotFoundError: # pragma: no cover
logger.info(
"You don't have any output in the acro object. "
"Directory %s will not be created.",
path,
)

    def finalise_excel(self, path: str) -> None:
        """Write outputs to an Excel spreadsheet.

        Parameters
        ----------
path : str
Name of a folder to save outputs.
"""
filename: str = os.path.normpath(f"{path}/results.xlsx")
        # create the output directory if it does not already exist;
        # with exist_ok=True, makedirs never raises FileExistsError
        os.makedirs(path, exist_ok=True)
        logger.debug("Directory %s created or already exists", path)
        with pd.ExcelWriter(  # pylint: disable=abstract-class-instantiated
filename, engine="openpyxl"
) as writer:
# description sheet
sheet = []
summary = []
command = []
for output_id, output in self.results.items():
if output.output_type == "custom":
continue # avoid writing custom outputs
sheet.append(output_id)
command.append(output.command)
summary.append(output.summary)
tmp_df = pd.DataFrame(
{"Sheet": sheet, "Command": command, "Summary": summary}
)
tmp_df.to_excel(writer, sheet_name="description", index=False, startrow=0)
# individual sheets
for output_id, output in self.results.items():
if output.output_type == "custom":
continue # avoid writing custom outputs
# command and summary
start = 0
tmp_df = pd.DataFrame(
[output.command, output.summary], index=["Command", "Summary"]
)
tmp_df.to_excel(writer, sheet_name=output_id, startrow=start)
# outcome
if output.outcome is not None:
output.outcome.to_excel(writer, sheet_name=output_id, startrow=4)
# output
for table in output.output:
start = 1 + writer.sheets[output_id].max_row
table.to_excel(writer, sheet_name=output_id, startrow=start)

    def write_checksums(self, path: str) -> None:
        """Write a checksum for each output file to the checksums folder.

        Parameters
        ----------
path : str
Name of a folder to save outputs.
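
        Examples
        --------
        Each checksum is the SHA-256 hex digest of the file contents, e.g.:

        >>> hashlib.sha256(b"hello").hexdigest()[:16]
        '2cf24dba5fb0a30e'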
"""
if os.path.exists(path):
checksums: dict[str, str] = {}
for name in os.listdir(path):
filename = os.path.join(path, name)
if os.path.isfile(filename):
with open(filename, "rb") as file:
read = file.read()
checksums[name] = hashlib.sha256(read).hexdigest()
checksums_dir: str = os.path.normpath(f"{path}/checksums")
os.makedirs(checksums_dir, exist_ok=True)
for name, sha256 in checksums.items():
filename = os.path.join(checksums_dir, name + ".txt")
with open(filename, "w", encoding="utf-8") as file:
file.write(sha256)
else:
logger.debug("There is no file to do the checksums") # pragma: no cover


def load_records(path: str) -> Records:
    """Load outputs from a JSON file.

    Parameters
    ----------
path : str
Name of an output folder containing results.json.

    Returns
-------
Records
The loaded records.
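
    Examples
    --------
    A hypothetical round trip, assuming ``outputs/results.json`` was
    written by ``Records.finalise``:

    >>> records = load_records("outputs")  # doctest: +SKIP
    >>> records.get_keys()  # doctest: +SKIP
    ['output_0']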
"""
records = Records()
filename = os.path.normpath(f"{path}/results.json")
with open(filename, newline="", encoding="utf-8") as handle:
data = json.load(handle)
if data["version"] != __version__: # pragma: no cover
raise ValueError("error loading output")
for key, val in data["results"].items():
files: list[dict] = val["files"]
filenames: list = []
sdcs: list = []
for file in files:
filenames.append(file["name"])
sdcs.append(file["sdc"])
records.results[key] = Record(
uid=val["uid"],
status=val["status"],
output_type=val["type"],
properties=val["properties"],
sdc=sdcs[0],
command=val["command"],
summary=val["summary"],
outcome=load_outcome(val["outcome"]),
output=load_output(path, filenames),
comments=val["comments"],
)
records.results[key].exception = val["exception"]
records.results[key].timestamp = val["timestamp"]
return records