Skip to content
Snippets Groups Projects
Commit 557dcb4c authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

Added some documentation

parent 994c7135
No related branches found
No related tags found
No related merge requests found
Pipeline #
"""This module provides AnalyticTask, which is a modification of a standard
Celery task tailored to Fractalis."""
import abc
import json
import re
import logging
import time
from typing import List
import pandas as pd
from celery import Task
......@@ -14,25 +16,45 @@ logger = logging.getLogger(__name__)
class AnalyticTask(Task, metaclass=abc.ABCMeta):
"""AnalyticTask is a tailored Celery Task that enforces a certain pattern
for all Fractalis analytic tasks and provides certain functionality like
the parsing of the arguments before submitting it to celery.
"""
@property
@abc.abstractmethod
def name(self):
def name(self) -> str:
"""The name of the task."""
pass
@staticmethod
def factory(task_name):
def factory(task_name: str) -> 'AnalyticTask':
"""Initialize the correct task based on the given arguments.
:param task_name: The name of the task to initialize.
:return: An initialized child of this class.
"""
from . import TASK_REGISTRY
for task in TASK_REGISTRY:
if task.name == task_name:
return task()
@abc.abstractmethod
def main(self):
def main(self) -> dict:
"""Since we hijack run(), we need a new entry point for every task.
This method is called by our modified run method with all parsed
arguments.
:return A dict containing the results of the task.
"""
pass
@staticmethod
def prepare_args(data_tasks, args):
def prepare_args(data_tasks: List[str], args: dict) -> dict:
"""Replace data task ids in the arguments with their associated
data frame located on the file system.
:param data_tasks: We use this list to check access.
:param args: The arguments submitted to run()
:return: The new parsed arguments
"""
arguments = {}
for arg in args:
value = args[arg]
......@@ -63,7 +85,13 @@ class AnalyticTask(Task, metaclass=abc.ABCMeta):
arguments[arg] = value
return arguments
def run(self, data_tasks, args):
def run(self, data_tasks: List[str], args: dict) -> str:
"""This is called by the celery worker. This method calls other helper
methods to prepare and validate the in and output of a task.
:param data_tasks: List of data task ids from session to check access.
:param args: This is the dict of arguments submitted to the task.
:return: The result of the task.
"""
arguments = self.prepare_args(data_tasks, args)
result = self.main(**arguments)
try:
......
......@@ -7,7 +7,7 @@ from fractalis.data.etlhandler import ETLHandler
class AdaHandler(ETLHandler):
"""This ETLHandler provides integration with ADA.
'Ada provides key infrastructure for secured integration, vizualization,
'Ada provides key infrastructure for secured integration, visualization,
and analysis of anonymized clinical and experimental data stored in CSV
and tranSMART format, or provided by RedCAP and Synapse apps.'
......
"""This module provides tests for the session interface of Fractalis."""
from uuid import UUID
from time import sleep
......@@ -6,6 +7,7 @@ import flask
from redis import StrictRedis
# noinspection PyMissingOrEmptyDocstring,PyMissingTypeHints
class TestSession(object):
@pytest.fixture(scope='module')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment