Skip to content
Snippets Groups Projects
Commit 14932876 authored by Sascha Herzinger
Browse files

Some more documentation

parent 55fe7347
No related branches found
No related tags found
No related merge requests found
Pipeline #
......@@ -4,6 +4,7 @@ Celery instance."""
import logging
from celery import Celery
from flask import Flask
from fractalis.analytics.task import AnalyticTask
from fractalis.data.etl import ETL
......@@ -13,7 +14,12 @@ from fractalis.utils import list_classes_with_base_class
logger = logging.getLogger(__name__)
def make_celery(app):
def make_celery(app: Flask) -> Celery:
"""Create a celery instance which executes its tasks in the application
context of our service.
:param app: The instance of our web service. This holds our configuration.
:return: A celery instance that can submit tasks.
"""
celery = Celery(app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['BROKER_URL'])
......@@ -31,7 +37,8 @@ def make_celery(app):
return celery
def register_tasks():
def register_tasks() -> None:
"""Register all of our Task classes with celery."""
from fractalis import celery
logger.info("Registering ETLs ...")
......
......@@ -9,7 +9,7 @@ from typing import List
from celery import Task
from pandas import DataFrame
from fractalis import redis
from fractalis import app, redis
logger = logging.getLogger(__name__)
......@@ -111,8 +111,9 @@ class ETL(Task, metaclass=abc.ABCMeta):
assert value is not None
data_state = json.loads(value.decode('utf-8'))
data_state['loaded'] = True
redis.set(name='data:{}'.format(self.request.id),
value=json.dumps(data_state))
redis.setex(name='data:{}'.format(self.request.id),
value=json.dumps(data_state),
time=app.config['FRACTALIS_CACHE_EXP'])
def run(self, server: str, token: str,
descriptor: dict, file_path: str) -> None:
......
......@@ -70,8 +70,9 @@ class ETLHandler(metaclass=abc.ABCMeta):
'data_type': data_type,
'loaded': False
}
redis.set(name='data:{}'.format(task_id),
value=json.dumps(data_state))
redis.setex(name='data:{}'.format(task_id),
value=json.dumps(data_state),
time=app.config['FRACTALIS_CACHE_EXP'])
def handle(self, descriptors: List[dict], wait: bool = False) -> List[str]:
"""Create instances of ETL for the given descriptors and submit them
......
......@@ -2,9 +2,14 @@ import os
import glob
import inspect
import importlib
from typing import List
def import_module_by_abs_path(module_path):
def import_module_by_abs_path(module_path: str) -> object:
"""Import the module for the given path.
:param module_path: The absolute path to the module.
:return: A reference to the imported module.
"""
module_name = os.path.splitext(os.path.basename(module_path))[0]
spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(spec)
......@@ -12,7 +17,14 @@ def import_module_by_abs_path(module_path):
return module
def list_classes_with_base_class(package, base_class):
def list_classes_with_base_class(
package: str, base_class: object) -> List[object]:
"""For the given base_class list classes in the given package that do
implement it.
:param package: The package to search.
:param base_class: The base class.
:return: A list of classes that implement the base class.
"""
package = importlib.import_module(package)
abs_path = os.path.dirname(os.path.abspath(package.__file__))
class_list = []
......
# http://stackoverflow.com/questions/24238743/flask-decorator-to-verify-json-and-json-schema
"""
Documentation: http://stackoverflow.com/questions/24238743/flask-decorator-to-verify-json-and-json-schema
"""
from functools import wraps
from flask import request, jsonify
......
......@@ -11,22 +11,11 @@ manager = Manager(app)
@manager.command
def janitor():
pubsub = redis.pubsub()
pubsub.psubscribe('*')
for msg in pubsub.listen():
if msg['data'] == 'expired':
print(msg)
split = msg['channel'].split(':')
if 'shadow' == split[1] and 'data' == split[2]:
expired_key = ':'.join(split[2:])
expired_value = redis.get(expired_key)
expired_value = json.loads(expired_value.decode('utf-8'))
file_path = expired_value['file_path']
print('deleting file: ', file_path)
print('deleting redis key: ', expired_key)
redis.delete(expired_key)
remove_file.delay()
def janitor() -> None:
"""Ideally this is maintained by a systemd service to cleanup redis and the
file system while Fractalis is running.
"""
raise NotImplementedError()
if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment