python or dask parallel generator?

























Is it possible in python (maybe using dask, maybe using multiprocessing) to 'emplace' generators on cores, and then, in parallel, step through the generators and process the results?



It needs to be generators in particular (or objects with __iter__); lists of all the elements the generators yield won't fit into memory.



In particular:



With pandas, I can call read_csv(...iterator=True), which gives me an iterator (TextFileReader) - I can loop over it with for or explicitly call next multiple times. The entire CSV never gets read into memory. Nice.



Every time I read a next chunk from the iterator, I also perform some expensive computation on it.
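For concreteness, a minimal sketch of this serial pattern (the path, chunk size, and expensive_process are placeholders):

import pandas as pd

# lazily read one CSV in chunks; only the current chunk is ever in memory
reader = pd.read_csv("file1.csv", iterator=True, chunksize=500)

for chunk in reader:                   # or: chunk = next(reader)
    result = expensive_process(chunk)  # user-defined, expensive per-chunk work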



But now I have 2 such files. I would like to create 2 such generators, and 'emplace' 1 on one core and 1 on another, such that I can:



 result = expensive_process(next(iterator))


on each core, in parallel, and then combine and return the result. Repeat this step until one or both generators are exhausted.
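For example, a serial sketch of the loop I would like to run with each reader pinned to its own core (the file names and combine() are placeholders):

import pandas as pd

readers = [pd.read_csv(f, iterator=True, chunksize=500)
           for f in ("file1.csv", "file2.csv")]

# step both readers in lock-step; zip stops as soon as either one is exhausted
for chunk_a, chunk_b in zip(*readers):
    combined = combine(expensive_process(chunk_a),
                       expensive_process(chunk_b))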



It looks like the TextFileReader is not pickleable, and neither is a generator. I can't figure out how to do this in dask or multiprocessing. Is there a pattern for this?










Tags: python, pandas, python-multiprocessing, dask






asked Nov 16 '18 at 0:47 by Colin
  • Can you be more specific - do you have two CSV files (of the same format maybe - or at least compatible columns or some such) - that you'd like to apply some aggregation function to? What is the nature of expensive_process for instance?

    – Jon Clements
    Nov 16 '18 at 0:54












  • Well, to tell the truth I don't have 2 (yes, identically formatted CSVs) - I actually have 300+. And 'expensive_process' means just that - in this case, turn one of the CSV columns from strings into arrays of numbers, and then render those numbers. So, some expensive function applied to each row of the CSV. I'm trying to avoid concatenating many slices and then sending off the result for parallel processing - that would be a lot of data transfer...

    – Colin
    Nov 16 '18 at 3:20

















2 Answers






Dask's read_csv is designed to load data from multiple files in chunks, with a chunk-size that you can specify. When you operate on the resultant dataframe, you will be working chunk-wise, which is exactly the point of using Dask in the first place. There should be no need to use your iterator method.



The dask dataframe method you will want to use, most likely, is map_partitions().
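For example (a minimal sketch; the glob pattern, blocksize, and output handling are assumptions, and expensive_process must accept and return a pandas object):

import dask.dataframe as dd

# lazily read many CSVs as one dask dataframe, split into ~100 MB partitions
ddf = dd.read_csv("data/*.csv", blocksize=100_000_000)

# run the user's expensive_process on each pandas partition in parallel;
# pass a meta= hint as well if dask cannot infer the output columns/dtypes
out = ddf.map_partitions(expensive_process)

result = out.compute()        # only now are partitions actually read and processed
# or, if the combined result does not fit in memory, write it out instead:
# out.to_parquet("processed/")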



If you really wanted to use the iterator idea, you should look into dask.delayed, which can parallelise arbitrary Python functions by sending each invocation of the function (with a different file name for each) to your workers.
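A rough sketch of that approach (the file list and expensive_process are placeholders):

import dask
import pandas as pd

@dask.delayed
def process_one_file(path):
    # each call becomes a separate task that runs on a worker
    return expensive_process(pd.read_csv(path))

tasks = [process_one_file(p) for p in ["file1.csv", "file2.csv"]]
results = dask.compute(*tasks)   # executes the tasks in parallel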






– mdurant, answered Nov 18 '18 at 17:27
  • Hm, I'm confused. Right now if I have two files and call dask.dataframe.read_csv([f1, f2], blocksize=100000) I get 2219 partitions - the first ~1000 are from f1, the rest from f2 . What I'd like to do is step through f1 and f2 in parallel: the first 500 rows of f1 and f2, the 2nd 500 rows of f1&f2, etc... If I call map_partitions, doesn't it immediately step through all of the partitions? This is because I can't fit all the results of mapping in memory at the same time - I have to consume them a batch at a time...

    – Colin
    Nov 18 '18 at 20:33
































So, luckily, I think this problem maps nicely onto Python's multiprocessing Process and Queue. (something, whatever, size, NumCores and some_combination_of are placeholders for my own code.)

from multiprocessing import Process, Queue

def data_generator(whatever):
    for v in something(whatever):
        yield v

def generator_constructor(whatever):
    def generator(outputQueue):
        for d in data_generator(whatever):
            outputQueue.put(d)
        outputQueue.put(None)  # sentinel: this generator is exhausted
    return generator

def procSumGenerator():
    # one bounded queue and one worker process per core
    outputQs = [Queue(size) for _ in range(NumCores)]
    procs = [Process(target=generator_constructor(whatever),
                     args=(outputQs[i],))
             for i in range(NumCores)]

    for proc in procs:
        proc.start()

    # until any output queue yields the None sentinel, collect one
    # item from every queue and yield the combined result
    done = False
    while not done:
        results = [oq.get() for oq in outputQs]
        done = any(res is None for res in results)
        if not done:
            yield some_combination_of(results)

    for proc in procs:
        proc.terminate()

for v in procSumGenerator():
    print(v)


Maybe this can be done better with Dask? I find that my solution fairly quickly saturates the network for large amounts of generated data - I'm manipulating CSVs with pandas and returning large numpy arrays.



https://github.com/colinator/doodle_generator/blob/master/data_generator_uniform_final.ipynb






– Colin, answered Nov 19 '18 at 20:21, edited Nov 20 '18 at 17:26