In this section we will take our large data set, merge it with another one and perform some other basic manipulations supported by dplyr. On the way we will encounter two key Spark functions: compute() and collect().
The country of Urghuland is geograpically divided in counties, and this information is stored in the variable geo. There are approximately two thousand counties in Urghuland, which is far too many to use in a regression analysis. Since counties are grouped into states we can use the state variable instead, in order to account for geography.
The mapping from counties to states is contained in the file “geo_state.csv”, and therefore we will load it into Spark and merge it into our data using geo as merging key.
state <- spark_read_csv(sc, name="state",
path=paste(data.raw.dir,"geo_state.csv",sep=""))
merged <- dat %>% left_join(state,by="geo")
## Warning: package 'treemap' was built under R version 3.6.2
Figure 1: People in Urghuland live in 9 different states of quite different sizes.
Notice that this operation takes a surprisingly small amount of time: this is because Spark is “lazy”, and will not really perform this evaluation until it is needed. In other words all Spark has done is to prepare the SQL query that it will run behind the scenes and stored it in merged, until it is the moment to actually use the merged file. You can actually see the SQL query with the following command:
## <SQL>
## SELECT `LHS`.`geo` AS `geo`, `LHS`.`gender` AS `gender`, `LHS`.`age` AS `age`, `LHS`.`id` AS `id`, `LHS`.`bmi` AS `bmi`, `LHS`.`smoking` AS `smoking`, `LHS`.`income` AS `income`, `LHS`.`education` AS `education`, `LHS`.`marital_status` AS `marital_status`, `LHS`.`heart` AS `heart`, `LHS`.`diabetes` AS `diabetes`, `LHS`.`hypertension` AS `hypertension`, `LHS`.`stroke` AS `stroke`, `LHS`.`cancer` AS `cancer`, `LHS`.`gvt_cost` AS `gvt_cost`, `LHS`.`cost` AS `cost`, `RHS`.`state` AS `state`
## FROM `Urghuland_data` AS `LHS`
## LEFT JOIN `state` AS `RHS`
## ON (`LHS`.`geo` = `RHS`.`geo`)
In order to force Spark to actually perform the merge we can use the function compute() and store the result in a Spark table that we call “merged_table”. If we call show_query() after compute() we see that the SQL call has changed, and it merely reads the table from storage:
## <SQL>
## SELECT *
## FROM `merged_table`
Since this table now sits in Spark RDD you can inspect with the Rstudio previewer under the “Connection” tab.
Issue: If you are piping several dplyr commands you may often need to force an evaluation using compute(), usually after a join. It seems that Spark lazy evaluation gets confused when piping a bunch of SQL statements.
As an example of this issue with piped SQL statements let us try to subset the merged data set and extract only the first 100 records. It would be natural to use this syntax:
The code will run, but any attempt to actually use the object submerge will give a Java error. However, the following code will work:
submerge = dat %>% left_join(state,by="geo") %>%
compute("submerge_table") %>% sample_n(100)
### let us look at the SQL
show_query(submerge)
## <SQL>
## SELECT *
## FROM (SELECT *
## FROM `submerge_table` TABLESAMPLE (100 rows) ) `dbplyr_005`
At this point we have a small data set we may want to import into memory so it becomes a regular R object and we can do things with it. This is done with the function collect(), that retrieves the data in a local tibble. If we do not need the Spark object we can simply rewrite on it:
Page built: 2020-01-21