Module minder_utils.scripts.weekly_loader
Expand source code
from minder_utils.formatting.format_util import iter_dir
import os
import datetime as DT
from minder_utils.download.download import Downloader
from minder_utils.formatting.formatting import Formatting
from minder_utils.dataloader import Dataloader
import numpy as np
from minder_utils.util.util import save_mkdir, delete_dir
import json
from minder_utils.settings import dates_save, date_backup
import pandas as pd
from minder_utils.configurations import dates_path
from minder_utils.configurations import config
class Weekly_dataloader:
    """
    Support UTI only.

    This class will
        - download all previous data if it has not been downloaded before
            - it will be saved as labelled data and unlabelled data
        - download the latest weekly data
        - reformat all the data into the following (N is the number of samples)
            - activity data: N * 3 * 8 * 20
            - environmental data: N * 19
            - physiological data: N * 12
    """

    def __init__(self, categories=None, save_dir=os.path.join('./data', 'weekly_test'), num_days_extended=3):
        '''
        @param categories: list of categories to use, any of activity, environmental,
            physiological. None selects all three.
        @param save_dir: root directory under which the csv and npy files are stored;
            it is created if needed
        @param num_days_extended: for uti only, how many consecutive days to be labelled
        '''
        self.default_categories = ['activity', 'environmental', 'physiological']
        # Copy the defaults so that mutating one list never silently changes the other.
        self.categories = list(self.default_categories) if categories is None else categories
        assert all(data_type in self.default_categories for data_type in self.categories), \
            'available categories: activity, environmental, physiological'
        self.num_days_extended = num_days_extended
        self.downloader = Downloader()
        self.default_dir = save_dir
        save_mkdir(self.default_dir)

    @property
    def previous_labelled_data(self):
        '''Directory of the labelled npy files of the previous period.'''
        return os.path.join(self.default_dir, 'previous', 'npy', 'labelled')

    @property
    def previous_unlabelled_data(self):
        '''Directory of the unlabelled npy files of the previous period.'''
        return os.path.join(self.default_dir, 'previous', 'npy', 'unlabelled')

    @property
    def current_data(self):
        '''Directory of the (unlabelled) npy files of the current period.'''
        return os.path.join(self.default_dir, 'current', 'npy', 'unlabelled')

    @property
    def current_csv_data(self):
        '''Directory of the csv files of the current period.'''
        return os.path.join(self.default_dir, 'current', 'csv')

    @property
    def previous_csv_data(self):
        '''Directory of the csv files of the previous period.'''
        return os.path.join(self.default_dir, 'previous', 'csv')

    @property
    def gap_csv_data(self):
        '''Path <save_dir>/gap/csv. Note that download() stores gap data in the previous csv folder.'''
        return os.path.join(self.default_dir, 'gap', 'csv')

    def initialise(self):
        '''
        Start from scratch: refresh the saved dates, then for the current and the
        previous period empty the csv / npy folders, download the data
        (including device types) and format it.
        '''
        dates_save(refresh=True)
        for folder in ['current', 'previous']:
            for sub_folder in ['csv', 'npy']:
                target = os.path.join(self.default_dir, folder, sub_folder)
                delete_dir(target)
                save_mkdir(target)
            self.download(folder, include_devices=True)
            self.format(folder)

    def check_exist(self, path):
        '''
        Download and / or format the activity data when the expected files are missing.

        @param path: folder containing the 'csv' and 'npy' sub folders. It is treated as
            the previous period when the string 'previous' occurs in it, otherwise as
            the current period.
        '''
        # NOTE(review): the '.npy' names below differ from the file names written by
        # format() (activity.npy, physiological.npy, ...) - confirm they are still valid.
        check_list = {
            '.csv': {'activity': ['raw_door_sensor', 'raw_appliance_use', 'raw_activity_pir', 'device_types']},
            '.npy': {'activity': {'current': ['unlabelled', 'patient_id', 'dates'],
                                  'previous': ['unlabelled', 'patient_id', 'dates', 'X', 'y']}},
        }
        folder_type = 'previous' if 'previous' in path else 'current'
        reformat_flag = False
        for data_type in ['activity']:
            # Check the csv file
            required_csv = {ele + '.csv' for ele in check_list['.csv'][data_type]}
            if not required_csv <= set(iter_dir(os.path.join(path, 'csv'), '.csv', False)):
                print(data_type, folder_type, 'raw data does not exist, start to download')
                # device_types.csv is part of the required files, so request it explicitly
                # (the category name used to be passed positionally as the boolean flag).
                self.download(folder_type, include_devices=True)
                reformat_flag = True
            else:
                print(data_type, folder_type, 'is already downloaded')
            # Check the npy file
            required_npy = {ele + '.npy' for ele in check_list['.npy'][data_type][folder_type]}
            if not required_npy <= set(iter_dir(os.path.join(path, 'npy'), '.npy', False)) or reformat_flag:
                print('formatting the data: ', data_type, folder_type)
                self.format(folder_type)
            else:
                print(data_type, folder_type, 'has been processed')

    def download(self, period, include_devices=False):
        '''
        Export the data of one period to csv.

        @param period: key of the dates dictionary ('current', 'previous' or 'gap').
            Data of the 'gap' period is saved into the 'previous' csv folder.
        @param include_devices: also export the 'device_types' category
        '''
        categories = []
        for data_type in self.categories:
            categories.extend(config[data_type]['type'])
        if include_devices:
            categories.append('device_types')
        date_dict = self.get_dates()
        self.downloader.export(since=date_dict[period]['since'], until=date_dict[period]['until'], reload=True,
                               save_path=os.path.join(self.default_dir, 'previous' if period == 'gap' else period,
                                                      'csv/'),
                               categories=categories)

    def format(self, period):
        '''
        Turn the csv files of a period into npy files.

        For the previous period both labelled and unlabelled data are written, for any
        other period only unlabelled data. The files are saved in
        <save_dir>/<period>/npy/<labelled|unlabelled>/.

        @param period: 'current' or 'previous'
        '''
        is_previous = period == 'previous'
        loader = Formatting(os.path.join(self.default_dir, period, 'csv'), add_tihm=is_previous)
        dataloader = Dataloader(loader.activity_data,
                                loader.physiological_data,
                                loader.environmental_data,
                                self.num_days_extended, is_previous)
        categories = ['labelled', 'unlabelled'] if is_previous else ['unlabelled']
        for data_type in categories:
            save_path = os.path.join(self.default_dir, period, 'npy', data_type)
            save_mkdir(save_path)
            attr = 'get_{}_data'.format(data_type)
            activity_data, physiological_data, environmental_data, p_ids, labels, dates = getattr(dataloader, attr)()
            # File name (without extension) -> array, in the order the files are written.
            arrays = {'activity': activity_data,
                      'physiological': physiological_data,
                      'environmental': environmental_data,
                      'patient_id': p_ids}
            if data_type == 'labelled':
                arrays['label'] = labels
            arrays['dates'] = dates
            for name, values in arrays.items():
                np.save(os.path.join(save_path, name + '.npy'), values)

    def refresh(self, refresh_period=None):
        '''
        Bring the local data up to date.

        @param refresh_period: list of periods to format after downloading,
            ['current'] by default
        @return: False when the download failed with a TypeError (the dates are then
            restored through date_backup), None otherwise
        '''
        if refresh_period is None:
            refresh_period = ['current']
        try:
            date_dict = self.get_dates()
        except FileNotFoundError:
            print('Dates file does not exist, start to initialise')
            self.initialise()
            return
        # get_dates() yields pandas Timestamps; compare calendar days, because a
        # Timestamp never compares equal to a datetime.date on current pandas.
        yesterday = DT.date.today() - DT.timedelta(days=1)
        if self._as_date(date_dict['current']['until']) == yesterday:
            print('Data is up-to-date')
            return
        dates_save(refresh=False)
        date_dict = self.get_dates()
        try:
            if date_dict['gap']['until'] > date_dict['gap']['since']:
                self.download('gap')
            self.download('current')
        except TypeError:
            date_backup(True)
            return False
        self.collate()
        for folder in refresh_period:
            self.format(folder)
        return

    def collate(self):
        '''
        Move the rows of the current csv files that start before the end of the gap
        period into the corresponding previous csv files, then rewrite both files
        without duplicated rows.
        '''
        date_dict = self.get_dates()
        for filename in iter_dir(self.previous_csv_data, split=False):
            if filename in ['device_types.csv', 'homes.csv', 'patients.csv']:
                continue
            previous_file = os.path.join(self.previous_csv_data, filename)
            current_file = os.path.join(self.current_csv_data, filename)
            previous_data = pd.read_csv(previous_file, index_col=0)
            current_data = pd.read_csv(current_file, index_col=0)
            # Drop header rows that ended up inside the data.
            current_data = current_data[current_data.start_date != 'start_date'].copy()
            previous_data = previous_data[previous_data.start_date != 'start_date']
            current_data['start_date'] = pd.to_datetime(current_data.start_date)
            current_mask = self._starts_before(current_data.start_date, date_dict['gap']['until'])
            previous_data = pd.concat([previous_data, current_data[current_mask]])
            current_data = current_data[~current_mask]
            current_data.drop_duplicates().to_csv(current_file, index=False)
            previous_data.drop_duplicates().to_csv(previous_file, index=False)
        return

    @staticmethod
    def _as_date(value):
        '''Return value as a datetime.date, or None when it is missing (None / NaT).'''
        if value is None or pd.isna(value):
            return None
        return pd.Timestamp(value).date()

    @staticmethod
    def _starts_before(start_dates, day):
        '''
        Boolean mask of the datetime Series start_dates whose calendar day lies strictly
        before the calendar day of `day`. Time zones are dropped (wall time is kept)
        so that aware and naive values can be compared.
        '''
        cutoff = pd.Timestamp(day)
        if cutoff.tzinfo is not None:
            cutoff = cutoff.tz_localize(None)
        cutoff = cutoff.normalize()
        if start_dates.dt.tz is not None:
            start_dates = start_dates.dt.tz_localize(None)
        # A moment is before midnight of `day` exactly when its date is before `day`.
        return start_dates < cutoff

    @staticmethod
    def get_dates():
        '''
        This function returns the current dates saved in the configurations folder.
        This is an internal function.

        Returns
        ---------

        - dates: dict:
            This dictionary holds the state ('gap', 'current', etc) and the dates,
            converted with pandas.to_datetime.
        '''
        with open(dates_path) as json_file:
            date_dict = json.load(json_file)
        for state in date_dict:
            for key in date_dict[state]:
                date_dict[state][key] = pd.to_datetime(date_dict[state][key])
        return date_dict

    @staticmethod
    def clean_df(path):
        '''
        Used to clean dataframes that contain unnamed columns: every file listed by
        iter_dir(path, split=False) is read with its first column as index, the columns
        whose name starts with 'Unnamed' are removed and the file is overwritten
        without the index.

        Returns
        -------
        None
        '''
        for filename in iter_dir(path, split=False):
            df = pd.read_csv(os.path.join(path, filename), index_col=0)
            df = df.loc[:, ~df.columns.str.contains('^Unnamed')]
            df.to_csv(os.path.join(path, filename), index=False)
