DAG nabit!!!
seanchatmangpt committed Mar 24, 2024
1 parent d2eb614 commit 8458176
Showing 34 changed files with 1,559 additions and 221 deletions.
2 changes: 1 addition & 1 deletion src/dspygen/api.py
@@ -8,6 +8,7 @@


from dspygen.utils.file_tools import dspy_modules_dir
+from dspygen.workflow.workflow_router import router as workflow_router

app = FastAPI()

@@ -16,7 +17,6 @@
import os

from dspygen.dsl.dsl_pipeline_executor import router as pipeline_router
-from dspygen.experiments.control_flow.workflow_executor import router as workflow_router


app.include_router(pipeline_router)
7 changes: 5 additions & 2 deletions src/dspygen/dsl/dsl_step_module.py
@@ -42,6 +42,9 @@ def execute(self):
with dspy.context(lm=lm_inst):
module_output = module_inst.forward(**rendered_args)

+# Update the pipeline context with the output from this step
+self.pipeline.context[self.step.module] = module_output

if self.step.rm_model:
# rm_inst = _get_retrieval_model_instance(self.pipeline, self.step)

@@ -51,8 +54,8 @@
# Execute the module with the current context
module_output = module_inst.forward(**self.step.args)

-# Update the pipeline context with the output from this step
-self.pipeline.context[self.step.module] = module_output
+# Update the pipeline context with the output from this step
+self.pipeline.context[self.step.module] = module_output

return Munch(self.pipeline.context)

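For orientation, a stripped-down sketch of the pattern this hunk enforces (the names below are illustrative, not dspygen APIs): whichever branch produces a module_output, the result is recorded in the shared pipeline context under the step's module name, so later steps can read earlier outputs.

def run_step(pipeline_context, module_name, forward, args):
    # Mirrors self.pipeline.context[self.step.module] = module_output
    module_output = forward(**args)
    pipeline_context[module_name] = module_output
    return pipeline_context

ctx = {}
run_step(ctx, "SummarizeModule", lambda text: text[:10], {"text": "A long document..."})
run_step(ctx, "CountModule", lambda: len(ctx["SummarizeModule"]), {})
print(ctx)  # {'SummarizeModule': 'A long doc', 'CountModule': 10}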
4 changes: 2 additions & 2 deletions src/dspygen/dsl/utils/dsl_rm_module_utils.py
@@ -27,8 +27,8 @@ def _get_rm_module_instance(pipeline, rendered_args, step):
Uses the DSLModule class from dspygen.modules.dsl_module to handle modules defined in the pipeline YAML.
"""
# Get the file_path from the pipeline.context or rendered_args
-file_path = pipeline.context.get("file_path", rendered_args.get("file_path"))
+# file_path = pipeline.context.get("file_path", rendered_args.get("file_path"))

-return DataRetriever(file_path=file_path, pipeline=pipeline, step=step, **rendered_args)
+return DataRetriever(pipeline=pipeline, step=step, **rendered_args)
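
One plausible motivation for dropping the explicit file_path argument (an inference, not stated in the commit): forwarding **rendered_args alongside file_path=... raises a duplicate-keyword TypeError whenever rendered_args already carries file_path. A minimal sketch with a hypothetical stub, since DataRetriever's real signature is not shown in this diff:

class DataRetrieverStub:
    def __init__(self, pipeline=None, step=None, **kwargs):
        # file_path now arrives through **kwargs instead of a dedicated parameter
        self.file_path = kwargs.get("file_path")

rendered_args = {"file_path": "data/report.csv"}
# Old shape, DataRetrieverStub(file_path="...", **rendered_args), would raise:
# TypeError: got multiple values for keyword argument 'file_path'
retriever = DataRetrieverStub(pipeline=None, step=None, **rendered_args)
print(retriever.file_path)  # data/report.csv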


132 changes: 0 additions & 132 deletions src/dspygen/experiments/control_flow/dsl_control_flow_models.py

This file was deleted.

46 changes: 0 additions & 46 deletions src/dspygen/experiments/control_flow/workflow_executor.py

This file was deleted.

5 changes: 3 additions & 2 deletions src/dspygen/subcommands/wkf_cmd.py
@@ -1,13 +1,14 @@
import typer

-from dspygen.experiments.control_flow.dsl_control_flow_models import Workflow, execute_workflow
from dspygen.utils.cli_tools import chatbot
+from dspygen.workflow.workflow_executor import execute_workflow
+from dspygen.workflow.workflow_models import Workflow

app = typer.Typer(help="Language Workflow Domain Specific Language commands for DSPyGen.")


@app.command("run")
-def run_workflow(yaml_file: str = typer.Argument("/Users/candacechatman/dev/dspygen/src/dspygen/experiments/control_flow/control_flow_workflow.yaml")):
+def run_workflow(yaml_file: str = typer.Argument("/Users/candacechatman/dev/dspygen/src/dspygen/experiments/workflow/control_flow_workflow.yaml")):
"""
Run a workflow defined in a YAML file. Default is workflow.yaml
"""
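For a quick smoke test without the installed CLI, the command function can be called directly; the path below is one of the workflow files added in this commit, and the direct call (bypassing typer's argument parsing) is an assumed usage, not part of the diff:

from dspygen.subcommands.wkf_cmd import run_workflow

# Direct invocation sketch; the typer CLI would pass the same path as an argument
run_workflow("src/dspygen/workflow/data_preparation_workflow.yaml")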
File renamed without changes.
24 changes: 24 additions & 0 deletions src/dspygen/workflow/data_analysis_workflow.yaml
@@ -0,0 +1,24 @@
name: DataAnalysisWorkflow
triggers: manual
imports:
  - /Users/candacechatman/dev/dspygen/src/dspygen/workflow/data_preparation_workflow.yaml
jobs:
  - name: AnalyzeData
    runner: python
    depends_on:
      - PrepareData
    steps:
      - name: LoadFilteredData
        code: |
          import json
          global filtered_data
          with open(filtered_data_path, 'r') as f:
              filtered_data = json.load(f)
        env: {}

      - name: CalculateAverage
        code: |
          average_value = sum(item['value'] for item in filtered_data) / len(filtered_data)
          print(f'Average value: {average_value}')
        env: {}
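
The depends_on edge only takes effect because the executor added in this commit calls workflow.topological_sort() after processing imports. A standalone sketch of the intended ordering using the stdlib (not the workflow_models implementation, which this diff does not show):

from graphlib import TopologicalSorter

# AnalyzeData depends on PrepareData, which the imported workflow contributes
deps = {"AnalyzeData": {"PrepareData"}, "PrepareData": set()}
print(list(TopologicalSorter(deps).static_order()))  # ['PrepareData', 'AnalyzeData']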
29 changes: 29 additions & 0 deletions src/dspygen/workflow/data_preparation_workflow.yaml
@@ -0,0 +1,29 @@
name: DataPreparationWorkflow
triggers: manual
jobs:
  - name: PrepareData
    runner: python
    steps:
      - name: FilterData
        code: |
          raw_data = [
              {'id': 1, 'value': 150},
              {'id': 2, 'value': 90},
              {'id': 3, 'value': 200},
              {'id': 4, 'value': 30},
              {'id': 5, 'value': 120}
          ]
          filtered_data = [item for item in raw_data if item['value'] > 100]
        env: {}

      - name: SaveFilteredData
        code: |
          import json
          import tempfile
          _, path = tempfile.mkstemp(suffix='.json')
          with open(path, 'w') as f:
              json.dump(filtered_data, f)
          print(f'Filtered data saved to {path}')
          global filtered_data_path
          filtered_data_path = path
        env: {}
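
Taken together, the two workflows hand data off through a temp file whose path is exported into the shared context. A self-contained sketch of that handoff, outside the executor, using the same sample data:

import json
import tempfile

raw_data = [
    {'id': 1, 'value': 150},
    {'id': 2, 'value': 90},
    {'id': 3, 'value': 200},
    {'id': 4, 'value': 30},
    {'id': 5, 'value': 120},
]
filtered_data = [item for item in raw_data if item['value'] > 100]

# SaveFilteredData: persist to a temp file and export the path
_, filtered_data_path = tempfile.mkstemp(suffix='.json')
with open(filtered_data_path, 'w') as f:
    json.dump(filtered_data, f)

# LoadFilteredData + CalculateAverage: reload and average
with open(filtered_data_path) as f:
    reloaded = json.load(f)
print(sum(item['value'] for item in reloaded) / len(reloaded))  # (150 + 200 + 120) / 3 = 156.66...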
77 changes: 77 additions & 0 deletions src/dspygen/workflow/workflow_executor.py
@@ -0,0 +1,77 @@
import copy
from typing import Optional, Dict, Any
from dspygen.workflow.workflow_models import Workflow, Action, Job
from loguru import logger


def initialize_context(init_ctx: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Initializes the workflow context."""
    return copy.deepcopy(init_ctx) if init_ctx else {}


def update_context(context: Dict[str, Any], updates: Dict[str, Any]) -> Dict[str, Any]:
    """Updates the workflow context with new values."""
    # Create a copy of context with only python primitives
    new_context = {k: v for k, v in context.items() if isinstance(v, (int, float, str, bool, list, dict))}

    new_context = copy.deepcopy(new_context)

    new_context.update(updates)

    return new_context


def evaluate_condition(condition: str, context: Dict[str, Any]) -> bool:
    """Evaluates a condition within the current context."""
    try:
        safe_context = copy.deepcopy(context)
        return eval(condition, {}, safe_context)
    except Exception as e:
        logger.error(f"Error evaluating condition '{condition}': {e}")
        return False


def execute_job(job: Job, context: Dict[str, Any]) -> Dict[str, Any]:
    """Executes all actions within a job."""
    logger.info(f"Executing job: {job.name}")
    job_context = update_context(context, {})  # Isolate context for the job

    for action in job.steps:
        job_context = execute_action(action, job_context)  # Execute each action

    return job_context


def execute_action(action: Action, context: Dict[str, Any]) -> Dict[str, Any]:
    """Executes a single action, updating the context accordingly."""
    logger.info(f"Executing action: {action.name}")

    # Check for conditional execution
    if action.cond and not evaluate_condition(action.cond.expr, context):
        logger.info(f"Condition for action '{action.name}' not met, skipping.")
        return context  # Skip the action if condition not met

    action_context = update_context(context, {})  # Isolate context for the action

    if action.code:
        # Execute action's code, allowing it to modify the action-specific context
        exec(action.code, action_context, action_context)
        context = update_context(context, action_context)  # Update global context with changes

    return context


def execute_workflow(workflow: Workflow, init_ctx: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Executes all jobs defined in a workflow."""
    logger.info(f"Executing workflow: {workflow.name}")
    global_context = initialize_context(init_ctx)  # Initialize global context

    workflow.process_imports()
    workflow.topological_sort()

    for job in workflow.jobs:
        global_context = execute_job(job, global_context)  # Execute each job

    # Remove builtins injected by exec; pop avoids a KeyError when no action ran code
    global_context.pop('__builtins__', None)

    return global_context
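
A minimal sketch of driving the new executor end to end. The loader name is an assumption (workflow_models is not shown in this diff); substitute however Workflow instances are actually constructed:

from dspygen.workflow.workflow_models import Workflow
from dspygen.workflow.workflow_executor import execute_workflow

# from_yaml is hypothetical; Workflow's real constructor may differ
workflow = Workflow.from_yaml("src/dspygen/workflow/data_analysis_workflow.yaml")
final_context = execute_workflow(workflow)
print(final_context.get("average_value"))  # 156.66... with the sample data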