CMS example with ServiceX 3 client (#225)
* update CMS ttbar example to use ServiceX v3.0 API
ponyisi authored Oct 29, 2024
1 parent 5c0e354 commit 62f51e7
Showing 9 changed files with 502 additions and 463 deletions.
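
For orientation, the ServiceX v3.0 client that this commit migrates to replaces per-dataset `ServiceXDataset` objects and qastle generation with a single declarative `deliver()` call. A minimal sketch of that pattern, assuming the v3 API as it is used in the diff below (the sample name and file URL are placeholders, not taken from the repository):

import servicex
from servicex import query as q

# a func_adl query against the 'mini' tree, selecting a single branch
lepton_query = q.FuncADL_Uproot().FromTree("mini").Select(lambda e: {"lep_pt": e.lep_pt})

spec = {
    "General": {"Delivery": "URLs"},  # ask ServiceX for URLs rather than local copies
    "Sample": [
        {
            "Name": "example_sample",  # placeholder sample name
            "Dataset": servicex.dataset.FileList(["root://example.org//path/file.root"]),  # placeholder input
            "Query": lepton_query,
        }
    ],
}

# deliver() submits the transformation and returns a mapping: sample name -> delivered files
results = servicex.deliver(spec)
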
247 changes: 99 additions & 148 deletions analyses/atlas-open-data-hzz/HZZ_analysis_pipeline.ipynb

Large diffs are not rendered by default.

61 changes: 39 additions & 22 deletions analyses/atlas-open-data-hzz/HZZ_analysis_pipeline.py
@@ -22,20 +22,19 @@

import awkward as ak
import cabinetry
from func_adl import ObjectStream
from func_adl_servicex import ServiceXSourceUpROOT
import hist
import mplhep
import numpy as np
import pyhf
import uproot
from servicex import ServiceXDataset

from coffea import processor
from coffea.nanoevents.schemas.base import BaseSchema
import utils
from utils import infofile # contains cross-section information

import servicex

import vector
vector.register_awkward()

@@ -54,6 +53,9 @@
# ServiceX behavior: ignore cache with repeated queries
IGNORE_CACHE = False

# ServiceX behavior: choose query language
USE_SERVICEX_UPROOT_RAW = True

# %% [markdown]
# ## Introduction
#
@@ -147,12 +149,14 @@
# <span style="color:darkgreen">**Systematic uncertainty added:**</span> scale factor variation, applied already at the event selection stage. Imagine that this could be a calculation requiring many different variables that are no longer needed downstream, so it makes sense to do it here.

# %%
def get_lepton_query(source: ObjectStream) -> ObjectStream:
"""Performs event selection: require events with exactly four leptons.
def get_lepton_query():
"""Performs event selection with func_adl transformer: require events with exactly four leptons.
Also select all columns needed further downstream for processing &
histogram filling.
"""
return source.Where(lambda event: event.lep_n == 4).Select(
from servicex import query as q
return q.FuncADL_Uproot().FromTree('mini')\
.Where(lambda event: event.lep_n == 4).Select(
lambda e: {
"lep_pt": e.lep_pt,
"lep_eta": e.lep_eta,
@@ -179,23 +183,33 @@ def get_lepton_query(source: ObjectStream) -> ObjectStream:
}
)

def get_lepton_query_uproot_raw():
"""Performs event selection with uproot-raw transformer: require events with exactly four leptons.
Also select all columns needed further downstream for processing &
histogram filling.
"""
from servicex import query as q
return q.UprootRaw([{'treename': 'mini',
'expressions': ['lep_pt', 'lep_eta', 'lep_phi', 'lep_energy', 'lep_charge',
'lep_typeid', 'mcWeight', 'scaleFactor', 'scaleFactorUP', 'scaleFactorDOWN'],
'aliases': { 'lep_typeid': 'lep_type', 'lep_energy': 'lep_E',
'scaleFactor': 'scaleFactor_ELE*scaleFactor_MUON*scaleFactor_LepTRIGGER*scaleFactor_PILEUP',
'scaleFactorUP': 'scaleFactor*1.1',
'scaleFactorDOWN': 'scaleFactor*0.9' }
}])

# %% [markdown]
# # Caching the queried datasets with `ServiceX`
#
# Using the queries created above, we use `ServiceX` to read the ATLAS Open Data files and build cached files containing only the specific event information dictated by the query.

# %%
# dummy dataset on which to generate the query
dummy_ds = ServiceXSourceUpROOT("cernopendata://dummy", "mini", backend_name="uproot")

# tell low-level infrastructure not to contact ServiceX yet, only to
# return the qastle string it would have sent
dummy_ds.return_qastle = True

# create the query
lepton_query = get_lepton_query(dummy_ds)
query = lepton_query.value()
if USE_SERVICEX_UPROOT_RAW:
query = get_lepton_query_uproot_raw()
else:
query = get_lepton_query()


# now we query the files and create a fileset dictionary containing the
# URLs pointing to the queried files
@@ -204,13 +218,15 @@ def get_lepton_query(source: ObjectStream) -> ObjectStream:

fileset = {}

for ds_name in input_files.keys():
ds = ServiceXDataset(input_files[ds_name], backend_name="uproot", ignore_cache=IGNORE_CACHE)
files = ds.get_data_rootfiles_uri(query, as_signed_url=True, title=ds_name)
bundle = { 'General': { 'Delivery': 'URLs' },
'Sample': [ { 'Name': ds_name,
'Query': query,
'Dataset': servicex.dataset.FileList(input_files[ds_name]),
'IgnoreLocalCache': IGNORE_CACHE } for ds_name in input_files.keys() ]
}

fileset[ds_name] = {"files": [f.url for f in files],
"metadata": {"dataset_name": ds_name}
}
results = servicex.deliver(bundle)
fileset = { _: {"files": results[_], "metadata": {"dataset_name": _}} for _ in results }

print(f"execution took {time.time() - t0:.2f} seconds")

@@ -383,7 +399,8 @@ def postprocess(self, accumulator):
executor = processor.FuturesExecutor(workers=NUM_CORES)
run = processor.Runner(executor=executor, savemetrics=True, metadata_cache={},
chunksize=CHUNKSIZE, schema=BaseSchema)
all_histograms, metrics = run(fileset, "servicex", processor_instance=HZZAnalysis())
# The trees returned by ServiceX will have different names depending on the query language used
all_histograms, metrics = run(fileset, "mini" if USE_SERVICEX_UPROOT_RAW else "servicex", processor_instance=HZZAnalysis())

print(f"execution took {time.time() - t0:.2f} seconds")

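Since the uproot-raw transformer keeps the original `mini` tree name while the func_adl transformer writes its output into a tree named `servicex`, the processor has to be pointed at the matching tree (hence the ternary above). A quick, hypothetical check on one delivered file with `uproot`, assuming the `fileset` built earlier:

import uproot

treename = "mini" if USE_SERVICEX_UPROOT_RAW else "servicex"
first_sample = next(iter(fileset))
with uproot.open(fileset[first_sample]["files"][0]) as f:
    print(f.keys())  # e.g. ['mini;1'] or ['servicex;1']
    print(f[treename].num_entries, "events in", treename)
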
11 changes: 0 additions & 11 deletions analyses/atlas-open-data-hzz/utils/__init__.py
@@ -2,11 +2,8 @@
import os
import shutil

from coffea.processor import servicex
from func_adl import ObjectStream
import matplotlib.pyplot as plt
import numpy as np
from servicex.servicex import ServiceXDataset


def clean_up():
@@ -61,11 +58,3 @@ def save_figure(figname: str):

for filetype in ["pdf", "png"]:
fig.savefig(f"figures/{figname}.{filetype}")


def make_datasource(fileset:dict, name: str, query: ObjectStream):
"""Creates a ServiceX datasource for a particular ATLAS Open data file."""
datasets = [ServiceXDataset(fileset[name], backend_name="uproot")]
return servicex.DataSource(
query=query, metadata={"dataset_category": name}, datasets=datasets
)