Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Revert "Migrate Beam Go to use the new Opaque Protocol Buffers (#33434)"
Browse files Browse the repository at this point in the history
This reverts commit 4797d75.
lostluck authored Jan 16, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 46699a0 commit 2bd5d7b
Showing 124 changed files with 12,021 additions and 52,793 deletions.
2 changes: 1 addition & 1 deletion sdks/go/cmd/beamctl/cmd/artifact.go
Original file line number Diff line number Diff line change
@@ -95,7 +95,7 @@ func listFn(cmd *cobra.Command, args []string) error {
}

for _, a := range md.GetManifest().GetArtifact() {
cmd.Println(a.GetName())
cmd.Println(a.Name)
}
return nil
}
2 changes: 1 addition & 1 deletion sdks/go/container/boot.go
Original file line number Diff line number Diff line change
@@ -184,7 +184,7 @@ func main() {

enableGoogleCloudProfiler := strings.Contains(options, enableGoogleCloudProfilerOption)
if enableGoogleCloudProfiler {
err := configureGoogleCloudProfilerEnvVars(ctx, logger, info.GetMetadata())
err := configureGoogleCloudProfilerEnvVars(ctx, logger, info.Metadata)
if err != nil {
logger.Printf(ctx, "could not configure Google Cloud Profiler variables, got %v", err)
}
38 changes: 19 additions & 19 deletions sdks/go/container/boot_test.go
Original file line number Diff line number Diff line change
@@ -29,11 +29,11 @@ import (
)

func TestEnsureEndpointsSet_AllSet(t *testing.T) {
provisionInfo := fnpb.ProvisionInfo_builder{
LoggingEndpoint: pipepb.ApiServiceDescriptor_builder{Url: "testLoggingEndpointUrl"}.Build(),
ArtifactEndpoint: pipepb.ApiServiceDescriptor_builder{Url: "testArtifactEndpointUrl"}.Build(),
ControlEndpoint: pipepb.ApiServiceDescriptor_builder{Url: "testControlEndpointUrl"}.Build(),
}.Build()
provisionInfo := &fnpb.ProvisionInfo{
LoggingEndpoint: &pipepb.ApiServiceDescriptor{Url: "testLoggingEndpointUrl"},
ArtifactEndpoint: &pipepb.ApiServiceDescriptor{Url: "testArtifactEndpointUrl"},
ControlEndpoint: &pipepb.ApiServiceDescriptor{Url: "testControlEndpointUrl"},
}
*loggingEndpoint = ""
*artifactEndpoint = ""
*controlEndpoint = ""
@@ -53,11 +53,11 @@ func TestEnsureEndpointsSet_AllSet(t *testing.T) {
}

func TestEnsureEndpointsSet_OneMissing(t *testing.T) {
provisionInfo := fnpb.ProvisionInfo_builder{
LoggingEndpoint: pipepb.ApiServiceDescriptor_builder{Url: "testLoggingEndpointUrl"}.Build(),
ArtifactEndpoint: pipepb.ApiServiceDescriptor_builder{Url: "testArtifactEndpointUrl"}.Build(),
ControlEndpoint: pipepb.ApiServiceDescriptor_builder{Url: ""}.Build(),
}.Build()
provisionInfo := &fnpb.ProvisionInfo{
LoggingEndpoint: &pipepb.ApiServiceDescriptor{Url: "testLoggingEndpointUrl"},
ArtifactEndpoint: &pipepb.ApiServiceDescriptor{Url: "testArtifactEndpointUrl"},
ControlEndpoint: &pipepb.ApiServiceDescriptor{Url: ""},
}
*loggingEndpoint = ""
*artifactEndpoint = ""
*controlEndpoint = ""
@@ -85,7 +85,7 @@ func TestGetGoWorkerArtifactName_NoArtifacts(t *testing.T) {

func TestGetGoWorkerArtifactName_OneArtifact(t *testing.T) {
artifact := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path", "sha")
artifacts := []*pipepb.ArtifactInformation{artifact}
artifacts := []*pipepb.ArtifactInformation{&artifact}

val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
@@ -99,7 +99,7 @@ func TestGetGoWorkerArtifactName_OneArtifact(t *testing.T) {
func TestGetGoWorkerArtifactName_MultipleArtifactsFirstIsWorker(t *testing.T) {
artifact1 := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path", "sha")
artifact2 := constructArtifactInformation(t, "other role", "test/path2", "sha")
artifacts := []*pipepb.ArtifactInformation{artifact1, artifact2}
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
@@ -113,7 +113,7 @@ func TestGetGoWorkerArtifactName_MultipleArtifactsFirstIsWorker(t *testing.T) {
func TestGetGoWorkerArtifactName_MultipleArtifactsSecondIsWorker(t *testing.T) {
artifact1 := constructArtifactInformation(t, "other role", "test/path", "sha")
artifact2 := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path2", "sha")
artifacts := []*pipepb.ArtifactInformation{artifact1, artifact2}
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
@@ -127,7 +127,7 @@ func TestGetGoWorkerArtifactName_MultipleArtifactsSecondIsWorker(t *testing.T) {
func TestGetGoWorkerArtifactName_MultipleArtifactsLegacyWay(t *testing.T) {
artifact1 := constructArtifactInformation(t, "other role", "test/path", "sha")
artifact2 := constructArtifactInformation(t, "other role", "worker", "sha")
artifacts := []*pipepb.ArtifactInformation{artifact1, artifact2}
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
@@ -141,7 +141,7 @@ func TestGetGoWorkerArtifactName_MultipleArtifactsLegacyWay(t *testing.T) {
func TestGetGoWorkerArtifactName_MultipleArtifactsNoneMatch(t *testing.T) {
artifact1 := constructArtifactInformation(t, "other role", "test/path", "sha")
artifact2 := constructArtifactInformation(t, "other role", "test/path2", "sha")
artifacts := []*pipepb.ArtifactInformation{artifact1, artifact2}
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

_, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err == nil {
@@ -193,16 +193,16 @@ func TestCopyExe(t *testing.T) {
}
}

func constructArtifactInformation(t *testing.T, roleUrn string, path string, sha string) *pipepb.ArtifactInformation {
func constructArtifactInformation(t *testing.T, roleUrn string, path string, sha string) pipepb.ArtifactInformation {
t.Helper()

typePayload, _ := proto.Marshal(pipepb.ArtifactFilePayload_builder{Path: path, Sha256: sha}.Build())
typePayload, _ := proto.Marshal(&pipepb.ArtifactFilePayload{Path: path, Sha256: sha})

return pipepb.ArtifactInformation_builder{
return pipepb.ArtifactInformation{
RoleUrn: roleUrn,
TypeUrn: artifact.URNFileArtifact,
TypePayload: typePayload,
}.Build()
}
}

func TestConfigureGoogleCloudProfilerEnvVars(t *testing.T) {
22 changes: 11 additions & 11 deletions sdks/go/container/pool/workerpool.go
Original file line number Diff line number Diff line change
@@ -80,24 +80,24 @@ func (s *Process) StartWorker(_ context.Context, req *fnpb.StartWorkerRequest) (
s.mu.Lock()
defer s.mu.Unlock()
if s.workers == nil {
return fnpb.StartWorkerResponse_builder{
return &fnpb.StartWorkerResponse{
Error: "worker pool shutting down",
}.Build(), nil
}, nil
}

if _, ok := s.workers[req.GetWorkerId()]; ok {
return fnpb.StartWorkerResponse_builder{
return &fnpb.StartWorkerResponse{
Error: fmt.Sprintf("worker with ID %q already exists", req.GetWorkerId()),
}.Build(), nil
}, nil
}
if req.GetLoggingEndpoint() == nil {
return fnpb.StartWorkerResponse_builder{Error: fmt.Sprintf("Missing logging endpoint for worker %v", req.GetWorkerId())}.Build(), nil
return &fnpb.StartWorkerResponse{Error: fmt.Sprintf("Missing logging endpoint for worker %v", req.GetWorkerId())}, nil
}
if req.GetControlEndpoint() == nil {
return fnpb.StartWorkerResponse_builder{Error: fmt.Sprintf("Missing control endpoint for worker %v", req.GetWorkerId())}.Build(), nil
return &fnpb.StartWorkerResponse{Error: fmt.Sprintf("Missing control endpoint for worker %v", req.GetWorkerId())}, nil
}
if req.GetLoggingEndpoint().HasAuthentication() || req.GetControlEndpoint().HasAuthentication() {
return fnpb.StartWorkerResponse_builder{Error: "[BEAM-10610] Secure endpoints not supported."}.Build(), nil
if req.GetLoggingEndpoint().Authentication != nil || req.GetControlEndpoint().Authentication != nil {
return &fnpb.StartWorkerResponse{Error: "[BEAM-10610] Secure endpoints not supported."}, nil
}

ctx := grpcx.WriteWorkerID(s.root, req.GetWorkerId())
@@ -118,7 +118,7 @@ func (s *Process) StartWorker(_ context.Context, req *fnpb.StartWorkerRequest) (
cmd.Env = nil // Use the current environment.

if err := cmd.Start(); err != nil {
return fnpb.StartWorkerResponse_builder{Error: fmt.Sprintf("Unable to start boot for worker %v: %v", req.GetWorkerId(), err)}.Build(), nil
return &fnpb.StartWorkerResponse{Error: fmt.Sprintf("Unable to start boot for worker %v: %v", req.GetWorkerId(), err)}, nil
}
return &fnpb.StartWorkerResponse{}, nil
}
@@ -137,9 +137,9 @@ func (s *Process) StopWorker(_ context.Context, req *fnpb.StopWorkerRequest) (*f
delete(s.workers, req.GetWorkerId())
return &fnpb.StopWorkerResponse{}, nil
}
return fnpb.StopWorkerResponse_builder{
return &fnpb.StopWorkerResponse{
Error: fmt.Sprintf("no worker with id %q running", req.GetWorkerId()),
}.Build(), nil
}, nil

}

60 changes: 30 additions & 30 deletions sdks/go/container/pool/workerpool_test.go
Original file line number Diff line number Diff line change
@@ -31,15 +31,15 @@ func TestProcess(t *testing.T) {
t.Skip("Binary `true` doesn't exist, skipping tests.")
}

endpoint := pipepb.ApiServiceDescriptor_builder{
endpoint := &pipepb.ApiServiceDescriptor{
Url: "localhost:0",
}.Build()
secureEndpoint := pipepb.ApiServiceDescriptor_builder{
}
secureEndpoint := &pipepb.ApiServiceDescriptor{
Url: "localhost:0",
Authentication: pipepb.AuthenticationSpec_builder{
Authentication: &pipepb.AuthenticationSpec{
Urn: "beam:authentication:oauth2_client_credentials_grant:v1",
}.Build(),
}.Build()
},
}

ctx, cancelFn := context.WithCancel(context.Background())
t.Cleanup(cancelFn)
@@ -54,60 +54,60 @@ func TestProcess(t *testing.T) {
errExpected bool
}{
{
req: fnpb.StartWorkerRequest_builder{
req: &fnpb.StartWorkerRequest{
WorkerId: "Worker1",
ControlEndpoint: endpoint,
LoggingEndpoint: endpoint,
}.Build(),
},
}, {
req: fnpb.StartWorkerRequest_builder{
req: &fnpb.StartWorkerRequest{
WorkerId: "Worker2",
ControlEndpoint: endpoint,
LoggingEndpoint: endpoint,
}.Build(),
},
}, {
req: fnpb.StartWorkerRequest_builder{
req: &fnpb.StartWorkerRequest{
WorkerId: "Worker1",
ControlEndpoint: endpoint,
LoggingEndpoint: endpoint,
}.Build(),
},
errExpected: true, // Repeated start
}, {
req: fnpb.StartWorkerRequest_builder{
req: &fnpb.StartWorkerRequest{
WorkerId: "missingControl",
LoggingEndpoint: endpoint,
}.Build(),
},
errExpected: true,
}, {
req: fnpb.StartWorkerRequest_builder{
req: &fnpb.StartWorkerRequest{
WorkerId: "missingLogging",
ControlEndpoint: endpoint,
}.Build(),
},
errExpected: true,
}, {
req: fnpb.StartWorkerRequest_builder{
req: &fnpb.StartWorkerRequest{
WorkerId: "secureLogging",
LoggingEndpoint: secureEndpoint,
ControlEndpoint: endpoint,
}.Build(),
},
errExpected: true,
}, {
req: fnpb.StartWorkerRequest_builder{
req: &fnpb.StartWorkerRequest{
WorkerId: "secureControl",
LoggingEndpoint: endpoint,
ControlEndpoint: secureEndpoint,
}.Build(),
},
errExpected: true,
},
}
for _, test := range startTests {
resp, err := server.StartWorker(ctx, test.req)
if test.errExpected {
if err != nil || resp.GetError() == "" {
if err != nil || resp.Error == "" {
t.Errorf("Expected error starting %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
}
} else {
if err != nil || resp.GetError() != "" {
if err != nil || resp.Error != "" {
t.Errorf("Unexpected error starting %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
}
}
@@ -117,29 +117,29 @@ func TestProcess(t *testing.T) {
errExpected bool
}{
{
req: fnpb.StopWorkerRequest_builder{
req: &fnpb.StopWorkerRequest{
WorkerId: "Worker1",
}.Build(),
},
}, {
req: fnpb.StopWorkerRequest_builder{
req: &fnpb.StopWorkerRequest{
WorkerId: "Worker1",
}.Build(),
},
errExpected: true,
}, {
req: fnpb.StopWorkerRequest_builder{
req: &fnpb.StopWorkerRequest{
WorkerId: "NonExistent",
}.Build(),
},
errExpected: true,
},
}
for _, test := range stopTests {
resp, err := server.StopWorker(ctx, test.req)
if test.errExpected {
if err != nil || resp.GetError() == "" {
if err != nil || resp.Error == "" {
t.Errorf("Expected error starting %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
}
} else {
if err != nil || resp.GetError() != "" {
if err != nil || resp.Error != "" {
t.Errorf("Unexpected error starting %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
}
}
24 changes: 12 additions & 12 deletions sdks/go/container/tools/buffered_logging_test.go
Original file line number Diff line number Diff line change
@@ -63,11 +63,11 @@ func TestBufferedLogger(t *testing.T) {

received := catcher.msgs[0].GetLogEntries()[0]

if got, want := received.GetMessage(), "test message"; got != want {
if got, want := received.Message, "test message"; got != want {
t.Errorf("got message %q, want %q", got, want)
}

if got, want := received.GetSeverity(), fnpb.LogEntry_Severity_DEBUG; got != want {
if got, want := received.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
})
@@ -96,11 +96,11 @@ func TestBufferedLogger(t *testing.T) {
received := catcher.msgs[0].GetLogEntries()

for i, message := range received {
if got, want := message.GetMessage(), messages[i]; got != want {
if got, want := message.Message, messages[i]; got != want {
t.Errorf("got message %q, want %q", got, want)
}

if got, want := message.GetSeverity(), fnpb.LogEntry_Severity_DEBUG; got != want {
if got, want := message.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
}
@@ -125,11 +125,11 @@ func TestBufferedLogger(t *testing.T) {

received := catcher.msgs[0].GetLogEntries()[0]

if got, want := received.GetMessage(), "test error"; got != want {
if got, want := received.Message, "test error"; got != want {
t.Errorf("got message %q, want %q", got, want)
}

if got, want := received.GetSeverity(), fnpb.LogEntry_Severity_ERROR; got != want {
if got, want := received.Severity, fnpb.LogEntry_Severity_ERROR; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
})
@@ -158,11 +158,11 @@ func TestBufferedLogger(t *testing.T) {
received := catcher.msgs[0].GetLogEntries()

for i, message := range received {
if got, want := message.GetMessage(), messages[i]; got != want {
if got, want := message.Message, messages[i]; got != want {
t.Errorf("got message %q, want %q", got, want)
}

if got, want := message.GetSeverity(), fnpb.LogEntry_Severity_ERROR; got != want {
if got, want := message.Severity, fnpb.LogEntry_Severity_ERROR; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
}
@@ -177,11 +177,11 @@ func TestBufferedLogger(t *testing.T) {

received := catcher.msgs[0].GetLogEntries()[0]

if got, want := received.GetMessage(), "foo bar"; got != want {
if got, want := received.Message, "foo bar"; got != want {
t.Errorf("l.Printf(\"foo %%v\", \"bar\"): got message %q, want %q", got, want)
}

if got, want := received.GetSeverity(), fnpb.LogEntry_Severity_DEBUG; got != want {
if got, want := received.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
t.Errorf("l.Printf(\"foo %%v\", \"bar\"): got severity %v, want %v", got, want)
}
})
@@ -229,11 +229,11 @@ func TestBufferedLogger(t *testing.T) {
messages = append(messages, lastMessage)

for i, message := range received {
if got, want := message.GetMessage(), messages[i]; got != want {
if got, want := message.Message, messages[i]; got != want {
t.Errorf("got message %q, want %q", got, want)
}

if got, want := message.GetSeverity(), fnpb.LogEntry_Severity_DEBUG; got != want {
if got, want := message.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
}
Loading

0 comments on commit 2bd5d7b

Please sign in to comment.