Module minder_utils.scripts
Expand source code
from .weekly_loader import *
__all__ = ['Weekly_dataloader']
Sub-modules
minder_utils.scripts.feature_selection
minder_utils.scripts.weekly_loader
Classes
class Weekly_dataloader (categories=None, save_dir='./data\\weekly_test', num_days_extended=3)
-
Supports UTI only. This class will - download all previous data if it has not been downloaded before - this will be saved as labelled data and unlabelled data - download the latest weekly data - reformat all the data into the following shapes (N is the number of samples) - activity data: N * 3 * 8 * 20 - environmental data: N * 19 - physiological data: N * 12
@param categories: any of activity, environmental, physiological (defaults to all three) @param num_days_extended: for UTI only, how many consecutive days to be labelled
Expand source code
class Weekly_dataloader:
    """
    Support UTI only.

    This class will
        - download all previous data if it has not been downloaded before
            - this will be saved as labelled data and unlabelled data
        - download the latest weekly data
        - reformat all the data into the following shapes (N is the number of samples)
            - activity data: N * 3 * 8 * 20
            - environmental data: N * 19
            - physiological data: N * 12
    """

    def __init__(self, categories=None, save_dir=os.path.join('./data', 'weekly_test'), num_days_extended=3):
        '''
        @param categories: any of activity, environmental, physiological (all three when None)
        @param save_dir: root folder under which the csv and npy files are stored
        @param num_days_extended: for uti only, how many consecutive days to be labelled
        '''
        self.default_categories = ['activity', 'environmental', 'physiological']
        self.categories = self.default_categories if categories is None else categories
        assert all(data_type in self.default_categories for data_type in self.categories), 'available categories: ' \
                                                                                           'activity, environmental, ' \
                                                                                           'physiological'
        self.num_days_extended = num_days_extended
        self.downloader = Downloader()
        self.default_dir = save_dir
        save_mkdir(self.default_dir)

    @property
    def previous_labelled_data(self):
        """Path ``<default_dir>/previous/npy/labelled``."""
        return os.path.join(self.default_dir, 'previous', 'npy', 'labelled')

    @property
    def previous_unlabelled_data(self):
        """Path ``<default_dir>/previous/npy/unlabelled``."""
        return os.path.join(self.default_dir, 'previous', 'npy', 'unlabelled')

    @property
    def current_data(self):
        """Path ``<default_dir>/current/npy/unlabelled``."""
        return os.path.join(self.default_dir, 'current', 'npy', 'unlabelled')

    @property
    def current_csv_data(self):
        """Path ``<default_dir>/current/csv``."""
        return os.path.join(self.default_dir, 'current', 'csv')

    @property
    def previous_csv_data(self):
        """Path ``<default_dir>/previous/csv``."""
        return os.path.join(self.default_dir, 'previous', 'csv')

    @property
    def gap_csv_data(self):
        """Path ``<default_dir>/gap/csv``."""
        return os.path.join(self.default_dir, 'gap', 'csv')

    def initialise(self):
        """Reset the saved dates, wipe the csv/npy folders, then download and format both periods."""
        dates_save(refresh=True)
        for folder in ['current', 'previous']:
            for sub_folder in ('csv', 'npy'):
                target = os.path.join(self.default_dir, folder, sub_folder)
                delete_dir(target)
                save_mkdir(target)
            self.download(folder, include_devices=True)
            self.format(folder)

    def check_exist(self, path):
        """
        Download and/or format the data under ``path`` when expected files are missing.

        @param path: folder containing ``csv`` and ``npy`` sub-folders; it is treated as the
                     'previous' period when the string 'previous' occurs in it, otherwise 'current'
        """
        check_list = {
            '.csv': {'activity': ['raw_door_sensor', 'raw_appliance_use', 'raw_activity_pir', 'device_types']},
            '.npy': {'activity': {'current': ['unlabelled', 'patient_id', 'dates'],
                                  'previous': ['unlabelled', 'patient_id', 'dates', 'X', 'y']}},
        }
        folder_type = 'previous' if 'previous' in path else 'current'
        reformat_flag = False
        for data_type in ['activity']:
            # Check the csv file
            required_csv = {ele + '.csv' for ele in check_list['.csv'][data_type]}
            if not required_csv <= set(iter_dir(os.path.join(path, 'csv'), '.csv', False)):
                print(data_type, folder_type, 'raw data does not exist, start to download')
                # The required csv list contains 'device_types', so request it explicitly
                # (the category name used to be passed positionally into this slot).
                self.download(folder_type, include_devices=True)
                reformat_flag = True
            else:
                print(data_type, folder_type, 'is already downloaded')

            # Check the npy file
            # NOTE(review): format() saves into npy/<labelled|unlabelled>/ with names such as
            # 'activity.npy'; the names expected here look out of date - confirm against iter_dir.
            required_npy = {ele + '.npy' for ele in check_list['.npy'][data_type][folder_type]}
            npy_missing = not required_npy <= set(iter_dir(os.path.join(path, 'npy'), '.npy', False))
            if npy_missing or reformat_flag:
                print('formatting the data: ', data_type, folder_type)
                self.format(folder_type)
            else:
                print(data_type, folder_type, 'has been processed')

    def download(self, period, include_devices=False):
        """
        Export the csv files of one period through the downloader.

        @param period: key of the dates dictionary ('current', 'previous' or 'gap');
                       'gap' data is written into the 'previous' csv folder
        @param include_devices: when true, 'device_types' is exported as well
        """
        categories = []
        for data_type in self.categories:
            categories.extend(config[data_type]['type'].copy())
        if include_devices:
            categories.append('device_types')
        date_dict = self.get_dates()
        self.downloader.export(since=date_dict[period]['since'], until=date_dict[period]['until'], reload=True,
                               save_path=os.path.join(self.default_dir,
                                                      'previous' if period == 'gap' else period, 'csv/'),
                               categories=categories)

    def format(self, period):
        """
        Turn the csv files of ``period`` into npy arrays under ``<default_dir>/<period>/npy``.

        @param period: 'previous' produces both labelled and unlabelled arrays,
                       any other value produces unlabelled arrays only
        """
        is_previous = period == 'previous'
        loader = Formatting(os.path.join(self.default_dir, period, 'csv'), add_tihm=is_previous)
        dataloader = Dataloader(loader.activity_data, loader.physiological_data, loader.environmental_data,
                                self.num_days_extended, is_previous)
        categories = ['labelled', 'unlabelled'] if is_previous else ['unlabelled']
        for data_type in categories:
            save_path = os.path.join(self.default_dir, period, 'npy', data_type)
            save_mkdir(save_path)
            attr = 'get_{}_data'.format(data_type)
            activity_data, physiological_data, environmental_data, p_ids, labels, dates = getattr(dataloader, attr)()
            # File names are fixed; the earlier '.format(data_type)' calls on them had no effect.
            np.save(os.path.join(save_path, 'activity.npy'), activity_data)
            np.save(os.path.join(save_path, 'physiological.npy'), physiological_data)
            np.save(os.path.join(save_path, 'environmental.npy'), environmental_data)
            np.save(os.path.join(save_path, 'patient_id.npy'), p_ids)
            if data_type == 'labelled':
                np.save(os.path.join(save_path, 'label.npy'), labels)
            np.save(os.path.join(save_path, 'dates.npy'), dates)

    def refresh(self, refresh_period=None):
        """
        Bring the stored data up to date.

        @param refresh_period: list of periods to re-format after downloading (default ['current'])

        Returns
        ---------
        None in every case except when a TypeError occurs while comparing or downloading
        the gap/current dates; then the dates are restored via date_backup(True) and
        False is returned.
        """
        if refresh_period is None:
            refresh_period = ['current']
        try:
            date_dict = self.get_dates()
        except FileNotFoundError:
            print('Dates file does not exist, start to initialise')
            self.initialise()
            return
        # get_dates() yields pandas Timestamps; a Timestamp never compares equal to a
        # datetime.date on recent pandas, so compare calendar dates explicitly.
        latest = date_dict['current']['until']
        yesterday = DT.date.today() - DT.timedelta(days=1)
        if latest is not None and latest.date() == yesterday:
            print('Data is up-to-date')
            return
        dates_save(refresh=False)
        date_dict = self.get_dates()
        try:
            if date_dict['gap']['until'] > date_dict['gap']['since']:
                self.download('gap')
            self.download('current')
        except TypeError:
            date_backup(True)
            return False
        self.collate()
        for folder in refresh_period:
            self.format(folder)
        return

    def collate(self):
        """
        Move rows of the current csv files dated before the gap 'until' date into the
        matching previous csv files, dropping duplicate rows in both.
        """
        date_dict = self.get_dates()
        # The mask below compares datetime.date objects, so reduce the Timestamp to a
        # date first (ordering a date against a Timestamp raises on recent pandas).
        cutoff = date_dict['gap']['until']
        if hasattr(cutoff, 'date'):
            cutoff = cutoff.date()
        for filename in iter_dir(self.previous_csv_data, split=False):
            if filename in ['device_types.csv', 'homes.csv', 'patients.csv']:
                continue
            previous_path = os.path.join(self.previous_csv_data, filename)
            current_path = os.path.join(self.current_csv_data, filename)
            # NOTE(review): index_col=0 on read combined with index=False on write discards
            # the first column on every pass - confirm the files carry a leading index column.
            previous_data = pd.read_csv(previous_path, index_col=0)
            current_data = pd.read_csv(current_path, index_col=0)
            # Drop rows whose start_date holds the literal column name (repeated header lines).
            current_data = current_data[current_data.start_date != 'start_date']
            previous_data = previous_data[previous_data.start_date != 'start_date']
            current_data.start_date = pd.to_datetime(current_data.start_date)
            current_mask = current_data.start_date.dt.date < cutoff
            previous_data = pd.concat([previous_data, current_data[current_mask]])
            current_data = current_data[~current_mask]
            current_data.drop_duplicates().to_csv(current_path, index=False)
            previous_data.drop_duplicates().to_csv(previous_path, index=False)
        return

    @staticmethod
    def get_dates():
        '''
        This function returns the current dates saved in the configurations folder.
        This is an internal function.

        Returns
        ---------

        - dates: dict:
            This dictionary holds the state ('gap', 'current', etc) and the dates,
            each date converted with pd.to_datetime.
        '''
        with open(dates_path) as json_file:
            date_dict = json.load(json_file)
        for state in date_dict:
            for time in date_dict[state]:
                date_dict[state][time] = pd.to_datetime(date_dict[state][time])
        return date_dict

    @staticmethod
    def clean_df(path):
        '''
        Used to clean csv files that contain unnamed columns: every file found under
        ``path`` is re-written without columns whose name starts with 'Unnamed'.

        Returns
        -------
        None
        '''
        for filename in iter_dir(path, split=False):
            df = pd.read_csv(os.path.join(path, filename), index_col=0)
            df = df.loc[:, ~df.columns.str.contains('^Unnamed')]
            df.to_csv(os.path.join(path, filename), index=False)
