Module minder_utils.dataloader

from .dataloader import *
from .partial_order_loader import Partial_Order_Loader
from .simclr_loader import *
from .load_saved import process_data

__all__ = ['Dataloader', 'Partial_Order_Loader', 'create_labelled_loader',
           'create_unlabelled_loader', 'process_data']

Sub-modules

minder_utils.dataloader.dataloader
minder_utils.dataloader.load_saved
minder_utils.dataloader.partial_order_loader
minder_utils.dataloader.simclr_loader

Functions

def create_labelled_loader(X, y, batch_size=10, normalise_data=True, shuffle=True, seed=0, split=True, augmentation=False)

Create a torch dataloader for labelled data.

Parameters

X : numpy array, data
y : numpy array, labels
batch_size : int, number of samples per batch (default 10)
normalise_data : bool, whether to normalise the data
shuffle : bool, whether to shuffle the samples
seed : int, random seed for the train/test split
split : bool, whether to split the data into train and test sets
augmentation : bool, whether to augment the data

Returns a torch dataloader, or a (train, test) pair of dataloaders if split is True.

def create_labelled_loader(X, y, batch_size=10, normalise_data=True, shuffle=True, seed=0, split=True, augmentation=False):
    '''
    Create a dataloader for labelled data
    Parameters
    ----------
    X: numpy array, data
    y: numpy array, labels
    batch_size: int, number of samples per batch
    normalise_data: bool, normalise the data or not
    shuffle: bool, shuffle the samples or not
    seed: int, random seed for the train/test split
    split: bool, split the data into train and test sets or not
    augmentation: bool, augment the data or not

    Returns
    -------
    torch dataloader, or a (train, test) pair of dataloaders if split is True

    '''
    transformers = DataTransform(augmentation_transformers()) if augmentation else None
    if split:
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=seed, stratify=y)
        train_dataset = CustomTensorDataset([torch.Tensor(X_train), torch.tensor(y_train)], transformers, normalise_data)
        train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle)
        test_dataset = CustomTensorDataset([torch.Tensor(X_test), torch.tensor(y_test)], transformers, normalise_data)
        test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=shuffle)
        return train_dataloader, test_dataloader
    else:
        train_dataset = CustomTensorDataset([torch.Tensor(X), torch.tensor(y)], transformers, normalise_data)
        train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle)
        return train_dataloader
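
A minimal usage sketch with synthetic arrays (illustrative only; in practice X and y would come from process_data):

import numpy as np
from minder_utils.dataloader import create_labelled_loader

# Synthetic stand-ins for the (N, 3, 8, 14) activity blocks used in this module
X = np.random.rand(100, 3, 8, 14).astype(np.float32)
y = np.random.randint(0, 2, size=100)

# split=True (the default) returns a (train, test) pair of dataloaders
train_loader, test_loader = create_labelled_loader(X, y, batch_size=16)
for batch_X, batch_y in train_loader:
    print(batch_X.shape, batch_y.shape)  # e.g. (16, 3, 8, 14) and (16,)
    break
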
def create_unlabelled_loader(X, batch_size=10, shuffle=True, augmentation=False, normalise_data=True)

Create a torch dataloader for unlabelled data. Note that this function labels every data point with the constant 1.

Parameters

X : numpy array, unlabelled data
batch_size : int, number of samples per batch (default 10)
shuffle : bool, whether to shuffle the samples
augmentation : bool, whether to augment the data
normalise_data : bool, whether to normalise the data

Returns a torch dataloader (with drop_last=True, so incomplete final batches are dropped).

def create_unlabelled_loader(X, batch_size=10, shuffle=True, augmentation=False,
                             normalise_data=True):
    '''
    Create a dataloader for unlabelled data, note this function will label every datapoint
    with one.
    Parameters
    ----------
    X: numpy array, unlabelled data
    batch_size: int, number of samples per batch
    shuffle: bool, shuffle the samples or not
    augmentation: bool, augment the data or not
    normalise_data: bool, normalise the data or not

    Returns
    -------
    torch dataloader

    '''
    transformers = DataTransform(augmentation_transformers()) if augmentation else None
    train_dataset = CustomTensorDataset([torch.Tensor(X), torch.ones(X.shape[0])], transformers,
                                        normalise_data=normalise_data)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle, drop_last=True)
    return train_dataloader
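
A sketch of typical use; every sample is paired with the constant label 1, and drop_last=True means incomplete final batches are dropped:

import numpy as np
from minder_utils.dataloader import create_unlabelled_loader

X = np.random.rand(200, 3, 8, 14).astype(np.float32)  # illustrative data
loader = create_unlabelled_loader(X, batch_size=32)
for batch_X, batch_y in loader:
    assert bool((batch_y == 1).all())  # placeholder labels
    break
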
def process_data(data, unlabelled_data, num_days_extended=0, flatten=False)

Process the data from the dataloader into a form that is easy for the models to use.

Parameters

data : dict, dictionary containing activity, p_ids, uti_labels and dates
unlabelled_data : numpy array, unlabelled data
num_days_extended : int, how many consecutive days to extend the labelled data by. The data will be extended by 2 * num_days_extended days (num_days_extended days before and after).
flatten : bool, whether to flatten the activity data

Returns unlabelled_data, X, y, dates, p_ids

def process_data(data, unlabelled_data, num_days_extended=0, flatten=False):
    '''
    Process the data from the dataloader into a form that is easy for
    the models to use.
    Parameters
    ----------
    data: dict, dictionary containing activity, p_ids, uti_labels and dates
    unlabelled_data: numpy array, unlabelled data
    num_days_extended: int, how many consecutive days to extend the labelled data by.
        The data will be extended by 2 * num_days_extended days (num_days_extended days before and after)
    flatten: bool, flatten the activity data or not.

    Returns
    -------
    unlabelled_data, X, y, dates, p_ids

    '''
    X, y, p_ids, dates = data['activity'], data['uti_labels'], data['p_ids'], data['dates']
    X_truncated = []
    y_truncated = []
    dates_truncated = []
    p_ids_truncated = []
    for idx in range(len(X)):
        truncated_len = 1 + num_days_extended * 2
        if len(X[idx]) >= truncated_len:
            X_truncated.append(X[idx][: truncated_len])
            y_truncated.append(y[idx][: truncated_len])
            p_ids_truncated.append(p_ids[idx][: truncated_len])
            dates_truncated.append(dates[idx])
    X, y, dates, p_ids = np.array(X_truncated), np.array(y_truncated), np.array(dates_truncated), np.array(p_ids_truncated)

    if flatten:
        X = X.reshape(-1, 3, 8, 14)
        y = y.reshape(-1, )
        y[y > 0] = 1
        y[y < 0] = -1
    else:
        X = X.reshape(X.shape[0], X.shape[1], 3, 8, 14)

    unlabelled_data = unlabelled_data.reshape(unlabelled_data.shape[0], 3, 8, 14)
    return unlabelled_data, X, y, dates, p_ids
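
A sketch of the intended call, assuming dataloader is an initialised Dataloader (see the class below) with label_data=True; the dict keys match the labelled_data property:

from minder_utils.dataloader import process_data

labelled = dataloader.labelled_data                      # dict of numpy arrays
unlabelled_act = dataloader.unlabelled_data['activity']

unlabelled_act, X, y, dates, p_ids = process_data(
    labelled, unlabelled_act, num_days_extended=2, flatten=True)
# With flatten=True, X has shape (N, 3, 8, 14) and y is binarised to {-1, 1};
# with flatten=False, X keeps a per-sample axis of 1 + 2 * num_days_extended days.
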

Classes

class Dataloader (activity, physiological=None, environmental=None, max_days=3, label_data=False)

Categorises the data into labelled & unlabelled data. This dataloader should be used in combination with minder_utils.formatting.Formatting.

After initialising the formatter, formater = Formatting(), pass its attributes as below; a usage sketch follows the parameter list.

Parameters

  • activity: activity data, formater.activity
  • physiological: physiological data, formater.physiological
  • environmental: environmental data, formater.environmental
  • max_days: Default 3. How many consecutive days to extend as UTI; if max_days = n, the n days before & after the validated date will be labelled as UTI.
  • label_data: Default False. Whether to label the data. If False, get_labelled_data() cannot be used.
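
A minimal sketch of the pattern described above, using the formatter attributes listed in the parameters:

from minder_utils.formatting import Formatting
from minder_utils.dataloader import Dataloader

formater = Formatting()
dataloader = Dataloader(formater.activity,
                        physiological=formater.physiological,
                        environmental=formater.environmental,
                        max_days=3, label_data=True)

labelled = dataloader.labelled_data      # dict: activity, phy, env, p_ids, uti_labels, dates
unlabelled = dataloader.unlabelled_data  # dict: activity, phy, env, p_ids, dates
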
class Dataloader:
    """
    Categorise the data into labelled & unlabelled data.
    This dataloader should be used in combination with minder_utils.formatting.Formatting.

    After initialising ```formater = Formatting()```
    Parameters:
        - activity: activity data, ```formater.activity```
        - physiological: physiological data, ```formater.physiological```
        - environmental: environmental data, ```formater.environmental```
        - max_days: Default 3. How many consecutive days to extend as UTI; if ```max_days = n```, the ```n``` days before & after
            the validated date will be labelled as UTI
        - label_data: Default False. Whether to label the data. If False, ```get_labelled_data()``` cannot be used.
    """

    def __init__(self, activity, physiological=None, environmental=None, max_days=3, label_data=False):
        if activity is None:
            warnings.warn('Activity data is None, this class can be only used to load the processed data')
            return
        activity = pd.read_csv(activity) if isinstance(activity, str) else activity
        shared_id = None
        for data in [activity, physiological, environmental]:
            if data is None:
                continue
            shared_id = set(data.id.unique()) if shared_id is None else shared_id.intersection(set(data.id.unique()))
        activity = activity[activity.id.isin(shared_id)]
        activity = standardise_activity_data(activity)
        activity.time = pd.to_datetime(activity.time)
        activity.loc[:, 'Date'] = activity.time.dt.date
        date_range = pd.date_range(activity.Date.min(), activity.Date.max())

        self.physiological = standardise_physiological_environmental(physiological, date_range, shared_id) \
            if physiological is not None else physiological
        self.environmental = standardise_physiological_environmental(environmental, date_range, shared_id) \
            if environmental is not None else environmental

        for datatype in ['environmental', 'physiological']:
            config[datatype]['sort_dict'] = dict(
                zip(config[datatype]['sensors'], range(len(config[datatype]['sensors']))))

        if label_data:
            activity = label_dataframe(activity)
            self.labelled_df = activity[~activity.valid.isna()]
            self.labelled_df.set_index(['id', 'valid', 'Date'], inplace=True)
            if len(self.labelled_df) > 0:
                self.true_p_ids = self.labelled_df.loc[:, True, :].index.get_level_values(0).unique()
                self.false_p_ids = self.labelled_df.loc[:, False, :].index.get_level_values(0).unique()
            else:
                print('no data is labelled')

        activity.set_index(['id', 'Date'], inplace=True)
        self.activity = activity
        self.max_days = max_days

        self.transfer_sensors = ['back door', 'bathroom1', 'bedroom1', 'dining room',
                                 'fridge door', 'front door', 'hallway', 'kettle', 'kitchen',
                                 'living room', 'lounge', 'microwave', 'study', 'toaster']
        self.select_sensors = config['activity']['sensors']

    def __len__(self):
        return int(len(self.labelled_df) / 24)

    @property
    @load_save(**config['labelled_data']['save'])
    def labelled_data(self):
        activity_data, physiological_data, environmental_data, patient_ids, uti_labels, labelled_dates = \
            self.get_labelled_data(normalise=False)
        return {
            'activity': activity_data,
            'phy': physiological_data,
            'env': environmental_data,
            'p_ids': patient_ids,
            'uti_labels': uti_labels,
            'dates': labelled_dates
        }

    @property
    @load_save(**config['unlabelled_data']['save'])
    def unlabelled_data(self):
        activity_data, physiological_data, environmental_data, patient_ids, dates = \
            self.get_unlabelled_data(normalise=False)
        return {
            'activity': activity_data,
            'phy': physiological_data,
            'env': environmental_data,
            'p_ids': patient_ids,
            'dates': dates
        }

    def get_labelled_data(self, normalise=False):
        # get p ids
        p_ids = self.labelled_df.index.get_level_values(0).unique()
        activity_data, uti_labels, patient_ids, physiological_data, environmental_data, labelled_dates = [], [], [], [], [], []
        for idx in range(len(p_ids)):
            # get data of patient
            data = self.labelled_df.loc[p_ids[idx]]
            for valid in data.index.get_level_values(0).unique():
                dates = data.loc[valid].index.get_level_values(0).unique()
                for date in dates:
                    # validated data
                    act_data, labels, patient, phy_data, env_data = [], [], [], [], []
                    p_date = date
                    p_data = data.loc[(valid, p_date), self.select_sensors].to_numpy()
                    if normalise:
                        p_data = normalized(np.array(p_data)).reshape(3, 8, -1)
                    act_data.append(p_data)
                    phy_data.append(self.get_data(self.physiological, p_ids[idx], p_date, 'physiological'))
                    env_data.append(self.get_data(self.environmental, p_ids[idx], p_date, 'environmental'))
                    labels.append(int(valid) if valid else -1)
                    patient.append(p_ids[idx])
                    labelled_dates.append(date)
                    for i in range(1, self.max_days + 1):
                        for symbol in [-1, 1]:
                            f_date = p_date - datetime.timedelta(i) * symbol
                            try:
                                p_data = self.activity.loc[(p_ids[idx], f_date), self.select_sensors].to_numpy()
                                if normalise:
                                    p_data = normalized(np.array(p_data)).reshape(3, 8, -1)
                                act_data.append(p_data)
                                phy_data.append(self.get_data(self.physiological, p_ids[idx], f_date, 'physiological'))
                                env_data.append(self.get_data(self.environmental, p_ids[idx], f_date, 'environmental'))
                                labels.append(self.laplace_smooth(i) * symbol)
                                patient.append(p_ids[idx])
                            except KeyError:
                                break
                    activity_data.append(act_data)
                    uti_labels.append(labels)
                    patient_ids.append(patient)
                    physiological_data.append(phy_data)
                    environmental_data.append(env_data)

        activity_data = np.array(activity_data)
        uti_labels = np.array(uti_labels)
        patient_ids = np.array(patient_ids)
        physiological_data = np.array(physiological_data)
        environmental_data = np.array(environmental_data)
        labelled_dates = np.array(labelled_dates)

        return activity_data, physiological_data, environmental_data, patient_ids, uti_labels, labelled_dates

    def get_unlabelled_data(self, normalise=False, date='2021-03-01'):
        '''
        Get the unlabelled data.
        Parameters
        ----------
        normalise: bool, normalise the data or not
        date: str, only return data later than the date provided. By default,
            the TIHM unlabelled data is not returned

        Returns activity, physiological, environmental data, patient ids, dates
        -------

        '''
        # May need to change the for loop to dataframe operations
        # get p ids
        df = self.activity.reset_index().set_index(['id', 'Date'])
        if date is not None:
            df = df[df.index.get_level_values(1) > date]

        p_ids = df.index.get_level_values(0).unique()
        outputs = []
        phy_data, env_data = [], []
        outputs_p_ids = []
        outputs_dates = []
        for idx in range(len(p_ids)):
            # get data of patient
            data = df.loc[p_ids[idx]]
            dates = data.index.get_level_values(0).unique()
            for date in dates:
                # validated data
                p_data = data.loc[date, self.select_sensors].to_numpy()
                if normalise:
                    p_data = normalized(np.array(p_data)).reshape(3, 8, -1)
                outputs.append(p_data)
                phy_data.append(self.get_data(self.physiological, p_ids[idx], date, 'physiological'))
                env_data.append(self.get_data(self.environmental, p_ids[idx], date, 'environmental'))
                outputs_p_ids.append(p_ids[idx])
                outputs_dates.append(date)
        return np.array(outputs), np.array(phy_data), np.array(env_data), \
               np.array(outputs_p_ids), np.array(outputs_dates)

    @staticmethod
    def laplace_smooth(i, lam=3, denominator=1):
        return np.exp(- np.abs(i) / lam) / denominator

    @staticmethod
    def get_data(df, p_id, date, datatype):
        if df is None:
            return
        try:
            return df.loc[(p_id, date, config[datatype]['sensors'])] \
                .sort_values('location', key=lambda x: x.map(config[datatype]['sort_dict']))['value'].to_numpy()
        except KeyError:
            return [0.] * len(config[datatype]['sensors'])

Static methods

def get_data(df, p_id, date, datatype)

Return the sensor values of the given datatype for a patient on a date, sorted by the configured sensor order; returns a zero vector if the data is missing.
def laplace_smooth(i, lam=3, denominator=1)

Weight assigned to a day i days away from the validated date: exp(-abs(i) / lam) / denominator.
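
For intuition, the weights this assigns to the days around a validated date with the default lam=3 (a quick standalone check, not part of the library):

import numpy as np

for i in range(1, 4):
    # identical to Dataloader.laplace_smooth(i)
    print(i, round(np.exp(-abs(i) / 3), 3))  # 1: 0.717, 2: 0.513, 3: 0.368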

Instance variables

var labelled_data

Dictionary of labelled data with keys activity, phy, env, p_ids, uti_labels and dates, computed by get_labelled_data(normalise=False) and cached via load_save.
var unlabelled_data

Dictionary of unlabelled data with keys activity, phy, env, p_ids and dates, computed by get_unlabelled_data(normalise=False) and cached via load_save.

Methods

def get_labelled_data(self, normalise=False)

Collect the labelled activity, physiological and environmental data, together with patient ids, UTI labels and labelled dates. The validated date is labelled 1 (or -1 if not valid) and up to max_days days before and after it receive Laplace-smoothed labels.
def get_unlabelled_data(self, normalise=False, date='2021-03-01')

Get the unlabelled data.

Parameters

normalise : bool, normalise the data or not
date : str, only return data later than the date provided. By default, the TIHM unlabelled data is not returned.

Returns activity, physiological, environmental data, patient ids, dates
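
A sketch of a call with an explicit cut-off date, assuming dataloader from the Dataloader sketch above; pass date=None to return all data:

activity, phy, env, p_ids, dates = dataloader.get_unlabelled_data(
    normalise=True, date='2021-03-01')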

class Partial_Order_Loader (data, y=None, shuffle=True, augmented_day=3, max_iter=None, normalise=True)

Iterator that yields (pre_anchor, anchor, post_anchor) triples for partial-order training. If y is provided, the anchor is a validated day (label -1 or 1) and the pre/post anchors are selected via the smoothed day labels; otherwise consecutive entries of data are used.
class Partial_Order_Loader:
    def __init__(self, data, y=None, shuffle=True, augmented_day=3, max_iter=None, normalise=True):
        self.data = data
        self.y = y
        self.augmented_day = augmented_day
        self.max_iter = max_iter
        self.iter_count = 0
        self.shuffle = shuffle
        self.normalise = normalise

    def normalisation(self, data):
        if self.normalise:
            data = torch.Tensor(data)
            return F.normalize(data.view(24, -1), dim=0).view(data.size()).detach().numpy()
        return data

    def __iter__(self):
        return self

    def __next__(self):
        if self.iter_count >= len(self):
            raise StopIteration
        if self.y is not None:
            data_idx = np.random.choice(len(self.data)) if self.shuffle else self.iter_count
            p_data = self.data[data_idx]
            p_label = self.y[data_idx]
            anchor = self.normalisation(p_data[np.isin(p_label, [-1, 1])])

            pre_idx = np.sort(p_label[p_label < 0])
            post_idx = np.sort(p_label[(p_label > 0) & (p_label != 1)])[::-1]
            pre_anchor = []
            post_anchor = []
            for i in range(self.augmented_day):
                pre_anchor.append(self.normalisation(p_data[p_label == pre_idx[i]]))
                post_anchor.append(self.normalisation(p_data[p_label == post_idx[i]]))
            self.iter_count += 1
        else:
            data_idx = np.random.choice(len(self.data) - self.augmented_day * 2) + self.augmented_day
            p_data = self.data[data_idx]
            anchor = self.normalisation(p_data)

            pre_anchor = []
            post_anchor = []
            for i in range(1, self.augmented_day + 1):
                pre_anchor.append(self.normalisation(self.data[data_idx - i]))
                post_anchor.append(self.normalisation(self.data[data_idx + i]))
            self.iter_count += 1

        pre_anchor, anchor, post_anchor = torch.Tensor(pre_anchor), torch.Tensor(anchor), torch.Tensor(post_anchor)

        return pre_anchor, anchor, post_anchor

    def __len__(self):
        if self.max_iter:
            return self.max_iter
        return len(self.data)
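
A usage sketch, assuming X and y are the labelled arrays returned by process_data with flatten=False and num_days_extended >= augmented_day (so y holds the smoothed per-day labels):

loader = Partial_Order_Loader(X, y, augmented_day=3, max_iter=100)
for pre_anchor, anchor, post_anchor in loader:
    # pre_anchor and post_anchor each stack the augmented_day days
    # on either side of the anchor day
    print(pre_anchor.shape, anchor.shape, post_anchor.shape)
    break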

Methods

def normalisation(self, data)

L2-normalise each feature across the 24 hourly rows (when self.normalise is True), returning an array of the original shape; otherwise return the data unchanged.