Skip to content

Commit

Permalink
test(tpcds): add queries 76-78 (#9954)
Browse files Browse the repository at this point in the history
  • Loading branch information
gforsyth authored Aug 28, 2024
1 parent 7d1390e commit 6acbfc6
Showing 1 changed file with 350 additions and 0 deletions.
350 changes: 350 additions & 0 deletions ibis/backends/tests/tpc/ds/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -3870,6 +3870,356 @@ def _sales(
return expr


@tpc_test("ds")
def test_76(
store_sales,
item,
date_dim,
web_sales,
catalog_sales,
):
def _sales(
sales, customer_sk, sold_date, item_sk, channel, col_name, ext_sales_price
):
return (
sales.join(item, [customer_sk.isnull(), item_sk == item.i_item_sk])
.join(date_dim, sold_date == date_dim.d_date_sk)
.select(
_.d_year,
_.d_qoy,
_.i_category,
ext_sales_price=ext_sales_price,
channel=ibis.literal(channel),
col_name=ibis.literal(col_name),
)
.relocate("channel", "col_name")
)

store = _sales(
store_sales,
_.ss_store_sk,
_.ss_sold_date_sk,
_.ss_item_sk,
"store",
"ss_store_sk",
_.ss_ext_sales_price,
)

web = _sales(
web_sales,
_.ws_ship_customer_sk,
_.ws_sold_date_sk,
_.ws_item_sk,
"web",
"ws_ship_customer_sk",
_.ws_ext_sales_price,
)

catalog = _sales(
catalog_sales,
_.cs_ship_addr_sk,
_.cs_sold_date_sk,
_.cs_item_sk,
"catalog",
"cs_ship_addr_sk",
_.cs_ext_sales_price,
)

foo = store.union(web, catalog)

expr = (
foo.group_by(_.channel, _.col_name, _.d_year, _.d_qoy, _.i_category)
.agg(sales_cnt=_.count(), sales_amt=_.ext_sales_price.sum())
.order_by(
_.channel.asc(nulls_first=True),
_.col_name.asc(nulls_first=True),
_.d_year.asc(nulls_first=True),
_.d_qoy.asc(nulls_first=True),
_.i_category.asc(nulls_first=True),
)
).limit(100)

return expr


@pytest.mark.xfail(raises=AttributeError, reason="requires rollup")
@tpc_test("ds", result_is_empty=True)
def test_77(
store_sales,
date_dim,
store,
store_returns,
catalog_sales,
catalog_returns,
web_sales,
web_page,
web_returns,
):
date_range = (ibis.date(2000, 8, 23), ibis.date(2000, 9, 22))
ss = (
store_sales.join(
date_dim,
[
("ss_sold_date_sk", "d_date_sk"),
date_dim.d_date.between(*date_range),
],
)
.join(store, _.ss_store_sk == store.s_store_sk)
.group_by(_.s_store_sk)
.agg(sales=_.ss_ext_sales_price.sum(), profit=_.ss_net_profit.sum())
)

sr = (
store_returns.join(
date_dim,
[
("sr_returned_date_sk", "d_date_sk"),
date_dim.d_date.between(*date_range),
],
)
.join(store, _.sr_store_sk == store.s_store_sk)
.group_by(_.s_store_sk)
.agg(returns_=_.sr_return_amt.sum(), profit_loss=_.sr_net_loss.sum())
)

cs = (
catalog_sales.join(
date_dim,
[
("cs_sold_date_sk", "d_date_sk"),
date_dim.d_date.between(*date_range),
],
)
.group_by(_.cs_call_center_sk)
.agg(sales=_.cs_ext_sales_price.sum(), profit=_.cs_net_profit.sum())
)

cr = (
catalog_returns.join(
date_dim,
[
("cr_returned_date_sk", "d_date_sk"),
date_dim.d_date.between(*date_range),
],
)
.group_by(_.cr_call_center_sk)
.agg(returns_=_.cr_return_amount.sum(), profit_loss=_.cr_net_loss.sum())
)

ws = (
web_sales.join(
date_dim,
[
("ws_sold_date_sk", "d_date_sk"),
date_dim.d_date.between(*date_range),
],
)
.join(web_page, _.ws_web_page_sk == web_page.wp_web_page_sk)
.group_by(_.wp_web_page_sk)
.agg(sales=_.ws_ext_sales_price.sum(), profit=_.ws_net_profit.sum())
)

wr = (
web_returns.join(
date_dim,
[
("wr_returned_date_sk", "d_date_sk"),
date_dim.d_date.between(*date_range),
],
)
.join(web_page, _.wr_web_page_sk == web_page.wp_web_page_sk)
.group_by(_.wp_web_page_sk)
.agg(returns_=_.wr_return_amt.sum(), profit_loss=_.wr_net_loss.sum())
)

x1 = ss.left_join(sr, "s_store_sk").select(
channel=ibis.literal("store channel"),
id=_.s_store_sk,
sales=_.sales,
returns=_.returns_.coalesce(0),
profit=(_.profit - _.profit_loss.coalesce(0)),
)

x2 = cs.cross_join(cr).select(
channel=ibis.literal("catalog channel"),
id=_.cs_call_center_sk,
sales=_.sales,
returns=_.returns_,
profit=(_.profit - _.profit_loss),
)

x3 = ws.left_join(wr, "wp_web_page_sk").select(
channel=ibis.literal("web channel"),
id=_.wp_web_page_sk,
sales=_.sales,
returns=_.returns_.coalesce(0),
profit=(_.profit - _.profit_loss.coalesce(0)),
)

expr = (
x1.union(x2, x3)
.group_by(ibis.rollup("channel", "id"))
.agg(
sales=_.sales.sum(),
returns_=_.returns_.sum(),
profit=_.profit.sum(),
)
.order_by(
_.channel.asc(nulls_first=True),
_.id.asc(nulls_first=True),
_.returns_.desc(),
)
).limit(100)

return expr


@tpc_test("ds")
def test_78(
web_sales,
web_returns,
date_dim,
catalog_sales,
catalog_returns,
store_sales,
store_returns,
):
ws = (
web_sales.left_join(
web_returns,
[("ws_order_number", "wr_order_number"), ("ws_item_sk", "wr_item_sk")],
)
.join(
date_dim,
[("ws_sold_date_sk", "d_date_sk"), web_returns.wr_order_number.isnull()],
)
.group_by(_.d_year, _.ws_item_sk, _.ws_bill_customer_sk)
.agg(
ws_qty=_.ws_quantity.sum(),
ws_wc=_.ws_wholesale_cost.sum(),
ws_sp=_.ws_sales_price.sum(),
)
.relocate(
ws_sold_year=_.d_year,
ws_item_sk=_.ws_item_sk,
ws_customer_sk=_.ws_bill_customer_sk,
)
)

cs = (
catalog_sales.left_join(
catalog_returns,
[("cs_order_number", "cr_order_number"), ("cs_item_sk", "cr_item_sk")],
)
.join(
date_dim,
[
("cs_sold_date_sk", "d_date_sk"),
catalog_returns.cr_order_number.isnull(),
],
)
.group_by(_.d_year, _.cs_item_sk, _.cs_bill_customer_sk)
.agg(
cs_qty=_.cs_quantity.sum(),
cs_wc=_.cs_wholesale_cost.sum(),
cs_sp=_.cs_sales_price.sum(),
)
.relocate(
cs_sold_year=_.d_year,
cs_item_sk=_.cs_item_sk,
cs_customer_sk=_.cs_bill_customer_sk,
)
)

ss = (
store_sales.left_join(
store_returns,
[("ss_ticket_number", "sr_ticket_number"), ("ss_item_sk", "sr_item_sk")],
)
.join(
date_dim,
[
("ss_sold_date_sk", "d_date_sk"),
store_returns.sr_ticket_number.isnull(),
],
)
.group_by(_.d_year, _.ss_item_sk, _.ss_customer_sk)
.agg(
ss_qty=_.ss_quantity.sum(),
ss_wc=_.ss_wholesale_cost.sum(),
ss_sp=_.ss_sales_price.sum(),
)
.relocate(
ss_sold_year=_.d_year,
ss_item_sk=_.ss_item_sk,
ss_customer_sk=_.ss_customer_sk,
)
)

expr = ss.left_join(
ws,
[
("ss_sold_year", "ws_sold_year"),
("ss_item_sk", "ws_item_sk"),
("ss_customer_sk", "ws_customer_sk"),
],
)

expr = expr.left_join(
cs,
[
("ss_sold_year", "cs_sold_year"),
("ss_item_sk", "cs_item_sk"),
("ss_customer_sk", "cs_customer_sk"),
],
)

expr = (
expr.filter(
_.ss_sold_year == 2000,
(_.ws_qty.coalesce(0) > 0) | (_.cs_qty.coalesce(0) > 0),
)
.mutate(
ratio=(
(_.ss_qty * 1.00) / (_.ws_qty.coalesce(0) + _.cs_qty.coalesce(0))
).round(2),
store_qty=_.ss_qty,
store_wholesale_cost=_.ss_wc,
store_sales_price=_.ss_sp,
other_chan_qty=_.ws_qty.coalesce(0) + _.cs_qty.coalesce(0),
other_chan_wholesale_cost=_.ws_wc.coalesce(0) + _.cs_wc.coalesce(0),
other_chan_sales_price=_.ws_sp.coalesce(0) + _.cs_sp.coalesce(0),
)
.order_by(
_.ss_sold_year,
_.ss_item_sk,
_.ss_customer_sk,
_.ss_qty.desc(),
_.ss_wc.desc(),
_.ss_sp.desc(),
_.other_chan_qty,
_.other_chan_wholesale_cost,
_.other_chan_sales_price,
_.ratio,
)
.select(
"ss_sold_year",
"ss_item_sk",
"ss_customer_sk",
"ratio",
"store_qty",
"store_wholesale_cost",
"store_sales_price",
"other_chan_qty",
"other_chan_wholesale_cost",
"other_chan_sales_price",
)
.limit(100)
)

return expr


@tpc_test("ds")
def test_79(store_sales, date_dim, store, household_demographics, customer):
return (
Expand Down

0 comments on commit 6acbfc6

Please sign in to comment.