Static methods
def clean_df(path)
-
Used to clean dataframes that contain unnamed columns. Returns
Expand source code
@staticmethod
def clean_df(path):
    """
    Re-write every file found under ``path`` without its 'Unnamed' columns.

    Each file is read with its first column as the index and written back
    without an index.

    Returns
    -------
    None
    """
    for csv_name in iter_dir(path, split=False):
        csv_path = os.path.join(path, csv_name)
        frame = pd.read_csv(csv_path, index_col=0)
        # Keep only the columns whose name does not start with 'Unnamed'.
        keep = ~frame.columns.str.contains('^Unnamed')
        frame.loc[:, keep].to_csv(csv_path, index=False)
def get_dates()
-
This function returns the current dates saved in the configurations folder. This is an internal function.
Returns
- dates: dict: This dictionary holds the state ('gap', 'current', etc) and the dates.
Expand source code
@staticmethod
def get_dates():
    """
    Load the dates stored in the json file at ``dates_path``.
    This is an internal function.

    Returns
    ---------

    - dates: dict:
        Maps each state ('gap', 'current', etc) to its dates, every value
        converted with pd.to_datetime.
    """
    with open(dates_path) as handle:
        loaded = json.load(handle)
    for bounds in loaded.values():
        # Replace each stored value in place with its parsed counterpart.
        for key in bounds:
            bounds[key] = pd.to_datetime(bounds[key])
    return loaded
