Skip to content
Snippets Groups Projects
Commit 4fd472b6 authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

new system to autodiscover classes/tasks

parent a01cebd3
No related branches found
No related tags found
No related merge requests found
Pipeline #
......@@ -6,7 +6,9 @@ import logging
from celery import Celery
from fractalis.utils import get_sub_packages_for_package
from fractalis.utils import list_classes_with_base_class
from fractalis.data.etls.etl import ETL
app = Celery(__name__)
app.config_from_object('fractalis.config')
......@@ -16,11 +18,12 @@ try:
sys.path.append(os.path.dirname(os.path.expanduser(config_file)))
config = __import__(module).__dict__
for key in app.conf:
if key in config and key[0] != '_':
if key in config and key.startswith('_'):
app.conf[key] = config[key]
except KeyError:
logger = logging.getLogger('fractalis')
logger.warning("FRACTALIS_CONFIG is not set. Using defaults.")
task_package = 'fractalis.analytics.scripts'
app.autodiscover_tasks(packages=get_sub_packages_for_package(task_package))
etl_classes = list_classes_with_base_class('fractalis.data.etls', ETL)
for etl_class in etl_classes:
app.tasks.register(etl_class)
"""This module helps us to populate __subclasses__() of ETLHandler and ETL"""
import os
from fractalis.utils import list_classes_with_base_class
from fractalis.data.etls.etlhandler import ETLHandler
from fractalis.data.etls.etl import ETL
# Star-import every sub-package that sits below this package so that the
# classes defined there are loaded before the registries are built.
current_path = os.path.dirname(os.path.abspath(__file__))
for dir_path, dir_names, file_names in os.walk(current_path):
    # Only real sub-packages qualify: not this directory itself, not a
    # bytecode cache, and the directory must ship an __init__.py.
    if (dir_path != current_path and
            '__pycache__' not in dir_path and
            '__init__.py' in file_names):
        dirname = os.path.basename(dir_path)
        exec('from fractalis.data.etls.{} import *'.format(dirname))

# Registries consulted by the factory methods of ETLHandler and ETL.
HANDLER_REGISTRY = list_classes_with_base_class(
    'fractalis.data.etls', ETLHandler)
ETL_REGISTRY = list_classes_with_base_class(
    'fractalis.data.etls', ETL)
