Skip to content

Commit

Permalink
Merge pull request #24 from codefresh-io/CR-7756-sync-v1.5.5
Browse files Browse the repository at this point in the history
Cr 7756 sync v1.5.5
  • Loading branch information
daniel-codefresh authored Dec 21, 2021
2 parents f5a6ead + e8292d8 commit 162fba7
Show file tree
Hide file tree
Showing 51 changed files with 650 additions and 427 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ RUN apk update && apk upgrade && \
apk add ca-certificates && \
apk --no-cache add tzdata

ENV ARGO_VERSION=v3.1.1
ENV ARGO_VERSION=v3.2.4

RUN wget -q https://github.com/argoproj/argo-workflows/releases/download/${ARGO_VERSION}/argo-linux-${ARCH}.gz
RUN gunzip -f argo-linux-${ARCH}.gz
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ EXECUTABLES = curl docker gzip go
# docker image publishing options
DOCKER_PUSH?=false
IMAGE_NAMESPACE?=quay.io/argoproj
VERSION?=v1.5.3-cap-CR-7899
BASE_VERSION:=v1.5.3-cap-CR-7899
VERSION?=v1.5.5-cap-CR-7756
BASE_VERSION:=v1.5.5-cap-CR-7756

override LDFLAGS += \
-X ${PACKAGE}.version=${VERSION} \
Expand Down
6 changes: 5 additions & 1 deletion api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions api/sensor.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions api/sensor.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions controllers/sensor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,6 @@ func validateAWSLambdaTrigger(trigger *v1alpha1.AWSLambdaTrigger) error {
if trigger.Region == "" {
return errors.New("region in not specified")
}
if trigger.AccessKey == nil || trigger.SecretKey == nil {
return errors.New("either accesskey or secretkey secret selector is not specified")
}
if trigger.Payload == nil {
return errors.New("payload parameters are not specified")
}
Expand Down
13 changes: 13 additions & 0 deletions eventsources/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package common

import "github.com/cloudevents/sdk-go/v2/event"

type Options func(*event.Event) error

// Option to set different ID for event
func WithID(id string) Options {
return func(e *event.Event) error {
e.SetID(id)
return nil
}
}
5 changes: 3 additions & 2 deletions eventsources/common/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
metrics "github.com/argoproj/argo-events/metrics"
"github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
)
Expand Down Expand Up @@ -177,7 +178,7 @@ func activateRoute(router Router, controller *Controller) {
}

