Skip to content

Commit

Permalink
Adjust some code to avoid duplicate
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng committed Jun 6, 2024
1 parent abd7728 commit 3bf6d79
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 204 deletions.
6 changes: 1 addition & 5 deletions api/compute/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,14 @@ type PulsarMessaging struct {
CleanupAuthConfig *AuthConfig `json:"cleanupAuthConfig,omitempty"`
}

type TLSConfig struct {
type PulsarTLSConfig struct {
Enabled bool `json:"enabled,omitempty"`
AllowInsecure bool `json:"allowInsecure,omitempty"`
HostnameVerification bool `json:"hostnameVerification,omitempty"`
CertSecretName string `json:"certSecretName,omitempty"`
CertSecretKey string `json:"certSecretKey,omitempty"`
}

type PulsarTLSConfig struct {
TLSConfig `json:",inline"`
}

func (c *PulsarTLSConfig) IsEnabled() bool {
return c.Enabled
}
Expand Down
16 changes: 0 additions & 16 deletions api/compute/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,15 @@ func ConvertHPAV2ToV2beta2(hpa *autov2.HorizontalPodAutoscaler) *autoscalingv2be
}

result := &autoscalingv2beta2.HorizontalPodAutoscaler{
TypeMeta: metav1.TypeMeta{
APIVersion: "autoscaling/v2beta2",
Kind: "HorizontalPodAutoscaler",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: hpa.Namespace,
Name: hpa.Name,
Namespace: hpa.Namespace,
Name: hpa.Name,
Labels: hpa.Labels,
OwnerReferences: hpa.OwnerReferences,
},
Spec: autoscalingv2beta2.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv2beta2.CrossVersionObjectReference{
Expand Down
68 changes: 36 additions & 32 deletions controllers/spec/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,15 @@ func MakeHeadlessServiceName(serviceName string) string {
return fmt.Sprintf("%s-headless", serviceName)
}

func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderImage string,
container *corev1.Container, filebeatContainer *corev1.Container,
volumes []corev1.Volume, labels map[string]string, policy v1alpha1.PodPolicy, pulsar v1alpha1.PulsarMessaging,
javaRuntime *v1alpha1.JavaRuntime, pythonRuntime *v1alpha1.PythonRuntime,
goRuntime *v1alpha1.GoRuntime, definedVolumeMounts []corev1.VolumeMount,
volumeClaimTemplates []corev1.PersistentVolumeClaim,
func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderImage string, container *corev1.Container,
volumes []corev1.Volume, labels map[string]string, policy v1alpha1.PodPolicy, authConfig *v1alpha1.AuthConfig,
tlsConfig TLSConfig, pulsarConfig, authSecret, tlsSecret string, javaRuntime *v1alpha1.JavaRuntime,
pythonRuntime *v1alpha1.PythonRuntime, goRuntime *v1alpha1.GoRuntime, env []corev1.EnvVar, name, logTopic, filebeatImage string,
logTopicAgent v1alpha1.LogTopicAgent, definedVolumeMounts []corev1.VolumeMount, volumeClaimTemplates []corev1.PersistentVolumeClaim,
persistentVolumeClaimRetentionPolicy *appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy) *appsv1.StatefulSet {

filebeatContainer := makeFilebeatContainer(definedVolumeMounts, env, name, logTopic, logTopicAgent, tlsConfig, authConfig, pulsarConfig, tlsSecret, authSecret, filebeatImage)

volumeMounts := generateDownloaderVolumeMountsForDownloader(javaRuntime, pythonRuntime, goRuntime)
var downloaderContainer *corev1.Container
var podVolumes = volumes
Expand All @@ -232,12 +233,12 @@ func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderI

// mount auth and tls related VolumeMounts when download package from pulsar
if !hasHTTPPrefix(downloadPath) {
if pulsar.AuthConfig != nil && pulsar.AuthConfig.OAuth2Config != nil {
volumeMounts = append(volumeMounts, generateVolumeMountFromOAuth2Config(pulsar.AuthConfig.OAuth2Config))
if authConfig != nil && authConfig.OAuth2Config != nil {
volumeMounts = append(volumeMounts, generateVolumeMountFromOAuth2Config(authConfig.OAuth2Config))
}

if !reflect.ValueOf(pulsar.TLSConfig).IsNil() && pulsar.TLSConfig.HasSecretVolume() {
volumeMounts = append(volumeMounts, generateVolumeMountFromTLSConfig(pulsar.TLSConfig))
if !reflect.ValueOf(tlsConfig).IsNil() && tlsConfig.HasSecretVolume() {
volumeMounts = append(volumeMounts, generateVolumeMountFromTLSConfig(tlsConfig))
}
}
volumeMounts = append(volumeMounts, definedVolumeMounts...)
Expand All @@ -253,15 +254,15 @@ func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderI
Name: DownloaderName,
Image: image,
Command: []string{"sh", "-c",
strings.Join(getDownloadCommand(downloadPath, componentPackage, true, true,
pulsar.AuthSecret != "", pulsar.TLSSecret != "", pulsar.TLSConfig, pulsar.AuthConfig), " ")},
strings.Join(GetDownloadCommand(downloadPath, componentPackage, true, true,
authSecret != "", tlsSecret != "", tlsConfig, authConfig), " ")},
VolumeMounts: volumeMounts,
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{{
Name: "HOME",
Value: "/tmp",
}},
EnvFrom: generateContainerEnvFrom(pulsar.PulsarConfig, pulsar.AuthSecret, pulsar.TLSSecret),
EnvFrom: GenerateContainerEnvFrom(pulsarConfig, authSecret, tlsSecret),
}
podVolumes = append(podVolumes, corev1.Volume{
Name: DownloaderVolume,
Expand Down Expand Up @@ -415,7 +416,7 @@ func MakeJavaFunctionCommand(downloadPath, packageFile, name, clusterName, gener
authConfig, maxPendingAsyncRequests, logConfigFileName), " ")
if downloadPath != "" && !utils.EnableInitContainers {
// prepend download command if the downPath is provided
downloadCommand := strings.Join(getDownloadCommand(downloadPath, packageFile, hasPulsarctl, hasWget,
downloadCommand := strings.Join(GetDownloadCommand(downloadPath, packageFile, hasPulsarctl, hasWget,
authProvided, tlsProvided, tlsConfig, authConfig), " ")
processCommand = downloadCommand + " && " + processCommand
}
Expand All @@ -431,7 +432,7 @@ func MakePythonFunctionCommand(downloadPath, packageFile, name, clusterName, gen
details, uid, authProvided, tlsProvided, secretMaps, state, tlsConfig, authConfig), " ")
if downloadPath != "" && !utils.EnableInitContainers {
// prepend download command if the downPath is provided
downloadCommand := strings.Join(getDownloadCommand(downloadPath, packageFile, hasPulsarctl, hasWget,
downloadCommand := strings.Join(GetDownloadCommand(downloadPath, packageFile, hasPulsarctl, hasWget,
authProvided,
tlsProvided, tlsConfig, authConfig), " ")
processCommand = downloadCommand + " && " + processCommand
Expand All @@ -450,7 +451,7 @@ func MakeGoFunctionCommand(downloadPath, goExecFilePath string, function *v1alph
hasPulsarctl = true
hasWget = true
}
downloadCommand := strings.Join(getDownloadCommand(downloadPath, goExecFilePath,
downloadCommand := strings.Join(GetDownloadCommand(downloadPath, goExecFilePath,
hasPulsarctl, hasWget, function.Spec.Pulsar.AuthSecret != "",
function.Spec.Pulsar.TLSSecret != "", function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.AuthConfig), " ")
processCommand = downloadCommand + " && ls -al && pwd &&" + processCommand
Expand All @@ -466,7 +467,7 @@ func MakeGenericFunctionCommand(downloadPath, functionFile, language, clusterNam
details, uid, authProvided, tlsProvided, secretMaps, state, tlsConfig, authConfig), " ")
if downloadPath != "" && !utils.EnableInitContainers {
// prepend download command if the downPath is provided
downloadCommand := strings.Join(getDownloadCommand(downloadPath, functionFile, true, true,
downloadCommand := strings.Join(GetDownloadCommand(downloadPath, functionFile, true, true,
authProvided,
tlsProvided, tlsConfig, authConfig), " ")
processCommand = downloadCommand + " && " + processCommand
Expand All @@ -486,7 +487,7 @@ func MakeLivenessProbe(liveness *v1alpha1.Liveness) *corev1.Probe {
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/",
Port: intstr.FromInt(int(MetricsPort.ContainerPort)),
Port: intstr.FromInt32(MetricsPort.ContainerPort),
},
},
InitialDelaySeconds: initialDelay,
Expand Down Expand Up @@ -788,7 +789,7 @@ func getCleanUpCommand(hasPulsarctl, authProvided, tlsProvided bool, tlsConfig T
" ")}
}

func getDownloadCommand(downloadPath, componentPackage string, hasPulsarctl, hasWget, authProvided, tlsProvided bool,
func GetDownloadCommand(downloadPath, componentPackage string, hasPulsarctl, hasWget, authProvided, tlsProvided bool,
tlsConfig TLSConfig,
authConfig *v1alpha1.AuthConfig) []string {
var args []string
Expand Down Expand Up @@ -822,7 +823,7 @@ func getDownloadCommand(downloadPath, componentPackage string, hasPulsarctl, has
return args
}

func generateJavaLogConfigCommand(runtime *v1alpha1.JavaRuntime, agent v1alpha1.LogTopicAgent) string {
func GenerateJavaLogConfigCommand(runtime *v1alpha1.JavaRuntime, agent v1alpha1.LogTopicAgent) string {
if runtime == nil || (runtime.Log != nil && runtime.Log.LogConfig != nil) {
return ""
}
Expand Down Expand Up @@ -857,7 +858,7 @@ func generateJavaLogConfigCommand(runtime *v1alpha1.JavaRuntime, agent v1alpha1.
return ""
}

func generateJavaLogConfigFileName(runtime *v1alpha1.JavaRuntime) string {
func GenerateJavaLogConfigFileName(runtime *v1alpha1.JavaRuntime) string {
if runtime == nil || (runtime.Log != nil && runtime.Log.LogConfig != nil) {
return DefaultJavaLogConfigPath
}
Expand Down Expand Up @@ -1524,12 +1525,15 @@ func generateBasicContainerEnv(secrets map[string]v1alpha1.SecretRef, env []core
return vars
}

func generateContainerEnvFrom(messagingConfig string, authSecret string, tlsSecret string) []corev1.EnvFromSource {
envs := []corev1.EnvFromSource{{
ConfigMapRef: &corev1.ConfigMapEnvSource{
LocalObjectReference: corev1.LocalObjectReference{Name: messagingConfig},
},
}}
func GenerateContainerEnvFrom(messagingConfig string, authSecret string, tlsSecret string) []corev1.EnvFromSource {
var envs []corev1.EnvFromSource
if messagingConfig != "" {
envs = append(envs, corev1.EnvFromSource{
ConfigMapRef: &corev1.ConfigMapEnvSource{
LocalObjectReference: corev1.LocalObjectReference{Name: messagingConfig},
},
})
}

if authSecret != "" {
envs = append(envs, corev1.EnvFromSource{
Expand Down Expand Up @@ -1814,7 +1818,7 @@ func generateContainerVolumeMountsFromProducerConf(conf *v1alpha1.ProducerConfig
return mounts
}

func generateContainerVolumeMounts(volumeMounts []corev1.VolumeMount, producerConf *v1alpha1.ProducerConfig,
func GenerateContainerVolumeMounts(volumeMounts []corev1.VolumeMount, producerConf *v1alpha1.ProducerConfig,
consumerConfs map[string]v1alpha1.ConsumerConfig, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig,
logConfigs map[int32]*v1alpha1.RuntimeLogConfig, agent v1alpha1.LogTopicAgent) []corev1.VolumeMount {
mounts := []corev1.VolumeMount{}
Expand All @@ -1839,7 +1843,7 @@ func generateContainerVolumeMounts(volumeMounts []corev1.VolumeMount, producerCo
return mounts
}

func generatePodVolumes(podVolumes []corev1.Volume, producerConf *v1alpha1.ProducerConfig,
func GeneratePodVolumes(podVolumes []corev1.Volume, producerConf *v1alpha1.ProducerConfig,
consumerConfs map[string]v1alpha1.ConsumerConfig, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig,
logConfigs map[int32]*v1alpha1.RuntimeLogConfig,
agent v1alpha1.LogTopicAgent) []corev1.Volume {
Expand Down Expand Up @@ -1999,7 +2003,7 @@ const (
golangRuntimeLog
)

func getRuntimeLogConfigNames(java *v1alpha1.JavaRuntime, python *v1alpha1.PythonRuntime,
func GetRuntimeLogConfigNames(java *v1alpha1.JavaRuntime, python *v1alpha1.PythonRuntime,
golang *v1alpha1.GoRuntime) map[int32]*v1alpha1.RuntimeLogConfig {

var configs = map[int32]*v1alpha1.RuntimeLogConfig{}
Expand Down Expand Up @@ -2249,7 +2253,7 @@ func makeFilebeatContainer(volumeMounts []corev1.VolumeMount, envVar []corev1.En
}
imagePullPolicy := corev1.PullIfNotPresent
allowPrivilegeEscalation := false
mounts := generateContainerVolumeMounts(volumeMounts, nil, nil, tlsConfig, authConfig, nil, agent)
mounts := GenerateContainerVolumeMounts(volumeMounts, nil, nil, tlsConfig, authConfig, nil, agent)

var uid int64 = 1000

Expand Down Expand Up @@ -2332,7 +2336,7 @@ func makeFilebeatContainer(volumeMounts []corev1.VolumeMount, envVar []corev1.En
Command: []string{"/bin/sh", "-c", "--", "echo " + fmt.Sprintf("\"%s\"", tpl.String()) + " > " + DefaultFilebeatConfig + " && /usr/share/filebeat/filebeat -e -c " + DefaultFilebeatConfig},
Env: envs,
ImagePullPolicy: imagePullPolicy,
EnvFrom: generateContainerEnvFrom(pulsarConfig, authSecret, tlsSecret),
EnvFrom: GenerateContainerEnvFrom(pulsarConfig, authSecret, tlsSecret),
VolumeMounts: mounts,
SecurityContext: &corev1.SecurityContext{
Capabilities: &corev1.Capabilities{
Expand Down
Loading

0 comments on commit 3bf6d79

Please sign in to comment.