Skip to content
Snippets Groups Projects
Commit 5576b1f1 authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

implemented the L of ETL

parent c5dc4242
No related branches found
No related tags found
No related merge requests found
Pipeline #
......@@ -9,7 +9,6 @@ from flask import Flask
from redis import StrictRedis
from fractalis.session import RedisSessionInterface
from fractalis.analytics.controller import analytics_blueprint
app = Flask(__name__)
......@@ -25,6 +24,9 @@ except RuntimeError:
redis = StrictRedis(host=app.config['REDIS_HOST'],
port=app.config['REDIS_PORT'])
app.session_interface = RedisSessionInterface(redis)
# Register blueprints. Do not move the import, due to circular dependencies.
from fractalis.analytics.controller import analytics_blueprint # noqa
app.register_blueprint(analytics_blueprint, url_prefix='/analytics')
if __name__ == '__main__':
......
import os
import abc
import json
from uuid import uuid4
from hashlib import sha256
# TODO: is there a difference between this and importing
# fractalis.celery.app.Task ?
from celery import Task
from pandas import DataFrame
from fractalis import app
from fractalis import redis
class ETL(Task, metaclass=abc.ABCMeta):
......@@ -37,17 +43,28 @@ class ETL(Task, metaclass=abc.ABCMeta):
.format(handler, data_type))
@abc.abstractmethod
def extract(self, descriptor):
def extract(self, server, token, descriptor):
pass
@abc.abstractmethod
def transform(self, raw_data):
pass
def load(self, data):
pass
def load(self, data_frame, server, descriptor):
    """Write the transformed data to disk and register its path in redis.

    The data frame is saved as CSV to a uniquely named file inside
    ``app.config['FRACTALIS_TMP_FOLDER']`` (created if missing). The file
    path is then stored in the redis hash ``'data'`` under the SHA-256 hex
    digest of ``'<server>|<descriptor as JSON with sorted keys>'``.

    :param data_frame: Object providing ``to_csv(path)``; ``run()`` passes
        the result of ``transform()`` here.
    :param server: Server identifier; becomes part of the hashed key.
    :param descriptor: JSON-serialisable description of the loaded data.
    """
    data_dir = app.config['FRACTALIS_TMP_FOLDER']
    os.makedirs(data_dir, exist_ok=True)
    # uuid4() returns a UUID object; os.path.join() only accepts
    # str/bytes/PathLike components, so convert explicitly.
    file_name = str(uuid4())
    file_path = os.path.join(data_dir, file_name)
    # sort_keys makes the serialisation independent of dict ordering, so
    # the same descriptor always yields the same key.
    descriptor_str = json.dumps(descriptor, sort_keys=True)
    to_hash = '{}|{}'.format(server, descriptor_str).encode('utf-8')
    # Use the hex digest: the hash object itself is not a valid (or
    # reproducible) redis field name.
    hash_key = sha256(to_hash).hexdigest()
    data_frame.to_csv(file_path)
    redis.hset(name='data', key=hash_key, value=file_path)
def run(self, descriptor):
raw_data = self.extract(descriptor)
data = self.transform(raw_data)
self.load(data)
def run(self, server, token, descriptor):
    """Execute the extract -> transform -> load pipeline for one descriptor.

    :param server: Passed to ``extract()`` and ``load()``.
    :param token: Passed to ``extract()``.
    :param descriptor: Passed to ``extract()`` and ``load()``.
    :raises TypeError: If ``transform()`` does not return a
        ``pandas.DataFrame``; ``load()`` is not called in that case.
    """
    raw_data = self.extract(server, token, descriptor)
    data_frame = self.transform(raw_data)
    if not isinstance(data_frame, DataFrame):
        # Adjacent literals are concatenated verbatim, so the separating
        # space after "but" has to be written out.
        raise TypeError("transform() must return 'pandas.DataFrame', but "
                        "returned '{}' instead.".format(type(data_frame)))
    self.load(data_frame, server, descriptor)
......@@ -10,11 +10,18 @@ class ETLHandler(metaclass=abc.ABCMeta):
def _HANDLER(self):
pass
def __init__(self, server, token):
self._server = server
self._token = token
def handle(self, descriptors):
etl_job_ids = []
for descriptor in descriptors:
etl = ETL.factory(self._HANDLER, descriptor['data_type'])
async_result = etl.delay(descriptor)
etl = ETL.factory(handler=self._HANDLER,
data_type=descriptor['data_type'])
async_result = etl.delay(server=self._server,
token=self._token,
descriptor=descriptor)
etl_job_ids.append(async_result.id)
return etl_job_ids
......@@ -23,7 +30,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
from . import HANDLER_REGISTRY
for Handler in HANDLER_REGISTRY:
if Handler.can_handle(handler):
return Handler()
return Handler(server, token)
raise NotImplementedError(
"No ETLHandler implementation found for: '{}'".format(handler))
......@@ -32,5 +39,5 @@ class ETLHandler(metaclass=abc.ABCMeta):
return handler == cls._HANDLER
@abc.abstractmethod
def _heartbeat(self, server, token):
def _heartbeat(self):
pass
......@@ -11,6 +11,7 @@ setup(
'jsonschema',
'celery[redis]',
'redis',
'pandas',
],
setup_requires=[
'pytest-runner',
......
......@@ -7,7 +7,7 @@ import flask
import pytest
class TestAnalytics(object):
class TestAnalytics:
@pytest.fixture(scope='function')
def app(self):
......
......@@ -5,10 +5,10 @@ import pytest
@pytest.mark.skip(reason='notimplemented')
class TestData(object):
class TestData:
@pytest.fixture(scope='function')
def app(self, app):
def app(self):
from fractalis import app
app.testing = True
with app.test_client() as test_client:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment