# coding=utf-8
"""Mail templates: placeholder tag catalogues and the ``MailTemplate`` model.

A template is a document file containing ``##TAG##`` placeholders. The
``get_mails_template_*_tags`` functions list the known placeholders, and
``MailTemplate.apply`` builds the replacement values for a given object and
hands them to ``process_file``.
"""
import datetime
import locale
import platform
import re
from contextlib import contextmanager
from typing import Optional

from django.db import models

from web.docx_helper import process_file
from web.models import Appointment, Visit, StudySubject, Worker, Voucher
from web.models.constants import MAIL_TEMPLATE_CONTEXT_CHOICES, MAIL_TEMPLATE_CONTEXT_APPOINTMENT, \
    MAIL_TEMPLATE_CONTEXT_SUBJECT, MAIL_TEMPLATE_CONTEXT_VISIT, MAIL_TEMPLATE_CONTEXT_VOUCHER, DEFAULT_LOCALE_NAME, \
    GLOBAL_STUDY_ID
from web.models.custom_data import CustomStudySubjectField

DATE_FORMAT_FULL = "%A %d %B %Y"
DATETIME_FORMAT = "%A %d %B %Y, %H:%M"
DATE_FORMAT_SHORT = "%d.%m.%Y"
DATE_FORMAT_TIME = "%H:%M"

# Every run of characters that is not an ASCII letter or digit; used to turn a
# custom field name into a placeholder-safe identifier.
_NON_ALNUM_RE = re.compile(r'[^0-9a-zA-Z]+')


def date_format_encoding():
    """Return the encoding of the current LC_TIME locale.

    Falls back to ``locale.getpreferredencoding()`` when the LC_TIME locale
    reports no encoding.
    """
    return locale.getlocale(locale.LC_TIME)[1] or locale.getpreferredencoding()


@contextmanager
def setlocale(name):
    """Temporarily switch the LC_TIME locale to ``name``.

    Yields the value returned by ``locale.setlocale`` and restores the previous
    LC_TIME setting on exit, even if the body raises.
    """
    # Calling setlocale() without a locale only queries the current setting and
    # returns the exact string the C library understands. Restoring that string
    # is reliable, whereas the (language, encoding) tuple from getlocale() is a
    # normalised guess that setlocale() may refuse to set back.
    saved = locale.setlocale(locale.LC_TIME)
    try:
        yield locale.setlocale(locale.LC_TIME, name)
    finally:
        locale.setlocale(locale.LC_TIME, saved)


def get_formatted_time(time_format: str) -> str:
    """Return the current local time formatted with ``time_format``."""
    return datetime.datetime.now().strftime(time_format)


def date_to_str(date: Optional[datetime.date], date_format: str) -> str:
    """Format ``date`` with ``date_format``; return ``""`` when ``date`` is None."""
    if date is None:
        return ""
    return date.strftime(date_format)


def get_field_id(field: CustomStudySubjectField) -> str:
    """Build the placeholder tag for a custom study subject field.

    Each run of non-alphanumeric characters in ``field.name`` becomes a single
    underscore, the result is upper-cased and wrapped as ``##S_C_<NAME>##``.
    """
    return "##S_C_" + _NON_ALNUM_RE.sub('_', field.name).upper() + "##"


# Each tag list below holds (placeholder, description, third element) tuples.
# The third element is either a sample value or a short hint about the format.

def get_mails_template_generic_tags():
    """Return the placeholder tags that do not depend on a specific object."""
    return [
        ("##DATE_FULL##", "Current date when the mail will be generated (long format)",
         get_formatted_time(DATE_FORMAT_FULL)),
        ("##DATE_SHORT##", "Current date when the mail will be generated (short format)",
         get_formatted_time(DATE_FORMAT_SHORT)),
        ("##WORKER##", "The full name of the currently logged in user", ""),
        ("##WORKER_EMAIL##", "Email address of the currently logged in user", "")
    ]