Classes
class Weekly_dataloader (categories=None, save_dir='./data\\weekly_test', num_days_extended=3)
-
Support UTI only. This class will: - download all previous data if it has not been downloaded before (it will be saved as labelled data and unlabelled data) - download the latest weekly data - reformat all the data into the following shapes (N is the number of samples) - activity data: N * 3 * 8 * 20 - environmental data: N * 19 - physiological data: N * 12
@param categories: activity, environmental, physiological @param num_days_extended: for UTI only, how many consecutive days are to be labelled
Expand source code
class Weekly_dataloader:
    """
    Support UTI only.

    This class will
        - download all previous data if it has not been downloaded before
            - it will be saved as labelled data and unlabelled data
        - download the latest weekly data
        - reformat all the data into the following (N is the number of samples)
            - activity data: N * 3 * 8 * 20
            - environmental data: N * 19
            - physiological data: N * 12
    """

    def __init__(self, categories=None, save_dir=os.path.join('./data', 'weekly_test'), num_days_extended=3):
        '''
        @param categories: list of categories to use, any of activity, environmental,
            physiological. None selects all three.
        @param save_dir: root directory under which the csv and npy files are stored;
            it is created if needed
        @param num_days_extended: for uti only, how many consecutive days to be labelled
        '''
        self.default_categories = ['activity', 'environmental', 'physiological']
        # Copy the defaults so that mutating one list never silently changes the other.
        self.categories = list(self.default_categories) if categories is None else categories
        assert all(data_type in self.default_categories for data_type in self.categories), \
            'available categories: activity, environmental, physiological'
        self.num_days_extended = num_days_extended
        self.downloader = Downloader()
        self.default_dir = save_dir
        save_mkdir(self.default_dir)

    @property
    def previous_labelled_data(self):
        '''Directory of the labelled npy files of the previous period.'''
        return os.path.join(self.default_dir, 'previous', 'npy', 'labelled')

    @property
    def previous_unlabelled_data(self):
        '''Directory of the unlabelled npy files of the previous period.'''
        return os.path.join(self.default_dir, 'previous', 'npy', 'unlabelled')

    @property
    def current_data(self):
        '''Directory of the (unlabelled) npy files of the current period.'''
        return os.path.join(self.default_dir, 'current', 'npy', 'unlabelled')

    @property
    def current_csv_data(self):
        '''Directory of the csv files of the current period.'''
        return os.path.join(self.default_dir, 'current', 'csv')

    @property
    def previous_csv_data(self):
        '''Directory of the csv files of the previous period.'''
        return os.path.join(self.default_dir, 'previous', 'csv')

    @property
    def gap_csv_data(self):
        '''Path <save_dir>/gap/csv. Note that download() stores gap data in the previous csv folder.'''
        return os.path.join(self.default_dir, 'gap', 'csv')

    def initialise(self):
        '''
        Start from scratch: refresh the saved dates, then for the current and the
        previous period empty the csv / npy folders, download the data
        (including device types) and format it.
        '''
        dates_save(refresh=True)
        for folder in ['current', 'previous']:
            for sub_folder in ['csv', 'npy']:
                target = os.path.join(self.default_dir, folder, sub_folder)
                delete_dir(target)
                save_mkdir(target)
            self.download(folder, include_devices=True)
            self.format(folder)

    def check_exist(self, path):
        '''
        Download and / or format the activity data when the expected files are missing.

        @param path: folder containing the 'csv' and 'npy' sub folders. It is treated as
            the previous period when the string 'previous' occurs in it, otherwise as
            the current period.
        '''
        # NOTE(review): the '.npy' names below differ from the file names written by
        # format() (activity.npy, physiological.npy, ...) - confirm they are still valid.
        check_list = {
            '.csv': {'activity': ['raw_door_sensor', 'raw_appliance_use', 'raw_activity_pir', 'device_types']},
            '.npy': {'activity': {'current': ['unlabelled', 'patient_id', 'dates'],
                                  'previous': ['unlabelled', 'patient_id', 'dates', 'X', 'y']}},
        }
        folder_type = 'previous' if 'previous' in path else 'current'
        reformat_flag = False
        for data_type in ['activity']:
            # Check the csv file
            required_csv = {ele + '.csv' for ele in check_list['.csv'][data_type]}
            if not required_csv <= set(iter_dir(os.path.join(path, 'csv'), '.csv', False)):
                print(data_type, folder_type, 'raw data does not exist, start to download')
                # device_types.csv is part of the required files, so request it explicitly
                # (the category name used to be passed positionally as the boolean flag).
                self.download(folder_type, include_devices=True)
                reformat_flag = True
            else:
                print(data_type, folder_type, 'is already downloaded')
            # Check the npy file
            required_npy = {ele + '.npy' for ele in check_list['.npy'][data_type][folder_type]}
            if not required_npy <= set(iter_dir(os.path.join(path, 'npy'), '.npy', False)) or reformat_flag:
                print('formatting the data: ', data_type, folder_type)
                self.format(folder_type)
            else:
                print(data_type, folder_type, 'has been processed')

    def download(self, period, include_devices=False):
        '''
        Export the data of one period to csv.

        @param period: key of the dates dictionary ('current', 'previous' or 'gap').
            Data of the 'gap' period is saved into the 'previous' csv folder.
        @param include_devices: also export the 'device_types' category
        '''
        categories = []
        for data_type in self.categories:
            categories.extend(config[data_type]['type'])
        if include_devices:
            categories.append('device_types')
        date_dict = self.get_dates()
        self.downloader.export(since=date_dict[period]['since'], until=date_dict[period]['until'], reload=True,
                               save_path=os.path.join(self.default_dir, 'previous' if period == 'gap' else period,
                                                      'csv/'),
                               categories=categories)

    def format(self, period):
        '''
        Turn the csv files of a period into npy files.

        For the previous period both labelled and unlabelled data are written, for any
        other period only unlabelled data. The files are saved in
        <save_dir>/<period>/npy/<labelled|unlabelled>/.

        @param period: 'current' or 'previous'
        '''
        is_previous = period == 'previous'
        loader = Formatting(os.path.join(self.default_dir, period, 'csv'), add_tihm=is_previous)
        dataloader = Dataloader(loader.activity_data,
                                loader.physiological_data,
                                loader.environmental_data,
                                self.num_days_extended, is_previous)
        categories = ['labelled', 'unlabelled'] if is_previous else ['unlabelled']
        for data_type in categories:
            save_path = os.path.join(self.default_dir, period, 'npy', data_type)
            save_mkdir(save_path)
            attr = 'get_{}_data'.format(data_type)
            activity_data, physiological_data, environmental_data, p_ids, labels, dates = getattr(dataloader, attr)()
            # File name (without extension) -> array, in the order the files are written.
            arrays = {'activity': activity_data,
                      'physiological': physiological_data,
                      'environmental': environmental_data,
                      'patient_id': p_ids}
            if data_type == 'labelled':
                arrays['label'] = labels
            arrays['dates'] = dates
            for name, values in arrays.items():
                np.save(os.path.join(save_path, name + '.npy'), values)

    def refresh(self, refresh_period=None):
        '''
        Bring the local data up to date.

        @param refresh_period: list of periods to format after downloading,
            ['current'] by default
        @return: False when the download failed with a TypeError (the dates are then
            restored through date_backup), None otherwise
        '''
        if refresh_period is None:
            refresh_period = ['current']
        try:
            date_dict = self.get_dates()
        except FileNotFoundError:
            print('Dates file does not exist, start to initialise')
            self.initialise()
            return
        # get_dates() yields pandas Timestamps; compare calendar days, because a
        # Timestamp never compares equal to a datetime.date on current pandas.
        yesterday = DT.date.today() - DT.timedelta(days=1)
        if self._as_date(date_dict['current']['until']) == yesterday:
            print('Data is up-to-date')
            return
        dates_save(refresh=False)
        date_dict = self.get_dates()
        try:
            if date_dict['gap']['until'] > date_dict['gap']['since']:
                self.download('gap')
            self.download('current')
        except TypeError:
            date_backup(True)
            return False
        self.collate()
        for folder in refresh_period:
            self.format(folder)
        return

    def collate(self):
        '''
        Move the rows of the current csv files that start before the end of the gap
        period into the corresponding previous csv files, then rewrite both files
        without duplicated rows.
        '''
        date_dict = self.get_dates()
        for filename in iter_dir(self.previous_csv_data, split=False):
            if filename in ['device_types.csv', 'homes.csv', 'patients.csv']:
                continue
            previous_file = os.path.join(self.previous_csv_data, filename)
            current_file = os.path.join(self.current_csv_data, filename)
            previous_data = pd.read_csv(previous_file, index_col=0)
            current_data = pd.read_csv(current_file, index_col=0)
            # Drop header rows that ended up inside the data.
            current_data = current_data[current_data.start_date != 'start_date'].copy()
            previous_data = previous_data[previous_data.start_date != 'start_date']
            current_data['start_date'] = pd.to_datetime(current_data.start_date)
            current_mask = self._starts_before(current_data.start_date, date_dict['gap']['until'])
            previous_data = pd.concat([previous_data, current_data[current_mask]])
            current_data = current_data[~current_mask]
            current_data.drop_duplicates().to_csv(current_file, index=False)
            previous_data.drop_duplicates().to_csv(previous_file, index=False)
        return

    @staticmethod
    def _as_date(value):
        '''Return value as a datetime.date, or None when it is missing (None / NaT).'''
        if value is None or pd.isna(value):
            return None
        return pd.Timestamp(value).date()

    @staticmethod
    def _starts_before(start_dates, day):
        '''
        Boolean mask of the datetime Series start_dates whose calendar day lies strictly
        before the calendar day of `day`. Time zones are dropped (wall time is kept)
        so that aware and naive values can be compared.
        '''
        cutoff = pd.Timestamp(day)
        if cutoff.tzinfo is not None:
            cutoff = cutoff.tz_localize(None)
        cutoff = cutoff.normalize()
        if start_dates.dt.tz is not None:
            start_dates = start_dates.dt.tz_localize(None)
        # A moment is before midnight of `day` exactly when its date is before `day`.
        return start_dates < cutoff

    @staticmethod
    def get_dates():
        '''
        This function returns the current dates saved in the configurations folder.
        This is an internal function.

        Returns
        ---------

        - dates: dict:
            This dictionary holds the state ('gap', 'current', etc) and the dates,
            converted with pandas.to_datetime.
        '''
        with open(dates_path) as json_file:
            date_dict = json.load(json_file)
        for state in date_dict:
            for key in date_dict[state]:
                date_dict[state][key] = pd.to_datetime(date_dict[state][key])
        return date_dict

    @staticmethod
    def clean_df(path):
        '''
        Used to clean dataframes that contain unnamed columns: every file listed by
        iter_dir(path, split=False) is read with its first column as index, the columns
        whose name starts with 'Unnamed' are removed and the file is overwritten
        without the index.

        Returns
        -------
        None
        '''
        for filename in iter_dir(path, split=False):
            df = pd.read_csv(os.path.join(path, filename), index_col=0)
            df = df.loc[:, ~df.columns.str.contains('^Unnamed')]
            df.to_csv(os.path.join(path, filename), index=False)
