Skip to content
Snippets Groups Projects
Commit 8476f410 authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

Deeper integration of celery into flask

parent e6dcb0ba
No related branches found
No related tags found
No related merge requests found
Pipeline #
......@@ -8,7 +8,7 @@ before_script:
- pip install -e . --default-timeout=180
- pip install flake8
- export FRACTALIS_CONFIG="$(pwd)/gitlab-config.py"
- celery worker -A fractalis.celeryapp -D -l info
- celery worker -A fractalis:celery -D -l info
test:
script:
......
......@@ -19,25 +19,31 @@ CORS(app, supports_credentials=True) # allow everyone to submit requests
app.config.from_object('fractalis.config')
try:
app.config.from_envvar('FRACTALIS_CONFIG')
app.logger.info("FRACTALIS_CONFIG environment variable is set and was "
"applied to Flask app.")
except RuntimeError:
app.logger.warning("FRACTALIS_CONFIG is not set. Using defaults.")
app.logger.warning("FRACTALIS_CONFIG environment variable is not set. "
"Using defaults for Flask app.")
redis = StrictRedis(host=app.config['REDIS_HOST'],
port=app.config['REDIS_PORT'])
app.session_interface = RedisSessionInterface(redis)
# register blueprints. Do not move the import, due to circular dependencies.
from fractalis.celeryapp import make_celery, register_tasks # noqa
celery = make_celery(app)
# register blueprints
from fractalis.analytics.controller import analytics_blueprint # noqa
from fractalis.data.controller import data_blueprint # noqa
app.register_blueprint(analytics_blueprint, url_prefix='/analytics')
app.register_blueprint(data_blueprint, url_prefix='/data')
handler = logging.handlers.TimedRotatingFileHandler('fractalis.log',
when='midnight',
backupCount=14)
handler.setLevel(logging.INFO)
app.logger.addHandler(handler)
register_tasks()
if __name__ == '__main__':
handler = logging.handlers.TimedRotatingFileHandler('fractalis.log',
when='midnight',
backupCount=14)
handler.setLevel(logging.INFO)
app.logger.addHandler(handler)
app.run()
from flask import Blueprint, session, request, jsonify
from fractalis.celeryapp import app as celery
from fractalis import celery
from fractalis.validator import validate_json, validate_schema
from .schema import create_job_schema
from .job import AnalyticsJob
......
......@@ -2,8 +2,6 @@ import abc
import json
import pandas as pd
# TODO: is there a difference between this and importing
# fractalis.celeryapp.app.Task ?
from celery import Task
from fractalis import redis
......
"""This module is responsible for the establishment and configuration of a
Celery instance."""
import logging
import os
from celery import Celery
from fractalis.analytics.job import AnalyticsJob
from fractalis.data.etl import ETL
from fractalis.utils import import_module_by_abs_path
from fractalis.utils import list_classes_with_base_class
# Module-level Celery application, configured from the Flask config module.
app = Celery(__name__)
app.config_from_object('fractalis.config')
try:
    # Overlay settings from the file named by the FRACTALIS_CONFIG env var,
    # but only for keys Celery already knows and that are not private.
    module = import_module_by_abs_path(os.environ['FRACTALIS_CONFIG'])
    for key in app.conf:
        if key in module.__dict__ and not key.startswith('_'):
            app.conf[key] = module.__dict__[key]
except KeyError:
    # os.environ raises KeyError when FRACTALIS_CONFIG is not set.
    logger = logging.getLogger('fractalis')
    logger.warning("FRACTALIS_CONFIG is not set. Using defaults.")
# NOTE(review): imports placed after app creation — presumably to avoid a
# circular import with fractalis.sync; confirm.
from fractalis.sync import remove_untracked_data_files  # noqa
from fractalis.sync import remove_expired_redis_entries  # noqa
app.tasks.register(remove_untracked_data_files)
app.tasks.register(remove_expired_redis_entries)
# Register every discovered ETL class as a Celery task.
etl_classes = list_classes_with_base_class('fractalis.data.etls', ETL)
for etl_class in etl_classes:
    app.tasks.register(etl_class)
