#!/bin/env python
# -*- coding: utf-8 -*-
# """
# Created on 09.12.16
# Created for pymepps
# @author: Tobias Sebastian Finn,
#     Copyright (C) {2016}  {Tobias Sebastian Finn}
#     This program is free software: you can redistribute it and/or modify
#     it under the terms of the GNU General Public License as published by
#     the Free Software Foundation, either version 3 of the License, or
#     (at your option) any later version.
#     This program is distributed in the hope that it will be useful,
#     but WITHOUT ANY WARRANTY; without even the implied warranty of
#     GNU General Public License for more details.
#     You should have received a copy of the GNU General Public License
#     along with this program.  If not, see <>.
# """
# System modules
import logging
from collections import OrderedDict
import re
import datetime

# External modules
import xarray as xr
import numpy as np

# Internal modules
import pymepps
from .base import MetData
from pymepps.grid.builder import GridBuilder
from pymepps.loader.datasets.tsdataset import TSDataset
from pymepps.loader.filehandler.netcdfhandler import cube_to_series

logger = logging.getLogger(__name__)

[docs]@xr.register_dataarray_accessor('pp') class SpatialAccessor(MetData): """ The SpatialAccessor extends a xarray.DataArray for post-processing of meteorological numerical weather model data. The SpatialAccessor works mostly with grid and gridded data. """ def __init__(self, data, grid=None): super().__init__(data) self._grid = None self.grid = grid def __str__(self): try: grid = str(self.grid) except TypeError: grid = str(None) name = "{0:s}({1:s})".format(self.__class__.__name__, return "{0:s}\n{1:s}\nGrid: {1:s}".format(name, '-'*len(name), grid) @property def grid(self): """ The corresponding grid of this xarray.DataArray instance. This grid is used to interpolate/remap the data and to select the nearest grid point to a given longitude/latitude pair. """ if self._grid is None: raise TypeError('This DataArray has no grid defined!') else: return self._grid @grid.setter def grid(self, grid): if grid is not None and not hasattr(grid, '_grid_dict'): raise TypeError('The given grid is not a valid defined grid type!') self._grid = grid
[docs] def set_grid(self, grid=None): """ Set the grid to the given grid and set the grid coordinates. It is assumed that the last n dimensions (n=1 for unstructured grid, n=2 all other grids) are the grid coordinates. Please make sure that this assumption is fulfilled! Parameters ---------- grid : Grid or None, optional This grid is used to set the grid and the grid coordinates of the returned array. If this is None, the grid of this DataArray instance is used. Default is None. Returns ------- gridded_array : xarray.DataArray The DataArray with the grid coordinates and the grid. Raises ------ ValueError A ValueError is raised if the grid of this instance is used and not grid set. """ if grid is None and self._grid is not None: grid = self.grid elif grid is None: raise ValueError('The grid of this DataArray is used and not set!') coord_names = grid.get_coord_names() rename_dict = {new: old for new, old in zip([-grid.len_coords:], coord_names,)} gridded_array = new_coordinates = grid.get_coords() for coord in coord_names: gridded_array.coords[coord] = new_coordinates[coord] gridded_array.pp.grid = grid gridded_array = gridded_array.pp.check_data_coordinates(gridded_array) return gridded_array
[docs] def check_data_coordinates(self, item): """ Check if items grid coordinates shape is the same as those of the grid. Parameters ---------- item: xarray.DataArray Instance to test for type and grid dimension length. Returns ------- item: xarray.DataArray The checked item. Raises ------ TypeError: The grid is not set. ValueError: The given item has not the same last coordinates as the grid. """ item_grid_shape = item.shape[-self.grid.len_coords:] coords_equal = all( [np.equal(item, grid) for item, grid in zip(item_grid_shape, self.grid.shape)]) if not coords_equal: raise ValueError('The item {0:s} has not the right last dimensions.' 'They need to be the same as the grid!') return item
[docs] def normalize_coords(self, runtime=None, ensemble='det', validtime=None, height=None): """ Normalize the coordinates of the DataArray. The number, order and names of the coordinates are normalized. The number of coordinates will be four to six, depending if the DataArray is a merged multi-variable DataArray and the number of grid coordinates. The values of the added coordinates is set to the given values or will be None as filling value. The order and name of the DataArray will be: - (variable) (Only if the DataArray is a multi-variable DataArray. This is the variable name) - runtime (The analysis time of the model. The model is started at this time. The runtime is np.datetime64 as type) - ensemble (The ensemble member of the model.) - validtime (The lead time of the model. The model is valid for this times. The validtime timedelta to the runtime.) - height (The height information of the model.) - first grid coordinate - (second grid coordinate) (Only if the grid is not an unstructured grid) Parameters ---------- runtime : datetime.datetime, np.datetime64 or None, optional The runtime of the model. The runtime will be converted to np.datetime64[ns] if it is not already this type. Default is None. ensemble : int or str, optional The ensemble member of the model. An integer is indicating the member number, with zero as control run. Default is 'det'. validtime : datetime.datetime, np.datetiime64, np.timedelta or None, optional The validtime of the model. The validtime is converted to np.timedelta. Default is None. height : int, str or None, optional The height of the model. Default is None. Returns ------- normalized_array : xr.DataArray The DataArray with normalized coordinates. """ arg_dict = locals() coord_dict = OrderedDict( height=dict( approx=['height', 'surf', 'sig', 'lev', 'press', 'alti'], exact=['height', ] ), validtime=dict( approx=['lead', ], exact=['validtime', 'time'] ), ensemble=dict( approx=['ens', 'mem', 'num'], exact=['ensemble', ] ), runtime=dict( approx=['ana', 'ref', 'run'], exact=['runtime'] ) ) normalized_array = for key in coord_dict.keys(): coord_name = self._get_coord_name(data=normalized_array, variants=coord_dict[key]) if coord_name is None: normalized_array = self._create_coord(coord=key, value=arg_dict[key], data=normalized_array) else: normalized_array = self._rename_coord(orig=coord_name, to=key, data=normalized_array) normalized_array = self._get_normalized_order(normalized_array) normalized_array = self._transform_datetime(normalized_array) normalized_array = self._validtime_to_timedelta(normalized_array) try: normalized_array = normalized_array.pp.set_grid(self.grid) except TypeError: pass return normalized_array
@staticmethod def _get_coord_name(data, variants): """ Check if the coordinate name variants is any dimensions within the DataArray. Parameters ---------- data : xarray.DataArray The coordinate is searched within the coordinates of this DataArray. variants : dict(str, list(str)) or list(str) These variants are checked within the dimensions of the DataArray. If variants is a dict, it needs exact and approx as key with a sublist of variants. The exact list is used to check the exact dimension name. The approx list is used to check if the variant is within the name of a dimension. If variants is a list of strings the list values are used for exact matching. Returns ------- str or None The matched dimension is returned. If the return value is None, no matching dimension was found. """ if isinstance(variants, (tuple, list)): variants = dict(exact=variants, approx=[]) data_dims = [re.sub('[^a-zA-Z]+', '', d) for d in data.dims] for k, dim in enumerate(data_dims): for variant in variants['exact']: if variant == dim: return data.dims[k] for variant in variants['approx']: if variant in dim: return data.dims[k] return None @staticmethod def _get_normalized_order(data): if 'variable' in data.dims: normalized_order = ['variable'] else: normalized_order = [] normalized_order.extend(['runtime', 'ensemble', 'validtime', 'height']) normalized_order.extend([ dim for dim in data.dims if dim not in normalized_order ]) normalized_array = data.transpose(*normalized_order) return normalized_array @staticmethod def _transform_datetime(data): """ Transform the datetime dimensions of the given data to np.datetime64[ns]. Parameters ---------- data : xr.DataArray The dimensions of this DataArray are used for the transformation. Returns ------- transformed_data : xr.DataArray The DataArray with the transformed time coordinates. """ transformed_data = data.copy() dims_to_transform = [ dim for dim in data.dims if isinstance(data[dim].values[0], (datetime.datetime, np.datetime64))] for dim in dims_to_transform: transformed_data[dim] = transformed_data[dim].astype( 'datetime64[ns]') return transformed_data @staticmethod def _create_coord(data, coord, value=None,): """ Create a coordinate within the given DataArray with given value. Parameters ---------- data : xr.DataArray The DataArray is used to create and add the coordinate. coord : str The name of the coordinate. value : obj or None, optional The value of the coordinate. None is used as filling value. Default is None. Returns ------- coordinated_array : xr.DataArray The DataArray with the added coordinate. Raises ------ ValueError A coordinate with the same name already exists within the coordinates dict or the dimension list of the DataArray. """ coordinated_array = data.expand_dims(coord) coordinated_array[coord] = np.array((value,)) return coordinated_array @staticmethod def _rename_coord(data, orig, to): """ Rename a given coordinate to a new name. Parameters ---------- data : xr.DataArray The DataArray is used to rename the coordinate. orig : str The name of the original coordinate. The coordinate needs to be within the DataArray coordinates. to : str The coordinate is renamed to this name. Returns ------- renamed_array : xr.DataArray The DataArray with the renamed coordinate. """ renamed_array = data.rename({orig: to}) return renamed_array @staticmethod def _validtime_to_timedelta(data, validtime='validtime', runtime='runtime'): """ Transform the validtime coordinate from a np.datetime64 coordinate to a np.timedelta coordinate if also the runtime coordinate is a np.datetime64 coordinate. The timedelta is create with :math:`validtime-runtime`. Parameters ---------- data : xr.DataArray The DataArray is used to transform the validtime coordinate. validtime : str, optional Name of the validtime coordinate. Default is validtime. runtime : str, optional Name of the runtime coordinate. Default is runtime. Returns ------- transformed_array : xr.DataArray The DataArray with the transformed validtime coordinate. """ transformed_array = data.copy() runtime_values = data[runtime].values validtime_values = data[validtime].values if np.issubdtype(runtime_values.dtype, np.datetime64) and \ np.issubdtype(validtime_values.dtype, np.datetime64): transformed_array[validtime] = validtime_values - runtime_values return transformed_array
[docs] def merge(self, *items): """ The merge routine could be used to merge this SpatialData instance with other instances. The merge creates a new merge dimension, named after the variable names. The grid of this instance is used as merged grid. Parameters ---------- items : xarray.DataArray The items are merged with this xarray.DataArray instance. The grid dimensions have to be same as the grid. Returns ------- merged_array : xarray.DataArray The DataArray instance with the merged data. """ update_data = [self.check_data_coordinates(, ] update_data += [self.check_data_coordinates(item) for item in items] dataset_data = [ item.to_dataset('variable') if 'variable' in item.coords else item for item in update_data] merged_data = xr.merge(dataset_data).to_array(name='merged_array') merged_data.pp.grid = self.grid return merged_data
[docs] def update(self, *items): """ The update routine could be used to update the DataArray, based on other DataArrays. There are some assumptions done: 1. The used data to update this DataArray instance has the same grid coordinates as this instance. 2. Beginning from the left the given items are used to update the data. Such that intersection problems are resolved in favor of the newest data. Parameters ---------- items : xarray.DataArray The items are merged with this xarray.DataArray instance. The grid dimensions have to be same as the grid. Returns ------- merged_array : xarray.DataArray The DataArray instance with the updated data. """ update_data = [self.check_data_coordinates(, ] update_data += [self.check_data_coordinates(item) for item in items] stack_dims = [dim for dim in if dim not in self.grid.get_coord_names()] logger.debug('Stack dimension names: {0}'.format(stack_dims)) stacked_data = (d.stack(merge=stack_dims) for d in update_data) try: concated_array = xr.concat(stacked_data, dim='merge') except (ValueError, TypeError) as e: raise e.__class__("The concatenation doesn't working, for " 'please see above for the reasons!') resolving_indexes = ~concated_array.indexes['merge'].duplicated( keep='last') logger.debug('Number of resolving indexes: {0:d}/{1:d}'.format( len(resolving_indexes), len(concated_array.indexes['merge']))) resolved_array = concated_array[..., resolving_indexes] unstacked_array = resolved_array.unstack('merge') updated_array = unstacked_array.transpose(* updated_array.pp.grid = self.grid return updated_array
[docs] def merge_analysis_timedelta(self, analysis_axis='runtime', timedelta_axis='validtime'): """ The analysis time axis will be merged with the valid time axis, which should be given as timedelta. The merged time coordinate is called time and will be the first coordinate . Parameters ---------- analysis_axis : str, optional The analysis time axis name. This axis will be used as basis for the valid time. Default is runtime. timedelta_axis : str, optional The time delta axis name. This axis should contain the difference to the analysis time. Returns ------- merged_array : xarray.DataArray The DataArray with the merged analysis and timedelta coordinate. """ stacked_data = time=[analysis_axis, timedelta_axis]) stacked_data.coords['time'] = [ val[0]+val[1] for val in stacked_data.time.values] dims_to_transpose = ['time', ] + list(stacked_data.dims[:-1]) merged_data = stacked_data.transpose(*dims_to_transpose) try: merged_data.pp.grid = self.grid except TypeError: pass return merged_data
[docs] def to_pandas(self, lonlat=None): """ Transform the DataArray to Pandas based on given coordinates. If coordinates are given this method selects the nearest neighbour grid point to this coordinates. The data is flatten to a 2d-DataFrame with the time as row axis. Parameters ---------- lonlat : tuple(float, float) or None The nearest grid point to this coordinates (longitude, latitude) is used to generate the pandas data. If lonlat is None no coordinates will be selected and the data is flatten. If the horizontal grid coordinates are not a single point it is recommended to set lonlat. Returns ------- extracted_data : pandas.Series or pandas.DataFrame The extracted pandas data. The data is based on either a Series (1 Column) or Dataframe (multiple column) depending on the dimensions. """ if isinstance(lonlat, (list, tuple)) and len(lonlat) == 2: extracted_data = self.grid.get_nearest_point(, coord=reversed(lonlat)) dims_wo_grid = [dim for dim in if dim not in self.grid.get_coord_names()] coords = {dim:[dim] for dim in dims_wo_grid} cube = xr.DataArray( extracted_data, coords=coords, dims=dims_wo_grid, ) else: cube = series_data = cube_to_series(cube, ts_ds = TSDataset(None, data_origin=self, lonlat=lonlat) extracted_data = ts_ds.data_merge(series_data, return extracted_data
[docs] def remapnn(self, new_grid): """ Remap the horizontal grid with a nearest neighbour approach to a given new grid. Parameters ---------- new_grid : Child instance of Grid The data is remapped to this grid. Returns ------- remapped_array : xarray.DataArray The xarray.DataArray with the replaced grid. """ remapped_array = self.grid.interpolate(, new_grid, order=0) remapped_array.pp.grid = new_grid return remapped_array
[docs] def remapbil(self, new_grid): """ Remap the horizontal grid with a bilinear approach to a given new grid. Parameters ---------- new_grid : Child instance of Grid The data is remapped to this grid. Returns ------- remapped_array : xarray.DataArray The xarray.DataArray with the replaced grid. """ remapped_array = self.grid.interpolate(, new_grid, order=1) remapped_array.pp.grid = new_grid return remapped_array
[docs] def selpoint(self, lonlat): """ Select a longitude, latitude point within this DataArray. A new lonlat grid with a single point is created. Parameters ---------- lonlat : tuple(float) The longitude and latitude point as degree. The nearest neighbour point to this given coordinate pair is used. Returns ------- sliced_array : xarray.DataArray The sliced data array with the data for the nearest neighbour point to the given coordinates. """ sliced_array = self.grid.get_nearest_point(, reversed(lonlat)) grid_dict = { 'gridtype': 'lonlat', 'xlongname': 'longitude', 'xname': 'lon', 'xunits': 'degrees', 'ylongname': 'latitude', 'yname': 'lat', 'yunits': 'degrees', 'xsize': 1, 'ysize': 1, 'xvals': [lonlat[0], ], 'yvals': [lonlat[1], ] } sliced_grid = GridBuilder(grid_dict).build_grid() grid_coords = sliced_grid.get_coord_names() dim_order = list(sliced_array.dims)+list(grid_coords) sliced_array = sliced_array.expand_dims(grid_coords) sliced_array = sliced_array.transpose(*dim_order) sliced_array = sliced_array.pp.set_grid(sliced_grid) return sliced_array
[docs] def sellonlatbox(self, lonlatbox): """ This DataArray instance is sliced by given lonlatbox. A new grid is created and set based on the sliced coordinates. Parameters ---------- lonlatbox : tuple(float) The longitude and latitude box with four entries as degree. The entries are handled in the following way: (left/west, top/north, right/east, bottom/south) Returns ------- sliced_array : xarray.DataArray The sliced data array with the new grid. Notes ----- For some grids the new grid is based on an UnstructuredGrid, due to technical limitations. """ # TODO: Normalize lon lat values. sliced_array, sliced_grid = self.grid.lonlatbox(, lonlatbox) sliced_array.pp.grid = sliced_grid return sliced_array
[docs] def grid_to_attrs(self): grid_array = grid_attr = {'ppgrid_{0:s}'.format(k): self.grid._grid_dict[k] for k in self.grid._grid_dict} grid_array.attrs.update(grid_attr) return grid_array
[docs] def save(self, save_path): """ Save the DataArray and the grid as attributes together. The grid attributes are used by the load method to recreate the grid, but it is also possible to load the data with the normal xarray load functions. Parameters ---------- save_path : str The path where the netcdf file should be saved. """ try: save_array = self.grid_to_attrs() except TypeError: save_array = save_array.to_netcdf(save_path)
[docs] @staticmethod def load(load_path): """ Load a NetCDF-based previously saved xarray.DataArray instance. If the NetCDF file has grid attributes they will be decoded as new grid. Parameters ---------- load_path : str The path to the saved xarray.DataArray instance. Returns ------- loaded_array : xarray.DataArray The loaded DataArray instance. If a grid could be created it will be set to the DataArray instance. """ loaded_array = xr.open_dataarray(load_path) grid_attrs = [attr for attr in loaded_array.attrs if attr[:7] == 'ppgrid_'] grid_dict = {attr[7:]: loaded_array.attrs[attr] for attr in grid_attrs} try: loaded_grid = GridBuilder(grid_dict).build_grid() loaded_array.pp.grid = loaded_grid for key in grid_attrs: loaded_array.attrs.pop(key, None) except (KeyError, ValueError): pass return loaded_array