def get_mails_template_subject_tags():
    """Return the subject placeholder tags.

    The fixed tags are followed by one tag for every
    ``CustomStudySubjectField`` of the study with id ``GLOBAL_STUDY_ID``.
    """
    result = [
        ("##S_FULL_NAME##", "Subject's full name", "first_name last_name"),
        ("##S_FIRST_NAME##", "Subject's first name", ""),
        ("##S_LAST_NAME##", "Subject's last name", ""),
        ("##S_ADDRESS##", "Subject's address", "street name and number"),
        ("##S_CITY##", "Subject's city of residence", ""),
        ("##S_POST_CODE##", "Subject's post code of residence", ""),
        ("##S_COUNTRY##", "Subject's country of residence", ""),
        ("##S_SEX##", "Subject's gender", "Male/Female"),
        ("##S_TYPE##", "Subject's type", "CONTROL/PATIENT"),
        ("##S_DATE_BORN##", "Subject's date of birth", get_formatted_time(DATE_FORMAT_SHORT)),
        ("##S_EMAIL##", "Subject's email address", ""),
        ("##S_PHONE_NUMBER##", "Subject's phone number", ""),
        ("##S_PHONE_NUMBER_2##", "Subject's second phone number", ""),
        ("##S_PHONE_NUMBER_3##", "Subject's third phone number", ""),
        ("##S_MAIL_LANGUAGE##", "Subject's preferred language for written communication", ""),
        ("##S_KNOWN_LANGUAGES##", "List of languages known by the subject", "comma separated"),
        ("##S_SCREENING_NUMBER##", "Subject's screening number", ""),
        ("##S_ND_NUMBER##", "Subject's ND number", ""),
        ("##S_DATE_ADDED##", "Subject's date of creation", get_formatted_time(DATE_FORMAT_SHORT)),
        ("##S_HEALTH_PARTNER_NAME##", "Name of the health partner", ""),
        ("##S_HEALTH_PARTNER_ADDRESS##", "Address of the health partner", ""),
        ("##S_HEALTH_PARTNER_ZIP_CODE##", "Zip code of the health partner", ""),
        ("##S_HEALTH_PARTNER_CITY##", "City of the health partner", ""),
    ]
    custom_fields = CustomStudySubjectField.objects.filter(study__id=GLOBAL_STUDY_ID).iterator()
    result.extend((get_field_id(field), field.name, field.default_value) for field in custom_fields)
    return result


def get_mails_template_visit_tags():
    """Return the visit placeholder tags."""
    return [
        ("##V_DATE_START_FULL##", "Visit's start date", get_formatted_time(DATETIME_FORMAT)),
        ("##V_DATE_START_SHORT##", "Visit's start date", get_formatted_time(DATE_FORMAT_SHORT)),
        ("##V_DATE_ENDS_FULL##", "Visit's end date", get_formatted_time(DATETIME_FORMAT)),
        ("##V_DATE_ENDS_SHORT##", "Visit's end date", get_formatted_time(DATE_FORMAT_SHORT)),
    ]


def get_mails_template_appointment_tags():
    """Return the appointment placeholder tags."""
    return [
        ("##A_DATE_FULL##", "Appointment's date and time", get_formatted_time(DATETIME_FORMAT)),
        ("##A_DATE_SHORT##", "Appointment's date", get_formatted_time(DATE_FORMAT_SHORT)),
        ("##A_TIME##", "Appointment's time", get_formatted_time(DATE_FORMAT_TIME)),
        ("##A_FLYING_TEAM##", "Appointment's flying team location", ""),
        ("##A_LOCATION##", "Appointment's location", "value can be 'Flying Team'"),
        ("##A_LOCATION_OR_FLYINGTEAM##", "Appointment's real location",
         "if flying team then returns flying team exact location, otherwise returns location name"),
        ("##A_STATUS##", "Appointment's status", ""),
        ("##A_WORKER##", "Worker conducting the assessment", "first_name last_name"),
        ("##A_WORKER_PHONE##", "Phone number of the worker conducting the assessment", ""),
        ("##A_WORKER_EMAIL##", "Email address of the worker conducting the assessment", ""),
        ("##A_ROOM##", "Appointment's room", 'room_number address city'),
        ("##A_LENGTH##", "Appointment's duration", 'integer, value in minutes'),
        ("##A_TYPES##", "Appointment's types", "comma separated"),
    ]


def get_mails_template_voucher_tags():
    """Return the voucher placeholder tags."""
    return [
        ("##C_NUMBER##", "Number", ''),
        ("##C_PATIENT_NAME##", "Voucher Partner name", ''),
        ("##C_VOUCHER_TYPE##", "Voucher type", ''),
        ("##C_ISSUE_DATE_SHORT##", "Issue date", get_formatted_time(DATE_FORMAT_SHORT)),
        ("##C_EXPIRY_START_SHORT##", "Expiry date", get_formatted_time(DATE_FORMAT_SHORT)),
        ("##C_PARTNER_NAME##", "Voucher Partner name", ''),
        ("##C_PARTNER_ADDRESS##", "Voucher Partner address", ''),
        ("##C_PARTNER_CITY##", "Voucher Partner city", ''),
        ("##C_PARTNER_POSTAL_CODE##", "Voucher Partner postal code", ''),
        ("##C_PARTNER_COUNTRY##", "Voucher Partner country", ''),
        ("##C_PARTNER_PHONE##", "Voucher Partner phone", ''),
        ("##C_HOURS##", "Hours", ''),
    ]


class MailTemplate(models.Model):
    """A template file with ``##TAG##`` placeholders, bound to a context and an optional language."""

    name = models.CharField(max_length=255)
    # One of MAIL_TEMPLATE_CONTEXT_CHOICES (appointment, subject, visit, voucher).
    context = models.CharField(max_length=1, choices=MAIL_TEMPLATE_CONTEXT_CHOICES)
    # Optional; templates without a language use DEFAULT_LOCALE_NAME when applied.
    language = models.ForeignKey("web.Language", on_delete=models.CASCADE, blank=True, null=True)
    template_file = models.FileField(upload_to='templates/')

    @staticmethod
    def get_appointment_mail_templates(languages):
        """Return ``(active, disabled)`` templates for the appointment context."""
        return MailTemplate.get_mail_templates_for_context(languages, MAIL_TEMPLATE_CONTEXT_APPOINTMENT)

    @staticmethod
    def get_subject_mail_templates(languages):
        """Return ``(active, disabled)`` templates for the subject context."""
        return MailTemplate.get_mail_templates_for_context(languages, MAIL_TEMPLATE_CONTEXT_SUBJECT)

    @staticmethod
    def get_voucher_mail_templates(languages):
        """Return ``(active, disabled)`` templates for the voucher context."""
        return MailTemplate.get_mail_templates_for_context(languages, MAIL_TEMPLATE_CONTEXT_VOUCHER)

    @staticmethod
    def get_visit_mail_templates(languages):
        """Return ``(active, disabled)`` templates for the visit context."""
        return MailTemplate.get_mail_templates_for_context(languages, MAIL_TEMPLATE_CONTEXT_VISIT)

    @staticmethod
    def get_mail_templates_for_context(languages, context):
        """Split the templates of ``context`` into active and disabled lists.

        A template is active when its language name is among ``languages``, or
        when it has no language and ``languages`` is empty. Active templates
        are ordered by the position of their language in ``languages``.

        :param languages: iterable of objects exposing a ``name`` attribute
        :param context: context code the templates are filtered by
        :return: tuple ``(active_templates, disabled_templates)``
        """
        languages_names = [language.name for language in languages]
        active_templates = []
        disabled_templates = []
        for template in MailTemplate.objects.filter(context=context).all():
            if template.language is None:
                # Checking the materialised name list avoids len() on the
                # caller's iterable, which need not support it.
                is_active = not languages_names
            else:
                is_active = template.language.name in languages_names
            if is_active:
                active_templates.append(template)
            else:
                disabled_templates.append(template)

        def language_rank(template):
            # Templates without a language sort in front of all others.
            if template.language is None:
                return -1
            return languages_names.index(template.language.name)

        active_templates.sort(key=language_rank)
        return active_templates, disabled_templates

    def apply(self, instance, user, stream):
        """Fill the template with values taken from ``instance`` and write it to ``stream``.

        :param instance: an ``Appointment``, ``Visit``, ``StudySubject`` or
            ``Voucher``; any other type yields only the generic replacements
        :param user: user passed to ``Worker.get_by_user`` for the generic tags
        :param stream: output target handed to ``process_file``
        :return: the ``stream`` argument
        """
        appointment = None
        visit = None
        study_subject = None
        voucher = None
        if isinstance(instance, Appointment):
            appointment = instance
            visit = instance.visit
            # general appointment case
            if visit is not None:
                study_subject = visit.subject
        elif isinstance(instance, Visit):
            visit = instance
            study_subject = visit.subject
        elif isinstance(instance, StudySubject):
            study_subject = instance
        elif isinstance(instance, Voucher):
            voucher = instance

        # set locale to get correct date format
        locale_name = self.get_locale_name()
        with setlocale(locale_name):
            replacements = {}
            replacements.update(self.get_generic_replacements(Worker.get_by_user(user)))
            replacements.update(self.get_appointment_replacements(appointment))
            replacements.update(self.get_visit_replacements(visit))
            replacements.update(self.get_subject_replacements(study_subject))
            replacements.update(self.get_voucher_replacements(voucher))
            process_file(self.template_file.path, stream, replacements)
        return stream

    def get_locale_name(self):
        """Return the locale name to use when formatting dates for this template.

        Without a language this is ``DEFAULT_LOCALE_NAME``; otherwise it is the
        language's ``locale``, or its ``windows_locale_name`` on Windows.
        """
        if self.language is None:
            return DEFAULT_LOCALE_NAME
        if platform.system() == 'Windows':
            return self.language.windows_locale_name
        return self.language.locale

    @staticmethod
    def get_generic_replacements(worker):
        """Return replacements for the generic tags (current date and worker).

        ``worker`` may be None, in which case the e-mail is an empty string.
        """
        current_datetime = datetime.datetime.now()
        email = worker.email if worker is not None else ''
        return {
            "##DATE_FULL##": current_datetime.strftime(DATE_FORMAT_FULL),
            "##DATE_SHORT##": current_datetime.strftime(DATE_FORMAT_SHORT),
            "##WORKER##": str(worker),
            "##WORKER_EMAIL##": email
        }

    @staticmethod
    def get_appointment_replacements(appointment):
        """Return replacements for the appointment tags, or ``{}`` when ``appointment`` is None."""
        if appointment is None:
            return {}

        worker = appointment.worker_assigned
        if worker is not None:
            worker_phone_number = worker.phone_number
            worker_email_address = worker.email
        else:
            worker_phone_number = ""
            worker_email_address = ""

        when = appointment.datetime_when

        # str(None) is the truthy string "None", so the absence of a flying
        # team has to be tested explicitly for the fallback to the location
        # name to take effect.
        flying_team = appointment.flying_team
        if flying_team is not None:
            real_location = str(flying_team) or appointment.location.name
        else:
            real_location = appointment.location.name

        return {
            "##A_DATE_FULL##": date_to_str(when, DATETIME_FORMAT),
            "##A_DATE_SHORT##": date_to_str(when, DATE_FORMAT_SHORT),
            "##A_TIME##": date_to_str(when, DATE_FORMAT_TIME),
            "##A_FLYING_TEAM##": str(appointment.flying_team),
            "##A_STATUS##": appointment.get_status_display(),
            "##A_LOCATION##": appointment.location.name,
            "##A_LOCATION_OR_FLYINGTEAM##": real_location,
            "##A_WORKER##": str(appointment.worker_assigned),
            '##A_WORKER_PHONE##': worker_phone_number,
            '##A_WORKER_EMAIL##': worker_email_address,
            "##A_ROOM##": str(appointment.room),
            "##A_LENGTH##": str(appointment.length),
            "##A_TYPES##": ", ".join([a.description for a in appointment.appointment_types.all()])
        }

    @staticmethod
    def get_visit_replacements(visit):
        """Return replacements for the visit tags, or ``{}`` when ``visit`` is None.

        A missing begin or end date is rendered as an empty string.
        """
        if visit is None:
            return {}
        return {
            "##V_DATE_START_FULL##": date_to_str(visit.datetime_begin, DATETIME_FORMAT),
            "##V_DATE_START_SHORT##": date_to_str(visit.datetime_begin, DATE_FORMAT_SHORT),
            "##V_DATE_ENDS_FULL##": date_to_str(visit.datetime_end, DATETIME_FORMAT),
            "##V_DATE_ENDS_SHORT##": date_to_str(visit.datetime_end, DATE_FORMAT_SHORT),
        }

    @staticmethod
    def get_subject_replacements(study_subject: StudySubject = None) -> dict:
        """Return replacements for the subject tags, or ``{}`` when ``study_subject`` is None.

        Besides the fixed tags, one entry is added for every
        ``CustomStudySubjectField`` of the study with id ``GLOBAL_STUDY_ID``.
        """
        if study_subject is None:
            return {}

        subject = study_subject.subject
        result = {
            "##S_FULL_NAME##": str(study_subject),
            "##S_FIRST_NAME##": subject.first_name,
            "##S_LAST_NAME##": subject.last_name,
            "##S_ADDRESS##": subject.address,
            "##S_CITY##": subject.city,
            "##S_COUNTRY##": str(subject.country),
            "##S_DIAGNOSIS_YEAR##": str(study_subject.get_custom_field_value('Year of diagnosis')),
            "##S_DATE_ADDED##": date_to_str(study_subject.date_added, DATE_FORMAT_SHORT),
            "##S_DATE_BORN##": date_to_str(subject.date_born, DATE_FORMAT_SHORT),
            "##S_DIAGNOSIS##": str(study_subject.get_custom_field_value('Diagnosis')),
            "##S_EMAIL##": str(subject.email),
            "##S_SEX##": subject.get_sex_display(),
            "##S_MPOWER_ID##": str(study_subject.get_custom_field_value('MPower ID')),
            "##S_ND_NUMBER##": study_subject.nd_number,
            "##S_PHONE_NUMBER##": str(subject.phone_number),
            "##S_PHONE_NUMBER_2##": str(subject.phone_number_2),
            "##S_PHONE_NUMBER_3##": str(subject.phone_number_3),
            "##S_POST_CODE##": subject.postal_code,
            "##S_SCREENING_NUMBER##": study_subject.screening_number,
            "##S_TYPE##": study_subject.type.name,
            '##S_MAIL_LANGUAGE##': str(subject.default_written_communication_language),
            '##S_KNOWN_LANGUAGES##': ", ".join([lang.name for lang in subject.languages.all()])
        }

        health_partner = study_subject.health_partner
        if health_partner is not None:
            result["##S_HEALTH_PARTNER_NAME##"] = str(health_partner.name)
            result["##S_HEALTH_PARTNER_ADDRESS##"] = str(health_partner.address)
            result["##S_HEALTH_PARTNER_ZIP_CODE##"] = str(health_partner.postal_code)
            result["##S_HEALTH_PARTNER_CITY##"] = str(health_partner.city)
        else:
            result["##S_HEALTH_PARTNER_NAME##"] = ""
            result["##S_HEALTH_PARTNER_ADDRESS##"] = ""
            result["##S_HEALTH_PARTNER_ZIP_CODE##"] = ""
            result["##S_HEALTH_PARTNER_CITY##"] = ""

        for field in CustomStudySubjectField.objects.filter(study__id=GLOBAL_STUDY_ID).iterator():
            result[get_field_id(field)] = study_subject.get_custom_field_value(field.name)
        return result

    @staticmethod
    def get_voucher_replacements(voucher: Voucher):
        """Return replacements for the voucher tags, or ``{}`` when ``voucher`` is None.

        The voucher type is the type description, followed by the activity type
        in parentheses when one is set. Missing dates render as empty strings.
        """
        if voucher is None:
            return {}

        voucher_type = voucher.voucher_type.description
        # Truthiness also skips a None activity type, which the comparison with
        # '' let through to the concatenation below.
        if voucher.activity_type:
            voucher_type += ' (' + voucher.activity_type + ")"

        patient = voucher.study_subject.subject
        partner = voucher.usage_partner
        return {
            "##C_NUMBER##": voucher.number,
            "##C_PATIENT_NAME##": f'{patient.first_name} {patient.last_name}',
            "##C_VOUCHER_TYPE##": voucher_type,
            "##C_ISSUE_DATE_SHORT##": date_to_str(voucher.issue_date, DATE_FORMAT_SHORT),
            "##C_EXPIRY_START_SHORT##": date_to_str(voucher.expiry_date, DATE_FORMAT_SHORT),
            "##C_PARTNER_NAME##": partner.first_name + ' ' + partner.last_name,
            "##C_PARTNER_ADDRESS##": partner.address,
            "##C_PARTNER_POSTAL_CODE##": partner.postal_code,
            "##C_PARTNER_CITY##": partner.city,
            "##C_PARTNER_COUNTRY##": str(partner.country),
            "##C_PARTNER_PHONE##": partner.phone_number,
            "##C_HOURS##": str(voucher.hours),
        }