An update and proposal on continuing development of Dask and yt.
Dask and yt: a pre-YTEP
yt and Dask: an overview
In the past months, I’ve been investigating and working on integrating Dask into the yt codebase. This document provides an overview of my efforts to date, but it is also meant as a preliminary YTEP (or pYTEP?) to solicit feedback from the yt community at an early stage, before getting too far into the weeds of refactoring.
In general, Dask provides a flexible framework for managing computations across chunked objects (stored in serial on a single processor or in parallel across workers). The yt operations that could potentially be simplified are any that rely on the chunking protocol, such as data IO, calculating derived quantities, calculating profiles, sampling data (slices, projections), and more. Furthermore, allowing yt to return a dask.array object to the user would make it easier for users to build their own parallel workflows.
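As a rough illustration of the kind of user-side workflow this would enable, here is a generic dask.array example (not yt API): operations on a chunked array are lazy and only execute, possibly in parallel, when compute() is called.

```python
import dask.array as da

# a lazy, chunked array: nothing is read or computed yet
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))

# builds a task graph over the chunks; dask runs it in serial,
# with threads/processes, or on a distributed cluster
result = (x - x.mean()).std()
print(result.compute())
```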
Before diving in, it’s worth discussing the interplay between Dask and the existing MPI architecture within yt. Dask provides MPI management via the dask-mpi package, so from a user perspective, anyone already using yt with MPI should see minimal disruption to their workflows.
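For example, a script launched with mpirun can hand its MPI ranks over to dask via dask-mpi’s initialize(); the sketch below shows the standard pattern (the yt-specific pieces are omitted).

```python
# run with, e.g.: mpirun -np 4 python dask_mpi_script.py
from dask_mpi import initialize
from distributed import Client

initialize()       # rank 0 hosts the scheduler, rank 1 runs this script, the rest become workers
client = Client()  # connects to the scheduler started by initialize()

# ... yt + dask work goes here; dask handles the MPI communication behind the scenes
```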
experiments in daskifying yt
Thus far, my efforts have focused on developing a series of experiments demonstrating yt + Dask integration at different levels within yt, covering using dask to read data off of disk, constructing a daskified version of a non-trivial and parallel yt calculation (profiles), and an initial prototype for adding dask functionality to unyt arrays. Each of these subjects has a detailed write-up, linked in the corresponding section below.
I encourage you to check out the detailed descriptions, but I’ll provide a short summary here before describing some general takeaways and then proposing a plan for moving development into the yt pipeline.
1. (particle) data IO (link)
In this experiment, I re-wrote the BaseIOHandler._read_particle_selection() function (in yt.utilities.io_handler) to use dask to read in particle data from a Gadget dataset. The implementation iterates over the dataset chunks to build a list of dask.delayed objects:
    delayed_chunks = [
        dask.delayed(self._read_single_ptype)(
            ch, this_ptf, selector, ptype_meta[ptype]
        )
        for ch in chunks
    ]
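The delayed reads are then executed together, with dask scheduling them in serial or across workers; schematically (the actual prototype goes on to assemble the per-chunk results into the usual field dictionary):

```python
# nothing has been read yet; this triggers the per-chunk reads
chunk_results = dask.compute(*delayed_chunks)
```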
The main challenges here were related to dask communication. First, dask uses pickle to serialize and distribute objects to different workers, so any arguments to delayed functions must be pickleable. To implement this, I had to add some pickling methods for the base selector objects and slightly modify the underlying ParticleContainer class (which gets stored in each chunk) so that the dataset index is not needlessly rebuilt when unpickling.
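As a generic sketch of the pattern (not the actual yt selector code), the fix amounts to controlling what state gets shipped to workers and what gets rebuilt lazily on the other side:

```python
class ExampleSelector:
    """Illustrative stand-in for a selector-like object; not yt code."""

    def __init__(self, left_edge, right_edge):
        self.left_edge = left_edge
        self.right_edge = right_edge
        self._index = None  # expensive, lazily-built state

    def __getstate__(self):
        # ship only the lightweight state needed to rebuild the object
        state = self.__dict__.copy()
        state["_index"] = None
        return state

    def __setstate__(self, state):
        # restore without triggering a needless index rebuild
        self.__dict__.update(state)
```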
The second communication-related issue is that when yt pickles a Dataset object, the hash values are stored in an in-memory cache by default, which is not accessible to the various Dask workers when running in parallel. In the IO prototype, I simply switched to using the on-disk hash storage, but it may be worth considering more direct memory management with Dask, perhaps creating a shared dask context to distribute certain objects across workers.
2. profile calculation (link)
In this experiment, I focused on refactoring a task that leverages chunked data: calculating profiles. I first attempted to write a pure dask version of calculating a binned statistic equivalent to a yt 1D profile, but performance wasn’t great and it wasn’t clear how to generalize the code. So instead I focused on building a delayed workflow that directly uses yt’s optimized 1D binning function, yt.utilities.lib.misc_utilities.new_binprofile1d. This approach can easily be extended across yt wherever we perform collections and reductions across chunks. The modifications to the code would also be fairly minimal, mostly replacing MPI gathering operations with iterations over delayed dask objects (reminder: you would still be able to use MPI as normal; dask would simply handle the MPI communication behind the scenes).
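The pattern that replaces an MPI gather looks roughly like this (a self-contained schematic that uses a plain numpy histogram as a stand-in for yt’s optimized binning routine):

```python
import dask
import numpy as np

@dask.delayed
def partial_profile(bin_vals, field_vals, bin_edges):
    # binned sum for a single chunk (stand-in for yt's 1D binning function)
    hist, _ = np.histogram(bin_vals, bins=bin_edges, weights=field_vals)
    return hist

# fake per-chunk (binning field, weight field) data for illustration
rng = np.random.default_rng(0)
chunks = [(rng.random(1_000), rng.random(1_000)) for _ in range(4)]
bin_edges = np.linspace(0.0, 1.0, 33)

partials = [partial_profile(b, f, bin_edges) for b, f in chunks]
profile = sum(dask.compute(*partials))  # summing over chunks replaces the MPI reduction
```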
3. dask-unyt arrays (link)
In order to leverage dask wherever chunks are used, we need to be able to return dask arrays from the IO functions. In yt, however, our base arrays are unyt_array objects. So in this experiment, I built a rough dask-unyt array prototype. The basic approach was to create a new unyt class that subclasses the base dask Array object (dask.array.core.Array) and behaves as a dask Array but carries units alongside in hidden unyt attributes. Since the initial attempt, I’ve started an improved implementation that does a better job of minimizing code duplication (hopefully a PR to unyt soon).
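As a toy illustration of the intended behavior only (the actual prototype subclasses dask.array.core.Array rather than wrapping it, and the class and method names below are made up):

```python
import dask.array as da
import unyt

class DaskUnytSketch:
    """Toy wrapper: pair a dask array with a unyt unit and re-attach the
    units when the task graph is computed. Illustrative only."""

    def __init__(self, darr, units):
        self._darr = darr
        self.units = unyt.Unit(units)

    def mean(self):
        # lazy: returns another wrapped, still-uncomputed dask array
        return DaskUnytSketch(self._darr.mean(), self.units)

    def compute(self):
        # evaluate the graph and attach units to the result
        return unyt.unyt_array(self._darr.compute(), self.units)

x = DaskUnytSketch(da.random.random((1_000, 1_000), chunks=(100, 100)), "g/cm**3")
print(x.mean().compute())  # a unyt quantity with units of g/cm**3
```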
data IO complexity
Finally, it is worth noting that the work here, particularly the daskified particle reader described above, is closely related to Matt Turk’s thoughts on frontend refactoring (Part 1, Part 2, Part 3). While his posts do not mention dask, there are some synergies with the present work: in refactoring to leverage dask, we should also consider ways to simplify frontend development.
Development plan
Now that I’ve worked through some isolated experiments in daskifying parts of yt, it makes sense to get a wider range of folks involved. Towards that end, I’m proposing the following work plan:
- Stage 0: initial input from the yt community <-- We are Here
- Stage 1: move development to the yt pipeline
- Stage 2: particle dataset IO
- Stage 3: chunk operations on delayed arrays
- Stage 4: non-particle datasets (and more)
Stage 0: initial input from the yt community
This is the current stage. Do you love/hate any/all of this? Send me your ideas, thoughts, fears and hopes for yt + Dask! You can email me (chavlin@illinois.edu) or come and discuss on the yt slack channel.
Stage 1: move development to the yt pipeline (branch logistics)
So far, my development has mainly proceeded as standalone notebooks and modules in the DXL yt-dask-experiments repository. But in order to start fully developing these new features, we need to move development into the yt pipeline. Given that these changes will take some time and will likely temporarily break many things, we need to isolate yt-Dask development from main yt development. Towards that end, we can create a new dask_yt development branch, after which development would proceed via:
- dask-specific PRs: these are PRs submitted directly to the dask_yt branch. They may introduce breaking changes.
- “neutral” PRs: these are PRs that make non-breaking changes that are independent of dask and are submitted to yt’s master branch as normal PRs.
Occasionally, we merge yt master into dask_yt as neutral changes are merged into master (and as normal yt development occurs).
Stage 1 Tasks & Follow Up:
- create the new dask_yt development branch.
Stage 2: particle dataset IO
The simplest place to start in actual refactoring is to implement a modified version of the prototype particle reader within yt proper. While it will use dask to read the chunks, it can simply return the expected in-memory dict of data and so will not break anything.
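Schematically, the dask-backed reader would still hand back a plain in-memory dictionary; here is a self-contained sketch of that flow (the read function and field name are stand-ins, not yt API):

```python
import dask
import numpy as np

def read_one_chunk(chunk_id):
    # stand-in for reading one chunk's particle fields off disk
    return {("PartType0", "Mass"): np.full(10, float(chunk_id))}

delayed_reads = [dask.delayed(read_one_chunk)(i) for i in range(4)]
per_chunk = dask.compute(*delayed_reads)  # dask schedules the per-chunk reads

# assemble into the usual in-memory field dict expected by callers
rv = {
    ("PartType0", "Mass"): np.concatenate(
        [c[("PartType0", "Mass")] for c in per_chunk]
    )
}
```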
Stage 2 Tasks & Follow Up:
- implement/copy the prototype _read_particle_fields() method
- consider dask.array vs dask.dataframe usage (at present the prototype uses dask.dataframe for the initial read to avoid having to know the number of particles a priori)
- consider the initial chunk creation: can we use Dask here initially instead of the chunk iterator object?
Stage 3: chunk operations on delayed arrays
Once we have a daskified particle reader in place, we need to add the option to return the data as delayed dask arrays. With that in place, we can refactor many of the operations that use the chunk iterator object. The main obstacle, besides refactoring those operations themselves, is the fact that the arrays returned by _read_particle_fields are converted to unyt_arrays, so the first step in this stage is completing the dask-unyt_array implementation.
Stage 3 Tasks & Follow Up:
- implement the dask-unyt_array class (as an upstream contribution to unyt, in progress here)
- add a return_dask argument to return dask arrays when reading
- refactor the simpler routines that use the chunk iteration (derived quantities and profile calculations) to use the dask arrays (following the profile calculation experiment)
- start conducting performance tests for the new daskified routines, comparing computation times and memory usage to both the serial and MPI-parallel equivalents on the yt master branch.
Stage 4: non-particle datasets (and more)
Once we have working IO for particle datasets, the ability to return dask arrays, and some parallel operations successfully using the dask arrays, the development path becomes a bit broader: work could start on gridded datasets, or some of the other yt operations that leverage chunks could be daskified.
existing PRs
Several small related PRs that would qualify as “neutral changes” in the above context already exist: 2416, 2934, 2954.
related links and references
- The DXL repo that is home to the experiments described above
- RHytHM2020 talk on Leveraging Dask in yt
- An earlier overview of my yt + Dask efforts
- Matt Turk’s thoughts on frontend refactoring: Part 1, Part 2, Part 3.