forked from xarray-contrib/cf-xarray
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcoding.py
More file actions
131 lines (113 loc) · 4.86 KB
/
coding.py
File metadata and controls
131 lines (113 loc) · 4.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
Encoders and decoders for CF conventions not implemented by Xarray.
"""
import numpy as np
import pandas as pd
import xarray as xr
def encode_multi_index_as_compress(ds, idxnames=None):
    """
    Encode a MultiIndexed dimension using the "compression by gathering" CF convention.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset with at least one MultiIndexed dimension.
    idxnames : hashable or iterable of hashable, optional
        Dimensions that are MultiIndex-ed. If None, will detect all MultiIndex-ed dimensions.
        Any iterable (including one-shot iterables such as generators) is accepted.

    Returns
    -------
    xarray.Dataset
        Encoded Dataset where every dimension in ``idxnames`` is an integer coordinate
        with a ``"compress"`` attribute listing the names of the MultiIndex levels.

    Raises
    ------
    ValueError
        If no MultiIndex-ed dimension is found or provided, or if the ``"compress"``
        attribute is already present on a dimension to encode.

    References
    ----------
    CF conventions on `compression by gathering <http://cfconventions.org/Data/cf-conventions/cf-conventions-1.8/cf-conventions.html#compression-by-gathering>`_
    """
    if idxnames is None:
        idxnames = tuple(
            name
            for name, idx in ds.indexes.items()
            if isinstance(idx, pd.MultiIndex)
            # After the flexible indexes refactor, all MultiIndex Levels
            # have a MultiIndex but the name won't match.
            # Prior to that refactor, there is only a single MultiIndex with name=None
            and (idx.name == name if idx.name is not None else True)
        )
    elif isinstance(idxnames, str):
        idxnames = (idxnames,)
    else:
        # ``idxnames`` is used twice below (``reset_index`` and the loop), so
        # materialize it once; this also makes one-shot iterables usable.
        try:
            idxnames = tuple(idxnames)
        except TypeError:
            # A single, non-iterable hashable dimension name.
            idxnames = (idxnames,)

    if not idxnames:
        raise ValueError("No MultiIndex-ed dimensions found in Dataset.")

    encoded = ds.reset_index(idxnames)
    for idxname in idxnames:
        mindex = ds.indexes[idxname]

        # Store the values of each level as its own coordinate, carrying over
        # the metadata of the corresponding level in the input Dataset.
        coords = dict(zip(mindex.names, mindex.levels))
        encoded.update(coords)
        for c in coords:
            encoded[c].attrs = ds[c].attrs.copy()
            encoded[c].encoding = ds[c].encoding.copy()

        # Flatten the per-level integer codes into a single index into the
        # (hypothetical) full array whose shape is ``mindex.levshape``.
        encoded[idxname] = np.ravel_multi_index(mindex.codes, mindex.levshape)
        encoded[idxname].attrs = ds[idxname].attrs.copy()
        # NOTE(review): the encoding inspected here belongs to the variable
        # assigned just above, not to ``ds[idxname]`` -- confirm which one
        # was intended.
        if (
            "compress" in encoded[idxname].encoding
            or "compress" in encoded[idxname].attrs
        ):
            raise ValueError(
                f"Does not support the 'compress' attribute in {idxname}.encoding or {idxname}.attrs. "
                "This is generated automatically."
            )
        encoded[idxname].attrs["compress"] = " ".join(mindex.names)
    return encoded
def decode_compress_to_multi_index(encoded, idxnames=None):
    """
    Decode a compressed variable to a pandas MultiIndex.

    Parameters
    ----------
    encoded : xarray.Dataset
        Encoded Dataset with variables that use "compression by gathering".
    idxnames : hashable or iterable of hashable, optional
        Variable names that represents a compressed dimension. These variables must have
        the attribute ``"compress"``. If None, will detect all indexes with a ``"compress"``
        attribute and decode those.

    Returns
    -------
    xarray.Dataset
        Decoded Dataset where every name in ``idxnames`` is a MultiIndexed dimension.

    Raises
    ------
    ValueError
        If ``encoded`` is not a Dataset, or if one of the variables in ``idxnames``
        does not have a ``"compress"`` attribute.

    References
    ----------
    CF conventions on `compression by gathering <http://cfconventions.org/Data/cf-conventions/cf-conventions-1.8/cf-conventions.html#compression-by-gathering>`_
    """
    # Validate the input type first: everything below relies on Dataset API.
    if not isinstance(encoded, xr.Dataset):
        raise ValueError(
            f"Must provide a Dataset. Received {type(encoded)} instead."
        )

    decoded = xr.Dataset(data_vars=encoded.data_vars, attrs=encoded.attrs.copy())
    if idxnames is None:
        idxnames = tuple(
            name for name in encoded.indexes if "compress" in encoded[name].attrs
        )
    elif isinstance(idxnames, str):
        idxnames = (idxnames,)
    else:
        try:
            idxnames = tuple(idxnames)
        except TypeError:
            # A single, non-iterable hashable variable name.
            idxnames = (idxnames,)

    # Only the import decides which code path is used, so keep the ``try``
    # limited to it; errors raised while decoding must not be mistaken for a
    # missing ``PandasMultiIndex``.
    try:
        from xarray.indexes import PandasMultiIndex
    except ImportError:
        PandasMultiIndex = None

    for idxname in idxnames:
        if "compress" not in encoded[idxname].attrs:
            raise ValueError("Attribute 'compress' not found in provided Dataset.")

        # ``compress`` is a blank-separated list of the compressed dimensions.
        names = encoded[idxname].attrs["compress"].split()
        shape = [encoded.sizes[dim] for dim in names]
        # Turn the flat indices back into one integer index per level.
        indices = np.unravel_index(encoded[idxname].data, shape)

        if PandasMultiIndex is not None:
            # Pick the level values along the compressed dimension and build
            # the MultiIndex from the resulting coordinates.
            variables = {
                dim: encoded[dim].isel({dim: xr.Variable(data=index, dims=idxname)})
                for dim, index in zip(names, indices)
            }
            decoded = decoded.assign_coords(variables).set_xindex(
                names, PandasMultiIndex
            )
        else:
            arrays = [encoded[dim].data[index] for dim, index in zip(names, indices)]
            mindex = pd.MultiIndex.from_arrays(arrays, names=names)
            decoded.coords[idxname] = mindex

        # Carry over the metadata, except for ``compress`` which only makes
        # sense on the encoded variable.
        decoded[idxname].attrs = {
            key: value
            for key, value in encoded[idxname].attrs.items()
            if key != "compress"
        }
        for coord in names:
            variable = encoded.variables[coord]
            decoded[coord].attrs = variable.attrs.copy()
            decoded[coord].encoding = variable.encoding.copy()

    return decoded