
feat: Add deployment config #77

Open · wants to merge 9 commits into main
Conversation

@ayirr7 (Member) commented Mar 19, 2025

Adds deployment configuration files that operate on a per-pipeline basis. Each file has an env section for general configuration (e.g. topics) and a pipeline section for the local/dev configs, and allows additional runtime-specific sections; for example, there is a flink section.
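
A minimal sketch of the shape described above, assuming hypothetical key and topic names (myinput, myreduce, bootstrap_servers, etc.); the actual schema in this PR may differ:

```yaml
env:                      # general configuration, e.g. stream-to-topic mapping
  topics:
    myinput: my-input-topic

pipeline:                 # local / dev configuration
  parallelism: 1

flink:                    # runtime-specific section for the FlinkAdapter
  parallelism: 2
  sources_config:
    myinput:
      bootstrap_servers: localhost:9092   # assumed field, for illustration
  reduce_config:
    myreduce:
      parallelism: 3
```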

Adds a flag to the runner (-c) to denote container mode. This is useful for toggling between the local/dev configuration and the container/non-local configuration. For the FlinkAdapter, this means we can run the same example locally and in container mode down the road. Open to other suggestions for how to express this in the config format, etc.
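
A hedged sketch of how the runner flag might look, assuming an argparse-based CLI and PyYAML; the actual runner code may differ:

```python
import argparse

import yaml  # PyYAML


def main() -> None:
    parser = argparse.ArgumentParser(description="Run a pipeline")
    parser.add_argument("config", help="path to the deployment config file")
    parser.add_argument(
        "-c", "--container",
        action="store_true",
        help="container mode: use the container/non-local configuration",
    )
    args = parser.parse_args()

    with open(args.config) as f:
        config = yaml.safe_load(f)

    # In container mode use the runtime-specific section (here: flink);
    # otherwise fall back to the local/dev "pipeline" section.
    section = config["flink"] if args.container else config["pipeline"]
    print(section)


if __name__ == "__main__":
    main()
```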

  myreduce:
    parallelism: 3

flink:
@ayirr7 (Member, Author) commented Mar 20, 2025

I'd like us to move towards runtime-specific overrides (right now the runtime section is a full copy of the general pipeline config). I wanted to use Pydantic for some utils that make operations like deep updates of dictionaries easier, but I can't manage to install it.
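
A minimal deep-merge sketch, assuming plain dicts loaded from YAML and no Pydantic; the function name deep_update is hypothetical:

```python
def deep_update(base: dict, overrides: dict) -> dict:
    """Return a copy of base with overrides merged in recursively."""
    merged = dict(base)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Recurse into nested dicts so an override only replaces
            # the keys it actually sets.
            merged[key] = deep_update(merged[key], value)
        else:
            merged[key] = value
    return merged


# e.g. deep_update(config["pipeline"], config.get("flink", {})) would let
# the flink section override only the keys it sets.
```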

@ayirr7 changed the title from "wip: deployment config" to "feat: Add deployment config" on Mar 20, 2025
@ayirr7 (Member, Author) commented Mar 20, 2025

Haven't split the PR yet because I think seeing the whole diff is useful.

@ayirr7 ayirr7 marked this pull request as ready for review March 20, 2025 06:44
@fpacifici (Collaborator) left a comment

A couple of high-level suggestions (a sketch of both follows this list):

  • Define a schema for the config so we can validate it upfront rather than fail when a missing field is loaded. You can use jsonschema (yes, it is YAML, but JSON is a subset, so you can load the YAML and validate the resulting structure with jsonschema).
  • Let's have a config class that provides some solid abstractions to the specific components (I'd recommend TypedDicts, which are lightweight but still type-checkable).
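
A sketch of both suggestions, assuming the config shape from the PR description; the schema and field names are illustrative, not the final design:

```python
from typing import TypedDict

import jsonschema
import yaml

# Validate upfront so a missing field fails with a clear error
# instead of a KeyError deep inside the runner.
CONFIG_SCHEMA = {
    "type": "object",
    "required": ["env", "pipeline"],
    "properties": {
        "env": {"type": "object"},
        "pipeline": {"type": "object"},
        "flink": {"type": "object"},
    },
}


class FlinkConfig(TypedDict, total=False):
    """Lightweight, type-checkable view of the flink section."""

    parallelism: int
    sources_config: dict
    reduce_config: dict


def load_config(path: str) -> dict:
    with open(path) as f:
        config = yaml.safe_load(f)  # JSON is a subset of YAML
    jsonschema.validate(config, CONFIG_SCHEMA)
    return config
```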

@@ -10,7 +10,7 @@ x-flink-config: &flink-config |
     jobmanager.memory.process.size: 1024m
     jobmanager.rpc.address: jobmanager
     taskmanager.memory.process.size: 1024m
-    taskmanager.numberOfTaskSlots: 2
+    taskmanager.numberOfTaskSlots: 10
Collaborator
?

Member Author
I did this so I can test out parallelism of steps with Flink

Collaborator
ok

Collaborator
I think we are missing the concept of pipeline segments to define where we break chains.
Plus, I am not sure the adapter-specific config should be allowed to override any parameter. For example, the flink section should not be able to override the Kafka broker config. This makes me think there should be a separation between adapter-specific config and generic config.

What about something like this:

```mermaid
graph TD;
   subgraph pipeline
     subgraph segment
        subgraph step
            common_config
            subgraph adapter_config
                adapter1
                adapter2
            end
         end
     end
  end
```

Here a pipeline is composed of segments (or chains), and the steps of a chain are wired up together in the same worker.
In each chain we provide config for each step (that needs config).
Each step config can have a common config and a list of config elements per adapter.

There should be a way to provide some common environment, like you did, to map streams to topics. A sketch of this layout follows.
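
A hypothetical YAML rendering of that hierarchy; all names below are illustrative:

```yaml
pipeline:
  segments:
    - name: segment1            # steps in a segment are wired into one worker
      steps:
        - name: myreduce
          common_config:        # generic config, not overridable per adapter
            group_by: key
          adapter_config:       # adapter-specific config elements
            flink:
              parallelism: 3
```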

Member Author
Will it be the case that every Step will have access to the chain or segment it belongs to?

flink:
  parallelism: 2
  sources_config:
    myinput:
Collaborator
In #79 I abstracted away the type of source/sink, so we should pick the right source type in configuration. You should have a representation of a KafkaSource (with Kafka parameters), opening up the possibility of other source types. Something like the sketch below.
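
A hypothetical sketch of a typed source config; the field names are illustrative:

```yaml
sources_config:
  myinput:
    type: kafka                        # selects a KafkaSource implementation
    bootstrap_servers: localhost:9092  # Kafka-specific parameters
    topic: my-input-topic
```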


reduce_config:
  myreduce:
    parallelism: 3
Collaborator
Isn't parallelism a property of a segment that contains a sequence of steps?
