Commit 8ccbc1ce authored by Sascha Herzinger

implemented the GET requests, which triggered some useful refactoring

parent 5576b1f1
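In client terms, the new endpoints behave roughly as sketched below (the base URL, token, and descriptor values are placeholders, and requests is just an assumed HTTP client; none of this is part of the commit itself):

import requests

s = requests.Session()  # a cookie session is needed, since the endpoints check session['data_ids']
base = 'http://localhost:5000'  # assumed dev server address

# POST /data starts one Celery ETL job per descriptor and returns their ids
rv = s.post(base + '/data', json={
    'handler': 'test',
    'server': 'localhost:1234',
    'token': '7746391376142672192764',
    'descriptors': [{'data_type': 'foo', 'concept': 'abc//efg/whatever'}],
})
data_ids = rv.json()['data_ids']  # 201 Created

# GET /data/<data_id>?wait=1 blocks until the ETL task has finished and
# returns the stored data object plus the task state
print(s.get('{}/data/{}?wait=1'.format(base, data_ids[0])).json())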
@@ -27,7 +27,9 @@ app.session_interface = RedisSessionInterface(redis)

# register blueprints. Do not move the imports, due to circular dependencies.
from fractalis.analytics.controller import analytics_blueprint  # noqa
from fractalis.data.controller import data_blueprint  # noqa
app.register_blueprint(analytics_blueprint, url_prefix='/analytics')
app.register_blueprint(data_blueprint, url_prefix='/data')

if __name__ == '__main__':
    handler = logging.handlers.TimedRotatingFileHandler('fractalis.log',
......
import os
from datetime import timedelta
# DO NOT MODIFY THIS FILE!
@@ -13,4 +14,7 @@ BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'redis://{}:{}'.format(REDIS_HOST, REDIS_PORT)
CELERY_TASK_TIME_LIMIT = 60 * 10
FRACTALIS_TMP_DIR = os.path.abspath(os.path.join(
    os.sep, 'tmp', 'fractalis'))
# DO NOT MODIFY THIS FILE!
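For reference, the join above resolves to the system temp location (POSIX paths assumed):

import os
os.path.abspath(os.path.join(os.sep, 'tmp', 'fractalis'))  # -> '/tmp/fractalis'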
import json
from flask import Blueprint, session, request, jsonify
from .etls.etlhandler import ETLHandler
from .schema import create_data_schema
from fractalis.validator import validate_json, validate_schema
from fractalis.celery import app as celery
from fractalis import redis
data_blueprint = Blueprint('data_blueprint', __name__)
@@ -11,8 +15,8 @@ data_blueprint = Blueprint('data_blueprint', __name__)

@data_blueprint.before_request
def prepare_session():
    session.permanent = True
    if 'data_ids' not in session:
        session['data_ids'] = []
@data_blueprint.route('', methods=['POST'])
@@ -23,16 +27,59 @@ def create_data():
    etlhandler = ETLHandler.factory(handler=json['handler'],
                                    server=json['server'],
                                    token=json['token'])
    data_ids = etlhandler.handle(json['descriptors'])
    session['data_ids'] += data_ids
    return jsonify({'data_ids': data_ids}), 201
def get_data(key, wait):
    value = redis.hget(name='data', key=key)
    data_obj = json.loads(value)
    job_id = data_obj['job_id']
    async_result = celery.AsyncResult(job_id)
    if wait:
        async_result.get(propagate=False)  # wait for results
    state = async_result.state
    result = async_result.result
    if isinstance(result, Exception):  # Exception -> str
        result = "{}: {}".format(type(result).__name__, str(result))
    data_obj['state'] = state
    data_obj['message'] = result
    return data_obj
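A successful lookup therefore returns the redis-stored object merged with the Celery task outcome, roughly like this (all values illustrative; the file name is a random uuid4 and both ids are hypothetical):

data_obj = {
    'file_path': '/tmp/fractalis/data/0b7e...',  # hypothetical
    'job_id': 'c0ffee00-...',                    # hypothetical Celery task id
    'state': 'SUCCESS',                          # Celery state, e.g. PENDING/STARTED/SUCCESS/FAILURE
    'message': None,                             # task return value, or "ExcType: msg" on failure
}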
@data_blueprint.route('/<uuid:data_id>', methods=['GET'])
def get_data_by_id(data_id):
    data_id = str(data_id)
    wait = request.args.get('wait') == '1'
    if data_id not in session['data_ids']:  # access control
        return jsonify({'error_msg': "No matching data found."}), 404
    data = get_data(data_id, wait)
    return jsonify(data), 200
@data_blueprint.route('/<string:params>', methods=['GET'])
def get_data_by_params(params):
    params = json.loads(params)
    wait = request.args.get('wait') == '1'
    data_id = ETLHandler.compute_data_id(server=params['server'],
                                         descriptor=params['descriptor'])
    if data_id not in session['data_ids']:  # access control
        return jsonify({'error_msg': "No matching data found."}), 404
    data = get_data(data_id, wait)
    return jsonify(data), 200
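Because this route takes a whole JSON document as a single path segment, a client has to percent-encode it; a sketch of building such a URL (base is an assumed server address, as in the earlier sketch):

import json
from urllib.parse import quote

base = 'http://localhost:5000'
params = json.dumps({'server': 'localhost:1234',
                     'descriptor': {'data_type': 'foo',
                                    'concept': 'abc//efg/whatever'}})
url = '{}/data/{}?wait=1'.format(base, quote(params, safe=''))
# caveat: WSGI servers decode %2F before routing, so descriptors containing
# slashes may require Flask's 'path' converter rather than 'string'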
@data_blueprint.route('', methods=['GET'])
def get_all_data_state():
    pass

@data_blueprint.route('/<uuid:data_id>', methods=['DELETE'])
def delete_data(data_id):
    pass

@data_blueprint.route('', methods=['DELETE'])
def delete_all_data():
    pass
import os
import abc
import json
from uuid import uuid4
from hashlib import sha256
from celery import Task
from pandas import DataFrame
from fractalis import app
from fractalis import redis
class ETL(Task, metaclass=abc.ABCMeta):
@@ -50,21 +43,17 @@ class ETL(Task, metaclass=abc.ABCMeta):
    def transform(self, raw_data):
        pass

    def load(self, data_frame, file_path):
        data_frame.to_csv(file_path)

    def validate_state(self):
        return True

    def run(self, server, token, descriptor, file_path):
        raw_data = self.extract(server, token, descriptor)
        data_frame = self.transform(raw_data)
        if not isinstance(data_frame, DataFrame):
            raise TypeError("transform() must return 'pandas.DataFrame', but "
                            "returned '{}' instead.".format(type(data_frame)))
        self.load(data_frame, file_path)
        self.validate_state()
import os
import abc
import json
from hashlib import sha256
from uuid import uuid4
from fractalis.data.etls.etl import ETL
from fractalis import app
from fractalis import redis
class ETLHandler(metaclass=abc.ABCMeta):
@@ -14,16 +20,32 @@ class ETLHandler(metaclass=abc.ABCMeta):
        self._server = server
        self._token = token

    @staticmethod
    def compute_data_id(server, descriptor):
        descriptor_str = json.dumps(descriptor, sort_keys=True)
        to_hash = '{}|{}'.format(server, descriptor_str).encode('utf-8')
        hash_key = sha256(to_hash)
        return hash_key.hexdigest()  # str digest, usable as a redis key
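Hashing the server together with the key-sorted descriptor makes the data id deterministic: resubmitting the same request maps to the same redis entry, which is exactly what get_data_by_params relies on. A quick illustration:

a = ETLHandler.compute_data_id('localhost:1234', {'data_type': 'foo'})
b = ETLHandler.compute_data_id('localhost:1234', {'data_type': 'foo'})
assert a == b  # same server + descriptor -> same 64-char hex id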
    def handle(self, descriptors):
        data_ids = []
        for descriptor in descriptors:
            hash_key = self.compute_data_id(self._server, descriptor)
            tmp_dir = app.config['FRACTALIS_TMP_DIR']
            data_dir = os.path.join(tmp_dir, 'data')
            os.makedirs(data_dir, exist_ok=True)
            file_name = str(uuid4())
            file_path = os.path.join(data_dir, file_name)
            etl = ETL.factory(handler=self._HANDLER,
                              data_type=descriptor['data_type'])
            async_result = etl.delay(server=self._server,
                                     token=self._token,
                                     descriptor=descriptor,
                                     file_path=file_path)
            data_obj = {'file_path': file_path, 'job_id': async_result.id}
            redis.hset(name='data', key=hash_key, value=json.dumps(data_obj))
            data_ids.append(async_result.id)
        return data_ids
@classmethod
def factory(cls, handler, server, token):
......
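Putting the handler pieces together, the controller-side call sequence (mirroring create_data above) is roughly:

etlhandler = ETLHandler.factory(handler='test',
                                server='localhost:1234',
                                token='7746391376142672192764')
data_ids = etlhandler.handle([{'data_type': 'foo',
                               'concept': 'abc//efg/whatever'}])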
import pandas as pd
import numpy as np
from fractalis.data.etls.etl import ETL
@@ -7,8 +10,10 @@ class FooETL(ETL):
    _HANDLER = 'test'
    _DATA_TYPE = 'foo'

    def extract(self, server, token, descriptor):
        fake_raw_data = np.random.randn(10, 5)
        return fake_raw_data

    def transform(self, raw_data):
        fake_df = pd.DataFrame(raw_data)
        return fake_df
create_data_schema = {
}
@@ -12,6 +12,7 @@ setup(
        'celery[redis]',
        'redis',
        'pandas',
        'numpy'
    ],
    setup_requires=[
        'pytest-runner',
......
"""This module tests the data controller module."""
import os
from uuid import UUID
import flask
import pytest
from fractalis import redis
from fractalis import app
class TestData:

    @pytest.fixture(scope='function')
    def test_client(self):
        from fractalis import app
        app.testing = True
        with app.test_client() as test_client:
            yield test_client
            redis.flushall()

    @pytest.fixture(scope='function')
    def post(self, test_client):
        return test_client.post('/data', data=flask.json.dumps(dict(
            handler='test',
            server='localhost:1234',
            token='7746391376142672192764',
            descriptors=[
                {
                    'data_type': 'foo',
                    'concept': 'abc//efg/whatever'
                }
            ]
        )))

    def test_201_on_POST_and_file_exists(self, test_client, post):
        rv = post
        body = flask.json.loads(rv.get_data())
        assert len(body['data_ids']) == 1
        data_id = body['data_ids'][0]
        assert UUID(data_id)
        data_dir = os.path.join(app.config['FRACTALIS_TMP_DIR'], 'data')
        assert len(os.listdir(data_dir)) == 1
        assert UUID(os.listdir(data_dir)[0])