import datetime
import io
import json
import logging
from typing import Optional, List

import pycurl
from django.forms.models import model_to_dict
from django_cron import CronJobBase, Schedule

from web.models import ConfigurationItem, StudySubject, Language, AppointmentType, Appointment, Visit, Study, \
    Provenance, Worker, User
from web.models.constants import REDCAP_TOKEN_CONFIGURATION_TYPE, \
    REDCAP_BASE_URL_CONFIGURATION_TYPE, CRON_JOB_TIMEOUT, RED_CAP_LANGUAGE_4_FIELD_TYPE, \
    RED_CAP_LANGUAGE_3_FIELD_TYPE, RED_CAP_LANGUAGE_2_FIELD_TYPE, RED_CAP_LANGUAGE_1_FIELD_TYPE, \
    RED_CAP_MPOWER_ID_FIELD_TYPE, RED_CAP_DEAD_FIELD_TYPE, RED_CAP_SEX_FIELD_TYPE, RED_CAP_DATE_BORN_FIELD_TYPE, \
    RED_CAP_ND_NUMBER_FIELD_TYPE, RED_CAP_VIRUS_FIELD_TYPE, GLOBAL_STUDY_ID, RED_CAP_SAMPLE_DATE_FIELD_TYPE, \
    RED_CAP_KIT_ID_FIELD_TYPE, RED_CAP_IGA_STATUS_FIELD_TYPE, RED_CAP_IGG_STATUS_FIELD_TYPE, IMPORTER_USER, \
    IMPORT_APPOINTMENT_TYPE
from web.models.inconsistent_subject import InconsistentField, InconsistentSubject
from web.models.missing_subject import MissingSubject
logger = logging.getLogger(__name__)
class RedcapSubject:
    """Plain container for one subject record read from RedCap.

    Scalar attributes default to ``None`` on the class; the ``languages``
    and ``visits`` lists are created per instance in ``__init__``.
    """

    # Class-level defaults: every attribute starts out unset.
    url = nd_number = date_born = sex = dead = languages = mpower_id = None

    def __init__(self):
        # Fresh lists for each instance so subjects never share state.
        self.visits = []
        self.languages = []

    def add_language(self, language):
        """Append ``language`` to ``languages``; a ``None`` value is ignored."""
        if language is None:
            return
        self.languages.append(language)
class RedcapVisit:
    """Plain container for the per-visit values read from RedCap."""

    # Visit counter starts at 0; every other value is unset until filled in.
    visit_number = 0
    virus = virus_collection_date = None
    iga_status = igg_status = None
def different_string(string1, string2):
    """Return True when the two values differ after normalisation.

    Each argument may be ``str``, ``bytes`` (decoded as UTF-8) or ``None``
    (treated as the empty string). Leading and trailing whitespace is ignored.
    """

    def _normalise(value):
        # Decode first so bytes and str inputs compare on equal footing.
        if isinstance(value, bytes):
            value = value.decode('utf8')
        if value is None:
            return ""
        return value.strip()

    # The second operand must be derived from string2 (it was previously
    # decoded from string1 by mistake whenever string2 was bytes).
    return _normalise(string1) != _normalise(string2)
def date_equals(date1: str, date2: datetime.datetime) -> bool:
    """Compare a ``YYYY-MM-DD`` string with a date/datetime object.

    :param date1: date as text; ``None`` and ``''`` both mean "no date"
    :param date2: object providing ``strftime``; ``None`` means "no date"
    :return: True when both sides are empty or denote the same day
    """
    text_missing = date1 is None or date1 == ''
    if text_missing and date2 is None:
        # Both sides empty: considered equal.
        return True
    if text_missing or date2 is None:
        # Exactly one side empty: cannot be equal.
        return False
    return date1 == date2.strftime("%Y-%m-%d")
