Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/go_modules/go.temporal.io/api-1.26.1
Browse files Browse the repository at this point in the history
  • Loading branch information
josh-berry authored Jan 29, 2024
2 parents 94fdc17 + 8aefcd1 commit 50f25b0
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 50 deletions.
21 changes: 15 additions & 6 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/golang/mock/gomock"
"github.com/pborman/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/temporalio/cli/app"
"github.com/temporalio/cli/client"
Expand Down Expand Up @@ -100,7 +99,6 @@ func (s *cliAppSuite) SetupTest() {
}

func (s *cliAppSuite) TearDownTest() {
s.mockCtrl.Finish() // assert mock’s expectations
}

func (s *cliAppSuite) TestTopLevelCommands() {
Expand All @@ -119,6 +117,19 @@ var describeTaskQueueResponse = &workflowservice.DescribeTaskQueueResponse{
{
LastAccessTime: timestamp.TimePtr(time.Now().UTC()),
Identity: "tester",
WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: "some-build-id",
UseVersioning: false,
},
},
},
TaskQueueStatus: &taskqueuepb.TaskQueueStatus{
BacklogCountHint: 0,
ReadLevel: 100000,
AckLevel: 100000,
TaskIdBlock: &taskqueuepb.TaskIdBlock{
StartId: 100001,
EndId: 200000,
},
},
}
Expand Down Expand Up @@ -176,17 +187,15 @@ func (s *cliAppSuite) TestAcceptStringSliceArgsWithCommas() {
}

func (s *cliAppSuite) TestDescribeTaskQueue() {
s.sdkClient.On("DescribeTaskQueue", mock.Anything, mock.Anything, mock.Anything).Return(describeTaskQueueResponse, nil).Once()
s.frontendClient.EXPECT().DescribeTaskQueue(gomock.Any(), gomock.Any()).Return(describeTaskQueueResponse, nil)
err := s.app.Run([]string{"", "task-queue", "describe", "--task-queue", "test-taskQueue", "--namespace", cliTestNamespace})
s.Nil(err)
s.sdkClient.AssertExpectations(s.T())
}

func (s *cliAppSuite) TestDescribeTaskQueue_Activity() {
s.sdkClient.On("DescribeTaskQueue", mock.Anything, mock.Anything, mock.Anything).Return(describeTaskQueueResponse, nil).Once()
s.frontendClient.EXPECT().DescribeTaskQueue(gomock.Any(), gomock.Any()).Return(describeTaskQueueResponse, nil)
err := s.app.Run([]string{"", "task-queue", "describe", "--namespace", cliTestNamespace, "--task-queue", "test-taskQueue", "--task-queue-type", "activity"})
s.Nil(err)
s.sdkClient.AssertExpectations(s.T())
}

// TestFlagCategory_IsSet verifies that command flags have Category set
Expand Down
3 changes: 3 additions & 0 deletions cmd/temporal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
// Load sqlite storage driver
_ "go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite"

// Embed time zone database as a fallback if platform database can't be found
_ "time/tzdata"

"github.com/temporalio/cli/app"
)

Expand Down
2 changes: 2 additions & 0 deletions common/defs-flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
FlagSearchAttributeDefinition = "Passes Search Attribute in key=value format. Use valid JSON formats for value."
FlagMemoDefinition = "Passes a memo in key=value format. Use valid JSON formats for value."
FlagMemoFileDefinition = "Passes a memo as file input, with each line following key=value format. Use valid JSON formats for value."
FlagStartDelayDefinition = "Specify a delay before the workflow starts (in seconds). Cannot be used with a cron schedule. If the workflow receives a signal or update before the delay has elapsed, it will begin immediately."