// manageRouteChannels consumes data from route's data channel and stops the processing when the event source is stopped/removed
func manageRouteChannels(router Router, dispatch func([]byte) error) {
func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecommon.Options) error) {
route := router.GetRoute()
logger := route.Logger
for {
Expand All @@ -198,7 +199,7 @@ func manageRouteChannels(router Router, dispatch func([]byte) error) {
}

// ManagerRoute manages the lifecycle of a route
func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte) error) error {
func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
route := router.GetRoute()

logger := route.Logger
Expand Down
11 changes: 9 additions & 2 deletions eventsources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/argoproj/argo-events/common/logging"
"github.com/argoproj/argo-events/eventbus"
eventbusdriver "github.com/argoproj/argo-events/eventbus/driver"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/sources/amqp"
"github.com/argoproj/argo-events/eventsources/sources/awssns"
"github.com/argoproj/argo-events/eventsources/sources/awssqs"
Expand Down Expand Up @@ -64,7 +65,7 @@ type EventingServer interface {
GetEventSourceType() apicommon.EventSourceType

// Function to start listening events.
StartListening(ctx context.Context, dispatch func([]byte) error) error
StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error
}

// GetEventingServers returns the mapping of event source type and list of eventing servers
Expand Down Expand Up @@ -410,13 +411,19 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even
Jitter: &jitter,
}
if err = common.Connect(&backoff, func() error {
return s.StartListening(ctx, func(data []byte) error {
return s.StartListening(ctx, func(data []byte, opts ...eventsourcecommon.Options) error {
event := cloudevents.NewEvent()
event.SetID(fmt.Sprintf("%x", uuid.New()))
event.SetType(string(s.GetEventSourceType()))
event.SetSource(s.GetEventSourceName())
event.SetSubject(s.GetEventName())
event.SetTime(time.Now())
for _, opt := range opts {
err := opt(&event)
if err != nil {
return err
}
}
err := event.SetData(cloudevents.ApplicationJSON, data)
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions eventsources/sources/amqp/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/sources"
metrics "github.com/argoproj/argo-events/metrics"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
Expand Down Expand Up @@ -58,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())

Expand Down Expand Up @@ -153,7 +154,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, msg amqplib.Delivery, dispatch func([]byte) error, log *zap.SugaredLogger) error {
func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, msg amqplib.Delivery, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand Down
3 changes: 2 additions & 1 deletion eventsources/sources/awssns/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
commonaws "github.com/argoproj/argo-events/eventsources/common/aws"
"github.com/argoproj/argo-events/eventsources/common/webhook"
"github.com/argoproj/argo-events/eventsources/sources"
Expand Down Expand Up @@ -267,7 +268,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts an SNS event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
logger := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())

Expand Down
5 changes: 3 additions & 2 deletions eventsources/sources/awssqs/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.uber.org/zap"

"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
awscommon "github.com/argoproj/argo-events/eventsources/common/aws"
"github.com/argoproj/argo-events/eventsources/sources"
metrics "github.com/argoproj/argo-events/metrics"
Expand Down Expand Up @@ -60,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
log.Info("started processing the AWS SQS event source...")
Expand Down Expand Up @@ -122,7 +123,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) processMessage(ctx context.Context, message *sqslib.Message, dispatch func([]byte) error, ack func(), log *zap.SugaredLogger) {
func (el *EventListener) processMessage(ctx context.Context, message *sqslib.Message, dispatch func([]byte, ...eventsourcecommon.Options) error, ack func(), log *zap.SugaredLogger) {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand Down
3 changes: 2 additions & 1 deletion eventsources/sources/azureeventshub/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/sources"
metrics "github.com/argoproj/argo-events/metrics"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
Expand Down Expand Up @@ -59,7 +60,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
log.Info("started processing the Azure Events Hub event source...")
Expand Down
3 changes: 2 additions & 1 deletion eventsources/sources/bitbucketserver/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/common/webhook"
"github.com/argoproj/argo-events/eventsources/sources"
"github.com/argoproj/argo-events/pkg/apis/events"
Expand Down Expand Up @@ -157,7 +158,7 @@ func (router *Router) PostInactivate() error {
}

// StartListening starts an event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
defer sources.Recover(el.GetEventName())

bitbucketserverEventSource := &el.BitbucketServerEventSource
Expand Down
3 changes: 2 additions & 1 deletion eventsources/sources/calendar/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/persist"
metrics "github.com/argoproj/argo-events/metrics"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
Expand Down Expand Up @@ -134,7 +135,7 @@ func (el *EventListener) getExecutionTime() (time.Time, error) {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
el.log = logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
el.log.Info("started processing the calendar event source...")
Expand Down
3 changes: 2 additions & 1 deletion eventsources/sources/emitter/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/sources"
metrics "github.com/argoproj/argo-events/metrics"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
Expand Down Expand Up @@ -58,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
log.Info("started processing the Emitter event source...")
Expand Down
7 changes: 4 additions & 3 deletions eventsources/sources/file/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/zap"

"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/common/fsevent"
"github.com/argoproj/argo-events/eventsources/sources"
metrics "github.com/argoproj/argo-events/metrics"
Expand Down Expand Up @@ -60,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
defer sources.Recover(el.GetEventName())
Expand All @@ -81,7 +82,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}

// listenEvents listen to file related events.
func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte) error, log *zap.SugaredLogger) error {
func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error {
fileEventSource := &el.FileEventSource

// create new fs watcher
Expand Down Expand Up @@ -161,7 +162,7 @@ func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte)
}

// listenEvents listen to file related events using polling.
func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte) error, log *zap.SugaredLogger) error {
func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error {
fileEventSource := &el.FileEventSource

// create new fs watcher
Expand Down
3 changes: 2 additions & 1 deletion eventsources/sources/gcppubsub/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/sources"
metrics "github.com/argoproj/argo-events/metrics"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
Expand Down Expand Up @@ -65,7 +66,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening listens to GCP PubSub events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
// In order to listen events from GCP PubSub,
// 1. Parse the event source that contains configuration to connect to GCP PubSub
// 2. Create a new PubSub client
Expand Down
Loading

0 comments on commit 162fba7

Please sign in to comment.