Static methods
def clean_df(path)
-
Used to clean dataframes that contain unnamed columns. Returns
Expand source code
@staticmethod
def clean_df(path):
    '''
    Used to clean dataframes that contain unnamed columns.

    Every file listed by iter_dir(path, split=False) is read with its first
    column as index, the columns whose name starts with 'Unnamed' are removed
    and the file is overwritten without the index.

    Returns
    -------
    None
    '''
    for name in iter_dir(path, split=False):
        target = os.path.join(path, name)
        frame = pd.read_csv(target, index_col=0)
        unnamed = frame.columns.str.contains('^Unnamed')
        frame = frame.loc[:, ~unnamed]
        frame.to_csv(target, index=False)
def get_dates()
-
This function returns the current dates saved in the configurations folder. This is an internal function.
Returns
- dates: dict: This dictionary holds the state ('gap', 'current', etc) and the dates.
Expand source code
@staticmethod
def get_dates():
    '''
    This function returns the current dates saved in the configurations folder.
    This is an internal function.

    Returns
    ---------

    - dates: dict:
        This dictionary holds the state ('gap', 'current', etc) and the dates,
        each converted with pandas.to_datetime.
    '''
    with open(dates_path) as json_file:
        saved = json.load(json_file)
    for state, entries in saved.items():
        # Replace the values in place; the set of keys is not changed while iterating.
        for key in entries:
            entries[key] = pd.to_datetime(entries[key])
    return saved
