Skip to content

Commit

Permalink
Merge pull request #117 from konpyutaika/auto-scaler
Browse files Browse the repository at this point in the history
[Feature/NiFiCluster] Auto scaler
  • Loading branch information
erdrix authored Aug 19, 2022
2 parents 03ed90f + 7f702d1 commit b3a6369
Show file tree
Hide file tree
Showing 93 changed files with 10,496 additions and 169 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,6 @@ testbin/*

# editor and IDE paraphernalia

*~
*~

vendor/
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@

### Removed

## v0.13.0

### Added

- [PR #89](https://github.com/konpyutaika/nifikop/pull/89) - **[Operator/NifiNodeGroupAutoscaler]** Add NifiNodeGroupAutoscaler to automatically horizontally scale a NifiCluster resource via the Kubernetes HorizontalPodAutoscaler.

## v0.12.0

### Added
Expand Down
48 changes: 14 additions & 34 deletions PROJECT
Original file line number Diff line number Diff line change
@@ -1,81 +1,61 @@
domain: konpyutaika.com
layout: go.kubebuilder.io/v3
layout:
- go.kubebuilder.io/v3
plugins:
manifests.sdk.operatorframework.io/v2: {}
scorecard.sdk.operatorframework.io/v2: {}
projectName: nifikop
repo: github.com/konpyutaika/nifikop
resources:
- api:
crdVersion: v1
# TODO(user): Uncomment the below line if this resource's CRD is namespace scoped, else delete it.
# namespaced: true
# TODO(user): Uncomment the below line if this resource implements a controller, else delete it.
# controller: true
domain: konpyutaika.com
group: nifi
kind: NifiCluster
# TODO(user): Update the package path for your API if the below value is incorrect.
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
# TODO(user): Uncomment the below line if this resource's CRD is namespace scoped, else delete it.
# namespaced: true
# TODO(user): Uncomment the below line if this resource implements a controller, else delete it.
# controller: true
domain: konpyutaika.com
group: nifi
kind: NifiUserGroup
# TODO(user): Update the package path for your API if the below value is incorrect.
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
# TODO(user): Uncomment the below line if this resource's CRD is namespace scoped, else delete it.
# namespaced: true
# TODO(user): Uncomment the below line if this resource implements a controller, else delete it.
# controller: true
domain: konpyutaika.com
group: nifi
kind: NifiUser
# TODO(user): Update the package path for your API if the below value is incorrect.
path: github.com/Okonpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
# TODO(user): Uncomment the below line if this resource's CRD is namespace scoped, else delete it.
# namespaced: true
# TODO(user): Uncomment the below line if this resource implements a controller, else delete it.
# controller: true
domain: konpyutaika.com
group: nifi
kind: NifiRegistryClient
# TODO(user): Update the package path for your API if the below value is incorrect.
path: github.com/Okonpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
# TODO(user): Uncomment the below line if this resource's CRD is namespace scoped, else delete it.
# namespaced: true
# TODO(user): Uncomment the below line if this resource implements a controller, else delete it.
# controller: true
domain: konpyutaika.com
group: nifi
kind: NifiDataflow
# TODO(user): Update the package path for your API if the below value is incorrect.
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
# TODO(user): Uncomment the below line if this resource's CRD is namespace scoped, else delete it.
# namespaced: true
# TODO(user): Uncomment the below line if this resource implements a controller, else delete it.
# controller: true
domain: konpyutaika.com
group: nifi
kind: NifiParameterContext
# TODO(user): Update the package path for your API if the below value is incorrect.
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
namespaced: true
controller: true
domain: konpyutaika.com
group: nifi
kind: NifiNodeGroupAutoscaler
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
version: "3"
plugins:
manifests.sdk.operatorframework.io/v2: {}
scorecard.sdk.operatorframework.io/v2: {}
40 changes: 39 additions & 1 deletion api/v1alpha1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package v1alpha1

import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// DataflowState defines the state of a NifiDataflow
Expand All @@ -25,10 +27,22 @@ type ActionStep string
// ClusterState holds info about the cluster state
type ClusterState string

// NodeGroupAutoscalerState holds info autoscaler state
type NodeGroupAutoscalerState string

// ClusterReplicas holds info about the current number of replicas in the cluster
type ClusterReplicas int32

// ClusterReplicaSelector holds info about the pod selector for cluster replicas
type ClusterReplicaSelector string

// ClusterScalingStrategy holds info about how a cluster should be scaled
type ClusterScalingStrategy string

// ConfigurationState holds info about the configuration state
type ConfigurationState string

// InitClusterNode holds info about if the node was part of the init cluster setup
// InitClusterNode holds info about if the node was part of the init cluster setup
type InitClusterNode bool

// PKIBackend represents an interface implementing the PKIManager
Expand Down Expand Up @@ -325,6 +339,12 @@ type NodeState struct {
InitClusterNode InitClusterNode `json:"initClusterNode"`
// PodIsReady whether or not the associated pod is ready
PodIsReady bool `json:"podIsReady"`
// CreationTime is the time at which this node was created. This must be sortable.
// +optional
CreationTime *metav1.Time `json:"creationTime,omitempty"`
// LastUpdatedTime is the last time at which this node was updated. This must be sortable.
// +optional
LastUpdatedTime metav1.Time `json:"lastUpdatedTime,omitempty"`
}

// RackAwarenessState holds info about rack awareness status
Expand Down Expand Up @@ -392,6 +412,24 @@ const (
NotInitClusterNode InitClusterNode = false
)

const (
// AutoscalerStateOutOfSync describes the status of a NifiNodeGroupAutoscaler as out of sync
AutoscalerStateOutOfSync NodeGroupAutoscalerState = "OutOfSync"
// AutoscalerStateInSync describes the status of a NifiNodeGroupAutoscaler as in sync
AutoscalerStateInSync NodeGroupAutoscalerState = "InSync"

// upscale strategy representing 'Scale > Disconnect the nodes > Offload data > Reconnect the node' strategy
GracefulClusterUpscaleStrategy ClusterScalingStrategy = "graceful"
// simply add a node to the cluster and nothing else
SimpleClusterUpscaleStrategy ClusterScalingStrategy = "simple"
// downscale strategy to remove the last node added
LIFOClusterDownscaleStrategy ClusterScalingStrategy = "lifo"
// downscale strategy avoiding primary/coordinator nodes
NonPrimaryClusterDownscaleStrategy ClusterScalingStrategy = "nonprimary"
// downscale strategy targeting nodes which are least busy in terms of # flowfiles in queues
LeastBusyClusterDownscaleStrategy ClusterScalingStrategy = "leastbusy"
)

func ClusterRefsEquals(clusterRefs []ClusterReference) bool {
c1 := clusterRefs[0]
name := c1.Name
Expand Down
54 changes: 51 additions & 3 deletions api/v1alpha1/nificluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package v1alpha1

import (
"fmt"
"sort"
"strconv"
"strings"

cmmeta "github.com/jetstack/cert-manager/pkg/apis/meta/v1"
Expand All @@ -15,7 +17,7 @@ const (
HttpListenerType = "http"
HttpsListenerType = "https"
S2sListenerType = "s2s"
prometheusListenerType = "prometheus"
PrometheusListenerType = "prometheus"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
Expand Down Expand Up @@ -75,7 +77,7 @@ type NifiClusterSpec struct {
// NodeUserIdentityTemplate specifies the template to be used when naming the node user identity (e.g. node-%d-mysuffix)
NodeUserIdentityTemplate *string `json:"nodeUserIdentityTemplate,omitempty"`
// all node requires an image, unique id, and storageConfigs settings
Nodes []Node `json:"nodes"`
Nodes []Node `json:"nodes" patchStrategy:"merge" patchMergeKey:"id"`
// Defines the configuration for PodDisruptionBudget
DisruptionBudget DisruptionBudget `json:"disruptionBudget,omitempty"`
// LdapConfiguration specifies the configuration if you want to use LDAP
Expand Down Expand Up @@ -161,6 +163,9 @@ type Node struct {
ReadOnlyConfig *ReadOnlyConfig `json:"readOnlyConfig,omitempty"`
// node configuration
NodeConfig *NodeConfig `json:"nodeConfig,omitempty"`
// Labels are used to distinguish nodes from one another. They are also used by NifiNodeGroupAutoscaler
// to be automatically scaled. See NifiNodeGroupAutoscaler.Spec.NodeLabelsSelector
Labels map[string]string `json:"labels,omitempty"`
}

type ReadOnlyConfig struct {
Expand Down Expand Up @@ -726,7 +731,7 @@ func (nProperties NifiProperties) GetAuthorizer() string {
func (nSpec *NifiClusterSpec) GetMetricPort() *int {

for _, iListener := range nSpec.ListenersConfig.InternalListeners {
if iListener.Type == prometheusListenerType {
if iListener.Type == PrometheusListenerType {
val := int(iListener.ContainerPort)
return &val
}
Expand Down Expand Up @@ -804,3 +809,46 @@ func (cluster NifiCluster) IsReady() bool {
func (cluster *NifiCluster) Id() string {
return cluster.Name
}

type Pair struct {
Key string
Value metav1.Time
}
type PairList []Pair

func (p PairList) Len() int { return len(p) }
func (p PairList) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p PairList) Less(i, j int) bool { return p[i].Value.Before(&p[j].Value) }

// Order the nodes in the cluster by the time they were created. The list will be in ascending order.
// Older nodes will be in the beginning of the list, newer nodes at the end.
// Nodes for Clusters that existed prior to this feature (v0.11.0+) will not have a creationTime. In this case,
// LIFO will not be able to reliably determine the oldest node. A rolling restart of nodes in the cluster will
// resolve this issue going forward.
func (cluster *NifiCluster) GetCreationTimeOrderedNodes() []Node {
nodeIdCreationPairs := PairList{}

for k, v := range cluster.Status.NodesState {
nodeIdCreationPairs = append(nodeIdCreationPairs, Pair{k, *v.CreationTime})
}

// nodeIdCreationPairs is now sorted by creation time in ascending order.
sort.Sort(nodeIdCreationPairs)

nodesMap := NodesToIdMap(cluster.Spec.Nodes)
timeOrderedNodes := []Node{}

for _, pair := range nodeIdCreationPairs {
id, _ := strconv.Atoi(pair.Key)
timeOrderedNodes = append(timeOrderedNodes, nodesMap[int32(id)])
}
return timeOrderedNodes
}

func NodesToIdMap(nodes []Node) (nodeMap map[int32]Node) {
nodeMap = make(map[int32]Node)
for _, node := range nodes {
nodeMap[node.Id] = node
}
return
}
51 changes: 51 additions & 0 deletions api/v1alpha1/nificluster_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package v1alpha1

import (
"testing"
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestGetCreationTimeOrderedNodes(t *testing.T) {
time1 := v1.NewTime(time.Now().UTC().Add(time.Duration(5) * time.Hour))
time2 := v1.NewTime(time.Now().UTC().Add(time.Duration(10) * time.Hour))
time3 := v1.NewTime(time.Now().UTC().Add(time.Duration(15) * time.Hour))
time4 := v1.NewTime(time.Now().UTC().Add(time.Duration(20) * time.Hour))

cluster := &NifiCluster{
Spec: NifiClusterSpec{
Nodes: []Node{
{Id: 2, NodeConfigGroup: "scale-group", Labels: map[string]string{"scale_me": "true"}},
{Id: 3, NodeConfigGroup: "scale-group", Labels: map[string]string{"scale_me": "true"}},
{Id: 4, NodeConfigGroup: "scale-group", Labels: map[string]string{"scale_me": "true"}},
{Id: 5, NodeConfigGroup: "other-group", Labels: map[string]string{"other_group": "true"}},
},
},
Status: NifiClusterStatus{
NodesState: map[string]NodeState{
"2": {
CreationTime: &time1,
},
"3": {
CreationTime: &time3,
},
"4": {
CreationTime: &time2,
},
"5": {
CreationTime: &time4,
},
},
},
}

nodeList := cluster.GetCreationTimeOrderedNodes()

if len(nodeList) != 4 {
t.Errorf("Incorrect node list: %v+", nodeList)
}
if nodeList[0].Id != 2 || nodeList[1].Id != 4 || nodeList[2].Id != 3 || nodeList[3].Id != 5 {
t.Errorf("Incorrect node list: %v+", nodeList)
}
}
Loading

0 comments on commit b3a6369

Please sign in to comment.