Skip to content

Commit

Permalink
Merge pull request #512 from kubernetes-sigs/master
Browse files Browse the repository at this point in the history
🏃 ff release-0.2 branch to master HEAD
  • Loading branch information
droot authored Jul 2, 2019
2 parents aeaf98d + 2495fdd commit aaddbd9
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 26 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 40 additions & 7 deletions pkg/client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
)

Expand Down Expand Up @@ -61,7 +62,27 @@ func init() {
//
// * $HOME/.kube/config if exists
func GetConfig() (*rest.Config, error) {
cfg, err := loadConfig()
return GetConfigWithContext("")
}

// GetConfigWithContext creates a *rest.Config for talking to a Kubernetes API server with a specific context.
// If --kubeconfig is set, will use the kubeconfig file at that location. Otherwise will assume running
// in cluster and use the cluster provided kubeconfig.
//
// It also applies saner defaults for QPS and burst based on the Kubernetes
// controller manager defaults (20 QPS, 30 burst)
//
// Config precedence
//
// * --kubeconfig flag pointing at a file
//
// * KUBECONFIG environment variable pointing at a file
//
// * In-cluster config if running in cluster
//
// * $HOME/.kube/config if exists
func GetConfigWithContext(context string) (*rest.Config, error) {
cfg, err := loadConfig(context)
if err != nil {
return nil, err
}
Expand All @@ -75,30 +96,42 @@ func GetConfig() (*rest.Config, error) {
}

// loadConfig loads a REST Config as per the rules specified in GetConfig
func loadConfig() (*rest.Config, error) {
func loadConfig(context string) (*rest.Config, error) {

// If a flag is specified with the config location, use that
if len(kubeconfig) > 0 {
return clientcmd.BuildConfigFromFlags(apiServerURL, kubeconfig)
return loadConfigWithContext(apiServerURL, kubeconfig, context)
}
// If an env variable is specified with the config locaiton, use that
// If an env variable is specified with the config location, use that
if len(os.Getenv("KUBECONFIG")) > 0 {
return clientcmd.BuildConfigFromFlags(apiServerURL, os.Getenv("KUBECONFIG"))
return loadConfigWithContext(apiServerURL, os.Getenv("KUBECONFIG"), context)
}
// If no explicit location, try the in-cluster config
if c, err := rest.InClusterConfig(); err == nil {
return c, nil
}
// If no in-cluster config, try the default location in the user's home directory
if usr, err := user.Current(); err == nil {
if c, err := clientcmd.BuildConfigFromFlags(
"", filepath.Join(usr.HomeDir, ".kube", "config")); err == nil {
if c, err := loadConfigWithContext(apiServerURL, filepath.Join(usr.HomeDir, ".kube", "config"),
context); err == nil {
return c, nil
}
}

return nil, fmt.Errorf("could not locate a kubeconfig")
}

func loadConfigWithContext(apiServerURL, kubeconfig, context string) (*rest.Config, error) {
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfig},
&clientcmd.ConfigOverrides{
ClusterInfo: clientcmdapi.Cluster{
Server: apiServerURL,
},
CurrentContext: context,
}).ClientConfig()
}

// GetConfigOrDie creates a *rest.Config for talking to a Kubernetes apiserver.
// If --kubeconfig is set, will use the kubeconfig file at that location. Otherwise will assume running
// in cluster and use the cluster provided kubeconfig.
Expand Down
19 changes: 14 additions & 5 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ type controllerManager struct {
// metricsListener is used to serve prometheus metrics
metricsListener net.Listener

mu sync.Mutex
started bool
errChan chan error
mu sync.Mutex
started bool
startedLeader bool
errChan chan error

// internalStop is the stop channel *actually* used by everything involved
// with the manager as a stop channel, so that we can pass a stop channel
Expand Down Expand Up @@ -134,14 +135,18 @@ func (cm *controllerManager) Add(r Runnable) error {
return err
}

var shouldStart bool

// Add the runnable to the leader election or the non-leaderelection list
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
shouldStart = cm.started
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
} else {
shouldStart = cm.startedLeader
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
}

if cm.started {
if shouldStart {
// If already started, start the controller
go func() {
cm.errChan <- r.Start(cm.internalStop)
Expand Down Expand Up @@ -225,17 +230,19 @@ func (cm *controllerManager) GetWebhookServer() *webhook.Server {
}

func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
var metricsPath = "/metrics"
handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
ErrorHandling: promhttp.HTTPErrorOnError,
})
// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
mux := http.NewServeMux()
mux.Handle("/metrics", handler)
mux.Handle(metricsPath, handler)
server := http.Server{
Handler: mux,
}
// Run the server
go func() {
log.Info("starting metrics server", "path", metricsPath)
if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
cm.errChan <- err
}
Expand Down Expand Up @@ -314,6 +321,8 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
cm.errChan <- ctrl.Start(cm.internalStop)
}()
}

cm.startedLeader = true
}

func (cm *controllerManager) waitForCache() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type Options struct {
Namespace string

// MetricsBindAddress is the TCP address that the controller should bind to
// for serving prometheus metrics
// for serving prometheus metrics.
// It can be set to "0" to disable the metrics serving.
MetricsBindAddress string

// Port is the port that the webhook server serves at.
Expand Down
13 changes: 7 additions & 6 deletions pkg/metrics/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ import (
"net"
)

// DefaultBindAddress sets the default bind address for the metrics
// listener
// The metrics is off by default.
// TODO: Flip the default by changing DefaultBindAddress back to ":8080" in the v0.2.0.
var DefaultBindAddress = "0"
// DefaultBindAddress sets the default bind address for the metrics listener
// The metrics is on by default.
var DefaultBindAddress = ":8080"

// NewListener creates a new TCP listener bound to the given address.
func NewListener(addr string) (net.Listener, error) {
Expand All @@ -39,9 +37,12 @@ func NewListener(addr string) (net.Listener, error) {
return nil, nil
}

log.Info("metrics server is starting to listen", "addr", addr)
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("error listening on %s: %v", addr, err)
er := fmt.Errorf("error listening on %s: %v", addr, err)
log.Error(er, "metrics server failed to listen. You may want to disable the metrics server or use another port if it is due to conflicts")
return nil, er
}
return ln, nil
}
29 changes: 22 additions & 7 deletions pkg/webhook/conversion/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ func (wh *Webhook) convertViaHub(src, dst conversion.Convertible) error {

// getHub returns an instance of the Hub for passed-in object's group/kind.
func (wh *Webhook) getHub(obj runtime.Object) (conversion.Hub, error) {
gvks := objectGVKs(wh.scheme, obj)
gvks, err := objectGVKs(wh.scheme, obj)
if err != nil {
return nil, err
}
if len(gvks) == 0 {
return nil, fmt.Errorf("error retrieving gvks for object : %v", obj)
}
Expand Down Expand Up @@ -223,7 +226,10 @@ func (wh *Webhook) allocateDstObject(apiVersion, kind string) (runtime.Object, e
func IsConvertible(scheme *runtime.Scheme, obj runtime.Object) (bool, error) {
var hubs, spokes, nonSpokes []runtime.Object

gvks := objectGVKs(scheme, obj)
gvks, err := objectGVKs(scheme, obj)
if err != nil {
return false, err
}
if len(gvks) == 0 {
return false, fmt.Errorf("error retrieving gvks for object : %v", obj)
}
Expand Down Expand Up @@ -273,18 +279,27 @@ func IsConvertible(scheme *runtime.Scheme, obj runtime.Object) (bool, error) {
}

// objectGVKs returns all (Group,Version,Kind) for the Group/Kind of given object.
func objectGVKs(scheme *runtime.Scheme, obj runtime.Object) []schema.GroupVersionKind {
var gvks []schema.GroupVersionKind

objGVK := obj.GetObjectKind().GroupVersionKind()
func objectGVKs(scheme *runtime.Scheme, obj runtime.Object) ([]schema.GroupVersionKind, error) {
// NB: we should not use `obj.GetObjectKind().GroupVersionKind()` to get the
// GVK here, since it is parsed from apiVersion and kind fields and it may
// return empty GVK if obj is an uninitialized object.
objGVKs, _, err := scheme.ObjectKinds(obj)
if err != nil {
return nil, err
}
if len(objGVKs) != 1 {
return nil, fmt.Errorf("expect to get only one GVK for %v", obj)
}
objGVK := objGVKs[0]
knownTypes := scheme.AllKnownTypes()

var gvks []schema.GroupVersionKind
for gvk := range knownTypes {
if objGVK.GroupKind() == gvk.GroupKind() {
gvks = append(gvks, gvk)
}
}
return gvks
return gvks, nil
}

// PartialImplementationError represents an error due to partial conversion
Expand Down
22 changes: 22 additions & 0 deletions pkg/webhook/conversion/conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
appsv1beta1 "k8s.io/api/apps/v1beta1"
apix "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
kscheme "k8s.io/client-go/kubernetes/scheme"

Expand Down Expand Up @@ -312,6 +313,27 @@ var _ = Describe("IsConvertible", func() {
Expect(jobsv3.AddToScheme(scheme)).To(Succeed())
})

It("should not error for uninitialized types", func() {
obj := &jobsv2.ExternalJob{}

ok, err := IsConvertible(scheme, obj)
Expect(err).NotTo(HaveOccurred())
Expect(ok).To(BeTrue())
})

It("should not error for unstructured types", func() {
obj := &unstructured.Unstructured{
Object: map[string]interface{}{
"kind": "ExternalJob",
"apiVersion": "jobs.testprojects.kb.io/v2",
},
}

ok, err := IsConvertible(scheme, obj)
Expect(err).NotTo(HaveOccurred())
Expect(ok).To(BeTrue())
})

It("should return true for convertible types", func() {
obj := &jobsv2.ExternalJob{
TypeMeta: metav1.TypeMeta{
Expand Down

0 comments on commit aaddbd9

Please sign in to comment.