Instance variables
var current_csv_data
-
Expand source code
@property
def current_csv_data(self):
    '''Directory <default_dir>/current/csv.'''
    parts = (self.default_dir, 'current', 'csv')
    return os.path.join(*parts)
var current_data
-
Expand source code
@property
def current_data(self):
    '''Directory <default_dir>/current/npy/unlabelled.'''
    parts = (self.default_dir, 'current', 'npy', 'unlabelled')
    return os.path.join(*parts)
var gap_csv_data
-
Expand source code
@property
def gap_csv_data(self):
    '''Directory <default_dir>/gap/csv.'''
    parts = (self.default_dir, 'gap', 'csv')
    return os.path.join(*parts)
var previous_csv_data
-
Expand source code
@property
def previous_csv_data(self):
    '''Directory <default_dir>/previous/csv.'''
    parts = (self.default_dir, 'previous', 'csv')
    return os.path.join(*parts)
var previous_labelled_data
-
Expand source code
@property
def previous_labelled_data(self):
    '''Directory <default_dir>/previous/npy/labelled.'''
    parts = (self.default_dir, 'previous', 'npy', 'labelled')
    return os.path.join(*parts)
var previous_unlabelled_data
-
Expand source code
@property
def previous_unlabelled_data(self):
    '''Directory <default_dir>/previous/npy/unlabelled.'''
    parts = (self.default_dir, 'previous', 'npy', 'unlabelled')
    return os.path.join(*parts)
