Module minder_utils.download.download
Expand source code
import requests
import json
import pandas as pd
import io
from pathlib import Path
import sys
import os
from minder_utils.util.util import progress_spinner, reformat_path, save_mkdir
from minder_utils.configurations import token_path
import numpy as np
from datetime import date, datetime
class Downloader:
'''
This class allows you to download and save the data from minder. Make sure that you
have internally saved your token before using this class (see the
```Getting Started.ipynb``` guide).
``Example``
```
from minder_utils.download import Downloader
dl = Downloader()
category_list = dl.get_category_names('activity')
dl.export(categories = category_list, since= '2021-10-05', save_path='./data/')
```
This would download all of the activity data from the 5th October 2021, and save it
as a csv in the directory ```'./data/'```
'''
def __init__(self):
self.url = 'https://research.minder.care/api/'
self.params = {'Authorization': self.token(), 'Content-Type': 'application/json'}
def get_info(self):
'''
This function returns the available datasets on minder in the form of a
        dictionary.
Returns
---------
- _: dict:
This returns a dictionary of the available datasets.
'''
print('Sending Request...')
r = requests.get(self.url + 'info/datasets', headers=self.params)
if r.status_code in [401, 403]:
raise TypeError('Authentication failed!'\
' Please check your token - it might be out of date. '\
'You might also not have authorization to complete your request.')
try:
return r.json()
except json.decoder.JSONDecodeError:
print('Get response ', r)
def _export_request(self, categories='all', since=None, until=None):
'''
This is an internal function that makes the request to download the data.
Arguments
---------
- categories: list or string:
If a list, this is the datasets that will be downloaded. Please use the
dataset names that can be returned by using the get_category_names function.
If the string 'all' is supplied, this function will return all of the data. This
is not good! There should be a good reason to do this.
        - since: valid input to pd.to_datetime(.):
            This is the date and time from which the data will be loaded.
        - until: valid input to pd.to_datetime(.):
            This is the date and time up until which the data will be loaded.
        '''
# print('Deleting Existing export request')
# previously_requests = requests.get(self.url + 'export', headers=self.params).json()
# for job in previously_requests:
# response = requests.delete(self.url + 'export/' + job['id'], headers=self.params)
# if response.status_code == 200:
# print('Job ID ', job['id'], 'is successfully deleted', response.text)
# else:
# print('Job ID ', job['id'], 'is NOT deleted. Response code ', response.status_code)
print('Creating new export request')
export_keys = {'datasets': {}}
if since is not None:
export_keys['since'] = self.convert_to_ISO(since)
if until is not None:
export_keys['until'] = self.convert_to_ISO(until)
info = self.get_info()['Categories']
for key in info:
for category in info[key]:
if category in categories or categories == 'all':
export_keys['datasets'][category] = {}
print('Exporting the ', export_keys['datasets'])
print('From ', since, 'to', until)
schedule_job = requests.post(self.url + 'export', data=json.dumps(export_keys), headers=self.params)
job_id = schedule_job.headers['Content-Location']
response = requests.get(job_id, headers=self.params)
if response.status_code == 401:
raise TypeError('Authentication failed!'\
' Please check your token - it might be out of date.')
response = response.json()
waiting = True
while waiting:
if response['status'] == 202:
response = requests.get(job_id, headers=self.params).json()
                # the following waits for 30 seconds and runs an animation in the
                # meantime to make sure the user doesn't think the code is broken
                progress_spinner(30, 'Waiting for the server to complete the job', new_line_after=False)
elif response['status'] == 500:
sys.stdout.write('\r')
sys.stdout.write("Request failed")
sys.stdout.flush()
waiting = False
else:
sys.stdout.write('\n')
print("Job is completed, start to download the data")
waiting = False
def _export_request_parallel(self, export_dict):
'''
This function allows the user to make parallel export requests. This is useful
        when the requests have different since and until dates for the different datasets in
the categories.
Arguments
---------
- export_dict: dictionary:
This dictionary contains the categories to be downloaded as keys, with the since
and until as values in a tuple.
For example:
```
{ category : (since , until),
'raw_activity_pir': (pd.to_datetime('2021-10-06'), pd.to_datetime('2021-10-10')),
'raw_door_sensor' : (pd.to_datetime('2021-10-06'), pd.to_datetime('2021-10-10'))}
```
'''
categories_list = list(export_dict.keys())
available_categories_list = self.get_category_names(measurement_name='all')
for category in categories_list:
            if category not in available_categories_list:
raise TypeError('Category {} is not available to download. Please check the name.'.format(category))
print('Creating new parallel export requests')
# the following creates a list of export keys to be called by the API
export_key_list = {}
for category in categories_list:
since = export_dict[category][0]
until = export_dict[category][1]
export_keys = {'datasets': {category: {}}}
            if since is not None:
                export_keys['since'] = self.convert_to_ISO(since)
            if until is not None:
                export_keys['until'] = self.convert_to_ISO(until)
export_key_list[category] = export_keys
# scheduling jobs for each of the requests:
request_url_dict = {}
schedule_job_dict = {}
for category in categories_list:
export_keys = export_key_list[category]
schedule_job = requests.post(self.url + 'export', data=json.dumps(export_keys), headers=self.params)
schedule_job_dict[category] = schedule_job
request_url = schedule_job.headers['Content-Location']
request_url_dict[category] = request_url
# checking whether the jobs have been completed:
waiting = True
waiting_for = {category: True for category in categories_list}
job_id_dict = {}
while waiting:
for category in categories_list:
if not waiting_for[category]:
continue
request_url = request_url_dict[category]
response = requests.get(request_url, headers=self.params)
if response.status_code in [401, 403]:
raise TypeError('Authentication failed!'\
' Please check your token - it might be out of date. '\
'You might also not have authorization to complete your request.')
elif response.status_code == 202:
waiting_for[category] = True
elif response.status_code == 500:
sys.stdout.write('\r')
sys.stdout.write("Request failed for category {}".format(category))
sys.stdout.write('\n')
sys.stdout.flush()
waiting_for[category] = False
else:
waiting_for[category] = False
if not category in job_id_dict:
response = response.json()
job_id_dict[category] = response['id']
# if we are no longer waiting for a job to complete, move onto the downloads
            if any(waiting_for.values()):
                progress_spinner(30, 'Waiting for the server to complete the job', new_line_after=False)
else:
sys.stdout.write('\n')
sys.stdout.write("The server has finished processing the requests")
sys.stdout.flush()
sys.stdout.write('\n')
waiting = False
return job_id_dict, request_url_dict
def export(self, since=None, until=None, reload=True,
categories='all', save_path='./data/raw_data/', append=True, export_index=None):
'''
This is a function that is able to download the data and save it as a csv in save_path.
Note that ```categories``` refers to the datasets. If you want to get the categories
        for a given set of measurements (i.e. activity, care, vital signs, etc.), please use
        the method ```.get_category_names('measurement_name')```. Alternatively, if you want to view all of the
        available datasets, please use the method ```.get_category_names('all')```.
        If the data files already exist, the new data will be appended to the end. Be careful, this can cause
        duplicates! To avoid this, use the ```.refresh()``` function or use ```append = False```.
Arguments
---------
- since: valid input to pd.to_datetime(.):
This is the date and time from which the data will be loaded. If ```None```,
the earliest possible date is used.
Default: ```None```
- until: valid input to pd.to_datetime(.):
            This is the date and time up until which the data will be loaded. If ```None```,
the latest possible date is used.
Default: ```None```
- reload: bool:
This value determines whether an export request should be sent.
In most cases, this should be ```True```, unless you want to download
the data from a previously run request.
Default: ```True```
- categories: list or string:
If a list, this is the datasets that will be downloaded. Please use the
dataset names that can be returned by using the get_category_names function.
If the string 'all' is supplied, this function will return all of the data. This
is not good! There should be a good reason to do this.
Default: ```'all'```
- save_path: string:
This is the save path for the data that is downloaded from minder.
Default: ```'./data/raw_data/'```
- append: bool:
If ```True```, the downloaded data will be appended to the previous data, if it exists.
If ```False```, the previous data will be overwritten if it exists.
- export_index: integer:
You may use this argument to download a previous request. ```-1``` will download
            the most recent request. This argument will overrule the ```reload``` argument.
Defaults to ```None```.
'''
save_path = reformat_path(save_path)
p = Path(save_path)
if not p.exists():
print('Target directory does not exist, creating a new folder')
save_mkdir(save_path)
if export_index is None:
if reload:
self._export_request(categories=categories, since=since, until=until)
data = requests.get(self.url + 'export', headers=self.params).json()
        # the interactive job selection below only runs when no export_index was supplied
        if export_index is None:
            if not reload and len(data) > 1:
                print('Multiple export requests exist, please choose one to download')
                for idx, job in enumerate(data):
                    print('Job {} '.format(idx).center(50, '='))
                    print('ID: ', job['id'])
                    print('Transaction Time', job['jobRecord']['transactionTime'])
                    print('Export sensors: ', end='')
                    for record in job['jobRecord']['output']:
                        print(record['type'], end=' ')
                    print('')
                export_index = int(input('Enter the index of the job ...'))
                while export_index not in range(len(data)):
                    print('Not a valid input')
                    export_index = int(input('Enter the index of the job ...'))
            else:
                # default to the most recent export request
                export_index = -1
print('Start to export job')
categories_downloaded = []
for idx, record in enumerate(data[export_index]['jobRecord']['output']):
print('Exporting {}/{}'.format(idx + 1, len(data[export_index]['jobRecord']['output'])).ljust(20, ' '),
str(record['type']).ljust(20, ' '), end=' ')
content = requests.get(record['url'], headers=self.params)
if content.status_code != 200:
print('Fail, Response code {}'.format(content.status_code))
else:
if record['type'] in categories_downloaded:
mode = 'a'
header = False
else:
mode = 'a' if append else 'w'
header = not Path(os.path.join(save_path, record['type'] + '.csv')).exists() or mode == 'w'
pd.read_csv(io.StringIO(content.text)).to_csv(os.path.join(save_path, record['type'] + '.csv'),
mode=mode,
header=header, index=False)
categories_downloaded.append(record['type'])
print('Success')
def refresh(self, until=None, categories=None, save_path='./data/raw_data/'):
'''
        This function allows the user to refresh the data currently saved in the
save path. It will download the data missing between the saved files and the
```until``` argument.
Arguments
---------
- until: valid input to pd.to_datetime(.):
            This is the date and time up until which the data will be loaded. If ```None```,
the latest possible date is used.
Default: ```None```
- categories: list or string:
If a list, this is the datasets that will be downloaded. Please use the
dataset names that can be returned by using the get_category_names function.
If a string is given, only this dataset will be refreshed.
- save_path: string:
This is the save path for the data that is downloaded from minder.
Default: ```'./data/raw_data/'```
'''
if until is None:
until = datetime.now()
save_path = reformat_path(save_path)
if categories is None:
raise TypeError('Please supply at least one category...')
if type(categories) == str:
if categories == 'all':
categories = self.get_category_names('all')
else:
categories = [categories]
export_dict = {}
mode_dict = {}
print('Checking current files...')
last_rows = {}
for category in categories:
file_path = os.path.join(save_path, category)
p = Path(file_path + '.csv')
if not p.exists():
since = None
else:
data = pd.read_csv(file_path + '.csv')
if 'start_date' in data.columns:
# add the following to avoid a duplicate of the last and first row
last_rows[category] = data[['start_date', 'id']].iloc[-1, :].to_numpy()
since = pd.to_datetime(data['start_date'].loc[data['start_date'].last_valid_index()])
if self.convert_to_ISO(since) > self.convert_to_ISO(until):
# change since to earliest date and overwrite all data for this category
since = pd.to_datetime(data[['start_date']].iloc[0, 0])
# if the earliest date is after until, then we error
if self.convert_to_ISO(since) > self.convert_to_ISO(until):
                            raise TypeError('Please check your inputs. For {} we found that you tried refreshing ' \
                                            'to a date earlier than the earliest date in the file.'.format(category))
else:
mode_dict[category] = 'w'
else:
mode_dict[category] = 'a'
else:
since=None
mode_dict[category] = 'w'
export_dict[category] = (since, until)
job_id_dict, request_url_dict = self._export_request_parallel(export_dict=export_dict)
data = requests.get(self.url + 'export', headers=self.params).json()
for category in categories:
            if category not in request_url_dict:
                raise TypeError('Uh-oh! Something seems to have gone wrong. ' \
                                'Please check the inputs to the function and try again.' \
                                ' Looks as if category {} caused the problem.'.format(category))
content = requests.get(request_url_dict[category], headers=self.params)
output = json.load(io.StringIO(content.text))['jobRecord']['output']
for n_output, data_chunk in enumerate(output):
content = requests.get(data_chunk['url'], headers=self.params)
sys.stdout.write('\r')
sys.stdout.write("For {}, exporting {}/{}".format(category, n_output + 1, len(output)))
sys.stdout.flush()
if content.status_code != 200:
sys.stdout.write('\n')
sys.stdout.write('\r')
sys.stdout.write('Fail, Response code {} for category {}'.format(content.status_code, category))
sys.stdout.write('\n')
sys.stdout.flush()
else:
current_data = pd.read_csv(io.StringIO(content.text))
if Path(save_path + category + '.csv').exists():
data_to_save = pd.read_csv(save_path + category + '.csv', index_col=0)
                        data_to_save = pd.concat([data_to_save, current_data], ignore_index=True)
data_to_save = data_to_save.drop_duplicates(ignore_index=True)
else:
data_to_save = current_data
'''
header = (not Path(save_path + category + '.csv').exists()) or mode_dict[category] == 'w'
# checking whether the first line is a duplicate of the end of the previous file
if np.all(current_data[['start_date', 'id']].iloc[0, :] == last_rows[category]):
current_data.iloc[1:, :].reset_index(drop=True).to_csv(save_path + category + '.csv',
mode=mode_dict[category],
header=header)
else:
current_data.to_csv(save_path + category + '.csv', mode=mode_dict[category],
header=header)
'''
data_to_save.to_csv(save_path + category + '.csv', mode='w',
header=True)
sys.stdout.write('\n')
print('Success')
return
def get_category_names(self, measurement_name='all'):
'''
This function allows you to get the category names from a given measurement name.
Arguments
---------
- measurement_name: str:
This is the name of the measurement that you want to get the categories for.
            The default 'all' returns the dataset names for all of the measurement groups.
Returns
---------
- out: list of strings:
This is a list that contains the category names that can be used in the
export function.
'''
if measurement_name == 'all':
out = []
for value in self.get_info()['Categories'].values():
out.extend(list(value.keys()))
else:
out = list(self.get_info()['Categories'][measurement_name].keys())
return out
def get_group_names(self):
'''
This function allows you to view the names of the sets of measurements
that can be downloaded from minder.
Returns
---------
- out: list of strings:
This is a list that contains the names of the sets of measurements.
'''
out = self.get_info()['Categories'].keys()
return list(out)
@staticmethod
def token():
'''
This function returns the current user token. This is the token that is saved in the
file token_real.json after running the token_save function in settings.
Returns
---------
- token: string:
This returns the token in the format that can be used in the api call.
'''
token_dir = token_path
with open(token_dir) as json_file:
api_keys = json.load(json_file)
# with open('./token_real.json', 'r') as f:
# api_keys = json.loads(f.read())
return api_keys['token']
@staticmethod
def convert_to_ISO(date):
'''
Converts the date to ISO.
Arguments
---------
        - date: valid input to pd.to_datetime(.):
This is the date that you want to convert.
Returns
---------
        - out: string:
            This is the date formatted as an ISO 8601 string.
'''
date = pd.to_datetime(date)
return date.strftime('%Y-%m-%dT%H:%M:%S.000Z')
if __name__ == '__main__':
downloader = Downloader()
downloader.export(reload=True, save_path='../data/raw_data/', categories=['raw_activity_pir'])
Classes
class Downloader

This class allows you to download and save the data from minder. Make sure that you have internally saved your token before using this class (see the ```Getting Started.ipynb``` guide).

Example:

```
from minder_utils.download import Downloader
dl = Downloader()
category_list = dl.get_category_names('activity')
dl.export(categories=category_list, since='2021-10-05', save_path='./data/')
```

This would download all of the activity data from the 5th October 2021 and save it as a csv in the directory ```'./data/'```.
Static methods
def convert_to_ISO(date)

Converts the date to ISO format.

Arguments
- date: valid input to pd.to_datetime(.): This is the date that you want to convert.

Returns
- out: string: The date formatted as an ISO 8601 string.
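As a quick sketch of what this produces (```convert_to_ISO``` is a static method, so no token is needed to call it):

```
from minder_utils.download import Downloader

# any input accepted by pd.to_datetime works, e.g. a plain date string
print(Downloader.convert_to_ISO('2021-10-05'))
# prints: 2021-10-05T00:00:00.000Z
```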
def token()
This function returns the current user token. This is the token that is saved in the file token_real.json after running the token_save function in settings.
Returns
- token: string: This returns the token in the format that can be used in the API call.
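The file at ```token_path``` is expected to be a JSON object with a single ```'token'``` key. A rough sketch of its contents is shown below; the ```Bearer``` prefix in the placeholder is an assumption on our part, since the value is passed directly as the ```Authorization``` header, so store whatever ```token_save``` writes for you.

```
{"token": "Bearer <your-access-token>"}
```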
Methods
def export(self, since=None, until=None, reload=True, categories='all', save_path='./data/raw_data/', append=True, export_index=None)

This is a function that is able to download the data and save it as a csv in save_path.

Note that ```categories``` refers to the datasets. If you want to get the categories for a given set of measurements (i.e. activity, care, vital signs, etc.), please use the method ```.get_category_names('measurement_name')```. Alternatively, if you want to view all of the available datasets, please use the method ```.get_category_names('all')```.

If the data files already exist, the new data will be appended to the end. Be careful, this can cause duplicates! To avoid this, use the ```.refresh()``` function or use ```append = False```.

Arguments
- since: valid input to pd.to_datetime(.): This is the date and time from which the data will be loaded. If ```None```, the earliest possible date is used. Default: ```None```
- until: valid input to pd.to_datetime(.): This is the date and time up until which the data will be loaded. If ```None```, the latest possible date is used. Default: ```None```
- reload: bool: This value determines whether an export request should be sent. In most cases, this should be ```True```, unless you want to download the data from a previously run request. Default: ```True```
- categories: list or string: If a list, this is the datasets that will be downloaded. Please use the dataset names that are returned by the get_category_names function. If the string 'all' is supplied, this function will return all of the data. This is not good! There should be a good reason to do this. Default: ```'all'```
- save_path: string: This is the save path for the data that is downloaded from minder. Default: ```'./data/raw_data/'```
- append: bool: If ```True```, the downloaded data will be appended to the previous data, if it exists. If ```False```, the previous data will be overwritten if it exists.
- export_index: integer: You may use this argument to download a previous request. ```-1``` will download the most recent request. This argument will overrule the ```reload``` argument. Defaults to ```None```.
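For example, a minimal call that downloads a single dataset over a fixed window; the dataset name ```'raw_activity_pir'``` is the one used in the module's own ```__main__``` example:

```
from minder_utils.download import Downloader

dl = Downloader()
# overwrite any existing csv rather than appending, to avoid duplicated rows
dl.export(categories=['raw_activity_pir'], since='2021-10-01', until='2021-10-07',
          save_path='./data/raw_data/', append=False)
```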
def get_category_names(self, measurement_name='all')

This function allows you to get the category names from a given measurement name.

Arguments
- measurement_name: str: This is the name of the measurement that you want to get the categories for. The default 'all' returns the dataset names for all of the measurement groups.
Returns
- out: list of strings: This is a list that contains the category names that can be used in the export function.
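A short usage sketch; ```'activity'``` is the measurement group used in the class example, and the full list of group names comes from ```get_group_names```:

```
from minder_utils.download import Downloader

dl = Downloader()
activity_datasets = dl.get_category_names('activity')  # datasets in the 'activity' group
all_datasets = dl.get_category_names('all')             # every available dataset name
```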
def get_group_names(self)
This function allows you to view the names of the sets of measurements that can be downloaded from minder.
Returns
- out: list of strings: This is a list that contains the names of the sets of measurements.
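A sketch of how this pairs with ```get_category_names```; note that each call queries the API via ```get_info```, and the exact group names depend on what the API reports:

```
from minder_utils.download import Downloader

dl = Downloader()
for group in dl.get_group_names():       # e.g. 'activity', ...
    print(group, dl.get_category_names(group))
```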
def get_info(self)
This function returns the available datasets on minder in the form of a dictionary.
Returns
- _: dict: This returns a dictionary of the available datasets.
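The returned dictionary nests dataset names under measurement groups inside a ```'Categories'``` key, which is how ```get_category_names``` and ```get_group_names``` use it. A sketch of walking it:

```
from minder_utils.download import Downloader

dl = Downloader()
info = dl.get_info()
for group, datasets in info['Categories'].items():
    print(group, list(datasets.keys()))
```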
def refresh(self, until=None, categories=None, save_path='./data/raw_data/')
This function allows the user to refresh the data currently saved in the save path. It will download the data missing between the saved files and the ```until``` argument.

Arguments
- until: valid input to pd.to_datetime(.): This is the date and time up until which the data will be loaded. If ```None```, the latest possible date is used. Default: ```None```
- categories: list or string: If a list, this is the datasets that will be downloaded. Please use the dataset names that are returned by the get_category_names function. If a string is given, only this dataset will be refreshed.
- save_path: string: This is the save path for the data that is downloaded from minder. Default: ```'./data/raw_data/'```
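For instance, a minimal refresh of a single previously exported dataset up to the current time; ```'raw_activity_pir'``` is again the dataset used elsewhere in this module:

```
from minder_utils.download import Downloader

dl = Downloader()
# downloads data from the last saved start_date onwards and de-duplicates against the existing csv
dl.refresh(categories='raw_activity_pir', save_path='./data/raw_data/')
```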