Instance variables
var current_csv_data
-
Expand source code
@property
def current_csv_data(self):
    """Path ``<default_dir>/current/csv``."""
    root = self.default_dir
    return os.path.join(root, 'current', 'csv')
var current_data
-
Expand source code
@property
def current_data(self):
    """Path ``<default_dir>/current/npy/unlabelled``."""
    root = self.default_dir
    return os.path.join(root, 'current', 'npy', 'unlabelled')
var gap_csv_data
-
Expand source code
@property
def gap_csv_data(self):
    """Path ``<default_dir>/gap/csv``."""
    root = self.default_dir
    return os.path.join(root, 'gap', 'csv')
var previous_csv_data
-
Expand source code
@property
def previous_csv_data(self):
    """Path ``<default_dir>/previous/csv``."""
    root = self.default_dir
    return os.path.join(root, 'previous', 'csv')
var previous_labelled_data
-
Expand source code
@property
def previous_labelled_data(self):
    """Path ``<default_dir>/previous/npy/labelled``."""
    root = self.default_dir
    return os.path.join(root, 'previous', 'npy', 'labelled')
var previous_unlabelled_data
-
Expand source code
@property
def previous_unlabelled_data(self):
    """Path ``<default_dir>/previous/npy/unlabelled``."""
    root = self.default_dir
    return os.path.join(root, 'previous', 'npy', 'unlabelled')
