Source code for eocrops.tasks.preprocessing

import copy
import sentinelhub

import numpy as np
import pandas as pd

from eolearn.geometry import VectorToRasterTask
from eolearn.core import FeatureType, EOTask


from eolearn.features.interpolation import (
    LinearInterpolationTask,
    CubicInterpolationTask,
)

import eocrops.utils.base_functions as utils
from eolearn.geometry.morphology import ErosionTask

from eolearn.core import RemoveFeatureTask
from eolearn.core import FeatureType, EOTask


class PolygonMask(EOTask):
    """
    EOTask that rasterizes the polygons of a GeoDataFrame into two timeless features:

    - data_timeless feature 'FIELD_ID' (0 nodata; 1,...,N for each row of the
      geodataframe ~ object IDs)
    - mask_timeless feature 'MASK' (False for pixels outside the polygon(s),
      True otherwise)

    Parameters
    ----------
    geodataframe : GeoDataFrame
        Input vector data, each row represents a polygon (e.g. fields).

    Returns
    -------
    EOPatch
    """

    def __init__(self, geodataframe):
        self.geodataframe = geodataframe

    def execute(self, eopatch):
        """Burn the polygons into ``eopatch`` and return the updated EOPatch.

        Note that the stored geodataframe is replaced by the output of
        ``utils.check_crs`` and receives the helper columns 'FIELD_ID',
        'MASK' and 'polygon_bool'.
        """
        # CRS check (and, per the helper's contract, conversion to UTM)
        gdf = utils.check_crs(self.geodataframe)
        self.geodataframe = gdf

        # One integer identifier per polygon, starting at 1
        n_polygons = gdf.shape[0]
        gdf["FIELD_ID"] = list(range(1, n_polygons + 1))

        if n_polygons > 1:
            # Several polygons: the 'MASK' column holds the envelope of all of them
            xmin, ymin, xmax, ymax = gdf.geometry.total_bounds
            envelope = sentinelhub.BBox(
                bbox=[(xmin, ymin), (xmax, ymax)], crs=gdf.crs
            )
            gdf["MASK"] = envelope.geometry
        else:
            gdf["MASK"] = gdf["geometry"]

        gdf["polygon_bool"] = True

        # (output feature, column providing the burnt values); order matters:
        # FIELD_ID is rasterized first, MASK second.
        targets = (
            ((FeatureType.DATA_TIMELESS, "FIELD_ID"), "FIELD_ID"),
            ((FeatureType.MASK_TIMELESS, "MASK"), "polygon_bool"),
        )
        for output_feature, column in targets:
            rasterize = VectorToRasterTask(
                gdf,
                output_feature,
                values_column=column,
                raster_shape=(FeatureType.MASK, "IS_DATA"),
                raster_dtype=np.uint16,
            )
            eopatch = rasterize.execute(eopatch)

        # The mask is rasterized as uint16; expose it as a boolean array
        raster_mask = eopatch.mask_timeless["MASK"]
        eopatch.mask_timeless["MASK"] = raster_mask.astype(bool)

        return eopatch
class MaskPixels(EOTask):
    """EOTask that sets to NaN every pixel lying outside a timeless mask."""

    def __init__(self, features, fname="MASK"):
        """
        Parameters
        ----------
        features (list): names of the features stored in data and/or data_timeless
        fname (str): name of the mask_timeless feature used as mask
        """
        self.features = features
        self.fname = fname

    @staticmethod
    def _filter_array(patch, ftype, fname, mask):
        """Replace the values of ``patch[ftype][fname]`` outside ``mask`` by NaN.

        Parameters
        ----------
        patch : EOPatch holding the feature (modified in place and returned)
        ftype (str): feature type key, e.g. "data" or "data_timeless"
        fname (str): feature name
        mask (np.ndarray): array broadcastable to the feature shape; zero
            (False) marks the pixels to discard, any non-zero value is kept

        Returns
        -------
        EOPatch whose feature is a float32 array with NaN outside the mask
        """
        values = np.asarray(patch[ftype][fname], dtype=np.float32)
        inside = np.asarray(mask).astype(bool)
        # np.where broadcasts the mask over the time and band axes, so no
        # explicit tiling is needed; it always returns a new float32 array.
        patch[ftype][fname] = np.where(inside, values, np.float32(np.nan))
        return patch

    def execute(self, patch, erosion=0):
        """Mask the configured features of ``patch``.

        Parameters
        ----------
        patch : EOPatch containing the features and the mask
        erosion (int): disk radius used to erode the mask before applying it;
            0 (default) applies the mask as stored

        Returns
        -------
        EOPatch with the masked features (the stored mask itself is not eroded)
        """
        if erosion:
            # Erode on a deep copy so the mask stored in `patch` stays intact.
            erode = ErosionTask(
                mask_feature=(FeatureType.MASK_TIMELESS, self.fname),
                disk_radius=erosion,
            )
            eroded_patch = erode.execute(copy.deepcopy(patch))
            crop_mask = eroded_patch["mask_timeless"][self.fname]
        else:
            crop_mask = patch["mask_timeless"][self.fname]

        # Normalise to (height, width, 1): this shape broadcasts against both
        # (time, height, width, bands) and (height, width, bands) features.
        crop_mask = np.asarray(crop_mask)
        crop_mask = crop_mask.reshape(crop_mask.shape[0], crop_mask.shape[1], 1)

        # Filter the pixels of each feature
        for name in self.features:
            ftype = "data" if name in patch.data else "data_timeless"
            patch = self._filter_array(patch, ftype, name, crop_mask)

        return patch
