You can run this notebook in a live session or view it on Github.
Applying unvectorized functions with apply_ufunc
¶
This example will illustrate how to conveniently apply an unvectorized function func
to xarray objects using apply_ufunc
. func
expects 1D numpy arrays and returns a 1D numpy array. Our goal is to conveniently apply this function along a dimension of xarray objects that may or may not wrap dask arrays with a signature.
We will illustrate this using np.interp
:
Signature: np.interp(x, xp, fp, left=None, right=None, period=None)
Docstring:
One-dimensional linear interpolation.
Returns the one-dimensional piecewise linear interpolant to a function
with given discrete data points (`xp`, `fp`), evaluated at `x`.
and write an xr_interp
function with signature
xr_interp(xarray_object, dimension_name, new_coordinate_to_interpolate_to)
Load data¶
First lets load an example dataset
[1]:
import xarray as xr
import numpy as np
xr.set_options(display_style="html") # fancy HTML repr
air = (
xr.tutorial.load_dataset("air_temperature")
.air.sortby("lat") # np.interp needs coordinate in ascending order
.isel(time=slice(4), lon=slice(3))
) # choose a small subset for convenience
air
---------------------------------------------------------------------------
gaierror Traceback (most recent call last)
File /usr/lib/python3/dist-packages/urllib3/connection.py:198, in HTTPConnection._new_conn(self)
197 try:
--> 198 sock = connection.create_connection(
199 (self._dns_host, self.port),
200 self.timeout,
201 source_address=self.source_address,
202 socket_options=self.socket_options,
203 )
204 except socket.gaierror as e:
File /usr/lib/python3/dist-packages/urllib3/util/connection.py:60, in create_connection(address, timeout, source_address, socket_options)
58 raise LocationParseError(f"'{host}', label empty or too long") from None
---> 60 for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
61 af, socktype, proto, canonname, sa = res
File /usr/lib/python3.13/socket.py:977, in getaddrinfo(host, port, family, type, proto, flags)
976 addrlist = []
--> 977 for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
978 af, socktype, proto, canonname, sa = res
gaierror: [Errno -3] Temporary failure in name resolution
The above exception was the direct cause of the following exception:
NameResolutionError Traceback (most recent call last)
File /usr/lib/python3/dist-packages/urllib3/connectionpool.py:787, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, preload_content, decode_content, **response_kw)
786 # Make the request on the HTTPConnection object
--> 787 response = self._make_request(
788 conn,
789 method,
790 url,
791 timeout=timeout_obj,
792 body=body,
793 headers=headers,
794 chunked=chunked,
795 retries=retries,
796 response_conn=response_conn,
797 preload_content=preload_content,
798 decode_content=decode_content,
799 **response_kw,
800 )
802 # Everything went great!
File /usr/lib/python3/dist-packages/urllib3/connectionpool.py:488, in HTTPConnectionPool._make_request(self, conn, method, url, body, headers, retries, timeout, chunked, response_conn, preload_content, decode_content, enforce_content_length)
487 new_e = _wrap_proxy_error(new_e, conn.proxy.scheme)
--> 488 raise new_e
490 # conn.request() calls http.client.*.request, not the method in
491 # urllib3.request. It also calls makefile (recv) on the socket.
File /usr/lib/python3/dist-packages/urllib3/connectionpool.py:464, in HTTPConnectionPool._make_request(self, conn, method, url, body, headers, retries, timeout, chunked, response_conn, preload_content, decode_content, enforce_content_length)
463 try:
--> 464 self._validate_conn(conn)
465 except (SocketTimeout, BaseSSLError) as e:
File /usr/lib/python3/dist-packages/urllib3/connectionpool.py:1093, in HTTPSConnectionPool._validate_conn(self, conn)
1092 if conn.is_closed:
-> 1093 conn.connect()
1095 # TODO revise this, see https://github.com/urllib3/urllib3/issues/2791
File /usr/lib/python3/dist-packages/urllib3/connection.py:704, in HTTPSConnection.connect(self)
703 sock: socket.socket | ssl.SSLSocket
--> 704 self.sock = sock = self._new_conn()
705 server_hostname: str = self.host
File /usr/lib/python3/dist-packages/urllib3/connection.py:205, in HTTPConnection._new_conn(self)
204 except socket.gaierror as e:
--> 205 raise NameResolutionError(self.host, self, e) from e
206 except SocketTimeout as e:
NameResolutionError: <urllib3.connection.HTTPSConnection object at 0x7fe88d666900>: Failed to resolve 'github.com' ([Errno -3] Temporary failure in name resolution)
The above exception was the direct cause of the following exception:
MaxRetryError Traceback (most recent call last)
File /usr/lib/python3/dist-packages/requests/adapters.py:667, in HTTPAdapter.send(self, request, stream, timeout, verify, cert, proxies)
666 try:
--> 667 resp = conn.urlopen(
668 method=request.method,
669 url=url,
670 body=request.body,
671 headers=request.headers,
672 redirect=False,
673 assert_same_host=False,
674 preload_content=False,
675 decode_content=False,
676 retries=self.max_retries,
677 timeout=timeout,
678 chunked=chunked,
679 )
681 except (ProtocolError, OSError) as err:
File /usr/lib/python3/dist-packages/urllib3/connectionpool.py:841, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, preload_content, decode_content, **response_kw)
839 new_e = ProtocolError("Connection aborted.", new_e)
--> 841 retries = retries.increment(
842 method, url, error=new_e, _pool=self, _stacktrace=sys.exc_info()[2]
843 )
844 retries.sleep()
File /usr/lib/python3/dist-packages/urllib3/util/retry.py:519, in Retry.increment(self, method, url, response, error, _pool, _stacktrace)
518 reason = error or ResponseError(cause)
--> 519 raise MaxRetryError(_pool, url, reason) from reason # type: ignore[arg-type]
521 log.debug("Incremented Retry for (url='%s'): %r", url, new_retry)
MaxRetryError: HTTPSConnectionPool(host='github.com', port=443): Max retries exceeded with url: /pydata/xarray-data/raw/master/air_temperature.nc (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x7fe88d666900>: Failed to resolve 'github.com' ([Errno -3] Temporary failure in name resolution)"))
During handling of the above exception, another exception occurred:
ConnectionError Traceback (most recent call last)
Cell In[1], line 7
2 import numpy as np
4 xr.set_options(display_style="html") # fancy HTML repr
6 air = (
----> 7 xr.tutorial.load_dataset("air_temperature")
8 .air.sortby("lat") # np.interp needs coordinate in ascending order
9 .isel(time=slice(4), lon=slice(3))
10 ) # choose a small subset for convenience
11 air
File /usr/lib/python3/dist-packages/xarray/tutorial.py:213, in load_dataset(*args, **kwargs)
176 def load_dataset(*args, **kwargs) -> Dataset:
177 """
178 Open, load into memory, and close a dataset from the online repository
179 (requires internet).
(...)
211 load_dataset
212 """
--> 213 with open_dataset(*args, **kwargs) as ds:
214 return ds.load()
File /usr/lib/python3/dist-packages/xarray/tutorial.py:165, in open_dataset(name, cache, cache_dir, engine, **kws)
162 downloader = pooch.HTTPDownloader(headers=headers)
164 # retrieve the file
--> 165 filepath = pooch.retrieve(
166 url=url, known_hash=None, path=cache_dir, downloader=downloader
167 )
168 ds = _open_dataset(filepath, engine=engine, **kws)
169 if not cache:
File /usr/lib/python3/dist-packages/pooch/core.py:239, in retrieve(url, known_hash, fname, path, processor, downloader, progressbar)
236 if downloader is None:
237 downloader = choose_downloader(url, progressbar=progressbar)
--> 239 stream_download(url, full_path, known_hash, downloader, pooch=None)
241 if known_hash is None:
242 get_logger().info(
243 "SHA256 hash of downloaded file: %s\n"
244 "Use this value as the 'known_hash' argument of 'pooch.retrieve'"
(...)
247 file_hash(str(full_path)),
248 )
File /usr/lib/python3/dist-packages/pooch/core.py:807, in stream_download(url, fname, known_hash, downloader, pooch, retry_if_failed)
803 try:
804 # Stream the file to a temporary so that we can safely check its
805 # hash before overwriting the original.
806 with temporary_file(path=str(fname.parent)) as tmp:
--> 807 downloader(url, tmp, pooch)
808 hash_matches(tmp, known_hash, strict=True, source=str(fname.name))
809 shutil.move(tmp, str(fname))
File /usr/lib/python3/dist-packages/pooch/downloaders.py:220, in HTTPDownloader.__call__(self, url, output_file, pooch, check_only)
218 # pylint: enable=consider-using-with
219 try:
--> 220 response = requests.get(url, timeout=timeout, **kwargs)
221 response.raise_for_status()
222 content = response.iter_content(chunk_size=self.chunk_size)
File /usr/lib/python3/dist-packages/requests/api.py:73, in get(url, params, **kwargs)
62 def get(url, params=None, **kwargs):
63 r"""Sends a GET request.
64
65 :param url: URL for the new :class:`Request` object.
(...)
70 :rtype: requests.Response
71 """
---> 73 return request("get", url, params=params, **kwargs)
File /usr/lib/python3/dist-packages/requests/api.py:59, in request(method, url, **kwargs)
55 # By using the 'with' statement we are sure the session is closed, thus we
56 # avoid leaving sockets open which can trigger a ResourceWarning in some
57 # cases, and look like a memory leak in others.
58 with sessions.Session() as session:
---> 59 return session.request(method=method, url=url, **kwargs)
File /usr/lib/python3/dist-packages/requests/sessions.py:589, in Session.request(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json)
584 send_kwargs = {
585 "timeout": timeout,
586 "allow_redirects": allow_redirects,
587 }
588 send_kwargs.update(settings)
--> 589 resp = self.send(prep, **send_kwargs)
591 return resp
File /usr/lib/python3/dist-packages/requests/sessions.py:703, in Session.send(self, request, **kwargs)
700 start = preferred_clock()
702 # Send the request
--> 703 r = adapter.send(request, **kwargs)
705 # Total elapsed time of the request (approximately)
706 elapsed = preferred_clock() - start
File /usr/lib/python3/dist-packages/requests/adapters.py:700, in HTTPAdapter.send(self, request, stream, timeout, verify, cert, proxies)
696 if isinstance(e.reason, _SSLError):
697 # This branch is for urllib3 v1.22 and later.
698 raise SSLError(e, request=request)
--> 700 raise ConnectionError(e, request=request)
702 except ClosedPoolError as e:
703 raise ConnectionError(e, request=request)
ConnectionError: HTTPSConnectionPool(host='github.com', port=443): Max retries exceeded with url: /pydata/xarray-data/raw/master/air_temperature.nc (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x7fe88d666900>: Failed to resolve 'github.com' ([Errno -3] Temporary failure in name resolution)"))
The function we will apply is np.interp
which expects 1D numpy arrays. This functionality is already implemented in xarray so we use that capability to make sure we are not making mistakes.
[2]:
newlat = np.linspace(15, 75, 100)
air.interp(lat=newlat)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[2], line 2
1 newlat = np.linspace(15, 75, 100)
----> 2 air.interp(lat=newlat)
NameError: name 'air' is not defined
Let’s define a function that works with one vector of data along lat
at a time.
[3]:
def interp1d_np(data, x, xi):
return np.interp(xi, x, data)
interped = interp1d_np(air.isel(time=0, lon=0), air.lat, newlat)
expected = air.interp(lat=newlat)
# no errors are raised if values are equal to within floating point precision
np.testing.assert_allclose(expected.isel(time=0, lon=0).values, interped)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[3], line 5
1 def interp1d_np(data, x, xi):
2 return np.interp(xi, x, data)
----> 5 interped = interp1d_np(air.isel(time=0, lon=0), air.lat, newlat)
6 expected = air.interp(lat=newlat)
8 # no errors are raised if values are equal to within floating point precision
NameError: name 'air' is not defined
No errors are raised so our interpolation is working.¶
This function consumes and returns numpy arrays, which means we need to do a lot of work to convert the result back to an xarray object with meaningful metadata. This is where apply_ufunc
is very useful.
apply_ufunc
¶
Apply a vectorized function for unlabeled arrays on xarray objects.
The function will be mapped over the data variable(s) of the input arguments using
xarray’s standard rules for labeled computation, including alignment, broadcasting,
looping over GroupBy/Dataset variables, and merging of coordinates.
apply_ufunc
has many capabilities but for simplicity this example will focus on the common task of vectorizing 1D functions over nD xarray objects. We will iteratively build up the right set of arguments to apply_ufunc
and read through many error messages in doing so.
[4]:
xr.apply_ufunc(
interp1d_np, # first the function
air.isel(time=0, lon=0), # now arguments in the order expected by 'interp1_np'
air.lat,
newlat,
)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[4], line 3
1 xr.apply_ufunc(
2 interp1d_np, # first the function
----> 3 air.isel(time=0, lon=0), # now arguments in the order expected by 'interp1_np'
4 air.lat,
5 newlat,
6 )
NameError: name 'air' is not defined
apply_ufunc
needs to know a lot of information about what our function does so that it can reconstruct the outputs. In this case, the size of dimension lat has changed and we need to explicitly specify that this will happen. xarray helpfully tells us that we need to specify the kwarg exclude_dims
.
exclude_dims
¶
exclude_dims : set, optional
Core dimensions on the inputs to exclude from alignment and
broadcasting entirely. Any input coordinates along these dimensions
will be dropped. Each excluded dimension must also appear in
``input_core_dims`` for at least one argument. Only dimensions listed
here are allowed to change size between input and output objects.
[5]:
xr.apply_ufunc(
interp1d_np, # first the function
air.isel(time=0, lon=0), # now arguments in the order expected by 'interp1_np'
air.lat,
newlat,
exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be set!
)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[5], line 3
1 xr.apply_ufunc(
2 interp1d_np, # first the function
----> 3 air.isel(time=0, lon=0), # now arguments in the order expected by 'interp1_np'
4 air.lat,
5 newlat,
6 exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be set!
7 )
NameError: name 'air' is not defined
Core dimensions¶
Core dimensions are central to using apply_ufunc
. In our case, our function expects to receive a 1D vector along lat
— this is the dimension that is “core” to the function’s functionality. Multiple core dimensions are possible. apply_ufunc
needs to know which dimensions of each variable are core dimensions.
input_core_dims : Sequence[Sequence], optional
List of the same length as ``args`` giving the list of core dimensions
on each input argument that should not be broadcast. By default, we
assume there are no core dimensions on any input arguments.
For example, ``input_core_dims=[[], ['time']]`` indicates that all
dimensions on the first argument and all dimensions other than 'time'
on the second argument should be broadcast.
Core dimensions are automatically moved to the last axes of input
variables before applying ``func``, which facilitates using NumPy style
generalized ufuncs [2]_.
output_core_dims : List[tuple], optional
List of the same length as the number of output arguments from
``func``, giving the list of core dimensions on each output that were
not broadcast on the inputs. By default, we assume that ``func``
outputs exactly one array, with axes corresponding to each broadcast
dimension.
Core dimensions are assumed to appear as the last dimensions of each
output in the provided order.
Next we specify "lat"
as input_core_dims
on both air
and air.lat
[6]:
xr.apply_ufunc(
interp1d_np, # first the function
air.isel(time=0, lon=0), # now arguments in the order expected by 'interp1_np'
air.lat,
newlat,
input_core_dims=[["lat"], ["lat"], []],
exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be set!
)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[6], line 3
1 xr.apply_ufunc(
2 interp1d_np, # first the function
----> 3 air.isel(time=0, lon=0), # now arguments in the order expected by 'interp1_np'
4 air.lat,
5 newlat,
6 input_core_dims=[["lat"], ["lat"], []],
7 exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be set!
8 )
NameError: name 'air' is not defined
xarray is telling us that it expected to receive back a numpy array with 0 dimensions but instead received an array with 1 dimension corresponding to newlat
. We can fix this by specifying output_core_dims
[7]:
xr.apply_ufunc(
interp1d_np, # first the function
air.isel(time=0, lon=0), # now arguments in the order expected by 'interp1_np'
air.lat,
newlat,
input_core_dims=[["lat"], ["lat"], []], # list with one entry per arg
output_core_dims=[["lat"]],
exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be set!
)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[7], line 3
1 xr.apply_ufunc(
2 interp1d_np, # first the function
----> 3 air.isel(time=0, lon=0), # now arguments in the order expected by 'interp1_np'
4 air.lat,
5 newlat,
6 input_core_dims=[["lat"], ["lat"], []], # list with one entry per arg
7 output_core_dims=[["lat"]],
8 exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be set!
9 )
NameError: name 'air' is not defined
Finally we get some output! Let’s check that this is right
[8]:
interped = xr.apply_ufunc(
interp1d_np, # first the function
air.isel(time=0, lon=0), # now arguments in the order expected by 'interp1_np'
air.lat,
newlat,
input_core_dims=[["lat"], ["lat"], []], # list with one entry per arg
output_core_dims=[["lat"]],
exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be set!
)
interped["lat"] = newlat # need to add this manually
xr.testing.assert_allclose(expected.isel(time=0, lon=0), interped)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[8], line 3
1 interped = xr.apply_ufunc(
2 interp1d_np, # first the function
----> 3 air.isel(time=0, lon=0), # now arguments in the order expected by 'interp1_np'
4 air.lat,
5 newlat,
6 input_core_dims=[["lat"], ["lat"], []], # list with one entry per arg
7 output_core_dims=[["lat"]],
8 exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be set!
9 )
10 interped["lat"] = newlat # need to add this manually
11 xr.testing.assert_allclose(expected.isel(time=0, lon=0), interped)
NameError: name 'air' is not defined
No errors are raised so it is right!
Vectorization with np.vectorize
¶
Now our function currently only works on one vector of data which is not so useful given our 3D dataset. Let’s try passing the whole dataset. We add a print
statement so we can see what our function receives.
[9]:
def interp1d_np(data, x, xi):
print(f"data: {data.shape} | x: {x.shape} | xi: {xi.shape}")
return np.interp(xi, x, data)
interped = xr.apply_ufunc(
interp1d_np, # first the function
air.isel(
lon=slice(3), time=slice(4)
), # now arguments in the order expected by 'interp1_np'
air.lat,
newlat,
input_core_dims=[["lat"], ["lat"], []], # list with one entry per arg
output_core_dims=[["lat"]],
exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be set!
)
interped["lat"] = newlat # need to add this manually
xr.testing.assert_allclose(expected.isel(time=0, lon=0), interped)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[9], line 8
2 print(f"data: {data.shape} | x: {x.shape} | xi: {xi.shape}")
3 return np.interp(xi, x, data)
6 interped = xr.apply_ufunc(
7 interp1d_np, # first the function
----> 8 air.isel(
9 lon=slice(3), time=slice(4)
10 ), # now arguments in the order expected by 'interp1_np'
11 air.lat,
12 newlat,
13 input_core_dims=[["lat"], ["lat"], []], # list with one entry per arg
14 output_core_dims=[["lat"]],
15 exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be set!
16 )
17 interped["lat"] = newlat # need to add this manually
18 xr.testing.assert_allclose(expected.isel(time=0, lon=0), interped)
NameError: name 'air' is not defined
That’s a hard-to-interpret error but our print
call helpfully printed the shapes of the input data:
data: (10, 53, 25) | x: (25,) | xi: (100,)
We see that air
has been passed as a 3D numpy array which is not what np.interp
expects. Instead we want loop over all combinations of lon
and time
; and apply our function to each corresponding vector of data along lat
. apply_ufunc
makes this easy by specifying vectorize=True
:
vectorize : bool, optional
If True, then assume ``func`` only takes arrays defined over core
dimensions as input and vectorize it automatically with
:py:func:`numpy.vectorize`. This option exists for convenience, but is
almost always slower than supplying a pre-vectorized function.
Using this option requires NumPy version 1.12 or newer.
Also see the documentation for np.vectorize
: https://docs.scipy.org/doc/numpy/reference/generated/numpy.vectorize.html. Most importantly
The vectorize function is provided primarily for convenience, not for performance.
The implementation is essentially a for loop.
[10]:
def interp1d_np(data, x, xi):
print(f"data: {data.shape} | x: {x.shape} | xi: {xi.shape}")
return np.interp(xi, x, data)
interped = xr.apply_ufunc(
interp1d_np, # first the function
air, # now arguments in the order expected by 'interp1_np'
air.lat, # as above
newlat, # as above
input_core_dims=[["lat"], ["lat"], []], # list with one entry per arg
output_core_dims=[["lat"]], # returned data has one dimension
exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be set!
vectorize=True, # loop over non-core dims
)
interped["lat"] = newlat # need to add this manually
xr.testing.assert_allclose(expected, interped)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[10], line 8
2 print(f"data: {data.shape} | x: {x.shape} | xi: {xi.shape}")
3 return np.interp(xi, x, data)
6 interped = xr.apply_ufunc(
7 interp1d_np, # first the function
----> 8 air, # now arguments in the order expected by 'interp1_np'
9 air.lat, # as above
10 newlat, # as above
11 input_core_dims=[["lat"], ["lat"], []], # list with one entry per arg
12 output_core_dims=[["lat"]], # returned data has one dimension
13 exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be set!
14 vectorize=True, # loop over non-core dims
15 )
16 interped["lat"] = newlat # need to add this manually
17 xr.testing.assert_allclose(expected, interped)
NameError: name 'air' is not defined
This unfortunately is another cryptic error from numpy.
Notice that newlat
is not an xarray object. Let’s add a dimension name new_lat
and modify the call. Note this cannot be lat
because xarray expects dimensions to be the same size (or broadcastable) among all inputs. output_core_dims
needs to be modified appropriately. We’ll manually rename new_lat
back to lat
for easy checking.
[11]:
def interp1d_np(data, x, xi):
print(f"data: {data.shape} | x: {x.shape} | xi: {xi.shape}")
return np.interp(xi, x, data)
interped = xr.apply_ufunc(
interp1d_np, # first the function
air, # now arguments in the order expected by 'interp1_np'
air.lat, # as above
newlat, # as above
input_core_dims=[["lat"], ["lat"], ["new_lat"]], # list with one entry per arg
output_core_dims=[["new_lat"]], # returned data has one dimension
exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be a set!
vectorize=True, # loop over non-core dims
)
interped = interped.rename({"new_lat": "lat"})
interped["lat"] = newlat # need to add this manually
xr.testing.assert_allclose(
expected.transpose(*interped.dims), interped # order of dims is different
)
interped
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[11], line 8
2 print(f"data: {data.shape} | x: {x.shape} | xi: {xi.shape}")
3 return np.interp(xi, x, data)
6 interped = xr.apply_ufunc(
7 interp1d_np, # first the function
----> 8 air, # now arguments in the order expected by 'interp1_np'
9 air.lat, # as above
10 newlat, # as above
11 input_core_dims=[["lat"], ["lat"], ["new_lat"]], # list with one entry per arg
12 output_core_dims=[["new_lat"]], # returned data has one dimension
13 exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be a set!
14 vectorize=True, # loop over non-core dims
15 )
16 interped = interped.rename({"new_lat": "lat"})
17 interped["lat"] = newlat # need to add this manually
NameError: name 'air' is not defined
Notice that the printed input shapes are all 1D and correspond to one vector along the lat
dimension.
The result is now an xarray object with coordinate values copied over from data
. This is why apply_ufunc
is so convenient; it takes care of a lot of boilerplate necessary to apply functions that consume and produce numpy arrays to xarray objects.
One final point: lat
is now the last dimension in interped
. This is a “property” of core dimensions: they are moved to the end before being sent to interp1d_np
as was noted in the docstring for input_core_dims
Core dimensions are automatically moved to the last axes of input
variables before applying ``func``, which facilitates using NumPy style
generalized ufuncs [2]_.
Parallelization with dask¶
So far our function can only handle numpy arrays. A real benefit of apply_ufunc
is the ability to easily parallelize over dask chunks when needed.
We want to apply this function in a vectorized fashion over each chunk of the dask array. This is possible using dask’s blockwise
, map_blocks
, or apply_gufunc
. Xarray’s apply_ufunc
wraps dask’s apply_gufunc
and asking it to map the function over chunks using apply_gufunc
is as simple as specifying dask="parallelized"
. With this level of flexibility we need to provide dask with some extra information: 1. output_dtypes
: dtypes of all returned objects, and 2.
output_sizes
: lengths of any new dimensions.
Here we need to specify output_dtypes
since apply_ufunc
can infer the size of the new dimension new_lat
from the argument corresponding to the third element in input_core_dims
. Here I choose the chunk sizes to illustrate that np.vectorize
is still applied so that our function receives 1D vectors even though the blocks are 3D.
[12]:
def interp1d_np(data, x, xi):
print(f"data: {data.shape} | x: {x.shape} | xi: {xi.shape}")
return np.interp(xi, x, data)
interped = xr.apply_ufunc(
interp1d_np, # first the function
air.chunk(
{"time": 2, "lon": 2}
), # now arguments in the order expected by 'interp1_np'
air.lat, # as above
newlat, # as above
input_core_dims=[["lat"], ["lat"], ["new_lat"]], # list with one entry per arg
output_core_dims=[["new_lat"]], # returned data has one dimension
exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be a set!
vectorize=True, # loop over non-core dims
dask="parallelized",
output_dtypes=[air.dtype], # one per output
).rename({"new_lat": "lat"})
interped["lat"] = newlat # need to add this manually
xr.testing.assert_allclose(expected.transpose(*interped.dims), interped)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[12], line 8
2 print(f"data: {data.shape} | x: {x.shape} | xi: {xi.shape}")
3 return np.interp(xi, x, data)
6 interped = xr.apply_ufunc(
7 interp1d_np, # first the function
----> 8 air.chunk(
9 {"time": 2, "lon": 2}
10 ), # now arguments in the order expected by 'interp1_np'
11 air.lat, # as above
12 newlat, # as above
13 input_core_dims=[["lat"], ["lat"], ["new_lat"]], # list with one entry per arg
14 output_core_dims=[["new_lat"]], # returned data has one dimension
15 exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be a set!
16 vectorize=True, # loop over non-core dims
17 dask="parallelized",
18 output_dtypes=[air.dtype], # one per output
19 ).rename({"new_lat": "lat"})
20 interped["lat"] = newlat # need to add this manually
21 xr.testing.assert_allclose(expected.transpose(*interped.dims), interped)
NameError: name 'air' is not defined
Yay! our function is receiving 1D vectors, so we’ve successfully parallelized applying a 1D function over a block. If you have a distributed dashboard up, you should see computes happening as equality is checked.
High performance vectorization: gufuncs, numba & guvectorize¶
np.vectorize
is a very convenient function but is unfortunately slow. It is only marginally faster than writing a for loop in Python and looping. A common way to get around this is to write a base interpolation function that can handle nD arrays in a compiled language like Fortran and then pass that to apply_ufunc
.
Another option is to use the numba package which provides a very convenient guvectorize
decorator: https://numba.pydata.org/numba-doc/latest/user/vectorize.html#the-guvectorize-decorator
Any decorated function gets compiled and will loop over any non-core dimension in parallel when necessary. We need to specify some extra information:
Our function cannot return a variable any more. Instead it must receive a variable (the last argument) whose contents the function will modify. So we change from
def interp1d_np(data, x, xi)
todef interp1d_np_gufunc(data, x, xi, out)
. Our computed results must be assigned toout
. All values ofout
must be assigned explicitly.guvectorize
needs to know the dtypes of the input and output. This is specified in string form as the first argument. Each element of the tuple corresponds to each argument of the function. In this case, we specifyfloat64
for all inputs and outputs:"(float64[:], float64[:], float64[:], float64[:])"
corresponding todata, x, xi, out
Now we need to tell numba the size of the dimensions the function takes as inputs and returns as output i.e. core dimensions. This is done in symbolic form i.e.
data
andx
are vectors of the same length, sayn
;xi
and the outputout
have a different length, saym
. So the second argument is (again as a string)"(n), (n), (m) -> (m)."
corresponding again todata, x, xi, out
[13]:
from numba import float64, guvectorize
@guvectorize("(float64[:], float64[:], float64[:], float64[:])", "(n), (n), (m) -> (m)")
def interp1d_np_gufunc(data, x, xi, out):
# numba doesn't really like this.
# seem to support fstrings so do it the old way
print(
"data: " + str(data.shape) + " | x:" + str(x.shape) + " | xi: " + str(xi.shape)
)
out[:] = np.interp(xi, x, data)
# gufuncs don't return data
# instead you assign to a the last arg
# return np.interp(xi, x, data)
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
Cell In[13], line 1
----> 1 from numba import float64, guvectorize
4 @guvectorize("(float64[:], float64[:], float64[:], float64[:])", "(n), (n), (m) -> (m)")
5 def interp1d_np_gufunc(data, x, xi, out):
6 # numba doesn't really like this.
7 # seem to support fstrings so do it the old way
8 print(
9 "data: " + str(data.shape) + " | x:" + str(x.shape) + " | xi: " + str(xi.shape)
10 )
ModuleNotFoundError: No module named 'numba'
The warnings are about object-mode compilation relating to the print
statement. This means we don’t get much speed up: https://numba.pydata.org/numba-doc/latest/user/performance-tips.html#no-python-mode-vs-object-mode. We’ll keep the print
statement temporarily to make sure that guvectorize
acts like we want it to.
[14]:
interped = xr.apply_ufunc(
interp1d_np_gufunc, # first the function
air.chunk(
{"time": 2, "lon": 2}
), # now arguments in the order expected by 'interp1_np'
air.lat, # as above
newlat, # as above
input_core_dims=[["lat"], ["lat"], ["new_lat"]], # list with one entry per arg
output_core_dims=[["new_lat"]], # returned data has one dimension
exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be a set!
# vectorize=True, # not needed since numba takes care of vectorizing
dask="parallelized",
output_dtypes=[air.dtype], # one per output
).rename({"new_lat": "lat"})
interped["lat"] = newlat # need to add this manually
xr.testing.assert_allclose(expected.transpose(*interped.dims), interped)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[14], line 2
1 interped = xr.apply_ufunc(
----> 2 interp1d_np_gufunc, # first the function
3 air.chunk(
4 {"time": 2, "lon": 2}
5 ), # now arguments in the order expected by 'interp1_np'
6 air.lat, # as above
7 newlat, # as above
8 input_core_dims=[["lat"], ["lat"], ["new_lat"]], # list with one entry per arg
9 output_core_dims=[["new_lat"]], # returned data has one dimension
10 exclude_dims=set(("lat",)), # dimensions allowed to change size. Must be a set!
11 # vectorize=True, # not needed since numba takes care of vectorizing
12 dask="parallelized",
13 output_dtypes=[air.dtype], # one per output
14 ).rename({"new_lat": "lat"})
15 interped["lat"] = newlat # need to add this manually
16 xr.testing.assert_allclose(expected.transpose(*interped.dims), interped)
NameError: name 'interp1d_np_gufunc' is not defined
Yay! Our function is receiving 1D vectors and is working automatically with dask arrays. Finally let’s comment out the print line and wrap everything up in a nice reusable function
[15]:
from numba import float64, guvectorize
@guvectorize(
"(float64[:], float64[:], float64[:], float64[:])",
"(n), (n), (m) -> (m)",
nopython=True,
)
def interp1d_np_gufunc(data, x, xi, out):
out[:] = np.interp(xi, x, data)
def xr_interp(data, dim, newdim):
interped = xr.apply_ufunc(
interp1d_np_gufunc, # first the function
data, # now arguments in the order expected by 'interp1_np'
data[dim], # as above
newdim, # as above
input_core_dims=[[dim], [dim], ["__newdim__"]], # list with one entry per arg
output_core_dims=[["__newdim__"]], # returned data has one dimension
exclude_dims=set((dim,)), # dimensions allowed to change size. Must be a set!
# vectorize=True, # not needed since numba takes care of vectorizing
dask="parallelized",
output_dtypes=[
data.dtype
], # one per output; could also be float or np.dtype("float64")
).rename({"__newdim__": dim})
interped[dim] = newdim # need to add this manually
return interped
xr.testing.assert_allclose(
expected.transpose(*interped.dims),
xr_interp(air.chunk({"time": 2, "lon": 2}), "lat", newlat),
)
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
Cell In[15], line 1
----> 1 from numba import float64, guvectorize
4 @guvectorize(
5 "(float64[:], float64[:], float64[:], float64[:])",
6 "(n), (n), (m) -> (m)",
7 nopython=True,
8 )
9 def interp1d_np_gufunc(data, x, xi, out):
10 out[:] = np.interp(xi, x, data)
ModuleNotFoundError: No module named 'numba'
This technique is generalizable to any 1D function.