Commit eed86197 authored by Sascha Herzinger

cleaned up some naming conventions and craziness

parent 5aec3b12
@@ -14,8 +14,8 @@ analytics_blueprint = Blueprint('analytics_blueprint', __name__)
 @analytics_blueprint.before_request
 def prepare_session():
     session.permanent = True
-    if 'jobs' not in session:
-        session['jobs'] = []
+    if 'analytics_jobs' not in session:
+        session['analytics_jobs'] = []


 @analytics_blueprint.route('', methods=['POST'])
@@ -28,14 +28,14 @@ def create_job():
         return jsonify({'error_msg': "Job with name '{}' not found.".format(
             json['job_name'])}), 400
     async_result = analytics_job.delay(**json['args'])
-    session['jobs'].append(async_result.id)
+    session['analytics_jobs'].append(async_result.id)
     return jsonify({'job_id': async_result.id}), 201


 @analytics_blueprint.route('/<uuid:job_id>', methods=['GET'])
 def get_job_details(job_id):
     job_id = str(job_id)
-    if job_id not in session['jobs']:  # access control
+    if job_id not in session['analytics_jobs']:  # access control
         return jsonify({'error_msg': "No matching job found."}), 404
     async_result = celery.AsyncResult(job_id)
     wait = request.args.get('wait') == '1'
@@ -52,10 +52,10 @@ def get_job_details(job_id):
 @analytics_blueprint.route('/<uuid:job_id>', methods=['DELETE'])
 def cancel_job(job_id):
     job_id = str(job_id)
-    if job_id not in session['jobs']:  # Access control
+    if job_id not in session['analytics_jobs']:  # Access control
         return jsonify({'error_msg': "No matching job found."}), 404
     wait = request.args.get('wait') == '1'
     # possibly dangerous: http://stackoverflow.com/a/29627549
     celery.control.revoke(job_id, terminate=True, signal='SIGUSR1', wait=wait)
-    session['jobs'].remove(job_id)
+    session['analytics_jobs'].remove(job_id)
     return jsonify({'job_id': job_id}), 200
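
For context, this is the full job lifecycle the analytics blueprint exposes. A minimal client sketch, assuming the blueprint is mounted at /analytics on a local dev server and that a job named 'correlation' is registered (both are assumptions; only the routes, the wait flag, and the session-scoped access control come from the diff above):

import requests

s = requests.Session()  # one cookie jar == one Flask session == one 'analytics_jobs' list

# POST '' creates a job and returns its id (201)
r = s.post('http://localhost:5000/analytics',
           json={'job_name': 'correlation', 'args': {'x': [1, 2, 3], 'y': [4, 5, 6]}})
job_id = r.json()['job_id']

# GET '/<uuid:job_id>' with wait=1 blocks until the Celery result is ready
r = s.get('http://localhost:5000/analytics/{}'.format(job_id), params={'wait': '1'})

# DELETE '/<uuid:job_id>' revokes the task; ids not in this session 404
r = s.delete('http://localhost:5000/analytics/{}'.format(job_id))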
@@ -11,8 +11,8 @@ data_blueprint = Blueprint('data_blueprint', __name__)
 @data_blueprint.before_request
 def prepare_session():
     session.permanent = True
-    if 'jobs' not in session:
-        session['jobs'] = []
+    if 'data_jobs' not in session:
+        session['data_jobs'] = []


 @data_blueprint.route('', methods=['POST'])
@@ -20,9 +20,11 @@ def prepare_session():
 @validate_schema(create_data_schema)
 def create_data():
     json = request.get_json(force=True)  # pattern enforced by decorators
-    handler = ETLHandler.factory(json)
-    job_ids = handler.handle()
-    session['jobs'].append(job_ids)
+    etlhandler = ETLHandler.factory(handler=json['handler'],
+                                    server=json['server'],
+                                    token=json['token'])
+    job_ids = etlhandler.handle(json['descriptors'])
+    session['data_jobs'].append(job_ids)
     return jsonify({'job_ids': job_ids}), 201
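
The explicit factory signature also pins down the request body that create_data expects. A hypothetical payload sketch (all field values are invented; the keys and the 'test'/'foo' identifiers mirror the FooETL stub further down):

payload = {
    'handler': 'test',               # selects the ETLHandler subclass
    'server': 'https://example.org',  # data source the handler talks to
    'token': 'abc123',               # credentials forwarded to the ETLs
    'descriptors': [
        {'data_type': 'foo'},        # one ETL task is spawned per descriptor
    ],
}
# requests.post('http://localhost:5000/data', json=payload) would return
# {'job_ids': [...]} with one Celery task id per descriptor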
 import abc
+import csv
+from hashlib import sha256

 # TODO: is there a difference between this and importing
 #       fractalis.celery.app.Task ?
@@ -23,20 +25,31 @@ class ETL(Task, metaclass=abc.ABCMeta):
         pass

     @classmethod
-    def can_handle(cls, params):
-        return (params['_handler'] == cls._HANDLER and
-                params['_descriptor']['data_type'] == cls._DATA_TYPE)
+    def can_handle(cls, handler, data_type):
+        return handler == cls._HANDLER and data_type == cls._DATA_TYPE

     @classmethod
-    def factory(cls, params):
+    def factory(cls, handler, data_type):
         from . import ETL_REGISTRY
         for etl in ETL_REGISTRY:
-            if etl.can_handle(params):
+            if etl.can_handle(handler, data_type):
                 return etl()
         raise NotImplementedError(
-            "No ETL implementation found for: '{}'"
-            .format(params))
+            "No ETL implementation found for handler '{}' and data type '{}'"
+            .format(handler, data_type))

     @abc.abstractmethod
-    def run(self):
+    def extract(self, descriptor):
         pass
+
+    @abc.abstractmethod
+    def transform(self, raw_data):
+        pass
+
+    def load(self, data):
+        pass
+
+    def run(self, descriptor):
+        raw_data = self.extract(descriptor)
+        data = self.transform(raw_data)
+        self.load(data)
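
With run() now a template method, a concrete ETL only fills in extract() and transform(); load() defaults to a no-op. A hypothetical subclass sketch (class name, handler/data-type strings, and descriptor fields are invented; registration in ETL_REGISTRY is assumed to work as in factory() above):

from fractalis.data.etls.etl import ETL


class DemoNumbersETL(ETL):

    _HANDLER = 'demo'
    _DATA_TYPE = 'numbers'

    def extract(self, descriptor):
        # would normally pull raw data from the source described by the descriptor
        return list(range(descriptor.get('size', 10)))

    def transform(self, raw_data):
        # shape the raw values into whatever load() will eventually persist
        return [float(x) for x in raw_data]

Once listed in ETL_REGISTRY, ETL.factory('demo', 'numbers') would find this class via can_handle(), and run() chains extract -> transform -> load.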
 import abc
-import copy

 from fractalis.data.etls.etl import ETL
@@ -11,36 +10,27 @@ class ETLHandler(metaclass=abc.ABCMeta):
     def _HANDLER(self):
         pass

-    def __init__(self, kwargs):
-        for key in kwargs:
-            self.__dict__['_' + key] = kwargs[key]
-
-    def handle(self):
-        assert self._descriptors
+    def handle(self, descriptors):
         etl_job_ids = []
-        for descriptor in self._descriptors:
-            params = copy.deepcopy(vars(self))
-            del params['_descriptors']
-            params['_descriptor'] = descriptor
-            etl = ETL.factory(params)
-            async_result = etl.delay(params)
+        for descriptor in descriptors:
+            etl = ETL.factory(self._HANDLER, descriptor['data_type'])
+            async_result = etl.delay(descriptor)
             etl_job_ids.append(async_result.id)
         return etl_job_ids

     @classmethod
-    def factory(cls, **kwargs):
+    def factory(cls, handler, server, token):
         from . import HANDLER_REGISTRY
-        for handler in HANDLER_REGISTRY:
-            if handler.can_handle(kwargs):
-                return handler(kwargs)
+        for Handler in HANDLER_REGISTRY:
+            if Handler.can_handle(handler):
+                return Handler()
         raise NotImplementedError(
-            "No ETLHandler implementation found for: '{}'"
-            .format(kwargs))
+            "No ETLHandler implementation found for: '{}'".format(handler))

     @classmethod
-    def can_handle(cls, kwargs):
-        return kwargs['handler'] == cls._HANDLER
+    def can_handle(cls, handler):
+        return handler == cls._HANDLER

     @abc.abstractmethod
-    def _heartbeat(self):
+    def _heartbeat(self, server, token):
         pass
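
The handler side mirrors this: a concrete ETLHandler now only declares its _HANDLER string and a _heartbeat() check, since handle() and the registry lookup live in the base class. A hypothetical sketch (class name and module path are assumptions):

from fractalis.data.etls.etl_handler import ETLHandler  # module path assumed


class DemoHandler(ETLHandler):

    _HANDLER = 'demo'

    def _heartbeat(self, server, token):
        # would normally verify that the token is still valid on the server
        pass

ETLHandler.factory('demo', server, token) would then return a DemoHandler instance via can_handle().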
@@ -7,5 +7,8 @@ class FooETL(ETL):
     _HANDLER = 'test'
     _DATA_TYPE = 'foo'

-    def run(self):
-        return 42
+    def extract(self, params):
+        pass
+
+    def transform(self, raw_data):
+        pass