Skip to content
Snippets Groups Projects
Commit 1d35f503 authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

fixing test

parent 4ee6b38f
No related branches found
No related tags found
No related merge requests found
Pipeline #
"""This module provides the ETL class"""
import abc
from celery import Task
......@@ -5,28 +7,57 @@ from pandas import DataFrame
class ETL(Task, metaclass=abc.ABCMeta):
"""This is an abstract class that implements a celery Task and provides a
factory method to create instances of implementations of itself. Its main
purpose is to manage extraction of the data from the target server. ETL
stands for (E)xtract (T)ransform (L)oad and not by coincidence similar named
methods can be found in this class.
"""
@property
@abc.abstractmethod
def name(self):
def name(self) -> str:
"""Used by celery to identify this task by name."""
pass
@property
@abc.abstractmethod
def _HANDLER(self):
def _handler(self) -> str:
"""Used by self.can_handle to check whether the current implementation
belongs to a certain handler. This is necessary to avoid conflicts with
other ETL with identical self.name field.
"""
pass
@property
@abc.abstractmethod
def _DATA_TYPE(self):
def _data_type(self) -> str:
"""Used by self.can_handle to check whether the current implementation
can handle given data type.
"""
pass
@classmethod
def can_handle(cls, handler, data_type):
return handler == cls._HANDLER and data_type == cls._DATA_TYPE
def can_handle(cls, handler: str, data_type: str) -> bool:
"""Check if the current implementation of ETL can handle given handler
and data type.
:param handler: Describes the handler. E.g.: transmart, ada
:param data_type: Describes the data type. E.g.: ldd, hdd
:return: True if implementation can handle given parameters.
"""
return handler == cls._handler and data_type == cls._data_type
@classmethod
def factory(cls, handler, data_type):
def factory(cls, handler: str, data_type: str) -> 'ETL':
"""Return an instance of the implementation ETL that can handle the given
parameters.
:param handler: Describes the handler. E.g.: transmart, ada
:param data_type: Describes the data type. E.g.: ldd, hdd
:return: An instance of an implementation of ETL that returns True for
self.can_handle
"""
from . import ETL_REGISTRY
for etl in ETL_REGISTRY:
if etl.can_handle(handler, data_type):
......@@ -36,20 +67,45 @@ class ETL(Task, metaclass=abc.ABCMeta):
.format(handler, data_type))
@abc.abstractmethod
def extract(self, server, token, descriptor):
def extract(self, server: str, token: str, descriptor: dict) -> object:
"""Extract the data via HTTP requests.
:param server: The server from which to extract from.
:param token: The token used for authentication.
:param descriptor: The descriptor containing all necessary information
to download the data.
"""
pass
@abc.abstractmethod
def transform(self, raw_data):
def transform(self, raw_data: object) -> DataFrame:
"""Transform the data into a pandas.DataFrame with a naming according to
the Fractalis standard format.
:param raw_data: The data to transform.
"""
pass
def load(self, data_frame, file_path):
def load(self, data_frame: DataFrame, file_path: str):
"""Load (save) the data to the file system.
:param data_frame: DataFrame to write.
:param file_path: Path to write to.
"""
data_frame.to_csv(file_path)
def validate_state(self):
return True
def run(self, server: str, token: str,
descriptor: dict, file_path: str):
"""Run the current task.
This is called by the celery worker.
Only overwrite this method if you really know what you are doing.
def run(self, server, token, descriptor, file_path):
:param server: The server on which the data are located.
:param token: The token used for authentication.
:param descriptor: Contains all necessary information to download data.
:param file_path: The path to where the file is written.
"""
raw_data = self.extract(server, token, descriptor)
data_frame = self.transform(raw_data)
if not isinstance(data_frame, DataFrame):
......
"""This module provides the ETLHandler class."""
import os
import abc
import json
import time
from hashlib import sha256
from uuid import uuid4
from typing import List
from fractalis.data.etl import ETL
from fractalis import app
......@@ -11,10 +14,18 @@ from fractalis import redis
class ETLHandler(metaclass=abc.ABCMeta):
"""This is an abstract class that provides a factory method to create
instances of implementations of itself. The main purpose of this class
is the supervision of all ETL processes belonging to this handler. Besides
that it takes care of authentication business.
"""
@property
@abc.abstractmethod
def _HANDLER(self):
def _handler(self) -> str:
"""Used by self.can_handle to check whether the current implementation
is able to handle the incoming request.
"""
pass
def __init__(self, server, token):
......@@ -22,13 +33,29 @@ class ETLHandler(metaclass=abc.ABCMeta):
self._token = token
@staticmethod
def compute_data_id(server, descriptor):
def compute_data_id(server: str, descriptor: dict) -> str:
"""Return a hash key based on the given parameters.
Parameters are automatically sorted before the hash is computed.
:param server: The server which is being handled.
:param descriptor: A dict describing the data.
:return: The computed hash key.
"""
descriptor_str = json.dumps(descriptor, sort_keys=True)
to_hash = '{}|{}'.format(server, descriptor_str).encode('utf-8')
hash_key = sha256(to_hash).hexdigest()
return hash_key
def handle(self, descriptors, wait):
def handle(self, descriptors: List[dict], wait: bool = False) -> List[str]:
"""Create instances of ETL for the given descriptors and submit them (ETL
implements celery.Task) to the broker. The task ids are returned to keep
track of them.
:param descriptors: A list of items describing the data to download.
:param wait: Makes this method synchronous by waiting for the tasks to
return.
:return: The list of task ids for the submitted tasks.
"""
data_ids = []
for descriptor in descriptors:
data_id = self.compute_data_id(self._server, descriptor)
......@@ -41,7 +68,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
else:
file_name = str(uuid4())
file_path = os.path.join(data_dir, file_name)
etl = ETL.factory(handler=self._HANDLER,
etl = ETL.factory(handler=self._handler,
data_type=descriptor['data_type'])
async_result = etl.delay(server=self._server,
token=self._token,
......@@ -59,7 +86,16 @@ class ETLHandler(metaclass=abc.ABCMeta):
return data_ids
@classmethod
def factory(cls, handler, server, token):
def factory(cls, handler: str, server: str, token: str) -> 'ETLHandler':
"""Return an instance of the implementation of ETLHandler that can
handle the given parameters.
:param handler: Describes the handler. E.g.: transmart, ada
:param server: The server to download data from.
:param token: The token used for authentication.
:return: An instance of an implementation of ETLHandler that returns
True for self.can_handle
"""
from . import HANDLER_REGISTRY
for Handler in HANDLER_REGISTRY:
if Handler.can_handle(handler):
......@@ -68,8 +104,14 @@ class ETLHandler(metaclass=abc.ABCMeta):
"No ETLHandler implementation found for: '{}'".format(handler))
@classmethod
def can_handle(cls, handler):
return handler == cls._HANDLER
def can_handle(cls, handler: str) -> bool:
"""Check whether this implementation of ETLHandler can handle the given
parameters.
:param handler: Describes the handler. E.g.: transmart, ada
:return: True if this implementation can handle the given parameters.
"""
return handler == cls._handler
@abc.abstractmethod
def _heartbeat(self):
......
......@@ -3,7 +3,7 @@ from fractalis.data.etlhandler import ETLHandler
class AdaHandler(ETLHandler):
_HANDLER = 'ada'
_handler = 'ada'
def _heartbeat(self):
pass
\ No newline at end of file
......@@ -7,8 +7,8 @@ from fractalis.data.etl import ETL
class BarETL(ETL):
name = 'test_bar_task'
_HANDLER = 'test'
_DATA_TYPE = 'bar'
_handler = 'test'
_data_type = 'bar'
def extract(self, server, token, descriptor):
fake_raw_data = np.random.randn(10, 5)
......
......@@ -7,8 +7,8 @@ from fractalis.data.etl import ETL
class FooETL(ETL):
name = 'test_foo_task'
_HANDLER = 'test'
_DATA_TYPE = 'foo'
_handler = 'test'
_data_type = 'foo'
def extract(self, server, token, descriptor):
fake_raw_data = np.random.randn(10, 5)
......
......@@ -7,8 +7,8 @@ from fractalis.data.etl import ETL
class RandomDFETL(ETL):
name = 'test_randomdf_task'
_HANDLER = 'test'
_DATA_TYPE = 'randomdf'
_handler = 'test'
_data_type = 'randomdf'
def extract(self, server, token, descriptor):
fake_raw_data = np.random.randn(10, 5)
......
......@@ -3,7 +3,7 @@ from fractalis.data.etlhandler import ETLHandler
class TestHandler(ETLHandler):
_HANDLER = 'test'
_handler = 'test'
def _heartbeat():
pass
......@@ -3,7 +3,7 @@ from fractalis.data.etlhandler import ETLHandler
class TransmartHandler(ETLHandler):
_ETL = 'transmart'
_handler = 'transmart'
def heartbeat():
pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment