Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1st draft for Python Measurement Writer. #1291

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions lang/python/ecalhdf5/ecal/measurement/writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# ========================= eCAL LICENSE =================================
#
# Copyright (C) 2016 - 2019 Continental Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ========================= eCAL LICENSE =================================

import ecal.measurement.hdf5 as ecalhdf5

import ecal.proto.helper as pb_helper

from enum import Enum

def combine_encoding_and_type(encoding, type):
return encoding + ":" + type

class ProtobufChannelWriter:
def __init__(self, channel_name: str, protobuf_type, meas: ecalhdf5.Meas):
self._protobuf_type = protobuf_type
self._binary_writer = BinaryChannelWriter(channel_name, meas=meas, encoding = self.encoding, type = self.type, descriptor = self.descriptor)

def write_entry(self, data, rcv_timestamp: float, snd_timestamp: float):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def write_entry(self, data, rcv_timestamp: float, snd_timestamp: float):
def write_entry(self, data, rcv_timestamp: int, snd_timestamp: int):

self._binary_writer.write_entry(data.SerializeToString(), rcv_timestamp, snd_timestamp)

@property
def name(self):
return self._channel_name

@property
def descriptor(self):
return pb_helper.get_descriptor_from_type(self._protobuf_type)

@property
def encoding(self):
return "proto"

@property
def type(self):
return self._protobuf_type.DESCRIPTOR.full_name

class BinaryChannelWriter:
def __init__(self, channel_name: str, meas: ecalhdf5.Meas, encoding = "", type = "", descriptor = ""):
self._meas = meas
self._channel_name = channel_name
self._encoding = encoding
self._type = type
self._descriptor = descriptor

self._meas.set_channel_description(channel_name, descriptor)
self._meas.set_channel_type(channel_name, combine_encoding_and_type(encoding, type))

def write_entry(self, data, rcv_timestamp: float, snd_timestamp: float):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def write_entry(self, data, rcv_timestamp: float, snd_timestamp: float):
def write_entry(self, data, rcv_timestamp: int, snd_timestamp: int):

self._meas.add_entry_to_file(data, rcv_timestamp, snd_timestamp, self._channel_name)

@property
def name(self):
return self._channel_name

@property
def encoding(self):
return self._encoding

@property
def type(self):
return self._type

@property
def descriptor(self):
return self._descriptor


class Encoding(Enum):
BINARY = 0
PROTOBUF = 1

class MeasurementWriter:
def __init__(self, output_dir: str, file_name: str, max_size_per_file: int):
self._output_dir = output_dir
self._file_name = file_name
self._max_size_per_file = max_size_per_file
self._channels = {}

self._meas = ecalhdf5.Meas(output_dir, 1)
self._meas.set_file_base_name(file_name)
self._meas.set_max_size_per_file(max_size_per_file)

def create_channel(self, channel_name: str, channel_encoding: Encoding, datatype):
if channel_encoding == Encoding.PROTOBUF:
channel = ProtobufChannelWriter(channel_name = channel_name, protobuf_type = datatype, meas = self._meas)
self._channels[channel_name] = channel
return channel

def __del__(self):
self._meas.close()
Loading