Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AliECS core Kafka producer #520

Merged
merged 43 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
f963751
[core] Refactor serverutil in preparation for eventstream
teo Sep 1, 2022
25fdadd
[core] Events protofile
teo Feb 15, 2024
ca1c759
[core] Additional events
teo Feb 15, 2024
0f92acb
[core] Update events.proto+o2control.proto with NewEnvironmentAsync
teo Feb 16, 2023
cf40d75
[common] Use events.proto in o2control.proto
teo Feb 20, 2024
8c5f41f
[coconut] Fix Protobuf generator call
teo Feb 21, 2024
34b7a0d
[core] Kafka wrapper
teo Feb 23, 2024
0a7c61b
[core] Emit environment events
teo Feb 29, 2024
e983b2c
[coconut] Add asynchronous mode (-y) to coconut env create command
teo Feb 16, 2023
fabece2
[coconut] Implement coconut env create -y
teo Feb 29, 2024
d13d12b
[core] Add task traits and CallEvent to events.proto
teo Feb 16, 2023
231d3d9
[common] Add CallEvent
teo Feb 29, 2024
aa4ac3f
[core] Emit call events to inform on plugin calls
teo Feb 16, 2023
95c41fe
[core] Send EnvId with TaskEvents
teo Feb 29, 2024
225a5f9
[core] Rename busEvent in task.go
teo Mar 8, 2024
48499fb
[core] Add IntegratedServiceEvent and rename Envid field
teo Mar 8, 2024
fe2dc90
[core] Push env vars on workflow load
teo Mar 13, 2024
7530576
[common] Allow event creation with specific timestamp
teo Mar 14, 2024
28c3e68
[common] Various additions to events in events.proto
teo Mar 14, 2024
6b7c8fd
[build] Bump dependencies
teo Mar 14, 2024
92d513f
[core] Include parent role path in task events
teo Mar 14, 2024
e82a40e
[core] Improve call information in CallEvents
teo Mar 14, 2024
dc190db
[core] Emit IntegratedServiceEvents from DCS
teo Mar 14, 2024
8c57a45
[core] Make sure we always output ECS detector codes, not DCS ones
teo Mar 14, 2024
274e73f
[core] Don't forget to include error in DCS ERROR events
teo Mar 14, 2024
b3e3461
[core] Better DCS event descriptions
teo Mar 15, 2024
27eec71
[core] Emit ddscheduler events
teo Mar 15, 2024
68b2c46
[core] Remove legacy ODC handlers
teo Mar 15, 2024
16ce849
[core] Emit ODC events
teo Mar 15, 2024
db62a9f
[core] Emit TRG events
teo Mar 18, 2024
ae22193
[common] Enable AllowAutoTopicCreation in Kafka client
teo Mar 19, 2024
831178d
[core] Correct Kafka topic
teo Mar 19, 2024
9c62431
[build] Generate fdset file for decoding Kafka messages with pq
teo Mar 19, 2024
d17a4a1
[core] Emit call events to aliecs.call topic and include envId
teo Mar 19, 2024
8695b2a
[core] Enable IntegratedServiceEvents
teo Mar 20, 2024
31eb36d
[core] Pass IntegratedServiceEvents by ref
teo Mar 20, 2024
e733190
[core] Write to Kafka asynchronously
teo Mar 21, 2024
6ffe6b6
[core] Nullify odc Devices list before emitting events
teo Mar 21, 2024
d8785f0
[core] Trim down ODC events some more
teo Mar 21, 2024
e3a392f
[core] Publish ODC partition state changes
teo Mar 21, 2024
e3d59c1
[core] Document events.proto and change currentRunNumber field
teo Mar 22, 2024
62dcce8
[core] Document currently unused topics
teo Mar 22, 2024
f7c2786
[docs] Document Kafka producer functionality
teo Mar 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ WHAT_o2-apricot_BUILD_FLAGS=$(BUILD_ENV_FLAGS)

INSTALL_WHAT:=$(patsubst %, install_%, $(WHAT))


