Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/NifiConnection] Implementation of NifiConnection controller #168

Closed
wants to merge 62 commits into from
Closed
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
71b9ff7
Add NifiConnection resource skeleton
juldrixx Jun 29, 2022
fa45165
Comment line
juldrixx Jun 29, 2022
c358332
Manage GetDataflowComponentInformation
juldrixx Jun 30, 2022
3aed6d6
Add cluster verification
juldrixx Jun 30, 2022
9ed7408
Add cluster connection
juldrixx Jun 30, 2022
a5aaef6
Add connection configuration
juldrixx Jul 1, 2022
2575348
Add finalizer, status management and start label management
juldrixx Jul 25, 2022
954c387
Add stop-component label management
juldrixx Jul 27, 2022
b3b56d8
Update port stopping feature
juldrixx Aug 1, 2022
003b8b3
Add function to check if connection exists
juldrixx Aug 1, 2022
9858215
Add annotations to store last nificluster applied
juldrixx Aug 20, 2022
45cca04
-
juldrixx Aug 25, 2022
65dee6b
Rebase on master
juldrixx Aug 25, 2022
06df7fc
Fix issues following rebase
juldrixx Aug 25, 2022
f7451e0
Add capability to identify if an input connection is active
juldrixx Sep 16, 2022
ee079b0
Add capability to update the connection configuration
juldrixx Sep 16, 2022
99b6750
Add update connection destination
juldrixx Sep 16, 2022
4669b88
[WIP] Add update connection destination && update connection source
juldrixx Sep 16, 2022
5fd01cb
WIP update source and destination
juldrixx Sep 17, 2022
2e71ad2
Merge branch 'master' into feature/nificonnection_resource
juldrixx Sep 17, 2022
1b36eff
Merge branch 'master' into feature/nificonnection_resource
juldrixx Sep 17, 2022
1f90a39
Upgrade nigoapi & Add labelIndex & Manage update source/destination i…
juldrixx Sep 17, 2022
820bd44
Remove old code and fix state
juldrixx Sep 17, 2022
a6616e9
Add drop queue flowfiles if drop strategy on connection and dataflow
juldrixx Sep 18, 2022
703587b
Unify kubebuilder validation
juldrixx Sep 18, 2022
4c2f384
Fix DropRequest management
juldrixx Sep 18, 2022
b87082f
Add finilizer deleting
juldrixx Sep 18, 2022
8eb7834
Save clusterRef after creation
juldrixx Sep 19, 2022
d04ae35
Unify finalizer label management
juldrixx Sep 19, 2022
e2eff09
Manage clusterRef changment connection & Modify all update to patch i…
juldrixx Sep 19, 2022
4dec565
Add abilty to force start/stop of dataflow & Update connection deleti…
juldrixx Sep 20, 2022
c3ca1b9
Add comments
juldrixx Sep 21, 2022
b4012a5
Remove debug modifications
juldrixx Sep 21, 2022
f59cd28
Add test getConnection
juldrixx Sep 21, 2022
a658221
Add unit tests for connection
juldrixx Sep 21, 2022
c9575c7
Patch connection creation & add units tests for input-port/output-port
juldrixx Sep 21, 2022
292a100
Add NifiConnection documentation
juldrixx Sep 21, 2022
0259d4d
Update plugin and documentation on it
juldrixx Sep 21, 2022
33062aa
Regenerate helm charts and crds
juldrixx Sep 21, 2022
1ba3caf
Update changelog
juldrixx Sep 21, 2022
768e7b9
Fix changelog
juldrixx Sep 21, 2022
8245911
Fix plugin doc
juldrixx Sep 22, 2022
0dc9b44
Add unit tests and remove license on plugin
juldrixx Sep 22, 2022
2d64957
Rebase on master and fix doc
juldrixx Sep 26, 2022
d4389cf
Remove .config folder from vscode
juldrixx Sep 26, 2022
3379148
Refactoring plugin in golang
juldrixx Sep 26, 2022
34d31eb
Add NifiConnection sample
juldrixx Oct 2, 2022
f51fdb8
Update changelog
juldrixx Oct 2, 2022
6cff180
Add NifiConnection RBAC in Helm Chart
juldrixx Oct 2, 2022
0b9af08
Add example of NifiConnection
juldrixx Oct 3, 2022
5609fca
Change default value with kubebuilder
juldrixx Oct 16, 2022
b752508
Merge branch 'master' into feature/nificonnection_resource
juldrixx Oct 27, 2022
54635db
Rebase
juldrixx Oct 27, 2022
e3ca110
Rebase on master
juldrixx Nov 21, 2022
0dd29e7
fix/tests
juldrixx Nov 22, 2022
9630691
Merge branch 'master' into feature/nificonnection_resource
juldrixx Feb 21, 2023
6e477cd
Rebase on master
juldrixx Apr 25, 2023
3ac605f
Rebase on master
juldrixx Aug 28, 2023
5327219
Fix go.sum
juldrixx Aug 28, 2023
a6f2026
Update CHANGELOG
juldrixx Aug 28, 2023
2475d3d
Fixes for go 1.21
juldrixx Aug 28, 2023
fbd2357
Fix tests
juldrixx Aug 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

### Added

- [PR #168](https://github.com/konpyutaika/nifikop/pull/168) - **[Operator/NifiConnection]** Implementation on NifiConnection controller.
juldrixx marked this conversation as resolved.
Show resolved Hide resolved

### Changed

- [PR #160](https://github.com/konpyutaika/nifikop/pull/160) - **[Documentation]** Upgrade documentation dependencies.
Expand Down
9 changes: 9 additions & 0 deletions PROJECT
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,13 @@ resources:
kind: NifiNodeGroupAutoscaler
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
namespaced: true
controller: true
domain: konpyutaika.com
group: nifi
kind: NifiConnection
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
version: "3"
98 changes: 94 additions & 4 deletions api/v1alpha1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ import (
// DataflowState defines the state of a NifiDataflow
type DataflowState string

// ConnectionState defines the state of a NifiConnection
type ConnectionState string

// DataflowUpdateRequestType defines the type of versioned flow update request
type DataflowUpdateRequestType string

// DataflowUpdateStrategy defines the type of strategy to update a flow
type DataflowUpdateStrategy string
// ComponentUpdateStrategy defines the type of strategy to update a component
// +kubebuilder:validation:Enum={"drop","drain"}
type ComponentUpdateStrategy string

// RackAwarenessState stores info about rack awareness status
type RackAwarenessState string
Expand Down Expand Up @@ -46,12 +50,15 @@ type ConfigurationState string
type InitClusterNode bool

// PKIBackend represents an interface implementing the PKIManager
// +kubebuilder:validation:Enum={"cert-manager","vault"}
type PKIBackend string

// ClientConfigType represents an interface implementing the ClientConfigManager
// +kubebuilder:validation:Enum={"tls","basic"}
type ClientConfigType string

// ClusterType represents an interface implementing the ClientConfigManager
// +kubebuilder:validation:Enum={"external","internal"}
type ClusterType string

// AccessPolicyType represents the type of access policy
Expand Down Expand Up @@ -278,16 +285,23 @@ const (
// DataflowStateInSync describes the status of a NifiDataflow as in sync
DataflowStateInSync DataflowState = "InSync"

// ConnectionStateOutOfSync describes the status of a NifiConnection as out of sync
ConnectionStateOutOfSync ConnectionState = "OutOfSync"
// ConnectionStateInSync describes the status of a NifiConnection as in sync
ConnectionStateInSync ConnectionState = "InSync"
// ConnectionStateCreated describes the status of a NifiConnection as created
ConnectionStateCreated ConnectionState = "Created"

// RevertRequestType defines a revert changes request.
RevertRequestType DataflowUpdateRequestType = "Revert"
// UpdateRequestType defines an update version request.
UpdateRequestType DataflowUpdateRequestType = "Update"

// DrainStrategy leads to shutting down only input components (Input processors, remote input process group)
// and dropping all flowfiles from the flow.
DrainStrategy DataflowUpdateStrategy = "drain"
DrainStrategy ComponentUpdateStrategy = "drain"
// DropStrategy leads to shutting down all components and dropping all flowfiles from the flow.
DropStrategy DataflowUpdateStrategy = "drop"
DropStrategy ComponentUpdateStrategy = "drop"

// UserStateCreated describes the status of a NifiUser as created
UserStateCreated UserState = "created"
Expand Down Expand Up @@ -455,10 +469,86 @@ func SecretRefsEquals(secretRefs []SecretReference) bool {
return true
}

func ComponentRefsEquals(componentRefs []ComponentReference) bool {
c1 := componentRefs[0]
name := c1.Name
ns := c1.Namespace

for _, component := range componentRefs {
if name != component.Name || ns != component.Namespace || ns != string(component.Type) || ns != component.SubName {
return false
}
}

return true
}

// +kubebuilder:validation:Enum={"never","always","once"}
type DataflowSyncMode string

const (
SyncNever DataflowSyncMode = "never"
SyncOnce DataflowSyncMode = "once"
SyncAlways DataflowSyncMode = "always"
)

// Change the list to {"dataflow","input-port","output-port","processor","process-group"} when all the type are available
// +kubebuilder:validation:Enum={"dataflow"}
type ComponentType string

const (
ComponentDataflow ComponentType = "dataflow"
ComponentInputPort ComponentType = "input-port"
ComponentOutputPort ComponentType = "output-port"
ComponentProcessor ComponentType = "processor"
ComponentFunnel ComponentType = "funnel"
ComponentProcessGroup ComponentType = "process-group"
)

type ComponentInformation struct {
Id string `json:"id"`
GroupId string `json:"groupId"`
Type string `json:"type"`
ParentGroupId string `json:"parentGroupId"`
ClusterRef ClusterReference `json:"clusterRef"`
}

// +kubebuilder:validation:Enum={"DO_NOT_LOAD_BALANCE","PARTITION_BY_ATTRIBUTE","ROUND_ROBIN","SINGLE"}
type ConnectionLoadBalanceStrategy string

const (
// Do not load balance FlowFiles between nodes in the cluster.
StrategyDoNotLoadBalance ConnectionLoadBalanceStrategy = "DO_NOT_LOAD_BALANCE"
// Determine which node to send a given FlowFile to based on the value of a user-specified FlowFile Attribute. All FlowFiles that have the same value for said Attribute will be sent to the same node in the cluster.
StrategyPartitionByAttribute ConnectionLoadBalanceStrategy = "PARTITION_BY_ATTRIBUTE"
// FlowFiles will be distributed to nodes in the cluster in a Round-Robin fashion. However, if a node in the cluster is not able to receive data as fast as other nodes, that node may be skipped in one or more iterations in order to maximize throughput of data distribution across the cluster.
StrategyRoundRobin ConnectionLoadBalanceStrategy = "ROUND_ROBIN"
// All FlowFiles will be sent to the same node. Which node they are sent to is not defined.
StrategySingle ConnectionLoadBalanceStrategy = "SINGLE"
)

// +kubebuilder:validation:Enum={"DO_NOT_COMPRESS","COMPRESS_ATTRIBUTES_ONLY","COMPRESS_ATTRIBUTES_AND_CONTENT"}
type ConnectionLoadBalanceCompression string

const (
// FlowFiles will not be compressed.
CompressionDoNotCompress ConnectionLoadBalanceCompression = "DO_NOT_COMPRESS"
// FlowFiles' attributes will be compressed, but the FlowFiles' contents will not be
CompressionCompressAttributesOnly ConnectionLoadBalanceCompression = "COMPRESS_ATTRIBUTES_ONLY"
// FlowFiles' attributes and content will be compressed
CompressionCompressAttributesAndContent ConnectionLoadBalanceCompression = "COMPRESS_ATTRIBUTES_AND_CONTENT"
)

// +kubebuilder:validation:Enum={"FirstInFirstOutPrioritizer","NewestFlowFileFirstPrioritizer","OldestFlowFileFirstPrioritizer","PriorityAttributePrioritizer"}
type ConnectionPrioritizer string

const (
// Given two FlowFiles, the one that reached the connection first will be processed first.
PrioritizerFirstInFirstOutPrioritizer ConnectionPrioritizer = "FirstInFirstOutPrioritizer"
// Given two FlowFiles, the one that is newest in the dataflow will be processed first.
PrioritizerNewestFlowFileFirstPrioritizer ConnectionPrioritizer = "NewestFlowFileFirstPrioritizer"
// Given two FlowFiles, the one that is oldest in the dataflow will be processed first. 'This is the default scheme that is used if no prioritizers are selected'.
PrioritizerOldestFlowFileFirstPrioritizer ConnectionPrioritizer = "OldestFlowFileFirstPrioritizer"
// Given two FlowFiles, an attribute called “priority” will be extracted. The one that has the lowest priority value will be processed first.
PrioritizerPriorityAttributePrioritizer ConnectionPrioritizer = "PriorityAttributePrioritizer"
)
3 changes: 0 additions & 3 deletions api/v1alpha1/nificluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ const (
// NifiClusterSpec defines the desired state of NifiCluster
type NifiClusterSpec struct {
// clientType defines if the operator will use basic or tls authentication to query the NiFi cluster.
// +kubebuilder:validation:Enum={"tls","basic"}
juldrixx marked this conversation as resolved.
Show resolved Hide resolved
ClientType ClientConfigType `json:"clientType,omitempty"`
// type defines if the cluster is internal (i.e manager by the operator) or external.
// +kubebuilder:validation:Enum={"external","internal"}
juldrixx marked this conversation as resolved.
Show resolved Hide resolved
Type ClusterType `json:"type,omitempty"`
// nodeURITemplate used to dynamically compute node uri (used if external type)
NodeURITemplate string `json:"nodeURITemplate,omitempty"`
Expand Down Expand Up @@ -374,7 +372,6 @@ type SSLSecrets struct {
// https://cert-manager.io/docs/concepts/issuer/
IssuerRef *cmmeta.ObjectReference `json:"issuerRef,omitempty"`
// TODO : add vault
// +kubebuilder:validation:Enum={"cert-manager","vault"}
juldrixx marked this conversation as resolved.
Show resolved Hide resolved
PKIBackend PKIBackend `json:"pkiBackend,omitempty"`
//,"vault"
}
Expand Down
167 changes: 167 additions & 0 deletions api/v1alpha1/nificonnection_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// NifiConnectionSpec defines the desired state of NifiConnection
type NifiConnectionSpec struct {
// the Source component of the connection.
Source ComponentReference `json:"source"`
// the Destination component of the connection.
Destination ComponentReference `json:"destination"`
// the Configuration of the connection.
Configuration ConnectionConfiguration `json:"configuration,omitempty"`
// describes the way the operator will deal with data when a connection will be updated : drop or drain.
UpdateStrategy ComponentUpdateStrategy `json:"updateStrategy"`
}

type ComponentReference struct {
// the name of the component.
Name string `json:"name"`
// the namespace of the component.
Namespace string `json:"namespace,omitempty"`
// the type of the component (e.g. nifidataflow).
Type ComponentType `json:"type"`
// the name of the sub component (e.g. queue or port name).
SubName string `json:"subName,omitempty"`
}

type ConnectionConfiguration struct {
// the maximum amount of time an object may be in the flow before it will be automatically aged out of the flow.
FlowFileExpiration string `json:"flowFileExpiration,omitempty"`
// the maximum data size of objects that can be queued before back pressure is applied.
BackPressureDataSizeThreshold string `json:"backPressureDataSizeThreshold,omitempty"`
// the maximum number of objects that can be queued before back pressure is applied.
BackPressureObjectThreshold *int64 `json:"backPressureObjectThreshold,omitempty"`
// how to load balance the data in this Connection across the nodes in the cluster.
LoadBalanceStrategy ConnectionLoadBalanceStrategy `json:"loadBalanceStrategy,omitempty"`
// the FlowFile Attribute to use for determining which node a FlowFile will go to.
LoadBalancePartitionAttribute string `json:"loadBalancePartitionAttribute,omitempty"`
// whether or not data should be compressed when being transferred between nodes in the cluster.
LoadBalanceCompression ConnectionLoadBalanceCompression `json:"loadBalanceCompression,omitempty"`
// the comparators used to prioritize the queue.
Prioritizers []ConnectionPrioritizer `json:"prioritizers,omitempty"`
// the index of the bend point where to place the connection label.
LabelIndex *int32 `json:"labelIndex,omitempty"`
// the bend points on the connection.
Bends []ConnectionBend `json:"bends,omitempty"`
}

type ConnectionBend struct {
// The x coordinate.
X *int64 `json:"posX,omitempty"`
// The y coordinate.
Y *int64 `json:"posY,omitempty"`
}

// NifiConnectionStatus defines the observed state of NifiConnection
type NifiConnectionStatus struct {
// connection ID.
ConnectionId string `json:"connectionID"`
// connection current state.
State ConnectionState `json:"state"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// NifiConnection is the Schema for the nificonnections API
type NifiConnection struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec NifiConnectionSpec `json:"spec,omitempty"`
Status NifiConnectionStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// NifiConnectionList contains a list of NifiConnection
type NifiConnectionList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []NifiConnection `json:"items"`
}

func init() {
SchemeBuilder.Register(&NifiConnection{}, &NifiConnectionList{})
}

func (nCon *NifiConnectionSpec) IsValid() bool {
return nCon.Source.IsValid() && nCon.Destination.IsValid() && nCon.Configuration.IsValid()
}

func (ref *ComponentReference) IsValid() bool {
return ref.Type == ComponentDataflow && ref.SubName != ""
}

func (conf *ConnectionConfiguration) IsValid() bool {
if conf.GetLoadBalanceStrategy() == StrategyPartitionByAttribute && len(conf.GetLoadBalancePartitionAttribute()) == 0 {
return false
}
return true
}

func (conf *ConnectionConfiguration) GetFlowFileExpiration() string {
return conf.FlowFileExpiration
}

func (conf *ConnectionConfiguration) GetBackPressureDataSizeThreshold() string {
if len(conf.BackPressureDataSizeThreshold) > 0 {
return conf.BackPressureDataSizeThreshold
}
return "1 GB"
}

func (conf *ConnectionConfiguration) GetBackPressureObjectThreshold() int64 {
if conf.BackPressureObjectThreshold != nil {
return *conf.BackPressureObjectThreshold
}
return 10000
}

func (conf *ConnectionConfiguration) GetLoadBalanceStrategy() ConnectionLoadBalanceStrategy {
if len(conf.LoadBalanceStrategy) > 0 {
return conf.LoadBalanceStrategy
}
return StrategyDoNotLoadBalance
}

func (conf *ConnectionConfiguration) GetLoadBalancePartitionAttribute() string {
return conf.LoadBalancePartitionAttribute
}

func (conf *ConnectionConfiguration) GetLoadBalanceCompression() ConnectionLoadBalanceCompression {
if len(conf.LoadBalanceCompression) > 0 {
return conf.LoadBalanceCompression
}
return CompressionDoNotCompress
}

func (conf *ConnectionConfiguration) GetPrioritizers() []ConnectionPrioritizer {
return conf.Prioritizers
}

func (conf *ConnectionConfiguration) GetStringPrioritizers() []string {
var prefix string = "org.apache.nifi.prioritizer."
prioritizers := []string{}
for _, prioritizer := range conf.Prioritizers {
prioritizers = append(prioritizers, prefix+string(prioritizer))
}
return prioritizers
}

func (conf *ConnectionConfiguration) GetLabelIndex() int32 {
if conf.LabelIndex != nil {
return *conf.LabelIndex
}
return 0
}

func (conf *ConnectionConfiguration) GetBends() []ConnectionBend {
return conf.Bends
}
4 changes: 1 addition & 3 deletions api/v1alpha1/nifidataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type NifiDataflowSpec struct {
// contains the reference to the ParameterContext with the one the dataflow is linked.
ParameterContextRef *ParameterContextReference `json:"parameterContextRef,omitempty"`
// if the flow will be synchronized once, continuously or never
// +kubebuilder:validation:Enum={"never","always","once"}
juldrixx marked this conversation as resolved.
Show resolved Hide resolved
SyncMode *DataflowSyncMode `json:"syncMode,omitempty"`
// whether the flow is considered as ran if some controller services are still invalid or not.
SkipInvalidControllerService bool `json:"skipInvalidControllerService,omitempty"`
Expand All @@ -33,8 +32,7 @@ type NifiDataflowSpec struct {
// contains the reference to the NifiRegistry with the one the dataflow is linked.
RegistryClientRef *RegistryClientReference `json:"registryClientRef,omitempty"`
// describes the way the operator will deal with data when a dataflow will be updated : drop or drain
// +kubebuilder:validation:Enum={"drop","drain"}
juldrixx marked this conversation as resolved.
Show resolved Hide resolved
UpdateStrategy DataflowUpdateStrategy `json:"updateStrategy"`
UpdateStrategy ComponentUpdateStrategy `json:"updateStrategy"`
}

type FlowPosition struct {
Expand Down
Loading