// Other Workflow flags
FlagResetPointsUsage = "Only show auto-reset points."
Expand Down Expand Up @@ -135,6 +136,7 @@ const (
// Task Queue flags
FlagTaskQueueName = "Name of the Task Queue."
FlagTaskQueueTypeDefinition = "Task Queue type [workflow|activity]"
FlagPartitionsDefinition = "Query for all partitions up to this number (experimental+temporary feature)"

// Namespace update flags
FlagActiveClusterDefinition = "Active cluster name."
Expand Down
7 changes: 7 additions & 0 deletions common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var (
FlagOverlapPolicy = "overlap-policy"
FlagOwnerEmail = "email"
FlagParallelism = "input-parallelism"
FlagPartitions = "partitions"
FlagPause = "pause"
FlagPauseOnFailure = "pause-on-failure"
FlagPort = "port"
Expand All @@ -104,6 +105,7 @@ var (
FlagSkipBaseIsNotCurrent = "skip-base-is-not-current"
FlagSkipCurrentOpen = "skip-current-open"
FlagStartTime = "start-time"
FlagStartDelay = "start-delay"
FlagTaskQueue = "task-queue"
FlagTaskQueueAlias = []string{"t"}
FlagTaskQueueType = "task-queue-type"
Expand Down Expand Up @@ -364,6 +366,11 @@ var FlagsForStartWorkflowT = []cli.Flag{
Usage: FlagMemoFileDefinition,
Category: CategoryMain,
},
&cli.StringFlag{
Name: FlagStartDelay,
Usage: FlagStartDelayDefinition,
Category: CategoryMain,
},
}

var FlagsForWorkflowFiltering = []cli.Flag{
Expand Down
12 changes: 2 additions & 10 deletions searchattribute/search_attribute_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,10 @@ func AddSearchAttributes(c *cli.Context) error {
return fmt.Errorf("unable to parse search attribute type %s: %w", typeStr, err)
}
existingSearchAttributeType, searchAttributeExists := existingSearchAttributes.CustomAttributes[names[i]]
if !searchAttributeExists {
searchAttributes[names[i]] = enumspb.IndexedValueType(typeInt)
continue
}
if existingSearchAttributeType != enumspb.IndexedValueType(typeInt) {
if searchAttributeExists && existingSearchAttributeType != enumspb.IndexedValueType(typeInt) {
return fmt.Errorf("search attribute %s already exists and has different type %s: %w", names[i], existingSearchAttributeType, err)
}
}

if len(searchAttributes) == 0 {
fmt.Println(color.Yellow(c, "Search attributes already exist"))
return nil
searchAttributes[names[i]] = enumspb.IndexedValueType(typeInt)
}

request := &operatorservice.AddSearchAttributesRequest{
Expand Down
3 changes: 3 additions & 0 deletions server/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ func NewServerCommands(defaultCfg *sconfig.Config) []*cli.Command {
Codec: uiconfig.Codec{
Endpoint: uiCodecEndpoint,
},
CORS: uiconfig.CORS{
CookieInsecure: true,
},
}

opt, err := newUIOption(uiBaseCfg)
Expand Down
7 changes: 7 additions & 0 deletions taskqueue/task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ func NewTaskQueueCommands() []*cli.Command {
Usage: common.FlagTaskQueueTypeDefinition,
Category: common.CategoryMain,
},
// TOOD: remove this when the server does partition fan-out
&cli.IntFlag{
Name: common.FlagPartitions,
Value: 1,
Usage: common.FlagPartitionsDefinition,
Category: common.CategoryMain,
},
}, common.FlagsForFormatting...),
Action: func(c *cli.Context) error {
return DescribeTaskQueue(c)
Expand Down
85 changes: 74 additions & 11 deletions taskqueue/task_queue_commands.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package taskqueue

import (
"encoding/json"
"fmt"
"strings"

Expand All @@ -9,37 +10,99 @@ import (
"github.com/temporalio/tctl-kit/pkg/color"
"github.com/temporalio/tctl-kit/pkg/output"
"github.com/urfave/cli/v2"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/common/tqname"
)

// DescribeTaskQueue show pollers info of a given taskqueue
func DescribeTaskQueue(c *cli.Context) error {
sdkClient, err := client.GetSDKClient(c)
taskQueue := c.String(common.FlagTaskQueue)
tqName, err := tqname.FromBaseName(taskQueue)
if err != nil {
return err
}
taskQueue := c.String(common.FlagTaskQueue)
taskQueueType := strToTaskQueueType(c.String(common.FlagTaskQueueType))
partitions := c.Int(common.FlagPartitions)

ctx, cancel := common.NewContext(c)
defer cancel()
resp, err := sdkClient.DescribeTaskQueue(ctx, taskQueue, taskQueueType)

frontendClient := client.Factory(c.App).FrontendClient(c)
namespace, err := common.RequiredFlag(c, common.FlagNamespace)
if err != nil {
return fmt.Errorf("unable to describe task queue: %w", err)
return err
}

type statusWithPartition struct {
Partition int `json:"partition"`
taskqueuepb.TaskQueueStatus
}
type pollerWithPartition struct {
Partition int `json:"partition"`
taskqueuepb.PollerInfo
// copy this out to display nicer in table or card, but not json
Versioning *commonpb.WorkerVersionCapabilities `json:"-"`
}

var statuses []any
var pollers []any

// TOOD: remove this when the server does partition fan-out
for p := 0; p < partitions; p++ {
resp, err := frontendClient.DescribeTaskQueue(ctx, &workflowservice.DescribeTaskQueueRequest{
Namespace: namespace,
TaskQueue: &taskqueuepb.TaskQueue{
Name: tqName.WithPartition(p).FullName(),
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
},
TaskQueueType: taskQueueType,
IncludeTaskQueueStatus: true,
})
// note that even if it doesn't exist before this call, DescribeTaskQueue will return something
if err != nil {
return fmt.Errorf("unable to describe task queue: %w", err)
}
statuses = append(statuses, &statusWithPartition{
Partition: p,
TaskQueueStatus: *resp.TaskQueueStatus,
})
for _, pi := range resp.Pollers {
pollers = append(pollers, &pollerWithPartition{
Partition: p,
PollerInfo: *pi,
Versioning: pi.WorkerVersionCapabilities,
})
}
}

if output.OutputOption(c.String(output.FlagOutput)) == output.JSON {
// handle specially so we output a single object instead of two
b, err := json.MarshalIndent(map[string]any{
"taskQueues": statuses,
"pollers": pollers,
}, "", " ")
if err != nil {
return err
}
_, err = fmt.Println(string(b))
return err
}

opts := &output.PrintOptions{
// TODO enable when versioning feature is out
// Fields: []string{"Identity", "LastAccessTime", "RatePerSecond", "WorkerVersioningId"},
Fields: []string{"Identity", "LastAccessTime", "RatePerSecond"},
Fields: []string{"Partition", "TaskQueueStatus.RatePerSecond", "TaskQueueStatus.BacklogCountHint", "TaskQueueStatus.ReadLevel", "TaskQueueStatus.AckLevel", "TaskQueueStatus.TaskIdBlock"},
}
var items []interface{}
for _, e := range resp.Pollers {
items = append(items, e)
err = output.PrintItems(c, statuses, opts)
if err != nil {
return err
}
return output.PrintItems(c, items, opts)

opts = &output.PrintOptions{
Fields: []string{"Partition", "PollerInfo.Identity", "PollerInfo.LastAccessTime", "PollerInfo.RatePerSecond", "Versioning.BuildId", "Versioning.UseVersioning"},
}
return output.PrintItems(c, pollers, opts)
}

// ListTaskQueuePartitions gets all the taskqueue partition and host information.
Expand Down
49 changes: 49 additions & 0 deletions trace/summary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package trace

import (
"fmt"

"github.com/temporalio/cli/common"
"github.com/temporalio/tctl-kit/pkg/output"
"github.com/urfave/cli/v2"
sdkclient "go.temporal.io/sdk/client"
)

type Row struct {
Key string
Value string
}

func PrintWorkflowSummary(c *cli.Context, sdkClient sdkclient.Client, wfId, runId string) error {
tcCtx, cancel := common.NewIndefiniteContext(c)
defer cancel()

res, err := sdkClient.DescribeWorkflowExecution(tcCtx, wfId, runId)
if err != nil {
return err
}

info := res.GetWorkflowExecutionInfo()

_, _ = title.Println("Execution summary:")
rows := []Row{
{"Workflow Id", info.GetExecution().GetWorkflowId()},
{"Workflow Run Id", info.GetExecution().GetRunId()},
{"Workflow Type", info.GetType().GetName()},
{"Task Queue", info.GetTaskQueue()},
}
var i []interface{}
for _, row := range rows {
i = append(i, row)
}

err = output.PrintItems(c, i,
&output.PrintOptions{
Fields: []string{"Key", "Value"},
NoHeader: true,
},
)
_, _ = fmt.Println()

return err
}
Loading

0 comments on commit 50f25b0

Please sign in to comment.