# import needed modules
import random
import time

# define our functions; each one sleeps for a random 0-1 s to simulate slow work
def inc(x):
    time.sleep(random.random())
    return x + 1

def dec(x):
    time.sleep(random.random())
    return x - 1

def add(x, y):
    time.sleep(random.random())
    return x + y
Simple Dask example – Local Cluster
📘 Learning Objectives
- Set up a local Dask cluster
- Learn how to close your cluster
Overview
In this notebook, I set up a local cluster on the same pod that runs this notebook. So if I started with a 4-CPU, 2 GB pod, that is all the local cluster has available. In the next example, we will use Dask Gateway, which is very different: Dask Gateway spins up new pods to run the scheduler and workers for your tasks.
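Because a LocalCluster can only use the pod it runs in, it can help to check how many CPUs that pod actually gives you. Note that os.cpu_count() alone may report all of the node's CPUs rather than the pod's limit. The following is a minimal standard-library sketch that assumes a Linux pod with cgroup v2 mounted at /sys/fs/cgroup. The file path, the rounding up of fractional quotas, and the available_cpus helper are all assumptions made for illustration.

```python
import math
import os

def available_cpus() -> int:
    """Best-effort count of CPUs this process can use inside a container."""
    # Start from the CPUs this process may be scheduled on (Linux only),
    # falling back to the machine-wide count on other platforms.
    if hasattr(os, "sched_getaffinity"):
        count = len(os.sched_getaffinity(0))
    else:
        count = os.cpu_count() or 1

    # Assumption: cgroup v2 exposes the pod's CPU limit as "<quota> <period>"
    # (or "max <period>" when unlimited) in /sys/fs/cgroup/cpu.max.
    try:
        with open("/sys/fs/cgroup/cpu.max") as f:
            quota, period = f.read().split()
        if quota != "max":
            count = min(count, max(1, math.ceil(int(quota) / int(period))))
    except (OSError, ValueError):
        pass  # no readable cgroup v2 limit; keep the affinity-based count
    return count

print(f"CPUs available to this notebook: {available_cpus()}")
```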
Feature | LocalCluster | Dask Gateway |
---|---|---|
Runs inside your notebook's pod? | ✅ Yes | ❌ No (runs in new pods) |
Uses multiple pods? | ❌ No | ✅ Yes (scheduler + workers) |
Scales beyond your pod's resources? | ❌ No | ✅ Yes |
Can use files in /home? | ✅ Yes | ❌ No |
Create functions for our tasks
Run tasks non-parallel and parallel
We will use a Dask local cluster for the parallel runs.
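The timing cells in this notebook use %%time, which is an IPython cell magic and only works in Jupyter or IPython. If you run the same code as a plain Python script, time.perf_counter gives a rough stand-in for the "Wall time" line, and time.process_time is the analogue for CPU time. wall_timer in the sketch below is a hypothetical helper and is not part of Dask or IPython.

```python
import time
from contextlib import contextmanager

@contextmanager
def wall_timer(label):
    """Hypothetical helper: report elapsed wall-clock time for a with-block."""
    timing = {}
    start = time.perf_counter()
    try:
        yield timing
    finally:
        timing["seconds"] = time.perf_counter() - start
        print(f"{label}: wall time {timing['seconds']:.2f} s")

# Example: five fixed 10 ms sleeps instead of the notebook's random ones
with wall_timer("sequential") as t:
    for _ in range(5):
        time.sleep(0.01)
```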
%%time
# a sequential example with no parallelization
results = []
for x in range(20):
    result = inc(x)
    result = dec(result)
    results.append(result)
print(results)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
CPU times: user 1.21 s, sys: 78.5 ms, total: 1.29 s
Wall time: 18.3 s
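The 18.3 s wall time is close to what the random sleeps predict. Each of the 40 calls (20 inc plus 20 dec) sleeps random.random() seconds, which is uniform on [0, 1) with a mean of 0.5 s, so the expected total is 40 × 0.5 = 20 s. A sum of 40 independent uniforms has a standard deviation of √(40/12) ≈ 1.8 s, so 18.3 s sits within about one standard deviation. The CPU time is small because sleeping uses almost no CPU. The sketch below checks this arithmetic by simulation and does not actually sleep.

```python
import random
import statistics

N_ITEMS = 20          # range(20) in the loop
CALLS_PER_ITEM = 2    # inc, then dec
MEAN_SLEEP_S = 0.5    # mean of random.random(), uniform on [0, 1)

expected_s = N_ITEMS * CALLS_PER_ITEM * MEAN_SLEEP_S
print(f"expected total sleep: {expected_s} s")  # expected total sleep: 20.0 s

# Draw the 40 sleep durations many times and sum them (no real sleeping).
rng = random.Random(42)
totals = [
    sum(rng.random() for _ in range(N_ITEMS * CALLS_PER_ITEM))
    for _ in range(10_000)
]
print(f"simulated mean {statistics.mean(totals):.1f} s, "
      f"std dev {statistics.stdev(totals):.1f} s")
```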
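# set up our dask local cluster for parallel work
from dask.distributed import Client, LocalCluster
# Set up our cluster with default workers and threads.
# With processes=False the workers run as threads inside this notebook's
# process; by default that means one worker with one thread per CPU core.
cluster = LocalCluster(processes=False)
cluster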
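With processes=False, the cluster's workers are threads inside the notebook's own Python process, which is loosely like a standard-library thread pool. To show the same submit-then-collect pattern without Dask, the sketch below substitutes concurrent.futures.ThreadPoolExecutor. The copies of inc and dec use much shorter sleeps than the notebook's so that the example runs quickly. inc_then_dec is a hypothetical helper.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor

def inc(x):
    time.sleep(random.random() * 0.05)  # shortened sleep for the demo
    return x + 1

def dec(x):
    time.sleep(random.random() * 0.05)
    return x - 1

def inc_then_dec(x):
    # Hypothetical helper: runs both steps in one task. Dask instead chains
    # them by passing the inc future into client.submit(dec, ...).
    return dec(inc(x))

# Four threads in this process, loosely like LocalCluster(processes=False).
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(inc_then_dec, x) for x in range(20)]
    results = [f.result() for f in futures]  # wait for each, in submit order

print(results)
```

Threads work well here because time.sleep releases the GIL, so the sleeps overlap. Pure-Python, CPU-bound functions would mostly serialize on the GIL. For that kind of work, a LocalCluster with the default processes=True, which runs separate worker processes, is usually the better fit.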
%%time
# Set up a client for work
client = cluster.get_client()
results = []
for x in range(20):
    result = client.submit(inc, x)       # returns a Future immediately
    result = client.submit(dec, result)  # dec runs once inc's result is ready
    results.append(result)
results = client.gather(results)         # block until all futures finish
print(results)
client.close()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
CPU times: user 1.14 s, sys: 50.1 ms, total: 1.19 s
Wall time: 6.52 s
# When we are done we can close our dask cluster
cluster.close()
Set up a new local cluster
# Set up a new cluster with 4 workers and 1 thread per worker
# (still threads inside this process, since processes=False)
cluster = LocalCluster(n_workers=4, processes=False, threads_per_worker=1)
cluster
%%time
# Set up a client for work
client = cluster.get_client()
results = []
for x in range(20):
    result = client.submit(inc, x)
    result = client.submit(dec, result)
    results.append(result)
results = client.gather(results)
print(results)
client.close()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
CPU times: user 831 ms, sys: 39.9 ms, total: 871 ms
Wall time: 5.58 s
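The wall time of about 5.6 s has a simple explanation. In this cluster at most 4 tasks run at once (4 workers × 1 thread), so the roughly 20 s of total sleep can at best be split four ways, giving about 5 s. On top of that there is some idle time at the end, when fewer than 4 tasks remain. The toy simulation below estimates this with greedy list scheduling. It is an assumption-laden model: each inc-then-dec chain is treated as one job on one thread, and scheduler overhead is ignored. It does not describe how Dask's scheduler actually places tasks.

```python
import heapq
import random
import statistics

def makespan(durations, n_slots):
    """Finish time when each job goes to whichever slot frees up first."""
    free_at = [0.0] * n_slots  # when each slot next becomes idle (a valid min-heap)
    for d in durations:
        start = heapq.heappop(free_at)      # earliest-idle slot
        heapq.heappush(free_at, start + d)  # busy until start + d
    return max(free_at)

rng = random.Random(0)

def simulate(n_slots, n_items=20, trials=2_000):
    """Mean makespan over many random runs of the notebook's 20 chains."""
    runs = []
    for _ in range(trials):
        # Assumption: each chain lasts inc's sleep + dec's sleep.
        jobs = [rng.random() + rng.random() for _ in range(n_items)]
        runs.append(makespan(jobs, n_slots))
    return statistics.mean(runs)

for slots in (1, 4):
    print(f"{slots} thread(s): mean wall time ~ {simulate(slots):.1f} s")
```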
# When we are done we can close our dask cluster
cluster.close()
Summary
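This shows a simple example of using Dask to run tasks in parallel on all the CPUs in your pod. Keep in mind that xarray already uses Dask whenever a dataset is chunked (for example, when it is opened with open_mfdataset or with the chunks= argument). Dask's default threaded scheduler also uses all the CPUs in your pod without any cluster setup. So if you are using xarray, you do not need to set up a local Dask cluster; you only need one for custom tasks like those shown in this notebook.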
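The same point can be shown with dask.delayed instead of xarray. In the sketch below no Client is created, and dask.compute runs the task graph on Dask's local threaded scheduler. The scheduler="threads" argument is passed only to make that choice visible, and the shortened sleeps are for the example.

```python
import random
import time

import dask

def inc(x):
    time.sleep(random.random() * 0.1)
    return x + 1

def dec(x):
    time.sleep(random.random() * 0.1)
    return x - 1

# Build a lazy task graph: dec depends on inc for each x; nothing runs yet.
lazy = [dask.delayed(dec)(dask.delayed(inc)(x)) for x in range(20)]

# No cluster or Client: the local threaded scheduler runs the graph.
results = list(dask.compute(*lazy, scheduler="threads"))
print(results)
```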