Simple dask example

Author

Eli Holmes (NOAA)

# import needed modules
import time, random

# define our functions
def inc(x):
    """Return ``x + 1`` after pausing for a random fraction of a second.

    The pause length comes from ``random.random()``, so each call sleeps
    for somewhere in the range [0, 1) seconds before returning.
    """
    pause = random.random()
    time.sleep(pause)
    incremented = x + 1
    return incremented

def dec(x):
    """Return ``x - 1`` after pausing for a random fraction of a second.

    The pause length comes from ``random.random()``, so each call sleeps
    for somewhere in the range [0, 1) seconds before returning.
    """
    pause = random.random()
    time.sleep(pause)
    decremented = x - 1
    return decremented

def add(x, y):
    """Return ``x + y`` after pausing for a random fraction of a second.

    The pause length comes from ``random.random()``, so each call sleeps
    for somewhere in the range [0, 1) seconds before returning.
    """
    pause = random.random()
    time.sleep(pause)
    total = x + y
    return total
%%time

# Sequential baseline with no parallelization: every value is passed through
# inc and then dec one after another, so the random sleeps simply add up.
results = [dec(inc(x)) for x in range(20)]

print(results)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
CPU times: user 3.4 ms, sys: 92 µs, total: 3.49 ms
Wall time: 17.4 s
# import dask for parallel work
from dask.distributed import Client, LocalCluster
# Set up our cluster with default workers and threads.
# processes=False requests thread-based workers inside this Python process
# rather than separate worker processes.
cluster = LocalCluster(processes=False)
# Leaving `cluster` as the last expression of the notebook cell displays it.
cluster
%%time
# Set up a client for work. Using it as a context manager closes the client
# even if submitting or gathering a task raises.
with cluster.get_client() as client:
    # Queue the whole inc -> dec chain up front. Each submit hands back a
    # future, and passing the inc future to dec links the two tasks without
    # waiting for the first result here.
    futures = []
    for x in range(20):
        incremented = client.submit(inc, x)
        futures.append(client.submit(dec, incremented))

    # Wait for every task and collect the computed values, in list order.
    results = client.gather(futures)

print(results)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
CPU times: user 450 ms, sys: 63 ms, total: 513 ms
Wall time: 6.01 s
# When we are done, close the Dask cluster to shut down its workers.
cluster.close()
# Set up a new cluster with 4 workers and 1 thread per worker. Both values
# are set explicitly here rather than left at their defaults, and
# processes=False requests thread-based workers inside this Python process.
cluster = LocalCluster(n_workers=4, processes=False, threads_per_worker=1)
# Leaving `cluster` as the last expression of the notebook cell displays it.
cluster
%%time
# Set up a client for work. Using it as a context manager closes the client
# even if submitting or gathering a task raises.
with cluster.get_client() as client:
    # Queue the whole inc -> dec chain up front. Each submit hands back a
    # future, and passing the inc future to dec links the two tasks without
    # waiting for the first result here.
    futures = []
    for x in range(20):
        incremented = client.submit(inc, x)
        futures.append(client.submit(dec, incremented))

    # Wait for every task and collect the computed values, in list order.
    results = client.gather(futures)

print(results)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
CPU times: user 500 ms, sys: 55 ms, total: 555 ms
Wall time: 4.47 s
# When we are done, close the Dask cluster to shut down its workers.
cluster.close()