
Read Process And Concatenate Pandas Dataframe In Parallel With Dask

I'm trying to read and process a list of CSV files in parallel and concatenate the output into a single pandas dataframe for further processing. My workflow consists of 3 steps: crea

Solution 1:

Pandas

In Pandas I would use the apply method

In [1]: import pandas as pd

In [2]: df = pd.DataFrame({'a': [1, 2, 3], 'b': [3, 2, 1]})

In [3]: def makegeom(row):
   ...:      a, b = row
   ...:      return 'Point(%s %s)' % (a, b)
   ...: 

In [4]: df.apply(makegeom, axis=1)
Out[4]: 
0    Point(1 3)
1    Point(2 2)
2    Point(3 1)
dtype: object

Dask.dataframe

In dask.dataframe you can do the same thing

In [5]: import dask.dataframe as dd

In [6]: ddf = dd.from_pandas(df, npartitions=2)

In [7]: ddf.apply(makegeom, axis=1).compute()
Out[7]: 
0    Point(1 3)
1    Point(2 2)
2    Point(3 1)

Add new series

In either case you can then add the new series to the dataframe

df['geom'] = df[['a', 'b']].apply(makegeom, axis=1)

Create

If you have CSV data then I would use the dask.dataframe.read_csv function

ddf = dd.read_csv('filenames.*.csv')

If you have other kinds of data then I would use dask.delayed
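
A rough sketch of the dask.delayed route, assuming each file can be loaded and processed independently. The names load_and_process and read_all are hypothetical, and pd.read_csv only stands in for whatever loader your data needs; the columns a and b are carried over from the example above.

```python
import glob

import dask
import pandas as pd


@dask.delayed
def load_and_process(path):
    # Hypothetical per-file step: load one file and derive a column.
    # Swap pd.read_csv for the reader that matches your data.
    df = pd.read_csv(path)
    df['geom'] = ['Point(%s %s)' % (a, b) for a, b in zip(df['a'], df['b'])]
    return df


def read_all(pattern):
    # Build one lazy task per matching file; nothing is read yet
    tasks = [load_and_process(p) for p in sorted(glob.glob(pattern))]
    # Run all tasks with dask's scheduler, then concatenate in pandas
    frames = dask.compute(*tasks)
    return pd.concat(list(frames), ignore_index=True)
```

Calling read_all('filenames.*.csv') would then give a single pandas dataframe, which covers the read, process and concatenate steps in one go.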


Solution 2:

In the meantime, I have found other ways (alternatives to Dask), relatively easier in my opinion, to run a function func in parallel over a pandas data frame. Both rely on the numpy.array_split method.

The first combines Python's multiprocessing.Pool, numpy.array_split and pandas.concat, and works this way:

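import numpy as np
import pandas as pd
from multiprocessing import Pool

def func(array):
    # do some computation on the given array and return the result;
    # returned unchanged here as a placeholder
    return array

def parallelize_dataframe(df, func, n_cores=72):
    # split the data frame into n_cores chunks, one per worker process
    df_split = np.array_split(df, n_cores)
    pool = Pool(n_cores)
    # run func on every chunk in parallel and stitch the results back together
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df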
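
For completeness, here is a self-contained variant of the same idea that can be run as a script. It is only a sketch: split_frame and add_total are hypothetical helpers, n_cores=4 is an arbitrary default, and the chunks are cut with iloc instead of numpy.array_split because, as far as I know, newer pandas releases warn when array_split is applied to a data frame.

```python
from multiprocessing import Pool

import pandas as pd


def split_frame(df, n_chunks):
    # Slice the rows into at most n_chunks contiguous, near-equal pieces
    n_chunks = max(1, min(n_chunks, len(df)))
    size, extra = divmod(len(df), n_chunks)
    bounds = [0]
    for i in range(n_chunks):
        bounds.append(bounds[-1] + size + (1 if i < extra else 0))
    return [df.iloc[lo:hi] for lo, hi in zip(bounds[:-1], bounds[1:])]


def add_total(chunk):
    # Hypothetical stand-in for func; defined at module level so it can be pickled
    return chunk.assign(total=chunk['a'] + chunk['b'])


def parallelize_dataframe(df, func, n_cores=4):
    # The with-block shuts the pool down once all results are collected
    with Pool(n_cores) as pool:
        return pd.concat(pool.map(func, split_frame(df, n_cores)))


if __name__ == '__main__':
    df = pd.DataFrame({'a': range(10), 'b': range(10, 20)})
    print(parallelize_dataframe(df, add_total))
```

The __main__ guard matters on platforms that start worker processes by re-importing the script; without it each worker would try to create its own pool.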

The other uses a ray cluster, which is powerful yet simple and is especially useful if you can run the code over multiple machines:

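import numpy as np
import pandas as pd
import ray

# connect to a running ray cluster
ray.init(address="auto", redis_password="5241590000000000")

@ray.remote
def func(df):
    # do some computation on the given dataframe and return the result;
    # returned unchanged here as a placeholder
    return df

# df is the pandas data frame to process, assumed to exist already
df_split = np.array_split(df, 288)
# submit one remote task per chunk, wait for all of them, then concatenate
result = pd.concat(ray.get([func.remote(i) for i in df_split]))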

The methods above work quite well for simple functions func where the computation can be carried out with numpy and the returned result can be concatenated back into a pandas data frame. For functions that do simpler file manipulation I also found parmap.map useful, but that is off-topic for this S.O. question.

