Skip to content
Snippets Groups Projects
Commit 8bd7667f authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

Added filtering functionality to task module

parent cc4f17af
No related branches found
No related tags found
No related merge requests found
Pipeline #
......@@ -4,7 +4,7 @@ import abc
import json
import re
import logging
from typing import List
from typing import List, Tuple
from pandas import read_csv, DataFrame
from celery import Task
......@@ -47,7 +47,8 @@ class AnalyticTask(Task, metaclass=abc.ABCMeta):
"""
pass
def secure_load(self, file_path: str) -> DataFrame:
@staticmethod
def secure_load(file_path: str) -> DataFrame:
"""Decrypt data so they can be loaded into a pandas data frame.
:param file_path: The location of the encrypted file.
:return: The decrypted file loaded into a pandas data frame.
......@@ -62,14 +63,16 @@ class AnalyticTask(Task, metaclass=abc.ABCMeta):
df = DataFrame.from_dict(data)
return df
def data_task_id_to_data_frame(self, data_task_id,
session_data_tasks, decrypt):
def data_task_id_to_data_frame(
self, data_task_id: str,
session_data_tasks: List[str], decrypt: bool) -> DataFrame:
"""Attempts to load the data frame associated with the provided data id.
:param data_task_id: The data id associated with the previously loaded
data.
:param session_data_tasks: A list of data tasks previously executed by
this the requesting session. This is used for permission checks.
:param decrypt: Specify whether the data have to be decrypted for usage.
only part of the data, for instance some genes out of thousands.
:return: A pandas data frame associated with the data id.
"""
if data_task_id not in session_data_tasks:
......@@ -97,6 +100,47 @@ class AnalyticTask(Task, metaclass=abc.ABCMeta):
df = read_csv(file_path)
return df
@staticmethod
def apply_filters(df: DataFrame, filters: dict) -> DataFrame:
"""Apply filter to data frame and return it.
:param df: The data frame.
:param filters: The filters where each key is represents a column in the
data frame and the value a list of values to keep.
:return: Filtered data frame.
"""
for key in filters:
df = df[df[key].isin(filters[key])]
return df
@staticmethod
def contains_data_task_id(value: str) -> bool:
"""Check whether the given string represents
a special data id arguments.
:param value: The string to test.
:return: True if argument contains data_task_id.
"""
return value.startswith('$') and value.endswith('$')
@staticmethod
def parse_value(value: str) -> Tuple[str, dict]:
"""Extract data task id and filters from the string.
:param value: A string that contains a data task id.
:return: A tuple of id and filters to apply later.
"""
value = value[1:-1]
# noinspection PyBroadException
try:
value = json.loads(value)
data_task_id = value['id']
filters = value.get('filters')
except Exception:
logger.warning("Failed to parse value. "
"Fallback assumption is that it contains the id "
"but nothing else.")
data_task_id = value
filters = None
return data_task_id, filters
def prepare_args(self, session_data_tasks: List[str],
args: dict, decrypt: bool) -> dict:
"""Replace data task ids in the arguments with their associated
......@@ -107,30 +151,38 @@ class AnalyticTask(Task, metaclass=abc.ABCMeta):
:param decrypt: Indicates whether cache must be decrypted to be used.
:return: The new parsed arguments
"""
arguments = {}
parsed_args = {}
for arg in args:
value = args[arg]
# value is data id
if (isinstance(value, str) and value and
value.startswith('$') and value.endswith('$')):
data_task_id = value[1:-1]
value = self.data_task_id_to_data_frame(
if isinstance(value, str) and self.contains_data_task_id(value):
data_task_id, filters = self.parse_value(value)
df = self.data_task_id_to_data_frame(
data_task_id, session_data_tasks, decrypt)
if filters:
df = self.apply_filters(df, filters)
value = df
# value is list containing data ids
if (isinstance(value, list) and value and
isinstance(value[0], str) and
value[0].startswith('$') and value[0].endswith('$')):
data_task_ids = [el[1:-1] for el in value]
if (isinstance(value, list) and
value and self.contains_data_task_id(value[0])):
dfs = []
for data_task_id in data_task_ids:
for el in value:
data_task_id, filters = self.parse_value(el)
df = self.data_task_id_to_data_frame(
data_task_id, session_data_tasks, decrypt)
if filters:
df = self.apply_filters(df, filters)
dfs.append(df)
value = dfs
arguments[arg] = value
return arguments
def task_result_to_json(self, result: dict) -> str:
parsed_args[arg] = value
return parsed_args
@staticmethod
def task_result_to_json(result: dict) -> str:
"""Transform task result to JSON so we can send it as a response.
:param result: The return value of main()
:return: A string that can be parsed by the front-end for instance.
......
......@@ -3,7 +3,6 @@ test = pytest
[tool:pytest]
addopts =
--cov=fractalis
--capture=no
--color=yes
--verbose
......
......@@ -30,7 +30,6 @@ setup(
],
tests_require=[
'pytest==3.0.3',
'pytest-cov',
'pytest-mock',
'responses'
]
......
......@@ -300,7 +300,7 @@ class TestAnalytics:
assert new_body['state'] == 'SUCCESS', new_body
assert len(json.loads(json.loads(new_body['result'])['df'])) == 30
def test_can_handle_empty_df_list(self, test_client, small_data_post):
def test_can_handle_empty_df_list(self, test_client):
rv = test_client.post('/analytics', data=flask.json.dumps(dict(
task_name='merge_df_task',
args={'df_list': []}
......
"""This module provides tests for the AnalyticsTask class."""
import pandas as pd
from fractalis.analytics.task import AnalyticTask
# noinspection PyMissingOrEmptyDocstring,PyMissingTypeHints
class MockTask(AnalyticTask):
@property
def name(self) -> str:
return ''
def main(self, *args, **kwargs) -> dict:
pass
# noinspection PyMissingOrEmptyDocstring,PyMissingTypeHints
class TestAnalyticsTask:
task = MockTask()
def test_apply_filter(self):
df = pd.DataFrame([[1, 2, 3], [4, 5, 6], [7, 8, 9]],
columns=['A', 'B', 'C'])
filters = {'A': [1, 7], 'B': [2, 5, 8], 'C': [6, 9]}
df = self.task.apply_filters(df=df, filters=filters)
assert df.shape == (1, 3)
assert df.iloc[0].tolist() == [7, 8, 9]
def test_parse_value(self):
arg1 = '${"id": 123, "filters": {"foo": [1,2]}}$'
arg2 = '$123$'
data_task_id, filters = self.task.parse_value(arg1)
assert data_task_id == 123
assert 'foo' in filters
assert filters['foo'] == [1, 2]
data_task_id, filters = self.task.parse_value(arg2)
assert data_task_id == 123
assert not filters
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment