Module minder_utils.scripts.weekly_loader
Expand source code
from minder_utils.formatting.format_util import iter_dir
import os
import datetime as DT
from minder_utils.download.download import Downloader
from minder_utils.formatting.formatting import Formatting
from minder_utils.dataloader import Dataloader
import numpy as np
from minder_utils.util.util import save_mkdir, delete_dir
import json
from minder_utils.settings import dates_save, date_backup
import pandas as pd
from minder_utils.configurations import dates_path
from minder_utils.configurations import config
class Weekly_dataloader:
    """
    Download, collate and reformat the weekly data. Supports UTI only.

    This class will
        - download all previous data if it has not been downloaded before
            - it will be saved as labelled data and unlabelled data
        - download the latest weekly data
        - reformat all the data into the following (N is the number of samples)
            - activity data: N * 3 * 8 * 20
            - environmental data: N * 19
            - physiological data: N * 12
    """

    def __init__(self, categories=None, save_dir=os.path.join('./data', 'weekly_test'), num_days_extended=3):
        '''
        Create the loader, its downloader and the root save directory.

        @param categories: list of data categories to handle; any of
            activity, environmental, physiological. Defaults to all three.
        @param save_dir: root directory under which the 'current' and
            'previous' folders are written.
        @param num_days_extended: for uti only, how many consecutive days to be labelled
        '''
        self.default_categories = ['activity', 'environmental', 'physiological']
        self.categories = self.default_categories if categories is None else categories
        assert all(data_type in self.default_categories for data_type in self.categories), 'available categories: ' \
                                                                                           'activity, environmental, ' \
                                                                                           'physiological'
        self.num_days_extended = num_days_extended
        self.downloader = Downloader()
        self.default_dir = save_dir
        save_mkdir(self.default_dir)

    @property
    def previous_labelled_data(self):
        """Path of the folder holding the labelled npy files of the previous data."""
        return os.path.join(self.default_dir, 'previous', 'npy', 'labelled')

    @property
    def previous_unlabelled_data(self):
        """Path of the folder holding the unlabelled npy files of the previous data."""
        return os.path.join(self.default_dir, 'previous', 'npy', 'unlabelled')

    @property
    def current_data(self):
        """Path of the folder holding the (unlabelled) npy files of the current data."""
        return os.path.join(self.default_dir, 'current', 'npy', 'unlabelled')

    @property
    def current_csv_data(self):
        """Path of the folder holding the csv files of the current data."""
        return os.path.join(self.default_dir, 'current', 'csv')

    @property
    def previous_csv_data(self):
        """Path of the folder holding the csv files of the previous data."""
        return os.path.join(self.default_dir, 'previous', 'csv')

    @property
    def gap_csv_data(self):
        """Path of the csv folder for the gap period."""
        return os.path.join(self.default_dir, 'gap', 'csv')

    def initialise(self):
        '''
        Reset the saved dates, wipe the csv/npy folders of both periods and
        download and format everything from scratch.
        '''
        dates_save(refresh=True)
        for folder in ['current', 'previous']:
            for sub_folder in ['csv', 'npy']:
                # Start from an empty directory so stale files never survive.
                target = os.path.join(self.default_dir, folder, sub_folder)
                delete_dir(target)
                save_mkdir(target)
            self.download(folder, include_devices=True)
            self.format(folder)

    def check_exist(self, path):
        '''
        Download and/or format the data under ``path`` if expected files are missing.

        @param path: directory containing the 'csv' and 'npy' sub-folders. It is
            treated as the 'previous' period if the string 'previous' occurs in
            it, otherwise as 'current'.
        '''
        check_list = {
            '.csv': {'activity': ['raw_door_sensor', 'raw_appliance_use', 'raw_activity_pir', 'device_types']},
            '.npy': {'activity': {'current': ['unlabelled', 'patient_id', 'dates'],
                                  'previous': ['unlabelled', 'patient_id', 'dates', 'X', 'y']}},
        }
        folder_type = 'previous' if 'previous' in path else 'current'
        reformat_flag = False
        for data_type in ['activity']:
            # Check the csv file
            required_csv = {ele + '.csv' for ele in check_list['.csv'][data_type]}
            existing_csv = set(iter_dir(os.path.join(path, 'csv'), '.csv', False))
            if not required_csv <= existing_csv:
                print(data_type, folder_type, 'raw data does not exist, start to download')
                # 'device_types' is one of the required csv files, so the device
                # table has to be exported together with the raw data.
                self.download(folder_type, include_devices=True)
                reformat_flag = True
            else:
                print(data_type, folder_type, 'is already downloaded')
            # Check the npy file
            # NOTE(review): format() writes activity.npy, physiological.npy, ...
            # inside npy/labelled and npy/unlabelled; names such as
            # 'unlabelled.npy', 'X.npy' and 'y.npy' are not produced there, so this
            # check may always trigger a reformat - confirm against iter_dir.
            required_npy = {ele + '.npy' for ele in check_list['.npy'][data_type][folder_type]}
            existing_npy = set(iter_dir(os.path.join(path, 'npy'), '.npy', False))
            if not required_npy <= existing_npy or reformat_flag:
                print('formatting the data: ', data_type, folder_type)
                self.format(folder_type)
            else:
                print(data_type, folder_type, 'has been processed')

    def download(self, period, include_devices=False):
        '''
        Export the raw csv files of one period with the downloader.

        @param period: key of the saved dates to use ('current', 'previous' or
            'gap'). Data of the 'gap' period is written to the 'previous' folder.
        @param include_devices: if True, 'device_types' is exported as well.
        '''
        categories = []
        for data_type in self.categories:
            categories.extend(config[data_type]['type'])
        if include_devices:
            categories.append('device_types')
        date_dict = self.get_dates()
        target_folder = 'previous' if period == 'gap' else period
        self.downloader.export(since=date_dict[period]['since'], until=date_dict[period]['until'], reload=True,
                               save_path=os.path.join(self.default_dir, target_folder, 'csv/'),
                               categories=categories)

    def format(self, period):
        '''
        Convert the csv files of one period into npy files.

        For the 'previous' period both labelled and unlabelled data are saved,
        otherwise only unlabelled data. Each set is written to
        ``<save_dir>/<period>/npy/<labelled|unlabelled>/``.

        @param period: 'current' or 'previous'
        '''
        is_previous = period == 'previous'
        loader = Formatting(os.path.join(self.default_dir, period, 'csv'), add_tihm=is_previous)
        dataloader = Dataloader(loader.activity_data,
                                loader.physiological_data,
                                loader.environmental_data,
                                self.num_days_extended, is_previous)
        categories = ['labelled', 'unlabelled'] if is_previous else ['unlabelled']
        for data_type in categories:
            save_path = os.path.join(self.default_dir, period, 'npy', data_type)
            save_mkdir(save_path)
            # Resolves to dataloader.get_labelled_data / get_unlabelled_data.
            attr = 'get_{}_data'.format(data_type)
            activity_data, physiological_data, environmental_data, p_ids, labels, dates = getattr(dataloader, attr)()
            np.save(os.path.join(save_path, 'activity.npy'), activity_data)
            np.save(os.path.join(save_path, 'physiological.npy'), physiological_data)
            np.save(os.path.join(save_path, 'environmental.npy'), environmental_data)
            np.save(os.path.join(save_path, 'patient_id.npy'), p_ids)
            if data_type == 'labelled':
                np.save(os.path.join(save_path, 'label.npy'), labels)
            np.save(os.path.join(save_path, 'dates.npy'), dates)

    def refresh(self, refresh_period=None):
        '''
        Bring the local data up to date.

        If no dates file exists everything is initialised from scratch. If the
        current data already ends yesterday nothing is done. Otherwise the dates
        are advanced, the gap and current periods are downloaded, the csv files
        are collated and the requested periods are re-formatted.

        @param refresh_period: list of periods to re-format after downloading,
            defaults to ['current'].

        Returns
        ---------
        - None in the normal cases; False if the download raised a TypeError,
          in which case ``date_backup(True)`` is called first.
        '''
        if refresh_period is None:
            refresh_period = ['current']
        try:
            date_dict = self.get_dates()
        except FileNotFoundError:
            print('Dates file does not exist, start to initialise')
            self.initialise()
            return
        # get_dates() yields pandas Timestamps; compare calendar dates explicitly,
        # because a Timestamp never compares equal to a datetime.date on recent
        # pandas versions.
        last_until = date_dict['current']['until']
        yesterday = DT.date.today() - DT.timedelta(days=1)
        if not pd.isnull(last_until) and last_until.date() == yesterday:
            print('Data is up-to-date')
            return
        dates_save(refresh=False)
        date_dict = self.get_dates()
        try:
            if date_dict['gap']['until'] > date_dict['gap']['since']:
                self.download('gap')
            self.download('current')
        except TypeError:
            date_backup(True)
            return False
        self.collate()
        for folder in refresh_period:
            self.format(folder)
        return

    def collate(self):
        '''
        Move the rows of the current csv files that are dated before the end of
        the gap period into the matching previous csv files, then rewrite both
        files without duplicates.
        '''
        date_dict = self.get_dates()
        # Series.dt.date holds plain datetime.date objects, so the threshold must
        # be a datetime.date too; comparing them with a Timestamp raises a
        # TypeError on recent pandas versions.
        gap_until = date_dict['gap']['until'].date()
        for filename in iter_dir(self.previous_csv_data, split=False):
            if filename in ['device_types.csv', 'homes.csv', 'patients.csv']:
                continue
            previous_file = os.path.join(self.previous_csv_data, filename)
            current_file = os.path.join(self.current_csv_data, filename)
            # NOTE(review): the files are read with index_col=0 but written with
            # index=False, so the first column is dropped on every round trip -
            # confirm this matches the layout written by the downloader.
            previous_data = pd.read_csv(previous_file, index_col=0)
            current_data = pd.read_csv(current_file, index_col=0)
            # Drop rows whose start_date is the literal header text
            # (presumably repeated headers from appended exports - verify).
            current_data = current_data[current_data.start_date != 'start_date'].copy()
            previous_data = previous_data[previous_data.start_date != 'start_date']
            current_data['start_date'] = pd.to_datetime(current_data.start_date)
            current_mask = current_data.start_date.dt.date < gap_until
            previous_data = pd.concat([previous_data, current_data[current_mask]])
            current_data = current_data[~current_mask]
            current_data.drop_duplicates().to_csv(current_file, index=False)
            previous_data.drop_duplicates().to_csv(previous_file, index=False)
        return

    @staticmethod
    def get_dates():
        '''
        This function returns the current dates saved in the configurations folder.
        This is an internal function.

        Returns
        ---------
        - dates: dict:
            This dictionary holds the state ('gap', 'current', etc) and the dates,
            each date converted with ``pd.to_datetime``.
        '''
        with open(dates_path) as json_file:
            date_dict = json.load(json_file)
        for state in date_dict:
            for key in date_dict[state]:
                date_dict[state][key] = pd.to_datetime(date_dict[state][key])
        return date_dict

    @staticmethod
    def clean_df(path):
        '''
        Remove the unnamed columns from every file found under ``path`` and
        rewrite the files in place.

        @param path: directory containing the csv files to clean.

        Returns
        -------
        None
        '''
        for filename in iter_dir(path, split=False):
            file_path = os.path.join(path, filename)
            # NOTE(review): index_col=0 combined with index=False also drops the
            # first column, so running this twice on the same file would remove a
            # real column - confirm this is only run once per file.
            df = pd.read_csv(file_path, index_col=0)
            df = df.loc[:, ~df.columns.str.contains('^Unnamed')]
            df.to_csv(file_path, index=False)
