# redcap_connector.py
import datetime
import io
import json
import logging
from typing import Optional, List

import certifi
import pycurl
import timeout_decorator
from django.forms.models import model_to_dict
from django_cron import CronJobBase, Schedule
from six import ensure_str

from web.models import ConfigurationItem, StudySubject, Language, AppointmentType, Appointment, Visit, Study, \
    Provenance, Worker, User
from web.models.constants import REDCAP_TOKEN_CONFIGURATION_TYPE, \
    REDCAP_BASE_URL_CONFIGURATION_TYPE, CRON_JOB_TIMEOUT, RED_CAP_LANGUAGE_4_FIELD_TYPE, \
    RED_CAP_LANGUAGE_3_FIELD_TYPE, RED_CAP_LANGUAGE_2_FIELD_TYPE, RED_CAP_LANGUAGE_1_FIELD_TYPE, \
    RED_CAP_MPOWER_ID_FIELD_TYPE, RED_CAP_DEAD_FIELD_TYPE, RED_CAP_SEX_FIELD_TYPE, RED_CAP_DATE_BORN_FIELD_TYPE, \
    RED_CAP_ND_NUMBER_FIELD_TYPE, RED_CAP_VIRUS_FIELD_TYPE, GLOBAL_STUDY_ID, RED_CAP_SAMPLE_DATE_FIELD_TYPE, \
    RED_CAP_KIT_ID_FIELD_TYPE, RED_CAP_IGA_STATUS_FIELD_TYPE, RED_CAP_IGG_STATUS_FIELD_TYPE, IMPORTER_USER, \
    IMPORT_APPOINTMENT_TYPE
from web.models.inconsistent_subject import InconsistentField, InconsistentSubject
from web.models.missing_subject import MissingSubject

logger = logging.getLogger(__name__)


    # Class-level defaults for one subject record read from REDCap.
    # NOTE(review): the enclosing class header is not visible at this point of
    # the file; the code below instantiates these objects as RedcapSubject().
    url = None
    # Identifier used to match a REDCap record against StudySubject.nd_number.
    nd_number = None
    # Raw value of the configured date-of-birth field (a string from the REDCap row).
    date_born = None
    sex = None
    # Boolean: True when the configured "dead" field equals "yes" (case-insensitive).
    dead = None
    # Replaced by a fresh per-instance list in __init__.
    languages = None
    mpower_id = None

    def __init__(self):
        """Give every instance its own, initially empty, list of languages."""
        self.languages = list()

    def add_language(self, language):
        """Append ``language`` to this subject's languages; ``None`` is ignored."""
        if language is None:
            return
        self.languages.append(language)


    # Class-level defaults for the per-visit values read from REDCap.
    # NOTE(review): the enclosing class header is not visible here; the code
    # below instantiates these objects as RedcapVisit() and also assigns
    # `visit_number` and `virus` on them, which have no default in this view.
    # datetime parsed from the sample-date field with "%Y-%m-%d", or None.
    virus_collection_date = None
    iga_status = None
    igg_status = None
    # Set to "Inconclusive" for follow-up visits whose virus field has that label.
    virus_inconclusive = None
def different_string(string1, string2):
    """Return True when the two values differ after normalisation.

    Each argument may be ``str``, UTF-8 encoded ``bytes`` or ``None``.
    ``None`` is treated as the empty string, and leading/trailing
    whitespace is ignored on both sides.
    """

    def _as_text(value):
        # None compares equal to an empty string.
        if value is None:
            return ""
        if isinstance(value, bytes):
            return value.decode('utf8')
        return value

    # Each argument is normalised independently (the previous code decoded
    # string1 a second time where string2 was meant).
    return _as_text(string1).strip() != _as_text(string2).strip()
def date_equals(date1: Optional[str], date2: Optional[datetime.datetime]) -> bool:
    """Compare a ``YYYY-MM-DD`` string with a date/datetime object.

    :param date1: date as text; ``None`` and ``''`` both mean "no date"
    :param date2: object providing ``strftime``; ``None`` means "no date"
    :return: True when both sides are empty or both denote the same day
    """
    text_missing = date1 is None or date1 == ''
    if text_missing and date2 is None:
        # Neither side has a date: they agree.
        return True
    if text_missing or date2 is None:
        # Exactly one side has a date: they differ.
        return False
    return date1 == date2.strftime("%Y-%m-%d")
    def __init__(self):
        """Load the REDCap connection settings and field mapping from the database.

        ``token`` and ``base_url`` stay ``None`` when the corresponding
        configuration item is absent or empty (see ``is_valid``). The
        field-name items, the global study and the importer-user item are
        fetched with ``objects.get`` and are therefore expected to exist.
        ``importer_user`` is the ``Worker`` linked to the configured user
        name, or ``None`` when it cannot be resolved.
        """

        def field_name(config_type):
            # Name of the REDCap field configured for this item type.
            return ConfigurationItem.objects.get(type=config_type).value

        self.token = None
        self.base_url = None
        items = ConfigurationItem.objects.filter(type=REDCAP_TOKEN_CONFIGURATION_TYPE)
        if len(items) > 0:
            if items[0].value:
                self.token = items[0].value
        items = ConfigurationItem.objects.filter(type=REDCAP_BASE_URL_CONFIGURATION_TYPE)
        if len(items) > 0:
            if items[0].value:
                self.base_url = items[0].value

        # Lower-cased language name -> Language, used by get_language().
        self.language_by_name = {}
        for language in Language.objects.all():
            self.language_by_name[language.name.lower()] = language

        self.date_born_field = field_name(RED_CAP_DATE_BORN_FIELD_TYPE)
        self.sex_field = field_name(RED_CAP_SEX_FIELD_TYPE)
        self.nd_number_field = field_name(RED_CAP_ND_NUMBER_FIELD_TYPE)
        self.dead_field = field_name(RED_CAP_DEAD_FIELD_TYPE)
        self.language_1_field = field_name(RED_CAP_LANGUAGE_1_FIELD_TYPE)
        self.language_2_field = field_name(RED_CAP_LANGUAGE_2_FIELD_TYPE)
        self.language_3_field = field_name(RED_CAP_LANGUAGE_3_FIELD_TYPE)
        self.language_4_field = field_name(RED_CAP_LANGUAGE_4_FIELD_TYPE)
        self.m_power_id_field = field_name(RED_CAP_MPOWER_ID_FIELD_TYPE)
        self.virus_field = field_name(RED_CAP_VIRUS_FIELD_TYPE)
        self.sample_kit_id_field = field_name(RED_CAP_KIT_ID_FIELD_TYPE)
        self.sample_date_field = field_name(RED_CAP_SAMPLE_DATE_FIELD_TYPE)
        self.iga_status_field = field_name(RED_CAP_IGA_STATUS_FIELD_TYPE)
        self.igg_status_field = field_name(RED_CAP_IGG_STATUS_FIELD_TYPE)
        self.study = Study.objects.get(id=GLOBAL_STUDY_ID)
        self.importer_user = None

        importer_user_name = field_name(IMPORTER_USER)
        if importer_user_name is not None and importer_user_name != '':
            # filter() always returns a QuerySet (never None); first() yields
            # the matching row or None, which is what the check below needs.
            user = User.objects.filter(username=importer_user_name).first()
            if user is None:
                logger.warning("User does not exist: " + importer_user_name)
            else:
                # Store a Worker instance (not a QuerySet): the value is used
                # as Provenance.modification_author later on.
                self.importer_user = Worker.objects.filter(user=user).first()

    def find_missing(self):
        """List subjects that exist on only one side (REDCap or local database).

        Subjects are matched on ``nd_number``. Entries for REDCap-only
        subjects come first and carry a REDCap link; entries for local-only
        subjects follow.

        :return: list of unsaved ``MissingSubject`` objects
        """
        pid = self.get_project_id()
        redcap_version = self.get_redcap_version()

        red_cap_subjects = self.get_red_cap_subjects()
        smash_subjects = StudySubject.objects.exclude(nd_number='')

        red_cap_nd_numbers = {entry.nd_number for entry in red_cap_subjects}
        smash_nd_numbers = {entry.nd_number for entry in smash_subjects}

        # Present in REDCap but unknown locally.
        result = [
            MissingSubject.create(red_cap_subject=entry, smash_subject=None,
                                  url=self.create_redcap_link(pid, redcap_version, entry))
            for entry in red_cap_subjects
            if entry.nd_number not in smash_nd_numbers
        ]
        # Present locally but unknown in REDCap.
        result.extend(
            MissingSubject.create(red_cap_subject=None, smash_subject=entry)
            for entry in smash_subjects
            if entry.nd_number not in red_cap_nd_numbers
        )
        return result

    @staticmethod
    def add_missing(missing_subjects):
        """Replace the stored non-ignored ``MissingSubject`` rows with ``missing_subjects``.

        Rows flagged ``ignore`` are kept. A new entry is skipped when its
        ``redcap_id`` or its subject's ``nd_number`` matches one of the kept
        rows.
        """
        MissingSubject.objects.filter(ignore=False).delete()

        # Whatever is left in the table after the delete is the ignored set.
        kept_redcap_ids = set()
        kept_nd_numbers = set()
        for kept in MissingSubject.objects.all():
            if kept.redcap_id is not None:
                kept_redcap_ids.add(kept.redcap_id)
            if kept.subject is not None:
                kept_nd_numbers.add(kept.subject.nd_number)

        for candidate in missing_subjects:
            matches_redcap = candidate.redcap_id is not None and candidate.redcap_id in kept_redcap_ids
            matches_smash = candidate.subject is not None and candidate.subject.nd_number in kept_nd_numbers
            if matches_redcap or matches_smash:
                continue
            MissingSubject.objects.create(subject=candidate.subject,
                                          redcap_id=candidate.redcap_id,
                                          redcap_url=candidate.redcap_url)

    @staticmethod
    def add_inconsistent(inconsistent_subjects):
        """Wipe all stored inconsistencies and persist ``inconsistent_subjects`` instead.

        One ``InconsistentSubject`` row is created per entry, plus one
        ``InconsistentField`` row for each of its ``fields``.
        """
        InconsistentField.objects.all().delete()
        InconsistentSubject.objects.all().delete()

        for entry in inconsistent_subjects:
            stored_subject = InconsistentSubject.objects.create(
                subject=entry.subject,
                redcap_url=entry.redcap_url)
            for mismatch in entry.fields:
                InconsistentField.objects.create(
                    inconsistent_subject=stored_subject,
                    name=mismatch.name,
                    smash_value=mismatch.smash_value,
                    redcap_value=mismatch.redcap_value)

    def refresh_missing(self):
        """Recompute the missing subjects and store them."""
        self.add_missing(self.find_missing())

    def refresh_inconsistent(self):
        """Recompute the inconsistent subjects and store them."""
        self.add_inconsistent(self.find_inconsistent())
    def find_inconsistent(self):
        """Compare every local subject with its REDCap record.

        Subjects are matched on ``nd_number``. For each match the configured
        consistency checks are run. When an appointment type is configured
        through ``IMPORT_APPOINTMENT_TYPE``, then for every REDCap visit of
        that subject:

        * scheduled appointments of that type in the corresponding local
          visit are marked as finished,
        * the local visit is flagged finished (with a provenance record),
        * laboratory values are copied over via ``update_data_from_redcap``.

        :return: list of unsaved ``InconsistentSubject`` objects
        """
        appointment_type_code_to_finish = ConfigurationItem.objects.get(type=IMPORT_APPOINTMENT_TYPE).value
        appointment_type_to_finish = None
        if appointment_type_code_to_finish is not None:
            appointment_types = AppointmentType.objects.filter(code=appointment_type_code_to_finish)
            if len(appointment_types) > 0:
                appointment_type_to_finish = appointment_types[0]

        pid = self.get_project_id()
        redcap_version = self.get_redcap_version()

        red_cap_subject_by_nd = {}
        for subject in self.get_red_cap_subjects():
            red_cap_subject_by_nd[subject.nd_number] = subject

        result = []
        for subject in StudySubject.objects.exclude(nd_number=''):
            red_cap_subject = red_cap_subject_by_nd.get(subject.nd_number)
            if red_cap_subject is None:
                continue

            url = self.create_redcap_link(pid, redcap_version, subject)
            inconsistent_subject = self.create_inconsistency_subject(red_cap_subject, subject, url)
            if inconsistent_subject is not None:
                result.append(inconsistent_subject)

            if appointment_type_to_finish is None:
                continue

            for visit in red_cap_subject.visits:
                smasch_visits = Visit.objects.filter(visit_number=visit.visit_number, subject=subject)
                smasch_appointments = Appointment.objects.filter(
                    visit__in=smasch_visits,
                    appointment_types=appointment_type_to_finish,
                    status=Appointment.APPOINTMENT_STATUS_SCHEDULED)

                for smasch_appointment in smasch_appointments:
                    smasch_appointment.mark_as_finished()
                    if not smasch_appointment.visit.is_finished:
                        description = '{} changed from "{}" to "{}"'.format(
                            'is_finished',
                            smasch_appointment.visit.is_finished,
                            True)
                        p = Provenance(modified_table=Visit._meta.db_table,
                                       modified_table_id=smasch_appointment.visit.id,
                                       modification_author=self.importer_user,
                                       previous_value=smasch_appointment.visit.is_finished,
                                       new_value=True,
                                       modification_description=description,
                                       modified_field='is_finished')
                        smasch_appointment.visit.is_finished = True
                        smasch_appointment.visit.save()
                        # Persist the audit record; it used to be built and
                        # then discarded, unlike in update_data_from_redcap.
                        p.save()
                self.update_data_from_redcap(subject, visit)

        return result
    def update_data_from_redcap(self, subject: StudySubject, visit: RedcapVisit) -> List[Provenance]:
        """Copy the laboratory values of one REDCap visit into the subject's custom fields.

        Nothing happens when ``visit.virus`` is ``None`` or when
        ``visit.visit_number`` is outside 1..5. Every changed custom field
        gets a saved ``Provenance`` record, and the subject is saved once if
        at least one field changed.

        :param subject: local subject whose custom fields are updated
        :param visit: REDCap visit providing the new values
        :return: the provenance records created (empty when nothing changed)
        """
        result = []
        if visit.virus is None:
            return result

        changes = []
        tracked_visit_numbers = range(1, 6)
        if visit.visit_number in tracked_visit_numbers:
            # Custom field labels are numbered from 0, visit numbers from 1.
            label_index = tracked_visit_numbers.index(visit.visit_number)
            result_label = "Virus {} RT-PCR".format(label_index)
            updated_label = "Visit {} RT-PCR update date".format(label_index)
            collect_label = "Visit {} RT-PCR collection date".format(label_index)
            iga_label = "Visit {} IgA Status".format(label_index)
            igg_label = "Visit {} IgG Status".format(label_index)

            if subject.get_custom_field_value(result_label) != visit.virus:
                changes.extend([(result_label, visit.virus),
                                (updated_label, datetime.datetime.now().strftime("%Y-%m-%d"))])

            # Without a REDCap collection date there is nothing to format;
            # the stored value is left untouched instead of failing on
            # None.strftime().
            if visit.virus_collection_date is not None and not date_equals(
                    subject.get_custom_field_value(collect_label), visit.virus_collection_date):
                changes.append((collect_label, visit.virus_collection_date.strftime("%Y-%m-%d")))

            if subject.get_custom_field_value(iga_label) != visit.iga_status:
                changes.append((iga_label, visit.iga_status))
            if subject.get_custom_field_value(igg_label) != visit.igg_status:
                changes.append((igg_label, visit.igg_status))

        if changes:
            for field, new_value in changes:
                old_value = subject.get_custom_field_value(field)
                description = '{} changed from "{}" to "{}"'.format(field, old_value, new_value)
                p = Provenance(modified_table=StudySubject._meta.db_table,
                               modified_table_id=subject.id,
                               modification_author=self.importer_user,
                               previous_value=old_value,
                               new_value=new_value,
                               modification_description=description,
                               modified_field=field)
                subject.set_custom_field_value(field, new_value)
                p.save()
                result.append(p)
            subject.save()
        # The signature promises the list of provenance records.
        return result
    def check_sex_consistency(red_cap_subject, study_subject):
        if study_subject.subject.sex != red_cap_subject.sex:
            return InconsistentField.create("sex", study_subject.subject.sex, red_cap_subject.sex)

    @staticmethod
    def check_birth_date_consistency(red_cap_subject, study_subject):
        if study_subject.subject.date_born is not None:
            subject_date_born = study_subject.subject.date_born.strftime('%Y-%m-%d')
        redcap_subject_date_born = red_cap_subject.date_born
        if redcap_subject_date_born is None:
            redcap_subject_date_born = ""
        if len(redcap_subject_date_born) > 10:
            redcap_subject_date_born = redcap_subject_date_born[:10]
        if subject_date_born != redcap_subject_date_born:
            return InconsistentField.create("date of birth", subject_date_born, redcap_subject_date_born)

    @staticmethod
    def check_dead_consistency(red_cap_subject, study_subject):
        if study_subject.subject.dead != red_cap_subject.dead:
            return InconsistentField.create("dead", str(study_subject.subject.dead), str(red_cap_subject.dead))

    @staticmethod
    def check_mpower_id_consistency(red_cap_subject: RedcapSubject, study_subject: StudySubject) \
            -> Optional[InconsistentField]:
        """Return an ``InconsistentField`` when the MPower IDs differ, otherwise ``None``.

        The local value is the subject's 'MPower ID' custom field; the
        comparison is delegated to ``different_string``.
        """
        if not different_string(study_subject.get_custom_field_value('MPower ID'), red_cap_subject.mpower_id):
            return None
        return InconsistentField.create("mpower id", study_subject.get_custom_field_value('MPower ID'),
                                        red_cap_subject.mpower_id)

    @staticmethod
    def check_languages_consistency(red_cap_subject, study_subject):
        missing_language = False
        if len(red_cap_subject.languages) < 4:
            for language in study_subject.subject.languages.all():
                if language not in red_cap_subject.languages:
                    missing_language = True
        for language in red_cap_subject.languages:
            if language not in study_subject.subject.languages.all():
                missing_language = True
        if missing_language:
            subject_languages = ""
            for language in study_subject.subject.languages.all():
                subject_languages += language.name + ", "
            red_cap_subject_languages = ""
            for language in red_cap_subject.languages:
                red_cap_subject_languages += language.name + ", "
            return InconsistentField.create("languages", subject_languages, red_cap_subject_languages)

    @staticmethod
    def create_inconsistency_subject(red_cap_subject: RedcapSubject, study_subject: StudySubject,
                                     url: str) -> Optional[InconsistentSubject]:
        """Run the checks enabled for the subject's study and collect the mismatches.

        Which checks run is decided by the study's ``redcap_columns``: every
        column whose value is exactly ``True`` selects the check of the same
        name.

        :param red_cap_subject: record read from REDCap
        :param study_subject: matching local subject
        :param url: REDCap link stored with the result
        :return: an ``InconsistentSubject`` listing the differing fields, or
            ``None`` when every enabled check passed
        """
        field_checks = {
            'sex': RedcapConnector.check_sex_consistency,
            'date_born': RedcapConnector.check_birth_date_consistency,
            'dead': RedcapConnector.check_dead_consistency,
            'mpower_id': RedcapConnector.check_mpower_id_consistency,
            'languages': RedcapConnector.check_languages_consistency
        }

        # get fields which are true from redcap columns
        fields_to_check = [k for k, v in model_to_dict(study_subject.study.redcap_columns).items() if v is True]

        fields = []
        for field_to_check in fields_to_check:
            field = field_checks[field_to_check](red_cap_subject, study_subject)
            if field is not None:
                fields.append(field)

        if not fields:
            return None
        return InconsistentSubject.create(smash_subject=study_subject, url=url, fields=fields)

    def create_redcap_link(self, pid, redcap_version, subject):
        """Build the URL of the subject's demographics page in REDCap.

        :param pid: REDCap project id
        :param redcap_version: REDCap version, as text or bytes
        :param subject: object whose ``nd_number`` is used as the record id
        :return: the data-entry URL as ``str``
        """
        # ensure_str turns bytes (e.g. the raw version response) into text.
        s_base_url = ensure_str(self.base_url)
        s_redcap_version = ensure_str(redcap_version)
        s_subject_nd_number = ensure_str(subject.nd_number)
        s_pid = ensure_str(str(pid))
        return f"{s_base_url}/redcap_v{s_redcap_version}/DataEntry/index.php?" \
               f"pid={s_pid}&id={s_subject_nd_number}&page=demographics"

    def get_red_cap_subjects(self):
        """Query REDCap and build the list of subjects with their visit data.

        The first query uses ``get_subject_query_data()`` unchanged; each
        dict row becomes a ``RedcapSubject`` plus a ``RedcapVisit`` with
        ``visit_number`` 1. Further queries target the events
        ``visit_<i + redcap_first_visit_number>_arm_1`` and stop as soon as a
        response is a dict rather than a list of rows (presumably an error
        payload - TODO confirm).

        NOTE(review): this function is incomplete in this copy of the file.
        Statements are missing after both ``sample_kit_id_field`` checks, the
        header of the loop that defines ``i`` is missing, and one ``try:``
        line is missing. It cannot run as written.
        """
        query_data = self.get_subject_query_data()
        data = self.execute_query(query_data)
        result = []
        for row in data:
            if isinstance(row, dict):
                redcap_subject = RedcapSubject()
                redcap_subject.nd_number = row.get(self.nd_number_field)
                # A field configured as "" is treated as not mapped and is skipped.
                if self.date_born_field != "":
                    redcap_subject.date_born = row.get(self.date_born_field)
                if self.sex_field != "":
                    redcap_subject.sex = row.get(self.sex_field)
                if self.dead_field != "":
                    redcap_subject.dead = (row.get(self.dead_field).lower() == "yes")
                if self.m_power_id_field != "":
                    redcap_subject.mpower_id = row.get(self.m_power_id_field)
                # NOTE(review): language 1 is read with row.get(), languages 2-4
                # with row[...], which raises KeyError when the key is absent.
                if self.language_1_field != "" and row.get(self.language_1_field):
                    redcap_subject.add_language(self.get_language(row.get(self.language_1_field)))
                if self.language_2_field != "" and row[self.language_2_field]:
                    redcap_subject.add_language(self.get_language(row.get(self.language_2_field)))
                if self.language_3_field != "" and row[self.language_3_field]:
                    redcap_subject.add_language(self.get_language(row.get(self.language_3_field)))
                if self.language_4_field != "" and row[self.language_4_field]:
                    redcap_subject.add_language(self.get_language(row.get(self.language_4_field)))
                visit = RedcapVisit()
                visit.visit_number = 1
                if self.virus_field != "":
                    # Only the three known labels are copied; anything else leaves
                    # visit.virus unset.
                    if row.get(self.virus_field) == "Negative":
                        visit.virus = "Negative"
                    elif row.get(self.virus_field) == "Positive":
                        visit.virus = "Positive"
                    elif row.get(self.virus_field) == "Inconclusive":
                        visit.virus = "Inconclusive"
                    date_str = row.get(self.sample_date_field)
                    if date_str is not None and date_str != "" and date_str != "Not done" and date_str != "Not known":
                        try:
                            visit.virus_collection_date = datetime.datetime.strptime(row.get(self.sample_date_field),
                                                                                     "%Y-%m-%d")
                        except ValueError:
                            logger.warning("Invalid date: " + row.get(self.sample_date_field))
                            visit.virus_collection_date = None

                if self.iga_status_field != "":
                    visit.iga_status = row.get(self.iga_status_field)
                if self.igg_status_field != "":
                    visit.igg_status = row.get(self.igg_status_field)

                if self.sample_kit_id_field != "":
                    if row.get(self.sample_kit_id_field) != "":
            # NOTE(review): lines are missing here - the body of the `if` above,
            # whatever adds the subject to `result`, and the header of the loop
            # over `i` that encloses the statements below.
            query_data = self.get_subject_query_data()
            query_data["events[0]"] = "visit_" + str(i + self.study.redcap_first_visit_number) + "_arm_1"
            data = self.execute_query(query_data)
            if isinstance(data, dict):
                break
            for row in data:
                if isinstance(row, dict):
                    nd_number = row.get(self.nd_number_field)
                    for redcap_subject in result:
                        if redcap_subject.nd_number == nd_number:
                            visit = RedcapVisit()
                            visit.visit_number = i + self.study.redcap_first_visit_number + 1
                            if self.virus_field != "":
                                if row.get(self.virus_field) == "Negative":
                                    visit.virus = "Negative"
                                elif row.get(self.virus_field) == "Positive":
                                    visit.virus = "Positive"
                                # NOTE(review): unlike the first visit, "Inconclusive"
                                # is stored in virus_inconclusive, not virus.
                                elif row.get(self.virus_field) == "Inconclusive":
                                    visit.virus_inconclusive = "Inconclusive"
                            if self.sample_date_field != "":
                                date_str = row.get(self.sample_date_field)
                                # NOTE(review): the next two lines are page-extraction
                                # residue, not Python.
Piotr Gawron's avatar
Piotr Gawron committed
                                if date_str is not None and date_str != "" and date_str != "Not done" \
                                        and date_str != "Not known":
                                        # NOTE(review): a `try:` line is missing above
                                        # this statement (see the `except` below).
                                        visit.virus_collection_date = datetime.datetime.strptime(
                                            row.get(self.sample_date_field),
                                            "%Y-%m-%d")
                                    except ValueError:
                                        logger.warning("Invalid date: " + row.get(self.sample_date_field))
                                        visit.virus_collection_date = None

                            if self.iga_status_field != "":
                                visit.iga_status = row.get(self.iga_status_field)
                            if self.igg_status_field != "":
                                visit.igg_status = row.get(self.igg_status_field)
                            if self.sample_kit_id_field != "":
                                if row.get(self.sample_kit_id_field) != "":
                                # NOTE(review): the body of this `if` is missing.

        return result

    def get_subject_query_data(self):
        """Build the POST parameters for a REDCap record export.

        The request targets the event of the study's first visit. Every
        configured (non-empty) field name is added as ``fields[<n>]``, with
        ``n`` counting up from 0 without gaps.

        :return: dict of request parameters
        """
        first_event = 'visit_' + str(self.study.redcap_first_visit_number) + '_arm_1'
        result = {
            'token': self.token,
            'content': 'record',
            'format': 'json',
            'type': 'flat',
            'events[0]': first_event,
            'rawOrLabel': 'label',
            'rawOrLabelHeaders': 'raw',
            'exportCheckboxLabel': 'false',
            'exportSurveyFields': 'false',
            'exportDataAccessGroups': 'false',
            'returnFormat': 'json'
        }

        # The order here fixes the numbering of the request parameters.
        configured_fields = (
            self.date_born_field,
            self.sex_field,
            self.nd_number_field,
            self.dead_field,
            self.language_1_field,
            self.language_2_field,
            self.language_3_field,
            self.language_4_field,
            self.m_power_id_field,
            self.virus_field,
            self.sample_kit_id_field,
            self.sample_date_field,
            self.iga_status_field,
            self.igg_status_field,
        )
        requested = [name for name in configured_fields if name != ""]
        for position, name in enumerate(requested):
            result['fields[' + str(position) + ']'] = name
        return result

    def get_language(self, name):
        """Look up a language by name, ignoring case.

        :return: the matching entry of ``language_by_name``, or ``None``
            (after logging a warning) when the name is unknown
        """
        match = self.language_by_name.get(name.lower())
        if match is not None:
            return match
        logger.warning("Unknown language: " + name)
        return match

    def execute_query(self, query_data, is_json=True):
        """POST ``query_data`` to the REDCap API and return the response.

        :param query_data: dict of form fields sent as a multipart POST
        :param is_json: when True the response body is parsed as JSON;
            otherwise the raw response bytes are returned
        :return: parsed JSON, or ``bytes`` when ``is_json`` is False
        """
        buf = io.BytesIO()
        curl_connection = pycurl.Curl()
        try:
            curl_connection.setopt(pycurl.CAINFO, certifi.where())
            curl_connection.setopt(curl_connection.URL, self.base_url + "/api/")
            curl_connection.setopt(curl_connection.HTTPPOST, list(query_data.items()))
            curl_connection.setopt(curl_connection.WRITEFUNCTION, buf.write)
            curl_connection.perform()
            val = buf.getvalue()
        finally:
            # Release the handle and the buffer even when the transfer fails.
            curl_connection.close()
            buf.close()

        if is_json:
            return json.loads(val)
        return val

    def get_project_id(self):
        """Ask REDCap for the project description and return its ``project_id``."""
        project_info = self.execute_query({
            'token': self.token,
            'content': 'project',
            'format': 'json',
            'returnFormat': 'json'
        })
        return project_info['project_id']

    def get_redcap_version(self):
        """Return the REDCap version exactly as the API sends it (not parsed as JSON)."""
        version_request = {'token': self.token, 'content': 'version'}
        return self.execute_query(version_request, is_json=False)

    def is_valid(self):
        """Tell whether both the API token and the base URL are configured."""
        return bool(self.token) and bool(self.base_url)


class RedCapRefreshJob(CronJobBase):
    """Cron job that refreshes the stored REDCap inconsistencies and missing subjects."""

    RUN_EVERY_MINUTES = 60
    schedule = Schedule(run_every_mins=RUN_EVERY_MINUTES)
    code = 'web.red_cap_hourly_refresh'  # a unique code

    # pylint: disable=no-self-use
    @timeout_decorator.timeout(CRON_JOB_TIMEOUT)
    def do(self):
        """Run one refresh.

        :return: ``"ok"`` after a refresh, ``"connector down"`` when the
            connector has no token or base URL
        """
        connector = RedcapConnector()
        if not connector.is_valid():
            logger.info("Redcap connector is down")
            return "connector down"

        logger.info("Refreshing redcap data")
        # Inconsistencies first, then missing subjects.
        connector.refresh_inconsistent()
        connector.refresh_missing()
        logger.info("Redcap data refreshed")
        return "ok"