class RedcapConnector:
def __init__(self):
    """Load the RedCap connection settings and field mappings from the database.

    ``token`` and ``base_url`` stay ``None`` when their configuration item is
    absent or has an empty value. The ``*_field`` attributes hold the
    configured RedCap field names and ``importer_user`` holds the Worker used
    as provenance author (``None`` when it cannot be resolved).

    :raises ConfigurationItem.DoesNotExist: when a mandatory field-mapping
        configuration item is missing
    """

    def _required_value(item_type):
        # Mandatory item: .get() raises when it is not configured.
        return ConfigurationItem.objects.get(type=item_type).value

    def _optional_value(item_type):
        # Optional item: a missing row or a falsy value both yield None.
        item = ConfigurationItem.objects.filter(type=item_type).first()
        if item is not None and item.value:
            return item.value
        return None

    self.token = _optional_value(REDCAP_TOKEN_CONFIGURATION_TYPE)
    self.base_url = _optional_value(REDCAP_BASE_URL_CONFIGURATION_TYPE)

    # Languages are looked up case-insensitively by name.
    self.language_by_name = {language.name.lower(): language for language in Language.objects.all()}

    self.date_born_field = _required_value(RED_CAP_DATE_BORN_FIELD_TYPE)
    self.sex_field = _required_value(RED_CAP_SEX_FIELD_TYPE)
    self.nd_number_field = _required_value(RED_CAP_ND_NUMBER_FIELD_TYPE)
    self.dead_field = _required_value(RED_CAP_DEAD_FIELD_TYPE)
    self.language_1_field = _required_value(RED_CAP_LANGUAGE_1_FIELD_TYPE)
    self.language_2_field = _required_value(RED_CAP_LANGUAGE_2_FIELD_TYPE)
    self.language_3_field = _required_value(RED_CAP_LANGUAGE_3_FIELD_TYPE)
    self.language_4_field = _required_value(RED_CAP_LANGUAGE_4_FIELD_TYPE)
    self.m_power_id_field = _required_value(RED_CAP_MPOWER_ID_FIELD_TYPE)
    self.virus_field = _required_value(RED_CAP_VIRUS_FIELD_TYPE)
    self.sample_kit_id_field = _required_value(RED_CAP_KIT_ID_FIELD_TYPE)
    self.sample_date_field = _required_value(RED_CAP_SAMPLE_DATE_FIELD_TYPE)
    self.iga_status_field = _required_value(RED_CAP_IGA_STATUS_FIELD_TYPE)
    self.igg_status_field = _required_value(RED_CAP_IGG_STATUS_FIELD_TYPE)

    self.study = Study.objects.get(id=GLOBAL_STUDY_ID)

    # Always define the attribute so later readers never hit AttributeError.
    self.importer_user = None
    importer_user_name = _required_value(IMPORTER_USER)
    if importer_user_name is not None and importer_user_name != '':
        # filter() returns a QuerySet (never None); first() yields a single
        # instance or None, which is what the checks below need.
        user = User.objects.filter(username=importer_user_name).first()
        if user is None:
            logger.warning("User does not exist: " + importer_user_name)
        else:
            self.importer_user = Worker.objects.filter(user=user).first()
def find_missing(self):
    """Return MissingSubject entries for subjects present on only one side.

    A RedCap record with no SMASCH subject of the same ND number yields an
    entry carrying a RedCap link; a SMASCH subject (with a non-empty ND
    number) that has no RedCap record yields an entry without one.
    """
    project_id = self.get_project_id()
    version = self.get_redcap_version()
    redcap_records = self.get_red_cap_subjects()
    redcap_by_nd = {record.nd_number: record for record in redcap_records}

    local_subjects = StudySubject.objects.exclude(nd_number='')
    local_by_nd = {local.nd_number: local for local in local_subjects}

    missing = []
    # Records known to RedCap only.
    for record in redcap_records:
        if local_by_nd.get(record.nd_number) is not None:
            continue
        link = self.create_redcap_link(project_id, version, record)
        missing.append(MissingSubject.create(red_cap_subject=record, smash_subject=None, url=link))
    # Subjects known to SMASCH only.
    for local in local_subjects:
        if redcap_by_nd.get(local.nd_number) is not None:
            continue
        missing.append(MissingSubject.create(red_cap_subject=None, smash_subject=local))
    return missing
@staticmethod
def add_missing(missing_subjects):
    """Replace the stored non-ignored MissingSubject rows with ``missing_subjects``.

    Rows flagged ``ignore`` are kept; a new entry is skipped when its RedCap
    id or its subject's ND number matches one of those kept rows.
    """
    MissingSubject.objects.filter(ignore=False).delete()

    # Whatever is left after the delete is the set of ignored rows.
    kept_by_redcap_id = {}
    kept_by_nd_number = {}
    for kept in MissingSubject.objects.all():
        if kept.redcap_id is not None:
            kept_by_redcap_id[kept.redcap_id] = kept
        if kept.subject is not None:
            kept_by_nd_number[kept.subject.nd_number] = kept

    for candidate in missing_subjects:
        redcap_ignored = (candidate.redcap_id is not None
                          and kept_by_redcap_id.get(candidate.redcap_id) is not None)
        smash_ignored = (candidate.subject is not None
                         and kept_by_nd_number.get(candidate.subject.nd_number) is not None)
        if redcap_ignored or smash_ignored:
            continue
        MissingSubject.objects.create(subject=candidate.subject, redcap_id=candidate.redcap_id,
                                      redcap_url=candidate.redcap_url)
@staticmethod
def add_inconsistent(inconsistent_subjects):
    """Replace all stored inconsistency rows with ``inconsistent_subjects``.

    Existing InconsistentField and InconsistentSubject rows are deleted, then
    one subject row plus one row per differing field is created.
    """
    InconsistentField.objects.all().delete()
    InconsistentSubject.objects.all().delete()

    for entry in inconsistent_subjects:
        stored = InconsistentSubject.objects.create(subject=entry.subject,
                                                    redcap_url=entry.redcap_url)
        for difference in entry.fields:
            InconsistentField.objects.create(
                name=difference.name,
                smash_value=difference.smash_value,
                redcap_value=difference.redcap_value,
                inconsistent_subject=stored)
def refresh_missing(self):
missing = self.find_missing()
self.add_missing(missing)
def refresh_inconsistent(self):
inconsistent = self.find_inconsistent()
self.add_inconsistent(inconsistent)
def find_inconsistent(self):
    """Compare SMASCH subjects with their RedCap records.

    For every SMASCH subject (non-empty ND number) that has a RedCap record:

    * field differences are collected as InconsistentSubject entries;
    * when an import appointment type is configured, scheduled appointments
      of that type belonging to each RedCap visit are marked finished, the
      visit is flagged finished (with a saved Provenance entry) and the
      visit data is copied from RedCap.

    :return: list of InconsistentSubject objects (not yet stored)
    """
    appointment_type_to_finish = None
    appointment_type_code = ConfigurationItem.objects.get(type=IMPORT_APPOINTMENT_TYPE).value
    if appointment_type_code is not None:
        appointment_types = AppointmentType.objects.filter(code=appointment_type_code)
        if len(appointment_types) > 0:
            appointment_type_to_finish = appointment_types[0]

    pid = self.get_project_id()
    redcap_version = self.get_redcap_version()
    red_cap_subject_by_nd = {record.nd_number: record for record in self.get_red_cap_subjects()}

    result = []
    for subject in StudySubject.objects.exclude(nd_number=''):
        red_cap_subject = red_cap_subject_by_nd.get(subject.nd_number)
        if red_cap_subject is None:
            continue

        url = self.create_redcap_link(pid, redcap_version, subject)
        inconsistent_subject = self.create_inconsistency_subject(red_cap_subject, subject, url)
        if inconsistent_subject is not None:
            result.append(inconsistent_subject)

        if appointment_type_to_finish is None:
            continue

        for visit in red_cap_subject.visits:
            smasch_visits = Visit.objects.filter(visit_number=visit.visit_number, subject=subject)
            smasch_appointments = Appointment.objects.filter(
                visit__in=smasch_visits,
                appointment_types=appointment_type_to_finish,
                status=Appointment.APPOINTMENT_STATUS_SCHEDULED)
            for smasch_appointment in smasch_appointments:
                smasch_appointment.mark_as_finished()
                if not smasch_appointment.visit.is_finished:
                    description = '{} changed from "{}" to "{}"'.format(
                        'is_finished',
                        smasch_appointment.visit.is_finished,
                        True)
                    p = Provenance(modified_table=Visit._meta.db_table,
                                   modified_table_id=smasch_appointment.visit.id,
                                   modification_author=self.importer_user,
                                   previous_value=smasch_appointment.visit.is_finished,
                                   new_value=True,
                                   modification_description=description,
                                   modified_field='is_finished')
                    smasch_appointment.visit.is_finished = True
                    smasch_appointment.visit.save()
                    # Persist the audit record; it was previously built and
                    # then discarded without being saved.
                    p.save()
            # NOTE(review): executed once per RedCap visit; the original
            # nesting of this call could not be recovered - confirm.
            self.update_data_from_redcap(subject, visit)
    return result
def update_data_from_redcap(self, subject: StudySubject, visit: RedcapVisit) -> List[Provenance]:
    """Copy the virus-test values of ``visit`` into the subject's custom fields.

    Nothing happens when the visit has no virus result or its number is
    outside 1..5. Each changed custom field gets a saved Provenance entry;
    the subject is saved when at least one field changed.

    :param subject: SMASCH study subject to update
    :param visit: visit data read from RedCap
    :return: the Provenance entries created (empty when nothing changed)
    """
    result = []
    if visit.virus is None or visit.visit_number not in range(1, 6):
        return result

    # Custom-field labels are zero-based: visit 1 maps to index 0.
    index = visit.visit_number - 1
    result_label = "Virus {} RT-PCR".format(index)
    updated_label = "Visit {} RT-PCR update date".format(index)
    collect_label = "Visit {} RT-PCR collection date".format(index)
    iga_label = "Visit {} IgA Status".format(index)
    igg_label = "Visit {} IgG Status".format(index)

    changes = []
    if subject.get_custom_field_value(result_label) != visit.virus:
        changes.extend([(result_label, visit.virus),
                        (updated_label, datetime.datetime.now().strftime("%Y-%m-%d"))])
    collection_date = visit.virus_collection_date
    # Without a RedCap collection date there is nothing to format; the old
    # code called strftime on None here whenever SMASCH already held a date.
    # The stored value is deliberately left untouched in that case.
    if collection_date is not None \
            and not date_equals(subject.get_custom_field_value(collect_label), collection_date):
        changes.append((collect_label, collection_date.strftime("%Y-%m-%d")))
    if subject.get_custom_field_value(iga_label) != visit.iga_status:
        changes.append((iga_label, visit.iga_status))
    if subject.get_custom_field_value(igg_label) != visit.igg_status:
        changes.append((igg_label, visit.igg_status))

    for field, new_value in changes:
        old_value = subject.get_custom_field_value(field)
        description = '{} changed from "{}" to "{}"'.format(field, old_value, new_value)
        p = Provenance(modified_table=StudySubject._meta.db_table,
                       modified_table_id=subject.id,
                       modification_author=self.importer_user,
                       previous_value=old_value,
                       new_value=new_value,
                       modification_description=description,
                       modified_field=field)
        subject.set_custom_field_value(field, new_value)
        p.save()
        result.append(p)
    if changes:
        subject.save()
    return result
@staticmethod
def check_sex_consistency(red_cap_subject, study_subject):
if study_subject.subject.sex != red_cap_subject.sex:
return InconsistentField.create("sex", study_subject.subject.sex, red_cap_subject.sex)
@staticmethod
def check_birth_date_consistency(red_cap_subject, study_subject):
subject_date_born = ""
Piotr Gawron
committed
if study_subject.subject.date_born is not None:
subject_date_born = study_subject.subject.date_born.strftime('%Y-%m-%d')
redcap_subject_date_born = red_cap_subject.date_born
if redcap_subject_date_born is None:
redcap_subject_date_born = ""
if len(redcap_subject_date_born) > 10:
redcap_subject_date_born = redcap_subject_date_born[:10]
if subject_date_born != redcap_subject_date_born:
return InconsistentField.create("date of birth", subject_date_born, redcap_subject_date_born)
@staticmethod
def check_dead_consistency(red_cap_subject, study_subject):
if study_subject.subject.dead != red_cap_subject.dead:
return InconsistentField.create("dead", str(study_subject.subject.dead), str(red_cap_subject.dead))
@staticmethod
def check_mpower_id_consistency(red_cap_subject: RedcapSubject, study_subject: StudySubject) \
        -> Optional[InconsistentField]:
    """Return an InconsistentField when the MPower ids differ, else None.

    The SMASCH value comes from the 'MPower ID' custom field and the
    comparison is delegated to ``different_string``.
    """
    if not different_string(study_subject.get_custom_field_value('MPower ID'), red_cap_subject.mpower_id):
        return None
    return InconsistentField.create("mpower id", study_subject.get_custom_field_value('MPower ID'),
                                    red_cap_subject.mpower_id)
@staticmethod
def check_languages_consistency(red_cap_subject, study_subject):
missing_language = False
if len(red_cap_subject.languages) < 4:
Piotr Gawron
committed
for language in study_subject.subject.languages.all():
if language not in red_cap_subject.languages:
missing_language = True
for language in red_cap_subject.languages:
Piotr Gawron
committed
if language not in study_subject.subject.languages.all():
missing_language = True
if missing_language:
subject_languages = ""
Piotr Gawron
committed
for language in study_subject.subject.languages.all():
subject_languages += language.name + ", "
red_cap_subject_languages = ""
for language in red_cap_subject.languages:
red_cap_subject_languages += language.name + ", "
return InconsistentField.create("languages", subject_languages, red_cap_subject_languages)
@staticmethod
def create_inconsistency_subject(red_cap_subject: RedcapSubject, study_subject: StudySubject,
                                 url: str) -> Optional[InconsistentSubject]:
    """Run the enabled consistency checks for one subject.

    The checks to run are the columns of ``study_subject.study.redcap_columns``
    whose value is exactly ``True``.

    :param red_cap_subject: record read from RedCap
    :param study_subject: matching SMASCH subject
    :param url: RedCap link stored with the result
    :return: an InconsistentSubject listing the differing fields, or None
        when every enabled check passes
    """
    # Dispatch table: redcap column name -> check function.
    field_checks = {
        'sex': RedcapConnector.check_sex_consistency,
        'date_born': RedcapConnector.check_birth_date_consistency,
        'dead': RedcapConnector.check_dead_consistency,
        'mpower_id': RedcapConnector.check_mpower_id_consistency,
        'languages': RedcapConnector.check_languages_consistency
    }
    enabled_columns = [column for column, enabled in model_to_dict(study_subject.study.redcap_columns).items()
                       if enabled is True]

    fields = []
    for column in enabled_columns:
        check = field_checks.get(column)
        if check is None:
            # An enabled column without a check must not abort the whole run.
            logger.warning("No consistency check defined for redcap column: " + column)
            continue
        field = check(red_cap_subject, study_subject)
        if field is not None:
            fields.append(field)

    if not fields:
        return None
    return InconsistentSubject.create(smash_subject=study_subject, url=url, fields=fields)
def create_redcap_link(self, pid, redcap_version, subject):
    """Build the RedCap data-entry URL (demographics page) for ``subject``.

    Every component is passed through ``ensure_str`` before formatting.
    """
    base = ensure_str(self.base_url)
    version = ensure_str(redcap_version)
    record_id = ensure_str(subject.nd_number)
    project = ensure_str(str(pid))
    page = f"{base}/redcap_v{version}/DataEntry/index.php?"
    query = f"pid={project}&id={record_id}&page=demographics"
    return page + query
