From 8d808589b37201949a233faf0bc83dd9488dcda8 Mon Sep 17 00:00:00 2001
From: Oliver Walters
Date: Tue, 24 Dec 2024 00:57:42 +0000
Subject: [PATCH] Remove old serializer classes

---
 .../InvenTree/InvenTree/serializers.py     | 264 ------------------
 src/backend/InvenTree/part/serializers.py | 178 ------------
 2 files changed, 442 deletions(-)

diff --git a/src/backend/InvenTree/InvenTree/serializers.py b/src/backend/InvenTree/InvenTree/serializers.py
index 2a5f730620fd..08b886547547 100644
--- a/src/backend/InvenTree/InvenTree/serializers.py
+++ b/src/backend/InvenTree/InvenTree/serializers.py
@@ -11,7 +11,6 @@
 from django.db import models
 from django.utils.translation import gettext_lazy as _
 
-import tablib
 from djmoney.contrib.django_rest_framework.fields import MoneyField
 from djmoney.money import Money
 from djmoney.utils import MONEY_CLASSES, get_currency_field_name
@@ -594,269 +593,6 @@ def to_internal_value(self, data):
         raise serializers.ValidationError(_('Invalid value'))
 
 
-class DataFileUploadSerializer(serializers.Serializer):
-    """Generic serializer for uploading a data file, and extracting a dataset.
-
-    - Validates uploaded file
-    - Extracts column names
-    - Extracts data rows
-    """
-
-    # Implementing class should register a target model (database model) to be used for import
-    TARGET_MODEL = None
-
-    class Meta:
-        """Metaclass options."""
-
-        fields = ['data_file']
-
-    data_file = serializers.FileField(
-        label=_('Data File'),
-        help_text=_('Select data file for upload'),
-        required=True,
-        allow_empty_file=False,
-    )
-
-    def validate_data_file(self, data_file):
-        """Perform validation checks on the uploaded data file."""
-        self.filename = data_file.name
-
-        _name, ext = os.path.splitext(data_file.name)
-
-        # Remove the leading . from the extension
-        ext = ext[1:]
-
-        accepted_file_types = ['xls', 'xlsx', 'csv', 'tsv', 'xml']
-
-        if ext not in accepted_file_types:
-            raise serializers.ValidationError(_('Unsupported file format'))
-
-        # Impose a 50MB limit on uploaded data files
-        max_upload_file_size = 50 * 1024 * 1024
-
-        if data_file.size > max_upload_file_size:
-            raise serializers.ValidationError(_('File is too large'))
-
-        # Read file data into memory (bytes object)
-        try:
-            data = data_file.read()
-        except Exception as e:
-            raise serializers.ValidationError(str(e))
-
-        if ext in ['csv', 'tsv', 'xml']:
-            try:
-                data = data.decode()
-            except Exception as e:
-                raise serializers.ValidationError(str(e))
-
-        # Convert to a tablib dataset (we expect headers)
-        try:
-            self.dataset = tablib.Dataset().load(data, ext, headers=True)
-        except Exception as e:
-            raise serializers.ValidationError(str(e))
-
-        if len(self.dataset.headers) == 0:
-            raise serializers.ValidationError(_('No columns found in file'))
-
-        if len(self.dataset) == 0:
-            raise serializers.ValidationError(_('No data rows found in file'))
-
-        return data_file
-
-    def match_column(self, column_name, field_names, exact=False):
-        """Attempt to match a column name (from the file) to a field (defined in the model).
-
-        Order of matching is:
-        - Direct match
-        - Case insensitive match
-        - Fuzzy match
-        """
-        if not column_name:
-            return None
-
-        column_name = str(column_name).strip()
-
-        column_name_lower = column_name.lower()
-
-        if column_name in field_names:
-            return column_name
-
-        for field_name in field_names:
-            if field_name.lower() == column_name_lower:
-                return field_name
-
-        if exact:
-            # Only exact matches were requested, and none was found
-            return None
-
-        # TODO: Fuzzy pattern matching for column names
-
-        # No matches found
-        return None
-
-    def extract_data(self):
-        """Returns dataset extracted from the file."""
-        # Provide a dict of available import fields for the model
-        model_fields = {}
-
-        # Keep track of columns we have already extracted
-        matched_columns = set()
-
-        if self.TARGET_MODEL:
-            try:
-                model_fields = self.TARGET_MODEL.get_import_fields()
-            except Exception:
-                pass
-
-        # Extract a list of valid model field names
-        model_field_names = list(model_fields.keys())
-
-        # Provide a dict of available columns from the dataset
-        file_columns = {}
-
-        for header in self.dataset.headers:
-            column = {}
-
-            # Attempt to "match" file columns to model fields
-            match = self.match_column(header, model_field_names, exact=True)
-
-            if match is not None and match not in matched_columns:
-                matched_columns.add(match)
-                column['value'] = match
-            else:
-                column['value'] = None
-
-            file_columns[header] = column
-
-        return {
-            'file_fields': file_columns,
-            'model_fields': model_fields,
-            'rows': [row.values() for row in self.dataset.dict],
-            'filename': self.filename,
-        }
-
-    def save(self):
-        """Empty override for save."""
-
-
-class DataFileExtractSerializer(serializers.Serializer):
-    """Generic serializer for extracting data from an imported dataset.
-
-    - User provides an array of matched headers
-    - User provides an array of raw data rows
-    """
-
-    # Implementing class should register a target model (database model) to be used for import
-    TARGET_MODEL = None
-
-    class Meta:
-        """Metaclass options."""
-
-        fields = ['columns', 'rows']
-
-    # Mapping of columns
-    columns = serializers.ListField(child=serializers.CharField(allow_blank=True))
-
-    rows = serializers.ListField(
-        child=serializers.ListField(
-            child=serializers.CharField(allow_blank=True, allow_null=True)
-        )
-    )
-
-    def validate(self, data):
-        """Clean data."""
-        data = super().validate(data)
-
-        self.columns = data.get('columns', [])
-        self.rows = data.get('rows', [])
-
-        if len(self.rows) == 0:
-            raise serializers.ValidationError(_('No data rows provided'))
-
-        if len(self.columns) == 0:
-            raise serializers.ValidationError(_('No data columns supplied'))
-
-        self.validate_extracted_columns()
-
-        return data
-
-    @property
-    def data(self):
-        """Returns current data."""
-        if self.TARGET_MODEL:
-            try:
-                model_fields = self.TARGET_MODEL.get_import_fields()
-            except Exception:
-                model_fields = {}
-
-        rows = []
-
-        for row in self.rows:
-            # Optionally pre-process each row, before sending back to the client
-            processed_row = self.process_row(self.row_to_dict(row))
-
-            if processed_row:
-                rows.append({'original': row, 'data': processed_row})
-
-        return {'fields': model_fields, 'columns': self.columns, 'rows': rows}
-
-    def process_row(self, row):
-        """Process a 'row' of data, which is a mapped column:value dict.
-
-        Returns either a mapped column:value dict, or None.
-
-        If the function returns None, the row is ignored!
- """ - # Default implementation simply returns the original row data - return row - - def row_to_dict(self, row): - """Convert a "row" to a named data dict.""" - row_dict = {'errors': {}} - - for idx, value in enumerate(row): - if idx < len(self.columns): - col = self.columns[idx] - - if col: - row_dict[col] = value - - return row_dict - - def validate_extracted_columns(self): - """Perform custom validation of header mapping.""" - if self.TARGET_MODEL: - try: - model_fields = self.TARGET_MODEL.get_import_fields() - except Exception: - model_fields = {} - - cols_seen = set() - - for name, field in model_fields.items(): - required = field.get('required', False) - - # Check for missing required columns - if required and name not in self.columns: - raise serializers.ValidationError( - _(f"Missing required column: '{name}'") - ) - - for col in self.columns: - if not col: - continue - - # Check for duplicated columns - if col in cols_seen: - raise serializers.ValidationError(_(f"Duplicate column: '{col}'")) - - cols_seen.add(col) - - def save(self): - """No "save" action for this serializer.""" - - class NotesFieldMixin: """Serializer mixin for handling 'notes' fields. diff --git a/src/backend/InvenTree/part/serializers.py b/src/backend/InvenTree/part/serializers.py index 66001ff93001..8aa30699c96d 100644 --- a/src/backend/InvenTree/part/serializers.py +++ b/src/backend/InvenTree/part/serializers.py @@ -1949,181 +1949,3 @@ def save(self): include_inherited=data.get('include_inherited', False), copy_substitutes=data.get('copy_substitutes', True), ) - - -class BomImportUploadSerializer(InvenTree.serializers.DataFileUploadSerializer): - """Serializer for uploading a file and extracting data from it. - - TODO: Delete this entirely once the new importer process is working - """ - - TARGET_MODEL = BomItem - - class Meta: - """Metaclass defining serializer fields.""" - - fields = ['data_file', 'part', 'clear_existing_bom'] - - part = serializers.PrimaryKeyRelatedField( - queryset=Part.objects.all(), required=True, allow_null=False, many=False - ) - - clear_existing_bom = serializers.BooleanField( - label=_('Clear Existing BOM'), - help_text=_('Delete existing BOM items before uploading'), - ) - - def save(self): - """The uploaded data file has been validated, accept the submitted data.""" - data = self.validated_data - - if data.get('clear_existing_bom', False): - part = data['part'] - - with transaction.atomic(): - part.bom_items.all().delete() - - -class BomImportExtractSerializer(InvenTree.serializers.DataFileExtractSerializer): - """Serializer class for exatracting BOM data from an uploaded file. - - The parent class DataFileExtractSerializer does most of the heavy lifting here. - - TODO: Delete this entirely once the new importer process is working - """ - - TARGET_MODEL = BomItem - - def validate_extracted_columns(self): - """Validate that the extracted columns are correct.""" - super().validate_extracted_columns() - - part_columns = ['part', 'part_name', 'part_ipn', 'part_id'] - - if not any(col in self.columns for col in part_columns): - # At least one part column is required! 
-            raise serializers.ValidationError(_('No part column specified'))
-
-    @staticmethod
-    def process_row(row):
-        """Process a single row from the loaded BOM file."""
-        # Skip any rows which are not at the top 'level' (level 1)
-        level = row.get('level', None)
-
-        if level is not None:
-            try:
-                level = int(level)
-                if level != 1:
-                    # Skip this row
-                    return None
-            except Exception:
-                pass
-
-        # Attempt to extract a valid part based on the provided data
-        part_id = row.get('part_id', row.get('part', None))
-        part_name = row.get('part_name', row.get('part', None))
-        part_ipn = row.get('part_ipn', None)
-
-        part = None
-
-        if part_id is not None:
-            try:
-                part = Part.objects.get(pk=part_id)
-            except (ValueError, Part.DoesNotExist):
-                pass
-
-        # No direct match, where else can we look?
-        if part is None and (part_name or part_ipn):
-            queryset = Part.objects.all()
-
-            if part_name:
-                queryset = queryset.filter(name=part_name)
-
-            if part_ipn:
-                queryset = queryset.filter(IPN=part_ipn)
-
-            if queryset.exists():
-                if queryset.count() == 1:
-                    part = queryset.first()
-                else:
-                    row['errors']['part'] = _('Multiple matching parts found')
-
-        if part is None:
-            row['errors']['part'] = _('No matching part found')
-        elif not part.component:
-            row['errors']['part'] = _('Part is not designated as a component')
-
-        # Update the 'part' value in the row
-        row['part'] = part.pk if part is not None else None
-
-        # Check the provided 'quantity' value
-        quantity = row.get('quantity', None)
-
-        if quantity is None:
-            row['errors']['quantity'] = _('Quantity not provided')
-        else:
-            try:
-                quantity = Decimal(quantity)
-
-                if quantity <= 0:
-                    row['errors']['quantity'] = _('Quantity must be greater than zero')
-            except Exception:
-                row['errors']['quantity'] = _('Invalid quantity')
-
-        return row
-
-
-class BomImportSubmitSerializer(serializers.Serializer):
-    """Serializer for uploading a BOM against a specified part.
-
-    A "BOM" is a set of BomItem objects which must be validated together as a set.
-
-    TODO: Delete this entirely once the new importer process is working
-    """
-
-    items = BomItemSerializer(many=True, required=True)
-
-    def validate(self, data):
-        """Validate the submitted BomItem data.
-
-        At least one line (BomItem) is required.
-        """
-        items = data['items']
-
-        if len(items) == 0:
-            raise serializers.ValidationError(_('At least one BOM item is required'))
-
-        data = super().validate(data)
-
-        return data
-
-    def save(self):
-        """POST: Perform final save of submitted BOM data.
-
-        Actions:
-        - By this stage each line in the BOM has been validated
-        - Individually 'save' (create) each BomItem line
-        """
-        data = self.validated_data
-
-        items = data['items']
-
-        bom_items = []
-
-        try:
-            for item in items:
-                part = item['part']
-                sub_part = item['sub_part']
-
-                # Ignore duplicate BOM items
-                if BomItem.objects.filter(part=part, sub_part=sub_part).exists():
-                    continue
-
-                bom_items.append(BomItem(**item))
-
-            if len(bom_items) > 0:
-                logger.info('Importing %s BOM items', len(bom_items))
-                BomItem.objects.bulk_create(bom_items)
-
-        except Exception as e:
-            raise serializers.ValidationError(detail=serializers.as_serializer_error(e))
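
Note: for readers reviewing this removal, the core of the deleted DataFileUploadSerializer was a two-step flow: load the uploaded file into a tablib dataset (expecting a header row), then map each file header onto a model import field, trying a direct match before a case-insensitive one. Below is a minimal standalone sketch of that flow; it is illustrative only, not the InvenTree API, and the field names and CSV payload are hypothetical.

    import tablib


    def match_column(column_name, field_names):
        """Match a file column header to a model field name.

        Tries a direct match first, then a case-insensitive match,
        mirroring the removed match_column() helper (fuzzy matching
        was left as a TODO there). Returns None if nothing matches.
        """
        if not column_name:
            return None

        column_name = str(column_name).strip()

        if column_name in field_names:
            return column_name

        lower = column_name.lower()

        for field_name in field_names:
            if field_name.lower() == lower:
                return field_name

        return None


    # Hypothetical import fields and CSV payload, for illustration only
    field_names = ['part', 'quantity', 'reference']
    csv_data = 'Part,QUANTITY,Notes\n101,5,example\n'

    # Load the raw data into a tablib dataset, expecting a header row
    dataset = tablib.Dataset().load(csv_data, 'csv', headers=True)

    # Map each file column to a model field (None = unmatched)
    matched = {header: match_column(header, field_names) for header in dataset.headers}

    print(matched)       # {'Part': 'part', 'QUANTITY': 'quantity', 'Notes': None}
    print(dataset.dict)  # data rows as a list of dicts keyed by file header

The removed BOM-specific serializers layered per-row checks on top of this generic flow (resolving a Part from part_id/part_name/part_ipn columns and validating the quantity column), which is the logic the new importer process is intended to replace.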