Skip to content
Snippets Groups Projects
Commit 26103cb8 authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

Implementing logging for fractalis

parent e3ec02ea
No related branches found
No related tags found
No related merge requests found
Pipeline #
......@@ -3,56 +3,74 @@
Modules in this package:
- config -- Manages Fractalis Flask app configuration
"""
import logging
import logging.config
import yaml
from flask import Flask
from flask_cors import CORS
from flask_session import Session
from redis import StrictRedis
from celery.signals import after_setup_logger, after_setup_task_logger
app = Flask(__name__)
# allow everyone to submit requests
CORS(app, supports_credentials=True)
# Configure app with defaults
app.config.from_object('fractalis.config')
# Configure app with manually settings
try:
app.config.from_envvar('FRACTALIS_CONFIG')
app.logger.info("FRACTALIS_CONFIG environment variable is set and was "
"applied to Flask app.")
default_config = False
except RuntimeError:
app.logger.warning("FRACTALIS_CONFIG environment variable is not set. "
"Using defaults for Flask app.")
default_config = True
pass
# setup logging
with open('logging.yaml', 'rt') as f:
log_config = yaml.safe_load(f.read())
logging.config.dictConfig(log_config)
log = logging.getLogger(__name__)
# we can't log this earlier because the logger depends on the loaded app config
if default_config:
log.error("Environment Variable FRACTALIS_CONFIG not set. Falling back "
"to default settings. This is not a good idea in production!")
# create a redis instance
log.info("Creating Redis connection.")
redis = StrictRedis(host=app.config['REDIS_HOST'],
port=app.config['REDIS_PORT'])
# Configure app with composed configurations to save admin some work
app.config['SESSION_REDIS'] = redis
app.config['CELERY_RESULT_BACKEND'] = 'redis://{}:{}'.format(
app.config['REDIS_HOST'], app.config['REDIS_PORT'])
app.config['REDIS_HOST'], app.config['REDIS_PORT'])
# Set new session interface for app
log.info("Replacing default session interface.")
Session(app)
# allow everyone to submit requests
log.info("Setting up CORS.")
CORS(app, supports_credentials=True)
# create celery app
log.info("Creating celery app.")
from fractalis.celeryapp import make_celery, register_tasks # noqa
celery = make_celery(app)
# register blueprints
from fractalis.analytics.controller import analytics_blueprint # noqa
from fractalis.data.controller import data_blueprint # noqa
log.info("Registering Flask blueprints.")
app.register_blueprint(analytics_blueprint, url_prefix='/analytics')
app.register_blueprint(data_blueprint, url_prefix='/data')
log.info("Registering celery tasks.")
register_tasks()
log.info("Initialisation of service complete.")
if __name__ == '__main__':
handler = logging.handlers.TimedRotatingFileHandler('fractalis.log',
when='midnight',
backupCount=14)
handler.setLevel(logging.INFO)
app.logger.addHandler(handler)
log.info("Starting builtin web server.")
app.run()
log.info("Builtin web server started.")
"""This module is responsible for the establishment and configuration of a
Celery instance."""
import logging
from celery import Celery
from fractalis.analytics.job import AnalyticsJob
......@@ -8,6 +10,9 @@ from fractalis.data.etl import ETL
from fractalis.utils import list_classes_with_base_class
logger = logging.getLogger(__name__)
def make_celery(app):
celery = Celery(app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
......@@ -29,13 +34,17 @@ def make_celery(app):
def register_tasks():
    """Discover all ETL and analytics job classes and register each one
    as a task with the shared celery application."""
    # Imported at call time to avoid a circular import with the
    # fractalis package, which itself imports this module.
    from fractalis import celery

    logger.info("Registering ETLs ...")
    for task_class in list_classes_with_base_class('fractalis.data.etls', ETL):
        logger.info("Registering task: {}".format(task_class.name))
        celery.tasks.register(task_class)

    logger.info("Registering analysis tasks ...")
    for task_class in list_classes_with_base_class(
            'fractalis.analytics.job', AnalyticsJob):
        logger.info("Registering task: {}".format(task_class.name))
        celery.tasks.register(task_class)
......
import os
import logging
from uuid import uuid4
from datetime import timedelta
# DO NOT MODIFY THIS FILE DIRECTLY
# Flask
# NOTE(review): the key is regenerated on every process start, which
# invalidates all existing sessions across restarts — set a fixed value
# in production.
SECRET_KEY = str(uuid4()) # set me manually in production
DEBUG = False
......@@ -15,7 +17,6 @@ SESSION_COOKIE_SECURE = False
# Re-send the session cookie on every response so its expiry keeps
# sliding forward while the client is active.
SESSION_REFRESH_EACH_REQUEST = True
# Sessions expire one day after the last refresh.
PERMANENT_SESSION_LIFETIME = timedelta(days=1)
# Flask-Session
# Server-side session storage lives in redis; the SESSION_REDIS
# connection object is injected at app startup.
SESSION_TYPE = 'redis'
SESSION_PERMANENT = True
......@@ -24,10 +25,14 @@ SESSION_USE_SIGNER = False
# Celery
# Message broker URL; bare 'amqp://' means RabbitMQ on localhost with
# default credentials.
BROKER_URL = 'amqp://'
# Soft time limit for tasks: 10 minutes.
CELERYD_TASK_SOFT_TIME_LIMIT = 60 * 10
# Prevent celery from replacing the root logger, so the dictConfig-based
# logging setup of the Flask app stays in effect inside workers.
CELERYD_HIJACK_ROOT_LOGGER = False
# Fractalis
LOG_LEVEL = logging.INFO
LOG_FILE = os.path.join(os.sep, 'tmp', 'fractalis.log')
# Scratch directory for intermediate data files.
FRACTALIS_TMP_DIR = os.path.abspath(os.path.join(
    os.sep, 'tmp', 'fractalis'))
# Cached data objects expire after 10 days.
FRACTALIS_CACHE_EXP = timedelta(days=10)
# DO NOT MODIFY THIS FILE DIRECTLY
......@@ -91,12 +91,6 @@ def get_data_state(params):
return jsonify({'data_state': data_obj}), 200
def delete_data_id(data_id):
    """Remove every trace of the given data id: both of its redis
    entries and its reference in the current session."""
    for key in ('data:{}'.format(data_id), 'shadow:data:{}'.format(data_id)):
        redis.delete(key)
    session['data_ids'].remove(data_id)
@data_blueprint.route('/<string:params>', methods=['DELETE'])
def delete_data(params):
wait = request.args.get('wait') == '1'
......@@ -116,12 +110,14 @@ def delete_data(params):
async_result = remove_file.delay(file_path)
if wait:
async_result.get(propagate=False)
delete_data_id(data_id)
redis.delete('data:{}'.format(data_id))
redis.delete('shadow:data:{}'.format(data_id))
session['data_ids'].remove(data_id)
return '', 200
@data_blueprint.route('', methods=['DELETE'])
def delete_all_data():
    """Delete every data object tracked in the current session.

    :return: Empty response body and HTTP 200.
    """
    # Iterate over a *copy* of the id list: delete_data removes the id
    # from session['data_ids'], and mutating a list while iterating it
    # silently skips every other element.
    for data_id in list(session['data_ids']):
        delete_data(data_id)
    return '', 200
......@@ -2,6 +2,7 @@
import abc
import json
import logging
from typing import List
from celery import Task
......@@ -10,6 +11,9 @@ from pandas import DataFrame
from fractalis import redis
logger = logging.getLogger(__name__)
class ETL(Task, metaclass=abc.ABCMeta):
"""This is an abstract class that implements a celery Task and provides a
factory method to create instances of implementations of itself. Its main
......@@ -136,11 +140,19 @@ class ETL(Task, metaclass=abc.ABCMeta):
:param session_id: The id of the session that requested this job
:param data_id: The id of the data object that is related to this ETL
"""
logger.info("Starting ETL process ...")
logger.info("(E)xtracting data from server '{}'.".format(server))
raw_data = self.extract(server, token, descriptor)
logger.info("(T)ransforming data to Fractalis format.")
data_frame = self.transform(raw_data)
if not isinstance(data_frame, DataFrame):
raise TypeError("transform() must return 'pandas.DataFrame', but"
"returned '{}' instead.".format(type(data_frame)))
try:
raise TypeError(
"transform() must return 'pandas.DataFrame', but returned "
"'{}' instead.".format(type(data_frame)))
except TypeError as e:
logging.exception(e)
raise
self.load(data_frame, file_path)
# at this point we know that the session has permission to read the data
# otherwise authentication with target API would have failed
......
# logging.config.dictConfig schema (version 1) for the fractalis service.
version: 1
# Keep loggers that were created before this config is applied.
disable_existing_loggers: False
formatters:
  default:
    format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
handlers:
  # Rotating file: a new file at midnight, two weeks of history kept.
  default:
    class: logging.handlers.TimedRotatingFileHandler
    level: WARNING
    formatter: default
    filename: fractalis.log
    when: midnight
    backupCount: 14
    encoding: utf8
  console:
    class: logging.StreamHandler
    level: DEBUG
    formatter: default
root:
  level: INFO
  handlers: [default, console]
# Named loggers MUST live under the 'loggers' key, otherwise dictConfig
# ignores them entirely.
loggers:
  fractalis:
    level: INFO
    handlers: [default, console]
    # Don't also bubble records up to root — that would log every
    # message twice to the same handlers.
    propagate: False
  Flask:
    level: INFO
    handlers: [default, console]
    propagate: False
......@@ -17,7 +17,8 @@ setup(
'pandas',
'numpy',
'scipy',
'requests'
'requests',
'PyYAML'
],
setup_requires=[
'pytest-runner',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment