# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp.v2.dsl import Dataset, Input, Output, Model, component
from pipelines.kfp_components.dependencies import PYTHON37, TENSORFLOW, PANDAS


@component(base_image=PYTHON37, packages_to_install=[TENSORFLOW, PANDAS])
def predict_tensorflow_model(
    input_data: Input[Dataset],
    model: Input[Model],
    predictions: Output[Dataset],
    label_column_name: str = None,
    predictions_column_name: str = "predictions",
    file_pattern: str = None,
) -> None:
"""Make predictions using a trained Tensorflow Keras model.
Args:
input_data (Input[Dataset]): Input data as kfp's Dataset object.
Attribute .path is the GCS location for a single csv file.
model (Input[Model]): Trained model as a kfp Model object.
Attribute .path is the GCS location for all model files
(including the main protobuf file)
predictions (Output[Dataset]): Output data with predictions as kfp's
Dataset object.
Attribute .path is the GCS location for a single csv file.
label_column_name (str): Name of column containing the labels. Defaults to None
predictions_column_name (str): Name of column in which to save the predicted
labels. Defaults to "predictions".
file_pattern (str): Read data from one or more files. If empty,
then prediction data is read from single file.
For multiple files, use a pattern e.g. "files-*.csv".
Returns:
None
"""
    import logging
    import tensorflow as tf
    import pandas
    from typing import Iterator
    from pathlib import Path

    logging.getLogger().setLevel(logging.INFO)

    def read_files(
        path: Path, file_pattern: str = None, **kwargs
    ) -> Iterator[pandas.DataFrame]:
        """
        Read from one or multiple files using `pandas.read_csv`. Provide a
        file pattern to read from multiple files, e.g. "files-*.csv".

        Args:
            path (Path): Path of a single file or of a folder containing
                multiple files.
            file_pattern (str): If path points to a single file, don't provide
                a pattern. Otherwise e.g. "files-*.csv".
            **kwargs: Additional keyword arguments for `pandas.read_csv`.

        Returns:
            Iterator[pandas.DataFrame]: Iterator of Pandas DataFrames.
        """
        paths = [path]

        if file_pattern:
            logging.info(f"Searching files with pattern {file_pattern} in {path}")
            paths = list(path.glob(file_pattern))
            logging.info(f"Found {len(paths)} files")
            if len(paths) == 0:
                raise RuntimeError("No input files found!")

        for p in paths:
            logging.info(f"Reading file: {p}")
            yield pandas.read_csv(p, **kwargs)

        logging.info("Finished reading files")

    def create_dataset(input_data: Path, file_pattern: str = "") -> tf.data.Dataset:
        """Create a TF Dataset from input CSV files.

        Args:
            input_data (Path): Prediction data in CSV format.
            file_pattern (str): Read data from one or more files. If empty, the
                prediction data is read from a single file. For multiple files,
                use a pattern, e.g. "files-*.csv".

        Returns:
            dataset: TF dataset where each element is a (features, labels)
                tuple that corresponds to a batch of CSV rows.
        """
        if file_pattern:
            input_data = input_data / file_pattern

        # Apply data sharding: sharded elements are produced by the dataset.
        # Each worker will process the whole dataset and discard the portion
        # that is not for itself. Note that for this mode to correctly
        # partition the dataset elements, the dataset needs to produce
        # elements in a deterministic order.
        data_options = tf.data.Options()
        data_options.experimental_distribute.auto_shard_policy = (
            tf.data.experimental.AutoShardPolicy.DATA
        )

        created_dataset = tf.data.experimental.make_csv_dataset(
            file_pattern=str(input_data),
            batch_size=100,
            label_name=label_column_name,
            num_epochs=1,
            shuffle=False,
            num_parallel_reads=1,
        )

        return created_dataset.with_options(data_options)
logging.info(f"Read data from {input_data.path}")
df = pandas.concat(read_files(Path(input_data.path), file_pattern))
logging.info("Create TF Dataset for prediction input")
pred_input = create_dataset(Path(input_data.path), file_pattern)
logging.info(f"Load model from {model.path}")
tf_model = tf.keras.models.load_model(model.path)
logging.info("Predict...")
df[predictions_column_name] = tf_model.predict(pred_input)
logging.info(f"Save predictions to {predictions.path}")
df.to_csv(predictions.path, sep=",", header=True, index=False)
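

# ---------------------------------------------------------------------------
# Illustrative usage (a sketch, not part of the original module): how this
# component might be wired into a KFP v2 pipeline. The pipeline name and the
# upstream `extract_data` / `train_tensorflow_model` steps are hypothetical
# placeholders standing in for the repo's real data-extraction and training
# components.
#
#   from kfp.v2 import dsl
#
#   @dsl.pipeline(name="tensorflow-prediction-pipeline")
#   def prediction_pipeline():
#       data_op = extract_data(...)              # hypothetical upstream step
#       train_op = train_tensorflow_model(...)   # hypothetical upstream step
#       predict_tensorflow_model(
#           input_data=data_op.outputs["dataset"],
#           model=train_op.outputs["model"],
#           label_column_name="label",
#           file_pattern="files-*.csv",
#       )
# ---------------------------------------------------------------------------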