diff --git a/.github/actions/docker-build-push-action/action.yml b/.github/actions/docker-build-push-action/action.yml index 9a96ca3..d2de51f 100644 --- a/.github/actions/docker-build-push-action/action.yml +++ b/.github/actions/docker-build-push-action/action.yml @@ -63,6 +63,10 @@ inputs: description: List of output destinations (format type=local,dest=path) required: false default: '' + platform: + description: "Target platform for the build (e.g., linux/amd64, linux/arm64)" + required: false + default: '' runs: using: composite @@ -94,3 +98,4 @@ runs: target: ${{ inputs.target }} provenance: ${{ inputs.provenance }} outputs: ${{ inputs.outputs }} + platforms: ${{ inputs.platform }} diff --git a/.github/workflows/build-proto-image.yml b/.github/workflows/build-proto-image.yml index 8e7013b..89db5ac 100644 --- a/.github/workflows/build-proto-image.yml +++ b/.github/workflows/build-proto-image.yml @@ -3,13 +3,104 @@ name: Proto Build Workflow on: - workflow_dispatch: + push: + branches: + - "main" + tags: + # semver, e.g. 1.2.0 (does not match 0.1.2) + - "[1-9]+.[0-9]+.[0-9]+" + # semver with prerelease info, e.g. 1.0.2-beta.1 or 1.2.3-rc.10 + - "[1-9]+.[0-9]+.[0-9]+-[a-z]+.[0-9]+" + # do not match prerelease starting w/ 0, e.g. 1.0.2-beta.0 or 1.2.3-rc.01 + - "![1-9]+.[0-9]+.[0-9]+-[a-z]+.[0]*" + # semver with date info, e.g. 1.0.2-20221125 + - "[1-9]+.[0-9]+.[0-9]+-[0-9]+" + # do not match date starting w/ 0, e.g. 1.0.2-01232023 + - "![1-9]+.[0-9]+.[0-9]+-[0]*" pull_request: + paths-ignore: + # do not run if only markdown or other administrative files have been updated + - "*.md" + - "CODEOWNERS" + - "LICENSE" branches: - "main" jobs: - dummy: + build_proto_image: + name: Build Proto Image runs-on: ubuntu-latest + steps: - - run: echo "Placeholder workflow" + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Create image metadata + id: meta + uses: ./.github/actions/metadata-action + with: + repository: axisecp/acap-runtime + get_version: "true" + + - name: Build Proto image + uses: ./.github/actions/docker-build-push-action + with: + dockerfile: ./Dockerfile.proto + push: false + load: true + tags: axisecp/acap-runtime:${{ steps.meta.outputs.version }}-protofiles + use_qemu: true + platform: linux/arm64 + + - name: Extract proto files + run: | + container_id=$(docker create --platform linux/arm64 axisecp/acap-runtime:${{ steps.meta.outputs.version }}-protofiles) + docker cp $container_id:/build/param/proto_utils ./proto_utils_param + docker cp $container_id:/build/vdo/proto_utils ./proto_utils_vdo + docker cp $container_id:/build/tf/proto_utils ./proto_utils_tf + docker rm $container_id + + - name: Verify proto files + run: | + echo "Verifying proto files..." 
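+          # The three directories below were populated by the "Extract proto
+          # files" step above; each should contain generated Python protofiles.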
+
+          # Check that each API directory exists and is non-empty
+          for api in param vdo tf; do
+            dir="./proto_utils_${api}"
+            if [ "$(ls -A "$dir" 2>/dev/null)" ]; then
+              echo "${api} proto files found:"
+              ls -l "$dir"
+            else
+              echo "Error: ${api} proto files are missing"
+              exit 1
+            fi
+          done
+
+          echo "All proto files verified successfully"
+
+      - name: Push Proto image
+        if: success()
+        uses: ./.github/actions/docker-build-push-action
+        with:
+          dockerfile: ./Dockerfile.proto
+          push: true
+          platform: linux/arm64
+          use_qemu: true
+          tags: axisecp/acap-runtime:${{ steps.meta.outputs.version }}-protofiles
+          registry_user: ${{ secrets.ECOSYSTEM_SERVICE_USER_DOCKER_HUB }}
+          registry_token: ${{ secrets.ECOSYSTEM_ACCESS_TOKEN_DOCKER_HUB }}
diff --git a/.gitignore b/.gitignore
index f6d5e72..317211d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@
 .github/test/__pycache__/
 apis/
 !apis/keyvaluestore.proto
+!apis/wrappers
 build/
 build_x86_64-linux-gnu/
 larod/
diff --git a/Dockerfile.proto b/Dockerfile.proto
new file mode 100644
index 0000000..b8cc035
--- /dev/null
+++ b/Dockerfile.proto
@@ -0,0 +1,85 @@
+# syntax=docker/dockerfile:1
+
+ARG UBUNTU_VERSION=22.04
+ARG TFSERVING_VERSION=2.9.0
+ARG GRPC_VERSION=1.46.3
+ARG PROTOBUF_VERSION=4.21.1
+ARG SIX_VERSION=1.16.0
+ARG GRPCIO_TOOLS_VERSION=1.47.0
+
+# Build image, generates proto files
+FROM arm64v8/ubuntu:${UBUNTU_VERSION} AS build-image
+
+ARG TFSERVING_VERSION
+ARG GRPC_VERSION
+ARG PROTOBUF_VERSION
+ARG SIX_VERSION
+ARG GRPCIO_TOOLS_VERSION
+
+RUN < /build/requirements.txt
diff --git a/README.md b/README.md
index f5d996d..adbcd02 100755
--- a/README.md
+++ b/README.md
@@ -27,6 +27,7 @@ If you are new to the world of ACAPs take a moment to check out
 - [gRPC socket](#grpc-socket)
 - [Examples](#examples)
 - [Building ACAP Runtime](#building-acap-runtime)
+- [Building protofiles for Python](#building-protofiles-for-python)
 - [Test suite](#test-suite)
 - [Contributing](#contributing)
 - [License](#license)
@@ -234,6 +235,16 @@ docker buildx build --file Dockerfile --build-arg ARCH=<ARCH> --tag acap-runtime
 
 where `<ARCH>` is either `armv7hf` or `aarch64`.
 
+## Building protofiles for Python
+
+The repository includes a Dockerfile (`Dockerfile.proto`) for building the API protofiles for Python. It generates the necessary Python files from the protobuf definitions, enabling gRPC communication with the ACAP Runtime service. Applications can copy these prebuilt files from the ACAP Runtime container image instead of building the protofiles themselves.
+
+To build the protofiles:
+
+```sh
+docker build -f Dockerfile.proto -t acap-runtime-proto:latest .
+```
+
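+The generated modules are placed under `/build/param/proto_utils`,
+`/build/vdo/proto_utils` and `/build/tf/proto_utils` in the image (the same
+paths the CI workflow extracts from). A minimal sketch of copying them out
+through a throwaway container:
+
+```sh
+container_id=$(docker create acap-runtime-proto:latest)
+docker cp $container_id:/build/tf/proto_utils ./proto_utils_tf
+docker rm $container_id
+```
+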
 ## Test suite
 
 The repo contains a test suite project to verify that ACAP Runtime works as expected
diff --git a/apis/keyvaluestore.proto b/apis/keyvaluestore.proto
index ab6096a..d86f230 100644
--- a/apis/keyvaluestore.proto
+++ b/apis/keyvaluestore.proto
@@ -5,7 +5,7 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * https://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/apis/wrappers/tf_proto_utils.py b/apis/wrappers/tf_proto_utils.py
new file mode 100644
index 0000000..b0cf044
--- /dev/null
+++ b/apis/wrappers/tf_proto_utils.py
@@ -0,0 +1,173 @@
+'''
+    Copyright (C) 2020 Axis Communications AB, Lund, Sweden
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+'''
+
+import numpy as np
+import time
+from grpc import insecure_channel as grpc_insecure_channel
+from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc
+from tensorflow.core.framework import types_pb2, tensor_shape_pb2, tensor_pb2
+from tensorflow.core.protobuf import meta_graph_pb2
+
+
+def make_tensor_proto(values):
+    if not isinstance(values, (np.ndarray, np.generic)):
+        values = np.array(values)
+    if values.dtype == np.float64:
+        values = values.astype(np.float32)
+    shape = values.shape
+    dtype = NP_TO_PB[values.dtype.type]
+    dims = [tensor_shape_pb2.TensorShapeProto.Dim(size=size) for size in shape]
+    tensor_shape_proto = tensor_shape_pb2.TensorShapeProto(dim=dims)
+    return tensor_pb2.TensorProto(dtype=dtype, tensor_shape=tensor_shape_proto, tensor_content=values.tobytes())
+
+
+def make_ndarray(proto):
+    shape = [d.size for d in proto.tensor_shape.dim]
+    num_elements = np.prod(shape, dtype=np.int64)
+    np_dtype = PB_TO_NP[proto.dtype]
+    if proto.tensor_content:
+        return np.frombuffer(proto.tensor_content, dtype=np_dtype).copy().reshape(shape)
+
+    if proto.dtype == types_pb2.DT_HALF:
+        if len(proto.half_val) == 1:
+            tmp = np.array(proto.half_val[0], dtype=np.uint16)
+            tmp.dtype = np_dtype
+            return np.repeat(tmp, num_elements).reshape(shape)
+        tmp = np.fromiter(proto.half_val, dtype=np.uint16)
+        tmp.dtype = np_dtype
+        return tmp.reshape(shape)
+
+    if proto.dtype == types_pb2.DT_STRING:
+        if len(proto.string_val) == 1:
+            return np.repeat(np.array(proto.string_val[0], dtype=np_dtype), num_elements).reshape(shape)
+        return np.array([x for x in proto.string_val], dtype=np_dtype).reshape(shape)
+
+    if proto.dtype in (types_pb2.DT_COMPLEX64, types_pb2.DT_COMPLEX128):
+        if proto.dtype == types_pb2.DT_COMPLEX64:
+            proto_value = proto.scomplex_val
+        else:
+            proto_value = proto.dcomplex_val
+        it = iter(proto_value)
+        if len(proto_value) == 2:
+            return np.repeat(np.array(complex(proto_value[0], proto_value[1]), dtype=np_dtype), num_elements).reshape(shape)
+        return np.array([complex(x[0], x[1]) for x in zip(it, it)], dtype=np_dtype).reshape(shape)
+
+    # Fall back to the per-dtype repeated value fields
+    proto_value = None
+    if proto.dtype == types_pb2.DT_FLOAT:
+        proto_value = proto.float_val
+    elif proto.dtype == types_pb2.DT_DOUBLE:
+        proto_value = proto.double_val
+    elif proto.dtype in [types_pb2.DT_INT32, types_pb2.DT_UINT8, types_pb2.DT_UINT16, types_pb2.DT_INT16, types_pb2.DT_INT8]:
+        proto_value = proto.int_val
+    elif proto.dtype == types_pb2.DT_INT64:
+        proto_value = proto.int64_val
+    elif proto.dtype == types_pb2.DT_BOOL:
+        proto_value = proto.bool_val
+    if proto_value is not None:
+        if len(proto_value) == 1:
+            return np.repeat(np.array(proto_value[0], dtype=np_dtype), num_elements).reshape(shape)
+        return np.fromiter(proto_value, dtype=np_dtype).reshape(shape)
+
+    raise TypeError("Unsupported type: %s" % proto.dtype)
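+
+# Round-trip sketch (illustrative): for dense numeric arrays the two helpers
+# above are inverses, modulo the float64 -> float32 conversion in
+# make_tensor_proto:
+#
+#   arr = np.arange(6, dtype=np.float32).reshape(2, 3)
+#   assert np.array_equal(make_ndarray(make_tensor_proto(arr)), arr)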
+
+
+def build_signature_def(inputs=None, outputs=None, method_name=None):
+    signature_def = meta_graph_pb2.SignatureDef()
+    if inputs is not None:
+        for item in inputs:
+            signature_def.inputs[item].CopyFrom(inputs[item])
+    if outputs is not None:
+        for item in outputs:
+            signature_def.outputs[item].CopyFrom(outputs[item])
+    if method_name is not None:
+        signature_def.method_name = method_name
+    return signature_def
+
+
+PB_TO_NP = {
+    types_pb2.DT_HALF: np.float16,
+    types_pb2.DT_FLOAT: np.float32,
+    types_pb2.DT_DOUBLE: np.float64,
+    types_pb2.DT_INT32: np.int32,
+    types_pb2.DT_UINT8: np.uint8,
+    types_pb2.DT_UINT16: np.uint16,
+    types_pb2.DT_UINT32: np.uint32,
+    types_pb2.DT_UINT64: np.uint64,
+    types_pb2.DT_INT16: np.int16,
+    types_pb2.DT_INT8: np.int8,
+    types_pb2.DT_STRING: object,
+    types_pb2.DT_COMPLEX64: np.complex64,
+    types_pb2.DT_COMPLEX128: np.complex128,
+    types_pb2.DT_INT64: np.int64,
+    types_pb2.DT_BOOL: bool,
+}
+
+NP_TO_PB = {
+    np.float16: types_pb2.DT_HALF,
+    np.float32: types_pb2.DT_FLOAT,
+    np.float64: types_pb2.DT_DOUBLE,
+    np.int32: types_pb2.DT_INT32,
+    np.uint8: types_pb2.DT_UINT8,
+    np.uint16: types_pb2.DT_UINT16,
+    np.uint32: types_pb2.DT_UINT32,
+    np.uint64: types_pb2.DT_UINT64,
+    np.int16: types_pb2.DT_INT16,
+    np.int8: types_pb2.DT_INT8,
+    object: types_pb2.DT_STRING,
+    np.complex64: types_pb2.DT_COMPLEX64,
+    np.complex128: types_pb2.DT_COMPLEX128,
+    np.int64: types_pb2.DT_INT64,
+    bool: types_pb2.DT_BOOL,
+}
+
+RPC_TIMEOUT = 120.0
+
+
+class InferenceClient:
+    def __init__(self, host, port=0):
+        if port == 0:
+            # A port of 0 means host is a Unix domain socket address
+            channel = grpc_insecure_channel(host)
+        else:
+            channel = grpc_insecure_channel(host + ':' + str(port))
+        self.stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
+
+    def infer(self, inputs, model_name, model_version=None, outputs=None):
+        request = predict_pb2.PredictRequest()
+        request.model_spec.name = model_name
+
+        if model_version is not None:
+            request.model_spec.version.value = model_version
+
+        for input_name, input_data in inputs.items():
+            request.inputs[input_name].CopyFrom(make_tensor_proto(np.stack(input_data, axis=0)))
+
+        try:
+            t0 = time.time()
+            result = self.stub.Predict(request, timeout=RPC_TIMEOUT, wait_for_ready=True)
+            t1 = time.time()
+            print(f'Time for call to inference-server: {1000 * (t1 - t0):.0f} ms')
+        except Exception as exc:
+            print(exc)
+            return False, {}
+        if not outputs:
+            output_data = {output_key: make_ndarray(tensor) for output_key, tensor in result.outputs.items()}
+        else:
+            output_data = {}
+            for output_key in outputs:
+                output_data[output_key] = make_ndarray(result.outputs[output_key])
+        return True, output_data
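+
+
+# Minimal usage sketch (the socket address, model name and `image` array are
+# illustrative, not part of this module):
+#
+#   client = InferenceClient('unix:///tmp/acap-runtime.sock')
+#   ok, outputs = client.infer({'data': [image]}, 'model-name')
+#   if ok:
+#       for name, tensor in outputs.items():
+#           print(name, tensor.shape)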
diff --git a/apis/wrappers/vdo_proto_utils.py b/apis/wrappers/vdo_proto_utils.py
new file mode 100644
index 0000000..55da078
--- /dev/null
+++ b/apis/wrappers/vdo_proto_utils.py
@@ -0,0 +1,78 @@
+import grpc
+import sys
+import numpy as np
+import cv2
+import videocapture_pb2 as vcap
+import videocapture_pb2_grpc as vcap_grpc
+
+RPC_TIMEOUT = 300
+
+
+class VideoCaptureClient:
+    def __init__(self, socket, stream_width, stream_height, stream_framerate):
+        self.stream_width = stream_width
+        self.stream_height = stream_height
+        self.stream_framerate = stream_framerate
+        self.grpc_socket = socket
+        self.capture_client = None
+        self.stream_id = None
+        self.setup_grpc_channel()
+        self.create_video_stream()
+
+    def setup_grpc_channel(self):
+        print("Setting up gRPC channel")
+        grpc_channel = grpc.insecure_channel(self.grpc_socket)
+        try:
+            grpc.channel_ready_future(grpc_channel).result(timeout=RPC_TIMEOUT)
+        except grpc.FutureTimeoutError:
+            print("Error connecting to gRPC server: Timed out")
+            sys.exit(1)
+        except Exception as e:
+            print(f"Error connecting to gRPC server: {e}")
+            sys.exit(1)
+
+        self.capture_client = vcap_grpc.VideoCaptureStub(grpc_channel)
+
+    def create_video_stream(self):
+        print("Setting up video stream request")
+        stream_request = vcap.NewStreamRequest(
+            settings=vcap.StreamSettings(
+                format=vcap.VDO_FORMAT_YUV,
+                width=self.stream_width,
+                height=self.stream_height,
+                framerate=self.stream_framerate,
+                timestamp_type=vcap.VDO_TIMESTAMP_UTC
+            )
+        )
+        print("Requesting video stream")
+        try:
+            response = self.capture_client.NewStream(stream_request)
+            self.stream_id = response.stream_id
+        except Exception as e:
+            print(f'Could not create stream: {e}')
+            sys.exit(1)
+
+    def get_frame(self):
+        frame_request = vcap.GetFrameRequest(
+            frame_reference=0,  # Latest frame
+            stream_id=self.stream_id
+        )
+        try:
+            response = self.capture_client.GetFrame(frame_request)
+        except Exception as e:
+            print(f"Could not download latest frame: {e}")
+            return None
+
+        # Convert the NV12 (YUV 4:2:0) buffer to an RGB image
+        data = response.data
+        yuv_image_buffer = np.frombuffer(data, dtype='uint8').reshape(self.stream_height + self.stream_height // 2, self.stream_width)
+        rgb_image = cv2.cvtColor(yuv_image_buffer, cv2.COLOR_YUV2RGB_NV12)
+        return rgb_image
+
+    def __del__(self):
+        print("Cleaning up video stream")
+        if self.capture_client is not None and self.stream_id is not None:
+            try:
+                request = vcap.DeleteStreamRequest(stream_id=self.stream_id)
+                self.capture_client.DeleteStream(request)
+                print('Deleted the stream')
+            except Exception as e:
+                print(f'Could not delete stream: {e}')
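+
+
+# Minimal usage sketch (socket address and stream settings are illustrative):
+#
+#   client = VideoCaptureClient('unix:///tmp/acap-runtime.sock', 640, 360, 10)
+#   frame = client.get_frame()  # RGB frame as a numpy array, or None on error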