
Large dask graph size/memory usage when opening multiple correctionlib.CorrectionSet #1131

Open
kawaho opened this issue Jul 11, 2024 · 4 comments

kawaho commented Jul 11, 2024

I am trying to use correctionlib within coffea+dask to apply a set of corrections. Specifically, I am applying jet smearing, pileup corrections, muon id/iso/trigger, electron reco/id scale factors (pretty standard workflow). The following code is a minimal example:

import awkward as ak
import dask_awkward as dak

from coffea import processor
from correctionlib import CorrectionSet

class MyProcessor(processor.ProcessorABC):
    def __init__(self, **kwargs):
      pass

    def process(self, events):

      #jet smearing     
      jerc = CorrectionSet.from_file('/cvmfs/cms.cern.ch/rsync/cms-nanoAOD/jsonpog-integration/POG/JME/2018_UL/jet_jerc.json.gz')
      jer_smear = CorrectionSet.from_file('/cvmfs/cms.cern.ch/rsync/cms-nanoAOD/jsonpog-integration/POG/JME/jer_smear.json.gz')['JERSmear']
      jets = events.Jet
      jets["event_rho"] = events.fixedGridRhoFastjetAll

      # jer = pt resolution, jersf = data/MC scale factor
      jets['jer'] = jerc['Summer19UL18_JRV2_MC_PtResolution_AK4PFchs'].evaluate(jets.eta, jets.pt, jets.event_rho)
      jets['jersf'] = jerc['Summer19UL18_JRV2_MC_ScaleFactor_AK4PFchs'].evaluate(jets.eta, 'nom')

      jersmear_factor = jer_smear.evaluate(jets.pt, jets.eta, jets.matched_gen.pt, jets.event_rho, events.event, jets.jer, jets.jersf)

      events['Jet', 'pt'] = jets.pt*jersmear_factor
      events['Jet', 'mass'] = jets.mass*jersmear_factor

      #PU Weights
      pu_sf = CorrectionSet.from_file("/cvmfs/cms.cern.ch/rsync/cms-nanoAOD/jsonpog-integration/POG/LUM/2018_UL/puWeights.json.gz")
      puWeight = pu_sf["Collisions18_UltraLegacy_goldenJSON"].evaluate(events.Pileup.nTrueInt, "nominal")
      weight = events.genWeight*puWeight

      #Mu ID/ISO SF
      m_sf = CorrectionSet.from_file("/cvmfs/cms.cern.ch/rsync/cms-nanoAOD/jsonpog-integration/POG/MUO/2018_UL/muon_Z.json.gz")
      M_collections = ak.pad_none(events.Muon[(events.Muon.pt > 26) & (abs(events.Muon.eta) < 2.4)], 1)[:,0]
      MuID_SF = m_sf["NUM_TightID_DEN_TrackerMuons"].evaluate(abs(M_collections.eta), M_collections.pt, "nominal")
      MuISO_SF = m_sf["NUM_TightRelIso_DEN_TightIDandIPCut"].evaluate(abs(M_collections.eta), M_collections.pt, "nominal")
      weight = weight*MuID_SF*MuISO_SF

      #SingleMu Trigger SF
      Trig_SF = m_sf['NUM_IsoMu24_DEN_CutBasedIdTight_and_PFIsoTight'].evaluate(abs(M_collections.eta), M_collections.pt, "nominal")
      weight = weight*Trig_SF

      #Electron SF
      e_sf = CorrectionSet.from_file("/cvmfs/cms.cern.ch/rsync/cms-nanoAOD/jsonpog-integration/POG/EGM/2018_UL/electron.json.gz")
      E_collections = ak.pad_none(events.Electron[(events.Electron.pt > 20) & (abs(events.Electron.eta) < 2.5)], 1)[:,0]
      EleReco_SF = e_sf["UL-Electron-ID-SF"].evaluate('2018', "sf", "RecoAbove20", E_collections.eta, E_collections.pt)
      EleID_SF = e_sf["UL-Electron-ID-SF"].evaluate('2018',"sf", "wp90iso", E_collections.eta, E_collections.pt)
      weight = weight*EleReco_SF*EleID_SF

      events['Jet', 'weight'] = ak.fill_none(weight, 0)

      quantities = ak.zip(
                             {
                              'jet_pt': ak.flatten(events['Jet']).pt,
                              'weight': ak.flatten(events['Jet']).weight,
                             }
                           )

      dak.to_parquet(
          quantities,
          "test_corr",
          compute=True,
      )

      return

    def postprocess(self, accumulator):
        pass

However, Dask warns about a large graph size:

UserWarning: Sending large graph of size 18.62 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.

and, more importantly, the workers on the Dask dashboard show unreasonable memory usage (labelled as unmanaged (old) memory in Dask); for example, with step_size=20_000:

[Screenshot (Jul 11, 2024): Dask dashboard showing high per-worker memory usage]

Eventually, the jobs would fail because of the high memory usage.

If one runs only the smearing or only the scale factors, the memory problem disappears, although the large-graph warning persists when running jet smearing alone.

coffea version is '2024.5.0'
correctionlib version is '2.5.0'
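
For intuition on where a multi-MiB graph can come from: Dask serializes each task in the graph independently, so a large object embedded in several tasks is copied once per task, whereas a scattered object is shipped once and referenced by key. A stdlib-only sketch of that difference (no Dask or correctionlib involved; the task tuples are purely illustrative):

```python
import pickle

# Stand-in for a large in-memory correction payload (~1 MB).
big_payload = bytes(1_000_000)

# Nine "tasks" that each embed the same payload, mimicking nine
# correction evaluations baked into one graph.
tasks = [("evaluate", big_payload, i) for i in range(9)]

# Graph-like behavior: each task is serialized on its own, so the
# payload is duplicated nine times on the wire.
per_task = sum(len(pickle.dumps(t)) for t in tasks)

# Scatter-like behavior: ship the payload once and let each task
# reference it by a small key.
shared = len(pickle.dumps(big_payload)) + sum(
    len(pickle.dumps(("evaluate", "payload-key", i))) for i in range(9)
)

print(per_task, shared)  # per_task is roughly 9x shared
```

This is only an analogy for the serialization cost, not a reproduction of the coffea workflow.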

kawaho added the question ("Further information is requested") label on Jul 11, 2024
lgray (Collaborator) commented Jul 11, 2024

Oh, that's not a serious warning; you may ignore it. I've not seen any serious slowdown with graphs that are hundreds of MB in size.

lgray (Collaborator) commented Jul 11, 2024

The large memory usage you're observing is coming from another source.

kawaho (Author) commented Jul 11, 2024

About the large memory: the number of workers with particularly high memory usage (the ones causing the failure) is exactly the number of correctionlib objects I am opening, which is 9 in this case. My understanding is that these workers are the ones handling the SF/smearing tasks. Naively, I would expect that launching x workers would produce at least 9x such tasks, but the task count always remains 9 (even with ~100 workers). I wonder if this can be handled better with some trick, because a lot of memory is being wasted.
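
One possible mitigation is a per-process cache, so each worker opens each correction file at most once no matter how many chunks it processes. A sketch only, not validated against coffea's scheduling; `load_json_gz` is a hypothetical stand-in for `CorrectionSet.from_file`:

```python
import gzip
import json

_CORRECTION_CACHE = {}

def load_json_gz(path):
    # Hypothetical stand-in for correctionlib's CorrectionSet.from_file:
    # reads a gzipped JSON file into a Python object.
    with gzip.open(path, "rt") as f:
        return json.load(f)

def get_correction_set(path, loader=load_json_gz):
    # Open each file at most once per worker process; later calls
    # (e.g. one per chunk) reuse the cached object instead of
    # re-parsing the gzipped JSON.
    if path not in _CORRECTION_CACHE:
        _CORRECTION_CACHE[path] = loader(path)
    return _CORRECTION_CACHE[path]
```

Whether this actually reduces the per-worker footprint depends on how the graph distributes the correction-loading tasks.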

lgray (Collaborator) commented Aug 26, 2024

@kawaho sorry for the slow reply.

This might be something we can fix by inlining the correctionlib objects in the graph so that they appear on each node that needs to access correctionlib. We'd have to play with it a bit.

Right now it opens the correctionlib CorrectionSet 9 times for the whole workflow and transports the needed parts to the workers that need a specific correction within the file. I'll see if this can be done a little more leanly (probably at the cost of speed).

I think you can also get some mileage out of not opening the correction sets each time process is called, but rather once, when you construct the processor.
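
That last suggestion might look like the sketch below (assumptions: a stub class rather than a real processor.ProcessorABC subclass, a `loader` argument standing in for `CorrectionSet.from_file`, and shortened hypothetical file names):

```python
class MyProcessor:
    # Sketch: the real class would subclass coffea's processor.ProcessorABC.

    # Shortened, hypothetical file names; the real ones live under /cvmfs.
    PATHS = {
        "pu": "puWeights.json.gz",
        "muon": "muon_Z.json.gz",
        "electron": "electron.json.gz",
    }

    def __init__(self, loader):
        # `loader` stands in for CorrectionSet.from_file. Every set is
        # opened once, at construction time, not on every process() call.
        self.corrections = {name: loader(path) for name, path in self.PATHS.items()}

    def process(self, events):
        # Reuse the already-opened sets; no from_file calls here.
        pu_sf = self.corrections["pu"]
        m_sf = self.corrections["muon"]
        ...
```

Note that the opened sets then become part of the processor's state, so they are serialized with it; whether that helps or hurts the graph size is exactly the trade-off discussed above.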
