Read, Process, and Concatenate Pandas Dataframes in Parallel with Dask
Solution 1:
Pandas
In Pandas I would use the apply method:
In [1]: import pandas as pd

In [2]: df = pd.DataFrame({'a': [1, 2, 3], 'b': [3, 2, 1]})

In [3]: def makegeom(row):
   ...:     a, b = row
   ...:     return 'Point(%s %s)' % (a, b)
   ...:

In [4]: df.apply(makegeom, axis=1)
Out[4]:
0    Point(1 3)
1    Point(2 2)
2    Point(3 1)
dtype: object
Dask.dataframe
In dask.dataframe you can do the same thing:
In [5]: import dask.dataframe as dd

In [6]: ddf = dd.from_pandas(df, npartitions=2)

In [7]: ddf.apply(makegeom, axis=1).compute()
Out[7]:
0    Point(1 3)
1    Point(2 2)
2    Point(3 1)
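A side note not in the original answer: newer dask versions warn when apply is called without a meta hint, because dask has to guess the output dtype. A minimal sketch, reusing the makegeom above (the series name 'geom' is just illustrative):

# Optional: pass meta so dask does not have to infer the output dtype.
ddf.apply(makegeom, axis=1, meta=('geom', 'object')).compute()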
Add new series
In either case you can then add the new series to the dataframe (note that axis=1 is needed here too, so that makegeom receives rows rather than columns):
df['geom'] = df[['a', 'b']].apply(makegeom, axis=1)
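For the dask dataframe the same idea works; a sketch, again passing meta so dask knows the new column's dtype:

# Dask counterpart (sketch): assign the computed series back onto
# the dask dataframe, then materialize the result with compute().
ddf['geom'] = ddf[['a', 'b']].apply(makegeom, axis=1,
                                    meta=('geom', 'object'))
df_with_geom = ddf.compute()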
Create
If you have CSV data then I would use the dask.dataframe.read_csv function:
ddf = dd.read_csv('filenames.*.csv')
If you have other kinds of data then I would use dask.delayed.
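A minimal sketch of the dask.delayed route, assuming a custom load function and hypothetical filenames (pd.read_json is just an illustrative reader; any function returning one pandas dataframe per file works):

import pandas as pd
import dask.dataframe as dd
from dask import delayed

@delayed
def load(filename):
    # Stand-in for any reader that returns one pandas dataframe
    # per input file; pd.read_json is an arbitrary example.
    return pd.read_json(filename, lines=True)

files = ['data.0.json', 'data.1.json']  # hypothetical filenames
ddf = dd.from_delayed([load(fn) for fn in files])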
Solution 2:
In the meantime, I have found other ways (alternatives to Dask) that are, in my opinion, relatively easier for running a function func in parallel over a pandas dataframe. In both cases I took advantage of the numpy.array_split method.

One uses a combination of python's multiprocessing.Pool, numpy.array_split and pandas.concat, and works this way:
import numpy as np
import pandas as pd
from multiprocessing import Pool

def func(df):
    # Do some computation on the given dataframe chunk and
    # return the (possibly modified) chunk.
    return df

def parallelize_dataframe(df, func, n_cores=72):
    df_split = np.array_split(df, n_cores)
    pool = Pool(n_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df
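A usage sketch (the worker function and sizes are made up for illustration; note that with the 'spawn' start method used by default on Windows and recent macOS, the call must sit under an if __name__ == '__main__': guard):

# Hypothetical worker: add a 'geom' column to each chunk.
def add_geom(chunk):
    chunk = chunk.copy()
    chunk['geom'] = ('Point(' + chunk['a'].astype(str) + ' '
                     + chunk['b'].astype(str) + ')')
    return chunk

if __name__ == '__main__':
    big_df = pd.DataFrame({'a': range(10000), 'b': range(10000)})
    result = parallelize_dataframe(big_df, add_geom, n_cores=4)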
Another is to use the powerful but simple ray cluster (which is quite useful if you can run the code over multiple machines):
# Connect to a ray cluster
import ray
ray.init(address="auto", redis_password="5241590000000000")

import numpy as np
import pandas as pd

@ray.remote
def func(df):
    # Do some computation on the given dataframe chunk and return it.
    return df

df_split = np.array_split(df, 288)
result = pd.concat(ray.get([func.remote(i) for i in df_split]))
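If you only have one machine, ray also works locally; a minimal sketch:

import ray
ray.init()  # with no arguments, starts a local ray instance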
The methods above work quite well for simple functions func where the computation can be carried out with numpy and the result can be concatenated back into a pandas dataframe. For functions that do simpler file manipulation I also found parmap.map useful, but that is off-topic for this S.O. question.