Skip to content

Commit

Permalink
Remove vision service integration
Browse files Browse the repository at this point in the history
  • Loading branch information
seanavery committed Aug 27, 2024
1 parent b6cb04f commit 477f1a6
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 121 deletions.
123 changes: 3 additions & 120 deletions cam/cam.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,16 @@ package filteredvideo
import (
"context"
"errors"
"fmt"
"image"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/fsnotify/fsnotify"
"go.viam.com/rdk/components/camera"
"go.viam.com/rdk/gostream"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/pointcloud"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/rimage"
"go.viam.com/rdk/rimage/transform"
"go.viam.com/rdk/services/vision"
rdkutils "go.viam.com/rdk/utils"
"go.viam.com/utils"
)
Expand Down Expand Up @@ -51,19 +44,13 @@ type filteredVideo struct {

cam camera.Camera
stream gostream.VideoStream
vis vision.Service

workers rdkutils.StoppableWorkers

enc *encoder
seg *segmenter

mu sync.Mutex
triggers map[string]bool
watcher *fsnotify.Watcher

latestFrame atomic.Pointer[image.Image]
lastFile string
mu sync.Mutex

Check failure on line 53 in cam/cam.go

View workflow job for this annotation

GitHub Actions / quality-checks

field `mu` is unused (unused)
}

type storage struct {
Expand All @@ -87,7 +74,6 @@ type cameraProperties struct {
// Config is the configuration for the filtered video camera component.
type Config struct {
Camera string `json:"camera"`
Vision string `json:"vision"`
Storage storage `json:"storage"`
Video video `json:"video"`

Expand Down Expand Up @@ -130,12 +116,6 @@ func newFilteredVideo(
return nil, err
}

// Vision service that provides the detections for the frames.
fv.vis, err = vision.FromDependencies(deps, newConf.Vision)
if err != nil {
return nil, err
}

var errHandlers []gostream.ErrorHandler
fv.stream, err = fv.cam.Stream(ctx, errHandlers...)
if err != nil {
Expand Down Expand Up @@ -184,26 +164,13 @@ func newFilteredVideo(
return nil, err
}

watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
fv.watcher = watcher

fv.triggers = make(map[string]bool)
fv.uploadPath = getHomeDir() + uploadPath
err = createDir(fv.uploadPath)
if err != nil {
return nil, err
}

fv.workers = rdkutils.NewStoppableWorkers(fv.processFrames, fv.processDetections, fv.deleter, fv.copier)

// Start fsnotify watcher to listen for new files created in the storage path.
err = watcher.Add(fv.seg.storagePath)
if err != nil {
return nil, err
}
fv.workers = rdkutils.NewStoppableWorkers(fv.processFrames, fv.deleter)

return fv, nil
}
Expand All @@ -214,16 +181,12 @@ func (cfg *Config) Validate(path string) ([]string, error) {
return nil, utils.NewConfigValidationFieldRequiredError(path, "camera")
}

if cfg.Vision == "" {
return nil, utils.NewConfigValidationFieldRequiredError(path, "vision")
}

// TODO(seanp): Remove once camera properties are returned from camera component.
if cfg.Properties == (cameraProperties{}) {
return nil, utils.NewConfigValidationFieldRequiredError(path, "cam_props")
}

return []string{cfg.Camera, cfg.Vision}, nil
return []string{cfg.Camera}, nil
}

func (fv *filteredVideo) Name() resource.Name {
Expand Down Expand Up @@ -276,7 +239,6 @@ func (fv *filteredVideo) processFrames(ctx context.Context) {
fv.logger.Error("failed to get frame from camera", err)
return
}
fv.latestFrame.Store(&frame)
lazyImage, ok := frame.(*rimage.LazyEncodedImage)
if !ok {
fv.logger.Error("frame is not of type *rimage.LazyEncodedImage")
Expand All @@ -296,38 +258,6 @@ func (fv *filteredVideo) processFrames(ctx context.Context) {
}
}

// processDetections reads frames from the camera, processes them with the vision service,
// and triggers the copier to copy the frame to upload storage if a detection is found.
func (fv *filteredVideo) processDetections(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
}
frame := fv.latestFrame.Load()
if frame == nil {
continue
}
res, err := fv.vis.Detections(ctx, *frame, nil)
if err != nil {
fv.logger.Error("failed to get detections from vision service", err)
return
}
for _, detection := range res {
label := detection.Label()
score := detection.Score()

if threshold, exists := fv.conf.Objects[label]; exists && score > threshold {
fv.mu.Lock()
fv.logger.Debugf("detected %s with score %f", label, score)
fv.triggers[label] = true
fv.mu.Unlock()
}
}
}
}

// deleter is a go routine that cleans up old clips if storage is full. It runs every
// minute and deletes the oldest clip until the storage size is below the max.
func (fv *filteredVideo) deleter(ctx context.Context) {
Expand All @@ -349,49 +279,6 @@ func (fv *filteredVideo) deleter(ctx context.Context) {
}
}

// copier is go routine that copies the latest frame to the upload storage directory.
// It listens for files created in the storage path. If detection triggers are found during
// the last clip window the previous clip is copied to the upload storage directory with the
// trigger keys in the filename.
func (fv *filteredVideo) copier(ctx context.Context) {
for {
select {
case event, ok := <-fv.watcher.Events:
if !ok {
return
}
if event.Op&fsnotify.Create == fsnotify.Create {
fv.mu.Lock()
fv.logger.Debugf("new file created: %s", event.Name)
if len(fv.triggers) > 0 && fv.lastFile != "" {
filename := filepath.Base(fv.lastFile)
var triggerKeys []string
for key := range fv.triggers {
triggerKeys = append(triggerKeys, key)
}
triggersStr := strings.Join(triggerKeys, "_")
copyName := fmt.Sprintf("%s%s_%s", fv.uploadPath, triggersStr, filename)
fv.logger.Debugf("copying %s to %s", fv.lastFile, copyName)
err := copyFile(fv.lastFile, copyName)
if err != nil {
fv.logger.Error("failed to copy file", err)
}
}
fv.lastFile = event.Name
fv.triggers = make(map[string]bool)
fv.mu.Unlock()
}
case err, ok := <-fv.watcher.Errors:
if !ok {
return
}
fv.logger.Error("error:", err)
case <-ctx.Done():
return
}
}
}

// Close closes the filtered video camera component.
// It closes the stream, workers, encoder, segmenter, and watcher.
func (fv *filteredVideo) Close(ctx context.Context) error {
Expand All @@ -402,9 +289,5 @@ func (fv *filteredVideo) Close(ctx context.Context) error {
fv.workers.Stop()
fv.enc.Close()
fv.seg.Close()
err = fv.watcher.Close()
if err != nil {
return err
}
return nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/AlekSi/gocov-xml v1.0.0
github.com/axw/gocov v1.1.0
github.com/edaniels/golinters v0.0.5-0.20220906153528-641155550742
github.com/fsnotify/fsnotify v1.7.0
github.com/fullstorydev/grpcurl v1.8.6
github.com/golangci/golangci-lint v1.54.0
github.com/rhysd/actionlint v1.6.24
Expand Down Expand Up @@ -88,6 +87,7 @@ require (
github.com/fatih/structtag v1.2.0 // indirect
github.com/firefart/nonamedreturns v1.0.4 // indirect
github.com/fogleman/gg v1.3.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/fzipp/gocyclo v0.6.0 // indirect
github.com/gen2brain/malgo v0.11.21 // indirect
github.com/go-critic/go-critic v0.8.2 // indirect
Expand Down

0 comments on commit 477f1a6

Please sign in to comment.