Methods
def check_exist(self, path)
-
Expand source code
def check_exist(self, path):
    '''
    Download and / or format the activity data when the expected files are missing.

    @param path: folder containing the 'csv' and 'npy' sub folders. It is treated as
        the previous period when the string 'previous' occurs in it, otherwise as
        the current period.
    '''
    # NOTE(review): the '.npy' names below differ from the file names written by
    # format() (activity.npy, physiological.npy, ...) - confirm they are still valid.
    check_list = {
        '.csv': {'activity': ['raw_door_sensor', 'raw_appliance_use', 'raw_activity_pir', 'device_types']},
        '.npy': {'activity': {'current': ['unlabelled', 'patient_id', 'dates'],
                              'previous': ['unlabelled', 'patient_id', 'dates', 'X', 'y']}},
    }
    folder_type = 'previous' if 'previous' in path else 'current'
    reformat_flag = False
    for data_type in ['activity']:
        # Check the csv file
        required_csv = {ele + '.csv' for ele in check_list['.csv'][data_type]}
        if not required_csv <= set(iter_dir(os.path.join(path, 'csv'), '.csv', False)):
            print(data_type, folder_type, 'raw data does not exist, start to download')
            # device_types.csv is part of the required files, so request it explicitly
            # (the category name used to be passed positionally as the boolean flag).
            self.download(folder_type, include_devices=True)
            reformat_flag = True
        else:
            print(data_type, folder_type, 'is already downloaded')
        # Check the npy file
        required_npy = {ele + '.npy' for ele in check_list['.npy'][data_type][folder_type]}
        if not required_npy <= set(iter_dir(os.path.join(path, 'npy'), '.npy', False)) or reformat_flag:
            print('formatting the data: ', data_type, folder_type)
            self.format(folder_type)
        else:
            print(data_type, folder_type, 'has been processed')
