How to return an array of struct or class from UDF into dataframe column value?










0















d = ['ID': '1', 'pID': 1000, 'startTime':'2018.07.02T03:34:20', 'endTime':'2018.07.03T02:40:20', 'ID': '1', 'pID': 1000, 'startTime':'2018.07.02T03:45:20', 'endTime':'2018.07.03T02:50:20', 'ID': '2', 'pID': 2000, 'startTime':'2018.07.02T03:34:20', 'endTime':'2018.07.03T02:40:20', 'ID': '2', 'pID': 2000, 'startTime':'2018.07.02T03:45:20', 'endTime':'2018.07.03T02:50:20']

df = spark.createDataFrame(d)

Dates = namedtuple("Dates", "startTime endTime")


def MergeAdjacentUsage(timeSets):
DatesArray =
for times in timeSets:
DatesArray.append(Dates(startTime=times.startTime, endTime=times.endTime))
return DatesArray


MergeAdjacentUsages = udf(MergeAdjacentUsage,ArrayType(Dates()))

df1=df.groupBy(['ID','pID']).agg(MergeAdjacentUsages(F.collect_list(struct('startTime','endTime'))).alias("Times"))

display(df1)


All I want is to set column value to an array of stuct that is returned by UDF. It is giving me error as:




TypeError: new() takes exactly 3 arguments (1 given)



TypeError Traceback (most recent call
last) in ()
22 return DatesArray
23
---> 24 MergeAdjacentUsages = udf(MergeAdjacentUsage,ArrayType(Dates()))
25
26 df1=df.groupBy(['ID','pID']).agg(MergeAdjacentUsages(F.collect_list(struct('startTime','endTime'))).alias("Times"))




Any help, idea or hint will be appreciated.










share|improve this question




























    0















    d = ['ID': '1', 'pID': 1000, 'startTime':'2018.07.02T03:34:20', 'endTime':'2018.07.03T02:40:20', 'ID': '1', 'pID': 1000, 'startTime':'2018.07.02T03:45:20', 'endTime':'2018.07.03T02:50:20', 'ID': '2', 'pID': 2000, 'startTime':'2018.07.02T03:34:20', 'endTime':'2018.07.03T02:40:20', 'ID': '2', 'pID': 2000, 'startTime':'2018.07.02T03:45:20', 'endTime':'2018.07.03T02:50:20']

    df = spark.createDataFrame(d)

    Dates = namedtuple("Dates", "startTime endTime")


    def MergeAdjacentUsage(timeSets):
    DatesArray =
    for times in timeSets:
    DatesArray.append(Dates(startTime=times.startTime, endTime=times.endTime))
    return DatesArray


    MergeAdjacentUsages = udf(MergeAdjacentUsage,ArrayType(Dates()))

    df1=df.groupBy(['ID','pID']).agg(MergeAdjacentUsages(F.collect_list(struct('startTime','endTime'))).alias("Times"))

    display(df1)


    All I want is to set column value to an array of stuct that is returned by UDF. It is giving me error as:




    TypeError: new() takes exactly 3 arguments (1 given)



    TypeError Traceback (most recent call
    last) in ()
    22 return DatesArray
    23
    ---> 24 MergeAdjacentUsages = udf(MergeAdjacentUsage,ArrayType(Dates()))
    25
    26 df1=df.groupBy(['ID','pID']).agg(MergeAdjacentUsages(F.collect_list(struct('startTime','endTime'))).alias("Times"))




    Any help, idea or hint will be appreciated.










    share|improve this question


























      0












      0








      0








      d = ['ID': '1', 'pID': 1000, 'startTime':'2018.07.02T03:34:20', 'endTime':'2018.07.03T02:40:20', 'ID': '1', 'pID': 1000, 'startTime':'2018.07.02T03:45:20', 'endTime':'2018.07.03T02:50:20', 'ID': '2', 'pID': 2000, 'startTime':'2018.07.02T03:34:20', 'endTime':'2018.07.03T02:40:20', 'ID': '2', 'pID': 2000, 'startTime':'2018.07.02T03:45:20', 'endTime':'2018.07.03T02:50:20']

      df = spark.createDataFrame(d)

      Dates = namedtuple("Dates", "startTime endTime")


      def MergeAdjacentUsage(timeSets):
      DatesArray =
      for times in timeSets:
      DatesArray.append(Dates(startTime=times.startTime, endTime=times.endTime))
      return DatesArray


      MergeAdjacentUsages = udf(MergeAdjacentUsage,ArrayType(Dates()))

      df1=df.groupBy(['ID','pID']).agg(MergeAdjacentUsages(F.collect_list(struct('startTime','endTime'))).alias("Times"))

      display(df1)


      All I want is to set column value to an array of stuct that is returned by UDF. It is giving me error as:




      TypeError: new() takes exactly 3 arguments (1 given)



      TypeError Traceback (most recent call
      last) in ()
      22 return DatesArray
      23
      ---> 24 MergeAdjacentUsages = udf(MergeAdjacentUsage,ArrayType(Dates()))
      25
      26 df1=df.groupBy(['ID','pID']).agg(MergeAdjacentUsages(F.collect_list(struct('startTime','endTime'))).alias("Times"))




      Any help, idea or hint will be appreciated.










      share|improve this question
















      d = ['ID': '1', 'pID': 1000, 'startTime':'2018.07.02T03:34:20', 'endTime':'2018.07.03T02:40:20', 'ID': '1', 'pID': 1000, 'startTime':'2018.07.02T03:45:20', 'endTime':'2018.07.03T02:50:20', 'ID': '2', 'pID': 2000, 'startTime':'2018.07.02T03:34:20', 'endTime':'2018.07.03T02:40:20', 'ID': '2', 'pID': 2000, 'startTime':'2018.07.02T03:45:20', 'endTime':'2018.07.03T02:50:20']

      df = spark.createDataFrame(d)

      Dates = namedtuple("Dates", "startTime endTime")


      def MergeAdjacentUsage(timeSets):
      DatesArray =
      for times in timeSets:
      DatesArray.append(Dates(startTime=times.startTime, endTime=times.endTime))
      return DatesArray


      MergeAdjacentUsages = udf(MergeAdjacentUsage,ArrayType(Dates()))

      df1=df.groupBy(['ID','pID']).agg(MergeAdjacentUsages(F.collect_list(struct('startTime','endTime'))).alias("Times"))

      display(df1)


      All I want is to set column value to an array of stuct that is returned by UDF. It is giving me error as:




      TypeError: new() takes exactly 3 arguments (1 given)



      TypeError Traceback (most recent call
      last) in ()
      22 return DatesArray
      23
      ---> 24 MergeAdjacentUsages = udf(MergeAdjacentUsage,ArrayType(Dates()))
      25
      26 df1=df.groupBy(['ID','pID']).agg(MergeAdjacentUsages(F.collect_list(struct('startTime','endTime'))).alias("Times"))




      Any help, idea or hint will be appreciated.







      arrays dataframe struct pyspark user-defined-functions






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 14 '18 at 13:30









      Ali AzG

      6331616




      6331616










      asked Nov 14 '18 at 11:27









      Bilal ShafqatBilal Shafqat

      155




      155






















          1 Answer
          1






          active

          oldest

          votes


















          0














          pyspark does not let user defined Class objects as Dataframe Column Types. Instead we need to create the StructType which can be used similar to a class / named tuple in python.



          For example:



          from pyspark.sql.types import *
          from pyspark.sql.functions import udf
          from pyspark.sql import functions as F
          # from pyspark.sql.functions import *

          d = ['ID': '1', 'pID': 1000, 'startTime': '2018.07.02T03:34:20', 'endTime': '2018.07.03T02:40:20',
          'ID': '1', 'pID': 1000, 'startTime': '2018.07.02T03:45:20', 'endTime': '2018.07.03T02:50:20',
          'ID': '2', 'pID': 2000, 'startTime': '2018.07.02T03:34:20', 'endTime': '2018.07.03T02:40:20',
          'ID': '2', 'pID': 2000, 'startTime': '2018.07.02T03:45:20', 'endTime': '2018.07.03T02:50:20']

          df = spark.createDataFrame(d)

          # Dates = namedtuple("Dates", "startTime endTime")

          schema = ArrayType(StructType([
          StructField("startTime", StringType(), False),
          StructField("endTime", StringType(), False)
          ]))


          MergeAdjacentUsages = udf(lambda xs: xs, schema)

          df1 = df.groupBy(['ID', 'pID']).agg(MergeAdjacentUsages(
          F.collect_list(F.struct('startTime', 'endTime'))).alias("Times"))
          df1.show(truncate=False)

          +---+----+----------------------------------------------------------------------------------------+
          |ID |pID |Times |
          +---+----+----------------------------------------------------------------------------------------+
          |2 |2000|[[2018.07.02T03:34:20, 2018.07.03T02:40:20], [2018.07.02T03:45:20, 2018.07.03T02:50:20]]|
          |1 |1000|[[2018.07.02T03:34:20, 2018.07.03T02:40:20], [2018.07.02T03:45:20, 2018.07.03T02:50:20]]|
          +---+----+----------------------------------------------------------------------------------------+


          Hope this helps!






          share|improve this answer

























          • Yeah, that was what I was looking for. Thanks alot.

            – Bilal Shafqat
            Nov 20 '18 at 13:08










          Your Answer






          StackExchange.ifUsing("editor", function ()
          StackExchange.using("externalEditor", function ()
          StackExchange.using("snippets", function ()
          StackExchange.snippets.init();
          );
          );
          , "code-snippets");

          StackExchange.ready(function()
          var channelOptions =
          tags: "".split(" "),
          id: "1"
          ;
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function()
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled)
          StackExchange.using("snippets", function()
          createEditor();
          );

          else
          createEditor();

          );

          function createEditor()
          StackExchange.prepareEditor(
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader:
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          ,
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          );



          );













          draft saved

          draft discarded


















          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53299143%2fhow-to-return-an-array-of-struct-or-class-from-udf-into-dataframe-column-value%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          0














          pyspark does not let user defined Class objects as Dataframe Column Types. Instead we need to create the StructType which can be used similar to a class / named tuple in python.



          For example:



          from pyspark.sql.types import *
          from pyspark.sql.functions import udf
          from pyspark.sql import functions as F
          # from pyspark.sql.functions import *

          d = ['ID': '1', 'pID': 1000, 'startTime': '2018.07.02T03:34:20', 'endTime': '2018.07.03T02:40:20',
          'ID': '1', 'pID': 1000, 'startTime': '2018.07.02T03:45:20', 'endTime': '2018.07.03T02:50:20',
          'ID': '2', 'pID': 2000, 'startTime': '2018.07.02T03:34:20', 'endTime': '2018.07.03T02:40:20',
          'ID': '2', 'pID': 2000, 'startTime': '2018.07.02T03:45:20', 'endTime': '2018.07.03T02:50:20']

          df = spark.createDataFrame(d)

          # Dates = namedtuple("Dates", "startTime endTime")

          schema = ArrayType(StructType([
          StructField("startTime", StringType(), False),
          StructField("endTime", StringType(), False)
          ]))


          MergeAdjacentUsages = udf(lambda xs: xs, schema)

          df1 = df.groupBy(['ID', 'pID']).agg(MergeAdjacentUsages(
          F.collect_list(F.struct('startTime', 'endTime'))).alias("Times"))
          df1.show(truncate=False)

          +---+----+----------------------------------------------------------------------------------------+
          |ID |pID |Times |
          +---+----+----------------------------------------------------------------------------------------+
          |2 |2000|[[2018.07.02T03:34:20, 2018.07.03T02:40:20], [2018.07.02T03:45:20, 2018.07.03T02:50:20]]|
          |1 |1000|[[2018.07.02T03:34:20, 2018.07.03T02:40:20], [2018.07.02T03:45:20, 2018.07.03T02:50:20]]|
          +---+----+----------------------------------------------------------------------------------------+


          Hope this helps!






          share|improve this answer

























          • Yeah, that was what I was looking for. Thanks alot.

            – Bilal Shafqat
            Nov 20 '18 at 13:08















          0














          pyspark does not let user defined Class objects as Dataframe Column Types. Instead we need to create the StructType which can be used similar to a class / named tuple in python.



          For example:



          from pyspark.sql.types import *
          from pyspark.sql.functions import udf
          from pyspark.sql import functions as F
          # from pyspark.sql.functions import *

          d = ['ID': '1', 'pID': 1000, 'startTime': '2018.07.02T03:34:20', 'endTime': '2018.07.03T02:40:20',
          'ID': '1', 'pID': 1000, 'startTime': '2018.07.02T03:45:20', 'endTime': '2018.07.03T02:50:20',
          'ID': '2', 'pID': 2000, 'startTime': '2018.07.02T03:34:20', 'endTime': '2018.07.03T02:40:20',
          'ID': '2', 'pID': 2000, 'startTime': '2018.07.02T03:45:20', 'endTime': '2018.07.03T02:50:20']

          df = spark.createDataFrame(d)

          # Dates = namedtuple("Dates", "startTime endTime")

          schema = ArrayType(StructType([
          StructField("startTime", StringType(), False),
          StructField("endTime", StringType(), False)
          ]))


          MergeAdjacentUsages = udf(lambda xs: xs, schema)

          df1 = df.groupBy(['ID', 'pID']).agg(MergeAdjacentUsages(
          F.collect_list(F.struct('startTime', 'endTime'))).alias("Times"))
          df1.show(truncate=False)

          +---+----+----------------------------------------------------------------------------------------+
          |ID |pID |Times |
          +---+----+----------------------------------------------------------------------------------------+
          |2 |2000|[[2018.07.02T03:34:20, 2018.07.03T02:40:20], [2018.07.02T03:45:20, 2018.07.03T02:50:20]]|
          |1 |1000|[[2018.07.02T03:34:20, 2018.07.03T02:40:20], [2018.07.02T03:45:20, 2018.07.03T02:50:20]]|
          +---+----+----------------------------------------------------------------------------------------+


          Hope this helps!






          share|improve this answer

























          • Yeah, that was what I was looking for. Thanks alot.

            – Bilal Shafqat
            Nov 20 '18 at 13:08













          0












          0








          0







          pyspark does not let user defined Class objects as Dataframe Column Types. Instead we need to create the StructType which can be used similar to a class / named tuple in python.



          For example:



          from pyspark.sql.types import *
          from pyspark.sql.functions import udf
          from pyspark.sql import functions as F
          # from pyspark.sql.functions import *

          d = ['ID': '1', 'pID': 1000, 'startTime': '2018.07.02T03:34:20', 'endTime': '2018.07.03T02:40:20',
          'ID': '1', 'pID': 1000, 'startTime': '2018.07.02T03:45:20', 'endTime': '2018.07.03T02:50:20',
          'ID': '2', 'pID': 2000, 'startTime': '2018.07.02T03:34:20', 'endTime': '2018.07.03T02:40:20',
          'ID': '2', 'pID': 2000, 'startTime': '2018.07.02T03:45:20', 'endTime': '2018.07.03T02:50:20']

          df = spark.createDataFrame(d)

          # Dates = namedtuple("Dates", "startTime endTime")

          schema = ArrayType(StructType([
          StructField("startTime", StringType(), False),
          StructField("endTime", StringType(), False)
          ]))


          MergeAdjacentUsages = udf(lambda xs: xs, schema)

          df1 = df.groupBy(['ID', 'pID']).agg(MergeAdjacentUsages(
          F.collect_list(F.struct('startTime', 'endTime'))).alias("Times"))
          df1.show(truncate=False)

          +---+----+----------------------------------------------------------------------------------------+
          |ID |pID |Times |
          +---+----+----------------------------------------------------------------------------------------+
          |2 |2000|[[2018.07.02T03:34:20, 2018.07.03T02:40:20], [2018.07.02T03:45:20, 2018.07.03T02:50:20]]|
          |1 |1000|[[2018.07.02T03:34:20, 2018.07.03T02:40:20], [2018.07.02T03:45:20, 2018.07.03T02:50:20]]|
          +---+----+----------------------------------------------------------------------------------------+


          Hope this helps!






          share|improve this answer















          pyspark does not let user defined Class objects as Dataframe Column Types. Instead we need to create the StructType which can be used similar to a class / named tuple in python.



          For example:



          from pyspark.sql.types import *
          from pyspark.sql.functions import udf
          from pyspark.sql import functions as F
          # from pyspark.sql.functions import *

          d = ['ID': '1', 'pID': 1000, 'startTime': '2018.07.02T03:34:20', 'endTime': '2018.07.03T02:40:20',
          'ID': '1', 'pID': 1000, 'startTime': '2018.07.02T03:45:20', 'endTime': '2018.07.03T02:50:20',
          'ID': '2', 'pID': 2000, 'startTime': '2018.07.02T03:34:20', 'endTime': '2018.07.03T02:40:20',
          'ID': '2', 'pID': 2000, 'startTime': '2018.07.02T03:45:20', 'endTime': '2018.07.03T02:50:20']

          df = spark.createDataFrame(d)

          # Dates = namedtuple("Dates", "startTime endTime")

          schema = ArrayType(StructType([
          StructField("startTime", StringType(), False),
          StructField("endTime", StringType(), False)
          ]))


          MergeAdjacentUsages = udf(lambda xs: xs, schema)

          df1 = df.groupBy(['ID', 'pID']).agg(MergeAdjacentUsages(
          F.collect_list(F.struct('startTime', 'endTime'))).alias("Times"))
          df1.show(truncate=False)

          +---+----+----------------------------------------------------------------------------------------+
          |ID |pID |Times |
          +---+----+----------------------------------------------------------------------------------------+
          |2 |2000|[[2018.07.02T03:34:20, 2018.07.03T02:40:20], [2018.07.02T03:45:20, 2018.07.03T02:50:20]]|
          |1 |1000|[[2018.07.02T03:34:20, 2018.07.03T02:40:20], [2018.07.02T03:45:20, 2018.07.03T02:50:20]]|
          +---+----+----------------------------------------------------------------------------------------+


          Hope this helps!







          share|improve this answer














          share|improve this answer



          share|improve this answer








          edited Nov 17 '18 at 1:07

























          answered Nov 16 '18 at 23:03









          Pavithran RamachandranPavithran Ramachandran

          43338




          43338












          • Yeah, that was what I was looking for. Thanks alot.

            – Bilal Shafqat
            Nov 20 '18 at 13:08

















          • Yeah, that was what I was looking for. Thanks alot.

            – Bilal Shafqat
            Nov 20 '18 at 13:08
















          Yeah, that was what I was looking for. Thanks alot.

          – Bilal Shafqat
          Nov 20 '18 at 13:08





          Yeah, that was what I was looking for. Thanks alot.

          – Bilal Shafqat
          Nov 20 '18 at 13:08

















          draft saved

          draft discarded
















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid


          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.

          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53299143%2fhow-to-return-an-array-of-struct-or-class-from-udf-into-dataframe-column-value%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          Top Tejano songwriter Luis Silva dead of heart attack at 64

          ReactJS Fetched API data displays live - need Data displayed static

          政党