diff --git a/conf.ini b/conf.ini
index 88a1fa103..e4667ca92 100644
--- a/conf.ini
+++ b/conf.ini
@@ -19,6 +19,8 @@ DO_GZIP_RESPONSE = True
 SECRET_KEY=OmuudYrSFVxcIVIWf6YlYdkP6NXApP
 TOKEN_SIGINING_KEY=JsVzvtWw2EwksaYCZsMmd2zmm
 TOKEN_REFRESH_ROTATE = True
+DISABLE_V2_API = False
+
 #PORTFOLIO_PARQUET_STORAGE = True
 #TOKEN_REFRESH_LIFETIME = minutes=0, hours=0, days=0, weeks=0
diff --git a/kubernetes/worker-controller/src/autoscaler.py b/kubernetes/worker-controller/src/autoscaler.py
index 41ab74d5c..ea48120b6 100755
--- a/kubernetes/worker-controller/src/autoscaler.py
+++ b/kubernetes/worker-controller/src/autoscaler.py
@@ -166,7 +166,9 @@ async def parse_queued_pending(self, msg) -> [RunningAnalysis]:
 
         # Check for pending analyses
         if (queue_name not in ['celery', 'celery-v2', 'task-controller']):
-            queued_count = entry['queue']['queued_count']
+            queued_task_count = entry.get('queue', {}).get('queued_count', 0)  # sub-task queued (API DB status)
+            queue_message_count = entry.get('queue', {}).get('queue_message_count', 0)  # queue has messages
+            queued_count = max(queued_task_count, queue_message_count)
 
             if (queued_count > 0) and not analyses_list:
                 # a task is queued, but no analyses are running.
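The two queue counters can briefly disagree, for example when a message reaches the broker before the corresponding sub-task status row is written to the API database, so taking the `max()` lets either signal trigger a scale-up. A standalone sketch of the fallback, using a fabricated `entry` payload:

```python
# Fabricated queue-status payload for illustration; not captured output
# from the worker-controller.
entry = {'queue': {'queued_count': 0, 'queue_message_count': 3}}

queued_task_count = entry.get('queue', {}).get('queued_count', 0)
queue_message_count = entry.get('queue', {}).get('queue_message_count', 0)
queued_count = max(queued_task_count, queue_message_count)

# Scale up even though the API DB reports no queued sub-tasks yet
assert queued_count == 3
```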
diff --git a/scripts/minikube-deploy.sh b/scripts/minikube-deploy.sh
index 78feb008e..933c71e11 100755
--- a/scripts/minikube-deploy.sh
+++ b/scripts/minikube-deploy.sh
@@ -25,10 +25,10 @@ if [[ -z $OASIS_MODEL_DATA_DIR ]]; then
 fi
 
 ## init minikube
-minikube delete
-minikube config set cpus 12
-minikube config set memory 16000
-minikube start
+# minikube delete
+# minikube config set cpus 12
+# minikube config set memory 16000
+# minikube start
 
 # build images
 eval $(minikube docker-env)
diff --git a/src/model_execution_worker/distributed_tasks.py b/src/model_execution_worker/distributed_tasks.py
index 7a079dc5c..f2c271aee 100644
--- a/src/model_execution_worker/distributed_tasks.py
+++ b/src/model_execution_worker/distributed_tasks.py
@@ -1005,18 +1005,9 @@ def prepare_losses_generation_directory(self, params, analysis_id=None, slug=Non
 @app.task(bind=True, name='generate_losses_chunk', **celery_conf.worker_task_kwargs)
 @loss_generation_task
 def generate_losses_chunk(self, params, chunk_idx, num_chunks, analysis_id=None, slug=None, **kwargs):
-
-    if num_chunks == 1:
-        # Run multiple ktools pipes (based on cpu cores)
-        current_chunk_id = None
-        max_chunk_id = -1
-        work_dir = 'work'
-
-    else:
-        # Run a single ktools pipe
-        current_chunk_id = chunk_idx + 1
-        max_chunk_id = num_chunks
-        work_dir = f'{current_chunk_id}.work'
+    current_chunk_id = chunk_idx + 1
+    max_chunk_id = num_chunks
+    work_dir = f'{current_chunk_id}.work'
 
     chunk_params = {
         **params,
diff --git a/src/server/oasisapi/analyses/migrations/0013_analysis_chunking_options.py b/src/server/oasisapi/analyses/migrations/0013_analysis_chunking_options.py
new file mode 100644
index 000000000..9bf77f309
--- /dev/null
+++ b/src/server/oasisapi/analyses/migrations/0013_analysis_chunking_options.py
@@ -0,0 +1,20 @@
+# Generated by Django 3.2.20 on 2024-01-12 13:17
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('analysis_models', '0008_analysismodel_run_mode'),
+        ('analyses', '0012_analysis_run_mode'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='analysis',
+            name='chunking_options',
+            field=models.OneToOneField(auto_created=True, default=None, null=True, on_delete=django.db.models.deletion.CASCADE, to='analysis_models.modelchunkingoptions'),
+        ),
+    ]
diff --git a/src/server/oasisapi/analyses/models.py b/src/server/oasisapi/analyses/models.py
index c7e972a2f..8505c6bd4 100644
--- a/src/server/oasisapi/analyses/models.py
+++ b/src/server/oasisapi/analyses/models.py
@@ -18,7 +18,7 @@
 from src.server.oasisapi.celery_app_v2 import v2 as celery_app_v2
 from src.server.oasisapi.queues.consumers import send_task_status_message, TaskStatusMessageItem, \
     TaskStatusMessageAnalysisItem, build_task_status_message
-from ..analysis_models.models import AnalysisModel
+from ..analysis_models.models import AnalysisModel, ModelChunkingOptions
 from ..data_files.models import DataFile
 from ..files.models import RelatedFile, file_storage_link
 from ..portfolios.models import Portfolio
@@ -198,6 +198,8 @@ class Analysis(TimeStampedModel):
     summary_levels_file = models.ForeignKey(RelatedFile, on_delete=models.CASCADE, blank=True, null=True, default=None,
                                             related_name='summary_levels_file_analyses')
 
+    chunking_options = models.OneToOneField(ModelChunkingOptions, on_delete=models.CASCADE, auto_created=True, default=None, null=True)
+
     class Meta:
         ordering = ['id']
         verbose_name_plural = 'analyses'
@@ -299,6 +301,10 @@ def get_absolute_subtask_list_url(self, request=None, namespace=None):
         override_ns = f'{namespace}:' if namespace else ''
         return reverse(f'{override_ns}analysis-sub-task-list', kwargs={'pk': self.pk}, request=self._update_ns(request))
 
+    def get_absolute_chunking_configuration_url(self, request=None, namespace=None):
+        override_ns = f'{namespace}:' if namespace else ''
+        return reverse(f'{override_ns}analysis-chunking-configuration', kwargs={'pk': self.pk}, request=self._update_ns(request))
+
     def get_groups(self):
         groups = []
         portfolio_groups = self.portfolio.groups.all()
@@ -307,11 +313,19 @@ def get_groups(self):
         return groups
 
     def get_num_events(self):
-        selected_strat = self.model.chunking_options.loss_strategy
-        dynamic_strat = self.model.chunking_options.chunking_types.DYNAMIC_CHUNKS
-        if selected_strat != dynamic_strat:
-            return 1
+        # Select chunking options, preferring the per-analysis override
+        if self.chunking_options is None:
+            chunking_options = self.model.chunking_options
+        else:
+            chunking_options = self.chunking_options
+
+        # Exit early if the strategy is not DYNAMIC_CHUNKS
+        DYNAMIC_CHUNKS = chunking_options.chunking_types.DYNAMIC_CHUNKS
+        if chunking_options.loss_strategy != DYNAMIC_CHUNKS:
+            return None
+
+        # Return the number of user-selected events from the analysis settings
         analysis_settings = self.settings_file.read_json()
         model_settings = self.model.resource_file.read_json()
         user_selected_events = analysis_settings.get('event_ids', [])
@@ -319,6 +333,7 @@ def get_num_events(self):
         if len(user_selected_events) > 1:
             return len(user_selected_events)
 
+        # Read event_set_size from the model settings
         event_set_options = model_settings.get('model_settings', {}).get('event_set').get('options', [])
         event_set_sizes = {e['id']: e['number_of_events'] for e in event_set_options if 'number_of_events' in e}
         if selected_event_set not in event_set_sizes:
@@ -412,8 +427,14 @@ def validate_run(self):
         if not self.input_file:
             errors['input_file'] = ['Must not be null']
 
+        # Get chunking options, preferring the per-analysis override
+        if self.chunking_options is None:
+            chunking_options = self.model.chunking_options
+        else:
+            chunking_options = self.chunking_options
+
         # Validation for dynamic loss chunks
-        if self.model.chunking_options.loss_strategy == self.model.chunking_options.chunking_types.DYNAMIC_CHUNKS:
+        if chunking_options.loss_strategy == chunking_options.chunking_types.DYNAMIC_CHUNKS:
             if not self.model.resource_file:
                 errors['model_settings_file'] = ['Must not be null for Dynamic chunking']
             elif self.settings_file:
@@ -771,7 +792,7 @@ def copy(self):
 def delete_connected_files(sender, instance, **kwargs):
     """ Post delete handler to clear out any dangling analyses files
     """
-    files_for_removal = [
+    for_removal = [
         'settings_file',
         'input_file',
         'input_generation_traceback_file',
         'output_file',
         'run_traceback_file',
         'run_log_file',
         'lookup_errors_file',
         'lookup_success_file',
         'lookup_validation_file',
         'summary_levels_file',
+        'chunking_options',
+    ]
+    for ref in for_removal:
+        obj_ref = getattr(instance, ref)
+        if obj_ref:
+            obj_ref.delete()
+
+
+@receiver(post_delete, sender=AnalysisTaskStatus)
+def delete_connected_task_logs(sender, instance, **kwargs):
+    """ Post delete handler to clear out any dangling log files
+    """
+    for_removal = [
+        'output_log',
+        'error_log',
     ]
-    for ref in files_for_removal:
-        file_ref = getattr(instance, ref)
-        if file_ref:
-            file_ref.delete()
+    for ref in for_removal:
+        obj_ref = getattr(instance, ref)
+        if obj_ref:
+            obj_ref.delete()
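Everywhere the model touches chunking settings it now applies the same rule: prefer the per-analysis `chunking_options` row when one exists, otherwise fall back to the defaults configured on the model. A minimal sketch of that selection, where `analysis` stands in for an `Analysis` instance with the new nullable field:

```python
# Minimal sketch of the selection rule used in get_num_events and
# validate_run (and repeated in the task controller below).
def effective_chunking_options(analysis):
    if analysis.chunking_options is not None:
        return analysis.chunking_options    # per-analysis override
    return analysis.model.chunking_options  # defaults set on the model
```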
diff --git a/src/server/oasisapi/analyses/v2_api/serializers.py b/src/server/oasisapi/analyses/v2_api/serializers.py
index 24c894e99..59aaa2931 100644
--- a/src/server/oasisapi/analyses/v2_api/serializers.py
+++ b/src/server/oasisapi/analyses/v2_api/serializers.py
@@ -9,7 +9,6 @@
 from ...files.models import file_storage_link
 from ...permissions.group_auth import verify_and_get_groups, validate_data_files
-
 from ...schemas.serializers import (
     GroupNameSerializer,
     QueueNameSerializer,
@@ -95,6 +94,7 @@ class Meta:
     run_traceback_file = serializers.SerializerMethodField(read_only=True)
     run_log_file = serializers.SerializerMethodField(read_only=True)
     storage_links = serializers.SerializerMethodField(read_only=True)
+    chunking_configuration = serializers.SerializerMethodField(read_only=True)
 
     @swagger_serializer_method(serializer_or_field=serializers.URLField)
     def get_input_file(self, instance):
@@ -156,6 +156,11 @@ def get_storage_links(self, instance):
         request = self.context.get('request')
         return instance.get_absolute_storage_url(request=request)
 
+    @swagger_serializer_method(serializer_or_field=serializers.URLField)
+    def get_chunking_configuration(self, instance):
+        request = self.context.get('request')
+        return instance.get_absolute_chunking_configuration_url(request=request)
+
     @swagger_serializer_method(serializer_or_field=GroupNameSerializer)
     def get_groups(self, instance):
         return instance.get_groups()
@@ -206,6 +211,7 @@ class AnalysisSerializer(serializers.ModelSerializer):
     run_traceback_file = serializers.SerializerMethodField()
     run_log_file = serializers.SerializerMethodField()
     storage_links = serializers.SerializerMethodField()
+    chunking_configuration = serializers.SerializerMethodField()
     ns = 'v2-analyses'
 
     # Groups - inherited from portfolio
@@ -244,6 +250,7 @@ class Meta:
             'run_traceback_file',
             'run_log_file',
             'storage_links',
+            'chunking_configuration',
             'lookup_chunks',
             'analysis_chunks',
             'sub_task_count',
@@ -314,6 +321,11 @@ def get_storage_links(self, instance):
         request = self.context.get('request')
         return instance.get_absolute_storage_url(request=request, namespace=self.ns)
 
+    @swagger_serializer_method(serializer_or_field=serializers.URLField)
+    def get_chunking_configuration(self, instance):
+        request = self.context.get('request')
+        return instance.get_absolute_chunking_configuration_url(request=request, namespace=self.ns)
+
     @swagger_serializer_method(serializer_or_field=GroupNameSerializer)
     def get_groups(self, instance):
         return instance.get_groups()
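Both serializers now surface the endpoint as a read-only URL field, so every serialized analysis links to its own chunking configuration. The expected fragment, taken from the updated tests further down (the host and pk are test-client fixtures, not deployment values):

```python
# Fragment of the serialized analysis as the updated tests expect it.
expected = {
    'analysis_chunks': None,
    'chunking_configuration': 'http://testserver/v2/analyses/1/chunking_configuration/',
    'lookup_chunks': None,
}
```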
diff --git a/src/server/oasisapi/analyses/v2_api/task_controller.py b/src/server/oasisapi/analyses/v2_api/task_controller.py
index b7c6a0fff..81817534d 100644
--- a/src/server/oasisapi/analyses/v2_api/task_controller.py
+++ b/src/server/oasisapi/analyses/v2_api/task_controller.py
@@ -369,11 +369,7 @@ def generate_inputs(cls, analysis: 'Analysis', initiator: User, loc_lines: int)
         from src.server.oasisapi.analyses.models import Analysis
 
         # fetch the number of lookup chunks and store in analysis
-        if analysis.model.chunking_options.lookup_strategy == 'FIXED_CHUNKS':
-            num_chunks = min(analysis.model.chunking_options.fixed_lookup_chunks, loc_lines)
-        elif analysis.model.chunking_options.lookup_strategy == 'DYNAMIC_CHUNKS':
-            loc_lines_per_chunk = analysis.model.chunking_options.dynamic_locations_per_lookup
-            num_chunks = min(ceil(loc_lines / loc_lines_per_chunk), analysis.model.chunking_options.dynamic_chunks_max)
+        num_chunks = cls._get_inputs_generation_chunks(analysis, loc_lines)
 
         run_data_uuid = uuid.uuid4().hex
         statuses, tasks = cls.get_inputs_generation_tasks(analysis, initiator, run_data_uuid, num_chunks)
@@ -394,13 +390,22 @@ def generate_inputs(cls, analysis: 'Analysis', initiator: User, loc_lines: int)
         return chain
 
     @classmethod
-    def _get_inputs_generation_chunks(cls, analysis):
-        if analysis.model.chunking_options.lookup_strategy == 'FIXED_CHUNKS':
-            num_chunks = analysis.model.chunking_options.fixed_lookup_chunks
-        elif analysis.model.chunking_options.lookup_strategy == 'DYNAMIC_CHUNKS':
-            loc_lines = sum(1 for line in analysis.portfolio.location_file.read())
-            loc_lines_per_chunk = analysis.model.chunking_options.dynamic_locations_per_lookup
-            num_chunks = ceil(loc_lines / loc_lines_per_chunk)
+    def _get_inputs_generation_chunks(cls, analysis, loc_lines):
+        # loc_lines = sum(1 for line in analysis.portfolio.location_file.read())
+
+        # Get options
+        if analysis.chunking_options is not None:
+            chunking_options = analysis.chunking_options  # Use options from Analysis
+        else:
+            chunking_options = analysis.model.chunking_options  # Use defaults set on model
+
+        # Set chunks
+        if chunking_options.lookup_strategy == 'FIXED_CHUNKS':
+            num_chunks = min(chunking_options.fixed_lookup_chunks, loc_lines)
+        elif chunking_options.lookup_strategy == 'DYNAMIC_CHUNKS':
+            loc_lines_per_chunk = chunking_options.dynamic_locations_per_lookup
+            num_chunks = min(ceil(loc_lines / loc_lines_per_chunk), chunking_options.dynamic_chunks_max)
+
         return num_chunks
 
     @classmethod
@@ -512,13 +517,7 @@ def generate_losses(cls, analysis: 'Analysis', initiator: User, events_total: in
         """
         from src.server.oasisapi.analyses.models import Analysis
 
-        # fetch number of event chunks
-        if analysis.model.chunking_options.loss_strategy == 'FIXED_CHUNKS':
-            num_chunks = analysis.model.chunking_options.fixed_analysis_chunks
-        elif analysis.model.chunking_options.loss_strategy == 'DYNAMIC_CHUNKS':
-            events_per_chunk = analysis.model.chunking_options.dynamic_events_per_analysis
-            num_chunks = min(ceil(events_total / events_per_chunk), analysis.model.chunking_options.dynamic_chunks_max)
-
+        num_chunks = cls._get_loss_generation_chunks(analysis, events_total)
         run_data_uuid = uuid.uuid4().hex
         statuses, tasks = cls.get_loss_generation_tasks(analysis, initiator, run_data_uuid, num_chunks)
@@ -538,11 +537,20 @@ def generate_losses(cls, analysis: 'Analysis', initiator: User, events_total: in
         return chain
 
     @classmethod
-    def _get_loss_generation_chunks(cls, analysis):
-        if analysis.model.chunking_options.loss_strategy == 'FIXED_CHUNKS':
-            num_chunks = analysis.model.chunking_options.fixed_analysis_chunks
-        elif analysis.model.chunking_options.loss_strategy == 'DYNAMIC_CHUNKS':
-            raise notimplementederror("FEATURE NOT AVALIBLE -- need event set size from worker")
+    def _get_loss_generation_chunks(cls, analysis, events_total):
+        # Get options
+        if analysis.chunking_options is not None:
+            chunking_options = analysis.chunking_options  # Use options from Analysis
+        else:
+            chunking_options = analysis.model.chunking_options  # Use defaults set on model
+
+        # fetch number of event chunks
+        if chunking_options.loss_strategy == 'FIXED_CHUNKS':
+            num_chunks = chunking_options.fixed_analysis_chunks
+        elif chunking_options.loss_strategy == 'DYNAMIC_CHUNKS':
+            events_per_chunk = chunking_options.dynamic_events_per_analysis
+            num_chunks = min(ceil(events_total / events_per_chunk), chunking_options.dynamic_chunks_max)
+
         return num_chunks
 
     @classmethod
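Both helpers share the same arithmetic: FIXED_CHUNKS takes a configured count (capped by the number of location lines for lookups), while DYNAMIC_CHUNKS divides the workload by a per-chunk size and caps the result at `dynamic_chunks_max`. A worked example with assumed option values; the names mirror the `ModelChunkingOptions` fields in this diff:

```python
from math import ceil

loc_lines = 2500                    # location rows in the portfolio (assumed)
dynamic_locations_per_lookup = 100  # DYNAMIC_CHUNKS divisor (assumed)
dynamic_chunks_max = 20             # hard cap on chunk count (assumed)

num_chunks = min(ceil(loc_lines / dynamic_locations_per_lookup), dynamic_chunks_max)
assert num_chunks == 20  # 25 raw chunks, capped at dynamic_chunks_max
```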
diff --git a/src/server/oasisapi/analyses/v2_api/tests/test_analysis_api.py b/src/server/oasisapi/analyses/v2_api/tests/test_analysis_api.py
index e6181269d..f9d406baa 100644
--- a/src/server/oasisapi/analyses/v2_api/tests/test_analysis_api.py
+++ b/src/server/oasisapi/analyses/v2_api/tests/test_analysis_api.py
@@ -156,6 +156,7 @@ def test_cleaned_name_portfolio_and_model_are_present___object_is_created(self,
                 'task_finished': None,
                 'groups': [],
                 'analysis_chunks': None,
+                'chunking_configuration': 'http://testserver/v2/analyses/1/chunking_configuration/',
                 'lookup_chunks': None,
                 'priority': 4,
             }, response.json)
@@ -233,6 +234,7 @@ def test_complex_model_file_present___object_is_created(self, name):
                 'task_started': None,
                 'task_finished': None,
                 'analysis_chunks': None,
+                'chunking_configuration': 'http://testserver/v2/analyses/1/chunking_configuration/',
                 'lookup_chunks': None,
                 'priority': 4,
             }, response.json)
diff --git a/src/server/oasisapi/analyses/v2_api/tests/test_analysis_model.py b/src/server/oasisapi/analyses/v2_api/tests/test_analysis_model.py
index ec4ccf6b2..7d66a0fc4 100644
--- a/src/server/oasisapi/analyses/v2_api/tests/test_analysis_model.py
+++ b/src/server/oasisapi/analyses/v2_api/tests/test_analysis_model.py
@@ -245,7 +245,7 @@ def test_state_is_ready___run_is_started(self, status, task_id):
             with patch('src.server.oasisapi.analyses.models.celery_app_v2.send_task', new=mock_task):
                 analysis.run(initiator, version='v2')
 
-                mock_task.assert_called_once_with('start_loss_generation_task', (analysis.pk, initiator.pk, 1),
+                mock_task.assert_called_once_with('start_loss_generation_task', (analysis.pk, initiator.pk, None),
                                                   {}, queue='celery-v2', link_error=ANY, priority=4)
 
     @given(
diff --git a/src/server/oasisapi/analyses/v2_api/viewsets.py b/src/server/oasisapi/analyses/v2_api/viewsets.py
index 485612a52..3b10765ea 100644
--- a/src/server/oasisapi/analyses/v2_api/viewsets.py
+++ b/src/server/oasisapi/analyses/v2_api/viewsets.py
@@ -17,6 +17,7 @@
 from .serializers import AnalysisSerializer, AnalysisCopySerializer, AnalysisTaskStatusSerializer, \
     AnalysisStorageSerializer, AnalysisListSerializer
 from ...analysis_models.models import AnalysisModel
+from ...analysis_models.v2_api.serializers import ModelChunkingConfigSerializer
 from ...data_files.v2_api.serializers import DataFileSerializer
 from ...files.serializers import RelatedFileSerializer
 from ...files.views import handle_related_file, handle_json_data
@@ -268,6 +269,8 @@ def get_serializer_class(self):
             return AnalysisStorageSerializer
         elif self.action in self.file_action_types_with_settings_file:
             return RelatedFileSerializer
+        elif self.action in ['chunking_configuration']:
+            return ModelChunkingConfigSerializer
         else:
             return Serializer
@@ -544,6 +547,19 @@ def sub_task_list(self, request, pk=None, version=None):
         serializer = AnalysisTaskStatusSerializer(sub_task_queryset, many=True, context=context)
         return Response(serializer.data)
 
+    @action(methods=['get', 'post'], detail=True)
+    def chunking_configuration(self, request, pk=None, version=None):
+        method = request.method.lower()
+        obj = self.get_object()
+        if method == 'get':
+            serializer = self.get_serializer(obj.chunking_options)
+        else:
+            serializer = self.get_serializer(obj.chunking_options, data=request.data)
+            serializer.is_valid(raise_exception=True)
+            obj.chunking_options = serializer.save()
+            obj.save()
+        return Response(serializer.data)
+
 
 class AnalysisSettingsView(VerifyGroupAccessModelViewSet):
     """
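The new `chunking_configuration` action serves both reads and writes: GET serializes the current per-analysis options, while POST validates the payload with `ModelChunkingConfigSerializer`, saves it, and attaches the result to the analysis. A hypothetical client session (base URL, token, and values are placeholders; the field names are taken from the admin registration below):

```python
import requests

base = 'http://localhost:8000/v2/analyses/42/chunking_configuration/'
headers = {'Authorization': 'Bearer <access-token>'}

# Read the current per-analysis options
current = requests.get(base, headers=headers).json()

# Replace them; subsequent runs use these instead of the model defaults
updated = requests.post(base, headers=headers, json={
    'loss_strategy': 'FIXED_CHUNKS',
    'fixed_analysis_chunks': 8,
}).json()
```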
diff --git a/src/server/oasisapi/analysis_models/admin.py b/src/server/oasisapi/analysis_models/admin.py
index 8a492324d..7ff3d6bca 100644
--- a/src/server/oasisapi/analysis_models/admin.py
+++ b/src/server/oasisapi/analysis_models/admin.py
@@ -1,5 +1,5 @@
 from django.contrib import admin
-from .models import AnalysisModel, SettingsTemplate
+from .models import AnalysisModel, SettingsTemplate, ModelScalingOptions, ModelChunkingOptions
 from django.contrib.admin.actions import delete_selected as delete_selected_
 
@@ -28,8 +28,13 @@ def activate_model(modeladmin, request, queryset):
 @admin.register(AnalysisModel)
 class CatModelAdmin(admin.ModelAdmin):
     actions = [delete_hard, activate_model]
-
-    list_display = ['model_id', 'supplier_id', 'version_id', 'creator', 'deleted']
+    list_display = [
+        'model_id',
+        'supplier_id',
+        'version_id',
+        'creator',
+        'deleted'
+    ]
 
     def get_queryset(self, request):
         return self.model.all_objects
@@ -41,4 +46,33 @@ def get_queryset(self, request):
 
 @admin.register(SettingsTemplate)
 class SettingsTemplateAdmin(admin.ModelAdmin):
-    list_display = ['file', 'name', 'creator']
+    list_display = [
+        'file',
+        'name',
+        'creator'
+    ]
+
+
+@admin.register(ModelScalingOptions)
+class ModelScalingOptionsAdmin(admin.ModelAdmin):
+    list_display = [
+        'scaling_types',
+        'scaling_strategy',
+        'worker_count_fixed',
+        'worker_count_max',
+        'worker_count_min',
+        'chunks_per_worker',
+    ]
+
+
+@admin.register(ModelChunkingOptions)
+class ModelChunkingTemplateAdmin(admin.ModelAdmin):
+    list_display = [
+        'lookup_strategy',
+        'loss_strategy',
+        'dynamic_locations_per_lookup',
+        'dynamic_events_per_analysis',
+        'dynamic_chunks_max',
+        'fixed_analysis_chunks',
+        'fixed_lookup_chunks',
+    ]
diff --git a/src/server/oasisapi/settings/base.py b/src/server/oasisapi/settings/base.py
index 69280fb32..d7f1ae74b 100644
--- a/src/server/oasisapi/settings/base.py
+++ b/src/server/oasisapi/settings/base.py
@@ -447,6 +447,9 @@
 
 CELERY_TASK_ALWAYS_EAGER = True
 
+# Option to remove the 'v2' routes and only run the server with 'v1' endpoints
+DISABLE_V2_API = iniconf.settings.getboolean('server', 'disable_v2_api', fallback=False)
+
 if DEBUG_TOOLBAR:
     INTERNAL_IPS = [
         '127.0.0.1',
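The flag is read through `iniconf`, so the `DISABLE_V2_API = False` default added to conf.ini at the top of this diff is what the lookup resolves against. A rough configparser equivalent, assuming the key sits in a `[server]` section as the lookup implies:

```python
from configparser import ConfigParser

# Sketch only; the project's iniconf helper presumably wraps a lookup
# along these lines. getboolean accepts True/False case-insensitively.
config = ConfigParser()
config.read('conf.ini')
DISABLE_V2_API = config.getboolean('server', 'disable_v2_api', fallback=False)
```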
diff --git a/src/server/oasisapi/urls.py b/src/server/oasisapi/urls.py
index fbd2c4539..0352a3a6e 100644
--- a/src/server/oasisapi/urls.py
+++ b/src/server/oasisapi/urls.py
@@ -73,13 +73,14 @@
 ]
 
 # API v2 Routes
-api_urlpatterns += [
-    url(r'^v2/', include('src.server.oasisapi.analysis_models.v2_api.urls', namespace='v2-models')),
-    url(r'^v2/', include('src.server.oasisapi.analyses.v2_api.urls', namespace='v2-analyses')),
-    url(r'^v2/', include('src.server.oasisapi.portfolios.v2_api.urls', namespace='v2-portfolios')),
-    url(r'^v2/', include('src.server.oasisapi.data_files.v2_api.urls', namespace='v2-files')),
-    url(r'^v2/', include('src.server.oasisapi.queues.urls', namespace='v2-queues')),
-]
+if not settings.DISABLE_V2_API:
+    api_urlpatterns += [
+        url(r'^v2/', include('src.server.oasisapi.analysis_models.v2_api.urls', namespace='v2-models')),
+        url(r'^v2/', include('src.server.oasisapi.analyses.v2_api.urls', namespace='v2-analyses')),
+        url(r'^v2/', include('src.server.oasisapi.portfolios.v2_api.urls', namespace='v2-portfolios')),
+        url(r'^v2/', include('src.server.oasisapi.data_files.v2_api.urls', namespace='v2-files')),
+        url(r'^v2/', include('src.server.oasisapi.queues.urls', namespace='v2-queues')),
+    ]
 
 urlpatterns = static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
 
 if settings.URL_SUB_PATH:
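When the flag is set, the v2 patterns are simply never appended, so every `/v2/...` path falls through to Django's 404 handling while v1 routes are untouched. A hypothetical smoke test (paths assumed; unauthenticated v1 requests may return 401 rather than 200):

```python
from django.test import Client

# Assumes a test settings module with DISABLE_V2_API enabled.
client = Client()
assert client.get('/v2/analyses/').status_code == 404  # route never registered
assert client.get('/v1/analyses/').status_code != 404  # v1 still resolves
```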