def collate(self)
-
Expand source code
def collate(self):
    '''
    Move the rows of the current csv files that start before the end of the gap
    period into the corresponding previous csv files, then rewrite both files
    without duplicated rows.
    '''
    date_dict = self.get_dates()
    # Compare on the datetime level: ordering a Series of datetime.date objects
    # against a pandas Timestamp raises a TypeError on current pandas. Midnight of
    # the gap-until day gives the same split as comparing calendar dates.
    cutoff = pd.Timestamp(date_dict['gap']['until'])
    if cutoff.tzinfo is not None:
        cutoff = cutoff.tz_localize(None)
    cutoff = cutoff.normalize()
    for filename in iter_dir(self.previous_csv_data, split=False):
        if filename in ['device_types.csv', 'homes.csv', 'patients.csv']:
            continue
        previous_file = os.path.join(self.previous_csv_data, filename)
        current_file = os.path.join(self.current_csv_data, filename)
        previous_data = pd.read_csv(previous_file, index_col=0)
        current_data = pd.read_csv(current_file, index_col=0)
        # Drop header rows that ended up inside the data.
        current_data = current_data[current_data.start_date != 'start_date'].copy()
        previous_data = previous_data[previous_data.start_date != 'start_date']
        current_data['start_date'] = pd.to_datetime(current_data.start_date)
        start_dates = current_data.start_date
        if start_dates.dt.tz is not None:
            # Keep the wall time so that the calendar day is unchanged.
            start_dates = start_dates.dt.tz_localize(None)
        current_mask = start_dates < cutoff
        previous_data = pd.concat([previous_data, current_data[current_mask]])
        current_data = current_data[~current_mask]
        current_data.drop_duplicates().to_csv(current_file, index=False)
        previous_data.drop_duplicates().to_csv(previous_file, index=False)
    return
