Added chunking options to analysis level (#944)
* Add missing check for build up of queue messages

* First pass

* Attach chunking options to analyses model

* Add chunking endpoints to analysis

* Set task-controller to check both model and analysis chunking options

* PEP

* flake

* Add migrations

* update tests

* Add env var to disable v2 api

* f]

* remove special case -- chunk=1 now means run losses in a single ktools pipe

* Debug prints

* Read chunking options from analysis when running job validation

* fix & pep

* f

* Fixed -- removed debug logging

* Remove logging

* Update test expected
sambles authored Jan 15, 2024
1 parent 1aed1a6 commit e0f832c
Showing 14 changed files with 192 additions and 65 deletions.
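The headline change is that chunking options, previously set only on the model, can now be attached to an individual analysis and edited through a new `chunking_configuration` endpoint (the URL shape is confirmed by the updated test expectations below). As a rough usage sketch only — the server URL, auth header, and payload field names here are assumptions, with the field names borrowed from the model-level chunking options referenced elsewhere in this diff:

import requests

API = 'http://localhost:8000'                    # assumed server URL
HEADERS = {'Authorization': 'Bearer <token>'}    # assumed auth scheme
analysis_id = 1

# Read the chunking configuration currently attached to the analysis.
r = requests.get(f'{API}/v2/analyses/{analysis_id}/chunking_configuration/', headers=HEADERS)
print(r.json())

# Override the chunking options for this analysis only; the exact serializer
# fields are not shown in this diff, so the payload below is illustrative.
payload = {'loss_strategy': 'FIXED_CHUNKS', 'fixed_analysis_chunks': 8}
r = requests.post(f'{API}/v2/analyses/{analysis_id}/chunking_configuration/', json=payload, headers=HEADERS)
r.raise_for_status()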
2 changes: 2 additions & 0 deletions conf.ini
@@ -19,6 +19,8 @@ DO_GZIP_RESPONSE = True
SECRET_KEY=OmuudYrSFVxcIVIWf6YlYdkP6NXApP
TOKEN_SIGINING_KEY=JsVzvtWw2EwksaYCZsMmd2zmm
TOKEN_REFRESH_ROTATE = True
DISABLE_V2_API = False

#PORTFOLIO_PARQUET_STORAGE = True
#TOKEN_REFRESH_LIFETIME = minutes=0, hours=0, days=0, weeks=0

4 changes: 3 additions & 1 deletion kubernetes/worker-controller/src/autoscaler.py
@@ -166,7 +166,9 @@ async def parse_queued_pending(self, msg) -> [RunningAnalysis]:

# Check for pending analyses
if (queue_name not in ['celery', 'celery-v2', 'task-controller']):
queued_count = entry['queue']['queued_count']
queued_task_count = entry.get('queue', {}).get('queued_count', 0) # sub-task queued (API DB status)
queue_message_count = entry.get('queue', {}).get('queue_message_count', 0) # queue has messages
queued_count = max(queued_task_count, queue_message_count)

if (queued_count > 0) and not analyses_list:
# a task is queued, but no analyses are running.
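This is the "missing check for build up of queue messages" from the commit message: the sub-task count recorded in the API database can read zero while messages are still sitting on the broker, so the larger of the two counts is used. Illustration with a hypothetical status entry:

entry = {'queue': {'queued_count': 0, 'queue_message_count': 3}}   # hypothetical payload

queued_task_count = entry.get('queue', {}).get('queued_count', 0)
queue_message_count = entry.get('queue', {}).get('queue_message_count', 0)
queued_count = max(queued_task_count, queue_message_count)          # -> 3, so scaling still triggers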
8 changes: 4 additions & 4 deletions scripts/minikube-deploy.sh
@@ -25,10 +25,10 @@ if [[ -z $OASIS_MODEL_DATA_DIR ]]; then
fi

## init minikube
minikube delete
minikube config set cpus 12
minikube config set memory 16000
minikube start
# minikube delete
# minikube config set cpus 12
# minikube config set memory 16000
# minikube start

# build images
eval $(minikube docker-env)
15 changes: 3 additions & 12 deletions src/model_execution_worker/distributed_tasks.py
@@ -1005,18 +1005,9 @@ def prepare_losses_generation_directory(self, params, analysis_id=None, slug=Non
@app.task(bind=True, name='generate_losses_chunk', **celery_conf.worker_task_kwargs)
@loss_generation_task
def generate_losses_chunk(self, params, chunk_idx, num_chunks, analysis_id=None, slug=None, **kwargs):

if num_chunks == 1:
# Run multiple ktools pipes (based on cpu cores)
current_chunk_id = None
max_chunk_id = -1
work_dir = 'work'

else:
# Run a single ktools pipe
current_chunk_id = chunk_idx + 1
max_chunk_id = num_chunks
work_dir = f'{current_chunk_id}.work'
current_chunk_id = chunk_idx + 1
max_chunk_id = num_chunks
work_dir = f'{current_chunk_id}.work'

chunk_params = {
**params,
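This hunk is the "remove special case" bullet: with the branch gone, a run with `num_chunks == 1` is treated like any other chunk rather than switching to the old multi-pipe path. For example (values made up):

chunk_idx, num_chunks = 0, 1             # a one-chunk loss run
current_chunk_id = chunk_idx + 1         # -> 1
max_chunk_id = num_chunks                # -> 1
work_dir = f'{current_chunk_id}.work'    # -> '1.work' (previously the special-cased 'work' directory)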
@@ -0,0 +1,20 @@
# Generated by Django 3.2.20 on 2024-01-12 13:17

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
('analysis_models', '0008_analysismodel_run_mode'),
('analyses', '0012_analysis_run_mode'),
]

operations = [
migrations.AddField(
model_name='analysis',
name='chunking_options',
field=models.OneToOneField(auto_created=True, default=None, null=True, on_delete=django.db.models.deletion.CASCADE, to='analysis_models.modelchunkingoptions'),
),
]
58 changes: 47 additions & 11 deletions src/server/oasisapi/analyses/models.py
@@ -18,7 +18,7 @@
from src.server.oasisapi.celery_app_v2 import v2 as celery_app_v2
from src.server.oasisapi.queues.consumers import send_task_status_message, TaskStatusMessageItem, \
TaskStatusMessageAnalysisItem, build_task_status_message
from ..analysis_models.models import AnalysisModel
from ..analysis_models.models import AnalysisModel, ModelChunkingOptions
from ..data_files.models import DataFile
from ..files.models import RelatedFile, file_storage_link
from ..portfolios.models import Portfolio
@@ -198,6 +198,8 @@ class Analysis(TimeStampedModel):
summary_levels_file = models.ForeignKey(RelatedFile, on_delete=models.CASCADE, blank=True, null=True,
default=None, related_name='summary_levels_file_analyses')

chunking_options = models.OneToOneField(ModelChunkingOptions, on_delete=models.CASCADE, auto_created=True, default=None, null=True)

class Meta:
ordering = ['id']
verbose_name_plural = 'analyses'
@@ -299,6 +301,10 @@ def get_absolute_subtask_list_url(self, request=None, namespace=None):
override_ns = f'{namespace}:' if namespace else ''
return reverse(f'{override_ns}analysis-sub-task-list', kwargs={'pk': self.pk}, request=self._update_ns(request))

def get_absolute_chunking_configuration_url(self, request=None, namespace=None):
override_ns = f'{namespace}:' if namespace else ''
return reverse(f'{override_ns}analysis-chunking-configuration', kwargs={'pk': self.pk}, request=self._update_ns(request))

def get_groups(self):
groups = []
portfolio_groups = self.portfolio.groups.all()
@@ -307,18 +313,27 @@ def get_groups(self):
return groups

def get_num_events(self):
selected_strat = self.model.chunking_options.loss_strategy
dynamic_strat = self.model.chunking_options.chunking_types.DYNAMIC_CHUNKS
if selected_strat != dynamic_strat:
return 1

# Select Chunking opts
if self.chunking_options is None:
chunking_options = self.model.chunking_options
else:
chunking_options = self.chunking_options

        # Exit early if not DYNAMIC
DYNAMIC_CHUNKS = chunking_options.chunking_types.DYNAMIC_CHUNKS
if chunking_options.loss_strategy != DYNAMIC_CHUNKS:
return None

# Return num of selected User events from settings
analysis_settings = self.settings_file.read_json()
model_settings = self.model.resource_file.read_json()
user_selected_events = analysis_settings.get('event_ids', [])
selected_event_set = analysis_settings.get('model_settings', {}).get('event_set', "")
if len(user_selected_events) > 1:
return len(user_selected_events)

# Read event_set_size for model settings
event_set_options = model_settings.get('model_settings', {}).get('event_set').get('options', [])
event_set_sizes = {e['id']: e['number_of_events'] for e in event_set_options if 'number_of_events' in e}
if selected_event_set not in event_set_sizes:
@@ -412,8 +427,14 @@ def validate_run(self):
if not self.input_file:
errors['input_file'] = ['Must not be null']

# Get chunking options
if self.chunking_options is None:
chunking_options = self.model.chunking_options
else:
chunking_options = self.chunking_options

        # Validation for dynamic loss chunks
if self.model.chunking_options.loss_strategy == self.model.chunking_options.chunking_types.DYNAMIC_CHUNKS:
if chunking_options.loss_strategy == chunking_options.chunking_types.DYNAMIC_CHUNKS:
if not self.model.resource_file:
errors['model_settings_file'] = ['Must not be null for Dynamic chunking']
elif self.settings_file:
@@ -771,7 +792,7 @@ def copy(self):
def delete_connected_files(sender, instance, **kwargs):
""" Post delete handler to clear out any dangaling analyses files
"""
files_for_removal = [
for_removal = [
'settings_file',
'input_file',
'input_generation_traceback_file',
@@ -782,8 +803,23 @@ def delete_connected_files(sender, instance, **kwargs):
'lookup_success_file',
'lookup_validation_file',
'summary_levels_file',
'chunking_options',
]
for ref in for_removal:
obj_ref = getattr(instance, ref)
if obj_ref:
obj_ref.delete()


@receiver(post_delete, sender=AnalysisTaskStatus)
def delete_connected_task_logs(sender, instance, **kwargs):
""" Post delete handler to clear out any dangaling log files
"""
for_removal = [
'output_log',
'error_log',
]
for ref in files_for_removal:
file_ref = getattr(instance, ref)
if file_ref:
file_ref.delete()
for ref in for_removal:
obj_ref = getattr(instance, ref)
if obj_ref:
obj_ref.delete()
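A recurring pattern in the model methods above (`get_num_events`, `validate_run`) is that analysis-level options override the model defaults. Sketched as a standalone helper purely for illustration — no such helper exists in this diff:

def resolve_chunking_options(analysis):
    # Analysis-level override wins; otherwise fall back to the defaults set on the model.
    if analysis.chunking_options is not None:
        return analysis.chunking_options
    return analysis.model.chunking_options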
14 changes: 13 additions & 1 deletion src/server/oasisapi/analyses/v2_api/serializers.py
@@ -9,7 +9,6 @@
from ...files.models import file_storage_link
from ...permissions.group_auth import verify_and_get_groups, validate_data_files


from ...schemas.serializers import (
GroupNameSerializer,
QueueNameSerializer,
@@ -95,6 +94,7 @@ class Meta:
run_traceback_file = serializers.SerializerMethodField(read_only=True)
run_log_file = serializers.SerializerMethodField(read_only=True)
storage_links = serializers.SerializerMethodField(read_only=True)
chunking_configuration = serializers.SerializerMethodField(read_only=True)

@swagger_serializer_method(serializer_or_field=serializers.URLField)
def get_input_file(self, instance):
@@ -156,6 +156,11 @@ def get_storage_links(self, instance):
request = self.context.get('request')
return instance.get_absolute_storage_url(request=request)

@swagger_serializer_method(serializer_or_field=serializers.URLField)
def get_chunking_configuration(self, instance):
request = self.context.get('request')
return instance.get_absolute_chunking_configuration_url(request=request)

@swagger_serializer_method(serializer_or_field=GroupNameSerializer)
def get_groups(self, instance):
return instance.get_groups()
@@ -206,6 +211,7 @@ class AnalysisSerializer(serializers.ModelSerializer):
run_traceback_file = serializers.SerializerMethodField()
run_log_file = serializers.SerializerMethodField()
storage_links = serializers.SerializerMethodField()
chunking_configuration = serializers.SerializerMethodField()
ns = 'v2-analyses'

# Groups - inherited from portfolio
@@ -244,6 +250,7 @@ class Meta:
'run_traceback_file',
'run_log_file',
'storage_links',
'chunking_configuration',
'lookup_chunks',
'analysis_chunks',
'sub_task_count',
@@ -314,6 +321,11 @@ def get_storage_links(self, instance):
request = self.context.get('request')
return instance.get_absolute_storage_url(request=request, namespace=self.ns)

@swagger_serializer_method(serializer_or_field=serializers.URLField)
def get_chunking_configuration(self, instance):
request = self.context.get('request')
return instance.get_absolute_chunking_configuration_url(request=request, namespace=self.ns)

@swagger_serializer_method(serializer_or_field=GroupNameSerializer)
def get_groups(self, instance):
return instance.get_groups()
56 changes: 32 additions & 24 deletions src/server/oasisapi/analyses/v2_api/task_controller.py
@@ -369,11 +369,7 @@ def generate_inputs(cls, analysis: 'Analysis', initiator: User, loc_lines: int)
from src.server.oasisapi.analyses.models import Analysis

# fetch the number of lookup chunks and store in analysis
if analysis.model.chunking_options.lookup_strategy == 'FIXED_CHUNKS':
num_chunks = min(analysis.model.chunking_options.fixed_lookup_chunks, loc_lines)
elif analysis.model.chunking_options.lookup_strategy == 'DYNAMIC_CHUNKS':
loc_lines_per_chunk = analysis.model.chunking_options.dynamic_locations_per_lookup
num_chunks = min(ceil(loc_lines / loc_lines_per_chunk), analysis.model.chunking_options.dynamic_chunks_max)
num_chunks = cls._get_inputs_generation_chunks(analysis, loc_lines)

run_data_uuid = uuid.uuid4().hex
statuses, tasks = cls.get_inputs_generation_tasks(analysis, initiator, run_data_uuid, num_chunks)
@@ -394,13 +390,22 @@ def generate_inputs(cls, analysis: 'Analysis', initiator: User, loc_lines: int)
return chain

@classmethod
def _get_inputs_generation_chunks(cls, analysis):
if analysis.model.chunking_options.lookup_strategy == 'FIXED_CHUNKS':
num_chunks = analysis.model.chunking_options.fixed_lookup_chunks
elif analysis.model.chunking_options.lookup_strategy == 'DYNAMIC_CHUNKS':
loc_lines = sum(1 for line in analysis.portfolio.location_file.read())
loc_lines_per_chunk = analysis.model.chunking_options.dynamic_locations_per_lookup
num_chunks = ceil(loc_lines / loc_lines_per_chunk)
def _get_inputs_generation_chunks(cls, analysis, loc_lines):
# loc_lines = sum(1 for line in analysis.portfolio.location_file.read())

# Get options
if analysis.chunking_options is not None:
chunking_options = analysis.chunking_options # Use options from Analysis
else:
chunking_options = analysis.model.chunking_options # Use defaults set on model

# Set chunks
if chunking_options.lookup_strategy == 'FIXED_CHUNKS':
num_chunks = min(chunking_options.fixed_lookup_chunks, loc_lines)
elif chunking_options.lookup_strategy == 'DYNAMIC_CHUNKS':
loc_lines_per_chunk = chunking_options.dynamic_locations_per_lookup
num_chunks = min(ceil(loc_lines / loc_lines_per_chunk), chunking_options.dynamic_chunks_max)

return num_chunks

@classmethod
@@ -512,13 +517,7 @@ def generate_losses(cls, analysis: 'Analysis', initiator: User, events_total: in
"""
from src.server.oasisapi.analyses.models import Analysis

# fetch number of event chunks
if analysis.model.chunking_options.loss_strategy == 'FIXED_CHUNKS':
num_chunks = analysis.model.chunking_options.fixed_analysis_chunks
elif analysis.model.chunking_options.loss_strategy == 'DYNAMIC_CHUNKS':
events_per_chunk = analysis.model.chunking_options.dynamic_events_per_analysis
num_chunks = min(ceil(events_total / events_per_chunk), analysis.model.chunking_options.dynamic_chunks_max)

num_chunks = cls._get_loss_generation_chunks(analysis, events_total)
run_data_uuid = uuid.uuid4().hex
statuses, tasks = cls.get_loss_generation_tasks(analysis, initiator, run_data_uuid, num_chunks)

@@ -538,11 +537,20 @@ def generate_losses(cls, analysis: 'Analysis', initiator: User, events_total: in
return chain

@classmethod
def _get_loss_generation_chunks(cls, analysis):
if analysis.model.chunking_options.loss_strategy == 'FIXED_CHUNKS':
num_chunks = analysis.model.chunking_options.fixed_analysis_chunks
elif analysis.model.chunking_options.loss_strategy == 'DYNAMIC_CHUNKS':
raise notimplementederror("FEATURE NOT AVALIBLE -- need event set size from worker")
def _get_loss_generation_chunks(cls, analysis, events_total):
# Get options
if analysis.chunking_options is not None:
chunking_options = analysis.chunking_options # Use options from Analysis
else:
chunking_options = analysis.model.chunking_options # Use defaults set on model

# fetch number of event chunks
if chunking_options.loss_strategy == 'FIXED_CHUNKS':
num_chunks = chunking_options.fixed_analysis_chunks
elif chunking_options.loss_strategy == 'DYNAMIC_CHUNKS':
events_per_chunk = chunking_options.dynamic_events_per_analysis
num_chunks = min(ceil(events_total / events_per_chunk), chunking_options.dynamic_chunks_max)

return num_chunks

@classmethod
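For a sense of the numbers `_get_inputs_generation_chunks` and `_get_loss_generation_chunks` now produce, here is a small worked example with made-up inputs; the option names match the chunking fields used above:

from math import ceil

# Dynamic lookup chunking: 2,500 location lines, 1,000 lines per chunk, capped at 10 chunks.
loc_lines, dynamic_locations_per_lookup, dynamic_chunks_max = 2500, 1000, 10
num_lookup_chunks = min(ceil(loc_lines / dynamic_locations_per_lookup), dynamic_chunks_max)   # -> 3

# Dynamic loss chunking: 40,000 events, 5,000 events per chunk, capped at 10 chunks.
events_total, dynamic_events_per_analysis = 40000, 5000
num_loss_chunks = min(ceil(events_total / dynamic_events_per_analysis), dynamic_chunks_max)   # -> 8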
@@ -156,6 +156,7 @@ def test_cleaned_name_portfolio_and_model_are_present___object_is_created(self,
'task_finished': None,
'groups': [],
'analysis_chunks': None,
'chunking_configuration': 'http://testserver/v2/analyses/1/chunking_configuration/',
'lookup_chunks': None,
'priority': 4,
}, response.json)
@@ -233,6 +234,7 @@ def test_complex_model_file_present___object_is_created(self, name):
'task_started': None,
'task_finished': None,
'analysis_chunks': None,
'chunking_configuration': 'http://testserver/v2/analyses/1/chunking_configuration/',
'lookup_chunks': None,
'priority': 4,
}, response.json)
@@ -245,7 +245,7 @@ def test_state_is_ready___run_is_started(self, status, task_id):

with patch('src.server.oasisapi.analyses.models.celery_app_v2.send_task', new=mock_task):
analysis.run(initiator, version='v2')
mock_task.assert_called_once_with('start_loss_generation_task', (analysis.pk, initiator.pk, 1),
mock_task.assert_called_once_with('start_loss_generation_task', (analysis.pk, initiator.pk, None),
{}, queue='celery-v2', link_error=ANY, priority=4)

@given(
16 changes: 16 additions & 0 deletions src/server/oasisapi/analyses/v2_api/viewsets.py
@@ -17,6 +17,7 @@
from .serializers import AnalysisSerializer, AnalysisCopySerializer, AnalysisTaskStatusSerializer, \
AnalysisStorageSerializer, AnalysisListSerializer
from ...analysis_models.models import AnalysisModel
from ...analysis_models.v2_api.serializers import ModelChunkingConfigSerializer
from ...data_files.v2_api.serializers import DataFileSerializer
from ...files.serializers import RelatedFileSerializer
from ...files.views import handle_related_file, handle_json_data
@@ -268,6 +269,8 @@ def get_serializer_class(self):
return AnalysisStorageSerializer
elif self.action in self.file_action_types_with_settings_file:
return RelatedFileSerializer
elif self.action in ['chunking_configuration']:
return ModelChunkingConfigSerializer
else:
return Serializer

@@ -544,6 +547,19 @@ def sub_task_list(self, request, pk=None, version=None):
serializer = AnalysisTaskStatusSerializer(sub_task_queryset, many=True, context=context)
return Response(serializer.data)

@action(methods=['get', 'post'], detail=True)
def chunking_configuration(self, request, pk=None, version=None):
method = request.method.lower()
obj = self.get_object()
if method == 'get':
serializer = self.get_serializer(obj.chunking_options)
else:
serializer = self.get_serializer(obj.chunking_options, data=request.data)
serializer.is_valid(raise_exception=True)
obj.chunking_options = serializer.save()
obj.save()
return Response(serializer.data)


class AnalysisSettingsView(VerifyGroupAccessModelViewSet):
"""