Harish Butani edited this page Apr 18, 2012 · 8 revisions

Analytical Queries with Hive: SQLWindowing, and Table Functions.

Overview

Table Functions are a powerful mechanism to extend a database’s functionality. Further, a Partitioned Table Function capability combines extensibility with the ability to partition and parallelize how a function operates on the input data set. Many databases provide this capability, for example as Parallel Pipelined Functions in Oracle or as SQL-MR in Aster.

Hive enables this capability by allowing users to specify Map & Reduce scripts where a Table could appear. But this lacks the simplicity of injecting a table function into SQL. SQL Windowing is a subarea of table functions where aggregations operate on partitions or windows of rows. We feel that both these use cases could benefit from a SQL-like interface for the user. SQLWindowing for Hive is an open source project that provides a layer on top of Hive that enables users to specify Windowing and Table Function based analysis in an HQL-like fashion. For a detailed technical writeup, please look at Windowing.pdf, and for details on the project please visit the project page. In this document we give a brief overview of the project from an end user’s perspective.

There are 2 forms of Queries supported:

Windowing

Here the input to the Windowing clauses can be any Hive table or Query. The Windowing functions support aggregation, lead, lag, linear regression etc. See examples below.

from <hive table or query> partition by ... order by
with
  windowing expressions
select cols
where ...
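
The evaluation order implied by this form (partition, order, compute windowing expressions, then select and filter) can be pictured with a small Python sketch. It is only an illustration and not the project's implementation: run_windowing_query, row_number and the sample rows are hypothetical names, and the sketch assumes the where clause is applied after the windowing columns are computed, since the queries in this document filter on windowing aliases such as r.

```python
from collections import defaultdict


def run_windowing_query(rows, partition_by, order_by, window_fns, where=None):
    """Partition rows, order each partition, add window columns, then filter.

    window_fns maps an output column name to a function that takes the ordered
    partition (a list of dicts) and returns one value per row.
    """
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[partition_by]].append(row)

    output = []
    for key in sorted(partitions):
        ordered = sorted(partitions[key], key=lambda r: r[order_by])
        # Each windowing expression sees the whole ordered partition.
        columns = {name: fn(ordered) for name, fn in window_fns.items()}
        for i, row in enumerate(ordered):
            extended = dict(row)
            for name, values in columns.items():
                extended[name] = values[i]
            # The filter can refer to the window columns just added.
            if where is None or where(extended):
                output.append(extended)
    return output


def row_number(partition):
    """A trivial window function: 1-based position within the partition."""
    return list(range(1, len(partition) + 1))


rows = [
    {"county": "A", "tract": 1, "arealand": 30},
    {"county": "B", "tract": 2, "arealand": 5},
    {"county": "A", "tract": 3, "arealand": 10},
]
result = run_windowing_query(
    rows, "county", "arealand", {"n": row_number}, where=lambda r: r["n"] == 1
)
```

Here result keeps the smallest tract of each county, because the hypothetical row_number column is available to the filter.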

Table functions

The difference here is that the partitions from the Hive Query are handed to a table function, e.g. NPath (similar to Aster’s NPath function), which then processes a partition and outputs a partition.

from tableFunction(<hive table or query> partition by ... order by)
select cols
where ...

Table Functions may be chained together. It will also be possible to chain Queries, so the input to a Table Function could be another SQLWindowing Query.

Setup and install

  • download com.sap.hadoop.windowing-0.0.2-SNAPSHOT.jar and copy it to $HIVE_HOME/lib
  • cp $HIVE_HOME/bin/ext/cli.sh $HIVE_HOME/bin/ext/windowCli.sh
  • download groovy-all-1.8.0.jar and copy it to $HIVE_HOME/lib. If you want a more recent version of Groovy, download it from http://groovy.codehaus.org/Download
  • edit windowCli.sh; change its contents to:
THISSERVICE=windowingCli
export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "

windowingCli () {
  CLASS=com.sap.hadoop.windowing.WindowingHiveCliDriver
  if $cygwin; then
    HIVE_LIB=`cygpath -w "$HIVE_LIB"`
  fi
  JAR=${HIVE_LIB}/com.sap.hadoop.windowing-0.0.2-SNAPSHOT.jar
  exec $HADOOP jar $JAR $CLASS "$@"
}

windowingCli_help () {
  windowingCli "--help"
} 

Usage

The library is usable in 2 forms: as a Command Line Interface and as a Java library.

Command Line Interface

Start the windowingCli service, e.g.:

$ hive --service windowingCli

In this mode you can intermix Hive and SQLWindowing queries from the shell. Use ‘wmode’ to switch between Hive and SQLWindowing mode. Here is a sample session:

SELECT * FROM bucketed_users TABLESAMPLE(BUCKET 1 OUT OF 4 ON id);
wmode windowing;

from census_q1 partition by county order by county, arealand desc with rank() as r select county, tract, arealand, r into path='/tmp/wout';

wmode hive;
select count(*) from movieratings2;
wmode windowing;

from <select origin_city_name, year, month, day_of_month, dep_time from flightsdata where dest_city_name = 'New York' and dep_time != '' and day_of_week = 1> partition by origin_city_name, year, month, day_of_month order by dep_time select origin_city_name, year, month, day_of_month, dep_time, <lag('dep_time', 1)> as lastdep[string] where <((dep_time[0..1] as int) - (lag('dep_time', 1)[0..1] as int)) * 60 + ((dep_time[2..3] as int) - (lag('dep_time',1)[2..3] as int)) \> 60> into path='/tmp/wout' serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' with serdeproperties('field.delim'=',') format 'org.apache.hadoop.mapred.TextOutputFormat';

from census_q2 partition by county order by pop100 desc with rank() as r, sum(pop100) as s, first_value(pop100) as fv select county, name, pop100, r, <((double)pop100)/fv *100> as percentOfTopSubCounty[double], <lag('pop100', 1) - pop100> as diffFromNextLargestSubCounty[int]  into path='/tmp/wout';

wmode hive;
select /*+ mapjoin(bucketed_movies_rc2) */ bucketed_movieratings_rc2.userid
from bucketed_movieratings_rc2 join bucketed_movies_rc2
  on (bucketed_movieratings_rc2.movieid = bucketed_movies_rc2.movie_id);
wmode windowing;

from <select county, tract, arealand from geo_header_sf1 where sumlev
= 140> partition by county order by county, arealand desc with rank()
as r select county, tract, arealand, r into path='/tmp/wout' serde
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' format
'org.apache.hadoop.mapred.TextOutputFormat';

Java Lib

The service is packaged as a Java jar that can be embedded in your application. There is a simple API to interact with the WindowingEngine; essentially execute(query). Here is how you set up a WindowingShell:

// set up your Hadoop configuration
Configuration conf = WORK_LOCALMR();

// create the WindowingShell
WindowingShell wshell = new WindowingShell(conf, new MRTranslator(), new MRExecutor());

// in API mode, set up a Thrift-based Hive query executor;
// embedded Hive queries will be run using this executor
wshell.hiveQryExec = new ThriftBasedHiveQueryExecutor(conf);

// execute a query
wshell.execute("""
	   from <select origin_city_name, year, month, day_of_month, arr_delay, fl_num
			 from flightsdata
			 where dest_city_name = 'New York' and dep_time != ''>
	   partition by fl_num
	   order by year, month, day_of_month
	   with sum(<arr_delay < 0 ? 1 : 0>) over rows between 5 preceding and current row as delaycount[int]
	   select origin_city_name, fl_num, year, month, day_of_month, delaycount
	   where <delaycount \\>= 5>
	   into path='/tmp/wout'
	   serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
	   with serdeproperties('field.delim'=',')
	   format 'org.apache.hadoop.mapred.TextOutputFormat'""")
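
The windowing clause "over rows between 5 preceding and current row" in the query above can be read as a sliding sum over each ordered partition. The following Python sketch shows that reading; windowed_sum and the flags list are hypothetical, and the handling of rows near the start of a partition (the window is simply truncated) is an assumption rather than documented behaviour.

```python
def windowed_sum(values, preceding, following=0):
    """Sum over a ROWS window: `preceding` rows before through `following` rows after."""
    result = []
    for i in range(len(values)):
        start = max(0, i - preceding)                # truncate at partition start
        end = min(len(values), i + following + 1)    # truncate at partition end
        result.append(sum(values[start:end]))
    return result


# One flag per flight, already ordered by date within a partition;
# 1 means the row satisfied the condition being counted.
flags = [1, 1, 0, 1, 1, 1, 1]
delaycount = windowed_sum(flags, preceding=5)
```

Note that a window of 5 preceding rows plus the current row spans up to 6 rows, so a filter such as delaycount >= 5 accepts 5 hits out of the last 6 rows.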

Sample Queries

SQL Windowing

Windowing

Currently there are 21 functions available. These are loosely divided into Ranking, Aggregation, Navigation and Statistics functions. Not all functions support a windowing clause. Each function is described by an annotation that specifies its details: name, description, windowing support, and arguments with their names, types, and whether they are required or optional.

from part_rc
partition by p_mfgr
order by p_mfgr, p_name
with
    rank() as r,
    sum(p_size) over rows between unbounded preceding and current row as s,
    sum(p_size) over rows between current row and unbounded following as s1,
    denserank() as dr,
    cumedist() as cud,
    percentrank() as pr,
    ntile(<3>) as nt,
    count(<p_size>) as c,
    count(<p_size>, 'all') as ca,
    count(<p_size>, 'distinct') as cd,
    avg(<p_size>) as avg, stddev(p_size) as st,
    first_value(p_size) as fv, last_value(p_size) as lv,
    first_value(p_size, 'true') over rows between 2 preceding and 2 following as fv2
select p_mfgr,p_name, p_size, r, dr, cud, pr, 
       nt, c, ca, cd, avg, st, fv,lv, fv2
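
For readers unfamiliar with the ranking functions, the Python sketch below computes rank, denserank, percentrank and cumedist for one partition. It assumes these functions follow the usual SQL definitions, which this page does not state explicitly; ranking_functions is a hypothetical helper, and the input is assumed to be already sorted in ascending order.

```python
def ranking_functions(sorted_values):
    """Compute SQL-style ranking columns for one ascending-sorted partition."""
    n = len(sorted_values)
    rows = []
    rank = dense = 0
    prev = None
    for pos, value in enumerate(sorted_values, start=1):
        if pos == 1 or value != prev:
            rank = pos      # rank jumps to the row position after ties
            dense += 1      # dense rank increases by one per distinct value
            prev = value
        rows.append({
            "value": value,
            "rank": rank,
            "denserank": dense,
            # fraction of rows ranked strictly before this one
            "percentrank": (rank - 1) / (n - 1) if n > 1 else 0.0,
            # fraction of rows with a value less than or equal to this one
            "cumedist": sum(1 for x in sorted_values if x <= value) / n,
        })
    return rows


ranked = ranking_functions([2, 2, 5, 7])
```

Tied rows share the same rank, percentrank and cumedist; only rank leaves a gap after the tie.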

TopN, Lead/Lag

We have loaded a subset of the 2011 Census data into our Hive instance.

Calculate the top 3 tracts (based on land area) by county.

from <select county, tract, arealand 
         from geo_header_sf1 
         where sumlev = 140>
   partition by county
   order by county, arealand desc
with rank() as r,
     sum(arealand) over rows between 
                   unbounded preceding and 
                   current row as cum_area
select county, tract, arealand, r, cum_area
where <r <= 3>

For each subcounty, compute its population as a percentage of the largest subcounty in its county, and the difference from the next largest subcounty.

from census_q2
partition by county
order by pop100 desc
with rank() as r,
     sum(pop100) as s,
     first_value(pop100) as fv
select county, name, pop100, r, 
       <((double)pop100)/fv *100> as percentOfTopSubCounty[double],
      <lag('pop100', 1) - pop100> as diffFromNextLargestSubCounty[int] 
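
The two select expressions above combine first_value and lag over a partition ordered by pop100 descending. A minimal Python sketch of the arithmetic follows; percent_of_top_and_diff is a hypothetical helper, the partition is assumed to be non-empty, and returning None for the first row (where there is no previous row) is an assumption about how lag behaves at the partition boundary.

```python
def percent_of_top_and_diff(pop_desc):
    """pop_desc: populations of one county's subcounties, sorted descending."""
    first_value = pop_desc[0]                        # first_value(pop100)
    rows = []
    for i, pop in enumerate(pop_desc):
        lagged = pop_desc[i - 1] if i > 0 else None  # lag('pop100', 1)
        rows.append({
            "pop100": pop,
            "percentOfTopSubCounty": pop / first_value * 100,
            "diffFromNextLargestSubCounty": None if lagged is None else lagged - pop,
        })
    return rows


county_rows = percent_of_top_and_diff([200, 150, 50])
```

Because the partition is sorted in descending order, the lagged row is the next larger subcounty, so the difference is never negative.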

Time Series Analysis

These examples are based on the airline on-time performance dataset published by the Bureau of Transportation Statistics (BTS).

List Monday flights to NY for which no other flight from the same origin city departed within the preceding hour.

from <select origin_city_name, year, month, day_of_month, dep_time 
      from flightsdata 
      where dest_city_name = 'New York' and dep_time != '' and day_of_week = 1>
partition by origin_city_name, year, month, day_of_month
order by dep_time
select origin_city_name, year, month, day_of_month, dep_time, <lag('dep_time', 1)> as lastdep[string]
where <((dep_time[0..1] as int) - (lag('dep_time', 1)[0..1] as int)) * 60 + 
	   	((dep_time[2..3] as int) - (lag('dep_time',1)[2..3] as int)) \\> 60>
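
The where clause above treats dep_time as an "HHMM" string: characters 0..1 are the hour and 2..3 the minute, and the gap to the previous departure is converted to minutes. The same arithmetic in Python might look like the sketch below. minutes_between and flights_after_quiet_hour are hypothetical names, and dropping the first flight of the day (which has no previous row) is an assumption, since this page does not say how lag behaves there.

```python
def minutes_between(earlier_hhmm, later_hhmm):
    """Minutes from one 'HHMM' time to a later one on the same day."""
    hours = int(later_hhmm[0:2]) - int(earlier_hhmm[0:2])      # [0..1] in Groovy
    minutes = int(later_hhmm[2:4]) - int(earlier_hhmm[2:4])    # [2..3] in Groovy
    return hours * 60 + minutes


def flights_after_quiet_hour(dep_times):
    """Keep departures whose previous departure was more than 60 minutes earlier."""
    ordered = sorted(dep_times)  # zero-padded 'HHMM' strings sort chronologically
    return [
        current
        for previous, current in zip(ordered, ordered[1:])
        if minutes_between(previous, current) > 60
    ]


quiet = flights_after_quiet_hour(["0600", "0630", "0800", "0845"])
```

The minute difference can be negative (for example 0950 to 1010), which is why it is added to the hour difference rather than compared on its own.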

List incidents by airline where a passenger would have missed a connecting flight (of the same airline) because of a delay.

  • create a set of flights arriving or leaving NY by doing a union all on 2 queries on flightsdata
  • tag arriving flights as 1, leaving as 2
  • partition by carrier, because we are comparing flights from the same airline
  • sort by carrier, year, month, day_of_month and time(arrival or departure)
  • for any arriving flight, look forward to find any leaving flight within the window delay + 30 mins. Possible solutions:
    1. sum(<1>) over range current row and <t+ delay> 30.0 more as numflights; this counts flights that are within 30 minutes of the arrival time. Problem: we need to look only at departing flights.
    2. The query below only shows an expression that compares with the next row. Solution 1 is the better solution.
    3. A custom function.
from <select * 
      from (
            select unique_carrier, fl_num, year, month, day_of_month, 
                   CRS_ARR_TIME as t, arr_delay as delay, 1 as flight
            from flightsdata
            where  dest_city_name = 'New York' and dep_time != '' 
                   and arr_delay is not null
            union all
            select unique_carrier, fl_num, year, month, day_of_month, 
                   CRS_DEP_TIME as t, dep_delay as delay, 2 as flight
           from flightsdata
           where  origin_city_name = 'New York' and dep_time != ''
       ) t>
partition by unique_carrier
order by year, month, day_of_month, t
select unique_carrier, fl_num, year, month, day_of_month, t
where <(flight == 1) && (delay + 30.0 \\> (((lead('t', 1)[0..1] as int) - (t[0..1] as int)) * 60 + 
                            ((lead('t', 1)[2..3] as int) - (t[2..3] as int))
                          )
                  )>

NPath table function

NPath(String Pattern, GroovyExpr Symbols, GroovyExpr Results)

Returns rows that meet a specified pattern. Use Symbols to specify a list of expressions to match. Pattern is used to specify a Path. The results list can contain expressions based on the input columns and also the matched Path.

List incidents where a flight (to NY) has been more than 15 minutes late 5 or more times in a row.

from npath(<select origin_city_name, year, month, day_of_month, arr_delay, fl_num
	    from flightsdata
	    where dest_city_name = 'New York' and dep_time != ''>
	partition by fl_num
	order by year, month, day_of_month,
	'LATE.LATE.LATE.LATE.LATE+',
	<[LATE : "arr_delay \\> 15"]>,
	<["origin_city_name", "fl_num", "year", "month", "day_of_month",
		["(path.sum() { it.arr_delay})/((double)count)", "double", "avgDelay"],
		["count", "int", "numOfDelays"]
	]>)
select origin_city_name, fl_num, year, month, day_of_month, avgDelay, numOfDelays
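
Conceptually, the NPath call above labels each row of an ordered partition with a symbol and then matches a path pattern over the resulting symbol sequence. The Python sketch below imitates this for the single symbol LATE by encoding rows as characters and using a regular expression; it illustrates the idea only and is not how NPath is implemented. find_late_streaks is a hypothetical helper, and reporting greedy, non-overlapping matches is an assumption.

```python
import re


def find_late_streaks(arr_delays, threshold=15, min_run=5):
    """Find runs of at least min_run consecutive delays above threshold."""
    # Symbol step: 'L' plays the role of LATE, '-' is any other row.
    symbols = "".join("L" if d > threshold else "-" for d in arr_delays)
    streaks = []
    # Pattern step: LATE.LATE.LATE.LATE.LATE+ becomes "five or more L in a row".
    for match in re.finditer("L{%d,}" % min_run, symbols):
        path = arr_delays[match.start():match.end()]
        streaks.append({
            "numOfDelays": len(path),
            "avgDelay": sum(path) / len(path),
        })
    return streaks


streaks = find_late_streaks([20, 30, 16, 40, 25, 18, 5, 20, 20])
```

In this sample the first six flights form one matching path, while the last two late flights are too short a run to match.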

Statistics Functions

  • LinearRegSlope: computes the slope of the regression line fitted to non-null (x, y) pairs.
  • LinearRegIntercept: computes the intercept of the regression line fitted to non-null (x, y) pairs.
  • RegCount: returns the number of non-null (x, y) pairs.

from statisticsdataset
partition by p_mfgr
order by p_mfgr, p_name
with
   linearRegSlope(val1, val2) as b,
   linearRegIntercept(val1, val2) as a,
   regCount(val1, val2) as c
select a, b, c
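
As a reference for what these three functions return, here is a least-squares sketch in Python. It assumes ordinary least squares of y on x and that a pair is skipped when either value is null. Which of val1 and val2 the library treats as x is not stated on this page, so the argument order here is an assumption, and linear_regression is a hypothetical helper. The sketch needs at least two distinct x values.

```python
def linear_regression(pairs):
    """Return (slope, intercept, count) for the non-null (x, y) pairs."""
    clean = [(x, y) for x, y in pairs if x is not None and y is not None]
    n = len(clean)
    mean_x = sum(x for x, _ in clean) / n
    mean_y = sum(y for _, y in clean) / n
    # Sums of squares and cross products around the means.
    sxx = sum((x - mean_x) ** 2 for x, _ in clean)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in clean)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept, n


# The pair with a null x is ignored; the rest lie on y = 2x + 1.
b, a, c = linear_regression([(1, 3), (2, 5), (None, 7), (3, 7)])
```

The variable names b, a and c mirror the aliases used in the query above for slope, intercept and count.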

For details on the functions we plan to support, look at Table Functions.