Methods
def check_exist(self, path)
-
Expand source code
def check_exist(self, path):
    """
    Download and/or format the data under ``path`` when expected files are missing.

    @param path: folder containing ``csv`` and ``npy`` sub-folders; it is treated as the
                 'previous' period when the string 'previous' occurs in it, otherwise 'current'
    """
    check_list = {
        '.csv': {'activity': ['raw_door_sensor', 'raw_appliance_use', 'raw_activity_pir', 'device_types']},
        '.npy': {'activity': {'current': ['unlabelled', 'patient_id', 'dates'],
                              'previous': ['unlabelled', 'patient_id', 'dates', 'X', 'y']}},
    }
    folder_type = 'previous' if 'previous' in path else 'current'
    reformat_flag = False
    for data_type in ['activity']:
        # Check the csv file
        required_csv = {ele + '.csv' for ele in check_list['.csv'][data_type]}
        if not required_csv <= set(iter_dir(os.path.join(path, 'csv'), '.csv', False)):
            print(data_type, folder_type, 'raw data does not exist, start to download')
            # The required csv list contains 'device_types', so request it explicitly
            # (the category name used to be passed positionally into this slot).
            self.download(folder_type, include_devices=True)
            reformat_flag = True
        else:
            print(data_type, folder_type, 'is already downloaded')

        # Check the npy file
        # NOTE(review): format() saves into npy/<labelled|unlabelled>/ with names such as
        # 'activity.npy'; the names expected here look out of date - confirm against iter_dir.
        required_npy = {ele + '.npy' for ele in check_list['.npy'][data_type][folder_type]}
        npy_missing = not required_npy <= set(iter_dir(os.path.join(path, 'npy'), '.npy', False))
        if npy_missing or reformat_flag:
            print('formatting the data: ', data_type, folder_type)
            self.format(folder_type)
        else:
            print(data_type, folder_type, 'has been processed')
def collate(self)
-
Expand source code
def collate(self):
    """
    Move rows of the current csv files dated before the gap 'until' date into the
    matching previous csv files, dropping duplicate rows in both.

    The files 'device_types.csv', 'homes.csv' and 'patients.csv' are left untouched.
    """
    date_dict = self.get_dates()
    # The mask below compares datetime.date objects, so reduce the Timestamp to a
    # date first (ordering a date against a Timestamp raises on recent pandas).
    cutoff = date_dict['gap']['until']
    if hasattr(cutoff, 'date'):
        cutoff = cutoff.date()
    for filename in iter_dir(self.previous_csv_data, split=False):
        if filename in ['device_types.csv', 'homes.csv', 'patients.csv']:
            continue
        previous_path = os.path.join(self.previous_csv_data, filename)
        current_path = os.path.join(self.current_csv_data, filename)
        # NOTE(review): index_col=0 on read combined with index=False on write discards
        # the first column on every pass - confirm the files carry a leading index column.
        previous_data = pd.read_csv(previous_path, index_col=0)
        current_data = pd.read_csv(current_path, index_col=0)
        # Drop rows whose start_date holds the literal column name (repeated header lines).
        current_data = current_data[current_data.start_date != 'start_date']
        previous_data = previous_data[previous_data.start_date != 'start_date']
        current_data.start_date = pd.to_datetime(current_data.start_date)
        current_mask = current_data.start_date.dt.date < cutoff
        previous_data = pd.concat([previous_data, current_data[current_mask]])
        current_data = current_data[~current_mask]
        current_data.drop_duplicates().to_csv(current_path, index=False)
        previous_data.drop_duplicates().to_csv(previous_path, index=False)
    return