GENERATE_DIRS := ./apricot ./coconut/cmd ./common/runtype ./common/system ./core ./core/integration/ccdb ./core/integration/dcs ./core/integration/ddsched ./core/integration/kafka ./core/integration/odc ./executor ./walnut ./core/integration/trg ./core/integration/bookkeeping
GENERATE_DIRS := ./apricot ./coconut/cmd ./common ./common/runtype ./common/system ./core ./core/integration/ccdb ./core/integration/dcs ./core/integration/ddsched ./core/integration/kafka ./core/integration/odc ./executor ./walnut ./core/integration/trg ./core/integration/bookkeeping
SRC_DIRS := ./apricot ./cmd/* ./core ./coconut ./executor ./common ./configuration ./occ/peanut ./walnut
TEST_DIRS := ./configuration/cfgbackend ./configuration/componentcfg
GO_TEST_DIRS := ./core/repos ./core/integration/dcs
Expand All @@ -87,7 +86,7 @@ HAS_PROTOC := $(shell command -v $(GOPROTOCPATH) 2> /dev/null)
.EXPORT_ALL_VARIABLES:
CGO_ENABLED = 0

.PHONY: build all install generate test debugtest vet fmt clean cleanall help $(WHAT) tools vendor doc docs
.PHONY: build all install generate test debugtest vet fmt clean cleanall help $(WHAT) tools vendor doc docs fdset

build: $(WHAT)

Expand Down Expand Up @@ -274,6 +273,15 @@ tools/protoc:

docs: docs/coconut docs/grpc docs/swaggo

fdset:
@echo -e "building fdset files \033[1;33m==>\033[0m \033[1;34m./common/protos\033[0m"

@mkdir -p fdset
@cd common/protos && protoc -o events.fdset events.proto && cd ../..
@mv common/protos/events.fdset fdset

@echo -e "to consume with \033[1;33mhttps://github.com/sevagh/pq\033[0m: FDSET_PATH=./fdset pq kafka aliecs.environment --brokers kafka-broker-hostname:9092 --beginning --msgtype events.Event"

docs/coconut:
@echo -e "generating coconut documentation \033[1;33m==>\033[0m \033[1;34m./coconut/doc\033[0m"
@cd coconut/doc && go run . && cd ../..
Expand Down
2 changes: 1 addition & 1 deletion coconut/cmd/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

//go:generate protoc -I ../../core --go_out=.. --go-grpc_out=.. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protos/o2control.proto
//go:generate protoc -I=../../core -I=../../common --go_out=.. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative --go-grpc_out=require_unimplemented_servers=false:.. protos/o2control.proto

package cmd

Expand Down
1 change: 1 addition & 0 deletions coconut/cmd/environment_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func init() {

environmentCreateCmd.Flags().BoolP("auto", "a", false, "create an autorun environment")
environmentCreateCmd.Flags().BoolP("public", "p", true, "control public rights of the environment")
environmentCreateCmd.Flags().BoolP("asynchronous", "y", false, "use asynchronous mode for environment creation")

environmentCreateCmd.Flags().StringP("extra-vars", "e", "", "values passed using key=value CSV or JSON syntax, interpreted as strings `key1=val1,key2=val2` or `{\"key1\": \"value1\", \"key2\": \"value2\"}`")
}
2 changes: 0 additions & 2 deletions coconut/cmd/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

//go:generate protoc -I ../../core --go_out=.. --go-grpc_out=.. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protos/o2control.proto

package cmd

import (
Expand Down
8 changes: 7 additions & 1 deletion coconut/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,14 @@ func CreateEnvironment(cxt context.Context, rpc *coconut.RpcClient, cmd *cobra.C
// TODO: add support for setting visibility here OCTRL-178
// TODO: add support for acquiring bot config here OCTRL-177

asynchronous, _ := cmd.Flags().GetBool("asynchronous")

var response *pb.NewEnvironmentReply
response, err = rpc.NewEnvironment(cxt, &pb.NewEnvironmentRequest{WorkflowTemplate: wfPath, Vars: extraVarsMap, Public: public}, grpc.EmptyCallOption{})
if asynchronous {
response, err = rpc.NewEnvironmentAsync(cxt, &pb.NewEnvironmentRequest{WorkflowTemplate: wfPath, Vars: extraVarsMap, Public: public}, grpc.EmptyCallOption{})
} else {
response, err = rpc.NewEnvironment(cxt, &pb.NewEnvironmentRequest{WorkflowTemplate: wfPath, Vars: extraVarsMap, Public: public}, grpc.EmptyCallOption{})
}
if err != nil {
return
}
Expand Down
Loading
Loading