Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/NifiConnection] Add NifiConnection controller and kubectl plugin #291

Merged
merged 2 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ Session.vim
tags
### VisualStudioCode ###
.vscode/*
.history
.config/*
.history
__debug_bin*
# End of https://www.gitignore.io/api/go,vim,emacs,visualstudiocode

Expand Down
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
- [PR #292](https://github.com/konpyutaika/nifikop/pull/292) - **[Operator/NifiCluster]** Modify RBAC kubebuilder annotations so NiFiKop works on OpenShift
- [PR #292](https://github.com/konpyutaika/nifikop/pull/292) - **[Helm Chart]** Add Parameter for RunAsUser for OpenShift

- [PR #291](https://github.com/konpyutaika/nifikop/pull/291) - **[Plugin]** Implementation on NiFiKop's plugin.
- [PR #291](https://github.com/konpyutaika/nifikop/pull/291) - **[Operator/NifiConnection]** Implementation on NifiConnection controller.
-
### Changed

- [PR #290](https://github.com/konpyutaika/nifikop/pull/290) - **[Operator/NifiCluster]** Change default sensitive algorithm
- [PR #290](https://github.com/konpyutaika/nifikop/pull/290) - **[Operator/NifiCluster]** Change default sensitive algorithm.

### Fixed Bugs

Expand Down
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,8 @@ catalog-build: opm ## Build a catalog image.
# Push the catalog image.
.PHONY: catalog-push
catalog-push: ## Push a catalog image.
$(MAKE) docker-push IMG=$(CATALOG_IMG)
$(MAKE) docker-push IMG=$(CATALOG_IMG)

.PHONY: kubectl-nifikop
kubectl-nifikop:
go build -o bin/kubectl-nifikop ./cmd/kubectl-nifikop/main.go
9 changes: 9 additions & 0 deletions PROJECT
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,13 @@ resources:
kind: NifiNodeGroupAutoscaler
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
namespaced: true
controller: true
domain: konpyutaika.com
group: nifi
kind: NifiConnection
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
version: "3"
13 changes: 9 additions & 4 deletions api/v1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ type DataflowState string
// DataflowUpdateRequestType defines the type of versioned flow update request
type DataflowUpdateRequestType string

// DataflowUpdateStrategy defines the type of strategy to update a flow
type DataflowUpdateStrategy string
// ComponentUpdateStrategy defines the type of strategy to update a component
// +kubebuilder:validation:Enum={"drop","drain"}
type ComponentUpdateStrategy string

// RackAwarenessState stores info about rack awareness status
type RackAwarenessState string
Expand All @@ -46,12 +47,15 @@ type ConfigurationState string
type InitClusterNode bool

// PKIBackend represents an interface implementing the PKIManager
// +kubebuilder:validation:Enum={"cert-manager","vault"}
type PKIBackend string

// ClientConfigType represents an interface implementing the ClientConfigManager
// +kubebuilder:validation:Enum={"tls","basic"}
type ClientConfigType string

// ClusterType represents an interface implementing the ClientConfigManager
// +kubebuilder:validation:Enum={"external","internal"}
type ClusterType string

// AccessPolicyType represents the type of access policy
Expand Down Expand Up @@ -285,9 +289,9 @@ const (

// DrainStrategy leads to shutting down only input components (Input processors, remote input process group)
// and dropping all flowfiles from the flow.
DrainStrategy DataflowUpdateStrategy = "drain"
DrainStrategy ComponentUpdateStrategy = "drain"
// DropStrategy leads to shutting down all components and dropping all flowfiles from the flow.
DropStrategy DataflowUpdateStrategy = "drop"
DropStrategy ComponentUpdateStrategy = "drop"

// UserStateCreated describes the status of a NifiUser as created
UserStateCreated UserState = "created"
Expand Down Expand Up @@ -437,6 +441,7 @@ func SecretRefsEquals(secretRefs []SecretReference) bool {
return true
}

// +kubebuilder:validation:Enum={"never","always","once"}
type DataflowSyncMode string

const (
Expand Down
3 changes: 0 additions & 3 deletions api/v1/nificluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ const (
// NifiClusterSpec defines the desired state of NifiCluster
type NifiClusterSpec struct {
// clientType defines if the operator will use basic or tls authentication to query the NiFi cluster.
// +kubebuilder:validation:Enum={"tls","basic"}
ClientType ClientConfigType `json:"clientType,omitempty"`
// type defines if the cluster is internal (i.e manager by the operator) or external.
// +kubebuilder:validation:Enum={"external","internal"}
Type ClusterType `json:"type,omitempty"`
// nodeURITemplate used to dynamically compute node uri (used if external type)
NodeURITemplate string `json:"nodeURITemplate,omitempty"`
Expand Down Expand Up @@ -415,7 +413,6 @@ type SSLSecrets struct {
// https://cert-manager.io/docs/concepts/issuer/
IssuerRef *cmmeta.ObjectReference `json:"issuerRef,omitempty"`
// TODO : add vault
// +kubebuilder:validation:Enum={"cert-manager","vault"}
PKIBackend PKIBackend `json:"pkiBackend,omitempty"`
//,"vault"
}
Expand Down
4 changes: 1 addition & 3 deletions api/v1/nifidataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type NifiDataflowSpec struct {
// contains the reference to the ParameterContext with the one the dataflow is linked.
ParameterContextRef *ParameterContextReference `json:"parameterContextRef,omitempty"`
// if the flow will be synchronized once, continuously or never
// +kubebuilder:validation:Enum={"never","always","once"}
SyncMode *DataflowSyncMode `json:"syncMode,omitempty"`
// whether the flow is considered as ran if some controller services are still invalid or not.
SkipInvalidControllerService bool `json:"skipInvalidControllerService,omitempty"`
Expand All @@ -33,8 +32,7 @@ type NifiDataflowSpec struct {
// contains the reference to the NifiRegistry with the one the dataflow is linked.
RegistryClientRef *RegistryClientReference `json:"registryClientRef,omitempty"`
// describes the way the operator will deal with data when a dataflow will be updated : drop or drain
// +kubebuilder:validation:Enum={"drop","drain"}
UpdateStrategy DataflowUpdateStrategy `json:"updateStrategy"`
UpdateStrategy ComponentUpdateStrategy `json:"updateStrategy"`
}

type FlowPosition struct {
Expand Down
99 changes: 95 additions & 4 deletions api/v1alpha1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v1alpha1
import (
"fmt"

v1 "github.com/konpyutaika/nifikop/api/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -21,11 +22,15 @@ type ClusterScalingStrategy string
// DataflowState defines the state of a NifiDataflow
type DataflowState string

// ConnectionState defines the state of a NifiConnection
type ConnectionState string

// DataflowUpdateRequestType defines the type of versioned flow update request
type DataflowUpdateRequestType string

// DataflowUpdateStrategy defines the type of strategy to update a flow
type DataflowUpdateStrategy string
// ComponentUpdateStrategy defines the type of strategy to update a component
// +kubebuilder:validation:Enum={"drop","drain"}
type ComponentUpdateStrategy string

// RackAwarenessState stores info about rack awareness status
type RackAwarenessState string
Expand All @@ -46,12 +51,15 @@ type ConfigurationState string
type InitClusterNode bool

// PKIBackend represents an interface implementing the PKIManager
// +kubebuilder:validation:Enum={"cert-manager","vault"}
type PKIBackend string

// ClientConfigType represents an interface implementing the ClientConfigManager
// +kubebuilder:validation:Enum={"tls","basic"}
type ClientConfigType string

// ClusterType represents an interface implementing the ClientConfigManager
// +kubebuilder:validation:Enum={"external","internal"}
type ClusterType string

// AccessPolicyType represents the type of access policy
Expand Down Expand Up @@ -278,16 +286,23 @@ const (
// DataflowStateInSync describes the status of a NifiDataflow as in sync
DataflowStateInSync DataflowState = "InSync"

// ConnectionStateOutOfSync describes the status of a NifiConnection as out of sync
ConnectionStateOutOfSync ConnectionState = "OutOfSync"
// ConnectionStateInSync describes the status of a NifiConnection as in sync
ConnectionStateInSync ConnectionState = "InSync"
// ConnectionStateCreated describes the status of a NifiConnection as created
ConnectionStateCreated ConnectionState = "Created"

// RevertRequestType defines a revert changes request.
RevertRequestType DataflowUpdateRequestType = "Revert"
// UpdateRequestType defines an update version request.
UpdateRequestType DataflowUpdateRequestType = "Update"

// DrainStrategy leads to shutting down only input components (Input processors, remote input process group)
// and dropping all flowfiles from the flow.
DrainStrategy DataflowUpdateStrategy = "drain"
DrainStrategy ComponentUpdateStrategy = "drain"
// DropStrategy leads to shutting down all components and dropping all flowfiles from the flow.
DropStrategy DataflowUpdateStrategy = "drop"
DropStrategy ComponentUpdateStrategy = "drop"

// UserStateCreated describes the status of a NifiUser as created
UserStateCreated UserState = "created"
Expand Down Expand Up @@ -437,6 +452,21 @@ func SecretRefsEquals(secretRefs []SecretReference) bool {
return true
}

func ComponentRefsEquals(componentRefs []ComponentReference) bool {
c1 := componentRefs[0]
name := c1.Name
ns := c1.Namespace

for _, component := range componentRefs {
if name != component.Name || ns != component.Namespace || ns != string(component.Type) || ns != component.SubName {
return false
}
}

return true
}

// +kubebuilder:validation:Enum={"never","always","once"}
type DataflowSyncMode string

const (
Expand All @@ -462,3 +492,64 @@ const (
// downscale strategy targeting nodes which are least busy in terms of # flowfiles in queues
LeastBusyClusterDownscaleStrategy ClusterScalingStrategy = "leastbusy"
)

// Change the list to {"dataflow","input-port","output-port","processor","process-group"} when all the type are available
// +kubebuilder:validation:Enum={"dataflow"}
type ComponentType string

const (
ComponentDataflow ComponentType = "dataflow"
ComponentInputPort ComponentType = "input-port"
ComponentOutputPort ComponentType = "output-port"
ComponentProcessor ComponentType = "processor"
ComponentFunnel ComponentType = "funnel"
ComponentProcessGroup ComponentType = "process-group"
)

type ComponentInformation struct {
Id string `json:"id"`
GroupId string `json:"groupId"`
Type string `json:"type"`
ParentGroupId string `json:"parentGroupId"`
ClusterRef v1.ClusterReference `json:"clusterRef"`
}

// +kubebuilder:validation:Enum={"DO_NOT_LOAD_BALANCE","PARTITION_BY_ATTRIBUTE","ROUND_ROBIN","SINGLE"}
type ConnectionLoadBalanceStrategy string

const (
// Do not load balance FlowFiles between nodes in the cluster.
StrategyDoNotLoadBalance ConnectionLoadBalanceStrategy = "DO_NOT_LOAD_BALANCE"
// Determine which node to send a given FlowFile to based on the value of a user-specified FlowFile Attribute. All FlowFiles that have the same value for said Attribute will be sent to the same node in the cluster.
StrategyPartitionByAttribute ConnectionLoadBalanceStrategy = "PARTITION_BY_ATTRIBUTE"
// FlowFiles will be distributed to nodes in the cluster in a Round-Robin fashion. However, if a node in the cluster is not able to receive data as fast as other nodes, that node may be skipped in one or more iterations in order to maximize throughput of data distribution across the cluster.
StrategyRoundRobin ConnectionLoadBalanceStrategy = "ROUND_ROBIN"
// All FlowFiles will be sent to the same node. Which node they are sent to is not defined.
StrategySingle ConnectionLoadBalanceStrategy = "SINGLE"
)

// +kubebuilder:validation:Enum={"DO_NOT_COMPRESS","COMPRESS_ATTRIBUTES_ONLY","COMPRESS_ATTRIBUTES_AND_CONTENT"}
type ConnectionLoadBalanceCompression string

const (
// FlowFiles will not be compressed.
CompressionDoNotCompress ConnectionLoadBalanceCompression = "DO_NOT_COMPRESS"
// FlowFiles' attributes will be compressed, but the FlowFiles' contents will not be
CompressionCompressAttributesOnly ConnectionLoadBalanceCompression = "COMPRESS_ATTRIBUTES_ONLY"
// FlowFiles' attributes and content will be compressed
CompressionCompressAttributesAndContent ConnectionLoadBalanceCompression = "COMPRESS_ATTRIBUTES_AND_CONTENT"
)

// +kubebuilder:validation:Enum={"FirstInFirstOutPrioritizer","NewestFlowFileFirstPrioritizer","OldestFlowFileFirstPrioritizer","PriorityAttributePrioritizer"}
type ConnectionPrioritizer string

const (
// Given two FlowFiles, the one that reached the connection first will be processed first.
PrioritizerFirstInFirstOutPrioritizer ConnectionPrioritizer = "FirstInFirstOutPrioritizer"
// Given two FlowFiles, the one that is newest in the dataflow will be processed first.
PrioritizerNewestFlowFileFirstPrioritizer ConnectionPrioritizer = "NewestFlowFileFirstPrioritizer"
// Given two FlowFiles, the one that is oldest in the dataflow will be processed first. 'This is the default scheme that is used if no prioritizers are selected'.
PrioritizerOldestFlowFileFirstPrioritizer ConnectionPrioritizer = "OldestFlowFileFirstPrioritizer"
// Given two FlowFiles, an attribute called “priority” will be extracted. The one that has the lowest priority value will be processed first.
PrioritizerPriorityAttributePrioritizer ConnectionPrioritizer = "PriorityAttributePrioritizer"
)
3 changes: 0 additions & 3 deletions api/v1alpha1/nificluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ const (
// NifiClusterSpec defines the desired state of NifiCluster
type NifiClusterSpec struct {
// clientType defines if the operator will use basic or tls authentication to query the NiFi cluster.
// +kubebuilder:validation:Enum={"tls","basic"}
ClientType ClientConfigType `json:"clientType,omitempty"`
// type defines if the cluster is internal (i.e manager by the operator) or external.
// +kubebuilder:validation:Enum={"external","internal"}
Type ClusterType `json:"type,omitempty"`
// nodeURITemplate used to dynamically compute node uri (used if external type)
NodeURITemplate string `json:"nodeURITemplate,omitempty"`
Expand Down Expand Up @@ -373,7 +371,6 @@ type SSLSecrets struct {
// https://cert-manager.io/docs/concepts/issuer/
IssuerRef *cmmeta.ObjectReference `json:"issuerRef,omitempty"`
// TODO : add vault
// +kubebuilder:validation:Enum={"cert-manager","vault"}
PKIBackend PKIBackend `json:"pkiBackend,omitempty"`
//,"vault"
}
Expand Down
Loading