Skip to content
Snippets Groups Projects
Commit 6d542d68 authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

Improved the sync module, but tests not implemented yet.

parent a7bae8b9
No related branches found
No related tags found
No related merge requests found
Pipeline #
"""This module is responsible for the establishment and configuration of a
Celery instance."""
import os
import logging
import os
from celery import Celery
from fractalis.utils import list_classes_with_base_class
from fractalis.utils import import_module_by_abs_path
from fractalis.data.etl import ETL
from fractalis.analytics.job import AnalyticsJob
from fractalis.data.etl import ETL
from fractalis.utils import import_module_by_abs_path
from fractalis.utils import list_classes_with_base_class
app = Celery(__name__)
app.config_from_object('fractalis.config')
......@@ -22,8 +21,10 @@ except KeyError:
logger = logging.getLogger('fractalis')
logger.warning("FRACTALIS_CONFIG is not set. Using defaults.")
from fractalis.data.sync import cleanup # noqa
app.tasks.register(cleanup)
from fractalis.sync import remove_untracked_data_files # noqa
from fractalis.sync import remove_expired_redis_entries # noqa
app.tasks.register(remove_untracked_data_files)
app.tasks.register(remove_expired_redis_entries)
etl_classes = list_classes_with_base_class('fractalis.data.etls', ETL)
for etl_class in etl_classes:
......
......@@ -15,8 +15,12 @@ broker_url = 'amqp://'
result_backend = 'redis://{}:{}'.format(REDIS_HOST, REDIS_PORT)
task_soft_time_limit = 60 * 10
beat_schedule = {
'cleanup-every-hour': {
'task': 'fractalis.data.sync.cleanup',
'cleanup-redis-1h-interval': {
'task': 'fractalis.sync.remove_expired_redis_entries',
'schedule': timedelta(hours=1),
},
'cleanup-fs-1h-interval': {
'task': 'fractalis.sync.remove_untracked_data_files',
'schedule': timedelta(hours=1),
}
}
......
......@@ -86,7 +86,7 @@ class ETL(Task, metaclass=abc.ABCMeta):
"""
pass
def load(self, data_frame: DataFrame, file_path: str):
def load(self, data_frame: DataFrame, file_path: str) -> None:
"""Load (save) the data to the file system.
:param data_frame: DataFrame to write.
......@@ -95,7 +95,7 @@ class ETL(Task, metaclass=abc.ABCMeta):
data_frame.to_csv(file_path)
def run(self, server: str, token: str,
descriptor: dict, file_path: str):
descriptor: dict, file_path: str) -> None:
"""Run the current task.
This is called by the celery worker.
......
import os
import json
import datetime
from shutil import rmtree
from fractalis.celery import app as celery
from fractalis import redis
from fractalis import app
@celery.task
def cleanup(cache_expr=None):
    """Purge cached data that has not been accessed recently enough.

    Each value of the redis hash ``'data'`` is decoded as a JSON document.
    When the time elapsed since its ``last_access`` timestamp exceeds
    ``cache_expr``, the file at its ``file_path`` is deleted and the entry is
    removed from the hash.

    :param cache_expr: Maximum allowed age of an entry. Falls back to
        ``app.config['FRACTALIS_CACHE_EXP']`` when ``None``.
    """
    if cache_expr is None:
        cache_expr = app.config['FRACTALIS_CACHE_EXP']
    entries = redis.hgetall(name='data')
    for key, raw in entries.items():
        record = json.loads(raw.decode('utf-8'))
        accessed = datetime.datetime.fromtimestamp(record['last_access'])
        age = datetime.datetime.now() - accessed
        if not age > cache_expr:
            continue
        try:
            os.remove(record['file_path'])
        except FileNotFoundError:
            # The file is already gone; only the redis record is left to drop.
            pass
        redis.hdel('data', key)
def cleanup_all():
    """Flush redis completely and delete the configured temporary directory.

    The directory is taken from ``app.config['FRACTALIS_TMP_DIR']`` and is only
    removed if it exists.
    """
    redis.flushall()
    scratch_dir = app.config['FRACTALIS_TMP_DIR']
    if not os.path.exists(scratch_dir):
        return
    rmtree(scratch_dir)
"""This module provides functions and celery tasks used to cleanup expired
items. It is also used to keep several components synchronized, e.g. the redis
db and the file system.
"""
import os
import json
import datetime
from glob import iglob
from shutil import rmtree
from fractalis.celery import app as celery
from fractalis import redis
from fractalis import app
@celery.task
def remove_expired_redis_entries() -> None:
    """Delete entries of the redis hash ``'data'`` that have expired.

    An entry counts as expired when the time since its ``last_access``
    timestamp is greater than ``app.config['FRACTALIS_CACHE_EXP']``. Once all
    entries have been checked, ``remove_untracked_data_files`` is invoked.
    """
    max_age = app.config['FRACTALIS_CACHE_EXP']
    for key, raw in redis.hgetall(name='data').items():
        record = json.loads(raw.decode('utf-8'))
        accessed = datetime.datetime.fromtimestamp(record['last_access'])
        if datetime.datetime.now() - accessed > max_age:
            redis.hdel('data', key)
    # Follow up with the file system sweep once the redis pass is complete.
    remove_untracked_data_files()
@celery.task
def remove_untracked_data_files() -> None:
    """Remove files that have no record in the redis DB.

    Every value of the redis hash ``'data'`` is decoded as a JSON document and
    its ``file_path`` is treated as tracked. Any regular file directly inside
    ``<FRACTALIS_TMP_DIR>/data`` whose path is not tracked is deleted.
    """
    tmp_dir = app.config['FRACTALIS_TMP_DIR']
    data_dir = os.path.join(tmp_dir, 'data')
    # The tracked entries have to be *read* here; hdel() would delete fields
    # and does not return the stored mapping.
    redis_data = redis.hgetall(name='data')
    # Decode every entry once instead of once per file on disk.
    tracked_paths = {
        json.loads(raw.decode('utf-8'))['file_path']
        for raw in redis_data.values()
    }
    # Match every file in the data directory, not only a single extension.
    for file_path in iglob(os.path.join(data_dir, '*')):
        if file_path in tracked_paths:
            continue
        if not os.path.isfile(file_path):
            # os.remove() cannot delete directories; leave them untouched.
            continue
        try:
            os.remove(file_path)
        except FileNotFoundError:
            # Something else removed the file between the listing and now.
            pass
def cleanup_all() -> None:
    """Wipe redis and delete the temporary directory.

    Intended for test runs only -- this must !!!NEVER!!! be called in any
    other situation, because it flushes all of redis and removes everything
    below ``app.config['FRACTALIS_TMP_DIR']``.
    """
    redis.flushall()
    scratch_dir = app.config['FRACTALIS_TMP_DIR']
    if not os.path.exists(scratch_dir):
        return
    rmtree(scratch_dir)
"""This module tests the analytics controller module."""
import uuid
import time
import json
import time
import uuid
from uuid import uuid4
import flask
import pytest
from fractalis.data import sync
from fractalis import sync
class TestAnalytics:
......
"""This module tests the data controller module."""
import os
import json
import datetime
import os
from uuid import UUID, uuid4
import flask
import pytest
from fractalis import redis
from fractalis import app
from fractalis.data import sync
from fractalis import redis
from fractalis import sync
class TestData:
......@@ -239,18 +238,6 @@ class TestData:
assert data_obj['job_id']
assert data_obj['file_path']
def test_valid_state_after_cleanup(self, test_client, big_post):
    """Check that redis and the data directory are empty after a cleanup.

    Data is posted and awaited first so that both the redis hash ``'data'``
    and the data directory are non-empty, then ``sync.cleanup`` is run with a
    zero expiry so that every entry counts as expired.
    """
    response = big_post(random=False)
    payload = flask.json.loads(response.get_data())
    storage_dir = os.path.join(app.config['FRACTALIS_TMP_DIR'], 'data')
    assert response.status_code == 201, payload
    assert test_client.get('/data?wait=1').status_code == 200
    # Precondition: there is something to clean up.
    assert redis.hgetall(name='data')
    assert os.listdir(storage_dir)
    sync.cleanup(datetime.timedelta(seconds=0))
    # Postcondition: neither records nor files are left.
    assert not redis.hgetall(name='data')
    assert not os.listdir(storage_dir)
def test_GET_by_id_and_valid_response(self, test_client, big_post):
rv = big_post(random=False)
body = flask.json.loads(rv.get_data())
......
"""This module provides several methods that help keeping the web service
components synchronized.
"""
from fractalis import app, redis, sync
class TestSync:
    """Placeholder test cases for the sync module; none is implemented yet."""

    def test_expired_entries_removed(self):
        """Not implemented; fails unconditionally.

        NOTE(review): presumably meant to cover
        ``sync.remove_expired_redis_entries`` -- confirm when implementing.
        """
        # TODO: replace with a real test.
        assert False

    def test_tracked_files_removed_after_redis_cleanup(self):
        """Not implemented; fails unconditionally.

        NOTE(review): presumably meant to check that files of expired entries
        disappear once the redis cleanup has run -- confirm when implementing.
        """
        # TODO: replace with a real test.
        assert False

    def test_untracked_removed(self):
        """Not implemented; fails unconditionally.

        NOTE(review): presumably meant to cover
        ``sync.remove_untracked_data_files`` -- confirm when implementing.
        """
        # TODO: replace with a real test.
        assert False
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment