Skip to content

Commit

Permalink
updates to go bulk export
Browse files Browse the repository at this point in the history
  • Loading branch information
corkrean committed Dec 13, 2024
1 parent b0be8b2 commit fd7e7aa
Showing 1 changed file with 42 additions and 26 deletions.
68 changes: 42 additions & 26 deletions code-examples/go/exportBulk/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"
"log"
"time"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/authzed/authzed-go/v1"
Expand All @@ -17,7 +18,6 @@ const spicedbEndpoint = "localhost:50051"
func main() {
client, err := authzed.NewClient(
spicedbEndpoint,
//this example connects to an insecure endpoint
grpcutil.WithInsecureBearerToken("abc123"),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
Expand All @@ -30,42 +30,58 @@ func main() {

var cursor *v1.Cursor
exportResults := make([]*v1.Relationship, 0)
batchCount := 0

// Prepare request with cursor if available
totalProcessed := 0

const maxRetries = 5

var endOfFile bool

request := &v1.ExportBulkRelationshipsRequest{
//optimal limit is around 1,000
OptionalLimit: 1000,
}
if cursor != nil {
request.OptionalCursor = cursor
}

// Fetch the stream
stream, err := client.PermissionsServiceClient.ExportBulkRelationships(ctx, request)
if err != nil {
log.Fatalf("failed to export relationships: %s", err)
}

batchSize := 0
retries := 0
for {
item, err := stream.Recv()
if err == io.EOF {
// End of the current stream
break
if cursor != nil {
request.OptionalCursor = cursor
}
stream, err := client.PermissionsServiceClient.ExportBulkRelationships(ctx, request)
if err != nil {
log.Fatalf("stream error: %s", err)
retries++
if retries >= maxRetries {
log.Fatalf("Max retries reached: %s", err)
}
log.Printf("Retrying (%d/%d) after error: %s", retries, maxRetries, err)
time.Sleep(time.Second * time.Duration(retries)) // Backoff
continue
}

// Append relationships and update the cursor
exportResults = append(exportResults, item.Relationships...)
cursor = item.AfterResultCursor
batchSize += len(item.Relationships)
batchCount++
for {
item, err := stream.Recv()
if err == io.EOF {
endOfFile = true
break
}
if err != nil {
log.Printf("Stream error: %s", err)
break
}

log.Printf("Processed batch %d. %d relationships so far.\n", batchCount, batchSize)
exportResults = append(exportResults, item.Relationships...)

cursor = item.AfterResultCursor

totalProcessed += len(item.Relationships)
log.Printf("Processed total: %d.", totalProcessed)

}

if endOfFile {
break
}
}

log.Printf("Export complete: %d relationships retrieved\n", len(exportResults))
log.Printf("Export complete: %d relationships retrieved\n", totalProcessed)

}

0 comments on commit fd7e7aa

Please sign in to comment.