diff --git a/fractalis/__init__.py b/fractalis/__init__.py
index 7b8da2d176161520101a2219267b550d6ebd738c..b9b92329609a95bcf5057fd5229e0604e61c5e41 100644
--- a/fractalis/__init__.py
+++ b/fractalis/__init__.py
@@ -9,7 +9,6 @@ from flask import Flask
 from redis import StrictRedis
 
 from fractalis.session import RedisSessionInterface
-from fractalis.analytics.controller import analytics_blueprint
 
 
 app = Flask(__name__)
@@ -25,6 +24,9 @@ except RuntimeError:
 redis = StrictRedis(host=app.config['REDIS_HOST'],
                     port=app.config['REDIS_PORT'])
 app.session_interface = RedisSessionInterface(redis)
+
+# register blueprints. Do not move the import, due to circular dependencies.
+from fractalis.analytics.controller import analytics_blueprint  # noqa
 app.register_blueprint(analytics_blueprint, url_prefix='/analytics')
 
 if __name__ == '__main__':
diff --git a/fractalis/data/etls/etl.py b/fractalis/data/etls/etl.py
index 07c1dac80f59fe452003c8bb0486ffa29565188a..04b658bb7871812d64eddb86e5fbdf61728b2719 100644
--- a/fractalis/data/etls/etl.py
+++ b/fractalis/data/etls/etl.py
@@ -1,8 +1,14 @@
+import os
 import abc
+import json
+from uuid import uuid4
+from hashlib import sha256
 
-# TODO: is there a difference between this and importing
-# fractalis.celery.app.Task ?
 from celery import Task
+from pandas import DataFrame
+
+from fractalis import app
+from fractalis import redis
 
 
 class ETL(Task, metaclass=abc.ABCMeta):
@@ -37,17 +43,39 @@ class ETL(Task, metaclass=abc.ABCMeta):
             .format(handler, data_type))
 
     @abc.abstractmethod
-    def extract(self, descriptor):
+    def extract(self, server, token, descriptor):
         pass
 
     @abc.abstractmethod
     def transform(self, raw_data):
         pass
 
-    def load(self, data):
-        pass
+    def load(self, data_frame, server, descriptor):
+        """Write the data frame to a CSV file and index it in Redis.
+
+        The Redis hash 'data' maps the SHA-256 hex digest of
+        '<server>|<descriptor as sorted-key JSON>' to the CSV file path.
+        """
+        data_dir = app.config['FRACTALIS_TMP_FOLDER']
+        os.makedirs(data_dir, exist_ok=True)
+        # os.path.join() only accepts str/bytes/PathLike, not a UUID object.
+        file_name = str(uuid4())
+        file_path = os.path.join(data_dir, file_name)
+        descriptor_str = json.dumps(descriptor, sort_keys=True)
+        to_hash = '{}|{}'.format(server, descriptor_str).encode('utf-8')
+        # Store the hex digest: the hash object itself is not a usable key.
+        hash_key = sha256(to_hash).hexdigest()
+        data_frame.to_csv(file_path)
+        redis.hset(name='data', key=hash_key, value=file_path)
 
-    def run(self, descriptor):
-        raw_data = self.extract(descriptor)
-        data = self.transform(raw_data)
-        self.load(data)
+    def run(self, server, token, descriptor):
+        """Run extract, transform and load in sequence for one descriptor.
+
+        Raises TypeError if transform() does not return a pandas.DataFrame.
+        """
+        raw_data = self.extract(server, token, descriptor)
+        data_frame = self.transform(raw_data)
+        if not isinstance(data_frame, DataFrame):
+            raise TypeError("transform() must return 'pandas.DataFrame', but "
+                            "returned '{}' instead.".format(type(data_frame)))
+        self.load(data_frame, server, descriptor)
diff --git a/fractalis/data/etls/etlhandler.py b/fractalis/data/etls/etlhandler.py
index ca6d2ed3ba4a2cc2bdf7e83716335e21226c3f3e..03f170feb6c0b85d0333ad834e585237f0c8571d 100644
--- a/fractalis/data/etls/etlhandler.py
+++ b/fractalis/data/etls/etlhandler.py
@@ -10,11 +10,18 @@ class ETLHandler(metaclass=abc.ABCMeta):
     def _HANDLER(self):
         pass
 
+    def __init__(self, server, token):
+        self._server = server
+        self._token = token
+
     def handle(self, descriptors):
         etl_job_ids = []
         for descriptor in descriptors:
-            etl = ETL.factory(self._HANDLER, descriptor['data_type'])
-            async_result = etl.delay(descriptor)
+            etl = ETL.factory(handler=self._HANDLER,
+                              data_type=descriptor['data_type'])
+            async_result = etl.delay(server=self._server,
+                                     token=self._token,
+                                     descriptor=descriptor)
             etl_job_ids.append(async_result.id)
         return etl_job_ids
 
@@ -23,7 +30,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
         from . import HANDLER_REGISTRY
         for Handler in HANDLER_REGISTRY:
             if Handler.can_handle(handler):
-                return Handler()
+                return Handler(server, token)
         raise NotImplementedError(
             "No ETLHandler implementation found for: '{}'".format(handler))
 
@@ -32,5 +39,5 @@ class ETLHandler(metaclass=abc.ABCMeta):
         return handler == cls._HANDLER
 
     @abc.abstractmethod
-    def _heartbeat(self, server, token):
+    def _heartbeat(self):
         pass
diff --git a/setup.py b/setup.py
index 8b5a67ea08aed22d9584d12daa70c50bf15e306c..cf28359c83e381bbbca5cd0c2ffa39a1a198459e 100644
--- a/setup.py
+++ b/setup.py
@@ -11,6 +11,7 @@ setup(
         'jsonschema',
         'celery[redis]',
         'redis',
+        'pandas',
     ],
     setup_requires=[
         'pytest-runner',
diff --git a/tests/test_analytics.py b/tests/test_analytics.py
index 082cdaa2588eaaeaa54fe23d24e19b6a9ac03eb8..2c4a5ecda231fe90ea01758acfe3042404954e5a 100644
--- a/tests/test_analytics.py
+++ b/tests/test_analytics.py
@@ -7,7 +7,7 @@ import flask
 import pytest
 
 
-class TestAnalytics(object):
+class TestAnalytics:
 
     @pytest.fixture(scope='function')
     def app(self):
diff --git a/tests/test_data.py b/tests/test_data.py
index 3f1318395ffbc1dc4221047fe9aa1dc358ee7cfb..c089ece442ab8ac2ecc7c1115854176a0c98a248 100644
--- a/tests/test_data.py
+++ b/tests/test_data.py
@@ -5,10 +5,10 @@ import pytest
 
 
 @pytest.mark.skip(reason='notimplemented')
-class TestData(object):
+class TestData:
 
     @pytest.fixture(scope='function')
-    def app(self, app):
+    def app(self):
         from fractalis import app
         app.testing = True
         with app.test_client() as test_client: