From 9ff143bdef4e1f72a6d839e3e37158a4a5354b00 Mon Sep 17 00:00:00 2001 From: Igal Tsoiref Date: Tue, 15 Sep 2020 11:42:38 +0300 Subject: [PATCH] dblock --- cmd/main.go | 13 +-- go.mod | 4 +- go.sum | 9 ++ pkg/leader/leaderelector_posgress.go | 73 ++++++++++++++++ subsystem/authz_test.go | 4 +- subsystem/leader_test.go | 124 ++++++++++++++++++++++++++- subsystem/subsystem_suite_test.go | 2 +- 7 files changed, 211 insertions(+), 18 deletions(-) create mode 100644 pkg/leader/leaderelector_posgress.go diff --git a/cmd/main.go b/cmd/main.go index 4a7888e2537..3754b66f055 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,9 +13,6 @@ import ( "github.com/openshift/assisted-service/internal/imgexpirer" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" - "github.com/go-openapi/strfmt" "github.com/go-openapi/swag" "github.com/jinzhu/gorm" @@ -184,16 +181,10 @@ func main() { } generator = job.New(log.WithField("pkg", "k8s-job-wrapper"), kclient, Options.JobConfig) - cfg, cerr := clientcmd.BuildConfigFromFlags("", "") - if cerr != nil { - log.WithError(cerr).Fatalf("Failed to create kubernetes cluster config") - } - k8sClient := kubernetes.NewForConfigOrDie(cfg) - lead = leader.NewElector(k8sClient, Options.LeaderConfig, "assisted-service-leader-election-helper", - log.WithField("pkg", "monitor-runner")) + lead = leader.NewDbElector(db, Options.LeaderConfig, "assisted-service-lock", log) err = lead.StartLeaderElection(context.Background()) if err != nil { - log.WithError(cerr).Fatalf("Failed to start leader") + log.WithError(err).Fatalf("Failed to start leader") } case "onprem": diff --git a/go.mod b/go.mod index c18a729ed65..9735471358a 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/openshift/assisted-service go 1.13 require ( + cirello.io/pglock v1.8.0 github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 github.com/aws/aws-sdk-go v1.32.6 @@ -22,6 +23,7 @@ require ( github.com/google/uuid v1.1.1 github.com/jinzhu/gorm v1.9.12 github.com/kelseyhightower/envconfig v1.4.0 + github.com/lib/pq v1.8.0 github.com/moby/moby v1.13.1 github.com/onsi/ginkgo v1.14.0 github.com/onsi/gomega v1.10.1 @@ -43,11 +45,11 @@ require ( golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect gopkg.in/square/go-jose.v2 v2.2.2 gopkg.in/yaml.v2 v2.3.0 k8s.io/api v0.18.5 k8s.io/apimachinery v0.18.5 k8s.io/client-go v0.18.5 - k8s.io/klog/v2 v2.0.0 sigs.k8s.io/controller-runtime v0.6.1 ) diff --git a/go.sum b/go.sum index e327095e240..b90d08be04d 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ bazil.org/fuse v0.0.0-20160811212531-371fbbdaa898/go.mod h1:Xbm+BRKSBEpa4q4hTSxohYNQpsxXPbPry4JJWOB3LB8= +cirello.io/pglock v1.8.0 h1:YmXjZ+zE2c6cuRP2efbRDKnk/qu36g0wbshlJetRIzM= +cirello.io/pglock v1.8.0/go.mod h1:iO/b3K4gTIIKO3DhR8t1mYjtjI6tQJhAED2o9oXtP4I= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -14,6 +16,7 @@ github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6L github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= 
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU= github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= @@ -344,6 +347,10 @@ github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgx github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4= github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg= +github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -685,6 +692,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gomodules.xyz/jsonpatch/v2 v2.0.1 h1:xyiBuvkD2g5n7cYzx6u2sxQvsAy4QJsZFCzGVdzOXZ0= gomodules.xyz/jsonpatch/v2 v2.0.1/go.mod h1:IhYNNY4jnS53ZnfE4PAmpKtDpTCj1JFXc+3mwe7XcUU= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= diff --git a/pkg/leader/leaderelector_posgress.go b/pkg/leader/leaderelector_posgress.go new file mode 100644 index 00000000000..8dfaa397743 --- /dev/null +++ b/pkg/leader/leaderelector_posgress.go @@ -0,0 +1,73 @@ +package leader + +import ( + "cirello.io/pglock" + "context" + "errors" + "github.com/jinzhu/gorm" + "github.com/lib/pq" + "github.com/sirupsen/logrus" +) + + +type DbElector struct { + log logrus.FieldLogger + db *gorm.DB + config Config + isLeader bool + leaderName string +} + + + +func NewDbElector(db *gorm.DB, config Config, leaderName string, logger logrus.FieldLogger) *DbElector { + return &DbElector{db: db, log: logger, config: config, isLeader: false, leaderName: leaderName} +} + +func (l *DbElector) IsLeader() bool { + return l.isLeader +} + +func (l *DbElector) StartLeaderElection(ctx context.Context) error { + + c, err := pglock.New(l.db.DB(), + pglock.WithLeaseDuration(l.config.LeaseDuration), + pglock.WithHeartbeatFrequency(l.config.RetryInterval), + pglock.WithCustomTable(l.leaderName)) + if err != nil { + l.log.WithError(err).Error("Failed to 
create db lock")
+		return err
+	}
+	err = c.CreateTable()
+	if err != nil {
+		// Ignore "duplicate_table" errors: the lock table may already exist.
+		if p, ok := errors.Unwrap(err).(*pq.Error); !ok || p.Code.Name() != "duplicate_table" {
+			l.log.WithError(err).Errorf("Failed to create lock table %s", l.leaderName)
+			return err
+		}
+	}
+
+	go func() {
+		for {
+			if ctx.Err() != nil {
+				return
+			}
+			l.log.Infof("Trying to acquire leader lock %s", l.leaderName)
+			// Do blocks until the lock is acquired and runs l.locked while it is held.
+			err := c.Do(ctx, l.leaderName, l.locked)
+			l.log.WithError(err).Infof("Leader lock %s was released, retrying", l.leaderName)
+		}
+	}()
+
+	return nil
+}
+
+// locked is invoked by pglock while the lock is held; it marks this replica as
+// leader until the callback context is cancelled (lock lost or election stopped).
+func (l *DbElector) locked(ctx context.Context, lock *pglock.Lock) error {
+	l.log.Infof("Acquired leader lock %s", l.leaderName)
+	l.isLeader = true
+	<-ctx.Done()
+	l.isLeader = false
+	return nil
+}
\ No newline at end of file
diff --git a/subsystem/authz_test.go b/subsystem/authz_test.go
index 451131dc961..a6cb1b22b48 100644
--- a/subsystem/authz_test.go
+++ b/subsystem/authz_test.go
@@ -27,7 +27,7 @@ var _ = Describe("test authorization", func() {
 	var capabilityReviewAdminStubID string
 
 	BeforeSuite(func() {
-		if !Options.EnableAuth {
+		if Options.EnableAuth {
 			return
 		}
 
@@ -45,7 +45,7 @@ var _ = Describe("test authorization", func() {
 	})
 
 	AfterSuite(func() {
-		if !Options.EnableAuth {
+		if Options.EnableAuth {
 			return
 		}
 
diff --git a/subsystem/leader_test.go b/subsystem/leader_test.go
index eb8990bb440..eccd0001229 100644
--- a/subsystem/leader_test.go
+++ b/subsystem/leader_test.go
@@ -2,6 +2,9 @@ package subsystem
 
 import (
 	"context"
+	"fmt"
+	"github.com/jinzhu/gorm"
+	"github.com/sirupsen/logrus"
 	"os/user"
 	"path"
 	"time"
@@ -52,13 +55,13 @@ func waitForPredicate(timeout time.Duration, predicate func() bool) {
 }
 
 type Test struct {
-	lead   *leader.Elector
+	lead   leader.ElectorInterface
 	name   string
 	ctx    context.Context
 	cancel context.CancelFunc
 }
 
-func NewTest(lead *leader.Elector, name string) *Test {
+func NewTest(lead leader.ElectorInterface, name string) *Test {
 	return &Test{lead: lead, name: name}
 }
 
@@ -110,7 +113,7 @@ func getHomeDir() string {
 	return usr.HomeDir
 }
 
-var _ = Describe("Leader tests", func() {
+var _ = Describe("Leader k8s tests", func() {
 	configMapName := "leader-test"
 	kubeconfig := path.Join(getHomeDir(), ".kube/config")
 	if kubeconfig == "" {
@@ -148,6 +151,7 @@ var _ = Describe("Leader tests", func() {
 		leader2 := leader.NewElector(client, cf, configMapName, log)
 		leader3 := leader.NewElector(client, cf, configMapName, log)
+
 		test1 := NewTest(leader1, "leader_1")
 		test2 := NewTest(leader2, "leader_2")
 		test3 := NewTest(leader3, "leader_3")
@@ -234,3 +238,117 @@ var _ = Describe("Leader tests", func() {
 		waitForPredicate(timeout, test1.isLeader)
 	})
 })
+
+var _ = Describe("Leader db tests", func() {
+	lockTableName := "leader"
+	cf := leader.Config{LeaseDuration: 1 * time.Second, RetryInterval: 500 * time.Millisecond}
+	db, err := gorm.Open("postgres",
+		fmt.Sprintf("host=%s port=%s user=admin dbname=installer password=admin sslmode=disable",
+			Options.DBHost, Options.DBPort))
+	if err != nil {
+		logrus.Fatal("Fail to connect to DB, ", err)
+	}
+
+	var tests []*Test
+
+	AfterEach(func() {
+		for _, test := range tests {
+			test.stop()
+		}
+	})
+
+	BeforeEach(func() {
+		tests = []*Test{}
+	})
+
+	It("Leader test", func() {
+		leader1 := leader.NewDbElector(db, cf, lockTableName, log)
+		leader2 := leader.NewDbElector(db, cf, lockTableName, log)
+		leader3 := leader.NewDbElector(db, cf, lockTableName, log)
+
+		test1 := NewTest(leader1, "leader_1")
+		test2 := NewTest(leader2, "leader_2")
+		test3 := NewTest(leader3, "leader_3")
+		tests = []*Test{test1, test2, test3}
+
+		By("Start leaders one by one")
+
+		test1.start()
+		waitForPredicate(timeout, test1.isLeader)
+		test2.start()
+		test3.start()
+		// let's wait and verify that the leader has not changed
+		time.Sleep(5 * time.Second)
+		waitForPredicate(timeout, test1.isLeader)
+		verifySingleLeader(tests)
+		log.Infof("Leader 1 is leader %t", leader1.IsLeader())
+		log.Infof("Leader 2 is leader %t", leader2.IsLeader())
+		log.Infof("Leader 3 is leader %t", leader3.IsLeader())
+
+		oldLeader := test1
+		By("Cancelling current leader and verifying another one took it")
+		for i := 0; i < 2; i++ {
+			oldLeader.stop()
+			waitForPredicate(timeout, oldLeader.isNotLeader)
+			log.Infof("Find new leader")
+			waitForPredicate(timeout, func() bool {
+				return getLeader(tests) != nil
+			})
+			newLeader := getLeader(tests)
+			log.Infof("New leader is %s", newLeader.name)
+			Expect(newLeader.name).ShouldNot(Equal(test1.name))
+			// let's wait and verify that the leader has not changed
+			time.Sleep(5 * time.Second)
+			waitForPredicate(timeout, newLeader.isLeader)
+			verifySingleLeader(tests)
+			oldLeader = newLeader
+		}
+
+		By("Cancelling the current leader")
+		oldLeader.stop()
+		waitForPredicate(timeout, oldLeader.isNotLeader)
+
+	})
+
+	It("Bad lock table name", func() {
+		By("Creating an elector with an invalid lock table name, it must fail. The same applies to any table-creation error")
+		badTableElector := leader.NewDbElector(db, cf, "bad-name", log)
+		err := badTableElector.StartLeaderElection(context.Background())
+		Expect(err).Should(HaveOccurred())
+	})
+
+	It("Test 2 leaders in parallel with different lock tables", func() {
+		leader1 := leader.NewDbElector(db, cf, lockTableName, log)
+		test1 := NewTest(leader1, "leader_1")
+		tests = append(tests, test1)
+		test1.start()
+		waitForPredicate(timeout, test1.isLeader)
+		By("Adding a leader with another lock table name, it must become a leader")
+		anotherTableElector := leader.NewDbElector(db, cf, "another", log)
+		anotherTableTest := NewTest(anotherTableElector, "another")
+		tests = append(tests, anotherTableTest)
+		anotherTableTest.start()
+		waitForPredicate(timeout, anotherTableTest.isLeader)
+		log.Infof("Verify that the previous leader has not changed")
+		waitForPredicate(timeout, test1.isLeader)
+	})
+	//It("Deleting configmap in a loop", func() {
+	//	By("Deleting configmap in a loop (it must be recreated all the time), leader will lose leadership and retake it")
+	//	leader1 := leader.NewElector(client, cf, configMapName, log)
+	//	test1 := NewTest(leader1, "leader_1")
+	//	tests = append(tests, test1)
+	//	test1.start()
+	//	wasLost := false
+	//	for i := 0; i < 300; i++ {
+	//		_ = client.CoreV1().ConfigMaps(namespace).Delete(context.TODO(), configMapName, metav1.DeleteOptions{})
+	//		if !test1.isLeader() {
+	//			wasLost = true
+	//			break
+	//		}
+	//		time.Sleep(100 * time.Millisecond)
+	//	}
+	//	Expect(wasLost).Should(Equal(true))
+	//	log.Infof("Verifying leader still exists")
+	//	waitForPredicate(timeout, test1.isLeader)
+	//})
+})
diff --git a/subsystem/subsystem_suite_test.go b/subsystem/subsystem_suite_test.go
index 84951a20a4f..6195f203535 100644
--- a/subsystem/subsystem_suite_test.go
+++ b/subsystem/subsystem_suite_test.go
@@ -74,7 +74,7 @@ func init() {
 		logrus.Fatal("Fail to connect to DB, ", err)
 	}
 
-	if Options.EnableAuth {
+	if !Options.EnableAuth {
 		wiremock = &WireMock{
 			OCMHost:   Options.OCMHost,
 			TestToken: Options.TestToken,
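Reviewer note (not part of the patch): the election loop added in pkg/leader/leaderelector_posgress.go builds on cirello.io/pglock, where Do blocks until the lock row is acquired, keeps the lease alive with heartbeats, and cancels the callback's context if the lease is lost. A minimal standalone sketch of that pattern, assuming a reachable Postgres instance; the DSN, table name, and durations below are illustrative, not taken from the service configuration:

package main

import (
	"context"
	"database/sql"
	"log"
	"time"

	"cirello.io/pglock"
	_ "github.com/lib/pq" // Postgres driver backing the *sql.DB handle used by pglock
)

func main() {
	// Illustrative DSN; the service itself reuses its gorm connection via db.DB().
	db, err := sql.Open("postgres", "host=127.0.0.1 port=5432 user=admin dbname=installer sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	c, err := pglock.New(db,
		pglock.WithLeaseDuration(3*time.Second),    // lock expires if not renewed within this window
		pglock.WithHeartbeatFrequency(time.Second), // how often the holder renews the lease
		pglock.WithCustomTable("leader_election_sketch"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// CreateTable fails with a "duplicate_table" error if the table already
	// exists, which is why the patch tolerates that specific error code.
	if err := c.CreateTable(); err != nil {
		log.Printf("create table: %v", err)
	}

	// Do blocks until the lock is acquired, runs the callback while the lease
	// is sustained, and cancels ctx when the lease is lost.
	err = c.Do(context.Background(), "my-leader-lock", func(ctx context.Context, _ *pglock.Lock) error {
		log.Println("acquired leadership")
		<-ctx.Done() // act as leader until the lease is lost or the process stops
		return nil
	})
	log.Printf("leadership released: %v", err)
}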