Simple dask example – Local Cluster

Author

Eli Holmes (NOAA)


📘 Learning Objectives

  1. Set up a local Dask cluster
  2. Learn how to close your cluster

Overview

In this notebook, I set up a local cluster on the same pod (virtual machine) that I spun up. So if I started with a 4-CPU, 2 GB pod, that is all the local cluster has available. In the next example, we will use Dask Gateway, which works very differently: it spins up new pods (virtual machines) to run your tasks.

Feature | LocalCluster | Dask Gateway
Runs in your notebook? | ✅ Yes | ❌ No (runs in new pods)
Uses multiple pods? | ❌ No | ✅ Yes (scheduler + workers)
Scales beyond your pod? | ❌ No | ✅ Yes
Can use files in /home? | ✅ Yes | ❌ No
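Because a LocalCluster can only use the CPUs of the pod it runs in, it can help to check how many CPUs Dask thinks it has before sizing a cluster. A minimal sketch, assuming dask is installed: dask.system.CPU_COUNT is the count Dask bases its defaults on and, on Linux, it takes CPU affinity and container (cgroup) CPU limits into account, while os.cpu_count() may report every CPU on the underlying node.

```python
import os

import dask.system

# CPUs reported by the operating system; inside a container this can be
# the whole node, not just what the pod is allowed to use
os_cpus = os.cpu_count()

# CPUs Dask assumes are available; this respects CPU affinity and,
# on Linux, cgroup CPU limits such as a pod's CPU quota
dask_cpus = dask.system.CPU_COUNT

print(f"os.cpu_count(): {os_cpus}")
print(f"dask.system.CPU_COUNT: {dask_cpus}")
```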

Create functions for our tasks

# import needed modules
import time, random

# define our functions
def inc(x):
    time.sleep(random.random())
    return x + 1

def dec(x):
    time.sleep(random.random())
    return x - 1

def add(x, y):
    time.sleep(random.random())
    return x + y

Run tasks non-parallel and parallel

We will use a Dask local cluster for the parallel work.

%%time

# a sequential example with no parallelization
results = []
for x in range(20):
    result = inc(x)
    result = dec(result)
    results.append(result)

print(results)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
CPU times: user 1.21 s, sys: 78.5 ms, total: 1.29 s
Wall time: 18.3 s
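The 18.3 s wall time is roughly what the functions predict. random.random() returns a value uniform on [0, 1), so each sleep averages 0.5 s, and the loop makes 40 calls (20 to inc and 20 to dec). The short sketch below just does that arithmetic; the variable names are illustrative.

```python
# Sequential run: 20 inputs, each passed through inc() then dec()
n_inputs = 20
calls_per_input = 2

# time.sleep(random.random()) sleeps U[0, 1) seconds, so 0.5 s on average
mean_sleep_s = 0.5

expected_sequential_s = n_inputs * calls_per_input * mean_sleep_s
print(f"expected sequential wall time ~ {expected_sequential_s} s")  # 20.0 s
```

The CPU time (about 1.2 s) is much smaller than the wall time because a sleeping thread does not use the CPU.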
# set up our dask local cluster for parallel work
from dask.distributed import Client, LocalCluster
# Set up our cluster with the default number of workers and threads;
# processes=False runs the workers as threads inside this notebook's process
cluster = LocalCluster(processes=False)
cluster
%%time
# Set up a client for work
client = cluster.get_client()

results = []
for x in range(20):
    result = client.submit(inc, x)
    result = client.submit(dec, result)
    results.append(result)

results = client.gather(results)
print(results)
client.close()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
CPU times: user 1.14 s, sys: 50.1 ms, total: 1.19 s
Wall time: 6.52 s
# When we are done we can close our dask cluster
cluster.close()
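With processes=False, the workers run as threads inside the notebook's own process, so the submit/gather pattern above behaves much like a thread pool from Python's standard concurrent.futures module. The sketch below is only an analogy using the standard library, not Dask. One difference: Dask accepts a future (the result of client.submit(inc, x)) as an argument to the next submit and waits for it automatically, whereas here the two dependent steps are chained inside a hypothetical helper, inc_then_dec. The pool size of 4 is an assumption.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor

def inc(x):
    time.sleep(random.random())
    return x + 1

def dec(x):
    time.sleep(random.random())
    return x - 1

# Hypothetical helper: run the two dependent steps one after the other
def inc_then_dec(x):
    return dec(inc(x))

start = time.perf_counter()
# The with-block shuts the pool down on exit, much like closing a cluster
with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns a future immediately, like client.submit()
    futures = [pool.submit(inc_then_dec, x) for x in range(20)]
    # waiting on every future plays the role of client.gather()
    results = [f.result() for f in futures]
elapsed = time.perf_counter() - start

print(results)
print(f"wall time: {elapsed:.2f} s")
```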

Set up a new local cluster

# Set up a new cluster with 4 workers and 1 thread per worker
cluster = LocalCluster(n_workers=4, processes=False, threads_per_worker=1)
cluster
%%time
# Set up a client for work
client = cluster.get_client()

results = []
for x in range(20):
    result = client.submit(inc, x)
    result = client.submit(dec, result)
    results.append(result)

results = client.gather(results)
print(results)
client.close()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
CPU times: user 831 ms, sys: 39.9 ms, total: 871 ms
Wall time: 5.58 s
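A rough best case for the parallel run follows from the same arithmetic as the sequential case. Assuming 4 threads share about 20 s of expected sleeping (4 workers with 1 thread each, as configured above), the ideal wall time is about 20 s / 4 = 5 s. The measured 5.58 s is close to that; the gap is likely due to uneven random sleeps and scheduling overhead. The sketch below just does the arithmetic, using the wall times printed in this notebook.

```python
total_sleep_s = 20 * 2 * 0.5   # expected sleep across all 40 inc/dec calls
n_threads = 4                  # 4 workers x 1 thread per worker

# Best case if the work split perfectly evenly across threads
ideal_parallel_s = total_sleep_s / n_threads
print(f"ideal parallel wall time ~ {ideal_parallel_s} s")  # 5.0 s

# Speedup implied by the measured wall times above
measured_sequential_s = 18.3
measured_parallel_s = 5.58
speedup = measured_sequential_s / measured_parallel_s
print(f"measured speedup ~ {speedup:.1f}x")  # 3.3x
```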
# When we are done we can close our dask cluster
cluster.close()
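One way to make sure the client and cluster are always closed, even if a cell raises an error, is to use them as context managers. A minimal sketch, assuming a recent dask.distributed is installed; inc and dec are redefined here without the sleeps to keep it quick, and client.nthreads() is used to confirm that the cluster has 4 workers with 1 thread each.

```python
from dask.distributed import Client, LocalCluster

def inc(x):
    return x + 1

def dec(x):
    return x - 1

# Both the cluster and the client are closed automatically when the block exits
with LocalCluster(n_workers=4, processes=False, threads_per_worker=1) as cluster, \
        Client(cluster) as client:
    # Mapping of {worker_address: number_of_threads}
    threads = client.nthreads()
    print(threads)

    # The inner future is passed straight to the next submit; Dask waits for it
    futures = [client.submit(dec, client.submit(inc, x)) for x in range(20)]
    results = client.gather(futures)

print(results)
```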

Summary

This notebook shows a simple example of using Dask to run tasks on all the CPUs in your pod. Keep in mind that xarray already uses Dask when its data are Dask-backed (for example, when a dataset is opened with the chunks argument), and by default Dask then spreads that work across all the CPUs. So if you are using xarray, you usually do not need to set up a local Dask cluster; you only need one for tasks like those shown in this notebook.
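To see Dask use several threads with no cluster at all, here is a small dask.array computation; xarray wraps the same kind of chunked array when its data are Dask-backed. When no distributed Client is active, .compute() on a Dask array uses Dask's default threaded scheduler. A sketch, assuming dask and numpy are installed; the array shape and chunk size are arbitrary.

```python
import dask.array as da

# A 1000 x 1000 array of ones split into 16 chunks of 250 x 250
x = da.ones((1000, 1000), chunks=(250, 250))

# Building the expression is lazy; nothing runs yet
total = x.sum()

# With no distributed Client active, compute() sums the chunks
# in parallel using Dask's default local thread pool
result = total.compute()
print(result)  # 1000000.0
```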