import abc
from fractalis.celery import app as celery
# TODO: is there a difference between this and importing
# fractalis.celery.app.Task ?
from celery import Task
class ETL(celery.Task, metaclass=abc.ABCMeta):
class ETL(Task, metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def name(self):
pass
@property
@abc.abstractmethod
......@@ -14,21 +22,18 @@ class ETL(celery.Task, metaclass=abc.ABCMeta):
def _DATA_TYPE(self):
pass
def __init__(self, params):
for key in params:
self.__dict__[key] = params[key]
@classmethod
def _can_handle(cls, params):
def can_handle(cls, params):
return (params['_handler'] == cls._HANDLER and
params['_descriptor']['data_type'] == cls._DATA_TYPE)
@classmethod
def factory(cls, params):
for subclass in cls.__subclasses__():
if subclass._can_handle(params):
return subclass(params)
raise NotImplemented(
from . import ETL_REGISTRY
for etl in ETL_REGISTRY:
if etl.can_handle(params):
return etl()
raise NotImplementedError(
"No ETL implementation found for: '{}'"
.format(params))
......
......@@ -29,15 +29,16 @@ class ETLHandler(metaclass=abc.ABCMeta):
@classmethod
def factory(cls, **kwargs):
for subclass in cls.__subclasses__():
if subclass._can_handle(kwargs):
return subclass(kwargs)
raise NotImplemented(
from . import HANDLER_REGISTRY
for handler in HANDLER_REGISTRY:
if handler.can_handle(kwargs):
return handler(kwargs)
raise NotImplementedError(
"No ETLHandler implementation found for: '{}'"
.format(kwargs))
@classmethod
def _can_handle(cls, kwargs):
def can_handle(cls, kwargs):
return kwargs['handler'] == cls._HANDLER
@abc.abstractmethod
......
__all__ = ['handler_test', 'etl_foo', 'etl_bar']
......@@ -3,8 +3,9 @@ from fractalis.data.etls.etl import ETL
class FooETL(ETL):
name = 'test_foo_task'
_HANDLER = 'test'
_DATA_TYPE = 'foo'
def run(self):
pass
return 42
import os
import glob
import inspect
import importlib
def get_sub_packages_for_package(package):
    """Return the dotted names of all sub-packages below a package.

    The package is imported to locate its directory, which is then walked
    recursively. Every directory that contains an ``__init__.py`` is
    reported; the package directory itself and ``__pycache__`` directories
    are skipped.

    :param package: Dotted name of an importable package.
    :return: List of dotted sub-package names, each prefixed with
        ``package`` (e.g. ``['pkg.a', 'pkg.a.b']``).
    """
    module = importlib.import_module(package)
    abs_path = os.path.dirname(os.path.abspath(module.__file__))
    sub_packages = []
    for dir_path, dir_names, file_names in os.walk(abs_path):
        if (dir_path == abs_path or
                '__pycache__' in dir_path or
                '__init__.py' not in file_names):
            continue
        # Use the full path relative to the package root, not only the
        # last directory name; otherwise 'pkg/a/b' would be reported as
        # 'pkg.b' instead of 'pkg.a.b'.
        relative_parts = os.path.relpath(dir_path, abs_path).split(os.sep)
        sub_package = '.'.join([package] + relative_parts)
        sub_packages.append(sub_package)
    return sub_packages
def import_module_by_abs_path(module_path):
    """Load a Python source file as a module, given its file system path.

    The module is named after the file (base name without extension) and
    is executed on every call. It is not added to ``sys.modules``.

    :param module_path: Path to the file that should be loaded.
    :return: The freshly executed module object.
    :raises ImportError: If no loader is available for ``module_path``.
    """
    # 'import importlib' alone does not guarantee that the 'util'
    # submodule is loaded, so import it explicitly here.
    from importlib import util as importlib_util

    module_name = os.path.splitext(os.path.basename(module_path))[0]
    spec = importlib_util.spec_from_file_location(module_name, module_path)
    if spec is None or spec.loader is None:
        # Happens e.g. for files whose suffix no import loader recognises.
        raise ImportError(
            "Cannot load module from path: '{}'".format(module_path))
    module = importlib_util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
def list_classes_with_base_class(package, base_class):
    """Collect the classes below a package that directly derive from a base.

    Every ``*.py`` file located in a sub-directory (at any depth) of the
    package directory is loaded via ``import_module_by_abs_path``; files
    whose name starts with an underscore and files placed directly in the
    package directory are ignored. All classes visible in a loaded module
    that list ``base_class`` among their direct bases are returned.

    :param package: Dotted name of an importable package.
    :param base_class: Class the returned classes must directly inherit from.
    :return: List of matching class objects, ordered by module path.
    """
    package_module = importlib.import_module(package)
    abs_path = os.path.dirname(os.path.abspath(package_module.__file__))
    # Sort the paths: glob yields them in file system order, which would
    # make the order of the returned classes differ between machines.
    module_paths = sorted(glob.iglob('{}/*/**/*.py'.format(abs_path),
                                     recursive=True))
    class_list = []
    for module_path in module_paths:
        if os.path.basename(module_path).startswith('_'):
            continue
        module = import_module_by_abs_path(module_path)
        for name, obj in inspect.getmembers(module, inspect.isclass):
            # Check all direct bases, not just the first one, so classes
            # using multiple inheritance are found as well.
            if base_class in obj.__bases__:
                class_list.append(obj)
    return class_list
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment