Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vision yeet #5

Merged
merged 7 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
*filtered-video
*video-store
FFmpeg
*.mp4
.env
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ export PKG_CONFIG_PATH=$(FFMPEG_BUILD)/lib/pkgconfig

.PHONY: lint tool-install

$(BIN_OUTPUT_PATH)/filtered-video: *.go cam/*.go $(FFMPEG_BUILD)
$(BIN_OUTPUT_PATH)/video-store: *.go cam/*.go $(FFMPEG_BUILD)
go mod tidy
CGO_LDFLAGS=$(CGO_LDFLAGS) CGO_CFLAGS=$(CGO_CFLAGS) go build -o $(BIN_OUTPUT_PATH)/filtered-video main.go
CGO_LDFLAGS=$(CGO_LDFLAGS) CGO_CFLAGS=$(CGO_CFLAGS) go build -o $(BIN_OUTPUT_PATH)/video-store main.go

$(FFMPEG_VERSION_PLATFORM):
git clone https://github.com/FFmpeg/FFmpeg.git --depth 1 --branch $(FFMPEG_TAG) $(FFMPEG_VERSION_PLATFORM)
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Filtered Video
The `filtered-video` module brings security camera functionality to your smart machine! The module consumes a source [Camera](https://docs.viam.com/components/camera/) and a [Vision Service](https://docs.viam.com/services/vision/), saves the camera output as video files to disk, and filters which video clips are uploaded to the cloud based on triggers from the vision service.
# Video Storage
The `video-store` module brings security camera functionality to your smart machine! The module consumes a source [Camera](https://docs.viam.com/components/camera/) and a [Vision Service](https://docs.viam.com/services/vision/), saves the camera output as video files to disk, and filters which video clips are uploaded to the cloud based on triggers from the vision service.

> **Note:** This component is a work in progress and is not yet fully implemented.

## Configure your `filtered-video` component
## Configure your `video-store` component

Fill in the attributes as applicable to the component, according to the example below.

Expand All @@ -12,7 +12,7 @@ Fill in the attributes as applicable to the component, according to the example
"name": "fv-cam",
"namespace": "rdk",
"type": "camera",
"model": "viam:camera:filtered-video",
"model": "viam:camera:video-store",
"attributes": {
"camera": "webcam-1", // name of the camera to use
"vision": "vision-service-1", // name of the vision service dependency
Expand Down
137 changes: 71 additions & 66 deletions cam/cam.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Package filteredvideo contains the implementation of the filtered video camera component.
package filteredvideo
// Package videostore contains the implementation of the video storage camera component.
package videostore

import (
"context"
"errors"
"sync"
"path/filepath"
"time"

"go.viam.com/rdk/components/camera"
Expand All @@ -18,22 +18,23 @@ import (
"go.viam.com/utils"
)

// Model is the model for the filtered video camera component.
// Model is the model for the video storage camera component.
// TODO(seanp): Personal module for now, should be movied to viam module in prod.
var Model = resource.ModelNamespace("seanavery").WithFamily("camera").WithModel("filtered-video")
var Model = resource.ModelNamespace("seanavery").WithFamily("video").WithModel("storage")

const (
defaultClipLength = 30 // seconds
defaultStorageSize = 10 // GB
defaultVideoCodec = "h264"
defaultVideoBitrate = 1000000
defaultVideoPreset = "medium"
defaultVideoFormat = "mp4"
defaultLogLevel = "error"
uploadPath = "/.viam/video-upload/"
defaultSegmentSeconds = 30 // seconds
defaultStorageSize = 10 // GB
defaultVideoCodec = "h264"
defaultVideoBitrate = 1000000
defaultVideoPreset = "medium"
defaultVideoFormat = "mp4"
defaultLogLevel = "error"
defaultUploadPath = ".viam/video-upload"
defaultStoragePath = ".viam/video-storage"
)

type filteredVideo struct {
type videostore struct {
resource.AlwaysRebuild
resource.TriviallyCloseable

Expand All @@ -49,13 +50,13 @@ type filteredVideo struct {

enc *encoder
seg *segmenter

mu sync.Mutex
}

type storage struct {
ClipLength int `json:"clip_length"`
Size int `json:"size"`
SegmentSeconds int `json:"segment_seconds"`
SizeGB int `json:"size_gb"`
UploadPath string `json:"upload_path"`
StoragePath string `json:"storage_path"`
}

type video struct {
Expand All @@ -71,15 +72,12 @@ type cameraProperties struct {
Framerate int `json:"framerate"`
}

// Config is the configuration for the filtered video camera component.
// Config is the configuration for the video storage camera component.
type Config struct {
Camera string `json:"camera"`
Storage storage `json:"storage"`
Video video `json:"video"`

Classifications map[string]float64 `json:"classifications,omitempty"`
Objects map[string]float64 `json:"objects,omitempty"`

// TODO(seanp): Remove once camera properties are returned from camera component.
Properties cameraProperties `json:"cam_props"`
}
Expand All @@ -89,11 +87,11 @@ func init() {
camera.API,
Model,
resource.Registration[camera.Camera, *Config]{
Constructor: newFilteredVideo,
Constructor: newvideostore,
})
}

func newFilteredVideo(
func newvideostore(
ctx context.Context,
deps resource.Dependencies,
conf resource.Config,
Expand All @@ -104,26 +102,27 @@ func newFilteredVideo(
return nil, err
}

fv := &filteredVideo{
vs := &videostore{
name: conf.ResourceName(),
conf: newConf,
logger: logger,
}

// Source camera that provides the frames to be processed.
fv.cam, err = camera.FromDependencies(deps, newConf.Camera)
vs.cam, err = camera.FromDependencies(deps, newConf.Camera)
if err != nil {
return nil, err
}

var errHandlers []gostream.ErrorHandler
fv.stream, err = fv.cam.Stream(ctx, errHandlers...)
vs.stream, err = vs.cam.Stream(ctx, errHandlers...)
if err != nil {
return nil, err
}

// TODO(seanp): make this configurable
logLevel := lookupLogID(defaultLogLevel)
// logLevel := lookupLogID(defaultLogLevel)
logLevel := lookupLogID("debug")
ffmppegLogLevel(logLevel)

// TODO(seanp): Forcing h264 for now until h265 is supported.
Expand All @@ -140,7 +139,7 @@ func newFilteredVideo(
newConf.Video.Format = defaultVideoFormat
}

fv.enc, err = newEncoder(
vs.enc, err = newEncoder(
logger,
newConf.Video.Codec,
newConf.Video.Bitrate,
Expand All @@ -153,29 +152,35 @@ func newFilteredVideo(
return nil, err
}

if newConf.Storage.ClipLength == 0 {
newConf.Storage.ClipLength = defaultClipLength
if newConf.Storage.SegmentSeconds == 0 {
newConf.Storage.SegmentSeconds = defaultSegmentSeconds
}
if newConf.Storage.SizeGB == 0 {
newConf.Storage.SizeGB = defaultStorageSize
}
if newConf.Storage.UploadPath == "" {
newConf.Storage.UploadPath = filepath.Join(getHomeDir(), defaultUploadPath, vs.name.Name)
}
if newConf.Storage.Size == 0 {
newConf.Storage.Size = defaultStorageSize
if newConf.Storage.StoragePath == "" {
newConf.Storage.StoragePath = filepath.Join(getHomeDir(), defaultStoragePath, vs.name.Name)
}
fv.seg, err = newSegmenter(logger, fv.enc, newConf.Storage.Size, newConf.Storage.ClipLength)
vs.seg, err = newSegmenter(logger, vs.enc, newConf.Storage.SizeGB, newConf.Storage.SegmentSeconds, newConf.Storage.StoragePath)
if err != nil {
return nil, err
}

fv.uploadPath = getHomeDir() + uploadPath
err = createDir(fv.uploadPath)
vs.uploadPath = newConf.Storage.UploadPath
err = createDir(vs.uploadPath)
if err != nil {
return nil, err
}

fv.workers = rdkutils.NewStoppableWorkers(fv.processFrames, fv.deleter)
vs.workers = rdkutils.NewStoppableWorkers(vs.processFrames, vs.deleter)

return fv, nil
return vs, nil
}

// Validate validates the configuration for the filtered video camera component.
// Validate validates the configuration for the video storage camera component.
func (cfg *Config) Validate(path string) ([]string, error) {
if cfg.Camera == "" {
return nil, utils.NewConfigValidationFieldRequiredError(path, "camera")
Expand All @@ -189,35 +194,35 @@ func (cfg *Config) Validate(path string) ([]string, error) {
return []string{cfg.Camera}, nil
}

func (fv *filteredVideo) Name() resource.Name {
return fv.name
func (vs *videostore) Name() resource.Name {
return vs.name
}

func (fv *filteredVideo) DoCommand(_ context.Context, _ map[string]interface{}) (map[string]interface{}, error) {
func (vs *videostore) DoCommand(_ context.Context, _ map[string]interface{}) (map[string]interface{}, error) {
return nil, resource.ErrDoUnimplemented
}

func (fv *filteredVideo) Images(_ context.Context) ([]camera.NamedImage, resource.ResponseMetadata, error) {
func (vs *videostore) Images(_ context.Context) ([]camera.NamedImage, resource.ResponseMetadata, error) {
return nil, resource.ResponseMetadata{}, errors.New("not implemented")
}

func (fv *filteredVideo) NextPointCloud(_ context.Context) (pointcloud.PointCloud, error) {
func (vs *videostore) NextPointCloud(_ context.Context) (pointcloud.PointCloud, error) {
return nil, errors.New("not implemented")
}

func (fv *filteredVideo) Projector(ctx context.Context) (transform.Projector, error) {
return fv.cam.Projector(ctx)
func (vs *videostore) Projector(ctx context.Context) (transform.Projector, error) {
return vs.cam.Projector(ctx)
}

func (fv *filteredVideo) Properties(ctx context.Context) (camera.Properties, error) {
p, err := fv.cam.Properties(ctx)
func (vs *videostore) Properties(ctx context.Context) (camera.Properties, error) {
p, err := vs.cam.Properties(ctx)
if err == nil {
p.SupportsPCD = false
}
return p, err
}

func (fv *filteredVideo) Stream(_ context.Context, _ ...gostream.ErrorHandler) (gostream.VideoStream, error) {
func (vs *videostore) Stream(_ context.Context, _ ...gostream.ErrorHandler) (gostream.VideoStream, error) {
return nil, errors.New("not implemented")
}

Expand All @@ -226,41 +231,41 @@ func (fv *filteredVideo) Stream(_ context.Context, _ ...gostream.ErrorHandler) (
// meant for long term storage of video clips that are not necessarily triggered by
// detections.
// TODO(seanp): Should this be throttled to a certain FPS?
func (fv *filteredVideo) processFrames(ctx context.Context) {
func (vs *videostore) processFrames(ctx context.Context) {
for {
// TODO(seanp): How to gracefully exit this loop?
select {
case <-ctx.Done():
return
default:
}
frame, _, err := fv.stream.Next(ctx)
frame, _, err := vs.stream.Next(ctx)
if err != nil {
fv.logger.Error("failed to get frame from camera", err)
vs.logger.Error("failed to get frame from camera", err)
return
}
lazyImage, ok := frame.(*rimage.LazyEncodedImage)
if !ok {
fv.logger.Error("frame is not of type *rimage.LazyEncodedImage")
vs.logger.Error("frame is not of type *rimage.LazyEncodedImage")
return
}
encoded, pts, dts, err := fv.enc.encode(lazyImage.DecodedImage())
encoded, pts, dts, err := vs.enc.encode(lazyImage.DecodedImage())
if err != nil {
fv.logger.Error("failed to encode frame", err)
vs.logger.Error("failed to encode frame", err)
return
}
// segment frame
err = fv.seg.writeEncodedFrame(encoded, pts, dts)
err = vs.seg.writeEncodedFrame(encoded, pts, dts)
if err != nil {
fv.logger.Error("failed to segment frame", err)
vs.logger.Error("failed to segment frame", err)
return
}
}
}

// deleter is a go routine that cleans up old clips if storage is full. It runs every
// minute and deletes the oldest clip until the storage size is below the max.
func (fv *filteredVideo) deleter(ctx context.Context) {
func (vs *videostore) deleter(ctx context.Context) {
// TODO(seanp): Using seconds for now, but should be minutes in prod.
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
Expand All @@ -270,24 +275,24 @@ func (fv *filteredVideo) deleter(ctx context.Context) {
return
case <-ticker.C:
// Perform the deletion of the oldest clip
err := fv.seg.cleanupStorage()
err := vs.seg.cleanupStorage()
if err != nil {
fv.logger.Error("failed to clean up storage", err)
vs.logger.Error("failed to clean up storage", err)
continue
}
}
}
}

// Close closes the filtered video camera component.
// Close closes the video storage camera component.
// It closes the stream, workers, encoder, segmenter, and watcher.
func (fv *filteredVideo) Close(ctx context.Context) error {
err := fv.stream.Close(ctx)
func (vs *videostore) Close(ctx context.Context) error {
err := vs.stream.Close(ctx)
if err != nil {
return err
}
fv.workers.Stop()
fv.enc.Close()
fv.seg.Close()
vs.workers.Stop()
vs.enc.Close()
vs.seg.Close()
return nil
}
2 changes: 1 addition & 1 deletion cam/encoder.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package filteredvideo
package videostore

/*
#include <libavcodec/avcodec.h>
Expand Down
Loading
Loading