Skip to content
Snippets Groups Projects
Commit ca59b6c2 authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

Fixed bug that could cause deadlock. Replaced 'PENDING' state with 'NOTFOUND'.

parent 813e6d41
No related branches found
No related tags found
No related merge requests found
......@@ -57,13 +57,15 @@ def get_task_details(task_id: UUID) -> Tuple[Response, int]:
logger.warning(error)
return jsonify({'error': error}), 403
async_result = celery.AsyncResult(task_id)
if wait:
state = async_result.state
state = 'NOTFOUND' if state == 'PENDING' else state
if wait and state == 'SUBMITTED':
async_result.get(propagate=False)
result = async_result.result
if isinstance(result, Exception): # Exception -> str
result = "{}: {}".format(type(result).__name__, str(result))
logger.debug("Task found and has access. Sending response.")
return jsonify({'state': async_result.state, 'result': result}), 200
return jsonify({'state': state, 'result': result}), 200
@analytics_blueprint.route('/<uuid:task_id>', methods=['DELETE'])
......
......@@ -46,7 +46,9 @@ def get_data_state_for_task_id(task_id: str, wait: bool) -> dict:
:return: Data state that has been stored in Redis.
"""
async_result = celery.AsyncResult(task_id)
if wait:
state = async_result.state
state = 'NOTFOUND' if state == 'PENDING' else state
if wait and state == 'SUBMITTED':
logger.debug("'wait' was set. Waiting for tasks to finish ...")
async_result.get(propagate=False)
value = redis.get('data:{}'.format(task_id))
......@@ -61,7 +63,7 @@ def get_data_state_for_task_id(task_id: str, wait: bool) -> dict:
if isinstance(result, Exception): # Exception -> str
result = "{}: {}".format(type(result).__name__, str(result))
data_state['etl_message'] = result
data_state['etl_state'] = async_result.state
data_state['etl_state'] = state
return data_state
......
......@@ -73,7 +73,7 @@ def request_state_access(state_id: UUID) -> Tuple[Response, int]:
# if all task finish successfully we now that session has access to state
session['state_access'][state_id] = task_ids
logger.debug("Tasks successfully submitted. Sending response.")
return jsonify(''), 201
return jsonify(''), 202
@state_blueprint.route('/<uuid:state_id>', methods=['GET'])
......@@ -88,8 +88,7 @@ def get_state_data(state_id: UUID) -> Tuple[Response, int]:
wait = request.args.get('wait') == '1'
for task_id in session['state_access'][state_id]:
data_state = get_data_state_for_task_id(task_id=task_id, wait=wait)
if (data_state['etl_state'] == 'PENDING' or
data_state['etl_state'] == 'SUBMITTED'):
if data_state['etl_state'] == 'SUBMITTED':
return jsonify({'message': 'ETLs are still running.'}), 200
elif data_state['etl_state'] == 'SUCCESS':
continue
......
......@@ -50,13 +50,12 @@ def cleanup_all() -> None:
celery.control.purge()
for key in redis.keys('data:*'):
value = redis.get(key)
try:
data_state = json.loads(value)
celery.AsyncResult(data_state.get('task_id')).get(propagate=False)
except ValueError:
pass
# celery.control.revoke(data_state['task_id'], terminate=True,
# signal='SIGUSR1', wait=True)
data_state = json.loads(value)
task_id = data_state.get('task_id')
if task_id is not None:
async_result = celery.AsyncResult(task_id)
if async_result.state == 'SUBMITTED':
async_result.get(propagate=False)
redis.flushall()
tmp_dir = app.config['FRACTALIS_TMP_DIR']
if os.path.exists(tmp_dir):
......
......@@ -30,7 +30,7 @@ class TestState:
rv = test_client.post('/state/{}'.format(str(uuid4())))
assert rv.status_code == 404
def test_error_if_state_id_is_no_uuid(self, test_client):
def test_400_if_state_id_is_no_uuid(self, test_client):
rv = test_client.post('/state/123')
assert rv.status_code == 400
......@@ -43,13 +43,57 @@ class TestState:
assert 'error' in body
assert 'given payload cannot be saved' in body['error']
def test_201_and_valid_state_if_valid_conditions(self, test_client):
def test_202_create_valid_state_if_valid_conditions(self, test_client):
uuid = str(uuid4())
redis.set('state:{}'.format(uuid), {'meta': {'descriptor': 'foo'}})
redis.set('data:{}'.format(uuid), {'meta': {'descriptor': 'foo'}})
rv = test_client.post('/state/{}'.format(uuid))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
assert rv.status_code == 202, body
assert not body
with test_client.session_transaction() as sess:
assert sess['data_tasks']
assert sess['state_access']
assert sess['data_tasks'] == sess['state_access']
assert [UUID(uuid) for id in sess['data_tasks']]
\ No newline at end of file
assert [UUID(uuid) for uuid in sess['data_tasks']]
def test_404_if_get_non_existing_state(self, test_client):
uuid = str(uuid4())
rv = test_client.post('/state/{}'.format(uuid))
assert rv.status_code == 404
def test_400_if_get_non_uuid_state(self, test_client):
rv = test_client.post('/state/123')
assert rv.status_code == 400
def test_403_if_get_not_previously_self_requested_state(self, test_client):
uuid = str(uuid4())
redis.set('data:{}'.format(uuid), {'meta': {'descriptor': 'foo'}})
rv = test_client.post('/state', data=flask.json.dumps('test'))
body = flask.json.loads(rv.get_data())
assert rv.status_code == 201, body
state_id = body['state_id']
rv = test_client.post('/state/{}'.format(state_id))
body = flask.json.loads(rv.get_data())
assert rv.status_code == 202, body
with test_client.session_transaction() as sess:
del sess['state_access'][state_id]
rv = test_client.get('/state/{}'.format(state_id))
body = flask.json.loads(rv.get_data())
assert rv.status_code == 403, body
assert 'error' in body
def test_return_state(self, test_client):
uuid = str(uuid4())
redis.set('data:{}'.format(uuid), {'meta': {'descriptor': 'foo'}})
rv = test_client.post('/state', data=flask.json.dumps('test'))
body = flask.json.loads(rv.get_data())
assert rv.status_code == 201, body
state_id = body['state_id']
rv = test_client.post('/state/{}'.format(state_id))
body = flask.json.loads(rv.get_data())
assert rv.status_code == 202, body
rv = test_client.get('/state/{}'.format(state_id))
body = flask.json.loads(rv.get_data())
assert rv.status_code == 200, body
assert 'state' in body
assert body['state'] == 'test'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment