In the previous example we noted that spark_apply() is very slow and should be used as a last resort. An alternative is to use registered Hive functions or SQL queries. Let us start with SQL.
The strategy is as follows: we group dat by state and, within each value of state, we generate a row number in random order. This randomizes the records within each group, so that we can select the first row of each group and keep the corresponding id. In Spark you can perform SQL queries with the function sdf_sql(), which takes as input the Spark connection and the SQL query you want to run, as a string. We therefore run the following:
dat = compute(dat, "urghuland_data")
state = sdf_sql(sc, "SELECT *, ROW_NUMBER() OVER (PARTITION BY state ORDER BY rand()) AS rn FROM urghuland_data") %>%
  group_by(state) %>% filter(rn == 1) %>% select(state, id)
state
## # Source: spark<?> [?? x 2]
## # Groups: state
## state id
## <chr> <int>
## 1 Berbangmu 15036107
## 2 Aboda 48569
## 3 Haivismal 12387221
## 4 Morgenor 11583387
## 5 Blitzbar 6250761
## 6 Sintbu 8477715
## 7 Isnor 1905984
## 8 Itsware 6161812
Notice that we first saved the data to Spark with compute(), under the table name urghuland_data, because sdf_sql() reads from tables registered in the Spark connection, not from R objects. This operation is orders of magnitude faster than using spark_apply().
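For completeness, here is a minimal sketch of the setup that the snippets above assume, namely a Spark connection sc and a Spark table dat. The local data frame my_local_df is a placeholder for however you load the data into R, and sdf_register() is mentioned as a lighter alternative to compute() when all you need is a named temporary view for sdf_sql().

library(sparklyr)
library(dplyr)

# Hypothetical setup, for illustration only: my_local_df stands in for
# however the Urghuland data is loaded into R
sc  = spark_connect(master = "local")
dat = copy_to(sc, my_local_df, name = "urghuland_data", overwrite = TRUE)

# If dat were instead the result of a lazy dplyr pipeline, sdf_register()
# would expose it to sdf_sql() as a temporary view without materializing it:
# sdf_register(dat, "urghuland_data")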
The SQL query above may seem mysterious to researchers who are not familiar with SQL, and it is indeed more complicated than necessary. Let us see how we can solve the same problem with a simpler method.
A simple strategy consists of generating a random variable, random, for each record, then grouping the data by state and, within each group, selecting the record with the minimum (or maximum) value of the random variable. In dplyr we would proceed as follows:
nr = sdf_nrow(dat)
state = dat %>% mutate(random = runif(nr)) %>%
  group_by(state) %>% filter(random == min(random, na.rm = TRUE)) %>%
  select(state, id)
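On an ordinary R data frame this recipe works as intended. Here is a minimal local check with a tiny tibble whose values are invented purely for illustration:

library(dplyr)

# Toy data, invented for illustration only
toy = tibble(state = c("A", "A", "B", "B", "B"),
             id    = 1:5)

# Keep one randomly chosen row per state
toy %>% mutate(random = runif(n())) %>%
  group_by(state) %>% filter(random == min(random, na.rm = TRUE)) %>%
  select(state, id)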
In sparklyr, however, this almost works, except that runif() is not a registered function. Hive does have the function rand(), which we are allowed to use because Hive's built-in functions are all registered. We therefore substitute runif() with rand() and run:
state = dat %>% mutate(random = rand()) %>%
  group_by(state) %>% filter(random == min(random, na.rm = TRUE)) %>%
  select(state, id)
state
## # Source: spark<?> [?? x 2]
## # Groups: state
## state id
## <chr> <int>
## 1 Berbangmu 16598116
## 2 Aboda 4097141
## 3 Haivismal 17837054
## 4 Morgenor 2128844
## 5 Blitzbar 8759090
## 6 Sintbu 2461646
## 7 Isnor 11268890
## 8 Itsware 10579345
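If you are curious about how rand() gets through, a rough way to inspect the translation (assuming the same pipeline as above) is show_query(), which prints the SQL that dplyr generates for the remote table; functions it cannot translate, such as rand(), are passed on to Spark SQL verbatim, which is why the Hive function works where runif() does not:

dat %>% mutate(random = rand()) %>%
  group_by(state) %>% filter(random == min(random, na.rm = TRUE)) %>%
  select(state, id) %>%
  show_query()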