The thing about learning new libraries is that the first time you're exposed to them, you're probably using them the wrong way. Only after repeatedly reading the documentation and experimenting do you find newer and (possibly) better ways to do things.

It's been a while since I talked about Dask, so I'd like to revisit how to use it.

Dask is more than just parallel pandas

It's a very easy mental model to just call Dask a parallelized version of pandas, and that's not incorrect. The Dask DataFrame has fairly strong parity with many popular pandas DataFrame operations: wherever you might be operating on a pandas.DataFrame, you could just as easily operate on a dask.dataframe.DataFrame.
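To make that parity concrete, here's a minimal sketch with a toy dataframe (the data and column names are just illustrative):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"name": ["a", "b", "a"], "x": [1.0, 2.0, 3.0]})
ddf = dd.from_pandas(pdf, npartitions=2)

# The same method chain works in both libraries; Dask just defers the work until .compute()
print(pdf.groupby("name")["x"].sum())
print(ddf.groupby("name")["x"].sum().compute())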

This mental model hides the underlying implementation details of Dask (and Dask DataFrames), which makes it great for beginners.

If we look further, we can extend this mental model: Dask operates by distributing (dataframe) operations across small chunks of data (Pandas DataFrames). In other words, Dask is orchestrating and scheduling tasks that operate on out-of-memory chunks of data. At this point, we can see that Dask is really a general parallel compute engine, which might be useful for, say, grad students submitting serial Python code to a cluster with multiple cores that ought to be leveraged.
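To make the "chunks are Pandas DataFrames" point concrete, here's a minimal sketch using the same demo dataset that appears later in this post:

import dask

# Each partition of a Dask DataFrame is itself a plain pandas DataFrame
ddf = dask.datasets.timeseries()
print(ddf.npartitions)                       # the number of chunks Dask will schedule work over
print(type(ddf.get_partition(0).compute()))  # <class 'pandas.core.frame.DataFrame'>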

The parallel processing begins with the "brains", the client

You can do this in multiple ways.

First, you can make a dask client within your notebook

from dask.distributed import Client
client = Client()
client
/home/ayang41/mambaforge/envs/ds39/lib/python3.9/site-packages/distributed/node.py:160: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 37001 instead
  warnings.warn(

Client

Client-724d8ad3-c354-11ec-953b-00163e49e059

Connection method: Cluster object Cluster type: LocalCluster
Dashboard: http://127.0.0.1:37001/status

Cluster Info

client.close()
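If you'd rather stay inside the notebook but still control the worker layout, Client() also accepts the LocalCluster arguments directly; a minimal sketch that mirrors the CLI flags used in the next section:

from dask.distributed import Client

# Kwargs are forwarded to LocalCluster: 2 worker processes, 1 thread each
client = Client(n_workers=2, threads_per_worker=1)
client.close()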

I've come to appreciate separating the cluster from the notebook: I'll use the Dask CLI to start a scheduler and workers, then connect to them with a Client inside the notebook. This also extends naturally to connecting to remote clusters if your tech stack allows it.

There are two steps to getting this cluster going:

  • Create the dask scheduler; in this case I specify the host as localhost
    $ dask-scheduler --host localhost
distributed.scheduler - INFO - -----------------------------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:      tcp://127.0.0.1:8786
distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8787
  • In another tab/window, create the dask workers. In this case, I specify how to connect to the scheduler, along with how many processes and how many threads per process
    $ dask-worker localhost:8786 --nprocs 2 --nthreads 1
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:34157'
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:35317'
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:40601
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:40601
distributed.worker - INFO -          dashboard at:            127.0.0.1:45819
distributed.worker - INFO - Waiting to connect to:       tcp://localhost:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                 695.88 MiB
distributed.worker - INFO -       Local Directory: /home/ayang41/dask-worker-space/worker-yve3mz6t
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:40317
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:40317
distributed.worker - INFO -          dashboard at:            127.0.0.1:46271
distributed.worker - INFO - Waiting to connect to:       tcp://localhost:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                 695.88 MiB
distributed.worker - INFO -       Local Directory: /home/ayang41/dask-worker-space/worker-n5bnjbki
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:       tcp://localhost:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:       tcp://localhost:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
  • Back in the scheduler's logs, we can see that it has registered two new workers
    distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:40601', name: tcp://127.0.0.1:40601, memory: 0, processing: 0>
    distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:40601
    distributed.core - INFO - Starting established connection
    distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:40317', name: tcp://127.0.0.1:40317, memory: 0, processing: 0>
    distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:40317
    distributed.core - INFO - Starting established connection
client = Client('localhost:8786')
client

Client

Client-74013733-c354-11ec-953b-00163e49e059

Connection method: Direct
Dashboard: http://localhost:8787/status

Scheduler Info

Scheduler

Scheduler-dafee0f7-bfac-4a75-bd8e-a7c993a54ffe

Comm: tcp://127.0.0.1:8786 Workers: 2
Dashboard: http://127.0.0.1:8787/status Total threads: 2
Started: 1 hour ago Total memory: 1.36 GiB

Workers

Worker: tcp://127.0.0.1:40317

Comm: tcp://127.0.0.1:40317 Total threads: 1
Dashboard: http://127.0.0.1:46271/status Memory: 695.88 MiB
Nanny: tcp://127.0.0.1:34157
Local directory: /home/ayang41/dask-worker-space/worker-n5bnjbki
Tasks executing: 0 Tasks in memory: 0
Tasks ready: 0 Tasks in flight: 0
CPU usage: 4.0% Last seen: Just now
Memory usage: 290.43 MiB Spilled bytes: 0 B
Read bytes: 160.31 kiB Write bytes: 160.31 kiB

Worker: tcp://127.0.0.1:40601

Comm: tcp://127.0.0.1:40601 Total threads: 1
Dashboard: http://127.0.0.1:45819/status Memory: 695.88 MiB
Nanny: tcp://127.0.0.1:35317
Local directory: /home/ayang41/dask-worker-space/worker-yve3mz6t
Tasks executing: 0 Tasks in memory: 0
Tasks ready: 0 Tasks in flight: 0
CPU usage: 4.0% Last seen: Just now
Memory usage: 289.03 MiB Spilled bytes: 0 B
Read bytes: 160.24 kiB Write bytes: 160.24 kiB

Miraculously, once you've connected to the client, all subsequent Dask operations will leverage this client.
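A quick way to convince yourself of this: run a throwaway delayed computation with no client passed anywhere, and watch it show up on the dashboard (a minimal sketch):

from dask import delayed

# No client is passed explicitly; because a distributed Client is active,
# this .compute() is dispatched to the cluster
total = delayed(sum)([1, 2, 3]).compute()
total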

Backtracking a bit: getting the hang of Dask DataFrame operations - groupby operations

For most data analysis purposes, groupby/agg operations will be useful for segmenting your data and understanding data distributions

import dask
import dask.dataframe as dd

# Make some random data
df = dask.datasets.timeseries()

With this random data, we can see that we don't actually "have" the data yet; we only have the tasks that will create it

df
Dask DataFrame Structure:
id name x y
npartitions=30
2000-01-01 int64 object float64 float64
2000-01-02 ... ... ... ...
... ... ... ... ...
2000-01-30 ... ... ... ...
2000-01-31 ... ... ... ...
Dask Name: make-timeseries, 30 tasks

Just like Pandas, we can put together some groupby-agg operations. Unlike Pandas, this is lazily evaluated: the computation hasn't actually happened yet

some_groupby_agg_operation = df.groupby('name').agg({'x': 'max', 'y': 'mean'})
some_groupby_agg_operation
Dask DataFrame Structure:
x y
npartitions=1
float64 float64
... ...
Dask Name: aggregate-agg, 65 tasks

Visualizing the tasks, where each color is a different operation and each row is a dask worker

[image: Dask dashboard task stream]

We can also look at how the scheduler is going to execute these tasks (kind of like an execution plan in Spark)

some_groupby_agg_operation.dask.visualize()
[task graph: make-timeseries -> aggregate-agg]
some_groupby_agg_operation.visualize()

Now we actually crunch the tasks, and we can see the operation happening in the Dask dashboard, where each color corresponds to a particular operation

some_groupby_agg_computed = some_groupby_agg_operation.compute()

And now we've computed the operation, but the actual content isn't relevant here

some_groupby_agg_computed.head()
x y
name
Alice 0.999948 0.002535
Bob 0.999983 0.004007
Charlie 0.999927 -0.000037
Dan 0.999985 0.000333
Edith 0.999993 0.001028

Some apply operations

Apply operations are useful for doing additional computation on the dataframe

df['x2'] = df['x'].apply(lambda val: val**2)
/home/ayang41/mambaforge/envs/ds39/lib/python3.9/site-packages/dask/dataframe/core.py:3623: UserWarning: 
You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('x', 'float64'))

  warnings.warn(meta_warning(meta))
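Following the warning's own suggestion, you can pass meta= to declare the output type up front and skip the guessing step (a minimal sketch of the same assignment):

# Declaring the output metadata silences the warning and avoids the sample run
df['x2'] = df['x'].apply(lambda val: val**2, meta=('x', 'float64'))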

Again, the lazy evaluation means this column is "scheduled" into the dataframe, but not yet computed

df
Dask DataFrame Structure:
id name x y x2
npartitions=30
2000-01-01 int64 object float64 float64 float64
2000-01-02 ... ... ... ... ...
... ... ... ... ... ...
2000-01-30 ... ... ... ... ...
2000-01-31 ... ... ... ... ...
Dask Name: assign, 120 tasks
applied_computation = df.compute()

[image: Dask dashboard task stream]

Map partition functions

Map partition operations are great for taking complex Pandas dataframe operations and "fanning" them out to all the Dask DataFrame partitions. Generally, the refactoring necessary to go from Pandas to a Dask map_partitions function is straightforward.

For example, if you had some large-ish code to act on multiple columns and create multiple columns, you might wrap it into a single function that takes in a Pandas dataframe and returns a Pandas dataframe.

You could also imagine breaking this function up into two functions, one for each new column, though this sometimes adds overhead because you're scheduling two sets of tasks instead of one. A sketch of that split follows; the single combined function is what we'll actually run.
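Purely to make that comparison concrete, here's a hypothetical sketch of the split version (the helper names add_big_string and add_new_value are mine and aren't used anywhere else):

# Two separate map_partitions calls -> roughly twice as many tasks for the scheduler to track
def add_big_string(pdf):
    return pdf.assign(big_string=pdf['id'].astype(str) + "-" + pdf['name'])

def add_new_value(pdf):
    return pdf.assign(new_value=pdf['x'] + pdf['y'])

split_df = df.map_partitions(add_big_string).map_partitions(add_new_value)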

def my_multi_function(df):
    df = df.assign(
        big_string=df['id'].astype(str) + "-" + df['name'],
        new_value=df['x'] + df['y']
    )
    return df

multi_df = df.map_partitions(my_multi_function)
multi_df
Dask DataFrame Structure:
id name x y x2 big_string new_value
npartitions=30
2000-01-01 int64 object float64 float64 float64 object float64
2000-01-02 ... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
2000-01-30 ... ... ... ... ... ... ...
2000-01-31 ... ... ... ... ... ... ...
Dask Name: my_multi_function, 150 tasks

[image: Dask dashboard task stream]

multi_df_computed = multi_df.compute()
multi_df_computed
id name x y x2 big_string new_value
timestamp
2000-01-01 00:00:00 987 Bob 0.873155 0.513619 0.762399 987-Bob 1.386773
2000-01-01 00:00:01 1030 Ray 0.090236 -0.927625 0.008142 1030-Ray -0.837389
2000-01-01 00:00:02 975 Alice -0.286592 0.999372 0.082135 975-Alice 0.712780
2000-01-01 00:00:03 951 Ingrid 0.579439 0.381533 0.335749 951-Ingrid 0.960972
2000-01-01 00:00:04 1035 Oliver -0.751126 0.625851 0.564190 1035-Oliver -0.125275
... ... ... ... ... ... ... ...
2000-01-30 23:59:55 981 Frank 0.165396 0.655417 0.027356 981-Frank 0.820813
2000-01-30 23:59:56 984 Ray -0.933662 0.721310 0.871725 984-Ray -0.212352
2000-01-30 23:59:57 1020 Dan 0.053868 0.147757 0.002902 1020-Dan 0.201624
2000-01-30 23:59:58 1037 Norbert 0.883781 -0.331872 0.781069 1037-Norbert 0.551909
2000-01-30 23:59:59 973 Yvonne 0.733777 0.317511 0.538428 973-Yvonne 1.051287

2592000 rows × 7 columns

Final words on some dask dataframe operations

If you need to beef up your Pandas computations, then it's reasonable to use some Dask functions. In some situations, you can use the Dask functions to boil down your data into an eventual Pandas dataframe, and go about your work.

I also want to suggest an alternative: avoiding this use of Pandas/Dask in the first place. For a lot of data scientists who come from more of a coding/scripting + academic background, falling back to Python libraries is safe and popular because the data tend to be stored locally (or stored elsewhere and easy to load/stream into memory). However, I've found many operations can just as easily be done with a SQL client (SQL Workbench, Snowflake, Athena, your company's in-house SQL). The beauty here is that the computation is offloaded from your measly laptop and pushed closer to the SQL engine, where it's probably faster to connect to and load the data, and where the basic SQL operations are readily optimized and parallelized by the underlying engine.

Dask as general compute

Okay, returning to the main point: Dask is a general task scheduler, and in this way we can deal with a wider variety of objects.

Create a Person object with some properties like a personal value

from random import random

class Person:
    personal_value: float
    name: str
        
    def __init__(self, name: str):
        self.name = name
        self.personal_value = random()
    
    def __repr__(self):
        return f"<{self.name}, {self.personal_value}>"
        

Generate some names, create Person objects

all_names = df['name'].unique().compute()
list_of_people = [
    Person(name)
    for name in all_names
]
list_of_people
[<Bob, 0.42504889952972125>,
 <Ray, 0.140133244820701>,
 <Alice, 0.8950808612559186>,
 <Ingrid, 0.4878482307432528>,
 <Oliver, 0.03795210374666336>,
 <Sarah, 0.8064995691446044>,
 <Tim, 0.7810972931183621>,
 <Hannah, 0.07092572040972078>,
 <Laura, 0.9014321190822749>,
 <Ursula, 0.16321344422412154>,
 <Zelda, 0.1788424722282269>,
 <Xavier, 0.9303267823294392>,
 <Edith, 0.8930653869581339>,
 <Charlie, 0.6534186030857254>,
 <George, 0.8785832935565809>,
 <Kevin, 0.21880064681626354>,
 <Patricia, 0.27406092333203325>,
 <Wendy, 0.6017376251772102>,
 <Victor, 0.13094904715957834>,
 <Michael, 0.15156561575725258>,
 <Norbert, 0.6410888507405104>,
 <Quinn, 0.691469191035896>,
 <Yvonne, 0.5232511896527469>,
 <Dan, 0.8860489448170724>,
 <Jerry, 0.19627008484839015>,
 <Frank, 0.1600804719921317>]

A simple function we're going to perform: take two Person objects and add their values

def add_two_people(a: Person, b: Person) -> float:
    return a.personal_value + b.personal_value

Use dask delayed to create a bunch of tasks we want to perform

import itertools as it
from dask import delayed

delayed_operations = [
    delayed(add_two_people)(person1, person2)
    for person1, person2 in it.permutations(list_of_people, 2)
]

We have a list of delayed operations. These add_two_people calls haven't been computed yet, but we've created the tasks that will compute them

delayed_operations[:10]
[Delayed('add_two_people-32a8a338-87f1-4e24-871f-dcda7eecd2d7'),
 Delayed('add_two_people-343d91ad-c084-4d44-ae58-58ec82fb5a08'),
 Delayed('add_two_people-e15fce2e-01bd-4bd9-b44a-b384ddd040da'),
 Delayed('add_two_people-81908552-7d25-4fc3-a90a-63c4c59dcb98'),
 Delayed('add_two_people-ee6a51b7-d849-43a2-86dd-beb6fba88c23'),
 Delayed('add_two_people-3d733471-6ba9-4117-9d15-a7f043b2746f'),
 Delayed('add_two_people-30cd466b-982e-4f22-bdf7-0ccb626a49d7'),
 Delayed('add_two_people-aa8142fb-62a1-4f41-9bf5-a5064a934b9a'),
 Delayed('add_two_people-2d911d3d-9384-46ed-83ac-42b90bb7e61b'),
 Delayed('add_two_people-448909df-f4d5-4631-bd91-ca0b223133cf')]

Visualizing this is pretty simple because we're only doing one operation

dask.visualize(delayed_operations[:10])

Actually compute the operations

added_operation = client.compute(delayed_operations)

[image: Dask dashboard task stream]

Each computed operation is a Future object, from which we need to pull the real result

added_operation[:5]
[<Future: pending, key: add_two_people-32a8a338-87f1-4e24-871f-dcda7eecd2d7>,
 <Future: pending, key: add_two_people-343d91ad-c084-4d44-ae58-58ec82fb5a08>,
 <Future: pending, key: add_two_people-e15fce2e-01bd-4bd9-b44a-b384ddd040da>,
 <Future: pending, key: add_two_people-81908552-7d25-4fc3-a90a-63c4c59dcb98>,
 <Future: pending, key: add_two_people-ee6a51b7-d849-43a2-86dd-beb6fba88c23>]
added_operation[0].result()
0.5651821443504222
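Rather than calling .result() on each Future one at a time, the client can also gather everything back in a single call (a minimal sketch; all_sums is just a local name):

# Pull all of the Future results back to the local process at once
all_sums = client.gather(added_operation)
all_sums[:5]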

Scatter: When you have large data you want to pass around more efficiently

Sometimes large objects are a little cumbersome for dask to continually serialize and move around.

Before you schedule the operations, you can scatter an object out to the workers ahead of time, simplifying dask's execution

scattered_list_of_people = client.scatter(list_of_people)
scattered_list_of_people
[<Future: finished, type: __main__.Person, key: Person-5645ffd00a222804a0c9df1b93446288>,
 <Future: finished, type: __main__.Person, key: Person-0a670c0b0724f0b995d3da1391d30c09>,
 <Future: finished, type: __main__.Person, key: Person-77b61f5989a2964580e5903719724614>,
 <Future: finished, type: __main__.Person, key: Person-66dd5684642148ad319f525e45ae8230>,
 <Future: finished, type: __main__.Person, key: Person-81901dc51f9ec65d485f36361f017f71>,
 <Future: finished, type: __main__.Person, key: Person-fd25f5b2bd807c92261bc5d5073457e7>,
 <Future: finished, type: __main__.Person, key: Person-d49c166aff94341da9a389a6866182f2>,
 <Future: finished, type: __main__.Person, key: Person-25b332aa451bdc6870044f857a033ef6>,
 <Future: finished, type: __main__.Person, key: Person-aa4c0dd5041143619b422305dde782dd>,
 <Future: finished, type: __main__.Person, key: Person-182dff16000f2450571662ebcd35f9ae>,
 <Future: finished, type: __main__.Person, key: Person-b4a76162a982e213a8b6f6068f1b5255>,
 <Future: finished, type: __main__.Person, key: Person-f2f9eae442b80a7c197d334b511bf102>,
 <Future: finished, type: __main__.Person, key: Person-f7a917bf050c4c8ab70a95074b347aec>,
 <Future: finished, type: __main__.Person, key: Person-9bd561d2fac80264ecebe3745750375c>,
 <Future: finished, type: __main__.Person, key: Person-9d012ca5c6e668d169891c9d415fae79>,
 <Future: finished, type: __main__.Person, key: Person-31f91e1366daa424f5dc3380d1012ab7>,
 <Future: finished, type: __main__.Person, key: Person-9d70de96c44faa8af5df262fb1e87576>,
 <Future: finished, type: __main__.Person, key: Person-c464fdafbb3262fe98d7d5285ec68072>,
 <Future: finished, type: __main__.Person, key: Person-6bffe9cd1259b3710a1e6350900a69c0>,
 <Future: finished, type: __main__.Person, key: Person-6d7cd0725d7828aaf4ca730a0db0ece1>,
 <Future: finished, type: __main__.Person, key: Person-75cae09584cf355da2dff081dfb56441>,
 <Future: finished, type: __main__.Person, key: Person-9c0a085e1147504cce42ae4c618a1958>,
 <Future: finished, type: __main__.Person, key: Person-c925e6386486b43d254e03d0a0b9c568>,
 <Future: finished, type: __main__.Person, key: Person-ce3f69edf73a5f89b11b71f28bcb6656>,
 <Future: finished, type: __main__.Person, key: Person-6b4690c383a577e4e6514082cdd450d5>,
 <Future: finished, type: __main__.Person, key: Person-eebb29c02ca33f5e946fca31580ffb0e>]

The new operation just adds an arbitrary int to a Person's value

def add_a_value(p: Person, v: int):
    return p.personal_value + v

The syntax is the same, but the nuance here is that we're passing the scattered Person objects

delayed_operations = [
    delayed(add_a_value)(scattered_person, i)
    for i, scattered_person in enumerate(scattered_list_of_people)
]

The dask graph still looks very similar to before; we're just doing a single task over and over on different input data

dask.visualize(delayed_operations[:10])

And we can use our client to compute the operations as before

computed_add_operation = client.compute(delayed_operations)

The task stream shows the scattered Person objects in addition to the add_a_value functions

[image: Dask dashboard task stream showing the scattered Person objects and add_a_value tasks]

computed_add_operation[0].result()
0.42504889952972125

A more applied example: let's say you have a model you want to run some parallel predictions with

from sklearn.linear_model import LinearRegression
import numpy as np

X = np.random.random(100)
y = np.random.random(100)
my_model = LinearRegression().fit(X.reshape(-1,1), y)
X_new = np.random.random(10)

Again, define a function, and use the delayed wrapper to schedule a bunch of operations

def run_prediction(model, x):
    return model.predict(np.array(x).reshape(-1,1)) # Doing some array-reshaping to fit required predict formats

my_model_scattered = client.scatter(my_model)
lots_of_predictions = [
    delayed(run_prediction)(my_model_scattered, val) 
    for val in X_new
]

Then compute the operations

predictions = client.compute(lots_of_predictions)
predictions[0].result()
array([0.50623046])
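To collect the individual predictions back into one array, you can gather the futures and stack them (a minimal sketch; all_predictions is just a local name):

import numpy as np

# Each future holds a length-1 array; gather them and concatenate into a single array
all_predictions = np.concatenate(client.gather(predictions))
all_predictions.shape  # (10,)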

Parting thoughts: Dask can be used as a multiprocessing engine

While dask.DataFrame has a lot of parity with pandas.DataFrame, letting you supercharge your pandas code, you can also use Dask to handle arbitrary Python objects. In doing so, you open a lot of additional code up for parallelization. With operations like scatter, you also get some parallelization optimizations that you might not get from something built-in like concurrent.futures.

Because Python code generally runs serially, if you submit Python code to a cluster, you might not be leveraging all the cores available. The use of Dask, delayed, and even scatter can help add some straightforward parallelization.