def download(self, period, include_devices=False)
-
Expand source code
def download(self, period, include_devices=False):
    """
    Export the csv files of one period through the downloader.

    @param period: key of the dates dictionary; data for 'gap' is written into the
                   'previous' csv folder, any other period into its own folder
    @param include_devices: when true, 'device_types' is exported as well
    """
    wanted = []
    for data_type in self.categories:
        wanted += config[data_type]['type'].copy()
    if include_devices:
        wanted.append('device_types')

    window = self.get_dates()[period]
    target_folder = period
    if period == 'gap':
        target_folder = 'previous'
    self.downloader.export(
        since=window['since'],
        until=window['until'],
        reload=True,
        save_path=os.path.join(self.default_dir, target_folder, 'csv/'),
        categories=wanted,
    )
def format(self, period)
-
Expand source code
def format(self, period):
    """
    Turn the csv files of ``period`` into npy arrays under ``<default_dir>/<period>/npy``.

    @param period: 'previous' produces both labelled and unlabelled arrays,
                   any other value produces unlabelled arrays only
    """
    is_previous = period == 'previous'
    loader = Formatting(os.path.join(self.default_dir, period, 'csv'), add_tihm=is_previous)
    dataloader = Dataloader(loader.activity_data, loader.physiological_data, loader.environmental_data,
                            self.num_days_extended, is_previous)
    categories = ['labelled', 'unlabelled'] if is_previous else ['unlabelled']
    for data_type in categories:
        save_path = os.path.join(self.default_dir, period, 'npy', data_type)
        save_mkdir(save_path)
        attr = 'get_{}_data'.format(data_type)
        activity_data, physiological_data, environmental_data, p_ids, labels, dates = getattr(dataloader, attr)()
        # File names are fixed; the earlier '.format(data_type)' calls on them had no effect.
        np.save(os.path.join(save_path, 'activity.npy'), activity_data)
        np.save(os.path.join(save_path, 'physiological.npy'), physiological_data)
        np.save(os.path.join(save_path, 'environmental.npy'), environmental_data)
        np.save(os.path.join(save_path, 'patient_id.npy'), p_ids)
        # Labels are only written for the labelled split.
        if data_type == 'labelled':
            np.save(os.path.join(save_path, 'label.npy'), labels)
        np.save(os.path.join(save_path, 'dates.npy'), dates)
def initialise(self)
-
Expand source code
def initialise(self):
    """Reset the saved dates, wipe the csv/npy folders, then download and format both periods."""
    dates_save(refresh=True)
    for period in ('current', 'previous'):
        # Recreate an empty csv folder first, then an empty npy folder.
        for kind in ('csv', 'npy'):
            target = os.path.join(self.default_dir, period, kind)
            delete_dir(target)
            save_mkdir(target)
        self.download(period, include_devices=True)
        self.format(period)
def refresh(self, refresh_period=None)
-
Expand source code
def refresh(self, refresh_period=None):
    """
    Bring the stored data up to date.

    @param refresh_period: list of periods to re-format after downloading (default ['current'])

    Returns
    ---------
    None in every case except when a TypeError occurs while comparing or downloading
    the gap/current dates; then the dates are restored via date_backup(True) and
    False is returned.
    """
    if refresh_period is None:
        refresh_period = ['current']
    try:
        date_dict = self.get_dates()
    except FileNotFoundError:
        print('Dates file does not exist, start to initialise')
        self.initialise()
        return
    # get_dates() yields pandas Timestamps; a Timestamp never compares equal to a
    # datetime.date on recent pandas, so compare calendar dates explicitly.
    latest = date_dict['current']['until']
    yesterday = DT.date.today() - DT.timedelta(days=1)
    if latest is not None and latest.date() == yesterday:
        print('Data is up-to-date')
        return
    dates_save(refresh=False)
    date_dict = self.get_dates()
    try:
        # Only download the gap when it spans a positive interval.
        if date_dict['gap']['until'] > date_dict['gap']['since']:
            self.download('gap')
        self.download('current')
    except TypeError:
        date_backup(True)
        return False
    self.collate()
    for folder in refresh_period:
        self.format(folder)
    return