Skip to content
Snippets Groups Projects
Commit 776bdb68 authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

skeleton for factory design pattern

parent c6903de3
No related branches found
No related tags found
No related merge requests found
Pipeline #
......@@ -5,3 +5,6 @@
*.egg-info/
__pycache__/
.cache/
*.rdb
*.log
*.idea
from flask import Blueprint, session, request, jsonify
from fractalis import redis
from fractalis.data.etls.etlhandler import ETLHandler
data_blueprint = Blueprint('data_blueprint', __name__)
@data_blueprint.route('', methods=['POST'])
def create_data(etl, server, concepts, token):
ETLHandler.factory()
return jsonify({data_status_list: data_status_list}), 201
@data_blueprint.route('', method=['GET'])
def get_all_session_data_status():
pass
@data_blueprint.route('/<uuid:data_id>', method=['GET'])
def get_data_status(data_id):
etls.get_status(data_id)
"""This module helps us to populate __subclasses__() of ETLHandler and ETL"""
import os
current_path = os.path.dirname(os.path.abspath(__file__))
for dir_path, dir_names, file_names in os.walk(current_path):
if (dir_path == current_path or
'__pycache__' in dir_path or
'__init__.py' not in file_names):
continue
dirname = os.path.basename(dir_path)
exec('from fractalis.data.etls.{} import *'.format(dirname))
import abc
class ETL(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def _DATA_TYPE(self):
pass
@staticmethod
def can_handle_data_type(self, data_type):
return data_type == self._DATA_TYPE
@staticmethod
@abc.abstractmethod
def run(self):
pass
import abc
class ETLHandler(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def _ETL(self):
pass
def can_handle(self, etl):
return etl == self._ETL
def handle(self):
pass
@staticmethod
def factory(self, etl):
for subclass in self.__subclasses__():
if subclass.can_handle(etl):
return subclass()
@abc.abstractmethod
def heartbeat(self):
pass
__all__ = ['handler_transmart', 'etl_hdd', 'etl_ldd']
from fractalis.data.etls.etlhandler import ETLHandler
class HandlerTransmart(ETLHandler):
_ETL = 'transmart'
def heartbeat():
pass
"""This module tests the data controller module."""
import flask
import pytest
@pytest.mark.skip(reason='notimplemented')
class TestData(object):
@pytest.fixture(scope='function')
def app(self, app):
from fractalis import app
app.testing = True
with app.test_client() as test_client:
yield test_client
# POST /
def test_201_on_POST_and_resource_exists_if_created(self, app):
rv = app.post('/data', data=flask.json.dumps(dict(
etl='transmart',
server='localhost:1234',
concept='GSE123/Demographics/Age',
)))
body = flask.json.loads(rv.get_data())
new_url = '/data/{}'.format(body['data_id'])
assert app.head(new_url) == 200
def test_400_on_POST_if_invalid_request(self, app):
assert False
def test_200_instead_of_201_on_POST_if_data_already_exists(self, app):
assert False
def test_data_deleted_on_expiration(self, app):
assert False
def test_data_in_db_after_creation(self, app):
assert False
# GET /data_id
def test_200_on_GET_if_resource_created_and_correct_content(self, app):
assert False
def test_400_on_GET_if_invalid_request(self, app):
assert False
def test_404_on_GET_if_dataid_not_existing(self, app):
assert False
def test_404_on_GET_if_no_auth(self, app):
assert False
# GET /
def test_200_on_GET_and_correct_summary_if_data_exist(self, app):
assert False
def test_200_on_GET_and_correct_summary_if_no_data_exist(self, app):
assert False
def test_only_permitted_data_visible(self, app):
assert False
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment