job-manager: add priority plugin interface #3311

Closed
garlick opened this issue Nov 5, 2020 · 94 comments


garlick commented Nov 5, 2020

A secondary priority value for jobs was described in #3256.

Presumably we would develop a job manager plugin of some sort to generate updates to this secondary priority based on multiple factors, one of which is the per-user fair share value from flux-accounting.

This issue is about specifying a mechanism for flux-accounting to communicate periodic updates of per-user fair share values to flux-core.


garlick commented Nov 6, 2020

Here is one straw man proposal after thinking a bit about our conversation this afternoon.

Have the job manager keep an "active users" hash, with use counts per active job, and a key-value hash per user. RPCs could be implemented to:

  • list the current active users
  • set key=value for a given user
  • perform a bulk update of key=value for multiple users

The schema of keys could be unknown to the job manager, but a priority plugin, such as one that is fair share aware, could read specific per-user keys and involve them in calculation of a priority update.

When the job manager unloads, it could dump this user hash to the KVS, and reload it when it starts up again.

When flux-accounting wants to recalculate its fair share values (say, every hour via a cron job), it could

  • ask job-manager for current active user list
  • perform bulk update of fairshare=value for those users (future optimization: only those who have changed)

After the update, the plugin could run to recalculate the secondary priority for jobs in the queue.
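
To make the straw man concrete, here is a minimal sketch of the external updater's side of the bulk-update RPC (e.g. what the hourly cron job might send). The job-manager.fairshare-update topic and the payload keys are assumptions for illustration only, not an existing interface:

/* Hypothetical bulk fairshare update from an external program (e.g. cron).
 * The topic name and payload keys are illustrative only.
 */
#include <flux/core.h>

int push_fairshare_update (flux_t *h)
{
    flux_future_t *f;
    /* one key=value entry per currently active user */
    f = flux_rpc_pack (h, "job-manager.fairshare-update", FLUX_NODEID_ANY, 0,
                       "{s:{s:f} s:{s:f}}",
                       "alice", "fairshare", 0.25,
                       "bob",   "fairshare", 0.50);
    if (!f || flux_rpc_get (f, NULL) < 0) {
        flux_future_destroy (f);
        return -1;
    }
    flux_future_destroy (f);
    return 0;
}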

garlick changed the title from "job-manager: add mechanism for update fair share values" to "job-manager: add mechanism for updating fair share values" on Nov 6, 2020

garlick commented Nov 6, 2020

That was kind of an overly detailed, job-manager centric thought bubble. Sorry about that.

If I could boil it down, I think my main thought above was that the job manager could implement a generic mechanism, such as a key/value store, for caching user data needed by a priority plugin, and that an external program (e.g. run by cron) could push that data into the job manager, limiting its updates to only the current active users. So the external program and an associated priority plugin would be aware of a particular "schema" for the data and control its update frequency. The job manager would be unaware of that schema. It would simply trigger the configured plugin to run when any of its inputs might have changed (possibly handing it a list of users and/or jobs). The plugin's updates to the secondary priority would in turn possibly trigger a move of a job within the queue.

I might still be too bogged down in the details (sorry!)


grondo commented Nov 6, 2020

An alternative would be to cache data within the plugin itself. job-manager plugins could have an init callback which would (similar to shell plugins) allow them to register a service endpoint under the job-manager, only if necessary. Then the external part of the fairshare priority mechanism would send updates directly to this handler. This would allow job-manager plugins to keep state in a manner of their choosing. Other priority plugin types could set up other mechanisms for updating priority, e.g. set a timer_watcher, etc. Plugins could register a blob for the job-manager to store in the KVS when it is unloaded, or they could be required to handle this themselves.

The benefit here is fewer changes to the job-manager code and better abstraction for priority plugins. The plugin architecture could even allow multiple plugins to be loaded at once for generically extending job-manager functionality. (i.e. a priority plugin is just one type of job-manager plugin)

This approach may also allow more rapid development of the initial multifactor priority plugin since we do not first need to develop a generalized data caching mechanism for the job-manager proper.

Edit: I just meant to throw this out there as another idea. Apologies if I'm also getting us bogged down in the details.


garlick commented Nov 6, 2020

That makes sense to me.

A requirements question is: do jobs need to have accurate secondary priorities assigned before they become eligible for scheduling?


grondo commented Nov 6, 2020

Good question for @dongahn, but I would assume so. Then perhaps the callback for the priority plugin should be made before jobs are inserted into the queue, to calculate the initial secondary priority using current info.


dongahn commented Nov 7, 2020

I think so. If the instance is configured to use secondary priorities, a job must be assigned a value before being scheduled.


dongahn commented Nov 7, 2020

perform bulk update of fairshare=value for those users (future optimization: only those who have changed)

@grondo, @SteVwonder, @cmoussa1 and @milroy discussed this a bit at today's coffee hour.

Currently the fair share value is essentially equal to the global rank of users. So it is likely the fairshares of a majority of users will change in common cases at every invocation of the fair share calculation. E.g., if the previously highest-priority job becomes the lowest priority for the current round, this single change will change the rank of every user.

Once we make reasonable progress with the unoptimized update protocol, there are some techniques we can look into. I briefly mentioned this yesterday, but perhaps the concept of the edit distance can be expanded. Introducing floating point schemes in such a way that we minimize the changes to the previously calculated fairshare values could also be fruitful. As is, we're a bit tied to the normalized fair share values in [1.0, 0), which won't work well to support such an augmented scheme.

In flux-accounting, I will open up a ticket to augment libweighted_tree so that it incurs minimal fairshare value changes.

BTW, @SteVwonder correctly noticed that solving this may also allow us to incur minimal changes on subsequent job-manager to fluxion-qmanager updates.


dongahn commented Nov 7, 2020

@garlick and @grondo: For the unoptimized update protocol, once the weighted tree code walks the tree, it will have a sorted vector of users:

For example, in this simple test code

walk: https://github.com/flux-framework/flux-accounting/blob/master/src/fairness/weighted_tree/test/weighted_tree_test01.cpp#L78

iteration of ordered user vector: https://github.com/flux-framework/flux-accounting/blob/master/src/fairness/weighted_tree/test/weighted_tree_test01.cpp#L84

Perhaps we can agree on the proper format of this ordered user vector (that goes into flux-core) so that a program can be developed in flux-accounting in the near future? For example, essentially the payload schema for the upcoming RPC or similar?

I guess we may need to decide whether the plugin or the job-manager will do this before we can decide on the schema... Or would it matter, when the schema is likely a key-value set?
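
For discussion purposes, here is a sketch of one possible payload format, built with jansson; the "users", "username", and "fairshare" key names are placeholders, not an agreed schema:

/* Illustrative only: build an ordered user vector payload like
 *   {"users":[{"username":"alice","fairshare":0.83}, ...]}
 */
#include <jansson.h>

json_t *make_user_vector (const char **users, const double *fairshare, size_t n)
{
    json_t *array = json_array ();
    for (size_t i = 0; i < n; i++) {
        json_t *entry = json_pack ("{s:s s:f}",
                                   "username", users[i],
                                   "fairshare", fairshare[i]);
        json_array_append_new (array, entry);   /* preserves walk order */
    }
    return json_pack ("{s:o}", "users", array);
}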


grondo commented Nov 7, 2020

I guess we may need to decide whether the plugin or the job-manager will do this before we can decide on the schema... Or would it matter, when the schema is likely a key-value set?

Yes, I think this is the first design decision we should make. My opinion would be that we design an interface for a generic plugin architecture for the job-manager (perhaps priority-specific at this point), with an interface for plugins to register callbacks on the job-manager's flux_t handle. This would allow the plugin to be developed independently, in parallel (and wholly in flux-accounting), with the job-manager plugin interface in flux-core.

In any event, it would seem you could make considerable progress on priority plugin building blocks which could be tested using unit tests. Maybe seeing this development would inform a better design of the priority plugin interface in the job-manager. I still feel a bit in the dark on how the whole process fits together (e.g. job completion data from job-archive goes into accounting database, something calculates global fairshare vector, somehow the output of this vector creates a factor that is used in a multi-factor calculation to augment existing job priority values). It would allow flux-accounting to make good progress if you had some way to drive this process with mock data in flux-accounting unit and/or system tests.


dongahn commented Nov 7, 2020

In any event, it would seem you could make considerable progress on priority plugin building blocks which could be tested using unit tests.

flux-framework/flux-accounting#65 is the big part of these building blocks, including unit tests. When you have a chance, please take a look at the unit tests (flux-framework/flux-accounting@746a6ce) to let us know if it is along the lines of what you are thinking about.

How about me or @cmoussa1 taking the next step to create a mock program that uses this library and outputs JSON with the ordered vector of users? The program can either pass the output as an output file or similar (how do you do this with the python job validator?) or send the vector through a yet-to-be-developed RPC -- in this case it would use flux_open.

I still feel a bit in the dark on how the whole process fits together (e.g. job completion data from job-archive goes into accounting database,

This is the block that @cmoussa1 is developing under the guidance of @chu11. He should chime in with where he is.

something calculates global fairshare vector, somehow the output of this vector creates a factor

This is the block (libweighted_tree) that was developed as above.

that is used in a multi-factor calculation to augment existing job priority values).

This is the block that should be (co-)architected into flux-core, I think.


grondo commented Nov 7, 2020

This sounds good to me @dongahn! I will take a look at flux-framework/flux-accounting#65. Thanks!


dongahn commented Nov 9, 2020

flux-framework/flux-accounting#65 is the big part of these building blocks, including unit tests. When you have a chance, please take a look at the unit tests (flux-framework/flux-accounting@746a6ce) to let us know if it is along the lines of what you are thinking about.

How about me or @cmoussa1 taking the next step to create a mock program that uses this library and outputs JSON with the ordered vector of users? The program can either pass the output as an output file or similar (how do you do this with the python job validator?) or send the vector through a yet-to-be-developed RPC -- in this case it would use flux_open.

@cmoussa1: I can make today's 2PM coffee hour for the first 30 mins. Perhaps we should discuss the initial step there? Now that all of the PRs have landed for flux-accounting, it would be good to generate what needs to be done for the next release cycle.


cmoussa1 commented Nov 9, 2020

I still feel a bit in the dark on how the whole process fits together (e.g. job completion data from job-archive goes into accounting database,

This is the block that @cmoussa1 is developing under the guidance of @chu11. He should chime in with where he is.

Before we shifted to working on first implementations for the weighted tree library, I was working on implementing a job usage calculation that utilizes @chu11's job-archive module (and that eventually makes its way into the flux-accounting database). I think I've made some good progress there, but I am sure there will be good feedback and suggestions to further optimize it. Maybe this week during the flux-accounting meeting I can talk about where I am at and see if this is indeed the right course of action.

@cmoussa1: I can make today's 2PM coffee hour for the first 30 mins. Perhaps we should discuss the initial step there? Now that all of the PRs have landed for flux-accounting, it would be good to generate what needs to be done for the next release cycle.

@dongahn - that sounds like a good plan. Talk to you then.

garlick changed the title from "job-manager: add mechanism for updating fair share values" to "job-manager: add priority plugin interface" on Nov 9, 2020

garlick commented Nov 9, 2020

Let me try to summarize the architecture we discussed in today's 2pm coffee call, or how I am picturing it. Please comment!

The job manager would offer a "priority plugin API". This would be a published API, similar to the job shell plugin API, that would enable out-of-tree projects to provide job manager plugins that (a) can access various job attributes, and (b) can use that info, and perhaps other info, to set/update the secondary priority of individual jobs.

The secondary priority would, in turn, be used by the job manager and schedulers to order the queue of alloc requests. Specifically, the order would be determined by (in descending precedence): 1) administrative priority, 2) secondary priority, 3) submit time. Aside: actually let's revisit this. Perhaps there should only be one priority that is calculated by the plugin and takes 1) and 3) as input.
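
For illustration, the stated ordering corresponds to a comparator along these lines (the struct and field names are assumptions, and the ordering may change per the aside above):

/* Descending precedence: administrative priority, then secondary priority,
 * then submit time (earlier submissions first).  Hypothetical struct fields.
 */
struct pending_job {
    int admin_priority;
    int64_t secondary_priority;
    double t_submit;
};

static int pending_job_cmp (const struct pending_job *a,
                            const struct pending_job *b)
{
    if (a->admin_priority != b->admin_priority)
        return a->admin_priority > b->admin_priority ? -1 : 1;
    if (a->secondary_priority != b->secondary_priority)
        return a->secondary_priority > b->secondary_priority ? -1 : 1;
    if (a->t_submit != b->t_submit)
        return a->t_submit < b->t_submit ? -1 : 1;
    return 0;
}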

The API will need to include a mechanism for the job manager to ask the plugin for an initial priority value for a new job. I think this "request" from the job manager to the plugin would need to occur and be satisfied on transition of a job to SCHED state, since the priority is required to be set before an alloc request is sent to the scheduler.

There would be another API interface that would allow the plugin to asynchronously update the secondary priority of a pending job, much like flux job priority allows the administrative priority to be updated.

Other plugin API interfaces that I think we can project will be necessary:

  • get job attribute: e.g. submit time, admin priority, job owner, current secondary priority, jobspec (resources section)
  • notification of jobs that are no longer pending
  • a mechanism for registering RPC services like the job shell, e.g. job-manager.priority-<method>.

We discussed that a priority plugin that uses the above interface to implement a "fair share" specific multi-factor priority calculation could be part of the flux-accounting project. Its priority calculation would take as inputs some of the above data from the job manager plugin API, and some data from flux-accounting, especially the user's fair share factor. This info would be pre-loaded by the plugin on initialization. Furthermore, since the fair share factors evolve over time, the plugin would need to set up a service method to accept updates periodically.

It would be useful to further flesh out the data that a priority plugin might require. We have a start on the factors needed for a fair share based multi-factor calculation in flux-framework/flux-accounting#8, although it is pretty Slurm-specific and needs to be fluxified.


dongahn commented Nov 10, 2020

Thanks @garlick. This sounds like a great start.

The API will need to include a mechanism for the job manager to ask the plugin for an initial priority value for a new job. I think this "request" from the job manager to the plugin would need to occur and be satisfied on transition of a job to SCHED state, since the priority is required to be set before an alloc request is sent to the scheduler.

Perhaps we will need support for a no-plugin or dummy-plugin configuration. For nested instances where plugins are not provided, the job manager still wants to transition a job to SCHED with no, or equal, secondary priority.

get job attribute: e.g. submit time, admin priority, job owner, current secondary priority, jobspec (resources section)

It seems we need some thought about those job attributes that the job-manager may not know immediately. Currently we set things like queue in the attributes section of the jobspec. Maybe the jobspec can be passed to the multi-factor priority plugin to help it harvest those scheduler-specific attributes as well.

There would be another API interface that would allow the plugin to asynchronously update the secondary priority of a pending job, much like flux job priority allows the administrative priority to be updated.

Does it mean the multi-factor priority plugin needs to keep track of all of the pending jobs? I initially thought that we could avoid this by designing this async interface to operate at the user account level. But then the job-manager probably doesn't even want to have the concept of bank accounts/users, so perhaps replicating the job list within this plugin is unavoidable...


garlick commented Nov 10, 2020

Good comments!

For nested instances where plug-ins are not provided, job manager still wants to transition a job to SCHED with no or equal secondary priority.

Agreed. I was assuming that the plugin would not be loaded by default and things would work as they do now. The system instance could be explicitly configured to load the fair share plugin. Alternatively (see aside above), we could load a default plugin that calculates a single priority value, taking as input the administrative priority and submit time.

It seems we need some thought about those job attributes that the job-manager may not know immediately. Currently we set things like queue in the attributes section of the jobspec. Maybe the jobspec can be passed to the multi-factor priority plugin to help it harvest those scheduler-specific attributes as well.

Passing the jobspec through may be a good option. I was thinking we would just pass the resources section to avoid the long environment, but attributes surely would be needed too. Note that the job manager currently doesn't have the jobspec in hand for each job, so that may need some work.

Does it mean the multi-factor priority plugin needs to keep track of all of pending jobs?

Hmm, maybe, or maybe we could implement a query at the plugin API level for listing all jobs by owner?


dongahn commented Nov 10, 2020

Hmm, maybe, or maybe we could implement a query at the plugin API level for listing all jobs by owner?

I like this idea. But in this case, we may need a good performance requirement for the query that returns the list of jobs owned by the user. BTW, is the assumption here that the user account seen by flux-accounting will be the same user ID maintained by flux-core?


garlick commented Nov 10, 2020

we may need a good performance requirement for the query that returns the list of jobs owned by the user.

Excellent point. If this plugin runs in the job manager's thread (what I was proposing) then it should not hold onto it for long or it will negatively impact job throughput. It will get control through its update RPC, when the job manager calls in to get priority for a new job, or when any of its own reactor watchers run. Any API call we offer should be designed to be fast even when the number of pending jobs is large.

When the plugin has a significant batch of work such as processing a new vector of fair share values for multiple users, it may need to interleave its work with letting the job manager run, for example using the prep/check/idle reactor idiom.
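
For reference, a minimal sketch of that prep/check/idle idiom; struct update_ctx and process_one_chunk() are hypothetical, and only the reactor calls are real flux-core API:

/* Interleave a large recalculation with normal job manager work.
 * Only the flux reactor calls are real API; the rest is illustrative.
 */
struct update_ctx {
    flux_watcher_t *prep;
    flux_watcher_t *check;
    flux_watcher_t *idle;
    int work_remaining;
};

static void prep_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg)
{
    struct update_ctx *ctx = arg;
    if (ctx->work_remaining)
        flux_watcher_start (ctx->idle);   /* keep the reactor from sleeping */
}

static void check_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg)
{
    struct update_ctx *ctx = arg;
    flux_watcher_stop (ctx->idle);
    if (ctx->work_remaining)
        ctx->work_remaining -= process_one_chunk (ctx);  /* bounded work per loop */
}

/* setup (error checks omitted):
 *   ctx->prep  = flux_prepare_watcher_create (r, prep_cb, ctx);
 *   ctx->check = flux_check_watcher_create (r, check_cb, ctx);
 *   ctx->idle  = flux_idle_watcher_create (r, NULL, NULL);
 *   flux_watcher_start (ctx->prep);
 *   flux_watcher_start (ctx->check);
 */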

BTW, is the assumption here the user account seen by flux-accounting will be the same user ID maintained by flux-core?

Yes I think that is OK for now. At some point when the flux-accounting db is multi-cluster, and multiple sites are using it, we may need to map to an organization's unique ID in some way, but I don't think we need to worry about that right now.


dongahn commented Nov 10, 2020

a mechanism for registering RPC services like the job shell, e.g. job-manager.priority-<method>.

Is this so that the multi-factor priority (MFP) plugin client can do an RPC to the MFP plugin server or something else?


dongahn commented Nov 10, 2020

It would be useful to further flesh out the data that a priority plugin might require. We have a start on the factors needed for a fair share based multi-factor calculation in flux-framework/flux-accounting#8 although it is pretty slurm specific and needs to be fluxified.

I'd ask @cmoussa1 to propose a minimal set that we should tackle for the first round. As part of that, we should get the terms right to be more appropriate for Flux.


grondo commented Nov 10, 2020

The API will need to include a mechanism for the job manager to ask the plugin for an initial priority value for a new job. I think this "request" from the job manager to the plugin would need to occur and be satisfied on transition of a job to SCHED state, since the priority is required to be set before an alloc request is sent to the scheduler.

At first I was wondering if there was really a need for a separate call for the initial priority. If priority is a function of job parameters + some internal plugin state, then a single get-priority call should work the same for the initial priority vs updates.

However, on second thought, it might make things easier for a plugin developer to have a callback as a job-manager job enters each main job state (i.e. "init", "depend", "sched", "run", "cleanup", "inactive"). This would allow a priority plugin to hook in at the correct place to initialize its internal state (e.g. sometimes a priority plugin would want to create internal state for jobs even if they are in DEPEND, or keep that state while they are in CLEANUP). Any plugin callback after SCHED would not be able to update job data such as the priority, but a sophisticated plugin could use this data to update internal state that influences the priority of pending jobs. At all times, the "inactive" callback could be used to delete any internal job state (such as a count of running jobs for a given user, etc.)

This kind of implies that the job-manager would drive the recalculation of priorities on a per-job basis, e.g. the job priority plugin interface would have callbacks that pass in a single job. Perhaps when internal state in a priority plugin has been updated, the plugin can set a flag in the job-manager to request a re-prioritization loop, which would be driven at the job-manager's discretion?

Sorry this may have gotten a little off-topic.


grondo commented Nov 10, 2020

a mechanism for registering RPC services like the job shell, e.g. job-manager.priority-<method>.

Is this so that the multi-factor priority (MFP) plugin client can do an RPC to the MFP plugin server or something else?

Yes, AIUI, the idea here is to allow the plugin to accept an update of data from an external source. In this case a provided script or cron job could periodically push updates of one or more factors to the plugin. The service registered by the plugin could also respond to a request for the current data values for debugging or informational purposes.
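
As a rough sketch of that idea (assuming the plugin can obtain the job manager's flux_t handle through some yet-to-be-defined accessor, and with a made-up topic name and payload):

/* Hypothetical: plugin registers a "job-manager.priority-update" request
 * handler so an external program can push factor updates directly to it.
 */
static void update_cb (flux_t *h, flux_msg_handler_t *mh,
                       const flux_msg_t *msg, void *arg)
{
    json_t *users;
    if (flux_request_unpack (msg, NULL, "{s:o}", "users", &users) < 0) {
        flux_respond_error (h, msg, errno, NULL);
        return;
    }
    /* ... refresh the plugin's internal factor cache from 'users' ... */
    flux_respond (h, msg, NULL);
}

static flux_msg_handler_t *register_update_service (flux_t *h, void *plugin_ctx)
{
    struct flux_match match = FLUX_MATCH_REQUEST;
    flux_msg_handler_t *mh;
    match.topic_glob = "job-manager.priority-update";
    if (!(mh = flux_msg_handler_create (h, match, update_cb, plugin_ctx)))
        return NULL;
    flux_msg_handler_start (mh);
    return mh;
}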


garlick commented Nov 10, 2020

This kind of implies that the job-manager would drive the recalculation of priorities on a per-job basis, e.g. the job priority plugin interface would have callbacks that pass in a single job. Perhaps when internal state in a priority plugin has been updated, the plugin can set a flag in the job-manager to request a re-prioritization loop, which would be driven at the job-manager's discretion?

Ah I was thinking that the plugin would be required to select which jobs need to be updated. If we wanted to just let the plugin trigger a job manager iteration over all pending jobs when it receives updated factors, that would simplify things quite a bit:

  • no fast getjobsbyuser() lookup required
  • responsiveness of the job manager reactor during a bulk update could be managed in the job manager using prep/check/idle or similar, not in the plugin
  • a really dumb plugin could just implement the single getpriority() callback.

I hadn't thought of doing it that way but now that you suggest it, I really like it especially for a first cut since it will significantly simplify the plugin and its API.

One question: Is it reasonable to require that a getpriority() callback must always return a result immediately, or must we design some sort of async interface that allows the plugin to make RPCs? It would be a lot simpler design if we could assume an immediate response, but that would presume that any externally-sourced data is preloaded in the plugin at initialization (for example the fair share factors for all possible users).


garlick commented Nov 10, 2020

Another way to go might be to add a new PRIORITY job state between DEPEND and SCHED. The job would transition from PRIORITY to SCHED once its priority is established. The priority plugin could watch for a job state transition into PRIORITY (or earlier) which would trigger it to fetch a user's data, if not in cache, and call a plugin API function to set the job's priority.

After that we could assume that a user's data is cached and drive priority updates from the job manager, expecting a getpriority() function to return immediately.

I guess the PRIORITY state is not really necessary and we could have another notification in SCHED state that the job needs its priority set, and hold back the alloc request until it's done. However, I kind of like the idea of a new state here, as if flux-accounting turns into a center resource, it might occasionally be slow or unavailable, and having jobs stuck in a PRIORITY state would make it pretty obvious what is going on. That also might be useful when the priority plugin is provided by a site, and the "why is flux slow" question could be answered if the time spent in the priority state is shown to be long.


grondo commented Nov 10, 2020

I like the idea of the PRIORITY state. However, I think we should discourage a design where a priority plugin fetches something from a remote service or does a blocking operation in the common case to calculate the initial PRIORITY. The plugin should have enough information from the previous out-of-band update of internal state to calculate an initial priority for any new job.

However, I do like that the PRIORITY state allows more flexibility and would allow a plugin to be developed in this way if desired!

I also agree that priority updates should be instant (no RPCs, no blocking) in a well-behaved plugin. RPCs to update internal state should be handled either out-of-band, or I suppose could be scheduled via a timer watcher (with no blocking).
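
A minimal sketch of the timer-watcher variant, with the update handled entirely in a continuation so nothing blocks; the accounting.fairshare-query topic is a placeholder:

/* Periodic non-blocking refresh: the timer fires, an async RPC goes out,
 * and the continuation updates the cache.  The topic name is hypothetical.
 */
static void query_continuation (flux_future_t *f, void *arg)
{
    json_t *users;
    if (flux_rpc_get_unpack (f, "{s:o}", "users", &users) == 0) {
        /* ... refresh internal fairshare cache ... */
    }
    flux_future_destroy (f);
}

static void timer_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg)
{
    flux_t *h = arg;
    flux_future_t *f = flux_rpc (h, "accounting.fairshare-query",
                                 NULL, FLUX_NODEID_ANY, 0);
    if (!f || flux_future_then (f, -1., query_continuation, NULL) < 0)
        flux_future_destroy (f);
}

/* setup (error checks omitted):
 *   flux_watcher_t *timer = flux_timer_watcher_create (flux_get_reactor (h),
 *                                                      3600., 3600.,
 *                                                      timer_cb, h);
 *   flux_watcher_start (timer);
 */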


grondo commented Nov 10, 2020

One reason to still allow a priority plugin to have callbacks even in non-PRIORITY states is that this will allow a plugin to update some of its internal state in-band, e.g. total count of queued jobs for a user, perhaps the number of running jobs for a given user, or some implementations may want to count jobs in CLEANUP, some may not, etc. By having a place to hook into the job-manager in all of these cases, we allow the plugin developer maximum flexibility in the implementation.


garlick commented Nov 10, 2020

Oh yeah I meant to state agreement with the idea that it should be possible through the plugin API to register a callback on all job state transitions. In fact maybe this could just be a subscription to the already-defined job state transition event.

However, I do like that the PRIORITY state allows more flexibility and would allow a plugin to be developed in this way if desired!

It would let the fair share priority plugin be implemented different ways:

  • initially load fair share factors for all center users, and call setpriority() for a job immediately upon its entry to PRIORITY state. In the rare case that a user is not known at the time of startup, job could remain in PRIORITY state until the next periodic update of fair share factors brings in the user
  • initially load nothing, and leave jobs in PRIORITY until first periodic update of fair share factors primes the cache
  • initially load nothing, and upon entry into PRIORITY, fetch the user's fair share factor from flux-accounting, calling setpriority() once response is received.

In the latter case, fair share factors could be timed out of the cache if desired.


grondo commented Nov 14, 2020

That sounds reasonable to me.

It would be impossible to avoid cycles if any job state needs to be re-obtained after a job-manager restart.

That makes me wonder, would a more generic reload event or similar be more useful than priority-invalidate, or do you think priority is the only job state that would need to be invalidated on reload?

I wonder if in the future we'll want to be able to revert back to DEPEND state for jobs for some reason, e.g. if we allow job updates and a new dependency or minimum job begin-time is applied.

Sorry, I'm probably not helping here.


garlick commented Nov 14, 2020

Oh I view the eventlog replay as a process that restores the current state, not actually re-acquiring those states. For example, during the replay of an inactive job, we don't send an alloc request to the scheduler as it passes through SCHED state.

We actually want the job to transition from SCHED back to PRIORITY on a restart because we want the priority plugin to be contacted in that context to allow for an asynchronous response. Now maybe that design point could be separately debated. I think the result of the previous discussion on it was that we should allow for the plugin to go get info about a user that it doesn't know about (or at least the possibility of that) before the job becomes schedulable.

Does that make sense and did I understand your point?

I wonder if in the future we'll want to be able to revert back to DEPEND state for jobs for some reason, e.g. if we allow job updates and a new dependency or minimum job begin-time is applied.

Yeah a transition from ALLOC back to DEPEND probably does make sense in that case IMHO. So I guess that's a +1 in favor of dropping the no-cycles rule.


chu11 commented Nov 14, 2020

On synchronizing job-info, first I don't think the queue priority should be considered valid when the job is not in SCHED state (in a sense it is not in a queue at that point). I do think, for testing if nothing else, users should be able to fetch the priority value, for example via a custom flux jobs format, but it would be appropriate to print "-" or similar if the job is not in SCHED state.

That sounds good. It's definitely not needed except for testing / curiosity. Perhaps queue priority is available on a "best effort" level for non-pending jobs.

Edit: oh we still have the problem of getting the latest priority if job-info is reloaded, since the journal might have wrapped. Hmm.

Just throwing out an idea. Could the job-manager be queried with a set of jobids and the queue priorities could be returned? Or perhaps it simply informs the job-manager to write those queue priorities to the journal and doesn't even have to respond to the RPC?


grondo commented Nov 14, 2020 via email


garlick commented Nov 14, 2020

by "state" I meant job data acquired by some part of the
system which isn't available in the eventlog, the priority plugin
initialization being one example.

You mean like if we had another state like PRIORITY used to acquire other data with the same semantics? I guess then you could use the same reload event to transition out of either state. That seems unlikely to me but I wouldn't be opposed to renaming priority-invalidate to reload or similar just in case. I guess it would mean that we would want to post a reload event to every active job, not just those in SCHED state. Hmm, actually, that could well be a useful thing to log for running jobs too, for diagnosing future issues. OK, I kind of like that!


garlick commented Nov 14, 2020

Just throwing out an idea. Could the job-manager be queried with a set of jobids and the queue priorities could be returned? Or perhaps it simply informs the job-manager to write those queue priorities to the journal and doesn't even have to respond to the RPC?

Yeah I was mulling around ideas like that, but wanted to be sure we don't a) pollute the journal concept, or b) create more ordering issues that increase complexity for journal consumer.


chu11 commented Nov 14, 2020

Yeah I was mulling around ideas like that, but wanted to be sure we don't a) pollute the journal concept, or b) create more ordering issues that increase complexity for journal consumer.

Slightly alternate idea, RPC to the job-manager tells it to replay the entire eventlog journal for a specific jobid.


garlick commented Nov 14, 2020

Slightly alternate idea, RPC to the job-manager tells it to replay the entire eventlog journal for a specific jobid.

Except we don't currently keep any of the events around in the job manager...

I was just wondering if we should get rid of the JOURNAL_ONLY event idea and instead have a streaming RPC that tracks requested non-persistent attributes like priority?

You'd know better than I what complications would arise when trying to consume data from both, when they are only loosely synchronized. I guess you'd have to deal with a priority update before you know about the job from the journal. Maybe if those things were kept in a separate hash in job-info that's accessed on demand (rather than having an RPC in the job-manager to provide them on demand?) Could we go one further and put that "attribute hash" or priority hash or whatever in a reusable libjob container?

I think we've gone off topic for the plugin interface, and this issue is getting quite long. I'll open a new issue.


garlick commented Dec 21, 2020

Summary of points discussed by @grondo and @garlick offline:

  • let's focus on minimal priority plugin interface now, while planning for generalized job manager plugins in the future
  • there should be a default priority "builtin plugin"
  • TOML config can select loaded job manager plugins:
    • load a glob of plugin paths: [job-manager] plugins = "glob"
    • per plugin config also: [job-manager.plugin.priority] accounting-db = "uri"
  • let's reuse <flux/plugin.h> to define plugin interfaces
  • callback: new job received (plugin may set priority asynchronously)
  • callback: update priority (plugin must respond immediately, e.g. this is a function call)
  • plugin may request the job manager to update all jobs' priorities; the job manager would then iterate over the jobs and perform the priority updates
  • a job manager handle with an accessor for the module's flux_t handle will be needed
  • a handle representing a job will be needed
  • plugin may store arbitrary data with job manager or job handles

I might have missed a few nuances! Please fill in @grondo


grondo commented Dec 21, 2020

Looks pretty good to me!

  • callback: new job received (plugin may set priority asynchronously)
  • callback: update priority (plugin must respond immediately, e.g. this is a function call)

Will a priority plugin also need a callback when a job is inactive? e.g. if it has some internal state related to jobs it will need to know when it can free that state (assuming the state is not directly associated with the job handle, in which case the state will be auto-destroyed).

Therefore, for future extensibility, I'd propose a set of job.* callbacks, starting with job.new and job.inactive here. Maybe it would make more sense to call these job event callbacks, which are issued when the job transitions to the named state. Later, a callback for each job state could be added, along with other events like job.update (but that is for the generic job-manager plugin case).
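
To make that concrete, here is a sketch of how such a plugin might register those handlers via <flux/plugin.h>; the job.new and job.inactive topic names are the proposal above, not an implemented interface:

static int job_new_cb (flux_plugin_t *p, const char *topic,
                       flux_plugin_arg_t *args, void *arg)
{
    /* allocate any per-job internal state here (e.g. bump a per-user count) */
    return 0;
}

static int job_inactive_cb (flux_plugin_t *p, const char *topic,
                            flux_plugin_arg_t *args, void *arg)
{
    /* free per-job internal state not attached to the job handle */
    return 0;
}

int flux_plugin_init (flux_plugin_t *p)
{
    if (flux_plugin_add_handler (p, "job.new", job_new_cb, NULL) < 0
        || flux_plugin_add_handler (p, "job.inactive", job_inactive_cb, NULL) < 0)
        return -1;
    return 0;
}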


dongahn commented Dec 21, 2020

Here are some of my random notes from today's coffee hour so that @cmoussa1 will have some context for tomorrow:

  • Each job state transition event will be passed to the priority plugin. The expectation is that the plugin will use the information to extract requisite global metadata (e.g., how many running jobs a user has, and use that to enforce a running-job limit, etc.)

  • The jobspec for each job will be passed to the plugin as well. If the user uses a non-default bank, the plugin can find the bank info in the jobspec's attributes section. Different banks will likely produce different priorities. (TODO: Ask Ryan Day to see how important it is to support a single user using multiple banks at LLNL.)

  • There needs to be a way for a priority plugin to be able to register an RPC -- this way, the plugin can use it to receive a bulk fairshare update from flux-accounting on each priority recalculation. The RPC needs to be highly performant, utilizing asynchronous computation as much as possible. You don't want this third-party RPC to head-of-line-block other critical RPCs of the job-manager. We will need a benchmark to measure its performance (scaling the job count to 10K or similar).

  • A job needs to be able to get its priority updated while it is in either PRIORITY or SCHED state.

  • We agreed that bank validation etc. should be done by the priority plugin. We need a way for the plugin to raise an exception. Initially we said flux_raise_exception, but it seems there are some implications to using that. So details are TBD.


chu11 commented Jan 4, 2021

Began working on a prototype implementation for #3325, mostly to get my head around libflux/plugin.h b/c I didn't know the API too well. Some design thoughts/questions I hit along the way (dunno if this should be spun off into a different issue or discussion, posting here for now).

callback: update priority (plugin must respond immediately, e.g. this is a function call)

I was imagining a plugin callback that was something like:

int update_priority (int urgency, uint64_t t_submit, unsigned int *priority);

or with our job-manager job handle

int update_priority (int urgency, uint64_t t_submit, job_manager_job_t *job);

But with the plugin handler call:

int flux_plugin_call (flux_plugin_t *p, const char *name,
                      flux_plugin_arg_t *args);

I don't think you can pass a pointer. There are non-optimal ways to do it:

  • via the void *arg you set when the handler is created, you could set the "job-manager handle"; the job could be looked up in the job-manager context and set there. But having the user look up the job seems like more work than we should expect them to do? Especially for the default priority plugin, which is just going to return the same urgency it was given.
  • via an aux set/get, which isn't the most obvious/expected way to do it
  • hackish way, pass pointer via {s:I} in the args, I don't think that's a good idea

Almost feel like flux_plugin_f should be:

typedef int (*flux_plugin_f) (flux_plugin_t *p,
                              const char *topic,
                              flux_plugin_arg_t *args,
                              void *handler_data,
                              void *call_data);

handler_data being the "global" arg set at handler creation time and call_data set on each call.

callback: new job received (plugin may set priority asynchronously)

Using the current plugin API and flux_plugin_call(), it wasn't obvious how to do this. I assumed a prep-check reactor in the plugin to occasionally process new jobs and then call some job_manager_set_job_priority (flux_jobid_t id, unsigned int priority)?

Perhaps it would be easier to set up a service that the job-manager would send RPCs to?

Neither is the easiest API for a plugin writer. But we can add a convenience library similar to libschedutil that would hide the gory details.

Extra Note:

during my prototyping, I did learn that it's scary for the plugin to do searches for a job in the job hash, because if you have something like this:

job = zhashx_first (active_jobs);
while (job) {
    update_job (job);
    job = zhashx_next (active_jobs);
}

and "update job" does a zhashx_lookup(), that messes up the current pointer in the iteration. I'm not sure of how to design this safely going for the time being, but something to keep in mind.

Extra Extra Note:

Because the job-manager's ctx->active_jobs is created via job_hash_create() with a bunch of special duplicators and whatnot set, zhashx_dup() is not safe to use with ctx->active_jobs. Learned this the hard way as I tried to hack around the above issue while prototyping.

See: zeromq/czmq#2144


garlick commented Jan 4, 2021

Uh oh, I think @grondo already has a prototype working for this. I'll let him respond to your comments on the plugin.h interface. I'm pretty sure we want the job manager to do all lookups/iterations on the job hash on behalf of the plugin (or unbeknownst to the plugin).


grondo commented Jan 4, 2021

Yeah, sorry @chu11, before the break I spent a little time on a job manager plugin API which would hopefully be sufficient to develop a priority plugin. The intent was to just come up with the plugin-side of the API (e.g. equivalent to flux/shell.h), but that ended up being difficult to do without also making a prototype - though the prototype ended up being quite simple.

First some general comments on the plugin.h API:

Almost feel like flux_plugin_f should be:

typedef int (*flux_plugin_f) (flux_plugin_t *p,
                              const char *topic,
                              flux_plugin_arg_t *args,
                              void *handler_data,
                              void *call_data);

handler_data being the "global" arg set at handler creation time and call_data set on each call.

void * args to plugin callbacks were purposely left out of the API. When raw pointers to internal data are passed as plugin args, it becomes too tempting to pass pointers to structs which might change between different versions of Flux, thus meaning that a plugin will only work with the version of flux-core it was compiled against. We want to avoid that when possible.

There's a way around this though, and that is to share opaque "handles" with the plugin via the flux_plugin_t *p aux container. This is what is done in the shell plugin API:

/*  Get flux_shell_t object from flux plugin handle
 */
flux_shell_t * flux_plugin_get_shell (flux_plugin_t *p);

/*  Return the current task for task_init, task_exec, and task_exit callbacks:
 *
 *  Returns NULL in any other context.
 */
flux_shell_task_t * flux_shell_current_task (flux_shell_t *shell);

This is safer and more efficient since you aren't passing the flux_shell_t * and flux_shell_task_t * as void *arg, and they are only fetched as needed.

callback: new job received (plugin may set priority asynchronously)

Using the current plugin API and the flux_plugin_call(), wasn't obvious how to do this. I assumed a prep-check reactor in the plugin to occasionally process new jobs and then call some job_manager_set_job_priority (flux_jobid_t id, unsigned int priority)?

In the prototype, I simply added a call to the plugin on each new state transition, before event_job_action(). A priority plugin could then subscribe for a state.priority callback, which would indicate a job is entering the PRIORITY state. The plugin would then either set a priority immediately by pushing a priority on the FLUX_PLUGIN_ARG_OUT args, or, if it doesn't set a priority immediately, the job would stay in PRIORITY state until the plugin asynchronously sets a new priority. How this is done exactly is TBD, but I think @garlick agreed with your idea that a new RPC for setting job priority would work (it would be the job-manager sending an RPC to itself), or we could add a new function to the plugin API like

int flux_plugin_set_job_priority (flux_plugin_t *p, flux_jobid_t id, unsigned int priority);

Which would trigger the job manager to move this job out of PRIORITY state on the next check/prep watcher or something.

The other open question is how to allow the plugin to request a reprioritization of all jobs. I think we had said the plugin would request reprioritization and then the job manager would call a priority.set or similar callback for each job in its own good time.

The current prototype is here (without any support for asynchronous priority setting in PRIORITY state, nor a way to request a reprioritization of all jobs)

grondo@b57858f

The prototype calls job-manager plugins "jobtap" plugins (because flux_job_manager_plugin.h was too long, and also because they "tap" into job state transitions)

Instead of adding a flux_jobtap_job_t * handle with getters for each bit of job information, the jobtap interface passes all the job data as args in the flux_plugin_arg_t, e.g.:

    flux_plugin_arg_t *args = flux_plugin_arg_create ();
    flux_plugin_arg_pack (args,
                          FLUX_PLUGIN_ARG_IN,
                          "{s:O s:I s:i s:i s:i s:I s:f}",
                          "jobspec", job->jobspec_redacted,
                          "id", job->id,
                          "userid", job->userid,
                          "urgency", job->urgency,
                          "state", job->state,
                          "priority", job->priority,
                          "t_submit", job->t_submit);

The prototype includes 3 "builtin" plugins, e.g. the following is the builtin.default plugin:

/*  The current implementation of priority.set just copies
 *   the urgency to the priority.
 */
static int priority_cb (flux_plugin_t *p,
                        const char *topic,
                        flux_plugin_arg_t *args,
                        void *data)
{
    int urgency = 16;
    flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN,
                            "{s:i}",
                            "urgency", &urgency);
    flux_plugin_arg_pack (args, FLUX_PLUGIN_ARG_OUT,
                          "{s:i}",
                          "priority", urgency);
    return 0;
}

int default_priority_plugin_init (flux_plugin_t *p)
{
    if (flux_plugin_set_name (p, "priority") < 0
        || flux_plugin_add_handler (p,
                                    "state.priority",
                                    priority_cb,
                                    NULL) < 0
        || flux_plugin_add_handler (p,
                                    "priority.set",
                                    priority_cb,
                                    NULL) < 0) {
        return -1;
    }
    return 0;
}

and a hold plugin, which sets priority=0 on all incoming jobs:

/* Always set priority=0 so all jobs are submitted in held state
 */
static int hold_cb (flux_plugin_t *p,
                        const char *topic,
                        flux_plugin_arg_t *args,
                        void *arg)
{
    flux_plugin_arg_pack (args, FLUX_PLUGIN_ARG_OUT, "{s:i}", "priority", 0);
    return 0;
}


int hold_priority_plugin_init (flux_plugin_t *p)
{
    return flux_plugin_add_handler (p, "state.priority", hold_cb, NULL);
}

Finally, here's a plugin that subscribes to all jobtap callbacks and dumps all available info for each job:

static int cb (flux_plugin_t *p,
               const char *topic,
               flux_plugin_arg_t *args,
               void *arg)
{
    json_t *resources;
    flux_jobid_t id;
    uint32_t userid;
    int urgency;
    unsigned int priority;
    flux_job_state_t state;
    double t_submit;
    char *s;

    flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN,
                            "{s:{s:o} s:I s:i s:i s:i s:i s:f}",
                            "jobspec", "resources", &resources,
                            "id", &id,
                            "userid", &userid,
                            "urgency", &urgency,
                            "priority", &priority,
                            "state", &state,
                            "t_submit", &t_submit);

    s = json_dumps (resources, JSON_COMPACT);
    fprintf (stderr, "%s: id=%ju uid=%u urg=%d pri=%u state=%d t_submit=%.5f\n",
             topic, id, userid, urgency, priority, state, t_submit);
    fprintf (stderr, "resources: %s\n", s);
    free (s);
    return 0;
}

int flux_plugin_init (flux_plugin_t *p)
{
    return flux_plugin_add_handler (p, "*", cb, NULL);
}

For testing purposes, the branch allows plugins to be loaded via a new job-manager.jobtap RPC. The builtin "demo" plugins can be loaded as builtin.default, builtin.age, or builtin.hold, or a path to a DSO can be specified. Use the following python script:

import flux
import sys
print(flux.Flux().rpc("job-manager.jobtap", {"load": sys.argv[1]}).get())

The special none argument will unload the current jobtap plugin and disable jobtap (again for testing purposes).


chu11 commented Jan 4, 2021

it becomes too tempting to pass pointers to structs which might change between different versions of Flux, thus meaning that a plugin will only work with the version of flux-core it was compiled against. We want to avoid that when possible.

Yup I agree. I had read "plugin may store arbitrary data with job manager or job handles" in the design notes above as meaning having access to struct job * and such. But interpreting it as access in an opaque getter/setter manner makes more sense.

The plugin would then either set a priority immediately by pushing a priority on the FLUX_PLUGIN_ARG_OUT.

Ahhh, this was the big subtlety I missed in the plugin API. That the args parameter could be in & out. That's the way a value could be passed back to the caller.

If it doesn't set a priority immediately, the job would stay in PRIORITY state until the plugin asynchronously sets a new priority. How this is done exactly is TBD, but I think @garlick agreed with your idea that a new RPC for setting job priority would work (it would be the job-manager sending an RPC to itself), or we could add a new function to the plugin API like

int flux_plugin_set_job_priority (flux_plugin_t *p, flux_jobid_t id, unsigned int priority);

Which would trigger the job manager to move this job out of PRIORITY state on the next check/prep watcher or something.

It seems you reached the same TBD point that I did. I like the idea of the flux_plugin_set_job_priority()-like function, but I kept coming back to trying to make sure the plugin writer would never have to set up their own prep/check to do asynchronous priority calculations. But perhaps it is inevitable that a plugin writer would have to on occasion.


grondo commented Jan 4, 2021

but I kept coming back to trying to make sure the plugin writer would never have to set up their own prep/check to do asynchronous priority calculations. But perhaps it is inevitable that a plugin writer would have to on occasion.

Yeah, IIUC in most cases a priority plugin would be able to return a priority immediately on entry to the PRIORITY state, so asynchronous exit from this state would be the uncommon case.

The use case I believe we were considering is that a fair share plugin might have to contact a database or other service when, for example, it sees the first job for a given user. In this case, the plugin might send an RPC to a remote service to fetch data for that user, and then would update the job priority in the continuation callback of that RPC, when it has the necessary information.

I don't see a need for the plugin itself to register prep/check or idle watchers in this case. The job manager itself should be responsible for any work that should be done in this manner (if necessary).


chu11 commented Jan 4, 2021

The use case I believe we were considering is that a fair share plugin might have to contact a database or other service when, for example, it sees the first job for a given user. In this case, the plugin might send an RPC to a remote service to fetch data for that user, and then would update the job priority in the continuation callback of that RPC, when it has the necessary information.

I was also thinking of a case where the plugin wanted to "batch" calculate a bunch of priorities. Perhaps the database needs to do some big calculation or something so we're going to wait awhile. Perhaps this is not a common case and we don't have to think about it too much.


garlick commented Jan 4, 2021

As I recall the idea we talked about was to have some way for the plugin to notify the job manager that it wants to iterate over all jobs (or perhaps all jobs in a particular set of states), calling the plugin callback for each job. That would let the job manager do the prep/check/idle, if necessary to avoid becoming unresponsive if, say, it needs to iterate over a million jobs.


chu11 commented Jan 4, 2021

As I recall the idea we talked about was to have some way for the plugin to notify the job manager that it wants to iterate over all jobs (or perhaps all jobs in a particular set of states), calling the plugin callback for each job. That would let the job manager do the prep/check/idle, if necessary to avoid becoming unresponsive if, say, it needs to iterate over a million jobs.

I was thinking more about the case where a bajillion new jobs were submitted in short order. So the priority plugin got a bunch of job.new notifications in short order.


garlick commented Jan 4, 2021

Oh I see. Hmm, well I guess that sort of thing would be possible since the plugin has access to the flux_t handle and reactor, but the idea would be to minimize the burden we place on the plugin writer.


chu11 commented Jan 4, 2021

Oh I see. Hmm, well I guess that sort of thing would be possible since the plugin has access to the flux_t handle and reactor, but the idea would be to minimize the burden we place on the plugin writer.

It's certainly not a common case and probably one we don't need to fret about too much right now. I'm sure my thoughts were just wandering over many ideas over the long weekend :-)


grondo commented Jan 4, 2021

I was thinking more about the case where a bajillion new jobs were submitted in short order. So the priority plugin got a bunch of job.new notifications in short order.

The batching could be done in a plugin by having internal state that keeps a list of jobs which need their priority updated if an asynchronous update is in progress.

For example, the first job state.priority callback triggers an RPC to a database service and puts the current job as the first job on a list of pending updates. While the RPC is in progress, new jobs are simply appended to the internal list. Once the RPC continuation callback runs, the priority plugin could iterate over all jobs recalculating their priorities and calling e.g. flux_plugin_set_job_priority(p, jobid, priority) for each job.

Otherwise, if no RPC is in progress and the priority can be calculated immediately (i.e. all factors are currently available), then the plugin would just calculate and immediately return the priority.

Note that a plugin could also schedule a periodic database query by using a timer or periodic watcher on the flux handle. That RPC could be treated the same as above, or once it completes, the plugin could request a priority recalc for all jobs.
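
A rough sketch of that batching pattern; flux_plugin_set_job_priority() is the proposed (not yet existing) setter from the discussion above, and calc_priority() is a stand-in for the plugin's own calculation:

/* While a factor-update RPC is in flight, queue incoming jobids and leave
 * those jobs in PRIORITY state; flush them in the RPC continuation.
 * Error checks are omitted for brevity.
 */
struct batch {
    flux_plugin_t *p;
    zlistx_t *pending;     /* heap-allocated jobids; freed by a destructor
                            * set with zlistx_set_destructor() */
    bool rpc_in_flight;
};

static int priority_cb (flux_plugin_t *p, const char *topic,
                        flux_plugin_arg_t *args, void *arg)
{
    struct batch *b = arg;
    flux_jobid_t id;
    if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, "{s:I}", "id", &id) < 0)
        return -1;
    if (b->rpc_in_flight) {
        flux_jobid_t *idp = calloc (1, sizeof (*idp));
        *idp = id;
        zlistx_add_end (b->pending, idp);   /* defer: job stays in PRIORITY */
        return 0;
    }
    /* factors available: answer immediately */
    return flux_plugin_arg_pack (args, FLUX_PLUGIN_ARG_OUT,
                                 "{s:i}", "priority", calc_priority (b, id));
}

static void update_continuation (flux_future_t *f, void *arg)
{
    struct batch *b = arg;
    flux_jobid_t *idp;
    /* ... unpack fresh factors from f and refresh the cache ... */
    idp = zlistx_first (b->pending);
    while (idp) {
        flux_plugin_set_job_priority (b->p, *idp, calc_priority (b, *idp));
        idp = zlistx_next (b->pending);
    }
    zlistx_purge (b->pending);
    b->rpc_in_flight = false;
    flux_future_destroy (f);
}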


garlick commented Jan 4, 2021

Oh duh, that could be very common. Good discussion!


grondo commented Jan 25, 2021

This should have been closed by #3464, though there may be some unimplemented thoughts from this issue we want to push off to new issues. (Feel free to reopen if desired)

grondo closed this as completed Jan 25, 2021