Skip to content
Snippets Groups Projects
Commit 155d2a22 authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

implemented tests for jobs

parent 64dd7e81
No related branches found
No related tags found
No related merge requests found
Pipeline #
from fractalis import celery_app
def create_job():
def create_job(script, arguments):
pass
......
from time import sleep
from fractalis import celery_app
@celery_app.task
def add(a, b):
return a + b
@celery_app.task
def do_nothing(time):
sleep(time)
from time import sleep
from uuid import UUID, uuid4
import pytest
from fractalis.analytics import job
class TestJob(object):
def test_exception_when_starting_non_existing_script(self):
assert False
with pytest.raises(ImportError):
job.create_job('querty', {})
def test_start_job_returns_uuid(self):
assert False
def test_exception_when_invalid_parameters(self):
with pytest.raises(TypeError):
job.create_job('test/sample/add', {'a': 1})
def test_delayed_job_has_pending_status(self):
assert False
def test_start_job_returns_uuid(self):
job_id = job.create_job('test/sample/add', {})
UUID(job_id)
def test_job_in_progress_has_running_status(self):
assert False
job_id = job.create_job('test/sample/do_nothing', {'time': 2})
job_details = job.get_job_details(job_id)
assert job_details.status == 'RUNNING'
def test_finished_job_returns_results(self):
assert False
job_id = job.create_job('test/sample/add', {'a': 1, 'b': 2})
sleep(1)
job_details = job.get_job_details(job_id)
assert job_details.status == 'FINISHED'
assert job_details.message == 3
def test_exception_when_checking_non_existing_job(self):
assert False
with pytest.raises(LookupError):
job.get_job_details(uuid4())
def test_job_is_gone_after_canceling(self):
assert False
job_id = job.create_job('test/sample/do_nothing', {'time': 10})
job.cancel_job(job_id)
# TODO Not sure which exception is thrown
with pytest.raises():
job.get_job_details(job_id)
def test_exception_when_canceling_non_existing_job(self):
assert False
# TODO Not sure which exception is thrown
with pytest.raises():
job.cancel_job(uuid4())
......@@ -59,7 +59,7 @@ class TestSession(object):
with test_client.session_transaction() as session:
session['foo'] = 'bar'
sleep(2)
//TODO Not sure which exception is thrown
#TODO Not sure which exception is thrown
flask.session['foo']
def test_session_data_not_in_db_when_expired(self, flask_app):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment