Spark dataframe to nested JSON
I have a dataframe joinDf created from joining the following four dataframes on userId:
val detailsDf = Seq((123,"first123","xyz"))
.toDF("userId","firstName","address")
val emailDf = Seq((123,"abc@gmail.com"),
(123,"def@gmail.com"))
.toDF("userId","email")
val foodDf = Seq((123,"food2",false,"Italian",2),
(123,"food3",true,"American",3),
(123,"food1",true,"Mediterranean",1))
.toDF("userId","foodName","isFavFood","cuisine","score")
val gameDf = Seq((123,"chess",false,2),
(123,"football",true,1))
.toDF("userId","gameName","isOutdoor","score")
val joinDf = detailsDf
.join(emailDf, Seq("userId"))
.join(foodDf, Seq("userId"))
.join(gameDf, Seq("userId"))
The user's food and game favourites should be ordered by score in ascending order.
I am trying to create a result from this joinDf where the JSON looks like the following:
[{
  "userId": "123",
  "firstName": "first123",
  "address": "xyz",
  "UserFoodFavourites": [
    { "foodName": "food1", "isFavFood": "true", "cuisine": "Mediterranean" },
    { "foodName": "food2", "isFavFood": "false", "cuisine": "Italian" },
    { "foodName": "food3", "isFavFood": "true", "cuisine": "American" }
  ],
  "UserEmail": [
    "abc@gmail.com",
    "def@gmail.com"
  ],
  "UserGameFavourites": [
    { "gameName": "football", "isOutdoor": "true" },
    { "gameName": "chess", "isOutdoor": "false" }
  ]
}]
Should I use joinDf.groupBy().agg(collect_set())?
Any help would be appreciated.
apache-spark dataframe apache-spark-sql apache-spark-dataset
asked Nov 8 at 2:02 by dreddy · edited Nov 9 at 15:25 by Joel
2 Answers
Answer by user238607 (answered Nov 10 at 19:02, edited Nov 10 at 19:20):
My solution is based on the answers found here and here.
It uses a Window function and shows how to build a nested list of food preferences for a given userId, ordered by the food score. First, we create a struct named FoodDetails from the columns we have:
import org.apache.spark.sql.functions._  // struct, desc, collect_list, max
import spark.implicits._                 // for $"*" below (assumes a SparkSession named spark, as in spark-shell)

val foodModifiedDf = foodDf.withColumn("FoodDetails",
    struct("foodName", "isFavFood", "cuisine", "score"))
  .drop("foodName", "isFavFood", "cuisine", "score")

println("Just printing the food details here")
foodModifiedDf.show(10, truncate = false)
Next we define a window which accumulates the FoodDetails list for each userId, ordered by FoodDetails.score in descending order. As the window function is applied, it keeps growing the list as it encounters new rows with the same userId. Once the accumulation is done, we do a groupBy over userId to select the largest (complete) list.
import org.apache.spark.sql.expressions.Window
val window = Window.partitionBy("userId").orderBy( desc("FoodDetails.score"))
val userAndFood = detailsDf.join(foodModifiedDf, "userId")
val newUF = userAndFood.select($"*", collect_list("FoodDetails").over(window) as "FDNew")
println(" UserAndFood dataframe after windowing function applied")
newUF.show(10, truncate = false)
val resultUF = newUF.groupBy("userId")
.agg(max("FDNew"))
println("Final result after select the maximum length list")
resultUF.show(10, truncate = false)
This is what the result finally looks like:
+------+-----------------------------------------------------------------------------------------+
|userId|max(FDNew) |
+------+-----------------------------------------------------------------------------------------+
|123 |[[food3, true, American, 3], [food2, false, Italian, 2], [food1, true, Mediterranean, 1]]|
+------+-----------------------------------------------------------------------------------------+
Given this dataframe, it should be easier to write out the nested JSON.
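If you just want the JSON itself, one possible follow-up (a minimal sketch, not part of the original answer; the renamed column and the output path are my own placeholders) is to rename the aggregated column and let Spark serialise the nested structs:
// Sketch only: rename max(FDNew), then emit each row as a nested JSON document.
val jsonReady = resultUF.withColumnRenamed("max(FDNew)", "UserFoodFavourites")
jsonReady.toJSON.show(false)                                   // print each row as one JSON string
jsonReady.write.mode("overwrite").json("/tmp/user_food_json")  // placeholder output path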
In this solution how will you carry over "firstName","address" columns from the detailsDf into resultDF ?
– alexeipab
Nov 10 at 20:19
In the aggregation, you can use newUF.groupBy("userId").agg(max("FDNew"), first("firstName"), first("address"))
– user238607
Nov 11 at 8:00
So basically, groupBy after the join is used to dedupe the records that the join created, and more joins will add extra duplication and extra work for the executors.
– alexeipab
Nov 11 at 8:33
Read the question properly. We need the food preferences to be sorted by the score. That's what the Window function does here. You can run the code and see the results yourself.
– user238607
Nov 11 at 9:15
Indeed, I missed it; it was quite late in the night when I was doing it. Thank you for pointing it out. I have updated my answer with a quick fix, got the desired output, and avoided the extra load on the executors.
– alexeipab
Nov 11 at 9:31
Answer by alexeipab (answered Nov 10 at 20:18, edited Nov 11 at 9:28):
The main problem with joining before grouping and collecting lists is that the join produces a lot of records for the groupBy to collapse; in your example it is 12 records after the join and before the groupBy, and you would also need to worry about picking "firstName" and "address" out of detailsDf from those 12 duplicates. To avoid both problems, you could pre-process the food, email and game dataframes using struct and groupBy, and then join them to detailsDf with no risk of exploding your data due to multiple records with the same userId in the joined tables.
val detailsDf = Seq((123,"first123","xyz"))
.toDF("userId","firstName","address")
val emailDf = Seq((123,"abc@gmail.com"),
(123,"def@gmail.com"))
.toDF("userId","email")
val foodDf = Seq((123,"food2",false,"Italian",2),
(123,"food3",true,"American",3),
(123,"food1",true,"Mediterranean",1))
.toDF("userId","foodName","isFavFood","cuisine","score")
val gameDf = Seq((123,"chess",false,2),
(123,"football",true,1))
.toDF("userId","gameName","isOutdoor","score")
import org.apache.spark.sql.functions._  // collect_list, struct, sort_array
import spark.implicits._                 // for $"userId" (assumes a SparkSession named spark, as in spark-shell)

val emailGrp = emailDf.groupBy("userId").agg(collect_list("email").as("UserEmail"))
val foodGrp = foodDf
.select($"userId", struct("score", "foodName","isFavFood","cuisine").as("UserFoodFavourites"))
.groupBy("userId").agg(sort_array(collect_list("UserFoodFavourites")).as("UserFoodFavourites"))
val gameGrp = gameDf
.select($"userId", struct("gameName","isOutdoor","score").as("UserGameFavourites"))
.groupBy("userId").agg(collect_list("UserGameFavourites").as("UserGameFavourites"))
val result = detailsDf.join(emailGrp, Seq("userId"))
.join(foodGrp, Seq("userId"))
.join(gameGrp, Seq("userId"))
result.show(100, false)
Output:
+------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+
|userId|firstName|address|UserEmail |UserFoodFavourites |UserGameFavourites |
+------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+
|123 |first123 |xyz |[abc@gmail.com, def@gmail.com]|[[1, food1, true, Mediterranean], [2, food2, false, Italian], [3, food3, true, American]]|[[chess, false, 2], [football, true, 1]]|
+------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+
As all the groupBy operations and the joins are done on userId, Spark will optimise this quite well.
UPDATE 1: @user238607 pointed out that I had missed the original requirement that food preferences be sorted by score. As a quick fix, I placed the score column as the first element of the UserFoodFavourites struct and used the sort_array function to arrange the data in the desired order without forcing an extra shuffle. The code and its output above have been updated.
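If the goal is the exact JSON shape from the question, one possible last step (a sketch only, not part of the original answer; it assumes Spark 2.4+ for the transform higher-order function, and the output path is a placeholder) is to drop the score field that was only needed for sorting and then serialise the nested rows:
// Sketch only: strip the sorting key from each food struct, then emit nested JSON.
val cleaned = result.withColumn("UserFoodFavourites",
  expr("transform(UserFoodFavourites, f -> named_struct('foodName', f.foodName, 'isFavFood', f.isFavFood, 'cuisine', f.cuisine))"))
cleaned.toJSON.show(false)                                  // one JSON document per user
cleaned.write.mode("overwrite").json("/tmp/nested_users")   // placeholder output path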