forked from viam-modules/viamrtsp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rtsp.go
391 lines (334 loc) · 9.96 KB
/
rtsp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
package viamrtsp
import (
"context"
"image"
"io"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/bluenviron/gortsplib/v3"
"github.com/bluenviron/gortsplib/v3/pkg/base"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/formats/rtph264"
"github.com/bluenviron/gortsplib/v3/pkg/formats/rtph265"
"github.com/bluenviron/gortsplib/v3/pkg/liberrors"
"github.com/bluenviron/gortsplib/v3/pkg/media"
"github.com/bluenviron/gortsplib/v3/pkg/url"
"github.com/pion/rtp"
"github.com/pkg/errors"
goutils "go.viam.com/utils"
"go.viam.com/rdk/components/camera"
"go.viam.com/rdk/gostream"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/rimage/transform"
)
var (
	// family is the "erh:viamrtsp" model family shared by the models in this package.
	family = resource.ModelNamespace("erh").WithFamily("viamrtsp")
	// ModelH264 is the "rtsp-h264" model under which the RTSP camera is registered.
	ModelH264 = family.WithModel("rtsp-h264")
)
// init registers the RTSP camera model under the camera API, using newRTSPCamera
// as its constructor and *Config as its native config type.
func init() {
	registration := resource.Registration[camera.Camera, *Config]{
		Constructor: newRTSPCamera,
	}
	resource.RegisterComponent(camera.API, ModelH264, registration)
}
// Config holds the config attributes for an RTSP camera model.
type Config struct {
// Address is the RTSP stream URL; Validate and newRTSPCamera parse it with gortsplib's url.Parse.
Address string `json:"rtsp_address"`
// IntrinsicParams are optional pinhole intrinsics; Validate calls CheckValid on them when non-nil.
IntrinsicParams *transform.PinholeCameraIntrinsics `json:"intrinsic_parameters,omitempty"`
// DistortionParams are optional Brown-Conrady distortion parameters; Validate calls CheckValid on them when non-nil.
DistortionParams *transform.BrownConrady `json:"distortion_parameters,omitempty"`
}
// Validate reports whether the config is usable. It checks that rtsp_address parses as
// an RTSP URL and that any supplied intrinsic or distortion parameters are well-formed.
// The model has no implicit dependencies, so the returned slice is always nil.
func (conf *Config) Validate(path string) ([]string, error) {
	if _, parseErr := url.Parse(conf.Address); parseErr != nil {
		return nil, parseErr
	}

	// Optional calibration parameters are only checked when present.
	if intrinsics := conf.IntrinsicParams; intrinsics != nil {
		if checkErr := intrinsics.CheckValid(); checkErr != nil {
			return nil, checkErr
		}
	}
	if distortion := conf.DistortionParams; distortion != nil {
		if checkErr := distortion.CheckValid(); checkErr != nil {
			return nil, checkErr
		}
	}

	return nil, nil
}
// rtspCamera contains the rtsp client, and the reader function that fulfills the camera interface.
//
// NOTE(review): client and rawDecoder are replaced by reconnectClient from the background
// reconnect goroutine without any lock, while RTP callbacks read rawDecoder; confirm that
// gortsplib's Client.Close stops those callbacks before rawDecoder is closed.
type rtspCamera struct {
// VideoReader produces frames for the camera; newRTSPCamera sets it to return the frame stored in latestFrame.
gostream.VideoReader
// u is the parsed RTSP address that the client connects and sends OPTIONS/DESCRIBE requests to.
u *url.URL
// client is the active RTSP client; closeConnection sets it to nil.
client *gortsplib.Client
// rawDecoder turns H264/H265 NALUs into images; created by initH264/initH265 and released by closeConnection.
rawDecoder *decoder
// cancelCtx is polled by the reconnect worker; the loop exits once it is cancelled.
cancelCtx context.Context
// cancelFunc cancels cancelCtx; called by Close.
cancelFunc context.CancelFunc
// activeBackgroundWorkers tracks the reconnect goroutine so Close can wait for it to exit.
activeBackgroundWorkers sync.WaitGroup
// latestFrame holds the most recently decoded image, written by the RTP packet callbacks.
latestFrame atomic.Pointer[image.Image]
// logger is the logger passed to newRTSPCamera.
logger logging.Logger
}
// Close shuts the camera down. It cancels the reconnect worker's context, blocks until
// that worker has returned, and then closes the RTSP client and frame decoder.
// The returned error is whatever closing the RTSP client reported; it is nil when there
// was no client.
func (rc *rtspCamera) Close(ctx context.Context) error {
	stopWorkers := rc.cancelFunc
	stopWorkers()

	// Wait first so the worker cannot be mid-reconnect while the connection is torn down.
	rc.activeBackgroundWorkers.Wait()

	closeErr := rc.closeConnection()
	return closeErr
}
// clientReconnectBackgroundWorker starts a managed goroutine that wakes every 5 seconds
// until rc.cancelCtx is cancelled. On each wake it probes the server and calls
// reconnectClient when the client is missing or unhealthy. The goroutine is registered
// in rc.activeBackgroundWorkers so Close can wait for it.
func (rc *rtspCamera) clientReconnectBackgroundWorker() {
	rc.activeBackgroundWorkers.Add(1)
	goutils.ManagedGo(func() {
		for goutils.SelectContextOrWait(rc.cancelCtx, 5*time.Second) {
			if !rc.rtspClientNeedsReconnect() {
				continue
			}
			if err := rc.reconnectClient(); err != nil {
				rc.logger.Warnw("cannot reconnect to rtsp server", "error", err)
			} else {
				rc.logger.Infow("reconnected to rtsp server", "url", rc.u)
			}
		}
	}, rc.activeBackgroundWorkers.Done)
}

// rtspClientNeedsReconnect uses an RTSP OPTIONS request as a liveness probe. It reports
// true when there is no client, when the request fails with a connection-lost error, or
// when the server answers with a non-OK status.
func (rc *rtspCamera) rtspClientNeedsReconnect() bool {
	if rc.client == nil {
		return true
	}
	res, err := rc.client.Options(rc.u)
	switch {
	case err != nil && isRTSPConnectionLost(err):
		rc.logger.Warnw("The rtsp client encountered an error, trying to reconnect", "url", rc.u, "error", err)
		return true
	case res != nil && res.StatusCode != base.StatusOK:
		rc.logger.Warnw("The rtsp server responded with non-OK status", "url", rc.u, "status code", res.StatusCode)
		return true
	case err != nil:
		// Unclassified errors do not trigger a reconnect. Surface them instead of dropping
		// them silently, so a stuck connection can still be diagnosed.
		rc.logger.Debugw("rtsp OPTIONS request failed; not reconnecting", "url", rc.u, "error", err)
	}
	return false
}

// isRTSPConnectionLost reports whether err means the RTSP connection is gone and the
// client must be recreated: the client was terminated, the stream hit EOF, or the socket
// failed with a broken pipe, a refused connection, or a reset by the peer.
func isRTSPConnectionLost(err error) bool {
	return errors.Is(err, liberrors.ErrClientTerminated{}) ||
		errors.Is(err, io.EOF) ||
		errors.Is(err, syscall.EPIPE) ||
		errors.Is(err, syscall.ECONNREFUSED) ||
		errors.Is(err, syscall.ECONNRESET)
}
// closeConnection tears down the current session. It first closes the RTSP client, if
// any, and then releases the raw frame decoder, if any. Both fields are left nil.
// Only the client's Close error is returned.
func (rc *rtspCamera) closeConnection() error {
	var clientCloseErr error

	if c := rc.client; c != nil {
		clientCloseErr = c.Close()
		rc.client = nil
	}

	if dec := rc.rawDecoder; dec != nil {
		dec.close()
		rc.rawDecoder = nil
	}

	return clientCloseErr
}
// reconnectClient (re)establishes the RTSP session. It closes any existing client and
// decoder, creates a fresh gortsplib client, connects to rc.u, and DESCRIBEs the stream.
// It then sets up the H264 or H265 track according to getStreamInfo and starts PLAY.
// If any step fails, the partially initialized client and decoder are closed again
// before the error is returned.
func (rc *rtspCamera) reconnectClient() (err error) {
	// Guard before touching any field: even logging through rc.logger would panic on a
	// nil receiver.
	if rc == nil {
		return errors.New("rtspCamera is nil")
	}
	rc.logger.Debugw("reconnecting rtsp client", "url", rc.u)

	if closeErr := rc.closeConnection(); closeErr != nil {
		rc.logger.Debugw("error while closing rtsp client:", "error", closeErr)
	}

	// Replace the client with a new one, but close it again if setup does not succeed.
	rc.client = &gortsplib.Client{}
	var clientSuccessful bool
	defer func() {
		if clientSuccessful {
			return
		}
		if closeErr := rc.closeConnection(); closeErr != nil {
			rc.logger.Debugw("error while closing rtsp client after failed setup", "error", closeErr)
		}
	}()

	if err = rc.client.Start(rc.u.Scheme, rc.u.Host); err != nil {
		return errors.Wrap(err, "starting rtsp client")
	}

	tracks, baseURL, _, err := rc.client.Describe(rc.u)
	if err != nil {
		return errors.Wrap(err, "describing rtsp stream")
	}

	codecInfo, err := getStreamInfo(rc.u.String())
	if err != nil {
		return errors.Wrap(err, "probing rtsp stream codec")
	}

	switch codecInfo {
	case H264:
		rc.logger.Infof("setting up H264 decoder")
		err = rc.initH264(tracks, baseURL)
	case H265:
		rc.logger.Infof("setting up H265 decoder")
		err = rc.initH265(tracks, baseURL)
	default:
		return errors.Errorf("codec not supported %v", codecInfo)
	}
	if err != nil {
		return err
	}

	if _, err = rc.client.Play(nil); err != nil {
		return errors.Wrap(err, "starting rtsp playback")
	}

	clientSuccessful = true
	return nil
}
// initH264 prepares H264 playback on rc.client. It locates the H264 track in tracks and
// sets it up relative to baseURL. It then creates the RTP depacketizer and the
// H264 -> image decoder (stored in rc.rawDecoder) and feeds the decoder any SPS/PPS
// carried in the SDP. Finally it registers an RTP callback that decodes incoming packets
// and stores each resulting image in rc.latestFrame.
// It returns an error when no H264 track exists or when any setup step fails.
func (rc *rtspCamera) initH264(tracks media.Medias, baseURL *url.URL) (err error) {
	// find the H264 track and set it up on the client
	var format *formats.H264
	track := tracks.FindFormat(&format)
	if track == nil {
		rc.logger.Warn("tracks available")
		for _, x := range tracks {
			rc.logger.Warnf("\t %v", x)
		}
		return errors.New("h264 track not found")
	}

	if _, err = rc.client.Setup(track, baseURL, 0, 0); err != nil {
		return err
	}

	// setup RTP/H264 -> H264 decoder
	rtpDec, err := format.CreateDecoder2()
	if err != nil {
		rc.logger.Errorf("error creating H264 decoder %v", err)
		return err
	}

	// setup H264 -> raw frames decoder
	rc.rawDecoder, err = newH264Decoder()
	if err != nil {
		return err
	}

	// If SPS and PPS are present in the SDP, send them to the decoder. Failures are
	// logged rather than returned, so setup continues as before.
	if format.SPS != nil {
		if _, decErr := rc.rawDecoder.decode(format.SPS); decErr != nil {
			rc.logger.Debugw("error decoding H264 SPS from SDP", "error", decErr)
		}
	} else {
		rc.logger.Warn("no SPS found in H264 format")
	}
	if format.PPS != nil {
		if _, decErr := rc.rawDecoder.decode(format.PPS); decErr != nil {
			rc.logger.Debugw("error decoding H264 PPS from SDP", "error", decErr)
		}
	} else {
		rc.logger.Warn("no PPS found in H264 format")
	}

	// On packet retrieval, turn it into an image and store it as the latest frame.
	rc.client.OnPacketRTP(track, format, func(pkt *rtp.Packet) {
		// extract access units from RTP packets
		au, _, err := rtpDec.Decode(pkt)
		if err != nil {
			// These two sentinels only mean "wait for more packets"; anything else is real.
			if !errors.Is(err, rtph264.ErrNonStartingPacketAndNoPrevious) && !errors.Is(err, rtph264.ErrMorePacketsNeeded) {
				rc.logger.Errorf("error decoding(1) h264 rtsp stream %v", err)
			}
			return
		}

		for _, nalu := range au {
			if len(nalu) < 20 {
				// TODO(ERH): this is probably wrong, but fixes a spam issue with "no frame!"
				continue
			}
			// decode NALUs into images
			lastImage, err := rc.rawDecoder.decode(nalu)
			if err != nil {
				rc.logger.Errorf("error decoding(2) h264 rtsp stream %v", err)
				return
			}
			if lastImage != nil {
				rc.latestFrame.Store(&lastImage)
			}
		}
	})
	return nil
}
// initH265 prepares H265 playback on rc.client. It locates the H265 track in tracks and
// sets it up relative to baseURL. It then creates the RTP depacketizer and the
// H265 -> image decoder (stored in rc.rawDecoder) and feeds the decoder any VPS/SPS/PPS
// carried in the SDP. Finally it registers an RTP callback that decodes incoming packets
// and stores each resulting image in rc.latestFrame.
// It returns an error when no H265 track exists or when any setup step fails.
func (rc *rtspCamera) initH265(tracks media.Medias, baseURL *url.URL) (err error) {
	var format *formats.H265
	track := tracks.FindFormat(&format)
	if track == nil {
		rc.logger.Warn("tracks available")
		for _, x := range tracks {
			rc.logger.Warnf("\t %v", x)
		}
		return errors.New("h265 track not found")
	}

	if _, err = rc.client.Setup(track, baseURL, 0, 0); err != nil {
		return err
	}

	// setup RTP/H265 -> H265 decoder
	rtpDec, err := format.CreateDecoder2()
	if err != nil {
		rc.logger.Errorf("error creating H265 decoder %v", err)
		return err
	}

	// setup H265 -> raw frames decoder
	rc.rawDecoder, err = newH265Decoder()
	if err != nil {
		return err
	}

	// For H.265, send any VPS, SPS, and PPS from the SDP to the decoder. Failures are
	// logged rather than returned, so setup continues as before.
	if format.VPS != nil {
		if _, decErr := rc.rawDecoder.decode(format.VPS); decErr != nil {
			rc.logger.Debugw("error decoding H265 VPS from SDP", "error", decErr)
		}
	} else {
		rc.logger.Warn("no VPS found in H265 format")
	}
	if format.SPS != nil {
		if _, decErr := rc.rawDecoder.decode(format.SPS); decErr != nil {
			rc.logger.Debugw("error decoding H265 SPS from SDP", "error", decErr)
		}
	} else {
		rc.logger.Warn("no SPS found in H265 format")
	}
	if format.PPS != nil {
		if _, decErr := rc.rawDecoder.decode(format.PPS); decErr != nil {
			rc.logger.Debugw("error decoding H265 PPS from SDP", "error", decErr)
		}
	} else {
		rc.logger.Warn("no PPS found in H265 format")
	}

	// On packet retrieval, turn it into an image and store it as the latest frame.
	rc.client.OnPacketRTP(track, format, func(pkt *rtp.Packet) {
		// Extract access units from RTP packets
		au, _, err := rtpDec.Decode(pkt)
		if err != nil {
			// These two sentinels only mean "wait for more packets"; anything else is real.
			if !errors.Is(err, rtph265.ErrNonStartingPacketAndNoPrevious) && !errors.Is(err, rtph265.ErrMorePacketsNeeded) {
				rc.logger.Errorf("error decoding(1) h265 rtsp stream %v", err)
			}
			return
		}

		for _, nalu := range au {
			lastImage, err := rc.rawDecoder.decode(nalu)
			if err != nil {
				rc.logger.Errorf("error decoding(2) h265 rtsp stream %v", err)
				return
			}
			if lastImage != nil {
				rc.latestFrame.Store(&lastImage)
			}
		}
	})
	return nil
}
// newRTSPCamera is the registered constructor for the RTSP camera model. It converts the
// resource config, connects to the configured RTSP address, and installs a VideoReader
// that returns the most recently decoded frame. It then starts the background reconnect
// worker and wraps everything as an RDK camera. If building the video source fails, the
// RTSP client and the reconnect worker are shut down before the error is returned.
func newRTSPCamera(ctx context.Context, _ resource.Dependencies, conf resource.Config, logger logging.Logger) (camera.Camera, error) {
	newConf, err := resource.NativeConfig[*Config](conf)
	if err != nil {
		return nil, err
	}
	u, err := url.Parse(newConf.Address)
	if err != nil {
		return nil, err
	}

	rtspCam := &rtspCamera{
		u:      u,
		logger: logger,
	}
	// reconnectClient closes its own partial state on failure, so nothing leaks here.
	if err := rtspCam.reconnectClient(); err != nil {
		return nil, err
	}

	cancelCtx, cancel := context.WithCancel(context.Background())
	rtspCam.cancelCtx = cancelCtx
	rtspCam.cancelFunc = cancel
	rtspCam.VideoReader = gostream.VideoReaderFunc(func(ctx context.Context) (image.Image, func(), error) {
		latest := rtspCam.latestFrame.Load()
		if latest == nil {
			return nil, func() {}, errors.New("no frame yet")
		}
		return *latest, func() {}, nil
	})

	cameraModel := camera.NewPinholeModelWithBrownConradyDistortion(newConf.IntrinsicParams, newConf.DistortionParams)
	rtspCam.clientReconnectBackgroundWorker()

	src, err := camera.NewVideoSourceFromReader(ctx, rtspCam, &cameraModel, camera.ColorStream)
	if err != nil {
		// Without this, the reconnect goroutine and the open RTSP client would leak.
		if closeErr := rtspCam.Close(ctx); closeErr != nil {
			logger.Warnw("error closing rtsp camera after failed setup", "error", closeErr)
		}
		return nil, err
	}
	return camera.FromVideoSource(conf.ResourceName(), src, logger), nil
}