def get_red_cap_subjects(self):
# Queries RedCap and builds a list of RedcapSubject objects, attaching
# RedcapVisit objects built from the same rows and from follow-up events.
# NOTE(review): this listing has lost its indentation and contains non-code
# residue (bare numbers, "Piotr Gawron" / "committed"). Several branches
# below have no visible body. Restore the method from version control before
# relying on it; the notes below flag what can be seen from the text alone.
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# First query: the event preset by get_subject_query_data().
query_data = self.get_subject_query_data()
data = self.execute_query(query_data)
result = []
for row in data:
# Only dict rows are turned into subjects.
if isinstance(row, dict):
redcap_subject = RedcapSubject()
redcap_subject.nd_number = row.get(self.nd_number_field)
# A field configured as "" is treated as disabled and skipped.
if self.date_born_field != "":
redcap_subject.date_born = row.get(self.date_born_field)
if self.sex_field != "":
redcap_subject.sex = row.get(self.sex_field)
if self.dead_field != "":
# NOTE(review): row.get() returns None for an absent key, in which case
# .lower() raises AttributeError.
redcap_subject.dead = (row.get(self.dead_field).lower() == "yes")
if self.m_power_id_field != "":
redcap_subject.mpower_id = row.get(self.m_power_id_field)
if self.language_1_field != "" and row.get(self.language_1_field):
redcap_subject.add_language(self.get_language(row.get(self.language_1_field)))
# NOTE(review): languages 2-4 index the row directly (KeyError when the key
# is absent) whereas language 1 uses row.get().
if self.language_2_field != "" and row[self.language_2_field]:
redcap_subject.add_language(self.get_language(row.get(self.language_2_field)))
if self.language_3_field != "" and row[self.language_3_field]:
redcap_subject.add_language(self.get_language(row.get(self.language_3_field)))
if self.language_4_field != "" and row[self.language_4_field]:
redcap_subject.add_language(self.get_language(row.get(self.language_4_field)))
# The row of the first query also provides the data of visit number 1.
visit = RedcapVisit()
visit.visit_number = 1
if self.virus_field != "":
# NOTE(review): the "Negative" and "Positive" branches have no visible body.
if row.get(self.virus_field) == "Negative":
elif row.get(self.virus_field) == "Positive":
elif row.get(self.virus_field) == "Inconclusive":
visit.virus = "Inconclusive"
Piotr Gawron
committed
if self.sample_date_field != "":
date_str = row.get(self.sample_date_field)
# "Not done" / "Not known" are skipped rather than parsed as dates.
if date_str is not None and date_str != "" and date_str != "Not done" and date_str != "Not known":
try:
visit.virus_collection_date = datetime.datetime.strptime(row.get(self.sample_date_field),
"%Y-%m-%d")
except ValueError:
logger.warning("Invalid date: " + row.get(self.sample_date_field))
visit.virus_collection_date = None
if self.iga_status_field != "":
visit.iga_status = row.get(self.iga_status_field)
if self.igg_status_field != "":
visit.igg_status = row.get(self.igg_status_field)
if self.sample_kit_id_field != "":
# NOTE(review): it cannot be told from this listing whether the append
# below is the body of this check or whether a line is missing - confirm.
if row.get(self.sample_kit_id_field) != "":
Piotr Gawron
committed
redcap_subject.visits.append(visit)
result.append(redcap_subject)
# Follow-up events: "visit_<i + redcap_first_visit_number>_arm_1", i = 1..8.
for i in range(1, 9):
query_data = self.get_subject_query_data()
query_data["events[0]"] = "visit_" + str(i + self.study.redcap_first_visit_number) + "_arm_1"
data = self.execute_query(query_data)
# A dict response (instead of a list of rows) stops the loop;
# presumably an error payload for a non-existing event - TODO confirm.
if isinstance(data, dict):
break
for row in data:
if isinstance(row, dict):
nd_number = row.get(self.nd_number_field)
# Linear search for the subject created from the first query.
for redcap_subject in result:
if redcap_subject.nd_number == nd_number:
visit = RedcapVisit()
Piotr Gawron
committed
visit.visit_number = i + self.study.redcap_first_visit_number + 1
if self.virus_field != "":
# NOTE(review): the "Negative" and "Positive" branches have no visible body.
if row.get(self.virus_field) == "Negative":
elif row.get(self.virus_field) == "Positive":
elif row.get(self.virus_field) == "Inconclusive":
# NOTE(review): this sets visit.virus_inconclusive, while the first-visit
# code above sets visit.virus and RedcapVisit declares no
# virus_inconclusive attribute - likely a typo for visit.virus.
visit.virus_inconclusive = "Inconclusive"
if self.sample_date_field != "":
date_str = row.get(self.sample_date_field)
if date_str is not None and date_str != "" and date_str != "Not done" \
and date_str != "Not known":
# NOTE(review): unlike the first-visit code there is no visible try/except
# here; as listed, the warning and the reset to None would always run.
visit.virus_collection_date = datetime.datetime.strptime(
row.get(self.sample_date_field),
"%Y-%m-%d")
logger.warning("Invalid date: " + row.get(self.sample_date_field))
visit.virus_collection_date = None
if self.iga_status_field != "":
visit.iga_status = row.get(self.iga_status_field)
if self.igg_status_field != "":
visit.igg_status = row.get(self.igg_status_field)
if self.sample_kit_id_field != "":
# NOTE(review): same ambiguity as for the first visit - body/nesting unclear.
if row.get(self.sample_kit_id_field) != "":
Piotr Gawron
committed
redcap_subject.visits.append(visit)
return result
def get_subject_query_data(self):
result = {
'token': self.token,
'content': 'record',
'format': 'json',
'type': 'flat',
'events[0]': 'visit_' + str(self.study.redcap_first_visit_number) + '_arm_1',
'rawOrLabel': 'label',
'rawOrLabelHeaders': 'raw',
'exportCheckboxLabel': 'false',
'exportSurveyFields': 'false',
'exportDataAccessGroups': 'false',
'returnFormat': 'json'
}
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
field_number = 0
if self.date_born_field != "":
result['fields[' + str(field_number) + ']'] = self.date_born_field
field_number += 1
if self.sex_field != "":
result['fields[' + str(field_number) + ']'] = self.sex_field
field_number += 1
if self.nd_number_field != "":
result['fields[' + str(field_number) + ']'] = self.nd_number_field
field_number += 1
if self.dead_field != "":
result['fields[' + str(field_number) + ']'] = self.dead_field
field_number += 1
if self.language_1_field != "":
result['fields[' + str(field_number) + ']'] = self.language_1_field
field_number += 1
if self.language_2_field != "":
result['fields[' + str(field_number) + ']'] = self.language_2_field
field_number += 1
if self.language_3_field != "":
result['fields[' + str(field_number) + ']'] = self.language_3_field
field_number += 1
if self.language_4_field != "":
result['fields[' + str(field_number) + ']'] = self.language_4_field
field_number += 1
if self.m_power_id_field != "":
result['fields[' + str(field_number) + ']'] = self.m_power_id_field
field_number += 1
if self.virus_field != "":
result['fields[' + str(field_number) + ']'] = self.virus_field
field_number += 1
if self.sample_kit_id_field != "":
result['fields[' + str(field_number) + ']'] = self.sample_kit_id_field
field_number += 1
Piotr Gawron
committed
if self.sample_date_field != "":
result['fields[' + str(field_number) + ']'] = self.sample_date_field
field_number += 1
if self.iga_status_field != "":
result['fields[' + str(field_number) + ']'] = self.iga_status_field
field_number += 1
if self.igg_status_field != "":
result['fields[' + str(field_number) + ']'] = self.igg_status_field
field_number += 1
return result
def get_language(self, name):
language = self.language_by_name.get(name.lower())
if language is None:
logger.warning("Unknown language: " + name)
return language
def execute_query(self, query_data, is_json=True):
    """POST ``query_data`` to ``<base_url>/api/`` and return the response.

    :param query_data: dict of form fields sent as a multipart POST
    :param is_json: when True the body is parsed with ``json.loads``;
        otherwise the raw response bytes are returned
    :return: parsed JSON value or raw bytes
    :raises pycurl.error: when the transfer fails
    :raises ValueError: when ``is_json`` is True and the body is not valid JSON
    """
    # The response body is collected in memory through WRITEFUNCTION.
    buf = io.BytesIO()
    curl_connection = pycurl.Curl()
    try:
        curl_connection.setopt(pycurl.CAINFO, certifi.where())
        curl_connection.setopt(curl_connection.URL, self.base_url + "/api/")
        curl_connection.setopt(curl_connection.HTTPPOST, list(query_data.items()))
        curl_connection.setopt(curl_connection.WRITEFUNCTION, buf.write)
        curl_connection.perform()
        val = buf.getvalue()
    finally:
        # Release the handle and the buffer even when the transfer fails.
        curl_connection.close()
        buf.close()
    if is_json:
        return json.loads(val)
    return val
def get_project_id(self):
query_data = {
'token': self.token,
'content': 'project',
'format': 'json',
'returnFormat': 'json'
}
data = self.execute_query(query_data)
return data['project_id']
def get_redcap_version(self):
query_data = {
'token': self.token,
'content': 'version'
}
data = self.execute_query(query_data, is_json=False)
return data
def is_valid(self):
if not self.token:
return False
if not self.base_url:
return False
return True
class RedCapRefreshJob(CronJobBase):
    """Cron job that refreshes the stored RedCap comparison data every 60 minutes."""

    RUN_EVERY_MINUTES = 60
    schedule = Schedule(run_every_mins=RUN_EVERY_MINUTES)
    code = 'web.red_cap_hourly_refresh'  # a unique code

    @timeout_decorator.timeout(CRON_JOB_TIMEOUT)
    def do(self):
        """Refresh inconsistent and missing subjects.

        Returns "ok" after a refresh, or "connector down" when the connector
        has no token or base URL.
        """
        connector = RedcapConnector()
        if not connector.is_valid():
            logger.info("Redcap connector is down")
            return "connector down"

        logger.info("Refreshing redcap data")
        connector.refresh_inconsistent()
        connector.refresh_missing()
        logger.info("Redcap data refreshed")
        return "ok"