Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AliECS core Kafka producer #520

Merged
merged 43 commits into from
Mar 22, 2024
Merged

AliECS core Kafka producer #520

merged 43 commits into from
Mar 22, 2024

Conversation

teo
Copy link
Member

@teo teo commented Feb 29, 2024

No description provided.

@teo teo marked this pull request as draft February 29, 2024 14:48
@teo teo requested review from knopers8 and justonedev1 February 29, 2024 14:48
@@ -28,53 +28,22 @@ package o2control;
option java_package = "ch.cern.alice.o2.control.rpcserver";
option go_package = "github.com/AliceO2Group/Control/core/protos;pb";

//////////////// Common event messages ///////////////
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the go files for this proto are stored in coconut/protos and core/protos, why not move the .proto and pb.go files to common/protos?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably do this, yes.

Comment on lines +275 to +285
// We might leave RUNNING not only through STOP_ACTIVITY. In such cases we also need a run stop time.
if e.Src == "RUNNING" {
endTime, ok := env.workflow.GetUserVars().Get("run_end_time_ms")
if ok && endTime == "" {
runEndTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
env.workflow.SetRuntimeVar("run_end_time_ms", runEndTime)
} else {
log.WithField("partition", envId.String()).
Debug("O2 End time already set before leave_RUNNING")
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really a part of adding the core Kafka producer. I would propose to remove it from this PR and discuss it separately. We had discussions with Ruben and Chiara which we did not finalize, while this affects the order of EOR and EOX timestamps. I will raise to Vasco that we should bring that discussion to an end.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, but the events do need to be emitted at some point.

// Before we load the workflow, we get the list of currently active detectors. This query must be performed before
// loading the workflow in order to compare the currently used detectors with the detectors required by the newly
// created environment.
alreadyActiveDetectors := envs.GetActiveDetectors()

the.EventWriterWithTopic(topic.Environment).WriteEvent(&evpb.Ev_EnvironmentEvent{
EnvironmentId: newId.String(),
State: "PENDING",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if this does not mean introducing a new (quasi-)state... At least it could be treated as such by consumers of this topic.

If we do not want to talk about states before the environment exists, perhaps it should be left empty? More of a thought rather than a request.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It kind of is a fake state. It should be fine as far as consumers are concerned, the GUI would likely appreciate it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we can check to be sure. @graduta @martinboulais ECS will send some events before an environment is considered created and has a state. Do you have any preference whether in such case the "state" field should be empty or contain "PENDING"?

core/task/task.go Outdated Show resolved Hide resolved
@teo teo force-pushed the kafka-producer branch 2 times, most recently from 6ea2809 to 4cec802 Compare March 14, 2024 15:01
@teo teo force-pushed the kafka-producer branch from 57080db to 9c62431 Compare March 19, 2024 13:20
@teo teo marked this pull request as ready for review March 21, 2024 16:28
@teo
Copy link
Member Author

teo commented Mar 21, 2024

@justonedev1 @knopers8 this was tested on STAGING, the event logs are attached to OCTRL-765
The patched core is still there if you'd like to take it for a spin. I recommend https://github.com/sevagh/pq to consume the topics, details in the Makefile.

justonedev1
justonedev1 previously approved these changes Mar 21, 2024
Copy link
Collaborator

@justonedev1 justonedev1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see anything wrong with the code itself. However I left some comments regarding code quality and some questions regarding content of this PR, as it seemed, that there as quite a few things bloating this PR while not being necessary and should be done as different PR.

Code quality comments can be taken for the whole code base: there are a lot of messages created in place instead of one parametrized function. This causes really big bloat in function itself masking the logic.

But I don't see the reason why to delay this PR because of these "cosmetic" things.

However, it would be probably best if @knopers8 also takes a look, as he knows much more about functionality and code in general

Message: "transition completed successfully",
Transition: t.eventName(),
})
}
return
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a comment: I would create some function which creates all these event messages, because right now there are a lot of new lines just declaring new EnvironmentEvents and 'hiding' the actual logic + bloating the length of the function by hundreds of lines of code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean. We do need to define the environment events we're about to send, and while some fields are the same others change from one event to the other. Whether the fields are arguments to a function or fields of a struct doesn't change the fact that we need to provide them. Maybe I'm missing what you're proposing, can you elaborate?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not proposing changes of functionality, I just see that you are creating a lot of pb.Ev_EnvironmentEvent messages in place with a lot of same parameters, the only changes are for parameters Message and Error ... So it would be great to create a function (eg. createEV_Environment) that would do it. and you would call it just like this: createEv_Environment(env, error, message). This would reduce the bloat of the function significantly.

@@ -1357,128 +1377,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
}
return
}
stack["ConfigureLegacy"] = func() (out string) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I guess correctly, that this has nothing to do with kafka producer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is a bit of legacy code we haven't been using for over a year, perhaps years. I remove it because we don't need it any more, and I do it as part of this PR because it saves me the trouble of implementing event production for this function.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I still think that it should have been separate branch and PR before of this PR, but it is just nitpicking

streams: make(map[string]chan *pb.Event),
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood correctly, you are just moving this somewhere else... is it connected to kafka producer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, streamutil.go. It's part of a general cleanup at the very start of this effort.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Collaborator

@knopers8 knopers8 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @teo !

I agree with @justonedev1 regarding reducing the boiler-plate where possible, because indeed these calls are quite verbose and make reading the code more inconvenient. I don't have any clear ideas to propose though, so I leave it up to you to consider if something can be done about it.

Then, since this the kafka events might be accessed by multiple parties with different familiarity in the repository structure, it would be extremely useful to have some documentation of what is published, where to find proto files and what are possible values of less obvious fields like Ev_TaskEvent.className, Ev_TaskEvents.path, Ev_CallEvent.path. For plugins, the expected JSON payloads should be documented. I suppose you could modify the existing kafka.md, which will become obsoleted.

I don't have much detailed comments, perhaps I missed something, but the PR is really huge. I believe it could have been done in smaller chunks, but this is only my suggestion for the future.

common/protos/events.proto Outdated Show resolved Hide resolved
common/event/topic/topic.go Outdated Show resolved Hide resolved
@teo
Copy link
Member Author

teo commented Mar 22, 2024

Thanks, @teo !

I agree with @justonedev1 regarding reducing the boiler-plate where possible, because indeed these calls are quite verbose and make reading the code more inconvenient. I don't have any clear ideas to propose though, so I leave it up to you to consider if something can be done about it.

Then, since this the kafka events might be accessed by multiple parties with different familiarity in the repository structure, it would be extremely useful to have some documentation of what is published, where to find proto files and what are possible values of less obvious fields like Ev_TaskEvent.className, Ev_TaskEvents.path, Ev_CallEvent.path. For plugins, the expected JSON payloads should be documented. I suppose you could modify the existing kafka.md, which will become obsoleted.

I don't have much detailed comments, perhaps I missed something, but the PR is really huge. I believe it could have been done in smaller chunks, but this is only my suggestion for the future.

Thank you both for reviewing. First the easy part: I've added some comments to the protofile to clarify these fields, and I've included some documentation in kafka.md as proposed.

As for the verbosity of the event instantiations, I understand they can get repetitive and I see your points. Generally in Go one values simplicity and sometimes verbosity over hiding things within constructors, factories and such, but repetition repetition can get annoying annoying. I'll give this a serious thought for the next iteration, in the meantime I propose we merge as it is.

@teo teo merged commit 8ec6061 into master Mar 22, 2024
2 checks passed
@teo teo deleted the kafka-producer branch March 22, 2024 11:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

3 participants