Commit 8ddcdf8b authored by Sascha Herzinger

implemented celery beat for cleanup tasks

parent 2f96fcf5
@@ -8,3 +8,4 @@ __pycache__/
 *.rdb
 *.log
 *.idea
+celerybeat-schedule
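The new ignore entry covers the local state file that Celery beat writes to its working directory by default; beat records each periodic task's last run time there so schedules survive restarts. The file can be relocated with beat's --schedule option, in which case the ignore entry would need to follow.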
@@ -22,6 +22,9 @@ except KeyError:
     logger = logging.getLogger('fractalis')
     logger.warning("FRACTALIS_CONFIG is not set. Using defaults.")

+from fractalis.data.sync import cleanup
+app.tasks.register(cleanup)
+
 etl_classes = list_classes_with_base_class('fractalis.data.etls', ETL)
 for etl_class in etl_classes:
     app.tasks.register(etl_class)
@@ -13,8 +13,15 @@ PERMANENT_SESSION_LIFETIME = timedelta(days=1)
 BROKER_URL = 'amqp://'
 CELERY_RESULT_BACKEND = 'redis://{}:{}'.format(REDIS_HOST, REDIS_PORT)
 CELERY_TASK_TIME_LIMIT = 60 * 10
+CELERYBEAT_SCHEDULE = {
+    'cleanup-every-hour': {
+        'task': 'fractalis.data.sync.cleanup',
+        'schedule': timedelta(hours=1),
+    }
+}
 FRACTALIS_TMP_DIR = os.path.abspath(os.path.join(
     os.sep, 'tmp', 'fractalis'))
+FRACTALIS_CACHE_EXP = timedelta(days=10)
 # DO NOT MODIFY THIS FILE!
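Each CELERYBEAT_SCHEDULE entry maps a label to a registered task name and a schedule. Beat runs as its own process (via the celery beat command, or embedded in a worker with the -B flag) and enqueues fractalis.data.sync.cleanup once per hour; FRACTALIS_CACHE_EXP then controls how long unused data survives. A timedelta schedule fires relative to the task's last run; for comparison, a minimal sketch of the same entry pinned to the top of each hour using Celery's crontab schedule (an alternative, not what this commit uses):

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'cleanup-every-hour': {
        'task': 'fractalis.data.sync.cleanup',
        # fire at minute 0 of every hour instead of every 60 minutes
        'schedule': crontab(minute=0),
    }
}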
 import json
+import time
 from flask import Blueprint, session, request, jsonify
@@ -36,6 +37,11 @@ def get_data_by_id(data_id, wait):
     value = redis.hget(name='data', key=data_id)
     value = value.decode('utf-8')
     data_obj = json.loads(value)
+    # update last_access field
+    data_obj['last_access'] = time.time()
+    redis.hset(name='data', key=data_id, value=json.dumps(data_obj))
     job_id = data_obj['job_id']
     async_result = celery.AsyncResult(job_id)
     if wait:
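Reading a record through get_data_by_id now refreshes its last_access stamp, so a dataset only expires once it has gone unread for longer than FRACTALIS_CACHE_EXP. Factored out, the read-modify-write against the redis hash follows this pattern (a standalone sketch; the touch_last_access name is hypothetical, not part of the commit):

import json
import time

def touch_last_access(redis, data_id):
    # Re-serialize the record with a fresh last_access timestamp so the
    # periodic cleanup task treats it as recently used.
    value = redis.hget(name='data', key=data_id)
    if value is None:
        return None
    data_obj = json.loads(value.decode('utf-8'))
    data_obj['last_access'] = time.time()
    redis.hset(name='data', key=data_id, value=json.dumps(data_obj))
    return data_obj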
@@ -58,7 +64,8 @@ def get_data_by_params(params):
     except ValueError:
         data_id = params
     if data_id not in session['data_ids']:  # access control
-        return jsonify({'error_msg': "No matching data found."}), 404
+        return jsonify(
+            {'error_msg': "No matching data found. Maybe expired?"}), 404
     data_obj = get_data_by_id(data_id, wait)
     return jsonify(data_obj), 200
 import os
 import abc
 import json
+import time
 from hashlib import sha256
 from uuid import uuid4
@@ -46,7 +47,11 @@ class ETLHandler(metaclass=abc.ABCMeta):
                 token=self._token,
                 descriptor=descriptor,
                 file_path=file_path)
-            data_obj = {'file_path': file_path, 'job_id': async_result.id}
+            data_obj = {
+                'file_path': file_path,
+                'job_id': async_result.id,
+                'last_access': time.time()
+            }
             redis.hset(name='data', key=data_id, value=json.dumps(data_obj))
             data_ids.append(data_id)
         return data_ids
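With this change every submitted ETL job is registered in the 'data' redis hash the moment it is queued, and the submission time doubles as the initial last_access. A stored record then looks roughly like this (values illustrative, not from a real run):

{
    'file_path': '/tmp/fractalis/data/...',  # where the ETL writes its output
    'job_id': '6b3c...',                     # Celery task id for AsyncResult lookups
    'last_access': 1495440000.0              # unix timestamp from time.time()
}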
import os
import json
import datetime

from fractalis.celery import app as celery
from fractalis import redis
from fractalis import app


@celery.task
def cleanup(cache_expr=None):
    cache_expr = (app.config['FRACTALIS_CACHE_EXP'] if cache_expr is None
                  else cache_expr)
    data = redis.hgetall(name='data')
    for key in data:
        data_obj = data[key].decode('utf-8')
        data_obj = json.loads(data_obj)
        last_access = datetime.datetime.fromtimestamp(data_obj['last_access'])
        now = datetime.datetime.now()
        delta = now - last_access
        if delta > cache_expr:
            os.remove(data_obj['file_path'])
            redis.hdel('data', key)
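Since cleanup is an ordinary Celery task, it can also be run outside the beat schedule. A minimal sketch using standard Celery calling conventions (not code from this commit):

from datetime import timedelta
from fractalis.data.sync import cleanup

# enqueue a run on a worker; the configured FRACTALIS_CACHE_EXP applies
cleanup.delay()

# or call the task body in-process (as the test below does),
# expiring every record immediately
cleanup(timedelta(seconds=0))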
@@ -2,6 +2,7 @@
 import os
 import json
+import datetime
 from glob import glob
 from uuid import UUID, uuid4
@@ -10,6 +11,7 @@ import pytest
 from fractalis import redis
 from fractalis import app
+from fractalis.data import sync


 class TestData:
@@ -245,3 +247,14 @@ class TestData:
         data_obj = json.loads(data[key].decode('utf-8'))
         assert data_obj['job_id']
         assert data_obj['file_path']
+
+    def test_valid_state_after_cleanup(self, big_post):
+        rv = big_post(random=False)
+        body = flask.json.loads(rv.get_data())
+        data_dir = os.path.join(app.config['FRACTALIS_TMP_DIR'], 'data')
+        assert rv.status_code == 201, body
+        assert redis.hgetall(name='data')
+        assert len(os.listdir(data_dir))
+        sync.cleanup(datetime.timedelta(seconds=0))
+        assert not redis.hgetall(name='data')
+        assert not len(os.listdir(data_dir))