Source code for eocrops.utils.data_loader

from eolearn.core import EOPatch, FeatureType, AddFeatureTask

import glob
import numpy as np

import os

from eolearn.geometry import ErosionTask

import eocrops.tasks.curve_fitting
import eocrops.tasks.preprocessing as preprocessing
import warnings
from eocrops.tasks.vegetation_indices import VegetationIndicesS2 as s2_vis

###########################################################################################################


[docs]class EOPatchDataset: def __init__( self, root_dir, features_data, suffix="", resampling=None, range_doy=(1, 365), bands_name="BANDS-S2-L2A", function=np.nanmedian, ): """ root_dir (str) : root path where EOPatch are saved features_data (list of tuples) : features to aggregate in the dataset suffix (str) : suffix of the EOPatch file names to read only a subset of EOPatch (e.g. '_S2'). This is very useful if you have several data sources in the same root_dir. resampling (dict) : resampling period to make EOPatch at the same time scale and timely comparables (e.g. 8-days period) range_doy (tuple) : suset of the time series w.r.t a range of day of the year (e.g. between the 1st day and the 365th day) function (np.functon) : function to aggegate pixels not masked into a single time series """ import tensorflow as tf import tensorflow_datasets as tfds global tf global tfds self.root_dir = root_dir self.features_data = features_data self.suffix = suffix self.mask = (FeatureType.MASK_TIMELESS, "MASK") self.range_doy = range_doy if resampling is None: resampling = dict(start="01-01", end="12-31", day_periods=8) warnings.warn( "You must specify a resampling periods to make your observations comparable. The default is set to " + str(resampling) ) self.resampling = resampling self.function = function self.bands_name = bands_name try: self.AUTOTUNE = tf.data.AUTOTUNE except: self.AUTOTUNE = tf.data.experimental.AUTOTUNE def _instance_tf_ds(self): """ initalize tf.data.Dataset w.r.t the file names in self.root_dir_or_list """ file_pattern = os.path.join(self.root_dir, "*" + self.suffix) files = glob.glob(file_pattern) if len(files) == 0: raise ValueError( "No file in the root directory " + self.root_dir + " ending with " + self.suffix ) files.sort() self.dataset = tf.data.Dataset.from_tensor_slices(files) self.vector_dataset = tf.data.Dataset.from_tensor_slices(files) @staticmethod def _interpolate_feature(eopatch, features, **kwargs): """ Perform gapfilling over a new time window or not. """ kwargs["features"] = features interp = preprocessing.InterpolateFeatures(**kwargs) eopatch = interp.execute(eopatch) return eopatch def _execute_gap_filling( self, eopatch, resampled_range, copy_features, algorithm="linear" ): """ Gap-filling to interpolate missing pixels and/or resample time series Parameters ---------- eopatch (EOPatch) : patch to preprocess resampled_range (tuple) : resampled range (e.g. ("2020-01-01", "2020-12-12")) copy_features (list) : original features to keep in the EOPatch algorithm (str) : type of algorithm to do gap-filling Returns EOPatch ------- """ kwargs = dict( copy_features=copy_features, resampled_range=resampled_range, algorithm=algorithm, ) features = [ (FeatureType.DATA, fname) for ftype, fname, _, _, _ in self.features_data ] return self._interpolate_feature(eopatch=eopatch, features=features, **kwargs) def _prepare_eopatch( self, patch, resampled_range, algorithm="linear", disk_radius=1 ): """ Preprocess EOPatch (making pixels, gap filling and temporal resampling) Parameters ---------- patch (EOPatch) : patch to preprocess resampled_range (tuple) : period to resample the patch algorithm (str) : typle of algorithm for gap filling (linear or cubic) disk_radius (int) : Number of pixels to erode w.r.t field boundaries Returns EOPatch ------- """ if algorithm not in ["linear", "cubic"]: raise ValueError('Algorithm can only be "linear" of "cubic"') polygon_mask = (patch.data_timeless["FIELD_ID"] > 0).astype(np.int32) add_feature = AddFeatureTask(self.mask) add_feature.execute(eopatch=patch, data=polygon_mask.astype(bool)) # patch.add_feature(self.mask[0], self.mask[1], polygon_mask.astype(bool)) erode = ErosionTask(mask_feature=self.mask, disk_radius=disk_radius) erode.execute(patch) add_vis = s2_vis(biophysical=False, feature_name=self.bands_name) add_vis.execute(eopatch=patch) patch = self._execute_gap_filling( eopatch=patch, resampled_range=resampled_range, algorithm=algorithm, copy_features=[self.mask], ) return patch @staticmethod def _retrieve_range_doy( path, meta_file, range_doy, path_column, planting_date_column, harvest_date_column, ): """ Get for each EOPatch the start and end of the season. Parameters ---------- path (str) : path of the EOPatch saved meta_file (pd.DataFrame) : DataFrame with additional information for each EOPatch range_doy (tuple) : approximate range of starting and ending of the season if not planting and harvest dates available path_column (str) : column name from the meta_file with EOPatch paths planting_date_column (str) : column name from the meta_file with EOPatch paths harvest_date_column (str) : column name from the meta_file with EOPatch paths Returns (tuple) start of the season and the end of the season. ------- """ meta_file_subset = meta_file[meta_file[path_column] == path] if meta_file_subset.shape[0] == 0: raise ValueError( "We cannot find a correspond path for the patch in the meta file" ) if planting_date_column is not None: if planting_date_column not in meta_file_subset.columns: raise ValueError( f"The column {planting_date_column} is not in the meta file" ) range_doy[0] = meta_file_subset[planting_date_column].values[0] if np.isnan(range_doy[0]): range_doy[0] = np.nanmedian(meta_file[planting_date_column].values) if harvest_date_column is not None: if harvest_date_column not in meta_file_subset.columns: raise ValueError( f"The column {harvest_date_column} is not in the meta file" ) range_doy[1] = meta_file_subset[harvest_date_column].values[0] if np.isnan(range_doy[1]): range_doy[1] = np.nanmedian(meta_file[harvest_date_column].values) return range_doy def _read_patch( self, path, algorithm="linear", doubly_logistic=False, asym_gaussian=False, return_params=False, meta_file=None, path_column=None, planting_date_column=None, harvest_date_column=None, window_planting=0, window_harvest=0, ): """ Read and preprocess EOPatch given a path. Asymmetric and logistic function can be fitted (https://www.sciencedirect.com/science/article/abs/pii/S0034425712001629) Parameters ---------- path (str) : path where the EOPatch is saved algorithm (str) : type of algorithm to interpolate missing pixels doubly_logistic (bool) : reconstruct time series using doubly logistic fitting asym_gaussian (bool) : reconstruct time series using asymmetric gaussian return_params (bool) : return parameters estimated from doubly or assymetric functions to get growth metrics meta_file (pd.DataFrame) : DataFrame with meta info regarding planting and dates. Useful for thresholding before preprocessing. path_column (str) : column from meta_file which give the EOPatch path for a given observation. planting_date_column (str) : column from meta_file where planting date column (DOY) is referred harvest_date_column (str) : column from meta_file where harvest date column (DOY) is referred window_planting (int) : window before planting date to fit asymmetric ou doubly before N days window_harvest (int) : window after planting date to fit asymmetric ou doubly after N days Returns (np.arrray) : 3D np.array (1, t, d) ~ EOPatch resampled ------- """ def _func(path): path = path.numpy().decode("utf-8") # Load only relevant features ################################################################ # path = os.path.join(root_dir, os.listdir(root_dir)[0]) patch = EOPatch.load(path) if doubly_logistic or asym_gaussian: resampled_range = None else: year = str(patch.timestamp[0].year) start, end = "{0}-{1}".format( year, self.resampling["start"] ), "{0}-{1}".format(year, self.resampling["end"]) resampled_range = (start, end, self.resampling["day_periods"]) patch = self._prepare_eopatch(patch, resampled_range, algorithm) range_doy = self.range_doy if meta_file is not None: range_doy = self._retrieve_range_doy( path, meta_file, list(range_doy), path_column, planting_date_column, harvest_date_column, ) range_doy = ( int(range_doy[0]) - window_planting, int(range_doy[1]) + window_harvest, ) curve_fitting = eocrops.tasks.curve_fitting.DoublyLogistic( range_doy=range_doy ) if asym_gaussian: curve_fitting = eocrops.tasks.curve_fitting.AsymmetricGaussian( range_doy=range_doy ) ################################################################# data = [] for feat_type, feat_name, _, dtype, _ in self.features_data: if doubly_logistic or asym_gaussian: resampling = self.resampling["day_periods"] doy, arr = curve_fitting.execute( eopatch=patch, feature=feat_name, feature_mask=self.mask[-1], resampling=resampling, ) params = curve_fitting.params if return_params: data.append(params) else: data.append(arr) else: arr = curve_fitting.get_time_series_profile( eopatch=patch, feature=feat_name, feature_mask=self.mask[-1], function=self.function, ) data.append(arr) return data ################################################################# out_types = [tf.as_dtype(data[3]) for data in self.features_data] data = tf.py_function(_func, [path], out_types) out_data = {} for f_data, feature in zip(self.features_data, data): feat_type, feat_name, out_name, dtype, _ = f_data feature.set_shape(feature.get_shape()) out_data[out_name] = feature return out_data def _read_vector_data(self, path, column_path, vector_data, features_list): """ Subset the vector file to match each EOPatch Parameters ---------- vector_data (pd.DataFrame) : dataframe with meta info regarding each EOPatch saved features_list (list) : list of features to read column_path (str) : column name with the corresponding path of each EOPatch Returns (np.array) : np.array from the inputs file ------- """ def _func(path): path = path.numpy().decode("utf-8") vector_data_ = vector_data.copy() vector_data_ = vector_data_[vector_data_[column_path] == path] data = [] for feat, dtype_ in features_list: data.append(np.array(vector_data_[feat].astype(dtype_))) return data data = tf.py_function(_func, [path], [feat[1] for feat in features_list]) out_data = {} for fname, feature in zip(features_list, data): feature.set_shape(feature.get_shape()) out_data[fname] = feature return out_data @staticmethod def _format_feature(out_feature): out_df = [ np.concatenate( [ np.expand_dims(value, axis=1) if len(value.shape) == 1 else value for key, value in dicto.items() ], axis=-1, ) for dicto in out_feature ] out_df = [np.expand_dims(k, axis=0) for k in out_df] return np.concatenate(out_df, axis=0)
[docs] def get_eopatch_tfds( self, algorithm="linear", doubly_logistic=False, asym_gaussian=False, return_params=False, meta_file=None, path_column=None, planting_date_column=None, harvest_date_column=None, window_planting=0, window_harvest=0, ): """ Parameters ---------- algorithm (str) : type of algorithm to interpolate missing pixels doubly_logistic (bool) : reconstruct time series using doubly logistic fitting asym_gaussian (bool) : reconstruct time series using asymmetric gaussian return_params (bool) : return parameters estimated from doubly or assymetric functions to get growth metrics meta_file (pd.DataFrame) : DataFrame with meta info regarding planting and dates. Useful for thresholding before preprocessing. path_column (str) : column from meta_file which give the EOPatch path for a given observation. planting_date_column (str) : column from meta_file where planting date column (DOY) is referred harvest_date_column (str) : column from meta_file where harvest date column (DOY) is referred window_planting (int) : window before planting date to fit asymmetric ou doubly before N days window_harvest (int) : window after planting date to fit asymmetric ou doubly after N days Returns Returns (np.arrray) : 3D np.array (N, t, d) ~ EOPatch aggregated and resampled into the same np.array ------- """ self._instance_tf_ds() ds_numpy = self.dataset.map( lambda x: self._read_patch( path=x, algorithm=algorithm, doubly_logistic=doubly_logistic, asym_gaussian=asym_gaussian, return_params=return_params, meta_file=meta_file, path_column=path_column, planting_date_column=planting_date_column, harvest_date_column=harvest_date_column, window_planting=window_planting, window_harvest=window_harvest, ), num_parallel_calls=self.AUTOTUNE, ) out_feature = list(ds_numpy) return self._format_feature(out_feature)
[docs] def get_vector_tfds(self, vector_data, features_list, column_path): """ Read the vector file into a numpy array. Each observation will match the eopatch_tfds and will be converted into a np.array Parameters ---------- vector_data (pd.DataFrame) : dataframe with meta info regarding each EOPatch saved features_list (list) : list of features to read column_path (str) : column name with the corresponding path of each EOPatch Returns (np.array) : np.array from the inputs file ------- """ self._instance_tf_ds() out_labels = list( self.vector_dataset.map( lambda path: self._read_vector_data( path, column_path, vector_data, features_list ), num_parallel_calls=self.AUTOTUNE, ) ) npy_labels = self._format_feature(out_labels) npy_labels = npy_labels.reshape(npy_labels.shape[0], npy_labels.shape[-1]) return npy_labels