Commit f4c8ece9 authored by Sascha Herzinger

refactored correlation analysis

parent 5643abc5
@@ -33,7 +33,7 @@ def create_task() -> Tuple[Response, int]:
                      "'{}'".format(json['task_name']))
         return jsonify({'error_msg': "Task with name '{}' not found."
                        .format(json['task_name'])}), 400
-    async_result = analytic_task.delay(data_tasks=session['data_tasks'],
+    async_result = analytic_task.delay(session_data_tasks=session['data_tasks'],
                                        args=json['args'])
     session['analytic_tasks'].append(async_result.id)
     logger.debug("Task successfully submitted. Sending response.")
......
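For orientation: the only change in the hunk above is the keyword rename at the Celery call site (data_tasks to session_data_tasks). Below is a minimal sketch of a client request that would reach create_task(); the 'task_name' and 'args' fields appear in the hunk itself, while the URL, port, and the '$...$' placeholder values are illustrative assumptions, not taken from this commit:

import requests  # assumption: the Flask app is served over HTTP

# Hypothetical payload: values wrapped in '$...$' refer to data task ids
# from the user's session; prepare_args() (next file) swaps them for the
# DataFrames they point to before main() runs.
payload = {
    'task_name': 'compute-correlation',
    'args': {
        'x': '$123$',              # one id -> one DataFrame
        'y': '$456$',
        'id_filter': [],
        'method': 'pearson',
        'subsets': [],
        'annotations': ['$789$'],  # list of ids -> list of DataFrames
    },
}
response = requests.post('http://localhost:5000/analytics', json=payload)
print(response.status_code, response.json())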
@@ -47,11 +47,35 @@ class AnalyticTask(Task, metaclass=abc.ABCMeta):
         pass
 
     @staticmethod
-    def prepare_args(data_tasks: List[str], args: dict) -> dict:
+    def data_task_id_to_data_frame(data_task_id, session_data_tasks):
+        if data_task_id not in session_data_tasks:
+            error = "No permission to use data_task_id '{}' " \
+                    "for analysis".format(data_task_id)
+            logger.error(error)
+            raise PermissionError(error)
+        entry = redis.get('data:{}'.format(data_task_id))
+        if not entry:
+            error = "The key '{}' does not match any entry in Redis. " \
+                    "Value probably expired.".format(data_task_id)
+            logger.error(error)
+            raise LookupError(error)
+        data_state = json.loads(entry)
+        if not data_state['loaded']:
+            error = "The data task '{}' has not been loaded, yet. " \
+                    "Wait for it to complete before using it in an " \
+                    "analysis task.".format(data_task_id)
+            logger.error(error)
+            raise ValueError(error)
+        file_path = data_state['file_path']
+        df = read_csv(file_path)
+        return df
+
+    def prepare_args(self, session_data_tasks: List[str], args: dict) -> dict:
         """Replace data task ids in the arguments with their associated
-        data frame located on the file system.
+        data frame located on the file system. This currently works for
+        non-nested strings and non-nested lists containing strings.
-        :param data_tasks: We use this list to check access.
+        :param session_data_tasks: We use this list to check access.
         :param args: The arguments submitted to run().
         :return: The new parsed arguments
         """
@@ -61,38 +85,28 @@ class AnalyticTask(Task, metaclass=abc.ABCMeta):
             if (isinstance(value, str) and
                     value.startswith('$') and value.endswith('$')):
                 data_task_id = value[1:-1]
-                if data_task_id not in data_tasks:
-                    error = "No permission to use data_task_id '{}' " \
-                            "for analysis".format(data_task_id)
-                    logger.error(error)
-                    raise PermissionError(error)
-                entry = redis.get('data:{}'.format(data_task_id))
-                if not entry:
-                    error = "The key '{}' does not match any entry in Redis. " \
-                            "Value probably expired.".format(data_task_id)
-                    logger.error(error)
-                    raise LookupError(error)
-                data_state = json.loads(entry)
-                if not data_state['loaded']:
-                    error = "The data task '{}' has not been loaded, yet. " \
-                            "Wait for it to complete before using it in an " \
-                            "analysis task.".format(data_task_id)
-                    logger.error(error)
-                    raise ValueError(error)
-                file_path = data_state['file_path']
-                value = read_csv(file_path)
+                value = self.data_task_id_to_data_frame(
+                    data_task_id, session_data_tasks)
+            if (isinstance(value, list) and
+                    value[0].startswith('$') and value[0].endswith('$')):
+                data_task_ids = [el[1:-1] for el in value]
+                dfs = []
+                for data_task_id in data_task_ids:
+                    df = self.data_task_id_to_data_frame(data_task_id,
+                                                         session_data_tasks)
+                    dfs.append(df)
+                value = dfs
             arguments[arg] = value
         return arguments
 
-    def run(self, data_tasks: List[str], args: dict) -> str:
+    def run(self, session_data_tasks: List[str], args: dict) -> str:
         """This is called by the celery worker. This method calls other helper
         methods to prepare and validate the input and output of a task.
-        :param data_tasks: List of data task ids from session to check access.
+        :param session_data_tasks: List of data task ids from session to check access.
         :param args: The dict of arguments submitted to the task.
         :return: The result of the task.
         """
-        arguments = self.prepare_args(data_tasks, args)
+        arguments = self.prepare_args(session_data_tasks, args)
         result = self.main(**arguments)
         try:
             if type(result) != dict:
......
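The updated docstring says placeholder substitution works for non-nested strings and non-nested lists of strings. Here is a standalone sketch of that resolution logic, with an illustrative load() callable standing in for data_task_id_to_data_frame (which, in the committed code, also performs the permission and Redis checks):

from typing import Callable

import pandas as pd


def resolve_args(args: dict, load: Callable[[str], pd.DataFrame]) -> dict:
    """Mimic AnalyticTask.prepare_args placeholder resolution."""
    arguments = {}
    for arg, value in args.items():
        # A single '$...$'-wrapped string is one data task id.
        if (isinstance(value, str) and
                value.startswith('$') and value.endswith('$')):
            value = load(value[1:-1])
        # A list of '$...$'-wrapped strings is a list of data task ids.
        if (isinstance(value, list) and value and isinstance(value[0], str)
                and value[0].startswith('$') and value[0].endswith('$')):
            value = [load(el[1:-1]) for el in value]
        arguments[arg] = value
    return arguments

The sketch adds two guards the committed code appears to lack: prepare_args indexes value[0] unconditionally for lists, so an empty list argument could raise an IndexError, and a list of non-string elements could raise an AttributeError on startswith.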
 from typing import List
+from functools import reduce
 
 import pandas as pd
 import numpy as np
@@ -11,8 +12,14 @@ class CorrelationTask(AnalyticTask):
     name = 'compute-correlation'
 
-    def main(self, x: pd.DataFrame, y: pd.DataFrame, id_filter: List[str],
-             method: str, subsets: List[List[str]]) -> dict:
+    def main(self,
+             x: pd.DataFrame,
+             y: pd.DataFrame,
+             id_filter: List[str],
+             method: str,
+             subsets: List[List[str]],
+             annotations: List[pd.DataFrame]) -> dict:
         if x.shape[0] == 0 or y.shape[0] == 0:
             raise ValueError("X or Y contain no data.")
         if x.shape[1] < 2 or y.shape[1] < 2:
@@ -20,49 +27,76 @@ class CorrelationTask(AnalyticTask):
         if method not in ['pearson', 'spearman', 'kendall']:
             raise ValueError("Unknown method '{}'".format(method))
-        df = pd.merge(x, y, on='id')
+        df = self.merge_x_y(x, y)
+        x_label = list(df)[1]
+        y_label = list(df)[2]
+        df = self.apply_id_filter(df, id_filter)
+        df = self.apply_subsets(df, subsets)
+        df = self.apply_annotations(annotations, df)
+        global_stats = self.compute_stats(df, x_label, y_label)
+        subset_dfs = [df[df['subset'] == i] for i in range(len(subsets) or 1)]
+        subset_stats = [self.compute_stats(subset_df, x_label, y_label)
+                        for subset_df in subset_dfs]
+        output = global_stats
+        output['subsets'] = subset_stats
+        output['method'] = method
+        output['data'] = df.to_json()
+        output['x_label'] = x_label
+        output['y_label'] = y_label
+        return output
+
+    @staticmethod
+    def merge_x_y(x: pd.DataFrame, y: pd.DataFrame) -> pd.DataFrame:
+        df = x.merge(y, on='id', how='inner')
+        df = df.dropna()
+        if df.shape[0] == 0:
+            raise ValueError("X and Y do not share any ids.")
+        return df
+
+    @staticmethod
+    def apply_id_filter(df: pd.DataFrame, id_filter: list) -> pd.DataFrame:
+        if id_filter:
+            df = df[df['id'].isin(id_filter)]
+        if df.shape[0] == 0:
+            raise ValueError("The current selection does not match any data.")
+        return df
+
+    @staticmethod
+    def apply_subsets(df: pd.DataFrame,
+                      subsets: List[List[str]]) -> pd.DataFrame:
         if not subsets:
             subsets = [df['id']]
-        output = {
-            'subsets': {}
-        }
         _df = pd.DataFrame()
         for i, subset in enumerate(subsets):
             df_subset = df[df['id'].isin(subset)]
-            output['subsets'][i] = self.compute_stats(df_subset)
+            subset_col = pd.Series([i] * df_subset.shape[0])
+            df_subset = df_subset.assign(subset=subset_col)
             _df = _df.append(df_subset)
-        df = _df
-        del _df
-        if df.shape[0] == 0:
+        if _df.shape[0] == 0:
             raise ValueError("No data match given subsets. Keep in mind that X "
                              "and Y are intersected before the subsets are "
                              "applied.")
-        global_stats = self.compute_stats(df.drop_duplicates('id'))
-        output.update(global_stats)
-        output['method'] = method
-        output['data'] = df.to_json()
-        df = df.drop('id', 1)
-        output['x_label'] = list(df)[0]
-        output['y_label'] = list(df)[1]
-        return output
+        return _df
+
+    @staticmethod
+    def apply_annotations(annotations: List[pd.DataFrame],
+                          df: pd.DataFrame) -> pd.DataFrame:
+        if annotations:
+            annotation_data = reduce(
+                lambda l, r: l.merge(r, on='id', how='outer'), annotations)\
+                .drop('id', axis=1)\
+                .apply(
+                    lambda row: ' & '.join(list(map(str, row.tolist()))), axis=1)
+            annotation_df = pd.DataFrame(columns=['annotation'],
+                                         data=annotation_data)
+            df = df.merge(annotation_df, on='id', how='left')
+        return df
+
     @staticmethod
-    def compute_stats(df: pd.DataFrame) -> dict:
+    def compute_stats(df: pd.DataFrame, x_label: str, y_label: str) -> dict:
+        df = df.drop_duplicates('id')
+        df = df[[x_label, y_label]]
         if df.shape[0] < 2:
             return {
                 'coef': float('nan'),
@@ -70,9 +104,8 @@ class CorrelationTask(AnalyticTask):
                 'slope': float('nan'),
                 'intercept': float('nan')
             }
-        df = df.drop('id', 1)
-        x_list = df.ix[:, 0].values.tolist()
-        y_list = df.ix[:, 1].values.tolist()
+        x_list = df[x_label].values.tolist()
+        y_list = df[y_label].values.tolist()
         corr_coef, p_value = stats.pearsonr(x_list, y_list)
         slope, intercept, *_ = np.polyfit(x_list, y_list, deg=1)
         return {
......
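The new apply_annotations helper outer-merges all annotation frames on 'id' and collapses each row's annotation values into a single ' & '-joined string, which is then attached to the working frame. One thing to watch: the committed version drops 'id' before building annotation_df but then merges on 'id', which looks like it would fail on the missing merge key. A self-contained sketch of the apparent intent that keeps the key (all data here is illustrative):

from functools import reduce

import pandas as pd


def apply_annotations(annotations, df):
    # Outer-merge all annotation frames so that every id appears once.
    merged = reduce(lambda l, r: l.merge(r, on='id', how='outer'), annotations)
    # Collapse each row's annotation values into one ' & '-joined string.
    labels = merged.drop('id', axis=1).apply(
        lambda row: ' & '.join(map(str, row.tolist())), axis=1)
    annotation_df = pd.DataFrame({'id': merged['id'], 'annotation': labels})
    return df.merge(annotation_df, on='id', how='left')


a = pd.DataFrame({'id': [1, 2], 'tissue': ['liver', 'brain']})
b = pd.DataFrame({'id': [1, 3], 'sex': ['f', 'm']})
df = pd.DataFrame({'id': [1, 2], 'A': [0.1, 0.2], 'B': [1.0, 2.0]})
print(apply_annotations([a, b], df))  # id 1 -> 'liver & f', id 2 -> 'brain & nan'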
@@ -10,14 +10,18 @@ from fractalis.analytics.tasks.correlation.main import CorrelationTask
 
 # noinspection PyMissingOrEmptyDocstring,PyMissingTypeHints
 class TestCorrelation:
 
-    def test_returns_expected_output_1(self):
+    def test_functional_1(self):
         task = CorrelationTask()
         arr_1 = np.c_[range(20), np.random.randint(0, 100, size=(20, 1))]
         arr_2 = np.c_[range(20), np.random.randint(0, 100, size=(20, 1))]
         x = pd.DataFrame(arr_1, columns=['id', 'A'])
         y = pd.DataFrame(arr_2, columns=['id', 'B'])
-        result = task.main(x=x, y=y, id_filter=[],
-                           method='pearson', subsets=[list(range(20))])
+        result = task.main(x=x,
+                           y=y,
+                           id_filter=[],
+                           method='pearson',
+                           subsets=[list(range(20))],
+                           annotations=[])
         assert result['coef']
         assert result['p_value']
         assert result['slope']
@@ -28,107 +32,147 @@ class TestCorrelation:
         assert result['x_label'] == 'A'
         assert result['y_label'] == 'B'
 
-    def test_returns_expected_output_2(self):
+    def test_functional_2(self):
         task = CorrelationTask()
         arr_1 = np.c_[range(20), np.random.randint(0, 100, size=(20, 1))]
         arr_2 = np.c_[range(20), np.random.randint(0, 100, size=(20, 1))]
         x = pd.DataFrame(arr_1, columns=['id', 'A'])
         y = pd.DataFrame(arr_2, columns=['id', 'B'])
-        result = task.main(x=x, y=y, id_filter=list(range(10)),
-                           method='pearson', subsets=[list(range(5, 15))])
+        result = task.main(x=x,
+                           y=y,
+                           id_filter=list(range(10)),
+                           method='pearson',
+                           subsets=[list(range(5, 15))],
+                           annotations=[])
         df = json.loads(result['data'])
         assert len(df['id']) == 5
-    def test_returns_expected_output_3(self):
+    def test_functional_3(self):
         task = CorrelationTask()
         arr_1 = np.c_[range(20), np.random.randint(0, 100, size=(20, 1))]
         arr_2 = np.c_[range(20), np.random.randint(0, 100, size=(20, 1))]
         x = pd.DataFrame(arr_1, columns=['id', 'A'])
         y = pd.DataFrame(arr_2, columns=['id', 'B'])
-        result_1 = task.main(x=x, y=y, id_filter=list(range(20)),
-                             method='pearson', subsets=[])
-        result_2 = task.main(x=x, y=y, id_filter=list(range(20)),
-                             method='pearson', subsets=[list(range(20))])
+        result_1 = task.main(x=x,
+                             y=y,
+                             id_filter=list(range(20)),
+                             method='pearson',
+                             subsets=[],
+                             annotations=[])
+        result_2 = task.main(x=x,
+                             y=y,
+                             id_filter=list(range(20)),
+                             method='pearson',
+                             subsets=[list(range(20))],
+                             annotations=[])
         assert result_1 == result_2
-    def test_returns_expected_output_4(self):
+    def test_functional_4(self):
         task = CorrelationTask()
         arr_1 = np.c_[range(20), np.random.randint(0, 100, size=(20, 1))]
         arr_2 = np.c_[range(20), np.random.randint(0, 100, size=(20, 1))]
         x = pd.DataFrame(arr_1, columns=['id', 'A'])
         y = pd.DataFrame(arr_2, columns=['id', 'B'])
         with pytest.raises(ValueError):
-            task.main(x=x, y=y, id_filter=[],
-                      method='foo', subsets=[list(range(20))])
+            task.main(x=x,
+                      y=y,
+                      id_filter=[],
+                      method='foo',
+                      subsets=[list(range(20))],
+                      annotations=[])
-    def test_returns_expected_output_5(self):
+    def test_functional_5(self):
         task = CorrelationTask()
         arr_1 = np.c_[range(20), np.random.randint(0, 100, size=(20, 1))]
         arr_2 = np.c_[range(20), np.random.randint(0, 100, size=(20, 1))]
         x = pd.DataFrame(arr_1, columns=['id', 'A'])
         y = pd.DataFrame(arr_2, columns=['id', 'B'])
-        result = task.main(x=x, y=y, id_filter=[],
-                           method='pearson', subsets=[list(range(15, 25))])
+        result = task.main(x=x,
+                           y=y,
+                           id_filter=[],
+                           method='pearson',
+                           subsets=[list(range(15, 25))],
+                           annotations=[])
         df = json.loads(result['data'])
         assert len(df['id']) == 5
-    def test_returns_expected_output_6(self):
+    def test_functional_6(self):
         task = CorrelationTask()
         arr_1 = np.c_[range(20), np.random.randint(0, 100, size=(20, 1))]
         arr_2 = np.c_[range(20, 40), np.random.randint(0, 100, size=(20, 1))]
         x = pd.DataFrame(arr_1, columns=['id', 'A'])
         y = pd.DataFrame(arr_2, columns=['id', 'B'])
         with pytest.raises(ValueError):
-            task.main(x=x, y=y, id_filter=[],
-                      method='pearson', subsets=[list(range(20))])
+            task.main(x=x,
+                      y=y,
+                      id_filter=[],
+                      method='pearson',
+                      subsets=[list(range(20))],
+                      annotations=[])
-    def test_returns_expected_output_7(self):
+    def test_functional_7(self):
         task = CorrelationTask()
         arr_1 = np.c_[range(10), np.random.randint(0, 100, size=(10, 1))]
         arr_2 = np.c_[range(5, 20), np.random.randint(0, 100, size=(15, 1))]
         x = pd.DataFrame(arr_1, columns=['id', 'A'])
         y = pd.DataFrame(arr_2, columns=['id', 'B'])
         with pytest.raises(ValueError):
-            task.main(x=x, y=y, id_filter=[],
-                      method='pearson', subsets=[list(range(10, 20))])
+            task.main(x=x,
+                      y=y,
+                      id_filter=[],
+                      method='pearson',
+                      subsets=[list(range(10, 20))],
+                      annotations=[])
-    def test_returns_expected_output_8(self):
+    def test_functional_8(self):
         task = CorrelationTask()
         arr_1 = np.c_[range(10), np.random.randint(0, 100, size=(10, 1))]
         arr_2 = np.c_[range(5, 20), np.random.randint(0, 100, size=(15, 1))]
         x = pd.DataFrame(arr_1, columns=['id', 'A'])
         y = pd.DataFrame(arr_2, columns=['id', 'B'])
-        result = task.main(x=x, y=y, id_filter=[],
-                           method='pearson', subsets=[list(range(5, 20))])
+        result = task.main(x=x,
+                           y=y,
+                           id_filter=[],
+                           method='pearson',
+                           subsets=[list(range(5, 20))],
+                           annotations=[])
         df = json.loads(result['data'])
         assert len(df['id']) == 5
-    def test_returns_expected_output_9(self):
+    def test_functional_9(self):
         task = CorrelationTask()
         arr_1 = np.c_[range(10), np.random.randint(0, 100, size=(10, 1))]
         arr_2 = np.c_[range(5, 20), np.random.randint(0, 100, size=(15, 1))]
         x = pd.DataFrame(arr_1, columns=['id', 'A'])
         y = pd.DataFrame(arr_2, columns=['id', 'B'])
-        result = task.main(x=x, y=y, id_filter=[], method='pearson',
+        result = task.main(x=x,
+                           y=y,
+                           id_filter=[],
+                           method='pearson',
                            subsets=[
                                list(range(5)),
                                list(range(5, 10)),
                                list(range(10, 20))
-                           ])
+                           ],
+                           annotations=[])
         assert not np.isnan(result['coef'])
         assert len(result['subsets']) == 3
         assert np.isnan(result['subsets'][0]['coef'])
         assert not np.isnan(result['subsets'][1]['coef'])
         assert np.isnan(result['subsets'][2]['coef'])
-    def test_returns_expected_output_10(self):
+    def test_functional_10(self):
         task = CorrelationTask()
         arr_1 = np.c_[range(2), np.random.randint(0, 100, size=(2, 1))]
         arr_2 = np.c_[range(1, 3), np.random.randint(0, 100, size=(2, 1))]
         x = pd.DataFrame(arr_1, columns=['id', 'A'])
         y = pd.DataFrame(arr_2, columns=['id', 'B'])
-        result = task.main(x=x, y=y, id_filter=[], method='pearson',
-                           subsets=[list(range(4))])
+        result = task.main(x=x,
+                           y=y,
+                           id_filter=[],
+                           method='pearson',
+                           subsets=[list(range(4))],
+                           annotations=[])
         df = json.loads(result['data'])
         assert np.isnan(result['coef'])
         assert len(df['id']) == 1
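All ten rewritten tests pass annotations=[], so the new parameter is threaded through but never exercised. A hypothetical companion test in the same style might look like the following; it assumes the merge-key issue noted above is resolved so that an 'annotation' column reaches the serialized output:

    def test_annotations_are_attached(self):
        task = CorrelationTask()
        arr_1 = np.c_[range(20), np.random.randint(0, 100, size=(20, 1))]
        arr_2 = np.c_[range(20), np.random.randint(0, 100, size=(20, 1))]
        x = pd.DataFrame(arr_1, columns=['id', 'A'])
        y = pd.DataFrame(arr_2, columns=['id', 'B'])
        annotation = pd.DataFrame({'id': range(20), 'tissue': ['liver'] * 20})
        result = task.main(x=x,
                           y=y,
                           id_filter=[],
                           method='pearson',
                           subsets=[],
                           annotations=[annotation])
        df = json.loads(result['data'])
        assert 'annotation' in df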