Source code for sacroml.preprocessing.loaders

"""Handlers to pull in datasets and perform preprocessing."""

# pylint: disable=consider-using-with, too-many-return-statements

import logging
import os
from zipfile import BadZipFile, ZipFile

import numpy as np
import pandas as pd
import pylab as plt
from sklearn.datasets import fetch_openml, load_iris
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

logging.basicConfig(level="DEBUG")

logger = logging.getLogger(__file__)


PROJECT_ROOT_FOLDER = os.path.dirname(os.path.dirname(__file__))
logger.info("ROOT PROJECT FOLDER = %s", PROJECT_ROOT_FOLDER)


[docs] class UnknownDataset(Exception): """Exception raised if the user passes a name that we don't recognise."""
[docs] class DataNotAvailable(Exception): """Exception raised if the user asks for a dataset that they do not have."""
[docs] def get_data_sklearn( # pylint: disable = too-many-branches dataset_name: str, data_folder: str = os.path.join(PROJECT_ROOT_FOLDER, "data") ) -> tuple[pd.DataFrame, pd.DataFrame]: """Get data in a format sensible for sklearn. User passes a name and that dataset is returned as a tuple of pandas DataFrames (data, labels). Parameters ---------- dataset_name : str The name of the dataset to load data_folder : str The name of the local folder in which data is stored. Returns ------- X : pd.DataFrame The input dataframe -- rows are examples, columns variables y : pd.DataFrame the target dataframe -- has a single column containing the target values Notes ----- The following datasets are available: mimic2-iaccd (requires data download) in-hospital-mortality (requires data download) medical-mnist-ab-v-br-100 (requires data download) medical-mnist-ab-v-br-500 (requires data download) medical-mnist-all-100 (requires data download) indian liver (requires data download) synth-ae (requires data download) synth-ae-small (requires data download) nursery (downloads automatically) iris (available out of the box via sklearn) Datasets can be normalised by adding the following prefixes: standard: standardises all columns to have zero mean and unit variance. minmax: standardises all columns to have values between 0 and 1. round: rounds continues features to have 3dp These can be nested. Examples -------- .. code-block:: python # pull the mimic2-iaccd data X, y = get_data_sklearn("mimic2-iaccd") # pull the iris data and round continuous features X, y = get_data_sklearn("minmax iris") """ logger.info("DATASET FOLDER = %s", data_folder) if dataset_name.startswith("standard"): sub_name = dataset_name.split("standard")[1].strip() feature_df, target_df = get_data_sklearn(sub_name, data_folder) for column in feature_df.columns: col_mean = feature_df[column].mean() col_std = np.sqrt(feature_df[column].var()) feature_df[column] = feature_df[column] - col_mean feature_df[column] = feature_df[column] / col_std return feature_df, target_df if dataset_name.startswith("minmax"): sub_name = dataset_name.split("minmax")[1].strip() feature_df, target_df = get_data_sklearn(sub_name, data_folder) for column in feature_df.columns: col_min = feature_df[column].min() col_range = feature_df[column].max() - col_min feature_df[column] = feature_df[column] - col_min feature_df[column] = feature_df[column] / col_range return feature_df, target_df if dataset_name.startswith("round"): sub_name = dataset_name.split("round")[1].strip() logger.debug(sub_name) feature_df, target_df = get_data_sklearn(sub_name, data_folder) column_dtype = feature_df.dtypes for i, column in enumerate(feature_df.columns): if column_dtype[i] == "float64": feature_df[column] = feature_df[column].round(decimals=3) return feature_df, target_df if dataset_name == "mimic2-iaccd": return _mimic_iaccd(data_folder) if dataset_name == "in-hospital-mortality": return _in_hospital_mortality(data_folder) if dataset_name == "medical-mnist-ab-v-br-100": return _medical_mnist_loader(data_folder, 100, ["AbdomenCT", "BreastMRI"]) if dataset_name == "medical-mnist-ab-v-br-500": return _medical_mnist_loader(data_folder, 500, ["AbdomenCT", "BreastMRI"]) if dataset_name == "medical-mnist-all-100": return _medical_mnist_loader( data_folder, 100, ["AbdomenCT", "BreastMRI", "CXR", "ChestCT", "Hand", "HeadCT"], ) if dataset_name == "indian liver": return _indian_liver(data_folder) if dataset_name == "synth-ae": return _synth_ae(data_folder) if dataset_name == "synth-ae-small": return _synth_ae(data_folder, 200) if dataset_name == "synth-ae-large": return _synth_ae(data_folder, 500000) if dataset_name == "synth-ae-extra-large": return _synth_ae(data_folder, 2000000) if dataset_name == "synth-ae-XXL": return _synth_ae(data_folder, 50000000) if dataset_name == "nursery": return _nursery() if dataset_name == "iris": return _iris() if dataset_name == "RDMP": return _rdmp(data_folder) raise UnknownDataset(dataset_name)
def _iris() -> tuple[pd.DataFrame, pd.DataFrame]: """Get the Sklearn iris data - just first two classes.""" X, y = load_iris(return_X_y=True, as_frame=True) X = X[y < 2] y = y[y < 2] return X, pd.DataFrame(y) def _nursery() -> tuple[pd.DataFrame, pd.DataFrame]: """Return the sklearn nursery dataset.""" data = fetch_openml(data_id=26, as_frame=True) target_encoder = LabelEncoder() target_vals = target_encoder.fit_transform(data["target"].values) target_dataframe = pd.DataFrame({"target": target_vals}) feature_encoder = OneHotEncoder() x_encoded = feature_encoder.fit_transform(data["data"]).toarray() feature_dataframe = pd.DataFrame( x_encoded, columns=feature_encoder.get_feature_names_out() ) return feature_dataframe, target_dataframe def _images_to_ndarray( images_dir: str, number_to_load: int, label: int, flatten: bool = True ) -> tuple[np.array, np.array]: """Get number_to_load images from the images_dir and create arrays. Patched to support non-flattened images. Same behaviour as before except if called with flatten=False explicitly. """ folder_path = images_dir + os.sep images_names = sorted(os.listdir(folder_path)) images_names = images_names[:number_to_load] # fix f or macosx if ".DS_Store" in images_names: images_names.remove(".DS_Store") if flatten: np_images = np.array( [plt.imread(folder_path + img).flatten() for img in images_names] ) else: np_images = np.array([plt.imread(folder_path + img) for img in images_names]) labels = np.ones((len(np_images), 1), int) * label return (np_images, labels) def _medical_mnist_loader( # pylint: disable = too-many-locals data_folder: str, n_per_class: int, classes: list[str] ) -> tuple[pd.DataFrame, pd.DataFrame]: """Get Medical MNIST into pandas format. Borrows heavily from: https://www.kaggle.com/harelshattenstein/medical-mnist-knn Creates a binary classification. """ base_folder = os.path.join( data_folder, "kaggle-medical-mnist", "archive", ) zip_file = os.path.join(data_folder, "kaggle-medical-mnist", "archive.zip") logger.debug(base_folder, data_folder) if not any([os.path.exists(base_folder), os.path.exists(zip_file)]): help_message = f""" Data file {base_folder} does not exist. Please download fhe file from: https://www.kaggle.com/andrewmvd/medical-mnist and place it in the correct folder. It unzips the file first. """ raise DataNotAvailable(help_message) if os.path.exists(base_folder): pass elif os.path.exists(zip_file): try: with ZipFile(zip_file) as zip_handle: zip_handle.extractall(base_folder) logger.debug("Extracted all") except BadZipFile: logger.error("Encountered bad zip file") raise labels_dict = { 0: "AbdomenCT", 1: "BreastMRI", 2: "CXR", 3: "ChestCT", 4: "Hand", 5: "HeadCT", } reverse_labels_dict = {v: k for k, v in labels_dict.items()} for i, class_name in enumerate(classes): label = reverse_labels_dict[class_name] x_images, y_images = _images_to_ndarray( os.path.join(base_folder, class_name), n_per_class, label ) if i == 0: all_x = x_images all_y = y_images else: all_x = np.vstack((all_x, x_images)) all_y = np.vstack((all_y, y_images)) return (pd.DataFrame(all_x), pd.DataFrame(all_y)) def _synth_ae( data_folder: str, n_rows: int = 5000 ) -> tuple[pd.DataFrame, pd.DataFrame]: """Get synth ae data. First norws (default 5000) rows from the Synthetic A&E data from NHS England https://data.england.nhs.uk/dataset/a-e-synthetic-data/resource/81b068e5-6501-4840-a880-a8e7aa56890e # pylint: disable=line-too-long. """ file_path = os.path.join(data_folder, "AE_England_synthetic.csv") if not os.path.exists(file_path): help_message = f""" Data file {file_path} does not exist. Please download the file from: https://data.england.nhs.uk/dataset/a-e-synthetic-data/resource/81b068e5-6501-4840-a880-a8e7aa56890e Alternatively, download the file directly from the following URL: https://nhsengland-direct-uploads.s3-eu-west-1.amazonaws.com/A%26E+Synthetic+Data.7z Unzip it (7z) and then copy the .csv file into your data folder. """ raise DataNotAvailable(help_message) input_data = pd.read_csv(file_path, nrows=n_rows) columns_to_drop = [ "AE_Arrive_Date", "AE_Arrive_HourOfDay", "Admission_Method", "ICD10_Chapter_Code", "Treatment_Function_Code", "Length_Of_Stay_Days", "ProvID", ] input_data.drop(columns_to_drop, axis=1, inplace=True) # Remove any rows with NAs in the remaining columns input_data.dropna(axis=0, inplace=True) # One-hot encode some columns encode_columns = ["Age_Band", "AE_HRG"] encode_data = input_data[encode_columns] input_data.drop(encode_columns, axis=1, inplace=True) oh = OneHotEncoder() oh.fit(encode_data) onehot_df = pd.DataFrame( oh.transform(encode_data).toarray(), columns=oh.get_feature_names_out(), index=input_data.index, ) input_data = pd.concat([input_data, onehot_df], axis=1) X = input_data.drop(["Admitted_Flag"], axis=1) y = input_data[["Admitted_Flag"]] return (X, y) def _indian_liver(data_folder: str) -> tuple[pd.DataFrame, pd.DataFrame]: """Get Indian Liver Patient Dataset. https://archive.ics.uci.edu/ml/machine-learning-databases/00225/Indian%20Liver%20Patient%20Dataset%20(ILPD).csv # pylint: disable=line-too-long. """ file_path = os.path.join(data_folder, "Indian Liver Patient Dataset (ILPD).csv") if not os.path.exists(file_path): help_message = f""" Data file {file_path} does not exist. Please download fhe file from: https://archive.ics.uci.edu/ml/datasets/ILPD+(Indian+Liver+Patient+Dataset and place it in the correct folder. """ raise DataNotAvailable(help_message) column_names = [ "age", "gender", "total Bilirubin", "direct Bilirubin", "Alkphos", "SGPT", "SGOT", "total proteins", "albumin", "A/G ratio", "class", ] liver_data = pd.read_csv(file_path, names=column_names, index_col=False) liver_data.gender.replace("Male", 0, inplace=True) liver_data.gender.replace("Female", 1, inplace=True) liver_data.dropna(axis=0, inplace=True) liver_labels = liver_data["class"] liver_data.drop(["class"], axis=1, inplace=True) label_encoder = LabelEncoder() encoded_labels = label_encoder.fit_transform(liver_labels.values) liver_labels = pd.DataFrame({"class": encoded_labels}) return (liver_data, liver_labels) def _in_hospital_mortality(data_folder: str) -> tuple[pd.DataFrame, pd.DataFrame]: """Get In-hospital mortality data. See: https://datadryad.org/stash/dataset/doi:10.5061/dryad.0p2ngf1zd. """ # Check the data has been downloaded. # If not, throw an exception with instructions on how to # download, and where to store files = ["data01.csv", "doi_10.5061_dryad.0p2ngf1zd__v5.zip"] file_path = [os.path.join(data_folder, f) for f in files] print(file_path) if not any(os.path.exists(fp) for fp in file_path): help_message = f""" Data file {file_path[0]} or {file_path[1]} does not exist. Please download the file from: https://datadryad.org/stash/dataset/doi:10.5061/dryad.0p2ngf1zd and place it in the correct folder. It works with either the zip file or uncompressed. Alternatively download the data file from this URL: https://datadryad.org/stash/downloads/file_stream/773992 and then change the name of the file 773992 to data01.csv. """ raise DataNotAvailable(help_message) if os.path.exists(file_path[1]): input_data = pd.read_csv(ZipFile(file_path[1]).open("data01.csv")) else: input_data = pd.read_csv(file_path[0]) clean_data = input_data.dropna(axis=0, how="any").drop(columns=["group", "ID"]) target = "outcome" labels = clean_data[target] features = clean_data.drop([target], axis=1) label_encoder = LabelEncoder() encoded_labels = label_encoder.fit_transform(labels.values) labels = pd.DataFrame({"outcome": encoded_labels}) return (features, labels) def _mimic_iaccd(data_folder: str) -> tuple[pd.DataFrame, pd.DataFrame]: """Get the mimic_iaccd data and perform preprocessing.""" # Check the data has been downloaded. # If not throw an exception with instructions on how to # download, and where to store file_path = os.path.join(data_folder, "mimic2-iaccd", "1.0", "full_cohort_data.csv") print(file_path, os.path.exists(file_path)) if not os.path.exists(file_path): help_message = f""" The MIMIC2-iaccd data is not available in {data_folder}. The following file should exist: {file_path}. Please download from https://physionet.org/files/mimic2-iaccd/1.0/full_cohort_data.csv?download and rename the file to full_cohort_data.csv. """ raise DataNotAvailable(help_message) # File exists, load and preprocess# logger.info("Loading mimic2-iaccd") input_data = pd.read_csv(file_path) logger.info("Preprocessing") # remove columns non-numerical and repetitive or uninformative data for the analysis col = [ "service_unit", "day_icu_intime", "hosp_exp_flg", "icu_exp_flg", "day_28_flg", ] # service_num is the numerical version of service_unit # day_icu_intime_num is the numerical version of day_icu_intime # the other columns are to do with death and are somewhat repetitive with censor_flg input_data.drop(col, axis=1, inplace=True) # drop columns with only 1 value input_data.drop("sepsis_flg", axis=1, inplace=True) # drop NA by row input_data.dropna(axis=0, inplace=True) # extract target target = "censor_flg" y = input_data[target] X = input_data.drop([target, "mort_day_censored"], axis=1) label_encoder = LabelEncoder() encoded_labels = label_encoder.fit_transform(y.values) y = pd.DataFrame({"censor_flag": encoded_labels}) return (X, y) def _rdmp( # pylint: disable=too-many-locals, too-many-statements data_folder: str, ) -> tuple[pd.DataFrame, pd.DataFrame]: """Get the RDMP dataset.""" def find_age(row: pd.Series) -> int: date_ = pd.to_datetime("01/06/2020") if row.date_of_death != row.date_of_death: age = np.floor((date_ - row.date_of_birth).days / 365.25) else: age = np.floor((row.date_of_death - row.date_of_birth).days / 365.25) return age def hospital_days(row: pd.Series) -> int: if row.DischargeDate == row.DischargeDate: t = row.DischargeDate - row.AdmissionDate days = t.days + round(((t.seconds / 60) / 60) / 24) else: days = 0 return days processed_data_file = "rdmp_binary.csv" os.path.join(data_folder, "RDMP") if os.path.exists(os.path.join(data_folder, "RDMP", processed_data_file)): logger.info("Loading processed RDMP file.") # load data processed csv file df = pd.read_csv(os.path.join(data_folder, "RDMP", processed_data_file)) else: logger.info("Processing RDMP synthetic data") file_list = [ "CarotidArteryScan.csv", "Demography.csv", "HospitalAdmissions.csv", ] files_path = [os.path.join(data_folder, "RDMP", f) for f in file_list] found = [os.path.exists(file_path) for file_path in files_path] not_found = [ file_path for file_path in files_path if not os.path.exists(file_path) ] if not all(found): help_message = f""" Some or all data files do not exist. Please download the files from RDMP, and place it in the correct folder. Missing files are: {not_found} """ raise DataNotAvailable(help_message) # Load data headers0 = [ "R_CC_STEN_A", "R_CC_STEN_B", "R_CC_STEN_C", "R_CC_STEN_D", "R_CC_STEN_S", "L_IC_STEN_A", "L_IC_STEN_B", "L_IC_STEN_C", "L_IC_STEN_D", "L_IC_STEN_S", "R_IC_STEN_A", "R_IC_STEN_B", "R_IC_STEN_C", "R_IC_STEN_D", "R_IC_STEN_S", "PatientID", "L_CC_STEN_S", "L_CC_STEN_D", "L_CC_STEN_B", "L_BD_RATIO", "L_AC_RATIO", "R_BD_RATIO", "R_AC_RATIO", "L_CC_STENOSIS", "L_CC_PEAK_SYS", "L_CC_END_DIA", "L_IC_STENOSIS", "L_IC_PEAK_SYS", "L_IC_END_DIA", "L_EC_STENOSIS", "L_PLAQUE", "L_SYMPTOMS", "L_BRUIT", "L_CC_STEN_A", "ON_STEN_STUDY", "R_VERT_ARTERY", "R_BRUIT", "R_SYMPTOMS", "R_PLAQUE", "L_CC_STEN_C", "R_EC_STENOSIS", "R_IC_PEAK_SYS", "R_IC_STENOSIS", "R_CC_END_DIA", "R_CC_PEAK_SYS", "R_CC_STENOSIS", "L_VERT_ARTERY", "R_IC_END_DIA", ] headers1 = [ "chi", "sex", "current_address_L2", "date_of_death", "date_of_birth", ] headers2 = [ "chi", "AdmissionDate", "DischargeDate", "MainCondition", "OtherCondition1", "OtherCondition2", "OtherCondition3", "MainOperation", "MainOperationB", "OtherOperation1", "OtherOperation1B", "OtherOperation2", "OtherOperation2B", "OtherOperation3", "OtherOperation3B", ] # Process first file df = pd.read_csv(files_path[0], usecols=headers0, encoding="ISO 8859-1") # Change name to be the same in all files df.rename(columns={"PatientID": "chi"}, inplace=True) df = df.groupby(["chi"]).max() # Process second file df_ = pd.read_csv(files_path[1], usecols=headers1) df_["date_of_birth"] = pd.to_datetime(df_["date_of_birth"]) df_["date_of_death"] = pd.to_datetime(df_["date_of_death"]) df_ = df_.groupby(["chi"]).max() # Merge first and second file df = df.merge(df_, how="inner", on="chi", suffixes=(False, False)) del df_ # Process third file df__ = pd.read_csv(files_path[2], usecols=headers2, encoding="ISO 8859-1") df__["AdmissionDate"] = pd.to_datetime(df__["AdmissionDate"]) df__["DischargeDate"] = pd.to_datetime(df__["DischargeDate"]) df__["days_in_hospital"] = df__.apply(hospital_days, axis=1) number_stays = df__.groupby(["chi"]).count()["AdmissionDate"] dih = df__.groupby(["chi"])["days_in_hospital"].sum() nc = ( df__.groupby(["chi"])[[x for x in df__.columns if "Condition" in x]] .count() .mean(axis=1) ) no = ( df__.groupby(["chi"])[[x for x in df__.columns if "Operation" in x]] .count() .sum(axis=1) ) df__.drop( columns=[ x for x in df__.columns if "Date" in x or "Operation" in x or "Condition" in x ], inplace=True, ) df__ = pd.DataFrame() df__["days_in_hospital"] = dih df__["average_number_conditions"] = nc df__["total_number_operations"] = no df__["number_admissions"] = number_stays # merge the third file df = df.merge(df__, how="inner", on="chi", suffixes=(False, False)) # Final processing after merging df["death"] = [1 if x else 0 for x in pd.notna(df.date_of_death)] df["age"] = df.apply(find_age, axis=1).astype("int64") df.drop(columns=["date_of_birth", "date_of_death"], inplace=True) # save the dataframe df.to_csv(os.path.join(data_folder, "RDMP", processed_data_file)) labels = df["death"] df.drop(columns=["death"], inplace=True) # OneHotEncoder for col in df.columns: if df[col].dtypes in ("bool", "object"): encoder = LabelEncoder() df[col] = encoder.fit_transform(df[col].values) return (df, labels)