Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dblock #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import (

"github.com/openshift/assisted-service/internal/imgexpirer"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

"github.com/go-openapi/strfmt"
"github.com/go-openapi/swag"
"github.com/jinzhu/gorm"
Expand Down Expand Up @@ -184,16 +181,10 @@ func main() {
}
generator = job.New(log.WithField("pkg", "k8s-job-wrapper"), kclient, Options.JobConfig)

cfg, cerr := clientcmd.BuildConfigFromFlags("", "")
if cerr != nil {
log.WithError(cerr).Fatalf("Failed to create kubernetes cluster config")
}
k8sClient := kubernetes.NewForConfigOrDie(cfg)
lead = leader.NewElector(k8sClient, Options.LeaderConfig, "assisted-service-leader-election-helper",
log.WithField("pkg", "monitor-runner"))
lead = leader.NewDbElector(db, Options.LeaderConfig, "assisted-service-lock", log)
err = lead.StartLeaderElection(context.Background())
if err != nil {
log.WithError(cerr).Fatalf("Failed to start leader")
log.WithError(err).Fatalf("Failed to start leader")
}

case "onprem":
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/openshift/assisted-service
go 1.13

require (
cirello.io/pglock v1.8.0
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d
github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535
github.com/aws/aws-sdk-go v1.32.6
Expand All @@ -22,6 +23,7 @@ require (
github.com/google/uuid v1.1.1
github.com/jinzhu/gorm v1.9.12
github.com/kelseyhightower/envconfig v1.4.0
github.com/lib/pq v1.8.0
github.com/moby/moby v1.13.1
github.com/onsi/ginkgo v1.14.0
github.com/onsi/gomega v1.10.1
Expand All @@ -43,11 +45,11 @@ require (
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/square/go-jose.v2 v2.2.2
gopkg.in/yaml.v2 v2.3.0
k8s.io/api v0.18.5
k8s.io/apimachinery v0.18.5
k8s.io/client-go v0.18.5
k8s.io/klog/v2 v2.0.0
sigs.k8s.io/controller-runtime v0.6.1
)
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
bazil.org/fuse v0.0.0-20160811212531-371fbbdaa898/go.mod h1:Xbm+BRKSBEpa4q4hTSxohYNQpsxXPbPry4JJWOB3LB8=
cirello.io/pglock v1.8.0 h1:YmXjZ+zE2c6cuRP2efbRDKnk/qu36g0wbshlJetRIzM=
cirello.io/pglock v1.8.0/go.mod h1:iO/b3K4gTIIKO3DhR8t1mYjtjI6tQJhAED2o9oXtP4I=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand All @@ -14,6 +16,7 @@ github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6L
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU=
github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
Expand Down Expand Up @@ -344,6 +347,10 @@ github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgx
github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4=
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
Expand Down Expand Up @@ -685,6 +692,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gomodules.xyz/jsonpatch/v2 v2.0.1 h1:xyiBuvkD2g5n7cYzx6u2sxQvsAy4QJsZFCzGVdzOXZ0=
gomodules.xyz/jsonpatch/v2 v2.0.1/go.mod h1:IhYNNY4jnS53ZnfE4PAmpKtDpTCj1JFXc+3mwe7XcUU=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
Expand Down
73 changes: 73 additions & 0 deletions pkg/leader/leaderelector_posgress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package leader

import (
"cirello.io/pglock"
"context"
"errors"
"github.com/jinzhu/gorm"
"github.com/lib/pq"
"github.com/sirupsen/logrus"
)


type DbElector struct {
log logrus.FieldLogger
db *gorm.DB
config Config
isLeader bool
leaderName string
}



func NewDbElector(db *gorm.DB, config Config, leaderName string, logger logrus.FieldLogger) *DbElector {
return &DbElector{db: db, log: logger, config: config, isLeader: false, leaderName: leaderName}
}

func (l *DbElector) IsLeader() bool {
return l.isLeader
}

func (l *DbElector) StartLeaderElection(ctx context.Context) error {

c, err := pglock.New(l.db.DB(),
pglock.WithLeaseDuration(l.config.LeaseDuration),
pglock.WithHeartbeatFrequency(l.config.RetryInterval),
pglock.WithCustomTable(l.leaderName))
if err != nil {
l.log.WithError(err).Error("Failed to create db lock")
}
err = c.CreateTable()
if err != nil {
if p, ok := errors.Unwrap(err).(*pq.Error); !ok || p.Code.Name() != "duplicate_table" {
l.log.WithError(err).Infof("CCCCCCCCCCCCCCCCCCCCCCCCCCC")
return err
}
}

go func() {
var lock *pglock.Lock
defer func() {
if lock != nil {
lock.Close()
}
}()
for {
if ctx.Err() != nil {
return
}
l.log.Infof("BBBBBBBBBBBBBBBBBBBBBBBBB")
err = c.Do(ctx, l.leaderName, l.locked)
l.log.WithError(err).Infof("AAAAAAAAAAAAAAAAAAAAAAAA")
}
}()

return nil
}
func (l *DbElector) locked(ctx context.Context, lock *pglock.Lock) error{
l.log.Infof("GGGGGGGGGGGGGGGGGGGGGGGGGGGG")
l.isLeader = true
<-ctx.Done()
l.isLeader = false
return nil
}
4 changes: 2 additions & 2 deletions subsystem/authz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var _ = Describe("test authorization", func() {
var capabilityReviewAdminStubID string

BeforeSuite(func() {
if !Options.EnableAuth {
if Options.EnableAuth {
return
}

Expand All @@ -45,7 +45,7 @@ var _ = Describe("test authorization", func() {
})

AfterSuite(func() {
if !Options.EnableAuth {
if Options.EnableAuth {
return
}

Expand Down
124 changes: 121 additions & 3 deletions subsystem/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package subsystem

import (
"context"
"fmt"
"github.com/jinzhu/gorm"
"github.com/sirupsen/logrus"
"os/user"
"path"
"time"
Expand Down Expand Up @@ -52,13 +55,13 @@ func waitForPredicate(timeout time.Duration, predicate func() bool) {
}

type Test struct {
lead *leader.Elector
lead leader.ElectorInterface
name string
ctx context.Context
cancel context.CancelFunc
}

func NewTest(lead *leader.Elector, name string) *Test {
func NewTest(lead leader.ElectorInterface, name string) *Test {
return &Test{lead: lead, name: name}
}

Expand Down Expand Up @@ -110,7 +113,7 @@ func getHomeDir() string {
return usr.HomeDir
}

var _ = Describe("Leader tests", func() {
var _ = Describe("Leader k8s tests", func() {
configMapName := "leader-test"
kubeconfig := path.Join(getHomeDir(), ".kube/config")
if kubeconfig == "" {
Expand Down Expand Up @@ -148,6 +151,7 @@ var _ = Describe("Leader tests", func() {
leader2 := leader.NewElector(client, cf, configMapName, log)
leader3 := leader.NewElector(client, cf, configMapName, log)


test1 := NewTest(leader1, "leader_1")
test2 := NewTest(leader2, "leader_2")
test3 := NewTest(leader3, "leader_3")
Expand Down Expand Up @@ -234,3 +238,117 @@ var _ = Describe("Leader tests", func() {
waitForPredicate(timeout, test1.isLeader)
})
})

var _ = Describe("Leader db tests", func() {
configMapName := "leader"
cf := leader.Config{LeaseDuration: 1 * time.Second, RetryInterval: 500 * time.Millisecond}
db, err := gorm.Open("postgres",
fmt.Sprintf("host=%s port=%s user=admin dbname=installer password=admin sslmode=disable",
Options.DBHost, Options.DBPort))
if err != nil {
logrus.Fatal("Fail to connect to DB, ", err)
}

var tests []*Test

AfterEach(func() {
for _, test := range tests {
test.stop()
}
})

BeforeEach(func() {
tests = []*Test{}
})

It("Leader test", func() {
leader1 := leader.NewDbElector(db, cf, configMapName, log)
leader2 := leader.NewDbElector(db, cf, configMapName, log)
leader3 := leader.NewDbElector(db, cf, configMapName, log)

test1 := NewTest(leader1, "leader_1")
test2 := NewTest(leader2, "leader_2")
test3 := NewTest(leader3, "leader_3")
tests = []*Test{test1, test2, test3}

By("Start leaders one by one")

test1.start()
waitForPredicate(timeout, test1.isLeader)
test2.start()
test3.start()
// lets wait and verify that leader is not changed
time.Sleep(5 * time.Second)
waitForPredicate(timeout, test1.isLeader)
verifySingleLeader(tests)
log.Infof("Leader 1 is leader %t", leader1.IsLeader())
log.Infof("Leader 2 is leader %t", leader2.IsLeader())
log.Infof("Leader 3 is leader %t", leader3.IsLeader())

oldLeader := test1
By("Cancelling current leader and verifying another one took it")
for i := 0; i < 2; i++ {
oldLeader.stop()
waitForPredicate(timeout, oldLeader.isNotLeader)
log.Infof("Find new leader")
waitForPredicate(timeout, func() bool {
return getLeader(tests) != nil
})
newLeader := getLeader(tests)
log.Infof("New leader is %s", newLeader.name)
Expect(newLeader.name).ShouldNot(Equal(test1.name))
// lets wait and verify that leader is not changed
time.Sleep(5 * time.Second)
waitForPredicate(timeout, newLeader.isLeader)
verifySingleLeader(tests)
oldLeader = newLeader
}

By("Cancelling current")
oldLeader.stop()
waitForPredicate(timeout, oldLeader.isNotLeader)

})

It("Bad db name", func() {
By("Adding leader with bad db name, must fail. Will be the same for any db create error")
badConfigMap := leader.NewDbElector(db, cf, "bad-name", log)
err := badConfigMap.StartLeaderElection(context.Background())
Expect(err).Should(HaveOccurred())
})

It("Test 2 leaders in parallel with different config map", func() {
leader1 := leader.NewDbElector(db, cf, configMapName, log)
test1 := NewTest(leader1, "leader_1")
tests = append(tests, test1)
test1.start()
waitForPredicate(timeout, test1.isLeader)
By("Adding leader with another name, must become a leader")
anotherConfigMap := leader.NewDbElector(db, cf, "another", log)
anotherConfigMapTest := NewTest(anotherConfigMap, "another")
tests = append(tests, anotherConfigMapTest)
anotherConfigMapTest.start()
waitForPredicate(timeout, anotherConfigMapTest.isLeader)
log.Infof("Verify that previous leader was not changed")
waitForPredicate(timeout, test1.isLeader)
})
//It("Deleting configmap in a loop", func() {
// By("Deleting configmap in a loop (it must be recreated all the time), leader will loose leader and retake it")
// leader1 := leader.NewElector(client, cf, configMapName, log)
// test1 := NewTest(leader1, "leader_1")
// tests = append(tests, test1)
// test1.start()
// wasLost := false
// for i := 0; i < 300; i++ {
// _ = client.CoreV1().ConfigMaps(namespace).Delete(context.TODO(), configMapName, metav1.DeleteOptions{})
// if !test1.isLeader() {
// wasLost = true
// break
// }
// time.Sleep(100 * time.Millisecond)
// }
// Expect(wasLost).Should(Equal(true))
// log.Infof("Verifying leader still exists")
// waitForPredicate(timeout, test1.isLeader)
//})
})
2 changes: 1 addition & 1 deletion subsystem/subsystem_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func init() {
logrus.Fatal("Fail to connect to DB, ", err)
}

if Options.EnableAuth {
if !Options.EnableAuth {
wiremock = &WireMock{
OCMHost: Options.OCMHost,
TestToken: Options.TestToken,
Expand Down