def download(self, period, include_devices=False)
-
Expand source code
def download(self, period, include_devices=False):
    '''
    Export the data of one period to csv.

    @param period: key of the dates dictionary ('current', 'previous' or 'gap').
        Data of the 'gap' period is saved into the 'previous' csv folder.
    @param include_devices: also export the 'device_types' category
    '''
    requested = [item for data_type in self.categories for item in config[data_type]['type'].copy()]
    if include_devices:
        requested.append('device_types')
    period_dates = self.get_dates()[period]
    target_folder = 'previous' if period == 'gap' else period
    destination = os.path.join(self.default_dir, target_folder, 'csv/')
    self.downloader.export(since=period_dates['since'], until=period_dates['until'], reload=True,
                           save_path=destination,
                           categories=requested)
def format(self, period)
-
Expand source code
def format(self, period):
    '''
    Turn the csv files of a period into npy files.

    For the previous period both labelled and unlabelled data are written, for any
    other period only unlabelled data. The files are saved in
    <default_dir>/<period>/npy/<labelled|unlabelled>/.

    @param period: 'current' or 'previous'
    '''
    is_previous = period == 'previous'
    loader = Formatting(os.path.join(self.default_dir, period, 'csv'), add_tihm=is_previous)
    dataloader = Dataloader(loader.activity_data,
                            loader.physiological_data,
                            loader.environmental_data,
                            self.num_days_extended, is_previous)
    categories = ['labelled', 'unlabelled'] if is_previous else ['unlabelled']
    for data_type in categories:
        save_path = os.path.join(self.default_dir, period, 'npy', data_type)
        save_mkdir(save_path)
        attr = 'get_{}_data'.format(data_type)
        activity_data, physiological_data, environmental_data, p_ids, labels, dates = getattr(dataloader, attr)()
        # File name (without extension) -> array, in the order the files are written.
        # The former '....npy'.format(data_type) calls had no placeholder and did nothing.
        arrays = {'activity': activity_data,
                  'physiological': physiological_data,
                  'environmental': environmental_data,
                  'patient_id': p_ids}
        if data_type == 'labelled':
            arrays['label'] = labels
        arrays['dates'] = dates
        for name, values in arrays.items():
            np.save(os.path.join(save_path, name + '.npy'), values)
