Skip to content

Commit

Permalink
add more details to python scalar example
Browse files Browse the repository at this point in the history
  • Loading branch information
atimin committed Jul 12, 2023
1 parent 0248d35 commit a97f651
Showing 1 changed file with 30 additions and 11 deletions.
41 changes: 30 additions & 11 deletions python/examples/payload_with_scalars.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,28 @@

IMG_PATH = Path(__file__).parent / "pandas.jpg"

if __name__ == "__main__":

def create_scalar_package() -> bytes:
global original
pb_time = Timestamp()
pb_time.GetCurrentTime()

DATA = np.array([0, 10.0, 0.3, 1], dtype=np.float32)
NAMES = ["x0", "x1", "x2", "x3"]

# Create a package and serialize it
original = DriftPackage()
original.id = pb_time.ToMilliseconds()
original.source_timestamp.CopyFrom(pb_time)
original.publish_timestamp.CopyFrom(pb_time)
original.status = StatusCode.GOOD

# Fill meta data
info = ScalarValuesInfo()
for name in NAMES:
var = ScalarValuesInfo.VariableInfo()
var.name = name
var.status = StatusCode.GOOD
info.variables.append(var)

original.meta.type = MetaInfo.SCALAR_VALUES
original.meta.scalar_info.CopyFrom(info)

# Put data in buffer without decomposition and compression
buffer = WaveletBuffer(
signal_shape=[len(DATA)],
Expand All @@ -43,27 +40,49 @@
wavelet_type=WaveletType.NONE,
)
buffer.decompose(DATA, denoise.Null())

# Prepare payload
payload = DataPayload()
payload.data = buffer.serialize(compression_level=0)

msg = Any()
msg.Pack(payload)
original.data.append(msg)

label = DriftPackage.Label()
label.key = "host_name"
label.value = "my_host"
original.labels.append(label)

label = DriftPackage.Label()
label.key = "topic"
label.value = "opcua"
original.labels.append(label)

# Serialize package to message
message = original.SerializeToString()
return original.SerializeToString()


def parse_scalar_package(message: bytes) -> None:
# Parse the package
new_pacakge = DriftPackage()
new_pacakge.ParseFromString(message)
print(
f"Package ID={new_pacakge.id} type={MetaInfo.DataType.Name(new_pacakge.meta.type)}"
)

print("Labels:")
for label in new_pacakge.labels:
print(f"{label.key}={label.value}")

payload = DataPayload()
new_pacakge.data[0].Unpack(payload)

buffer = WaveletBuffer.parse(payload.data)
print(f"WaveletBuffer {buffer.compose()}")
values = buffer.compose()

print("Values:")
for i, var in enumerate(new_pacakge.meta.scalar_info.variables):
print(f"{var.name}=value={values[i]}, status={var.status}")


if __name__ == "__main__":
message = create_scalar_package()
parse_scalar_package(message)

0 comments on commit a97f651

Please sign in to comment.