8 User-defined functions and HIVE SQL

In the previous example, we noted that spark_apply() is very slow and should be used only as a last resort. An alternative is to use registered HIVE functions or SQL queries. Let us start with SQL.

The strategy is as follows: we group dat by state and, within each state, number the rows in random order. Because the numbering is random, selecting the row numbered 1 in each group picks one record at random per state, and we keep the id of that record. In Spark you can run SQL queries with the function sdf_sql(), which takes as input the Spark connection and the SQL query to run, as a string. Therefore we run the following:

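dat = compute(dat, "urghuland_data")  # register dat in Spark as a table named urghuland_data
# number the rows of each state in random order, then keep row 1 of each state
state = sdf_sql(sc, "SELECT *, ROW_NUMBER() OVER (PARTITION BY state ORDER BY rand()) AS rn FROM urghuland_data") %>% 
  group_by(state) %>% filter(rn == 1) %>% select(state, id)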
state
## # Source: spark<?> [?? x 2]
## # Groups: state
##   state           id
##   <chr>        <int>
## 1 Berbangmu 15036107
## 2 Aboda        48569
## 3 Haivismal 12387221
## 4 Morgenor  11583387
## 5 Blitzbar   6250761
## 6 Sintbu     8477715
## 7 Isnor      1905984
## 8 Itsware    6161812

Notice that I first saved the data to Spark with compute(), because sdf_sql() queries tables registered with the Spark connection, not objects in the R session. This approach is orders of magnitude faster than spark_apply().
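For readers new to window functions, here is a minimal base-R sketch of what `ROW_NUMBER() OVER (PARTITION BY state ORDER BY rand())` computes. It runs on a small local data frame, `toy`, whose states and ids are hypothetical and invented only for illustration; it does not touch Spark and is not how sparklyr evaluates the query.

```r
set.seed(1)  # make the random ordering reproducible

# Hypothetical toy data: several records (ids) per state
toy <- data.frame(
  state = c("Aboda", "Aboda", "Isnor", "Isnor", "Isnor", "Sintbu"),
  id    = c(101L, 102L, 201L, 202L, 203L, 301L),
  stringsAsFactors = FALSE
)

# Emulate ORDER BY rand(): draw one random sort key per row
toy$key <- runif(nrow(toy))

# Emulate ROW_NUMBER() OVER (PARTITION BY state ORDER BY key):
# within each state, rank rows by their random key (1 = smallest key)
toy$rn <- ave(toy$key, toy$state,
              FUN = function(k) rank(k, ties.method = "first"))

# Emulate filter(rn == 1) %>% select(state, id)
picked <- toy[toy$rn == 1, c("state", "id")]
picked
```

Because `ROW_NUMBER()` assigns distinct numbers even when sort keys tie, exactly one row per state receives `rn == 1`.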

The code above may seem mysterious to researchers who are not familiar with SQL, and it does look unnecessarily complicated. Let us then see how to solve the same problem with a simpler method.

A simple strategy consists of generating a random variable, random, for each record, grouping the data by state, and then selecting within each group the record with the minimum (or maximum) value of random. In dplyr we would do as follows:

nr = sdf_nrow(dat)
state = dat %>% mutate(random=runif(nr)) %>% 
  group_by(state) %>% filter(random == min(random, na.rm=TRUE)) %>% 
  select(state, id) 

This almost works in sparklyr, except that runif() is not a registered function, so Spark cannot evaluate it. However, HIVE provides the function rand(), which we are allowed to use because all HIVE functions are registered. Therefore we replace runif(nr) with rand() and run:

state = dat %>% mutate(random=rand()) %>% 
  group_by(state) %>% filter(random == min(random, na.rm=TRUE)) %>% 
  select(state, id) 
state
## # Source: spark<?> [?? x 2]
## # Groups: state
##   state           id
##   <chr>        <int>
## 1 Berbangmu 16598116
## 2 Aboda      4097141
## 3 Haivismal 17837054
## 4 Morgenor   2128844
## 5 Blitzbar   8759090
## 6 Sintbu     2461646
## 7 Isnor     11268890
## 8 Itsware   10579345
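The same minimum-of-a-random-draw logic can be sketched in base R on a small local data frame, which may help in checking the idea before running it on Spark. The `toy` data are hypothetical, and `runif()` here plays the role that `rand()` plays in the Spark version; the grouped `min()` is emulated with `ave()`.

```r
set.seed(2)  # make the random draws reproducible

# Hypothetical toy data: several records (ids) per state
toy <- data.frame(
  state = c("Aboda", "Aboda", "Isnor", "Isnor", "Isnor", "Sintbu"),
  id    = c(101L, 102L, 201L, 202L, 203L, 301L),
  stringsAsFactors = FALSE
)

# Step 1: attach a uniform random number to every record (like mutate(random = rand()))
toy$random <- runif(nrow(toy))

# Step 2: the minimum of random within each state, repeated on every row of that state
group_min <- ave(toy$random, toy$state, FUN = min)

# Step 3: keep the record whose random value equals its group minimum
picked <- toy[toy$random == group_min, c("state", "id")]
picked
```

The comparison with `==` is exact here because each group minimum is one of the stored values. Unlike `ROW_NUMBER()`, this approach does not explicitly break ties: ties between continuous random draws are very unlikely, but if one occurred, that state would return more than one record.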

Page built: 2020-01-21