diff --git a/go.mod b/go.mod index dd9be6610..f700bc399 100644 --- a/go.mod +++ b/go.mod @@ -21,10 +21,10 @@ require ( k8s.io/client-go v0.30.3 k8s.io/code-generator v0.30.3 knative.dev/caching v0.0.0-20241118131847-a38b40d8a39c - knative.dev/eventing v0.43.1-0.20241121083601-5ad7dabebd01 + knative.dev/eventing v0.43.1-0.20241126125511-65da6fc77cd4 knative.dev/hack v0.0.0-20241106013728-b7995315deb5 knative.dev/pkg v0.0.0-20241118074447-a7fd9b10bb9f - knative.dev/reconciler-test v0.0.0-20241106013737-0619dc3ecbcf + knative.dev/reconciler-test v0.0.0-20241126141613-2f0e67f51c46 knative.dev/serving v0.43.1-0.20241119121959-3e45e8f8392b sigs.k8s.io/yaml v1.4.0 ) diff --git a/go.sum b/go.sum index 59365e881..541c372f4 100644 --- a/go.sum +++ b/go.sum @@ -1393,16 +1393,16 @@ k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1 k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= knative.dev/caching v0.0.0-20241118131847-a38b40d8a39c h1:fxyQ00VbltLYz+MSZ1RPjzea2gfnH3Y2HDG7PqiwmQw= knative.dev/caching v0.0.0-20241118131847-a38b40d8a39c/go.mod h1:xcxIBx5jKR4HANCbcN4If+uNU1Y76b/tn9eV1byvJKc= -knative.dev/eventing v0.43.1-0.20241121083601-5ad7dabebd01 h1:nmYdUuubc8X9kNEKxKebdXYouVLWAhQz7SKVS1CvK9M= -knative.dev/eventing v0.43.1-0.20241121083601-5ad7dabebd01/go.mod h1:RxMFtxk903ZoxyS140MPdLLePTzBdeaGkVmBTB52t04= +knative.dev/eventing v0.43.1-0.20241126125511-65da6fc77cd4 h1:0jgqiStGgfXkFeTFKXM02tw7cxJSOypE0Q1xhInTucc= +knative.dev/eventing v0.43.1-0.20241126125511-65da6fc77cd4/go.mod h1:RxMFtxk903ZoxyS140MPdLLePTzBdeaGkVmBTB52t04= knative.dev/hack v0.0.0-20241106013728-b7995315deb5 h1:CfU5+6B+ylBd7mSGpvRqpzZV8H5ZQLGUwVygFzbE+1o= knative.dev/hack v0.0.0-20241106013728-b7995315deb5/go.mod h1:R0ritgYtjLDO9527h5vb5X6gfvt5LCrJ55BNbVDsWiY= knative.dev/networking v0.0.0-20241118075147-929a5d5f19d0 h1:3vj6wR95isnuqgjQzcclyrzaodv5Jvjc7xq4Bv0yde8= knative.dev/networking v0.0.0-20241118075147-929a5d5f19d0/go.mod h1:VvJGbKdlbEG6xr8q2LMLpiUlIt8OUiJZBRlT9yUq09w= knative.dev/pkg v0.0.0-20241118074447-a7fd9b10bb9f h1:ggyD8WGF4LbTWfCiLo++EC/Q7rvYY4UI6CzuDt9dXkE= knative.dev/pkg v0.0.0-20241118074447-a7fd9b10bb9f/go.mod h1:C2dxK66GlycMOS0SKqv0SMAnWkxsYbG4hkH32Xg1qD0= -knative.dev/reconciler-test v0.0.0-20241106013737-0619dc3ecbcf h1:FMgW5irj5xRPSRmtVICeLNHnZsnn7t8IFO0Fj3Kf+jo= -knative.dev/reconciler-test v0.0.0-20241106013737-0619dc3ecbcf/go.mod h1:W9Kmdoxelg2mswUpDKerL/4Ih1/ouVhlSMeZeJ5LX9c= +knative.dev/reconciler-test v0.0.0-20241126141613-2f0e67f51c46 h1:myEyBv2K2BSRqOgSERxNhSTAplG5hS+a94E4s6Wy3/g= +knative.dev/reconciler-test v0.0.0-20241126141613-2f0e67f51c46/go.mod h1:zDapuiJIFS67XEVBJWdEnVN5KHE8EvFGooLQGXeDH/c= knative.dev/serving v0.43.1-0.20241119121959-3e45e8f8392b h1:+grx1JH/q3+R925NGKVZCyangIPqTYsu6WFuViEy9Nk= knative.dev/serving v0.43.1-0.20241119121959-3e45e8f8392b/go.mod h1:O39Rmsexjtgrqx2MwzH7cbOl+EP2Da9v+HHuh0N4uIs= nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= diff --git a/vendor/knative.dev/eventing/pkg/apis/sinks/register.go b/vendor/knative.dev/eventing/pkg/apis/sinks/register.go index 676fa75e8..1994d5791 100644 --- a/vendor/knative.dev/eventing/pkg/apis/sinks/register.go +++ b/vendor/knative.dev/eventing/pkg/apis/sinks/register.go @@ -33,6 +33,12 @@ var ( Group: GroupName, Resource: "jobsinks", } + + // IntegrationSinkResource respresents a Knative Eventing sink IntegrationSink + IntegrationSinkResource = schema.GroupResource{ + Group: GroupName, + Resource: "integrationsinks", + } ) type Config struct { diff --git a/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_conversion.go b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_conversion.go new file mode 100644 index 000000000..8f41d2da4 --- /dev/null +++ b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_conversion.go @@ -0,0 +1,36 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "fmt" + + "knative.dev/pkg/apis" +) + +// ConvertTo implements apis.Convertible +// Converts source from v1alpha1.IntegrationSink into a higher version. +func (sink *IntegrationSink) ConvertTo(ctx context.Context, obj apis.Convertible) error { + return fmt.Errorf("v1alpha1 is the highest known version, got: %T", sink) +} + +// ConvertFrom implements apis.Convertible +// Converts source from a higher version into v1beta2.IntegrationSink +func (sink *IntegrationSink) ConvertFrom(ctx context.Context, obj apis.Convertible) error { + return fmt.Errorf("v1alpha1 is the highest known version, got: %T", sink) +} diff --git a/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_defaults.go b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_defaults.go new file mode 100644 index 000000000..f77df267e --- /dev/null +++ b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_defaults.go @@ -0,0 +1,26 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import "context" + +func (sink *IntegrationSink) SetDefaults(ctx context.Context) { + sink.Spec.SetDefaults(ctx) +} + +func (sink *IntegrationSinkSpec) SetDefaults(ctx context.Context) { +} diff --git a/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go new file mode 100644 index 000000000..1ad33e2ca --- /dev/null +++ b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go @@ -0,0 +1,123 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +const ( + // IntegrationSinkConditionReady has status True when the IntegrationSink is ready to send events. + IntegrationSinkConditionReady = apis.ConditionReady + + IntegrationSinkConditionAddressable apis.ConditionType = "Addressable" + + // IntegrationSinkConditionDeploymentReady has status True when the IntegrationSink has been configured with a deployment. + IntegrationSinkConditionDeploymentReady apis.ConditionType = "DeploymentReady" + + // IntegrationSinkConditionEventPoliciesReady has status True when all the applying EventPolicies for this + // IntegrationSink are ready. + IntegrationSinkConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady" +) + +var IntegrationSinkCondSet = apis.NewLivingConditionSet( + IntegrationSinkConditionAddressable, + IntegrationSinkConditionDeploymentReady, + IntegrationSinkConditionEventPoliciesReady, +) + +// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. +func (*IntegrationSink) GetConditionSet() apis.ConditionSet { + return IntegrationSinkCondSet +} + +// GetCondition returns the condition currently associated with the given type, or nil. +func (s *IntegrationSinkStatus) GetCondition(t apis.ConditionType) *apis.Condition { + return IntegrationSinkCondSet.Manage(s).GetCondition(t) +} + +// GetTopLevelCondition returns the top level Condition. +func (ps *IntegrationSinkStatus) GetTopLevelCondition() *apis.Condition { + return IntegrationSinkCondSet.Manage(ps).GetTopLevelCondition() +} + +// IsReady returns true if the resource is ready overall. +func (s *IntegrationSinkStatus) IsReady() bool { + return IntegrationSinkCondSet.Manage(s).IsHappy() +} + +// InitializeConditions sets relevant unset conditions to Unknown state. +func (s *IntegrationSinkStatus) InitializeConditions() { + IntegrationSinkCondSet.Manage(s).InitializeConditions() +} + +// MarkAddressableReady marks the Addressable condition to True. +func (s *IntegrationSinkStatus) MarkAddressableReady() { + IntegrationSinkCondSet.Manage(s).MarkTrue(IntegrationSinkConditionAddressable) +} + +// MarkEventPoliciesFailed marks the EventPoliciesReady condition to False with the given reason and message. +func (s *IntegrationSinkStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) { + IntegrationSinkCondSet.Manage(s).MarkFalse(IntegrationSinkConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +// MarkEventPoliciesUnknown marks the EventPoliciesReady condition to Unknown with the given reason and message. +func (s *IntegrationSinkStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) { + IntegrationSinkCondSet.Manage(s).MarkUnknown(IntegrationSinkConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +// MarkEventPoliciesTrue marks the EventPoliciesReady condition to True. +func (s *IntegrationSinkStatus) MarkEventPoliciesTrue() { + IntegrationSinkCondSet.Manage(s).MarkTrue(IntegrationSinkConditionEventPoliciesReady) +} + +// MarkEventPoliciesTrueWithReason marks the EventPoliciesReady condition to True with the given reason and message. +func (s *IntegrationSinkStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) { + IntegrationSinkCondSet.Manage(s).MarkTrueWithReason(IntegrationSinkConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +func (s *IntegrationSinkStatus) PropagateDeploymentStatus(d *appsv1.DeploymentStatus) { + deploymentAvailableFound := false + for _, cond := range d.Conditions { + if cond.Type == appsv1.DeploymentAvailable { + deploymentAvailableFound = true + if cond.Status == corev1.ConditionTrue { + IntegrationSinkCondSet.Manage(s).MarkTrue(IntegrationSinkConditionDeploymentReady) + } else if cond.Status == corev1.ConditionFalse { + IntegrationSinkCondSet.Manage(s).MarkFalse(IntegrationSinkConditionDeploymentReady, cond.Reason, cond.Message) + } else if cond.Status == corev1.ConditionUnknown { + IntegrationSinkCondSet.Manage(s).MarkUnknown(IntegrationSinkConditionDeploymentReady, cond.Reason, cond.Message) + } + } + } + if !deploymentAvailableFound { + IntegrationSinkCondSet.Manage(s).MarkUnknown(IntegrationSinkConditionDeploymentReady, "DeploymentUnavailable", "The Deployment '%s' is unavailable.", d) + } +} + +func (s *IntegrationSinkStatus) SetAddress(address *duckv1.Addressable) { + s.Address = address + if address == nil || address.URL.IsEmpty() { + IntegrationSinkCondSet.Manage(s).MarkFalse(IntegrationSinkConditionAddressable, "EmptyHostname", "hostname is the empty string") + } else { + IntegrationSinkCondSet.Manage(s).MarkTrue(IntegrationSinkConditionAddressable) + + } +} diff --git a/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_types.go b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_types.go new file mode 100644 index 000000000..5e2dbb46f --- /dev/null +++ b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_types.go @@ -0,0 +1,117 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/eventing/pkg/apis/common/integration/v1alpha1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/kmeta" +) + +// +genclient +// +genreconciler +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +k8s:defaulter-gen=true + +// IntegrationSink is the Schema for the IntegrationSink API. +type IntegrationSink struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec IntegrationSinkSpec `json:"spec,omitempty"` + Status IntegrationSinkStatus `json:"status,omitempty"` +} + +// Check the interfaces that JobSink should be implementing. +var ( + _ runtime.Object = (*IntegrationSink)(nil) + _ kmeta.OwnerRefable = (*IntegrationSink)(nil) + _ apis.Validatable = (*IntegrationSink)(nil) + _ apis.Defaultable = (*IntegrationSink)(nil) + _ apis.HasSpec = (*IntegrationSink)(nil) + _ duckv1.KRShaped = (*IntegrationSink)(nil) + _ apis.Convertible = (*JobSink)(nil) +) + +type IntegrationSinkSpec struct { + Aws *Aws `json:"aws,omitempty"` // AWS source configuration + Log *Log `json:"log,omitempty"` // Log sink configuration +} + +type Log struct { + LoggerName string `json:"loggerName,omitempty" default:"log-sink"` // Name of the logging category to use + Level string `json:"level,omitempty" default:"INFO"` // Logging level to use + LogMask bool `json:"logMask,omitempty" default:"false"` // Mask sensitive information in the log + Marker string `json:"marker,omitempty"` // An optional Marker name to use + Multiline bool `json:"multiline,omitempty" default:"false"` // If enabled, outputs each information on a newline + ShowAllProperties bool `json:"showAllProperties,omitempty" default:"false"` // Show all of the exchange properties (both internal and custom) + ShowBody bool `json:"showBody,omitempty" default:"true"` // Show the message body + ShowBodyType bool `json:"showBodyType,omitempty" default:"true"` // Show the body Java type + ShowExchangePattern bool `json:"showExchangePattern,omitempty" default:"true"` // Show the Message Exchange Pattern (MEP) + ShowHeaders bool `json:"showHeaders,omitempty" default:"false"` // Show the headers received + ShowProperties bool `json:"showProperties,omitempty" default:"false"` // Show the exchange properties (only custom) + ShowStreams bool `json:"showStreams,omitempty" default:"false"` // Show the stream bodies + ShowCachedStreams bool `json:"showCachedStreams,omitempty" default:"true"` // Show cached stream bodies +} + +type Aws struct { + S3 *v1alpha1.AWSS3 `json:"s3,omitempty"` // S3 source configuration + SQS *v1alpha1.AWSSQS `json:"sqs,omitempty"` // SQS source configuration + Auth *v1alpha1.Auth `json:"auth,omitempty"` +} + +type IntegrationSinkStatus struct { + duckv1.Status `json:",inline"` + + // AddressStatus is the part where the JobSink fulfills the Addressable contract. + // It exposes the endpoint as an URI to get events delivered. + // +optional + duckv1.AddressStatus `json:",inline"` + + // AppliedEventPoliciesStatus contains the list of EventPolicies which apply to this JobSink + // +optional + eventingduckv1.AppliedEventPoliciesStatus `json:",inline"` +} + +// GetGroupVersionKind returns the GroupVersionKind. +func (*IntegrationSink) GetGroupVersionKind() schema.GroupVersionKind { + return SchemeGroupVersion.WithKind("IntegrationSink") +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// IntegrationSinkList contains a list of IntegrationSink +type IntegrationSinkList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []IntegrationSink `json:"items"` +} + +// GetUntypedSpec returns the spec of the IntegrationSink. +func (c *IntegrationSink) GetUntypedSpec() interface{} { + return c.Spec +} + +// GetStatus retrieves the status of the IntegrationSink. Implements the KRShaped interface. +func (c *IntegrationSink) GetStatus() *duckv1.Status { + return &c.Status.Status +} diff --git a/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_validation.go b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_validation.go new file mode 100644 index 000000000..c96b83d7d --- /dev/null +++ b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/integration_sink_validation.go @@ -0,0 +1,85 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + + "knative.dev/pkg/apis" +) + +func (sink *IntegrationSink) Validate(ctx context.Context) *apis.FieldError { + ctx = apis.WithinParent(ctx, sink.ObjectMeta) + return sink.Spec.Validate(ctx).ViaField("spec") +} + +func (spec *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError { + var errs *apis.FieldError + + // Count how many fields are set to ensure mutual exclusivity + sinkSetCount := 0 + if spec.Log != nil { + sinkSetCount++ + } + if spec.Aws != nil { + if spec.Aws.S3 != nil { + sinkSetCount++ + } + if spec.Aws.SQS != nil { + sinkSetCount++ + } + } + + // Validate that only one sink field is set + if sinkSetCount > 1 { + errs = errs.Also(apis.ErrGeneric("only one sink type can be set", "spec")) + } else if sinkSetCount == 0 { + errs = errs.Also(apis.ErrGeneric("at least one sink type must be specified", "spec")) + } + + // Only perform AWS-specific validation if exactly one AWS sink is configured + if sinkSetCount == 1 && spec.Aws != nil { + if spec.Aws.S3 != nil || spec.Aws.SQS != nil { + // Check that AWS Auth is properly configured + if !spec.Aws.Auth.HasAuth() { + errs = errs.Also(apis.ErrMissingField("aws.auth.secret.ref.name")) + } + } + + // Additional validation for AWS S3 required fields + if spec.Aws.S3 != nil { + if spec.Aws.S3.Arn == "" { + errs = errs.Also(apis.ErrMissingField("aws.s3.bucketNameOrArn")) + } + if spec.Aws.S3.Region == "" { + errs = errs.Also(apis.ErrMissingField("aws.s3.region")) + } + } + + // Additional validation for AWS SQS required fields + if spec.Aws.SQS != nil { + if spec.Aws.SQS.Arn == "" { + errs = errs.Also(apis.ErrMissingField("aws.sqs.queueNameOrArn")) + } + if spec.Aws.SQS.Region == "" { + errs = errs.Also(apis.ErrMissingField("aws.sqs.region")) + } + } + } + + return errs +} diff --git a/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/job_sink_defaults.go b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/job_sink_defaults.go index 13f62e868..3bd18fbf3 100644 --- a/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/job_sink_defaults.go +++ b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/job_sink_defaults.go @@ -18,7 +18,48 @@ package v1alpha1 import ( "context" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" ) func (sink *JobSink) SetDefaults(ctx context.Context) { + if sink.Spec.Job != nil { + setBatchJobDefaults(sink.Spec.Job) + } +} + +func setBatchJobDefaults(job *batchv1.Job) { + for i := range job.Spec.Template.Spec.Containers { + executionModeFound := false + for j := range job.Spec.Template.Spec.Containers[i].Env { + if job.Spec.Template.Spec.Containers[i].Env[j].Name == ExecutionModeEnvVar { + executionModeFound = true + break + } + } + if executionModeFound { + continue + } + job.Spec.Template.Spec.Containers[i].Env = append(job.Spec.Template.Spec.Containers[i].Env, corev1.EnvVar{ + Name: ExecutionModeEnvVar, + Value: string(ExecutionModeBatch), + }) + } + for i := range job.Spec.Template.Spec.InitContainers { + executionModeFound := false + for j := range job.Spec.Template.Spec.InitContainers[i].Env { + if job.Spec.Template.Spec.InitContainers[i].Env[j].Name == ExecutionModeEnvVar { + executionModeFound = true + break + } + } + if executionModeFound { + continue + } + job.Spec.Template.Spec.InitContainers[i].Env = append(job.Spec.Template.Spec.InitContainers[i].Env, corev1.EnvVar{ + Name: ExecutionModeEnvVar, + Value: string(ExecutionModeBatch), + }) + } } diff --git a/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/job_sink_types.go b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/job_sink_types.go index 857e9f79a..da69766bd 100644 --- a/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/job_sink_types.go +++ b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/job_sink_types.go @@ -22,9 +22,20 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" + + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" +) + +const ( + ExecutionModeEnvVar = "KNATIVE_EXECUTION_MODE" +) + +type ExecutionMode string + +const ( + ExecutionModeBatch ExecutionMode = "batch" ) // +genclient diff --git a/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/register.go b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/register.go index 827ebc28b..89e4f3fbe 100644 --- a/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/register.go +++ b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/register.go @@ -47,6 +47,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { scheme.AddKnownTypes(SchemeGroupVersion, &JobSink{}, &JobSinkList{}, + &IntegrationSink{}, + &IntegrationSinkList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go index 58c9fdfaf..3dff23adf 100644 --- a/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go +++ b/vendor/knative.dev/eventing/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go @@ -24,8 +24,146 @@ package v1alpha1 import ( v1 "k8s.io/api/batch/v1" runtime "k8s.io/apimachinery/pkg/runtime" + integrationv1alpha1 "knative.dev/eventing/pkg/apis/common/integration/v1alpha1" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Aws) DeepCopyInto(out *Aws) { + *out = *in + if in.S3 != nil { + in, out := &in.S3, &out.S3 + *out = new(integrationv1alpha1.AWSS3) + **out = **in + } + if in.SQS != nil { + in, out := &in.SQS, &out.SQS + *out = new(integrationv1alpha1.AWSSQS) + **out = **in + } + if in.Auth != nil { + in, out := &in.Auth, &out.Auth + *out = new(integrationv1alpha1.Auth) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Aws. +func (in *Aws) DeepCopy() *Aws { + if in == nil { + return nil + } + out := new(Aws) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IntegrationSink) DeepCopyInto(out *IntegrationSink) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IntegrationSink. +func (in *IntegrationSink) DeepCopy() *IntegrationSink { + if in == nil { + return nil + } + out := new(IntegrationSink) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *IntegrationSink) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IntegrationSinkList) DeepCopyInto(out *IntegrationSinkList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]IntegrationSink, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IntegrationSinkList. +func (in *IntegrationSinkList) DeepCopy() *IntegrationSinkList { + if in == nil { + return nil + } + out := new(IntegrationSinkList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *IntegrationSinkList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IntegrationSinkSpec) DeepCopyInto(out *IntegrationSinkSpec) { + *out = *in + if in.Aws != nil { + in, out := &in.Aws, &out.Aws + *out = new(Aws) + (*in).DeepCopyInto(*out) + } + if in.Log != nil { + in, out := &in.Log, &out.Log + *out = new(Log) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IntegrationSinkSpec. +func (in *IntegrationSinkSpec) DeepCopy() *IntegrationSinkSpec { + if in == nil { + return nil + } + out := new(IntegrationSinkSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IntegrationSinkStatus) DeepCopyInto(out *IntegrationSinkStatus) { + *out = *in + in.Status.DeepCopyInto(&out.Status) + in.AddressStatus.DeepCopyInto(&out.AddressStatus) + in.AppliedEventPoliciesStatus.DeepCopyInto(&out.AppliedEventPoliciesStatus) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IntegrationSinkStatus. +func (in *IntegrationSinkStatus) DeepCopy() *IntegrationSinkStatus { + if in == nil { + return nil + } + out := new(IntegrationSinkStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *JobSink) DeepCopyInto(out *JobSink) { *out = *in @@ -143,3 +281,19 @@ func (in *JobStatus) DeepCopy() *JobStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Log) DeepCopyInto(out *Log) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Log. +func (in *Log) DeepCopy() *Log { + if in == nil { + return nil + } + out := new(Log) + in.DeepCopyInto(out) + return out +} diff --git a/vendor/knative.dev/eventing/pkg/client/clientset/versioned/typed/sinks/v1alpha1/generated_expansion.go b/vendor/knative.dev/eventing/pkg/client/clientset/versioned/typed/sinks/v1alpha1/generated_expansion.go index 029b7bee8..b444c4460 100644 --- a/vendor/knative.dev/eventing/pkg/client/clientset/versioned/typed/sinks/v1alpha1/generated_expansion.go +++ b/vendor/knative.dev/eventing/pkg/client/clientset/versioned/typed/sinks/v1alpha1/generated_expansion.go @@ -18,4 +18,6 @@ limitations under the License. package v1alpha1 +type IntegrationSinkExpansion interface{} + type JobSinkExpansion interface{} diff --git a/vendor/knative.dev/eventing/pkg/client/clientset/versioned/typed/sinks/v1alpha1/integrationsink.go b/vendor/knative.dev/eventing/pkg/client/clientset/versioned/typed/sinks/v1alpha1/integrationsink.go new file mode 100644 index 000000000..3e75e3aed --- /dev/null +++ b/vendor/knative.dev/eventing/pkg/client/clientset/versioned/typed/sinks/v1alpha1/integrationsink.go @@ -0,0 +1,195 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + "time" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" + v1alpha1 "knative.dev/eventing/pkg/apis/sinks/v1alpha1" + scheme "knative.dev/eventing/pkg/client/clientset/versioned/scheme" +) + +// IntegrationSinksGetter has a method to return a IntegrationSinkInterface. +// A group's client should implement this interface. +type IntegrationSinksGetter interface { + IntegrationSinks(namespace string) IntegrationSinkInterface +} + +// IntegrationSinkInterface has methods to work with IntegrationSink resources. +type IntegrationSinkInterface interface { + Create(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.CreateOptions) (*v1alpha1.IntegrationSink, error) + Update(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.UpdateOptions) (*v1alpha1.IntegrationSink, error) + UpdateStatus(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.UpdateOptions) (*v1alpha1.IntegrationSink, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.IntegrationSink, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.IntegrationSinkList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.IntegrationSink, err error) + IntegrationSinkExpansion +} + +// integrationSinks implements IntegrationSinkInterface +type integrationSinks struct { + client rest.Interface + ns string +} + +// newIntegrationSinks returns a IntegrationSinks +func newIntegrationSinks(c *SinksV1alpha1Client, namespace string) *integrationSinks { + return &integrationSinks{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the integrationSink, and returns the corresponding integrationSink object, and an error if there is any. +func (c *integrationSinks) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.IntegrationSink, err error) { + result = &v1alpha1.IntegrationSink{} + err = c.client.Get(). + Namespace(c.ns). + Resource("integrationsinks"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of IntegrationSinks that match those selectors. +func (c *integrationSinks) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.IntegrationSinkList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.IntegrationSinkList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("integrationsinks"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested integrationSinks. +func (c *integrationSinks) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("integrationsinks"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a integrationSink and creates it. Returns the server's representation of the integrationSink, and an error, if there is any. +func (c *integrationSinks) Create(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.CreateOptions) (result *v1alpha1.IntegrationSink, err error) { + result = &v1alpha1.IntegrationSink{} + err = c.client.Post(). + Namespace(c.ns). + Resource("integrationsinks"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(integrationSink). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a integrationSink and updates it. Returns the server's representation of the integrationSink, and an error, if there is any. +func (c *integrationSinks) Update(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.UpdateOptions) (result *v1alpha1.IntegrationSink, err error) { + result = &v1alpha1.IntegrationSink{} + err = c.client.Put(). + Namespace(c.ns). + Resource("integrationsinks"). + Name(integrationSink.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(integrationSink). + Do(ctx). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *integrationSinks) UpdateStatus(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.UpdateOptions) (result *v1alpha1.IntegrationSink, err error) { + result = &v1alpha1.IntegrationSink{} + err = c.client.Put(). + Namespace(c.ns). + Resource("integrationsinks"). + Name(integrationSink.Name). + SubResource("status"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(integrationSink). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the integrationSink and deletes it. Returns an error if one occurs. +func (c *integrationSinks) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("integrationsinks"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *integrationSinks) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("integrationsinks"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched integrationSink. +func (c *integrationSinks) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.IntegrationSink, err error) { + result = &v1alpha1.IntegrationSink{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("integrationsinks"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/vendor/knative.dev/eventing/pkg/client/clientset/versioned/typed/sinks/v1alpha1/sinks_client.go b/vendor/knative.dev/eventing/pkg/client/clientset/versioned/typed/sinks/v1alpha1/sinks_client.go index 2012a8f57..9d394dfb0 100644 --- a/vendor/knative.dev/eventing/pkg/client/clientset/versioned/typed/sinks/v1alpha1/sinks_client.go +++ b/vendor/knative.dev/eventing/pkg/client/clientset/versioned/typed/sinks/v1alpha1/sinks_client.go @@ -28,6 +28,7 @@ import ( type SinksV1alpha1Interface interface { RESTClient() rest.Interface + IntegrationSinksGetter JobSinksGetter } @@ -36,6 +37,10 @@ type SinksV1alpha1Client struct { restClient rest.Interface } +func (c *SinksV1alpha1Client) IntegrationSinks(namespace string) IntegrationSinkInterface { + return newIntegrationSinks(c, namespace) +} + func (c *SinksV1alpha1Client) JobSinks(namespace string) JobSinkInterface { return newJobSinks(c, namespace) } diff --git a/vendor/knative.dev/eventing/pkg/eventingtls/eventingtls.go b/vendor/knative.dev/eventing/pkg/eventingtls/eventingtls.go index 718a744c1..eedea290d 100644 --- a/vendor/knative.dev/eventing/pkg/eventingtls/eventingtls.go +++ b/vendor/knative.dev/eventing/pkg/eventingtls/eventingtls.go @@ -57,6 +57,9 @@ const ( BrokerFilterServerTLSSecretName = "mt-broker-filter-server-tls" //nolint:gosec // This is not a hardcoded credential // BrokerIngressServerTLSSecretName is the name of the tls secret for the broker ingress server BrokerIngressServerTLSSecretName = "mt-broker-ingress-server-tls" //nolint:gosec // This is not a hardcoded credential + + // IntegrationSinkDispatcherServerTLSSecretName is the name of the tls secret for the integration sink dispatcher server + IntegrationSinkDispatcherServerTLSSecretName = "integration-sink-server-tls" //nolint:gosec // This is not a hardcoded credential ) type ClientConfig struct { diff --git a/vendor/modules.txt b/vendor/modules.txt index 9511c0be5..51624e17c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1368,7 +1368,7 @@ k8s.io/utils/trace ## explicit; go 1.22.7 knative.dev/caching/pkg/apis/caching knative.dev/caching/pkg/apis/caching/v1alpha1 -# knative.dev/eventing v0.43.1-0.20241121083601-5ad7dabebd01 +# knative.dev/eventing v0.43.1-0.20241126125511-65da6fc77cd4 ## explicit; go 1.22.7 knative.dev/eventing/cmd/heartbeats knative.dev/eventing/pkg/apis @@ -1558,8 +1558,8 @@ knative.dev/pkg/webhook knative.dev/pkg/webhook/certificates knative.dev/pkg/webhook/certificates/resources knative.dev/pkg/webhook/resourcesemantics/conversion -# knative.dev/reconciler-test v0.0.0-20241106013737-0619dc3ecbcf -## explicit; go 1.22.0 +# knative.dev/reconciler-test v0.0.0-20241126141613-2f0e67f51c46 +## explicit; go 1.22.7 knative.dev/reconciler-test/cmd/eventshub knative.dev/reconciler-test/pkg/environment knative.dev/reconciler-test/pkg/eventshub