def initialise(self)
-
Expand source code
def initialise(self):
    '''
    Start from scratch: refresh the saved dates, then for the current and the
    previous period empty the csv / npy folders, download the data
    (including device types) and format it.
    '''
    dates_save(refresh=True)
    for period in ('current', 'previous'):
        # csv first, then npy; each folder is removed and created again.
        for kind in ('csv', 'npy'):
            target = os.path.join(self.default_dir, period, kind)
            delete_dir(target)
            save_mkdir(target)
        self.download(period, include_devices=True)
        self.format(period)
def refresh(self, refresh_period=None)
-
Expand source code
def refresh(self, refresh_period=None):
    '''
    Bring the local data up to date.

    @param refresh_period: list of periods to format after downloading,
        ['current'] by default
    @return: False when the download failed with a TypeError (the dates are then
        restored through date_backup), None otherwise
    '''
    if refresh_period is None:
        refresh_period = ['current']
    try:
        date_dict = self.get_dates()
    except FileNotFoundError:
        print('Dates file does not exist, start to initialise')
        self.initialise()
        return
    # get_dates() yields pandas Timestamps; compare calendar days, because a
    # Timestamp never compares equal to a datetime.date on current pandas.
    current_until = date_dict['current']['until']
    yesterday = DT.date.today() - DT.timedelta(days=1)
    if not pd.isna(current_until) and pd.Timestamp(current_until).date() == yesterday:
        print('Data is up-to-date')
        return
    dates_save(refresh=False)
    date_dict = self.get_dates()
    try:
        if date_dict['gap']['until'] > date_dict['gap']['since']:
            self.download('gap')
        self.download('current')
    except TypeError:
        date_backup(True)
        return False
    self.collate()
    for folder in refresh_period:
        self.format(folder)
    return