class InterpolateFeatures(EOTask):
    """EOTask that gap-fills (and optionally resamples) temporal data features.

    Parameters
    ----------
    resampled_range : passed as ``resample_range`` to the interpolation task
    features (list): (feature type, feature name) tuples to interpolate;
        None (default) selects every DATA feature of the EOPatch being processed
    algorithm (str): "linear" (default) or "cubic"
    copy_features : forwarded to the interpolation task as ``copy_features``
        when ``resampled_range`` is not None
    """

    def __init__(
        self, resampled_range, features=None, algorithm="linear", copy_features=None
    ):
        self.resampled_range = resampled_range
        self.features = features
        self.algorithm = algorithm
        self.copy_features = copy_features

    def _interpolate_feature(self, eopatch, features, mask_feature):
        """Run the selected interpolation task for one feature.

        Raises
        ------
        ValueError
            If ``self.algorithm`` is neither "linear" nor "cubic".
        """
        kwargs = dict(
            mask_feature=mask_feature,
            resample_range=self.resampled_range,
            feature=features,
            bounds_error=False,
        )
        if self.resampled_range is not None:
            kwargs["copy_features"] = self.copy_features

        if self.algorithm == "linear":
            interp = LinearInterpolationTask(parallel=True, **kwargs)
        elif self.algorithm == "cubic":
            interp = CubicInterpolationTask(**kwargs)
        else:
            # Fail with an explicit message instead of an unbound local below.
            raise ValueError(
                f"Unknown interpolation algorithm {self.algorithm!r}; "
                "expected 'linear' or 'cubic'"
            )

        return interp.execute(eopatch)

    def execute(self, eopatch):
        """Gap filling after data extraction, very useful if it was not
        included in the data extraction workflow.

        The interpolated arrays replace the whole ``data`` feature type, the
        timestamps are taken from the interpolated patch, 'IS_DATA' and
        'VALID_DATA' are reset to all-valid masks and the 'CLM' mask, if any,
        is removed. When there is no feature to interpolate the EOPatch is
        returned unchanged.
        """
        mask_feature = None
        if "VALID_DATA" in eopatch.mask:
            mask_feature = (FeatureType.MASK, "VALID_DATA")

        # Resolve the feature list per call: storing it on `self` would make
        # later patches reuse the features discovered on the first one.
        features = self.features
        if features is None:
            features = [
                (ftype, fname)
                for (ftype, fname) in eopatch.get_features()
                if ftype == FeatureType.DATA
            ]
        if not features:
            return eopatch

        interpolated = {}
        resampled_patch = None
        for ftype, fname in features:
            # Each feature is interpolated on its own deep copy of the input.
            resampled_patch = self._interpolate_feature(
                copy.deepcopy(eopatch), (ftype, fname), mask_feature
            )
            interpolated[fname] = resampled_patch[ftype][fname]

        # All features share the resampled time axis; use the last one for shape.
        t, h, w, _ = interpolated[fname].shape

        eopatch["data"] = interpolated
        eopatch.timestamp = resampled_patch.timestamp
        eopatch["mask"]["IS_DATA"] = np.ones((t, h, w, 1), dtype=int)
        eopatch["mask"]["VALID_DATA"] = np.ones((t, h, w, 1), dtype=bool)

        if "CLM" in eopatch.mask:
            # Keep the task's return value: the removal is lost otherwise if
            # the task operates on a copy of the patch.
            remove_feature = RemoveFeatureTask([(FeatureType.MASK, "CLM")])
            eopatch = remove_feature.execute(eopatch)

        return eopatch