Spark dataframe to nested JSON









I have a dataframe joinDf created from joining the following four dataframes on userId:



// Assumes a SparkSession in scope (e.g. spark-shell), with
// import spark.implicits._ available for toDF
val detailsDf = Seq((123,"first123","xyz"))
  .toDF("userId","firstName","address")

val emailDf = Seq((123,"abc@gmail.com"),
  (123,"def@gmail.com"))
  .toDF("userId","email")

val foodDf = Seq((123,"food2",false,"Italian",2),
  (123,"food3",true,"American",3),
  (123,"food1",true,"Mediterranean",1))
  .toDF("userId","foodName","isFavFood","cuisine","score")

val gameDf = Seq((123,"chess",false,2),
  (123,"football",true,1))
  .toDF("userId","gameName","isOutdoor","score")

val joinDf = detailsDf
  .join(emailDf, Seq("userId"))
  .join(foodDf, Seq("userId"))
  .join(gameDf, Seq("userId"))
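As a quick sanity check (a minimal sketch), the four-way join multiplies the per-user rows, so this sample yields 1 x 2 x 3 x 2 = 12 rows for userId 123:

// Sketch: the join multiplies rows per userId
// (1 detail row x 2 emails x 3 foods x 2 games = 12).
joinDf.count()                              // expected: 12
joinDf.show(numRows = 12, truncate = false)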


The user's food and game favourites should be ordered by score, in ascending order.



I am trying to create a result from this joinDf where the JSON looks like the following:



[
  {
    "userId": "123",
    "firstName": "first123",
    "address": "xyz",
    "UserFoodFavourites": [
      {
        "foodName": "food1",
        "isFavFood": "true",
        "cuisine": "Mediterranean"
      },
      {
        "foodName": "food2",
        "isFavFood": "false",
        "cuisine": "Italian"
      },
      {
        "foodName": "food3",
        "isFavFood": "true",
        "cuisine": "American"
      }
    ],
    "UserEmail": [
      "abc@gmail.com",
      "def@gmail.com"
    ],
    "UserGameFavourites": [
      {
        "gameName": "football",
        "isOutdoor": "true"
      },
      {
        "gameName": "chess",
        "isOutdoor": "false"
      }
    ]
  }
]


Should I use joinDf.groupBy().agg(collect_set())?



Any help would be appreciated.










share|improve this question



























    up vote
    1
    down vote

    favorite












    I have a dataframe joinDf created from joining the following four dataframes on userId:



    val detailsDf = Seq((123,"first123","xyz"))
    .toDF("userId","firstName","address")


    val emailDf = Seq((123,"abc@gmail.com"),
    (123,"def@gmail.com"))
    .toDF("userId","email")


    val foodDf = Seq((123,"food2",false,"Italian",2),
    (123,"food3",true,"American",3),
    (123,"food1",true,"Mediterranean",1))
    .toDF("userId","foodName","isFavFood","cuisine","score")


    val gameDf = Seq((123,"chess",false,2),
    (123,"football",true,1))
    .toDF("userId","gameName","isOutdoor","score")

    val joinDf = detailsDf
    .join(emailDf, Seq("userId"))
    .join(foodDf, Seq("userId"))
    .join(gameDf, Seq("userId"))


    User's food and game favorites should be ordered by score in the ascending order.



    I am trying to create a result from this joinDf where the JSON looks like the following:



    [

    "userId": "123",
    "firstName": "first123",
    "address": "xyz",
    "UserFoodFavourites": [

    "foodName": "food1",
    "isFavFood": "true",
    "cuisine": "Mediterranean",
    ,

    "foodName": "food2",
    "isFavFood": "false",
    "cuisine": "Italian",
    ,

    "foodName": "food3",
    "isFavFood": "true",
    "cuisine": "American",

    ]
    "UserEmail": [
    "abc@gmail.com",
    "def@gmail.com"
    ]
    "UserGameFavourites": [

    "gameName": "football",
    "isOutdoor": "true"
    ,

    "gameName": "chess",
    "isOutdoor": "false"

    ]

    ]


    Should I use joinDf.groupBy().agg(collect_set())?



    Any help would be appreciated.










    share|improve this question

























      up vote
      1
      down vote

      favorite









      up vote
      1
      down vote

      favorite











      I have a dataframe joinDf created from joining the following four dataframes on userId:



      val detailsDf = Seq((123,"first123","xyz"))
      .toDF("userId","firstName","address")


      val emailDf = Seq((123,"abc@gmail.com"),
      (123,"def@gmail.com"))
      .toDF("userId","email")


      val foodDf = Seq((123,"food2",false,"Italian",2),
      (123,"food3",true,"American",3),
      (123,"food1",true,"Mediterranean",1))
      .toDF("userId","foodName","isFavFood","cuisine","score")


      val gameDf = Seq((123,"chess",false,2),
      (123,"football",true,1))
      .toDF("userId","gameName","isOutdoor","score")

      val joinDf = detailsDf
      .join(emailDf, Seq("userId"))
      .join(foodDf, Seq("userId"))
      .join(gameDf, Seq("userId"))


      User's food and game favorites should be ordered by score in the ascending order.



      I am trying to create a result from this joinDf where the JSON looks like the following:



      [

      "userId": "123",
      "firstName": "first123",
      "address": "xyz",
      "UserFoodFavourites": [

      "foodName": "food1",
      "isFavFood": "true",
      "cuisine": "Mediterranean",
      ,

      "foodName": "food2",
      "isFavFood": "false",
      "cuisine": "Italian",
      ,

      "foodName": "food3",
      "isFavFood": "true",
      "cuisine": "American",

      ]
      "UserEmail": [
      "abc@gmail.com",
      "def@gmail.com"
      ]
      "UserGameFavourites": [

      "gameName": "football",
      "isOutdoor": "true"
      ,

      "gameName": "chess",
      "isOutdoor": "false"

      ]

      ]


      Should I use joinDf.groupBy().agg(collect_set())?



      Any help would be appreciated.










      share|improve this question















      I have a dataframe joinDf created from joining the following four dataframes on userId:



      val detailsDf = Seq((123,"first123","xyz"))
      .toDF("userId","firstName","address")


      val emailDf = Seq((123,"abc@gmail.com"),
      (123,"def@gmail.com"))
      .toDF("userId","email")


      val foodDf = Seq((123,"food2",false,"Italian",2),
      (123,"food3",true,"American",3),
      (123,"food1",true,"Mediterranean",1))
      .toDF("userId","foodName","isFavFood","cuisine","score")


      val gameDf = Seq((123,"chess",false,2),
      (123,"football",true,1))
      .toDF("userId","gameName","isOutdoor","score")

      val joinDf = detailsDf
      .join(emailDf, Seq("userId"))
      .join(foodDf, Seq("userId"))
      .join(gameDf, Seq("userId"))


      User's food and game favorites should be ordered by score in the ascending order.



      I am trying to create a result from this joinDf where the JSON looks like the following:



      [

      "userId": "123",
      "firstName": "first123",
      "address": "xyz",
      "UserFoodFavourites": [

      "foodName": "food1",
      "isFavFood": "true",
      "cuisine": "Mediterranean",
      ,

      "foodName": "food2",
      "isFavFood": "false",
      "cuisine": "Italian",
      ,

      "foodName": "food3",
      "isFavFood": "true",
      "cuisine": "American",

      ]
      "UserEmail": [
      "abc@gmail.com",
      "def@gmail.com"
      ]
      "UserGameFavourites": [

      "gameName": "football",
      "isOutdoor": "true"
      ,

      "gameName": "chess",
      "isOutdoor": "false"

      ]

      ]


      Should I use joinDf.groupBy().agg(collect_set())?



      Any help would be appreciated.







      apache-spark dataframe apache-spark-sql apache-spark-dataset






asked Nov 8 at 2:02 by dreddy, edited Nov 9 at 15:25 by Joel






















          2 Answers

















Answer from user238607 (answered Nov 10 at 19:02, edited Nov 10 at 19:20):













My solution is based on the answers found here and here.



It uses a Window function and shows how to create a nested list of food preferences for a given userId, based on the food score. First we create a struct, FoodDetails, from the columns we have:



import org.apache.spark.sql.functions._

val foodModifiedDf = foodDf.withColumn("FoodDetails",
    struct("foodName", "isFavFood", "cuisine", "score"))
  .drop("foodName", "isFavFood", "cuisine", "score")

println("Just printing the food details here")
foodModifiedDf.show(10, truncate = false)


Here we create a Window that accumulates the list for a userId based on FoodDetails.score in descending order. As it is applied, the window function keeps accumulating the list as it encounters new rows with the same userId. After the accumulation is done, we do a groupBy over the userId to select the largest list.



import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy("userId").orderBy(desc("FoodDetails.score"))

val userAndFood = detailsDf.join(foodModifiedDf, "userId")

val newUF = userAndFood.select($"*", collect_list("FoodDetails").over(window) as "FDNew")

println("UserAndFood dataframe after windowing function applied")
newUF.show(10, truncate = false)

val resultUF = newUF.groupBy("userId")
  .agg(max("FDNew"))

println("Final result after selecting the maximum-length list")
resultUF.show(10, truncate = false)


This is what the result finally looks like:



+------+------------------------------------------------------------------------------------------+
|userId|max(FDNew)                                                                                |
+------+------------------------------------------------------------------------------------------+
|123   |[[food3, true, American, 3], [food2, false, Italian, 2], [food1, true, Mediterranean, 1]]|
+------+------------------------------------------------------------------------------------------+


Given this dataframe, it should be easier to write out the nested JSON.
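For instance, a minimal sketch of that final step (the output path is hypothetical):

// Minimal sketch: render each grouped row as a JSON string, or write JSON files.
resultUF.toJSON.show(false)                    // one nested JSON document per row
resultUF.coalesce(1).write.json("/tmp/users")  // hypothetical output path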




























• In this solution, how will you carry over the "firstName" and "address" columns from detailsDf into resultUF?
  – alexeipab
  Nov 10 at 20:19











• In the aggregation, you can use newUF.groupBy("userId").agg(max("FDNew"), first("firstName"), first("address"))
  – user238607
  Nov 11 at 8:00










• So basically, groupBy after the join is used to dedupe the records that the join created, and more joins will add extra duplication and extra work for the executors.
  – alexeipab
  Nov 11 at 8:33










          • Read the question properly. We need the food preferences to be sorted by the score. That's what the Window function does here. You can run the code and see the results yourself.
            – user238607
            Nov 11 at 9:15










• Indeed, I missed it; it was quite late at night when I was working on it. Thank you for pointing it out. I have updated my answer with a quick fix, got the desired output, and avoided the extra load on the executors.
  – alexeipab
  Nov 11 at 9:31
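Putting the comment thread's suggestion together, a minimal sketch (the column aliases are illustrative, not from the thread):

// Sketch of the fix from the comments: carry firstName/address through the
// groupBy with first(), alongside the longest accumulated FoodDetails list.
import org.apache.spark.sql.functions.{first, max}

val resultWithDetails = newUF.groupBy("userId")
  .agg(
    max("FDNew").as("UserFoodFavourites"),   // illustrative alias
    first("firstName").as("firstName"),
    first("address").as("address"))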

















Answer from alexeipab (answered Nov 10 at 20:18, edited Nov 11 at 9:28):













The main problem with joining before grouping and collecting lists is that the join produces a lot of records for the groupBy to collapse: in your example there are 12 records after the join and before the groupBy, and you would also need to worry about picking "firstName" and "address" out of the 12 duplicates coming from detailsDf. To avoid both problems, you could pre-process the food, email and game dataframes using struct and groupBy, and then join them to detailsDf with no risk of exploding your data due to multiple records with the same userId in the joined tables.



// Assumes a SparkSession in scope (e.g. spark-shell), with
// import spark.implicits._ available for toDF and $
import org.apache.spark.sql.functions._

val detailsDf = Seq((123,"first123","xyz"))
  .toDF("userId","firstName","address")

val emailDf = Seq((123,"abc@gmail.com"),
  (123,"def@gmail.com"))
  .toDF("userId","email")

val foodDf = Seq((123,"food2",false,"Italian",2),
  (123,"food3",true,"American",3),
  (123,"food1",true,"Mediterranean",1))
  .toDF("userId","foodName","isFavFood","cuisine","score")

val gameDf = Seq((123,"chess",false,2),
  (123,"football",true,1))
  .toDF("userId","gameName","isOutdoor","score")

val emailGrp = emailDf.groupBy("userId").agg(collect_list("email").as("UserEmail"))

val foodGrp = foodDf
  .select($"userId", struct("score", "foodName", "isFavFood", "cuisine").as("UserFoodFavourites"))
  .groupBy("userId").agg(sort_array(collect_list("UserFoodFavourites")).as("UserFoodFavourites"))

val gameGrp = gameDf
  .select($"userId", struct("gameName", "isOutdoor", "score").as("UserGameFavourites"))
  .groupBy("userId").agg(collect_list("UserGameFavourites").as("UserGameFavourites"))

val result = detailsDf.join(emailGrp, Seq("userId"))
  .join(foodGrp, Seq("userId"))
  .join(gameGrp, Seq("userId"))

result.show(100, false)


          Output:



+------+---------+-------+------------------------------+------------------------------------------------------------------------------------------+----------------------------------------+
|userId|firstName|address|UserEmail                     |UserFoodFavourites                                                                        |UserGameFavourites                      |
+------+---------+-------+------------------------------+------------------------------------------------------------------------------------------+----------------------------------------+
|123   |first123 |xyz    |[abc@gmail.com, def@gmail.com]|[[1, food1, true, Mediterranean], [2, food2, false, Italian], [3, food3, true, American]]|[[chess, false, 2], [football, true, 1]]|
+------+---------+-------+------------------------------+------------------------------------------------------------------------------------------+----------------------------------------+


As all the groupBys are done on userId, and the joins as well, Spark will optimise it quite well.
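One quick way to check (a minimal sketch): print the query plans and confirm the aggregation and join keys:

// Sketch: inspect the logical and physical plans for the join/aggregation keys.
result.explain(true)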



UPDATE 1: @user238607 pointed out that I had missed the original requirement of the food preferences being sorted by score. As a quick fix I placed the score column as the first element of the UserFoodFavourites struct and used the sort_array function to arrange the data in the desired order, without forcing an extra shuffle operation. The code and its output above have been updated.
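Since the score field ends up inside the struct but the desired JSON does not include it, here is a minimal sketch for stripping it after sorting (assumes Spark 2.4+ for the transform higher-order function; not part of the original answer):

// Sketch (assumes Spark 2.4+): drop the score field from each sorted struct
// with the transform higher-order function, preserving the sort order.
import org.apache.spark.sql.functions.expr

val noScore = result.withColumn("UserFoodFavourites",
  expr("transform(UserFoodFavourites, " +
       "x -> named_struct('foodName', x.foodName, " +
       "'isFavFood', x.isFavFood, 'cuisine', x.cuisine))"))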





