-
Carlos Vega authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
csv_subject_exporter.py 3.09 KiB
import csv
import datetime
import logging
from web.models import StudySubject, Subject
from ..models.custom_data import CustomStudySubjectField
from ..models.etl.etl_export_data import field_can_be_exported
from ..models.etl.subject_export import SubjectExportData
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class CsvSubjectExporter:
    """Write all study subjects to a CSV file as configured by an export-data object.

    The export-data object supplies the target file path, the CSV delimiter,
    the ordered column mappings, the date format, the delimiter used inside
    many-to-many cells and whether related objects are exported as ids.
    """

    def __init__(self, export_data: "SubjectExportData"):
        """Store the export configuration used by every other method."""
        self.export_data = export_data

    def execute(self) -> None:
        """Create the CSV file: one header row, then one row per study subject.

        Subjects are written in descending ``id`` order and every cell is
        quoted (``csv.QUOTE_ALL``).
        """
        # Load the mappings once so the same column list is reused for every row
        # instead of being re-queried per subject.
        mappings = list(self.export_data.column_mappings.order_by("id"))
        # newline="" is required by the csv module: without it the platform's
        # newline translation is applied on top of the writer's own line endings.
        with open(
            self.export_data.get_absolute_file_path(), "w", encoding="utf-8", newline=""
        ) as csvfile:
            writer = csv.writer(
                csvfile, delimiter=self.export_data.csv_delimiter, quoting=csv.QUOTE_ALL
            )
            writer.writerow(self.create_header())
            for subject in StudySubject.objects.order_by("-id"):
                writer.writerow(self.subject_to_row(subject, mappings))

    def subject_to_row(self, study_subject: "StudySubject", mappings=None) -> list:
        """Build the list of cell values for one study subject.

        Args:
            study_subject: the study subject to export.
            mappings: optional pre-fetched iterable of column mappings; when
                omitted, the mappings are read from the export data ordered
                by ``id``.

        Returns:
            One cell per mapping, in mapping order. A mapping that matches no
            exportable field and no custom field yields an empty string.
        """
        if mappings is None:
            mappings = self.export_data.column_mappings.order_by("id")

        result = []
        for mapping in mappings:
            cell = ""
            if mapping.table_name == StudySubject._meta.db_table:
                field = self._find_exportable_field(StudySubject, mapping.column_name)
                if field is not None:
                    cell = self.get_cell_value_from_object(field, study_subject)
            if mapping.table_name == Subject._meta.db_table:
                field = self._find_exportable_field(Subject, mapping.column_name)
                if field is not None:
                    cell = self.get_cell_value_from_object(field, study_subject.subject)
            if mapping.table_name == CustomStudySubjectField._meta.db_table:
                # The column name carries the custom field's primary key after
                # the "custom_field-" marker.
                # NOTE(review): this runs one lookup per custom column per row;
                # consider caching if exports become slow.
                custom_field_id = int(mapping.column_name.replace("custom_field-", ""))
                custom_field = CustomStudySubjectField.objects.get(pk=custom_field_id)
                cell = study_subject.get_custom_data_value(custom_field).value
            result.append(cell)
        return result

    @staticmethod
    def _find_exportable_field(model, column_name):
        """Return the exportable field of *model* named *column_name*, or ``None``."""
        match = None
        for field in model._meta.get_fields():
            if field_can_be_exported(field) and field.name == column_name:
                # No early exit: if several fields match, the last one is used.
                match = field
        return match

    def get_cell_value_from_object(self, field, data):
        """Return the exported value of *field* read from the object *data*.

        * ``ForeignKey``: the ``<name>_id`` attribute when objects are exported
          as ids, otherwise the related object itself.
        * ``ManyToManyField``: every related element (its ``id`` or its
          ``str()``), each followed by the collection delimiter.
        * anything else: the attribute value, with dates and datetimes
          formatted using the configured date format.
        """
        internal_type = field.get_internal_type()

        if internal_type == "ForeignKey":
            if self.export_data.export_object_as_id:
                return getattr(data, field.name + "_id")
            return getattr(data, field.name)

        if internal_type == "ManyToManyField":
            elements = list(getattr(data, field.name).all())
            if not elements:
                return ""
            if self.export_data.export_object_as_id:
                parts = [str(element.id) for element in elements]
            else:
                parts = [str(element) for element in elements]
            delimiter = self.export_data.collection_delimiter
            # Each element is followed by the delimiter, including the last one.
            return "".join(part + delimiter for part in parts)

        value = getattr(data, field.name)
        # datetime.datetime is a subclass of datetime.date, so both are formatted.
        if isinstance(value, datetime.date):
            return value.strftime(self.export_data.date_format)
        return value

    def create_header(self) -> list:
        """Return the CSV column names of all mappings, ordered by mapping ``id``."""
        return [
            mapping.csv_column_name
            for mapping in self.export_data.column_mappings.order_by("id")
        ]