diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py index eb52189820..1b1c839b07 100644 --- a/skyvern/services/task_v2_service.py +++ b/skyvern/services/task_v2_service.py @@ -251,6 +251,26 @@ async def initialize_task_v2( ) raise + current_run_id = context.run_id if context and context.run_id else task_v2.observer_cruise_id + enable_current_url_validation = await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached( + "ENABLE_TASKV2_METADATA_CURRENT_URL_VALIDATION", + current_run_id, + properties={"organization_id": task_v2.organization_id}, + ) + + if not enable_current_url_validation: + # If not in experiment, initialize metadata immediately using the old prompt as before + task_v2 = await initialize_task_v2_metadata( + organization=organization, + task_v2=task_v2, + workflow=new_workflow, + workflow_run=workflow_run, + user_prompt=user_prompt, + current_browser_url=None, + user_url=user_url, + enable_current_url_validation=False, + ) + return task_v2 @@ -262,7 +282,7 @@ async def initialize_task_v2_metadata( user_prompt: str | None, current_browser_url: str | None, user_url: str | None, - current_run_id: str, + enable_current_url_validation: bool, ) -> TaskV2: thought = await app.DATABASE.create_thought( task_v2_id=task_v2.observer_cruise_id, @@ -271,12 +291,6 @@ async def initialize_task_v2_metadata( thought_scenario=ThoughtScenario.generate_metadata, ) - enable_current_url_validation = await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached( - "ENABLE_TASKV2_METADATA_CURRENT_URL_VALIDATION", - current_run_id, - properties={"organization_id": task_v2.organization_id}, - ) - if enable_current_url_validation: metadata_prompt = prompt_engine.load_prompt( "task_v2_generate_metadata_with_current_url", @@ -578,16 +592,23 @@ async def run_task_v2_helper( if page: current_url = await SkyvernFrame.get_url(page) - task_v2 = await initialize_task_v2_metadata( - task_v2=task_v2, - workflow=workflow, - workflow_run=workflow_run, - organization=organization, - user_prompt=task_v2.prompt, - current_browser_url=current_url, - user_url=task_v2.url, - current_run_id=current_run_id, + metadata_thoughts = await app.DATABASE.get_thoughts( + task_v2_id=task_v2_id, + thought_types=[ThoughtType.metadata], + organization_id=organization_id ) + # No metadata thoughts means it was skipped in initialize_task_v2 and should be done here + if not metadata_thoughts: + task_v2 = await initialize_task_v2_metadata( + task_v2=task_v2, + workflow=workflow, + workflow_run=workflow_run, + organization=organization, + user_prompt=task_v2.prompt, + current_browser_url=current_url, + user_url=task_v2.url, + enable_current_url_validation=True, + ) url = str(task_v2.url) max_steps = int_max_steps_override or settings.MAX_STEPS_PER_TASK_V2 @@ -1177,7 +1198,8 @@ async def _generate_loop_task( raise Exception("extraction_block failed") # validate output parameter try: - output_value_obj: dict[str, Any] = extraction_block_result.output_parameter_value.get("extracted_information") # type: ignore + output_value_obj: dict[str, Any] = extraction_block_result.output_parameter_value.get( + "extracted_information") # type: ignore if not output_value_obj or not isinstance(output_value_obj, dict): raise Exception("Invalid output parameter of the extraction block for the loop task") if loop_values_key not in output_value_obj: @@ -1280,8 +1302,8 @@ async def _generate_loop_task( ) if data_extraction_goal and navigation_goal: navigation_goal = ( - navigation_goal - + " Optimize for extracting as much data as possible. Complete when most data is seen even if some data is partially missing." + navigation_goal + + " Optimize for extracting as much data as possible. Complete when most data is seen even if some data is partially missing." ) block_yaml = TaskBlockYAML( label=task_in_loop_label, @@ -1660,9 +1682,9 @@ def _get_extracted_data_from_block_result( """ if task_type == "extract": if ( - isinstance(block_result.output_parameter_value, dict) - and "extracted_information" in block_result.output_parameter_value - and block_result.output_parameter_value["extracted_information"] + isinstance(block_result.output_parameter_value, dict) + and "extracted_information" in block_result.output_parameter_value + and block_result.output_parameter_value["extracted_information"] ): return block_result.output_parameter_value["extracted_information"] elif task_type == "loop":