Skip to content

Commit

Permalink
Merge pull request #14 from nlnwa/import_update
Browse files Browse the repository at this point in the history
Add call option wait for ready to grpc calls
  • Loading branch information
johnerikhalse authored Mar 14, 2019
2 parents 53c8ecc + 71938be commit 4fc565d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 25 deletions.
6 changes: 3 additions & 3 deletions src/cmd/importcmd/convertoos.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ var convertOosCmd = &cobra.Command{
return fmt.Errorf("%v already exists", v)
}

seed := &seed{
seed := &seedDesc{
Uri: v,
SeedLabel: []*configV1.Label{{Key: "source", Value: "oosh"}},
EntityLabel: []*configV1.Label{{Key: "source", Value: "oosh"}},
Expand Down Expand Up @@ -185,7 +185,7 @@ type converter struct {
httpClient *http.Client
}

func (c *converter) checkUri(s *seed) (err error) {
func (c *converter) checkUri(s *seedDesc) (err error) {
uri, err := url.Parse(s.Uri)
if err != nil {
return fmt.Errorf("unparseable URL '%v', cause: %v", s.Uri, err)
Expand All @@ -208,7 +208,7 @@ func (c *converter) checkUri(s *seed) (err error) {
return
}

func (c *converter) checkRedirect(uri string, s *seed, count int) {
func (c *converter) checkRedirect(uri string, s *seedDesc, count int) {
if count > 5 {
return
}
Expand Down
47 changes: 26 additions & 21 deletions src/cmd/importcmd/importseed.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"os"
)

type seed struct {
type seedDesc struct {
EntityName string
Uri string
EntityLabel []*configV1.Label
Expand Down Expand Up @@ -107,20 +107,20 @@ var importSeedCmd = &cobra.Command{

// Processor for converting oos records into import records
proc := func(value interface{}) error {
s := value.(*seed)
if err := i.topLevelUri(s); err != nil {
sd := value.(*seedDesc)
if err := i.topLevelUri(sd); err != nil {
return err
}
exists, err := impf.Check(s.Uri)
exists, err := impf.Check(sd.Uri)
if exists.Code > importutil.DUPLICATE_NEW {
return fmt.Errorf("seed already exists: %v", s.Uri)
return fmt.Errorf("seed already exists: %v", sd.Uri)
}
if err := i.checkUri(s); err != nil {
if err := i.checkUri(sd); err != nil {
return err
}

exists, err = impf.CheckAndUpdateVeidemann(s.Uri, s, func(client configV1.ConfigClient, data interface{}) (id string, err error) {
obj := data.(*seed)
exists, err = impf.CheckAndUpdateVeidemann(sd.Uri, sd, func(client configV1.ConfigClient, data interface{}) (id string, err error) {
obj := data.(*seedDesc)
e := &configV1.ConfigObject{
ApiVersion: "v1",
Kind: configV1.Kind_crawlEntity,
Expand All @@ -130,10 +130,11 @@ var importSeedCmd = &cobra.Command{
Label: obj.EntityLabel,
},
}
ctx := context.Background()
log.Debugf("store entity: %v", e)
e, err = client.SaveConfigObject(context.Background(), e)
e, err = client.SaveConfigObject(ctx, e)
if err != nil {
client.DeleteConfigObject(context.Background(), e)
//client.DeleteConfigObject(context.Background(), e)
return "", fmt.Errorf("Error writing crawl entity: %v", err)
}

Expand All @@ -156,9 +157,13 @@ var importSeedCmd = &cobra.Command{
},
}
log.Debugf("store seed: %v", s)
_, err = client.SaveConfigObject(context.Background(), s)
_, err = client.SaveConfigObject(ctx, s)
if err != nil {
client.DeleteConfigObject(context.Background(), e)
if d, err := client.DeleteConfigObject(ctx, e); err == nil {
fmt.Println("Delete entity: ", d)
} else {
fmt.Println("Failed deletion of entity: ", err)
}
return "", fmt.Errorf("Error writing seed: %v", err)
}
return s.Id, nil
Expand All @@ -167,7 +172,7 @@ var importSeedCmd = &cobra.Command{
return err
}
if exists.Code > importutil.NEW {
return fmt.Errorf("seed already exists: %v", s.Uri)
return fmt.Errorf("seed already exists: %v", sd.Uri)
}

return nil
Expand All @@ -177,7 +182,7 @@ var importSeedCmd = &cobra.Command{
errorHandler := func(state *importutil.StateVal) {
var uri string
if state.Val != nil {
uri = state.Val.(*seed).Uri
uri = state.Val.(*seedDesc).Uri
}
_, _ = fmt.Fprintf(errFile, "{\"uri\": \"%s\", \"err\": \"%s\", \"file\": \"%s\", \"recNum\": %v}\n", uri, state.GetError(), state.GetFilename(), state.GetRecordNum())
}
Expand All @@ -194,8 +199,8 @@ var importSeedCmd = &cobra.Command{

// Process
for {
var ts seed
state, err := rr.Next(&ts)
var sd seedDesc
state, err := rr.Next(&sd)
if err == io.EOF {
break
}
Expand All @@ -204,10 +209,10 @@ var importSeedCmd = &cobra.Command{
os.Exit(1)
}
if importFlags.crawlJobId != "" {
ts.crawlJobRef = crawlJobRef
sd.crawlJobRef = crawlJobRef
}

conv.Do(state, &ts)
conv.Do(state, &sd)
}

conv.Finish()
Expand All @@ -233,7 +238,7 @@ func init() {
importSeedCmd.PersistentFlags().BoolVarP(&importFlags.resetDb, "reset-db", "r", false, "Clean state db")
}

func (i *importer) topLevelUri(s *seed) (err error) {
func (i *importer) topLevelUri(s *seedDesc) (err error) {
uri, err := url.Parse(s.Uri)
if err != nil {
return fmt.Errorf("unparseable URL '%v', cause: %v", s.Uri, err)
Expand All @@ -252,14 +257,14 @@ func (i *importer) topLevelUri(s *seed) (err error) {
return
}

func (i *importer) checkUri(s *seed) (err error) {
func (i *importer) checkUri(s *seedDesc) (err error) {
if importFlags.checkUri {
i.checkRedirect(s.Uri, s, 0)
}
return
}

func (i *importer) checkRedirect(uri string, s *seed, count int) {
func (i *importer) checkRedirect(uri string, s *seedDesc, count int) {
if count > 5 {
return
}
Expand Down
5 changes: 4 additions & 1 deletion src/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ func newConnection() *grpc.ClientConn {
func connect(idp string, tls bool) (*grpc.ClientConn, bool) {
address := configutil.GlobalFlags.ControllerAddress

dialOptions := []grpc.DialOption{}
dialOptions := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
}

if idp != "" {
dialOptions = AddCredentials(idp, dialOptions)
}
Expand Down

0 comments on commit 4fc565d

Please sign in to comment.