About
This package applies parallel-in-time integration, specifically Parareal, to OpenFOAM. We have two example cases: flow around a cylinder and laminar flow through a pipe.

Installing
Make sure OpenFOAM is installed, then install the additional (Python) requirements by running:
poetry install
Architecture
From the perspective of the Parareal algorithm we are solving an ODE.
Implementation
The abstract `Vector`, defined below, represents any single state in the simulation. In OpenFOAM we have the following folder structure:
├── 0
│   ├── p
│   └── U
├── 1
│   └── <... data fields ...>
├── <... time directories ...>
├── constant
│   ├── transportProperties
│   └── turbulenceProperties
└── system
    ├── blockMeshDict
    ├── controlDict
    ├── decomposeParDict
    ├── fvSchemes
    └── fvSolution
For our application, a `Vector` is then a combination of an OpenFOAM case (i.e. the folder structure above) and a string denoting the time directory matching the referred snapshot. The directory structure containing only the `0` time is the `BaseCase`. We can copy a `Vector` by copying the contents of the `BaseCase` and the single time directory that belongs to that `Vector`.
file:pintFoam/vector.py
from __future__ import annotations
import operator
import mmap
import weakref
import gc
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from uuid import uuid4
from shutil import copytree, rmtree # , copy
from typing import List, Optional
from byteparsing import parse_bytes, foam_file
from PyFoam.RunDictionary.ParsedParameterFile import ParsedParameterFile # type: ignore
from PyFoam.RunDictionary.SolutionDirectory import SolutionDirectory # type: ignore
<<base-case>>
<<pintfoam-vector>>
Base case
We will operate on a `Vector` the same way everything is done in OpenFOAM, that is:
- Copy-paste a case (known as the base case)
- Edit the copy with new simulation parameters
- Run the simulation
This is why for every `Vector` we define a `BaseCase` that is used to generate new vectors. The `BaseCase` should have only one time directory containing the initial conditions, namely `0`. The simulation generates new folders containing the data corresponding to different times. The time is coded, somewhat uncomfortably, in the directory name (`0.01`, `0.02`, and so on), which is why we store the time coordinate as a string. The class `Vector` takes care of all those details.
«base-case»
@dataclass
class BaseCase:
    """Base case is a cleaned version of the system. If it contains any fields,
    it will only be the `0` time. Any field that is copied for manipulation will
    do so on top of an available base case in the `0` slot."""
    root: Path
    case: str
    fields: Optional[List[str]] = None

    @property
    def path(self):
        return self.root / self.case

    def new_vector(self, name: Optional[str] = None):
        """Creates new `Vector` using this base case."""
        new_case = name or uuid4().hex
        new_path = self.root / new_case
        if not new_path.exists():
            copytree(self.path, new_path)
        return Vector(self, new_case, "0")

    def all_vector_paths(self):
        """Iterates all sub-directories in the root."""
        return (x for x in self.root.iterdir()
                if x.is_dir() and x.name != self.case)

    def clean(self):
        """Deletes all vectors of this base-case."""
        for path in self.all_vector_paths():
            rmtree(path)
In our implementation, if no name is given to a new vector, a random one is generated.
Retrieving files and time directories
Note that the `BaseCase` has a property `path`. The same property will be defined in `Vector`. We can use this common property to retrieve a `SolutionDirectory`, `ParameterFile` or `TimeDirectory`.
- These are PyFoam routines that may need to be replaced
«pintfoam-vector»
def solution_directory(case):
    return SolutionDirectory(case.path)

def parameter_file(case, relative_path):
    return ParsedParameterFile(case.path / relative_path)

def time_directory(case):
    return solution_directory(case)[case.time]

def get_times(path):
    """Get all the snapshots in a case, sorted on floating point value."""
    def isfloat(s: str) -> bool:
        try:
            float(s)
            return True
        except ValueError:
            return False

    return sorted(
        [s.name for s in path.iterdir() if isfloat(s.name)],
        key=float)
Vector
The `Vector` class stores a reference to the `BaseCase`, a case name and a time.
«pintfoam-vector»
@dataclass
class Vector:
    base: BaseCase
    case: str
    time: str

    <<pintfoam-vector-properties>>
    <<pintfoam-vector-clone>>
    <<pintfoam-vector-operate>>
    <<pintfoam-vector-operators>>
From a vector we can extract a file path pointing to the specified time slot, list the containing files, and read out the `internalField` from any of those files.
«pintfoam-vector-properties»
@property
def path(self):
    """Case path, i.e. the path containing `system`, `constant` and snapshots."""
    return self.base.root / self.case

@property
def fields(self):
    """All fields relevant to this base case."""
    return self.base.fields

@property
def dirname(self):
    """The path of this snapshot."""
    return self.path / self.time

def all_times(self):
    """Get all available times, in order."""
    return [Vector(self.base, self.case, t)
            for t in get_times(self.path)]
We do arithmetic on a `Vector` by cloning an existing `Vector` and then modifying the `internalField` values inside. This can be done very efficiently using memory-mapped array access. The `mmap_data` member takes care of loading the data and closing the file when we're done with it, in a nifty context manager.
«pintfoam-vector-properties»
@contextmanager
def mmap_data(self, field):
    """Context manager that yields a **mutable** reference to the data contained
    in this snapshot. Mutations done to this array are mmapped to the disk directly."""
    f = (self.dirname / field).open(mode="r+b")
    with mmap.mmap(f.fileno(), 0) as mm:
        content = parse_bytes(foam_file, mm)
        try:
            result = content["data"]["internalField"]
        except KeyError as e:
            print(content)
            raise e
        yield weakref.ref(result)
        del result
        del content
        gc.collect()
We clone a vector by creating a new vector and copying the internal fields.
«pintfoam-vector-clone»
def clone(self, name: Optional[str] = None) -> Vector:
    """Clone this vector to a new one. The clone only contains this single snapshot."""
    x = self.base.new_vector(name)
    x.time = self.time
    rmtree(x.dirname, ignore_errors=True)
    copytree(self.dirname, x.dirname)
    return x
In order to apply the parareal algorithm to our vectors (or indeed, any other algorithm worth that name), we need to define how to operate with them. Particularly, we’ll need to implement:
- Vector addition and subtraction (as we’ll need to sum and subtract the results of applying the coarse and fine integrators)
- Vector scaling (as the integrators involve scaling with the inverse of the step)
In order to achieve this, first we’ll write generic recipes for any operation between vectors and any operation between a scalar and a vector:
«pintfoam-vector-operate»
def zip_with(self, other: Vector, op) -> Vector:
    x = self.clone()

    for f in self.fields:
        with x.mmap_data(f) as a, other.mmap_data(f) as b:
            a()[:] = op(a(), b())
    return x

def map(self, op) -> Vector:
    # the argument is named `op` so it isn't shadowed by the loop
    # variable `f` iterating over the field names
    x = self.clone()

    for f in self.fields:
        with x.mmap_data(f) as a:
            a()[:] = op(a())
    return x
We now have the tools to define vector addition, subtraction and scaling.
«pintfoam-vector-operators»
def __sub__(self, other: Vector) -> Vector:
    return self.zip_with(other, operator.sub)

def __add__(self, other: Vector) -> Vector:
    return self.zip_with(other, operator.add)

def __mul__(self, scale: float) -> Vector:
    return self.map(lambda x: x * scale)
In the code chunk above we used so-called magic methods: if we use a minus sign to subtract two vectors, the method `__sub__` is executed under the hood.
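For instance (an illustrative sketch, not part of the package; `v1` and `v2` stand for two `Vector`s on the same `BaseCase` with the same fields):

w = v1 - v2       # invokes v1.__sub__(v2): clone v1, then mutate its fields in place
u = w * 0.5       # invokes w.__mul__(0.5)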
OpenFOAM calls
file:pintFoam/foam.py
import subprocess
import math
from typing import Optional, Union
from .vector import (BaseCase, Vector, parameter_file, get_times)
<<pintfoam-map-fields>>
<<pintfoam-set-fields>>
<<pintfoam-block-mesh>>
<<pintfoam-epsilon>>
<<pintfoam-solution>>
setFields utility
We may want to call `setFields` on our `Vector` to set up some test cases.
«pintfoam-set-fields»
def set_fields(v, *, default_field_values, regions):
    """Wrapper for OpenFOAM's setFields."""
    x = parameter_file(v, "system/setFieldsDict")
    x['defaultFieldValues'] = default_field_values
    x['regions'] = regions
    x.writeFile()
    subprocess.run("setFields", cwd=v.path, check=True)
mapFields
The `mapFields` utility interpolates a field from one mesh onto another. The resulting field values are written to the `0` time directory, so we need to rename that directory after calling `mapFields` for consistency with the `Vector` infrastructure.
«pintfoam-map-fields»
def map_fields(source: Vector, target: BaseCase, consistent=True, map_method=None) -> Vector:
    """Wrapper for OpenFOAM's mapFields

    Use consistent=False if the initial and final boundaries differ.
    Valid arguments for `map_method`: mapNearest, interpolate, cellPointInterpolate
    """
    result = target.new_vector()
    result.time = source.time
    arg_lst = ["mapFields"]
    if consistent:
        arg_lst.append("-consistent")
    if map_method is not None:
        arg_lst.extend(["-mapMethod", map_method])
    arg_lst.extend(["-sourceTime", source.time, source.path.resolve()])
    subprocess.run(arg_lst, cwd=result.path, check=True)
    (result.path / "0").rename(result.dirname)
    return result
blockMesh
The `blockMesh` utility generates an OpenFOAM mesh from a description in the `blockMesh` format. This is usually called on a base case, so that the mesh information is shared by all vectors.
«pintfoam-block-mesh»
def block_mesh(case: BaseCase):
    """Wrapper for OpenFOAM's blockMesh."""
    subprocess.run("blockMesh", cwd=case.path, check=True)
Implementation of Solution
Remember the definition of a `Solution`:

Solution = Callable[[Vector, float, float], Vector]

meaning we write a function taking a current state `Vector`, the time now, and the target time, returning a new `Vector` for the target time.
The solver will write directories with floating-point valued names. This is a very bad idea by the folks at OpenFOAM, but it is one we'll have to live with. Suppose you have a time-step of \(0.1\): what will be the names of the directories if you integrate from \(0\) to \(0.5\)?

[x * 0.1 for x in range(6)]

In Python 3.7, this gives `[0.0, 0.1, 0.2, 0.30000000000000004, 0.4, 0.5]`. Surely, if you give a time-step of \(0.1\) to OpenFOAM, it will create a directory named `0.3` instead. We'll define the constant `epsilon` to aid us in identifying the correct state directory given a floating-point time.
«pintfoam-epsilon»
epsilon = 1e-6
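As an illustration (a sketch, not part of the package; the helper name `find_time_dir` is hypothetical), `epsilon` can be used to match a floating-point time to an existing snapshot directory:

def find_time_dir(path, t):
    """Return the name of the snapshot directory within `epsilon` of time `t`."""
    for name in get_times(path):
        if abs(float(name) - t) < epsilon:
            return name
    raise KeyError(f"no snapshot found near t={t}")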
Our solution depends on the solver chosen and the given time-step:
«pintfoam-solution»
def foam(solver: str, dt: float, x: Vector, t_0: float, t_1: float,
         write_interval: Optional[float] = None,
         job_name: Optional[str] = None,
         write_control: str = "runTime") -> Vector:
    """Call an OpenFOAM code.

    Args:
        solver: The name of the solver (e.g. "icoFoam", "scalarTransportFoam" etc.)
        dt: deltaT parameter
        x: initial state
        t_0: startTime (should match that in initial state)
        t_1: endTime
        write_interval: if not given, this is computed so that only the endTime
            is written.

    Returns:
        The `Vector` representing the end state.
    """
    <<pintfoam-solution-function>>
The solver clones a new vector, sets the `controlDict`, runs the solver and then creates a new vector representing the last time slice.
«pintfoam-solution-function»
assert abs(float(x.time) - t_0) < epsilon, f"Times should match: {t_0} != {x.time}."
y = x.clone(job_name)
write_interval = write_interval or (t_1 - t_0)
<<set-control-dict>>
<<run-solver>>
<<return-result>>
controlDict
Because writing the `controlDict` sometimes fails, we try it a few times. For this we need to create a backup of the original contents of `controlDict`.
«set-control-dict»
backup = open(y.path / "system" / "controlDict", "r").read()
for i in range(5):  # this sometimes fails, so we try a few times, maybe disk sync issue?
    try:
        print(f"Attempt {i+1} at writing controlDict")
        controlDict = parameter_file(y, "system/controlDict")
        controlDict.content['startFrom'] = "latestTime"
        controlDict.content['startTime'] = float(t_0)
        controlDict.content['endTime'] = float(t_1)
        controlDict.content['deltaT'] = float(dt)
        controlDict.content['writeInterval'] = float(write_interval)
        controlDict.content['writeControl'] = write_control
        controlDict.writeFile()
        break
    except Exception as e:
        exception = e
        open(y.path / "system" / "controlDict", "w").write(backup)
else:
    raise exception
Run solver
«run-solver»
with open(y.path / "log.stdout", "w") as logfile, \
     open(y.path / "log.stderr", "w") as errfile:
    subprocess.run(solver, cwd=y.path, check=True, stdout=logfile, stderr=errfile)
Return result
We retrieve the time of the result by looking at the last time directory.
«return-result»
t1_str = get_times(y.path)[-1]
return Vector(y.base, y.case, t1_str)
Appendix A: Utils
file:pintFoam/utils.py
<<push-dir>>
<<job-names>>
Cleaning up
file:pintFoam/clean.py
import argh # type:ignore
from pathlib import Path
from .vector import BaseCase
@argh.arg("target", help="target path to clean")
@argh.arg("--base_case", help="name of the base-case")
def main(target: Path, base_case: str = "baseCase"):
"""Auxiliary function that deletes all vectors of this base-case."""
BaseCase(Path(target), base_case).clean()
if __name__ == "__main__":
argh.dispatch_command(main)
pushd
I haven't been able (with simple attempts) to run a case outside the definition directory. Similar to the `pushd` Bash command, I define a little utility in Python:
«push-dir»
import os
from pathlib import Path
from contextlib import contextmanager
from typing import Union
import functools
from math import (floor, log10)

def decorator(f):
    """Creates a parametric decorator from a function. The resulting decorator
    will optionally take keyword arguments."""
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        if args and len(args) == 1:
            return f(*args, **kwargs)
        if args:
            raise TypeError(
                "This decorator only accepts extra keyword arguments.")
        return lambda g: f(g, **kwargs)
    return decorated_function

@contextmanager
def pushd(path: Union[str, Path]):
    """Context manager to change directory to given path,
    and get back to current dir at exit."""
    prev = Path.cwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(prev)
Job names
«job-names»
def generate_job_name(n, t_0, t_1, uid, id, tlength=4):
    """Auxiliary function to generate a job name."""
    def integrify(t, length=tlength):
        """Auxiliary function for converting a float into an integer."""
        if t == 0:
            return 0
        else:
            aux = t * 10 ** -floor(log10(t))   # normalize to the interval [1, 10)
            aux = aux * 10 ** (length - 1)     # displace the decimal point to the right
            return int(aux)

    def stringify(t, length=tlength):
        if integrify(t) == 0:
            return "0" * length
        else:
            return str(integrify(t))

    return f"{n}-{stringify(t_0)}-{stringify(t_1)}-{id}-{uid.hex}"
Example using MPI and Dask Futures
file:examples/mpi_futures.py
from __future__ import annotations
import argh # type: ignore
import numpy as np
# import numpy.typing as npt
from pathlib import Path
from dataclasses import dataclass, field
from typing import (Union, Callable, Optional, Any, Iterator)
from abc import (ABC, abstractmethod)
<<example-mpi-imports>>
OMEGA0 = 1.0
ZETA = 0.1
H = 0.001
<<vector-expressions>>
<<example-mpi-coarse>>
<<example-mpi-fine>>
<<example-mpi-history>>
def get_data(files: list[Path]) -> Iterator[np.ndarray]:
    for n in files:
        with h5.File(n, "r") as f:
            yield f["data"][:]

def combine_fine_data(files: list[Path]) -> np.ndarray:
    data = get_data(files)
    first = next(data)
    return np.concatenate([first] + [x[1:] for x in data], axis=0)

# def list_files(path: Path) -> list[Path]:
#     all_files = path.glob("*.h5")
#     return []
def main(log: str = "WARNING", log_file: Optional[str] = None):
"""Run model of dampened hormonic oscillator in Dask"""
= getattr(logging, log.upper(), None)
log_level if not isinstance(log_level, int):
raise ValueError(f"Invalid log level `{log}`")
=log_level, filename=log_file)
logging.basicConfig(level<<example-mpi-main>>
if __name__ == "__main__":
    argh.dispatch_command(main)
There are two modes in which we may run Dask with MPI: one with `dask-mpi` running as an external scheduler, the other running everything from a single script. For this example we opt for the second, straight from the dask-mpi documentation:
«example-mpi-imports»
from dask_mpi import initialize # type: ignore
from dask.distributed import Client # type: ignore
# from mpi4py import MPI
«example-mpi-main»
initialize()
client = Client()
Vector Arithmetic Expressions
Best practices
The following shows some best practices when working with orchestrated computations. One is about using well established data standards, the other about reducing overhead on the distributed scheduler. We can solve both these issues by abstracting over the representation of the state vector in our system. This technique is definitely overkill for our harmonic oscillator example, but it is also in general a good recipe for running Numpy based simulations in an organized manner.
Abstract vectors
It may be convenient to treat our `Vector` operations lazily, such that they are only performed once their output is needed. That way, we only need to schedule the actual integration steps as external jobs. In the meantime we store the arithmetic in a serializable `Expression` value. By doing this we reduce the number of jobs that have to be handled by the scheduler.
We will be using `functools.partial` and the functions `operator.add`, `operator.mul`, etc. to create a data structure that describes all the operations that we might do on a `Vector`. Results may be stored for reference in an `hdf5` file, a feature that can also be hidden behind our `Vector` interface.
«example-mpi-imports»
import operator
from functools import partial
import h5py as h5 # type: ignore
We create a `Vector` class that satisfies the `Vector` concept outlined earlier. We store the operations in terms of unary and binary operators.
«vector-expressions»
class Vector(ABC):
    @abstractmethod
    def reduce(self: Vector) -> np.ndarray:
        pass

    def __add__(self, other):
        return BinaryExpr(operator.add, self, other)

    def __sub__(self, other):
        return BinaryExpr(operator.sub, self, other)

    def __mul__(self, scale):
        return UnaryExpr(partial(operator.mul, scale), self)

    def __rmul__(self, scale):
        return UnaryExpr(partial(operator.mul, scale), self)
The `Vector` class acts as a base class for the implementation of `BinaryExpr` and `UnaryExpr`, so that we can nest expressions accordingly. To force computation of a `Vector`, we supply the `reduce_expr` function that, in an example of terrible duck-typing, calls the `reduce` method recursively, until an object is reached that doesn't have a `reduce` method.
«vector-expressions»
def reduce_expr(expr: Union[np.ndarray, Vector]) -> np.ndarray:
    if isinstance(expr, Vector):
        return expr.reduce()
    else:
        return expr
HDF5 Vectors
This means we can also hide variables that are stored in an HDF5 file behind this interface. We often want to store more information than just the state vector. In the case of parareal, we have results from fine integration and coarse integration. In the case of fine integration, what we need to represent is the final state of the integration, but we are also interested in the intermediate steps.
«vector-expressions»
@dataclass
class H5Snap(Vector):
    path: Path
    loc: str
    slice: list[Union[None, int, slice]]

    def data(self):
        with h5.File(self.path, "r") as f:
            return f[self.loc].__getitem__(tuple(self.slice))

    def reduce(self):
        x = self.data()
        logger = logging.getLogger()
        logger.debug(f"read {x} from {self.path}")
        return x
To generate slices in a nice manner we can use a helper class:
«vector-expressions»
class Index:
    def __getitem__(self, idx):
        if isinstance(idx, tuple):
            return list(idx)
        else:
            return [idx]

index = Index()
Then `index[a:b, c]` returns the list of slices `[slice(a, b), c]` (of type `list[Union[None, int, slice]]`).
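A few concrete evaluations, easily verified in a REPL:

index[-1]      # [-1]
index[0:10]    # [slice(0, 10, None)]
index[1:, 3]   # [slice(1, None, None), 3]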
Operators
There are two classes of operators: unary and binary (more arguments can usually be expressed as a composition of unary and binary forms). We store the arguments together with a function operating on the arguments. The function should be serializable (e.g. using `pickle`), meaning that `lambda` expressions are not allowed, but `partial` applications and functions in `operator` typically are ok.
«vector-expressions»
@dataclass
class UnaryExpr(Vector):
    func: Callable[[np.ndarray], np.ndarray]
    inp: Vector

    def reduce(self):
        a = reduce_expr(self.inp)
        return self.func(a)

@dataclass
class BinaryExpr(Vector):
    func: Callable[[np.ndarray, np.ndarray], np.ndarray]
    inp1: Vector
    inp2: Vector

    def reduce(self):
        a = reduce_expr(self.inp1)
        b = reduce_expr(self.inp2)
        return self.func(a, b)
Literal expressions
To bootstrap our computation we may need to define a `Vector` directly represented by a Numpy array.
«vector-expressions»
@dataclass
class LiteralExpr(Vector):
    value: np.ndarray

    def reduce(self):
        return self.value
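To see how these pieces fit together, here is a small sketch using only the classes defined above: the expression is built lazily and only evaluated by `reduce_expr`.

a = LiteralExpr(np.array([1.0, 0.0]))
b = LiteralExpr(np.array([0.0, 1.0]))
expr = a + 2.0 * b       # builds UnaryExpr/BinaryExpr nodes, no arithmetic yet
reduce_expr(expr)        # evaluates the tree: array([1., 2.])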
Running the harmonic oscillator
«example-mpi-imports»
from pintFoam.parareal.futures import (Parareal)
from pintFoam.parareal.forward_euler import forward_euler
# from pintFoam.parareal.iterate_solution import iterate_solution
from pintFoam.parareal.tabulate_solution import tabulate
from pintFoam.parareal.harmonic_oscillator import (underdamped_solution, harmonic_oscillator)
import math
# from uuid import uuid4
import logging
«example-mpi-main»
OMEGA0 = 1.0
ZETA = 0.5
H = 0.001
system = harmonic_oscillator(OMEGA0, ZETA)
«example-mpi-coarse»
@dataclass
class Coarse:
    n_iter: int
    system: Any = harmonic_oscillator(OMEGA0, ZETA)

    def solution(self, y, t0, t1):
        a = LiteralExpr(forward_euler(self.system)(reduce_expr(y), t0, t1))
        logging.debug(f"coarse result: {y} {reduce_expr(y)} {t0} {t1} {a}")
        return a
«example-mpi-fine»
def generate_filename(name: str, n_iter: int, t0: float, t1: float) -> str:
    return f"{name}-{n_iter:04}-{int(t0*1000):06}-{int(t1*1000):06}.h5"

@dataclass
class Fine:
    parent: Path
    name: str
    n_iter: int
    system: Any
    h: float

    def solution(self, y, t0, t1):
        logger = logging.getLogger()
        n = math.ceil((t1 - t0) / self.h)
        t = np.linspace(t0, t1, n + 1)

        self.parent.mkdir(parents=True, exist_ok=True)
        path = self.parent / generate_filename(self.name, self.n_iter, t0, t1)

        with h5.File(path, "w") as f:
            logger.debug("fine %f - %f", t0, t1)
            y0 = reduce_expr(y)
            logger.debug(": %s -> %s", y, y0)
            x = tabulate(forward_euler(self.system), reduce_expr(y), t)
            ds = f.create_dataset("data", data=x)
            ds.attrs["t0"] = t0
            ds.attrs["t1"] = t1
            ds.attrs["h"] = self.h
            ds.attrs["n"] = n
        return H5Snap(path, "data", index[-1])
«example-mpi-main»
y0 = np.array([1.0, 0.0])
t = np.linspace(0.0, 15.0, 20)
archive = Path("./output/euler")

underdamped_solution(OMEGA0, ZETA)(t)
tabulate(Fine(archive, "fine", 0, system, H).solution, LiteralExpr(y0), t)
# euler_files = archive.glob("*.h5")
«example-mpi-history»
@dataclass
class History:
    archive: Path
    history: list[list[Vector]] = field(default_factory=list)

    def convergence_test(self, y) -> bool:
        logger = logging.getLogger()
        self.history.append(y)
        if len(self.history) < 2:
            return False
        a = np.array([reduce_expr(x) for x in self.history[-2]])
        b = np.array([reduce_expr(x) for x in self.history[-1]])
        maxdif = np.abs(a - b).max()
        converged = maxdif < 1e-4
        logger.info("maxdif of %f", maxdif)
        if converged:
            logger.info("Converged after %u iterations", len(self.history))
        return converged
«example-mpi-main»
archive = Path("./output/parareal")
p = Parareal(
    client,
    lambda n: Coarse(n, system).solution,
    lambda n: Fine(archive, "fine", n, system, H).solution)
jobs = p.schedule(LiteralExpr(y0), t)
history = History(archive)
p.wait(jobs, history.convergence_test)
Implementation of Parareal using Futures
We reimplement Parareal in the `futures` framework of Dask. We have a few helper functions: `identity`, to be used as the default instance for the mappings between coarse and fine meshes, and `pairs`, a function that iterates through successive pairs of a list.
file:pintFoam/parareal/futures.py
from .abstract import (Solution, Mapping, Vector)
from typing import (Callable)
from dataclasses import dataclass
from math import ceil
import numpy as np
from numpy.typing import NDArray
from dask.distributed import Client, Future # type: ignore
import logging
def identity(x):
    return x

def pairs(lst):
    return zip(lst[:-1], lst[1:])
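For example:

list(pairs([0.0, 0.5, 1.0]))   # [(0.0, 0.5), (0.5, 1.0)]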
We need to send every operation to a remote worker; that includes summing the vectors from the coarse and fine integrators.
«parareal-futures»
def combine(c1: Vector, f1: Vector, c2: Vector) -> Vector:
    logging.debug(f"combine: {c1} {f1} {c2}")
    return c1 + f1 - c2
«time-windows»
def time_windows(times, window_size):
    """Split the times vector in a set of time windows of a given size.

    Args:
        times: The times vector.
        window_size: The number of steps per window (note that n steps
            correspond to n + 1 elements in the window). The last window
            may be smaller.
    """
    n_intervals = len(times) - 1
    n = int(ceil(n_intervals / window_size))
    m = window_size
    return [times[i*m:min(i*m+m+1, len(times))] for i in range(n)]
Every call that actually requires some of the data needs to be sent to the remote worker(s). Where before we could get away with putting everything in a closure, it is now easier to make a class that includes the Dask `Client` instance.
«parareal-futures»
@dataclass
class Parareal:
    client: Client
    coarse: Callable[[int], Solution]
    fine: Callable[[int], Solution]
    c2f: Mapping = identity
    f2c: Mapping = identity

    def _c2f(self, x: Future) -> Future:
        if self.c2f is identity:
            return x
        return self.client.submit(self.c2f, x)

    def _f2c(self, x: Future) -> Future:
        if self.f2c is identity:
            return x
        return self.client.submit(self.f2c, x)

    def _coarse(self, n_iter: int, y: Future, t0: float, t1: float) -> Future:
        logging.debug("Coarse run: %s, %s, %s", y, t0, t1)
        return self.client.submit(self.coarse(n_iter), y, t0, t1)

    def _fine(self, n_iter: int, y: Future, t0: float, t1: float) -> Future:
        logging.debug("Fine run: %s, %s, %s", y, t0, t1)
        return self.client.submit(self.fine(n_iter), y, t0, t1)

    <<parareal-methods>>
The `step` method implements the core parareal algorithm.
«parareal-methods»
def step(self, n_iter: int, y_prev: list[Future], t: NDArray[np.float64]) -> list[Future]:
    m = t.size
    y_next = [None] * m
    for i in range(n_iter):
        y_next[i] = y_prev[i]

    for i in range(n_iter, m):
        # use the _f2c/_c2f wrappers, so that non-identity mappings run remotely
        c1 = self._c2f(self._coarse(n_iter, self._f2c(y_next[i-1]), t[i-1], t[i]))
        f1 = self._fine(n_iter, y_prev[i-1], t[i-1], t[i])
        c2 = self._c2f(self._coarse(n_iter, self._f2c(y_prev[i-1]), t[i-1], t[i]))
        y_next[i] = self.client.submit(combine, c1, f1, c2)

    return y_next
We schedule every possible iteration of parareal as a future. The tactic is to cancel the remaining jobs only once we have found a converged result. This way, workers can compute the next iterations, even if the last step of the previous iteration is not yet complete and tested for convergence.
«parareal-methods»
def schedule(self, y_0: Vector, t: NDArray[np.float64]) -> list[list[Future]]:
    # schedule initial coarse integration
    y_init = [self.client.scatter(y_0)]
    for (a, b) in pairs(t):
        y_init.append(self._coarse(0, y_init[-1], a, b))

    # schedule all iterations of parareal
    jobs = [y_init]
    for n_iter in range(len(t)):
        jobs.append(self.step(n_iter+1, jobs[-1], t))

    return jobs
The `wait` method then gathers results and returns the first iteration that satisfies `convergence_test`.
«parareal-methods»
def wait(self, jobs, convergence_test):
    for i in range(len(jobs)):
        result = self.client.gather(jobs[i])
        if convergence_test(result):
            for j in jobs[i+1:]:
                self.client.cancel(j)
            return result
    return result
Harmonic oscillator
We may test this on the harmonic oscillator.
file:test/test_futures.py
from dataclasses import dataclass, field
from functools import partial
import logging
from numpy.typing import NDArray
import numpy as np
from pintFoam.parareal.futures import Parareal
from pintFoam.parareal.harmonic_oscillator import harmonic_oscillator
from pintFoam.parareal.forward_euler import forward_euler
from pintFoam.parareal.iterate_solution import iterate_solution
from pintFoam.parareal.tabulate_solution import tabulate
from dask.distributed import Client # type: ignore
OMEGA0 = 1.0
ZETA = 0.5
H = 0.001
system = harmonic_oscillator(OMEGA0, ZETA)

def coarse(_, y, t0, t1):
    return forward_euler(system)(y, t0, t1)

def fine(_, y, t0, t1):
    return iterate_solution(forward_euler(system), H)(y, t0, t1)
@dataclass
class History:
    history: list[NDArray[np.float64]] = field(default_factory=list)

    def convergence_test(self, y):
        self.history.append(np.array(y))
        logging.debug("got result: %s", self.history[-1])
        if len(self.history) < 2:
            return False
        return np.allclose(self.history[-1], self.history[-2], atol=1e-4)
def test_parareal():
    client = Client()
    p = Parareal(client, lambda n: partial(coarse, n), lambda n: partial(fine, n))
    t = np.linspace(0.0, 15.0, 30)
    y0 = np.array([0.0, 1.0])
    history = History()
    jobs = p.schedule(y0, t)
    p.wait(jobs, history.convergence_test)

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    y0 = np.array([1.0, 0.0])
    t = np.linspace(0.0, 15.0, 30)
    fine_solution = partial(fine, None)  # a function of only (x, t_0, t_1) is expected
    result = tabulate(fine_solution, y0, t)
    print(result)
Parareal
file:pintFoam/parareal/__init__.py
from .tabulate_solution import tabulate
from .parareal import parareal
from . import abstract
__all__ = ["tabulate", "parareal", "schedule", "abstract"]
Components
We may present the Parareal algorithm in abstract terms, and match those terms with corresponding type definitions in Python.
We need to define the following:
- `Vector`: A `Vector` is an object that represents the state of a solution at any one time. On this state we need to be able to do addition, subtraction and scalar multiplication, in order to perform the Parareal algorithm.
- `Solution`: A `Solution` is a function that takes an initial `Vector`, a time `t_0` and a time `t`, returning the state `Vector` at time `t`.
- `Mapping`: A `Mapping` is a function from one state `Vector` to another, for example a mapping from a coarse to a fine mesh or vice-versa.
- Fine `Solution`: The fine solution is the solution at the desired resolution. If we were not doing parallel-in-time, this would be the integrator used to get at the correct result. We may also use the fine solution to find a ground truth when testing the Parareal solution.
- Coarse `Solution`: The coarse solution is a solution that is fast but less accurate.
file:pintFoam/parareal/abstract.py
from __future__ import annotations
from typing import (Callable, Protocol, TypeVar, Union)
<<abstract-types>>
Vector
We have an ODE in the form
\[y' = f(y, t).\](1)
Here \(y\) can be a scalar value, a vector of values (say a `numpy` array), or any expression of state. A naive implementation of an ODE integrator would be

\[y_{n+1} = y_{n} + \Delta t f(y_{n}, t).\](2)

Eq. 2 is known as the forward Euler method. We can capture the state \(y\) in an abstract class we'll call `Vector`. We chose this name because we expect these objects to share (some of) the arithmetic properties of mathematical vectors. Namely, we want to be able to add, subtract and scale them. The chunk below states this need for basic arithmetic in the form of abstract methods.
«abstract-types»
TVector = TypeVar("TVector", bound="Vector")

class Vector(Protocol):
    def __add__(self: TVector, other: TVector) -> TVector:
        ...

    def __sub__(self: TVector, other: TVector) -> TVector:
        ...

    def __mul__(self: TVector, other: float) -> TVector:
        ...

    def __rmul__(self: TVector, other: float) -> TVector:
        ...
We don't actually need to implement these methods right now. All this says is that any type that has these methods defined can stand in for a `Vector`.
«abstract-types»
Mapping = Callable[[TVector], TVector]
Problem
An ODE is then given as a function taking a `Vector` (the state \(y\)) and a `float` (the time \(t\)), returning a `Vector` (the derivative \(y' = f(y,t)\) evaluated at \((y,t)\)). We define the type `Problem`:
«abstract-types»
Problem = Callable[[TVector, float], TVector]
In mathematical notation the snippet above means:
\[{\rm Problem} : (y, t) \to f(y, t) = y'\]
Solution
If we have a `Problem`, we're after a `Solution`: a function that, given an initial `Vector` (the initial condition \(y_0\)), an initial time (\(t_0\)) and a final time (\(t\)), gives the resulting `Vector` (the solution \(y(t)\) for the given initial conditions).
«abstract-types»
Solution = Union[Callable[[TVector, float, float], TVector],
                 Callable[..., TVector]]
Those readers more familiar with classical physics or mathematics may notice that our `Problem` object corresponds with the function \(f\) in eq. 1. The `Solution` object, on the other hand, corresponds with the evolution operator \(\phi\) in eq. 3.
\[{\rm Solution} : (y_0, t_0; t) \to \phi(y_0, t_0; t) = y.\](3)
Intuitively, \(\phi\) represents any method that solves (even approximately) our initial value problem.
Example
An example of a `Problem` would be the function

\[f(y, t) = r y,\]

in which case the corresponding `Solution` is

\[\phi(y_0, t_0; t) = y_0 e^{r(t - t_0)}.\]
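For concreteness, here is this example written against the types above (an illustrative sketch; `growth` and `growth_solution` are not part of the package):

import math

def growth(r: float) -> Problem:
    """The problem f(y, t) = r * y."""
    def f(y, t):
        return r * y
    return f

def growth_solution(r: float) -> Solution:
    """Its exact solution phi(y_0, t_0; t) = y_0 * exp(r * (t - t_0))."""
    def phi(y_0, t_0, t):
        return y_0 * math.exp(r * (t - t_0))
    return phi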
Solver
The challenge is, of course, to find a way of transforming a `Problem` into a `Solution`. This is what integration algorithms, or solvers, do:

\[{\rm Solver} : {\rm Problem} \to {\rm Solution}.\]

If we look a bit more closely at the definitions of `Problem` and `Solution`, we'll notice that a solver is indeed a functional that accepts functions of \((y,t)\) as input and returns functions of \((y_0, t_0, t)\) as output.
An example of such a solver is the forward Euler method (eq. 2), that can be implemented as:
file:pintFoam/parareal/forward_euler.py
from .abstract import (Vector, Problem, Solution)
def forward_euler(f: Problem) -> Solution:
    """Forward-Euler solver."""
    def step(y: Vector, t_0: float, t_1: float) -> Vector:
        """Stepping function of Euler method."""
        return y + (t_1 - t_0) * f(y, t_0)
    return step
Any existing solution can be iterated over to provide a solution over a larger time interval. The `iterate_solution` function runs a given solution with a step-size fixed to \(\Delta t = h\).
file:pintFoam/parareal/iterate_solution.py
from .abstract import (Vector, Solution)
import numpy as np
import math
def iterate_solution(step: Solution, h: float) -> Solution:
    def iter_step(y: Vector, t_0: float, t_1: float) -> Vector:
        """Stepping function of iterated solution."""
        n = math.ceil((t_1 - t_0) / h)
        steps = np.linspace(t_0, t_1, n + 1)
        for t_a, t_b in zip(steps[:-1], steps[1:]):
            y = step(y, t_a, t_b)
        return y
    return iter_step
Example: damped harmonic oscillator
We give a bit more attention to the example of the harmonic oscillator, because it will also serve as a first test case for the Parareal algorithm later on.
The harmonic oscillator can model the movement of a pendulum or the vibration of a mass on a spring.
\[y'' + 2\zeta \omega_0 y' + \omega_0^2 y = 0,\]
where \(\omega_0 = \sqrt{k/m}\) and \(\zeta = c / 2\sqrt{mk}\), \(k\) being the spring constant, \(m\) the test mass and \(c\) the friction constant.
To solve this second order ODE we need to introduce a second variable to solve for. Say \(q = y\) and \(p = y'\).
\[\begin{aligned} q' &= p\\ p' &= -2\zeta \omega_0 p - \omega_0^2 q \end{aligned}\](4)
The `Problem` is then given as
file:pintFoam/parareal/harmonic_oscillator.py
from .abstract import (Problem)
from typing import Callable
from numpy.typing import NDArray
import numpy as np
def harmonic_oscillator(omega_0: float, zeta: float) -> Problem:
    def f(y, t):
        return np.r_[y[1], -2 * zeta * omega_0 * y[1] - omega_0**2 * y[0]]
    return f
<<harmonic-oscillator-solution>>
if __name__ == "__main__":
    import numpy as np  # type: ignore
    import pandas as pd  # type: ignore
    from plotnine import ggplot, geom_line, aes  # type: ignore

    from pintFoam.parareal.harmonic_oscillator import harmonic_oscillator
    from pintFoam.parareal.forward_euler import forward_euler
    from pintFoam.parareal.iterate_solution import iterate_solution
    from pintFoam.parareal.tabulate_solution import tabulate_np

    OMEGA0 = 1.0
    ZETA = 0.5
    H = 0.001
    system = harmonic_oscillator(OMEGA0, ZETA)

    def coarse(y, t0, t1):
        return forward_euler(system)(y, t0, t1)

    # fine :: Solution[NDArray]
    def fine(y, t0, t1):
        return iterate_solution(forward_euler(system), H)(y, t0, t1)

    y0 = np.array([1.0, 0.0])
    t = np.linspace(0.0, 15.0, 100)
    exact_result = underdamped_solution(OMEGA0, ZETA)(t)
    euler_result = tabulate_np(fine, y0, t)

    data = pd.DataFrame({
        "time": t,
        "exact_q": exact_result[:, 0],
        "exact_p": exact_result[:, 1],
        "euler_q": euler_result[:, 0],
        "euler_p": euler_result[:, 1]})

    plot = ggplot(data) \
        + geom_line(aes("time", "exact_q")) \
        + geom_line(aes("time", "euler_q"), color="#000088")
    plot.save("plot.svg")
Exact solution
The damped harmonic oscillator has an exact solution. Given the ansatz \(y = A \exp(z t)\), we get

\[z_{\pm} = \omega_0\left(-\zeta \pm \sqrt{\zeta^2 - 1}\right),\]

and thus the general solution:

\[y(t) = A \exp(z_+ t) + B \exp(z_- t), \qquad \zeta \neq 1,\]
\[y(t) = (A + Bt) \exp(-\omega_0 t), \qquad \zeta = 1.\]
This dynamical system has three qualitatively different solutions, each of them depending on the sign of the contents of the square root. Particularly, if the contents of the square root are negative, the two possible values for \(z\) will be complex numbers, making oscillations possible. More specifically, the three cases are:
- overdamped (\(\zeta > 1\) and, thus, both \(z\) are real numbers)
- critical dampening (\(\zeta = 1\) and \(z\) is real and equal to \(-\omega_0\))
- underdamped (\(\mid \zeta \mid < 1\), and \(z = -\omega_0\zeta \mp i \omega_0 \sqrt{1 - \zeta^2}\)).
The underdamped case is typically the most interesting one. In this case we have solutions of the form:
\[y = A\quad \underbrace{\exp(-\omega_0\zeta t)}_{\rm dampening}\quad\underbrace{\exp(\pm i \omega_0 \sqrt{1 - \zeta^2} t)}_{\rm oscillation},\]
Given an initial condition \(q_0 = 1, p_0 = 0\), the solution is computed as
«harmonic-oscillator-solution»
def underdamped_solution(omega_0: float, zeta: float) \
        -> Callable[[NDArray[np.float64]], NDArray[np.float64]]:
    amp = 1 / np.sqrt(1 - zeta**2)
    phase = np.arcsin(zeta)
    freq = omega_0 * np.sqrt(1 - zeta**2)

    def f(t: NDArray[np.float64]) -> NDArray[np.float64]:
        dampening = np.exp(-omega_0*zeta*t)
        q = amp * dampening * np.cos(freq * t - phase)
        p = -amp * omega_0 * dampening * np.sin(freq * t)
        return np.c_[q, p]
    return f
Numeric solution
To plot a `Solution`, we need to tabulate the results for a given sequence of time points.
file:pintFoam/parareal/tabulate_solution.py
from .abstract import (Solution, Vector)
from typing import (Sequence, Any)
import numpy as np
Array = Any

def tabulate(step: Solution, y_0: Vector, t: Array) -> Sequence[Vector]:
    """Tabulate the step-wise solution, starting from `y_0`, for every time
    point given in array `t`."""
    # if isinstance(y_0, np.ndarray):
    #     return tabulate_np(step, y_0, t)

    y = [y_0]
    for i in range(1, t.size):
        y_i = step(y[i-1], t[i-1], t[i])
        y.append(y_i)
    return y
<<tabulate-np>>
In the case that the `Vector` type is actually a numpy array, we can specialize the `tabulate` routine to return a larger array.
«tabulate-np»
def tabulate_np(step: Solution, y_0: Array, t: Array) -> Array:
    y = np.zeros(dtype=y_0.dtype, shape=(t.size,) + y_0.shape)
    y[0] = y_0
    for i in range(1, t.size):
        y[i] = step(y[i-1], t[i-1], t[i])
    return y
We can compare the results from the numeric integration with the exact solution.
Parareal
From Wikipedia:
Parareal solves an initial value problem of the form
\[\dot{y}(t) = f(y(t), t), \quad y(t_0) = y_0 \quad \text{with} \quad t_0 \leq t \leq T.\]
Here, the right hand side \(f\) can correspond to the spatial discretization of a partial differential equation in a method of lines approach. Parareal now requires a decomposition of the time interval \([t_0, T]\) into \(P\) so-called time slices \([t_j, t_{j+1}]\) such that
\[[t_0, T] = [t_0, t_1] \cup [t_1, t_2] \cup \ldots \cup [t_{P-1}, t_{P} ].\]
Each time slice is assigned to one processing unit when parallelizing the algorithm, so that \(P\) is equal to the number of processing units used for Parareal.
Parareal is based on the iterative application of two methods for integration of ordinary differential equations. One, commonly labelled \({\mathcal {F}}\), should be of high accuracy and computational cost while the other, typically labelled \({\mathcal {G}}\), must be computationally cheap but can be much less accurate. Typically, some form of Runge-Kutta method is chosen for both coarse and fine integrator, where \({\mathcal {G}}\) might be of lower order and use a larger time step than \({\mathcal {F}}\). If the initial value problem stems from the discretization of a PDE, \({\mathcal {G}}\) can also use a coarser spatial discretization, but this can negatively impact convergence unless high order interpolation is used. The result of numerical integration with one of these methods over a time slice \([t_{j}, t_{j+1}]\) for some starting value \(y_{j}\) given at \(t_{j}\) is then written as
\[y = \mathcal{F}(y_j, t_j, t_{j+1})\ {\rm or}\ y = \mathcal{G}(y_j, t_j, t_{j+1}).\]
Serial time integration with the fine method would then correspond to a step-by-step computation of
\[y_{j+1} = \mathcal{F}(y_j, t_j, t_{j+1}), \quad j=0, \ldots, P-1.\]
Parareal instead uses the following iteration
\[y_{j+1}^{k+1} = \mathcal{G}(y^{k+1}_j, t_j, t_{j+1}) + \mathcal{F}(y^k_j, t_j, t_{j+1}) - \mathcal{G}(y^k_j, t_j, t_{j+1}),\\ \quad j=0, \ldots, P-1, \quad k=0, \ldots, K-1,\]
where \(k\) is the iteration counter. As the iteration converges and \(y^{k+1}_j - y^k_j \to 0\), the terms from the coarse method cancel out and Parareal reproduces the solution that is obtained by the serial execution of the fine method only. It can be shown that Parareal converges after a maximum of \(P\) iterations. For Parareal to provide speedup, however, it has to converge in a number of iterations significantly smaller than the number of time slices, that is \(K \ll P\).
In the Parareal iteration, the computationally expensive evaluation of \(\mathcal{F}(y^k_j, t_j, t_{j+1})\) can be performed in parallel on \(P\) processing units. By contrast, the dependency of \(y^{k+1}_{j+1}\) on \(\mathcal{G}(y^{k+1}_j, t_j, t_{j+1})\) means that the coarse correction has to be computed in serial order.
Don't get blinded by the details of the algorithm. After all, everything boils down to an update equation that uses a state vector \(y\) to calculate the state at the immediately next future step (in the same fashion as eq. 2 did). The core equation translates to:
«parareal-core-1»
y_n[i] = coarse(y_n[i-1], t[i-1], t[i]) \
    + fine(y[i-1], t[i-1], t[i]) \
    - coarse(y[i-1], t[i-1], t[i])
If we include a `Mapping` between fine and coarse meshes into the equation, we get:
«parareal-core-2»
y_n[i] = c2f(coarse(f2c(y_n[i-1]), t[i-1], t[i])) \
    + fine(y[i-1], t[i-1], t[i]) \
    - c2f(coarse(f2c(y[i-1]), t[i-1], t[i]))
The rest is boilerplate. For the `c2f` and `f2c` mappings we provide a default argument of the identity function.
file:pintFoam/parareal/parareal.py
from .abstract import (Solution, Mapping)
import numpy as np
def identity(x):
    return x

def parareal(
        coarse: Solution,
        fine: Solution,
        c2f: Mapping = identity,
        f2c: Mapping = identity):
    def f(y, t):
        m = t.size
        y_n = [None] * m
        y_n[0] = y[0]
        for i in range(1, m):
            <<parareal-core-2>>
        return y_n
    return f

def parareal_np(
        coarse: Solution,
        fine: Solution,
        c2f: Mapping = identity,
        f2c: Mapping = identity):
    def f(y, t):
        m = t.size
        y_n = np.zeros_like(y)
        y_n[0] = y[0]
        for i in range(1, m):
            <<parareal-core-2>>
        return y_n
    return f
Running in parallel
«import-dask»
from dask import delayed # type: ignore
«daskify»
<<import-dask>>
import numpy as np

from pintFoam.parareal.harmonic_oscillator import \
    (harmonic_oscillator, underdamped_solution)
from pintFoam.parareal.forward_euler import \
    (forward_euler)
from pintFoam.parareal.tabulate_solution import \
    (tabulate)
from pintFoam.parareal.parareal import \
    (parareal)
from pintFoam.parareal.iterate_solution import \
    (iterate_solution)

attrs = {}

def green(f):
    def greened(*args):
        node = f(*args)
        attrs[node.key] = {"fillcolor": "#8888cc", "style": "filled"}
        return node
    return greened

@delayed
def gather(*args):
    return list(args)
To see what Dask does, first we'll daskify the direct integration routine in `tabulate`. We take the same harmonic oscillator we had before. For the sake of argument, let's divide the time line in three steps (so four points).
«daskify»
omega_0 = 1.0
zeta = 0.5
f = harmonic_oscillator(omega_0, zeta)
t = np.linspace(0.0, 15.0, 4)
We now define the `fine` integrator:
«daskify»
h = 0.01

@green
@delayed
def fine(x, t_0, t_1):
    return iterate_solution(forward_euler(f), h)(x, t_0, t_1)
It doesn't really matter what the fine integrator does, since we won't run anything; we'll just pretend. The `delayed` decorator makes sure that the integrator is never called: we just store the information that we want to call the `fine` function. The resulting value is a promise that at some point we will call the `fine` function. The nice thing is that this promise behaves like any other Python object; it even qualifies as a `Vector`! The `tabulate` routine returns a `Sequence` of `Vector`s, in this case a list of promises. The `gather` function takes a list of promises and turns it into a promise of a list.
«daskify»
y_euler = tabulate(fine, np.array([1.0, 0.0]), t)
We can draw the resulting workflow:
«daskify»
gather(*y_euler).visualize("seq-graph.svg", rankdir="LR", data_attributes=attrs)
This workflow is entirely sequential, every step depending on the preceding one. Now for Parareal! We also define the `coarse` integrator.
«daskify»
@delayed
def coarse(x, t_0, t_1):
    return forward_euler(f)(x, t_0, t_1)
Parareal is initialised with the ODE integrated by the coarse integrator, just like we did before with the fine one.
«daskify»
y_first = tabulate(coarse, np.array([1.0, 0.0]), t)
We can now perform a single iteration of Parareal to see what the workflow looks like:
«daskify»
y_parareal = gather(*parareal(coarse, fine)(y_first, t))
«daskify»
"parareal-graph.pdf", rankdir="LR", data_attributes=attrs) y_parareal.visualize(
Create example file
file:examples/harmonic_oscillator.py
<<daskify>>
Unit testing
Unit testing needs to be done on cases that are easy. For the moment we have selected `pitzDaily` for this.
file:test/test_foam_run.py
from pathlib import Path
from shutil import copytree
import numpy as np
from pintFoam.vector import BaseCase
from pintFoam.foam import (block_mesh, foam)
pitzDaily_fields = {
    "T", "U", "phi"
}

def test_basic_pitzDaily(tmp_path):
    path = Path(tmp_path) / "case0"
    data = Path(".") / "test" / "cases" / "pitzDaily"
    copytree(data, path / "base")

    base_case = BaseCase(path, "base")
    base_case.fields = pitzDaily_fields
    block_mesh(base_case)

    base_vec = base_case.new_vector()
    init_vec = foam("scalarTransportFoam", 0.001, base_vec, 0.0, 0.001)
    # init_vec.time = "0"
    end_vec = foam("scalarTransportFoam", 0.01, init_vec, 0.001, 0.1)
    assert end_vec.dirname.exists()

    end_vec_clone = end_vec.clone()
    assert end_vec_clone.time == end_vec.time
    # assert get_times(end_vec_clone.path) == ["0", "0.1"]

    diff_vec = end_vec - init_vec
    # assert diff_vec.time == "0.1"
    for f in pitzDaily_fields:
        with end_vec.mmap_data(f) as a, \
             init_vec.mmap_data(f) as b, \
             diff_vec.mmap_data(f) as c:
            assert np.abs(a() - b() - c()).mean() < 1e-6

    orig_vec = init_vec + diff_vec
    should_be_zero = end_vec - orig_vec
    for f in pitzDaily_fields:
        with should_be_zero.mmap_data(f) as v:
            assert np.all(np.abs(v()) < 1e-6)
def test_restart(tmp_path):
    path = Path(tmp_path) / "case0"
    data = Path(".") / "test" / "cases" / "pitzDaily"
    copytree(data, path / "base")

    base_case = BaseCase(path, "base")
    base_case.fields = pitzDaily_fields
    block_mesh(base_case)

    init_vec = base_case.new_vector()
    check = foam("scalarTransportFoam", 0.01, init_vec, 0.0, 0.2)
    end_vec = foam("scalarTransportFoam", 0.01, init_vec, 0.0, 0.1)
    init_vec = end_vec.clone()
    end_vec = foam("scalarTransportFoam", 0.01, init_vec, 0.1, 0.2)
    diff = end_vec - check
    for f in pitzDaily_fields:
        with diff.mmap_data(f) as v:
            assert np.all(np.abs(v()) < 1e-6)