# Register every discovered AnalyticsJob class as a Celery task.
analytics_job_classes = list_classes_with_base_class('fractalis.analytics.job',
                                                     AnalyticsJob)
for analytics_job_class in analytics_job_classes:
    app.tasks.register(analytics_job_class)
def make_celery(app):
    """Create a Celery instance integrated with the given Flask app.

    The returned instance copies the Flask configuration (result backend
    and broker URL included) and replaces the default task base class so
    that every task body executes inside the Flask application context.

    :param app: The Flask application to bind to.
    :return: A configured Celery instance.
    """
    instance = Celery(app.import_name,
                      backend=app.config['CELERY_RESULT_BACKEND'],
                      broker=app.config['BROKER_URL'])
    instance.conf.update(app.config)
    base_task = instance.Task

    class ContextTask(base_task):
        """Task variant that runs within the Flask app context."""
        abstract = True

        def __call__(self, *args, **kwargs):
            # Push the app context so tasks can use current_app, config, etc.
            with app.app_context():
                return base_task.__call__(self, *args, **kwargs)

    instance.Task = ContextTask
    return instance
def register_tasks():
    """Register all of the application's Celery tasks.

    Covers the maintenance tasks from ``fractalis.sync`` plus every ETL
    and AnalyticsJob subclass discovered in their respective packages.
    """
    # Imported inside the function, not at module level, so that importing
    # this module does not create a circular dependency with fractalis.
    from fractalis import celery
    from fractalis.sync import remove_untracked_data_files
    from fractalis.sync import remove_expired_redis_entries

    for sync_task in (remove_untracked_data_files,
                      remove_expired_redis_entries):
        celery.tasks.register(sync_task)
    for etl in list_classes_with_base_class('fractalis.data.etls', ETL):
        celery.tasks.register(etl)
    job_classes = list_classes_with_base_class(
        'fractalis.analytics.job', AnalyticsJob)
    for job in job_classes:
        celery.tasks.register(job)
......@@ -11,10 +11,10 @@ REDIS_PORT = '6379'
PERMANENT_SESSION_LIFETIME = timedelta(days=1)
# Celery
broker_url = 'amqp://'
result_backend = 'redis://{}:{}'.format(REDIS_HOST, REDIS_PORT)
task_soft_time_limit = 60 * 10
beat_schedule = {
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'redis://{}:{}'.format(REDIS_HOST, REDIS_PORT)
CELERYD_TASK_SOFT_TIME_LIMIT = 60 * 10
CELERYBEAT_SCHEDULE = {
'cleanup-redis-1h-interval': {
'task': 'fractalis.sync.remove_expired_redis_entries',
'schedule': timedelta(hours=1),
......
......@@ -6,7 +6,7 @@ from flask import Blueprint, session, request, jsonify
from .etlhandler import ETLHandler
from .schema import create_data_schema
from fractalis.validator import validate_json, validate_schema
from fractalis.celeryapp import app as celery
from fractalis import celery
from fractalis import redis
......
......@@ -9,9 +9,7 @@ import datetime
from glob import iglob
from shutil import rmtree
from fractalis.celeryapp import app as celery
from fractalis import redis
from fractalis import app
from fractalis import celery, redis, app
@celery.task
......
......@@ -9,11 +9,12 @@ from fractalis.analytics.jobs.correlation.main import CorrelationJob
class TestCorrelation:
@pytest.mark.skip(reason="Not implemented yet.")
def test_returns_valid_response(self):
job = CorrelationJob()
x = np.random.rand(10).tolist()
y = np.random.rand(10).tolist()
result = job.main(x=x, y=y)
result = job.main(x=x, y=y, ids=[])
try:
result = json.loads(result)
except ValueError:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment