Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement BOM export tool #8756

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 0 additions & 55 deletions src/backend/InvenTree/InvenTree/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,61 +225,6 @@ def set_metadata(
self.save()


class DataImportMixin:
    """Model mixin class which provides support for 'data import' functionality.

    Models which implement this mixin should provide information on the fields available for import
    """

    # TODO: This mixin should be removed after https://github.com/inventree/InvenTree/pull/6911 is implemented
    # TODO: This approach to data import functionality is *outdated*

    # Define a map of fields available for import
    IMPORT_FIELDS = {}

    @classmethod
    def get_import_fields(cls):
        """Return all available import fields.

        Where information on a particular field is not explicitly provided,
        introspect the base model to (attempt to) find that information.

        Returns:
            dict: Mapping of field name to a (copied) field description dict,
                with 'label' and 'help_text' filled in from the model field
                where not explicitly provided.
        """
        # Build a fresh dict of per-call copies: previously this mutated the
        # shared class-level IMPORT_FIELDS entries in place, leaking
        # introspected 'label' / 'help_text' values back onto the class
        fields = {}

        for name, field in cls.IMPORT_FIELDS.items():
            field = dict(field)

            # Attempt to extract base field information from the model
            base_field = None

            for f in cls._meta.fields:
                if f.name == name:
                    base_field = f
                    break

            if base_field:
                if 'label' not in field:
                    field['label'] = base_field.verbose_name

                if 'help_text' not in field:
                    field['help_text'] = base_field.help_text

            fields[name] = field

        return fields

    @classmethod
    def get_required_import_fields(cls):
        """Return all *required* import fields.

        A field is "required" if its description dict has a truthy
        'required' entry (defaults to False).
        """
        fields = {}

        for name, field in cls.get_import_fields().items():
            required = field.get('required', False)

            if required:
                fields[name] = field

        return fields


class ReferenceIndexingMixin(models.Model):
"""A mixin for keeping track of numerical copies of the "reference" field.

Expand Down
264 changes: 0 additions & 264 deletions src/backend/InvenTree/InvenTree/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from django.db import models
from django.utils.translation import gettext_lazy as _

import tablib
from djmoney.contrib.django_rest_framework.fields import MoneyField
from djmoney.money import Money
from djmoney.utils import MONEY_CLASSES, get_currency_field_name
Expand Down Expand Up @@ -594,269 +593,6 @@ def to_internal_value(self, data):
raise serializers.ValidationError(_('Invalid value'))


class DataFileUploadSerializer(serializers.Serializer):
    """Generic serializer for uploading a data file, and extracting a dataset.

    - Validates uploaded file
    - Extracts column names
    - Extracts data rows
    """

    # Implementing class should register a target model (database model) to be used for import
    TARGET_MODEL = None

    class Meta:
        """Metaclass options."""

        fields = ['data_file']

    data_file = serializers.FileField(
        label=_('Data File'),
        help_text=_('Select data file for upload'),
        required=True,
        allow_empty_file=False,
    )

    def validate_data_file(self, data_file):
        """Perform validation checks on the uploaded data file.

        Checks the file extension and size, then parses the file into a
        tablib dataset (stored on self.dataset) and verifies that it
        contains at least one column and one data row.

        Raises:
            serializers.ValidationError: If any validation check fails.
        """
        self.filename = data_file.name

        _name, ext = os.path.splitext(data_file.name)

        # Remove the leading . from the extension, and normalize to lowercase
        # so that e.g. 'DATA.CSV' is accepted as well as 'data.csv'
        ext = ext[1:].lower()

        accepted_file_types = ['xls', 'xlsx', 'csv', 'tsv', 'xml']

        if ext not in accepted_file_types:
            raise serializers.ValidationError(_('Unsupported file format'))

        # Impose a 50MB limit on uploaded BOM files
        max_upload_file_size = 50 * 1024 * 1024

        if data_file.size > max_upload_file_size:
            raise serializers.ValidationError(_('File is too large'))

        # Read file data into memory (bytes object)
        try:
            data = data_file.read()
        except Exception as e:
            raise serializers.ValidationError(str(e))

        # Text-based formats must be decoded to str before tablib can load them
        if ext in ['csv', 'tsv', 'xml']:
            try:
                data = data.decode()
            except Exception as e:
                raise serializers.ValidationError(str(e))

        # Convert to a tablib dataset (we expect headers)
        try:
            self.dataset = tablib.Dataset().load(data, ext, headers=True)
        except Exception as e:
            raise serializers.ValidationError(str(e))

        # 'not' also guards against a None 'headers' attribute
        if not self.dataset.headers:
            raise serializers.ValidationError(_('No columns found in file'))

        if len(self.dataset) == 0:
            raise serializers.ValidationError(_('No data rows found in file'))

        return data_file

    def match_column(self, column_name, field_names, exact=False):
        """Attempt to match a column name (from the file) to a field (defined in the model).

        Order of matching is:
        - Direct match
        - Case insensitive match
        - Fuzzy match

        Returns:
            The matching field name, or None if no match was found.
        """
        if not column_name:
            return None

        column_name = str(column_name).strip()

        column_name_lower = column_name.lower()

        if column_name in field_names:
            return column_name

        for field_name in field_names:
            if field_name.lower() == column_name_lower:
                return field_name

        if exact:
            # Finished available 'exact' matches
            return None

        # TODO: Fuzzy pattern matching for column names

        # No matches found
        return None

    def extract_data(self):
        """Returns dataset extracted from the file.

        Returns:
            dict: With keys 'file_fields' (column -> matched model field),
                'model_fields' (available import fields), 'rows' (raw row
                values) and 'filename'.
        """
        # Provide a dict of available import fields for the model
        model_fields = {}

        # Keep track of columns we have already extracted
        matched_columns = set()

        if self.TARGET_MODEL:
            try:
                model_fields = self.TARGET_MODEL.get_import_fields()
            except Exception:
                pass

        # Extract a list of valid model field names
        model_field_names = list(model_fields.keys())

        # Provide a dict of available columns from the dataset
        file_columns = {}

        for header in self.dataset.headers:
            column = {}

            # Attempt to "match" file columns to model fields
            match = self.match_column(header, model_field_names, exact=True)

            # Each model field may be claimed by at most one file column
            if match is not None and match not in matched_columns:
                matched_columns.add(match)
                column['value'] = match
            else:
                column['value'] = None

            file_columns[header] = column

        return {
            'file_fields': file_columns,
            'model_fields': model_fields,
            'rows': [row.values() for row in self.dataset.dict],
            'filename': self.filename,
        }

    def save(self):
        """Empty overwrite for save."""


class DataFileExtractSerializer(serializers.Serializer):
    """Generic serializer for extracting data from an imported dataset.

    - User provides an array of matched headers
    - User provides an array of raw data rows
    """

    # Implementing class should register a target model (database model) to be used for import
    TARGET_MODEL = None

    class Meta:
        """Metaclass options."""

        fields = ['columns', 'rows']

    # Mapping of columns
    columns = serializers.ListField(child=serializers.CharField(allow_blank=True))

    rows = serializers.ListField(
        child=serializers.ListField(
            child=serializers.CharField(allow_blank=True, allow_null=True)
        )
    )

    def validate(self, data):
        """Clean data.

        Stores 'columns' and 'rows' on the serializer instance and checks
        that both are non-empty before delegating to
        validate_extracted_columns().
        """
        data = super().validate(data)

        self.columns = data.get('columns', [])
        self.rows = data.get('rows', [])

        if len(self.rows) == 0:
            raise serializers.ValidationError(_('No data rows provided'))

        if len(self.columns) == 0:
            raise serializers.ValidationError(_('No data columns supplied'))

        self.validate_extracted_columns()

        return data

    @property
    def data(self):
        """Returns current data."""
        # Initialize up-front: previously this was only assigned inside the
        # 'if self.TARGET_MODEL' branch, raising NameError when TARGET_MODEL is None
        model_fields = {}

        if self.TARGET_MODEL:
            try:
                model_fields = self.TARGET_MODEL.get_import_fields()
            except Exception:
                pass

        rows = []

        for row in self.rows:
            # Optionally pre-process each row, before sending back to the client
            processed_row = self.process_row(self.row_to_dict(row))

            # A None result means the row is dropped from the response
            if processed_row:
                rows.append({'original': row, 'data': processed_row})

        return {'fields': model_fields, 'columns': self.columns, 'rows': rows}

    def process_row(self, row):
        """Process a 'row' of data, which is a mapped column:value dict.

        Returns either a mapped column:value dict, or None.

        If the function returns None, the column is ignored!
        """
        # Default implementation simply returns the original row data
        return row

    def row_to_dict(self, row):
        """Convert a "row" to a named data dict.

        Values are keyed by the matched column name; an 'errors' dict is
        always included. Cells beyond the known columns are ignored.
        """
        row_dict = {'errors': {}}

        for idx, value in enumerate(row):
            if idx < len(self.columns):
                col = self.columns[idx]

                # An empty column name means the cell was not matched to a field
                if col:
                    row_dict[col] = value

        return row_dict

    def validate_extracted_columns(self):
        """Perform custom validation of header mapping.

        Raises:
            serializers.ValidationError: If a required column is missing,
                or if a column is mapped more than once.
        """
        # Initialize up-front so a missing TARGET_MODEL cannot raise NameError
        model_fields = {}

        if self.TARGET_MODEL:
            try:
                model_fields = self.TARGET_MODEL.get_import_fields()
            except Exception:
                pass

        cols_seen = set()

        for name, field in model_fields.items():
            required = field.get('required', False)

            # Check for missing required columns
            if required and name not in self.columns:
                # NOTE(review): f-string inside _() defeats gettext lookup
                # (msgid is pre-formatted) — consider .format() with a static msgid
                raise serializers.ValidationError(
                    _(f"Missing required column: '{name}'")
                )

        for col in self.columns:
            if not col:
                continue

            # Check for duplicated columns
            if col in cols_seen:
                raise serializers.ValidationError(_(f"Duplicate column: '{col}'"))

            cols_seen.add(col)

    def save(self):
        """No "save" action for this serializer."""


class NotesFieldMixin:
"""Serializer mixin for handling 'notes' fields.

Expand Down
Loading
Loading