diff --git a/fractalis/__init__.py b/fractalis/__init__.py index b9b92329609a95bcf5057fd5229e0604e61c5e41..09271d564c4144898801fbf34c08008aa8a949f6 100644 --- a/fractalis/__init__.py +++ b/fractalis/__init__.py @@ -27,7 +27,9 @@ app.session_interface = RedisSessionInterface(redis) # register blueprints. Do not move the import, due to circular dependencyies. from fractalis.analytics.controller import analytics_blueprint # noqa +from fractalis.data.controller import data_blueprint # noqa app.register_blueprint(analytics_blueprint, url_prefix='/analytics') +app.register_blueprint(data_blueprint, url_prefix='/data') if __name__ == '__main__': handler = logging.handlers.TimedRotatingFileHandler('fractalis.log', diff --git a/fractalis/config.py b/fractalis/config.py index bdc722c835143ba12d863f1a754787e198a338ad..6e51669025e333c02aee2391eae211825b265672 100644 --- a/fractalis/config.py +++ b/fractalis/config.py @@ -1,3 +1,4 @@ +import os from datetime import timedelta # DO NOT MODIFY THIS FILE! @@ -13,4 +14,7 @@ BROKER_URL = 'amqp://' CELERY_RESULT_BACKEND = 'redis://{}:{}'.format(REDIS_HOST, REDIS_PORT) CELERY_TASK_TIME_LIMIT = 60 * 10 +FRACTALIS_TMP_DIR = os.path.abspath(os.path.join( + os.sep, 'tmp', 'fractalis')) + # DO NOT MODIFY THIS FILE! 
diff --git a/fractalis/data/controller.py b/fractalis/data/controller.py index 416a2d69aed76b6f863e4b54fe8c7768685741a4..ea0a60d77f7490fa8e398a8fd9daeed872e2e752 100644 --- a/fractalis/data/controller.py +++ b/fractalis/data/controller.py @@ -1,8 +1,12 @@ +import json + from flask import Blueprint, session, request, jsonify -from .etlhandler import ETLHandler +from .etls.etlhandler import ETLHandler from .schema import create_data_schema from fractalis.validator import validate_json, validate_schema +from fractalis.celery import app as celery +from fractalis import redis data_blueprint = Blueprint('data_blueprint', __name__) @@ -11,8 +15,8 @@ data_blueprint = Blueprint('data_blueprint', __name__) @data_blueprint.before_request def prepare_session(): session.permanent = True - if 'data_jobs' not in session: - session['data_jobs'] = [] + if 'data_ids' not in session: + session['data_ids'] = [] @data_blueprint.route('', methods=['POST']) @@ -23,16 +27,59 @@ def create_data(): etlhandler = ETLHandler.factory(handler=json['handler'], server=json['server'], token=json['token']) - job_ids = etlhandler.handle(json['descriptors']) - session['data_jobs'].append(job_ids) - return jsonify({'job_ids': job_ids}), 201 + data_ids = etlhandler.handle(json['descriptors']) + session['data_ids'] += data_ids + return jsonify({'data_ids': data_ids}), 201 + + +def get_data(key, wait): + value = redis.hget(name='data', key=key) + data_obj = json.loads(value) + job_id = data_obj['job_id'] + async_result = celery.AsyncResult(job_id) + if wait: + async_result.get(propagate=False) # wait for results + state = async_result.state + result = async_result.result + if isinstance(result, Exception): # Exception -> str + result = "{}: {}".format(type(result).__name__, str(result)) + data_obj['state'] = state + data_obj['message'] = result + return data_obj + + +@data_blueprint.route('/<uuid:data_id>', methods=['GET']) +def get_data_by_id(data_id): + data_id = str(data_id) + wait = 
request.args.get('wait') == '1' + if data_id not in session['data_ids']: # access control + return jsonify({'error_msg': "No matching data found."}), 404 + data = get_data(data_id, wait) + return jsonify(data), 200 + + +@data_blueprint.route('/<string:params>', methods=['GET']) +def get_data_by_params(params): + params = json.loads(params) + wait = request.args.get('wait') == '1' + data_id = ETLHandler.compute_data_id(server=params['server'], + descriptor=params['descriptor']) + if data_id not in session['data_ids']: # access control + return jsonify({'error_msg': "No matching data found."}), 404 + data = get_data(data_id, wait) + return jsonify(data), 200 + + +@data_blueprint.route('', methods=['GET']) +def get_all_data_state(): + pass -@data_blueprint.route('', method=['GET']) -def get_all_session_data_status(): +@data_blueprint.route('/<uuid:data_id>', methods=['DELETE']) +def delete_data(data_id): pass -@data_blueprint.route('/<uuid:data_id>', method=['GET']) -def get_data_status(data_id): +@data_blueprint.route('', methods=['DELETE']) +def delete_all_data(): pass diff --git a/fractalis/data/etls/etl.py b/fractalis/data/etls/etl.py index 04b658bb7871812d64eddb86e5fbdf61728b2719..76a47e40b9f6beddf830b6386a694bedd3933808 100644 --- a/fractalis/data/etls/etl.py +++ b/fractalis/data/etls/etl.py @@ -1,15 +1,8 @@ -import os import abc -import json -from uuid import uuid4 -from hashlib import sha256 from celery import Task from pandas import DataFrame -from fractalis import app -from fractalis import redis - class ETL(Task, metaclass=abc.ABCMeta): @@ -50,21 +43,17 @@ class ETL(Task, metaclass=abc.ABCMeta): def transform(self, raw_data): pass - def load(self, data_frame, server, descriptor): - data_dir = app.config['FRACTALIS_TMP_FOLDER'] - os.makedirs(data_dir, exist_ok=True) - file_name = uuid4() - file_path = os.path.join(data_dir, file_name) - descriptor_str = json.dumps(descriptor, sort_keys=True) - to_hash = '{}|{}'.format(server, descriptor_str).encode('utf-8') 
- hash_key = sha256(to_hash) + def load(self, data_frame, file_path): data_frame.to_csv(file_path) - redis.hset(name='data', key=hash_key, value=file_path) - def run(self, server, token, descriptor): + def validate_state(self): + return True + + def run(self, server, token, descriptor, file_path): raw_data = self.extract(server, token, descriptor) data_frame = self.transform(raw_data) if not isinstance(data_frame, DataFrame): raise TypeError("transform() must return 'pandas.DataFrame', but" "returned '{}' instead.".format(type(data_frame))) - self.load(data_frame, server, descriptor) + self.load(data_frame, file_path) + self.validate_state() diff --git a/fractalis/data/etls/etlhandler.py b/fractalis/data/etls/etlhandler.py index 03f170feb6c0b85d0333ad834e585237f0c8571d..bf25f53f163304618a8eca22b3d0754f3ec77f49 100644 --- a/fractalis/data/etls/etlhandler.py +++ b/fractalis/data/etls/etlhandler.py @@ -1,6 +1,12 @@ +import os import abc +import json +from hashlib import sha256 +from uuid import uuid4 from fractalis.data.etls.etl import ETL +from fractalis import app +from fractalis import redis class ETLHandler(metaclass=abc.ABCMeta): @@ -14,16 +20,32 @@ class ETLHandler(metaclass=abc.ABCMeta): self._server = server self._token = token + @staticmethod + def compute_data_id(server, descriptor): + descriptor_str = json.dumps(descriptor, sort_keys=True) + to_hash = '{}|{}'.format(server, descriptor_str).encode('utf-8') + hash_key = sha256(to_hash).hexdigest() + return hash_key + def handle(self, descriptors): - etl_job_ids = [] + data_ids = [] for descriptor in descriptors: + hash_key = self.compute_data_id(self._server, descriptor) + tmp_dir = app.config['FRACTALIS_TMP_DIR'] + data_dir = os.path.join(tmp_dir, 'data') + os.makedirs(data_dir, exist_ok=True) + file_name = str(uuid4()) + file_path = os.path.join(data_dir, file_name) etl = ETL.factory(handler=self._HANDLER, data_type=descriptor['data_type']) async_result = etl.delay(server=self._server, token=self._token, - 
descriptor=descriptor) - etl_job_ids.append(async_result.id) - return etl_job_ids + descriptor=descriptor, + file_path=file_path) + data_obj = {'file_path': file_path, 'job_id': async_result.id} + redis.hset(name='data', key=hash_key, value=json.dumps(data_obj)) + data_ids.append(async_result.id) + return data_ids @classmethod def factory(cls, handler, server, token): diff --git a/fractalis/data/etls/test/etl_foo.py b/fractalis/data/etls/test/etl_foo.py index f9fc34c494ba857abb16934fab80a740e01387e8..ff8499162abb3e562731f5957e023dbbd74a0f5d 100644 --- a/fractalis/data/etls/test/etl_foo.py +++ b/fractalis/data/etls/test/etl_foo.py @@ -1,3 +1,6 @@ +import pandas as pd +import numpy as np + from fractalis.data.etls.etl import ETL @@ -7,8 +10,10 @@ class FooETL(ETL): _HANDLER = 'test' _DATA_TYPE = 'foo' - def extract(self, params): - pass + def extract(self, server, token, descriptor): + fake_raw_data = np.random.randn(10, 5) + return fake_raw_data def transform(self, raw_data): - pass + fake_df = pd.DataFrame(raw_data) + return fake_df diff --git a/fractalis/data/schema.py b/fractalis/data/schema.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..ae5f42fb90fe2d17fb17e73d592119e6cc7df91e 100644 --- a/fractalis/data/schema.py +++ b/fractalis/data/schema.py @@ -0,0 +1,2 @@ +create_data_schema = { +} diff --git a/setup.py b/setup.py index cf28359c83e381bbbca5cd0c2ffa39a1a198459e..cc5a0d6b7e2385d1835991a4370e4827d8a1c2d7 100644 --- a/setup.py +++ b/setup.py @@ -12,6 +12,7 @@ setup( 'celery[redis]', 'redis', 'pandas', + 'numpy' ], setup_requires=[ 'pytest-runner', diff --git a/tests/test_data.py b/tests/test_data.py index c089ece442ab8ac2ecc7c1115854176a0c98a248..999b8edbc4cc2f7e2dfd679a13fc97f7a1b0ec52 100644 --- a/tests/test_data.py +++ b/tests/test_data.py @@ -1,64 +1,47 @@ """This module tests the data controller module.""" +import os +from uuid import UUID + import flask import pytest +from fractalis import redis +from fractalis import app + 
-@pytest.mark.skip(reason='notimplemented') class TestData: @pytest.fixture(scope='function') - def app(self): + def test_client(self): from fractalis import app app.testing = True with app.test_client() as test_client: yield test_client + redis.flushall() - # POST / - - def test_201_on_POST_and_resource_exists_if_created(self, app): - rv = app.post('/data', data=flask.json.dumps(dict( - etl='transmart', + @pytest.fixture(scope='function') + def post(self, test_client): + return test_client.post('/data', data=flask.json.dumps(dict( + handler='test', server='localhost:1234', - concept='GSE123/Demographics/Age', + token='7746391376142672192764', + descriptors=[ + { + 'data_type': 'foo', + 'concept': 'abc//efg/whatever' + } + ] ))) - body = flask.json.loads(rv.get_data()) - new_url = '/data/{}'.format(body['data_id']) - assert app.head(new_url) == 200 - - def test_400_on_POST_if_invalid_request(self, app): - assert False - - def test_200_instead_of_201_on_POST_if_data_already_exists(self, app): - assert False - - def test_data_deleted_on_expiration(self, app): - assert False - - def test_data_in_db_after_creation(self, app): - assert False - # GET /data_id - - def test_200_on_GET_if_resource_created_and_correct_content(self, app): - assert False - - def test_400_on_GET_if_invalid_request(self, app): - assert False - - def test_404_on_GET_if_dataid_not_existing(self, app): - assert False - - def test_404_on_GET_if_no_auth(self, app): - assert False - - # GET / - - def test_200_on_GET_and_correct_summary_if_data_exist(self, app): - assert False - - def test_200_on_GET_and_correct_summary_if_no_data_exist(self, app): - assert False + # POST / - def test_only_permitted_data_visible(self, app): - assert False + def test_201_on_POST_and_file_exists(self, test_client, post): + rv = post + body = flask.json.loads(rv.get_data()) + assert len(body['data_ids']) == 1 + data_id = body['data_ids'][0] + assert UUID(data_id) + data_dir = os.path.join(app.config['FRACTALIS_TMP_DIR'], 
'data') + assert len(os.listdir(data_dir)) == 1 + assert UUID(os.listdir(data_dir)[0])