- class Weekly_dataloader (categories=None, save_dir='./data\\weekly_test', num_days_extended=3)
- 
Support UTI only This class will - download all previous data if it have not been downloaded before - will be saved as labelled data and unlabelled data - download the latest weekly data - reformat all the data into following (N is the number of samples) - activity data: N * 3 * 8 * 20 - environmental data: N * 19 - physiological data: N * 12 @param data_type: activity, environmental, physiological @param num_days_extended: for uti only, how many consecutive days to be labelled Expand source codeclass Weekly_dataloader: """ Support UTI only This class will - download all previous data if it have not been downloaded before - will be saved as labelled data and unlabelled data - download the latest weekly data - reformat all the data into following (N is the number of samples) - activity data: N * 3 * 8 * 20 - environmental data: N * 19 - physiological data: N * 12 """ def __init__(self, categories=None, save_dir=os.path.join('./data', 'weekly_test'), num_days_extended=3): ''' @param data_type: activity, environmental, physiological @param num_days_extended: for uti only, how many consecutive days to be labelled ''' self.default_categories = ['activity', 'environmental', 'physiological'] self.categories = self.default_categories if categories is None else categories assert all(data_type in self.default_categories for data_type in self.categories), 'available categories: ' \ 'activity, environmental, ' \ 'physiological' self.num_days_extended = num_days_extended self.downloader = Downloader() self.default_dir = save_dir save_mkdir(self.default_dir) @property def previous_labelled_data(self): return os.path.join(self.default_dir, 'previous', 'npy', 'labelled') @property def previous_unlabelled_data(self): return os.path.join(self.default_dir, 'previous', 'npy', 'unlabelled') @property def current_data(self): return os.path.join(self.default_dir, 'current', 'npy', 'unlabelled') @property def current_csv_data(self): return os.path.join(self.default_dir, 'current', 'csv') 
@property def previous_csv_data(self): return os.path.join(self.default_dir, 'previous', 'csv') @property def gap_csv_data(self): return os.path.join(self.default_dir, 'gap', 'csv') def initialise(self): dates_save(refresh=True) for folder in ['current', 'previous']: delete_dir(os.path.join(self.default_dir, folder, 'csv')) save_mkdir(os.path.join(self.default_dir, folder, 'csv')) delete_dir(os.path.join(self.default_dir, folder, 'npy')) save_mkdir(os.path.join(self.default_dir, folder, 'npy')) self.download(folder, include_devices=True) self.format(folder) def check_exist(self, path): check_list = { '.csv': {'activity': ['raw_door_sensor', 'raw_appliance_use', 'raw_activity_pir', 'device_types']}, '.npy': {'activity': {'current': ['unlabelled', 'patient_id', 'dates'], 'previous': ['unlabelled', 'patient_id', 'dates', 'X', 'y']}}, } folder_type = 'previous' if 'previous' in path else 'current' reformat_flag = False for data_type in ['activity']: # Check the csv file if not set([ele + '.csv' for ele in check_list['.csv'][data_type]]) \ <= set(iter_dir(os.path.join(path, 'csv'), '.csv', False)): print(data_type, folder_type, 'raw data does not exist, start to download') self.download(folder_type, data_type) reformat_flag = True else: print(data_type, folder_type, 'is already downloaded') # Check the npy file if not set([ele + '.npy' for ele in check_list['.npy'][data_type][folder_type]]) \ <= set(iter_dir(os.path.join(path, 'npy'), '.npy', False)) or reformat_flag: print('formatting the data: ', data_type, folder_type) self.format(folder_type) else: print(data_type, folder_type, 'has been processed') def download(self, period, include_devices=False): categories = [] for data_type in self.categories: categories.extend(config[data_type]['type'].copy()) if include_devices: categories.append('device_types') date_dict = self.get_dates() self.downloader.export(since=date_dict[period]['since'], until=date_dict[period]['until'], reload=True, 
save_path=os.path.join(self.default_dir, 'previous' if period == 'gap' else period, 'csv/'), categories=categories) def format(self, period): loader = Formatting(os.path.join(self.default_dir, period, 'csv'), add_tihm=period == 'previous') dataloader = Dataloader(loader.activity_data, loader.physiological_data, loader.environmental_data, self.num_days_extended, period == 'previous') categories = ['labelled', 'unlabelled'] if period == 'previous' else ['unlabelled'] for data_type in categories: save_path = os.path.join(self.default_dir, period, 'npy', data_type) save_mkdir(save_path) attr = 'get_{}_data'.format(data_type) activity_data, physiological_data, environmental_data, p_ids, labels, dates = getattr(dataloader, attr)() np.save(os.path.join(save_path, 'activity.npy'.format(data_type)), activity_data) np.save(os.path.join(save_path, 'physiological.npy'.format(data_type)), physiological_data) np.save(os.path.join(save_path, 'environmental.npy'.format(data_type)), environmental_data) np.save(os.path.join(save_path, 'patient_id.npy'), p_ids) if data_type == 'labelled': np.save(os.path.join(save_path, 'label.npy'), labels) np.save(os.path.join(save_path, 'dates.npy'), dates) def refresh(self, refresh_period=None): if refresh_period is None: refresh_period = ['current'] try: date_dict = self.get_dates() except FileNotFoundError: print('Dates file does not exist, start to initialise') self.initialise() return if date_dict['current']['until'] == DT.date.today() - DT.timedelta(days=1): print('Data is up-to-date') return dates_save(refresh=False) date_dict = self.get_dates() try: if date_dict['gap']['until'] > date_dict['gap']['since']: self.download('gap') self.download('current') except TypeError: date_backup(True) return False self.collate() for folder in refresh_period: self.format(folder) return def collate(self): date_dict = self.get_dates() for filename in iter_dir(self.previous_csv_data, split=False): if filename not in ['device_types.csv', 'homes.csv', 
'patients.csv']: previous_data = pd.read_csv(os.path.join(self.previous_csv_data, filename), index_col=0) current_data = pd.read_csv(os.path.join(self.current_csv_data, filename), index_col=0) current_data = current_data[current_data.start_date != 'start_date'] previous_data = previous_data[previous_data.start_date != 'start_date'] current_data.start_date = pd.to_datetime(current_data.start_date) current_mask = current_data.start_date.dt.date < date_dict['gap']['until'] previous_data = pd.concat([previous_data, current_data[current_mask]]) current_data = current_data[~current_mask] current_data.drop_duplicates().to_csv(os.path.join(self.current_csv_data, filename), index=False) previous_data.drop_duplicates().to_csv(os.path.join(self.previous_csv_data, filename), index=False) return @staticmethod def get_dates(): ''' This function returns the current dates saved in the configurations folder. This is an internal function. Returns --------- - dates: dict: This dictionary holds the state ('gap', 'current', etc) and the dates. ''' with open(dates_path) as json_file: date_dict = json.load(json_file) for state in date_dict: for time in date_dict[state]: date_dict[state][time] = pd.to_datetime(date_dict[state][time]) return date_dict @staticmethod def clean_df(path): ''' Use to clean dataframe contains unnamed columns. Returns ------- ''' for filename in iter_dir(path, split=False): df = pd.read_csv(os.path.join(path, filename), index_col=0) df = df.loc[:, ~df.columns.str.contains('^Unnamed')] df.to_csv(os.path.join(path, filename), index=False)Static methods- def clean_df(path)
- 
Use to clean dataframe contains unnamed columns. Returns Expand source code@staticmethod def clean_df(path): ''' Use to clean dataframe contains unnamed columns. Returns ------- ''' for filename in iter_dir(path, split=False): df = pd.read_csv(os.path.join(path, filename), index_col=0) df = df.loc[:, ~df.columns.str.contains('^Unnamed')] df.to_csv(os.path.join(path, filename), index=False)
- def get_dates()
- 
This function returns the current dates saved in the configurations folder. This is an internal function. Returns: dates (dict) - this dictionary holds the state ('gap', 'current', etc.) and the dates.
 Expand source code@staticmethod def get_dates(): ''' This function returns the current dates saved in the configurations folder. This is an internal function. Returns --------- - dates: dict: This dictionary holds the state ('gap', 'current', etc) and the dates. ''' with open(dates_path) as json_file: date_dict = json.load(json_file) for state in date_dict: for time in date_dict[state]: date_dict[state][time] = pd.to_datetime(date_dict[state][time]) return date_dict
 Instance variables- var current_csv_data
- 
Expand source code@property def current_csv_data(self): return os.path.join(self.default_dir, 'current', 'csv')
- var current_data
- 
Expand source code@property def current_data(self): return os.path.join(self.default_dir, 'current', 'npy', 'unlabelled')
- var gap_csv_data
- 
Expand source code@property def gap_csv_data(self): return os.path.join(self.default_dir, 'gap', 'csv')
- var previous_csv_data
- 
Expand source code@property def previous_csv_data(self): return os.path.join(self.default_dir, 'previous', 'csv')
- var previous_labelled_data
- 
Expand source code@property def previous_labelled_data(self): return os.path.join(self.default_dir, 'previous', 'npy', 'labelled')
- var previous_unlabelled_data
- 
Expand source code@property def previous_unlabelled_data(self): return os.path.join(self.default_dir, 'previous', 'npy', 'unlabelled')
 Methods- def check_exist(self, path)
- 
Expand source codedef check_exist(self, path): check_list = { '.csv': {'activity': ['raw_door_sensor', 'raw_appliance_use', 'raw_activity_pir', 'device_types']}, '.npy': {'activity': {'current': ['unlabelled', 'patient_id', 'dates'], 'previous': ['unlabelled', 'patient_id', 'dates', 'X', 'y']}}, } folder_type = 'previous' if 'previous' in path else 'current' reformat_flag = False for data_type in ['activity']: # Check the csv file if not set([ele + '.csv' for ele in check_list['.csv'][data_type]]) \ <= set(iter_dir(os.path.join(path, 'csv'), '.csv', False)): print(data_type, folder_type, 'raw data does not exist, start to download') self.download(folder_type, data_type) reformat_flag = True else: print(data_type, folder_type, 'is already downloaded') # Check the npy file if not set([ele + '.npy' for ele in check_list['.npy'][data_type][folder_type]]) \ <= set(iter_dir(os.path.join(path, 'npy'), '.npy', False)) or reformat_flag: print('formatting the data: ', data_type, folder_type) self.format(folder_type) else: print(data_type, folder_type, 'has been processed')
- def collate(self)
- 
Expand source codedef collate(self): date_dict = self.get_dates() for filename in iter_dir(self.previous_csv_data, split=False): if filename not in ['device_types.csv', 'homes.csv', 'patients.csv']: previous_data = pd.read_csv(os.path.join(self.previous_csv_data, filename), index_col=0) current_data = pd.read_csv(os.path.join(self.current_csv_data, filename), index_col=0) current_data = current_data[current_data.start_date != 'start_date'] previous_data = previous_data[previous_data.start_date != 'start_date'] current_data.start_date = pd.to_datetime(current_data.start_date) current_mask = current_data.start_date.dt.date < date_dict['gap']['until'] previous_data = pd.concat([previous_data, current_data[current_mask]]) current_data = current_data[~current_mask] current_data.drop_duplicates().to_csv(os.path.join(self.current_csv_data, filename), index=False) previous_data.drop_duplicates().to_csv(os.path.join(self.previous_csv_data, filename), index=False) return
- def download(self, period, include_devices=False)
- 
Expand source codedef download(self, period, include_devices=False): categories = [] for data_type in self.categories: categories.extend(config[data_type]['type'].copy()) if include_devices: categories.append('device_types') date_dict = self.get_dates() self.downloader.export(since=date_dict[period]['since'], until=date_dict[period]['until'], reload=True, save_path=os.path.join(self.default_dir, 'previous' if period == 'gap' else period, 'csv/'), categories=categories)
- def format(self, period)
- 
Expand source codedef format(self, period): loader = Formatting(os.path.join(self.default_dir, period, 'csv'), add_tihm=period == 'previous') dataloader = Dataloader(loader.activity_data, loader.physiological_data, loader.environmental_data, self.num_days_extended, period == 'previous') categories = ['labelled', 'unlabelled'] if period == 'previous' else ['unlabelled'] for data_type in categories: save_path = os.path.join(self.default_dir, period, 'npy', data_type) save_mkdir(save_path) attr = 'get_{}_data'.format(data_type) activity_data, physiological_data, environmental_data, p_ids, labels, dates = getattr(dataloader, attr)() np.save(os.path.join(save_path, 'activity.npy'.format(data_type)), activity_data) np.save(os.path.join(save_path, 'physiological.npy'.format(data_type)), physiological_data) np.save(os.path.join(save_path, 'environmental.npy'.format(data_type)), environmental_data) np.save(os.path.join(save_path, 'patient_id.npy'), p_ids) if data_type == 'labelled': np.save(os.path.join(save_path, 'label.npy'), labels) np.save(os.path.join(save_path, 'dates.npy'), dates)
- def initialise(self)
- 
Expand source codedef initialise(self): dates_save(refresh=True) for folder in ['current', 'previous']: delete_dir(os.path.join(self.default_dir, folder, 'csv')) save_mkdir(os.path.join(self.default_dir, folder, 'csv')) delete_dir(os.path.join(self.default_dir, folder, 'npy')) save_mkdir(os.path.join(self.default_dir, folder, 'npy')) self.download(folder, include_devices=True) self.format(folder)
- def refresh(self, refresh_period=None)
- 
Expand source codedef refresh(self, refresh_period=None): if refresh_period is None: refresh_period = ['current'] try: date_dict = self.get_dates() except FileNotFoundError: print('Dates file does not exist, start to initialise') self.initialise() return if date_dict['current']['until'] == DT.date.today() - DT.timedelta(days=1): print('Data is up-to-date') return dates_save(refresh=False) date_dict = self.get_dates() try: if date_dict['gap']['until'] > date_dict['gap']['since']: self.download('gap') self.download('current') except TypeError: date_backup(True) return False self.collate() for folder in refresh_period: self.format(folder) return