-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipeline_classes.py
143 lines (125 loc) · 5.03 KB
/
pipeline_classes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from pipeline_stage import PipelineStage
import app
import os
from sagemaker.inputs import TrainingInput
from sagemaker.processing import FrameworkProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.pytorch import PyTorch
class ProcessingStage(PipelineStage):
    """Pipeline stage that builds the SageMaker data-preprocessing step."""

    def set_inputs(self):
        """Declare the raw dataset as the single ProcessingInput of the job."""
        raw_dataset = ProcessingInput(
            source=self.config.data_directory.dataset,
            destination=f"{self.sagemaker_root}/{self.config.pipeline_name}/input/",
            input_name=self.config.processing_input.input_name,
            s3_data_type=self.config.s3_data_type,
            s3_input_mode=self.config.s3_input_mode,
            s3_data_distribution_type=self.config.s3_data_distribution_type,
        )
        self.inputs = [raw_dataset]

    def set_outputs(self):
        """Declare where the job's preprocessed output is uploaded in S3."""
        preprocessed = ProcessingOutput(
            source=f"{self.sagemaker_root}/{self.config.pipeline_name}/output/",
            destination=self.config.data_directory.preprocessed,
            output_name=self.config.processing_output.output_name,
            s3_upload_mode=self.config.s3_upload_mode,
        )
        self.outputs = [preprocessed]

    def set_processor(self):
        """Build the PyTorch FrameworkProcessor that runs the preprocessing script."""
        proc_cfg = self.config.processor
        self.processor = FrameworkProcessor(
            estimator_cls=PyTorch,
            framework_version=proc_cfg.framework_version,
            py_version=proc_cfg.py_version,
            role=self.config.role,
            instance_count=proc_cfg.instance_count,
            instance_type=proc_cfg.instance_type,
            max_runtime_in_seconds=proc_cfg.max_runtime_in_seconds,
        )

    def create_stage(self):
        """Assemble inputs, outputs and processor into a ProcessingStep.

        Returns:
            ProcessingStep: the fully configured preprocessing step.
        """
        self.set_inputs()
        self.set_outputs()
        self.set_processor()
        # NOTE(review): `source_dir` is a FrameworkProcessor.run() argument in
        # some SageMaker SDK versions rather than a ProcessingStep kwarg —
        # confirm against the pinned sagemaker version.
        return ProcessingStep(
            name=self.config.step_name,
            code=self.config.code,
            source_dir=str(self.stage_dir),
            inputs=self.inputs,
            outputs=self.outputs,
            processor=self.processor,
        )
class TrainingStage(PipelineStage):
    """Pipeline stage that builds the SageMaker training step.

    Consumes the preprocessed data emitted by the processing step (exposed as a
    pipeline property on the global ``app.processing`` step).
    """

    def set_inputs(self):
        """Wire the training channel to the processing step's S3 output.

        SageMaker keys ``ProcessingOutputConfig.Outputs`` by the
        ``output_name`` given to the ProcessingOutput — which ProcessingStage
        sets to ``config.processing_output.output_name`` — so the lookup must
        use that name.  The original code indexed by
        ``config.data_directory.preprocessed`` (a destination path), which is
        inconsistent with how the output is registered.
        """
        preprocessed_uri = app.processing.properties.ProcessingOutputConfig.Outputs[
            self.config.processing_output.output_name  # fixed: key by output_name
        ].S3Output.S3Uri
        self.inputs = {
            # Channel name seen by the training script (kept as before).
            self.config.data_directory.preprocessed: TrainingInput(
                s3_data=preprocessed_uri,
                content_type='application/json',
            ),
        }

    def set_estimator(self):
        """Build the PyTorch estimator with hyperparameters taken from config."""
        hyperparameters = {
            "model_type": self.config.model_type,
            "epochs": self.config.epochs,
            "batch_size": self.config.batch_size,
            "loss": self.config.loss,
            "metrics": self.config.metrics,
            "optimizer_type": self.config.optimizer_type,
            "optimizer_lr": self.config.optimizer_lr,
        }
        self.estimator = PyTorch(
            entry_point=os.path.join(self.stage_dir, "stage.py"),
            framework_version=self.config.framework_version,
            py_version=self.config.py_version,
            instance_type=self.config.instance_type,
            instance_count=self.config.instance_count,
            role=self.config.role,
            script_mode=True,
            hyperparameters=hyperparameters,
        )

    def create_stage(self):
        """Assemble inputs and estimator into a TrainingStep.

        Returns:
            TrainingStep: the fully configured training step.
        """
        self.set_inputs()
        self.set_estimator()
        return TrainingStep(
            name=self.config.step_name,
            estimator=self.estimator,
            inputs=self.inputs,
        )
class DeploymentStage(PipelineStage):
    """Pipeline stage that builds a ProcessingStep which deploys the trained model.

    The step's input is the model artifact produced by the training step
    (referenced through the global ``app.training`` step's properties).
    """

    def set_inputs(self):
        """Declare the trained model artifact as the job's ProcessingInput."""
        self.inputs = [
            ProcessingInput(
                # Resolved at pipeline execution time to the training job's
                # model.tar.gz location in S3.
                source=app.training.properties.ModelArtifacts.S3ModelArtifacts,
                destination=f"{self.sagemaker_root}/{self.config.pipeline_name}/model/",
                input_name="deploy_model_input",
                s3_data_type=self.config.s3_data_type,
                s3_input_mode=self.config.s3_input_mode,
                s3_data_distribution_type=self.config.s3_data_distribution_type,
            ),
        ]

    def set_processor(self):
        """Build the PyTorch FrameworkProcessor that runs the deployment script."""
        self.processor = FrameworkProcessor(
            estimator_cls=PyTorch,
            framework_version=self.config.processor.framework_version,
            py_version=self.config.processor.py_version,
            role=self.config.role,
            instance_count=self.config.processor.instance_count,
            instance_type=self.config.processor.instance_type,
            max_runtime_in_seconds=self.config.processor.max_runtime_in_seconds,
        )

    def create_stage(self):
        """Assemble inputs, outputs and processor into the deployment ProcessingStep.

        Returns:
            ProcessingStep: the configured deployment step.
        """
        self.set_inputs()
        # NOTE(review): this class defines no set_outputs() and never assigns
        # self.outputs — both must come from the PipelineStage base class or
        # this raises AttributeError at pipeline build time. Confirm the base
        # class provides a default implementation.
        self.set_outputs()
        self.set_processor()
        # NOTE(review): as in ProcessingStage, `source_dir` may belong to
        # FrameworkProcessor.run() rather than ProcessingStep in the pinned
        # SDK version — verify.
        deployment_step = ProcessingStep(
            name=self.config.step_name,
            code=self.config.code,
            source_dir=str(self.stage_dir),
            inputs=self.inputs,
            outputs=self.outputs,
            processor=self.processor,
        )
        return deployment_step