import warnings
import numpy as np
from eolearn.core import FeatureType, EOTask
from scipy.optimize import curve_fit
from scipy.interpolate import *
[docs]class CurveFitting:
def __init__(self, range_doy=(1, 365), q=0.5):
self.range_doy = range_doy
self.params = None
self.q = q
@staticmethod
def _reformat_arrays(x, y):
x = np.array(x)
if len(x.shape) < 2:
x = x.reshape(x.shape[0], 1)
y = np.array(y)
if len(y.shape) < 2:
y = y.reshape(y.shape[0], 1)
return x, y
def _apply_quantile(self, x, axis):
if self.q != 0.5:
return np.nanquantile(np.copy(x), q=self.q, axis=axis)
else:
return np.nanmedian(x, axis=axis)
[docs] def get_time_series_profile(
self,
eopatch,
feature,
feature_mask="MASK",
threshold=None,
resampling=None,
):
"""
Get aggregated time series at the object level, i.e. aggregate all pixels not masked
Parameters
----------
eopatch : EOPatch
EOPatch saved
feature : str
Name of the feature
feature_mask : str
Name of the boolean mask for pixels outside field boundaries
Returns
-------
Aggregated 1-d time series (np.array)
"""
if feature not in eopatch[FeatureType.DATA].keys():
raise ValueError("The feature name " + feature + " is not in the EOPatch")
feature_array = eopatch[FeatureType.DATA][feature]
if feature_mask not in eopatch[FeatureType.MASK_TIMELESS].keys():
raise ValueError(
"The feature {0} is missing in MASK_TIMELESS".format(feature_mask)
)
crop_mask = eopatch[FeatureType.MASK_TIMELESS][feature_mask]
# Transform mask from 3D to 4D
times, h, w, shape = feature_array.shape
mask = crop_mask.reshape(1, h, w, 1)
mask = [mask for _ in range(times)]
mask = np.concatenate(mask, axis=0)
#######################
mask = [mask for _ in range(shape)]
mask = np.concatenate(mask, axis=-1)
########################
a = np.ma.array(feature_array, mask=np.invert(mask))
ts_mean = np.ma.apply_over_axes(self._apply_quantile, a, [1, 2])
ts_mean = ts_mean.reshape(ts_mean.shape[0], ts_mean.shape[-1])
# Thresholding time series values before and after the season
doy, ids_filter = self.get_doy_period(eopatch)
if self.range_doy is not None and threshold:
ts_mean[: ids_filter[0]] = threshold
ts_mean[ids_filter[-1] :] = threshold
# Replace nas values with nearest valid value
nans, x = self._nan_helper(ts_mean)
ts_mean[nans] = np.interp(x(nans), x(~nans), ts_mean[~nans])
if resampling:
cs = Akima1DInterpolator(doy, ts_mean)
doy = np.arange(1, 365, resampling)
ts_mean = cs(doy)
return doy, ts_mean
[docs] def get_doy_period(self, eopatch):
"""
Get day of the year acquisition dates from eopatch.timestamp
Parameters
----------
eopatch : EOPatch
eopatch to read
Returns
-------
np.array : day of the years, np.array : index of image within the season (range_doy)
"""
first_of_year = eopatch.timestamp[0].timetuple().tm_yday
last_of_year = eopatch.timestamp[-1].timetuple().tm_yday
times = np.asarray([time.toordinal() for time in eopatch.timestamp])
times_ = (times - times[0]) / (times[-1] - times[0])
times_doy = times_ * (last_of_year - first_of_year) + first_of_year
if self.range_doy is None:
return times_doy
ids_filter = np.where(
(times_doy > int(self.range_doy[0])) & (times_doy < int(self.range_doy[1]))
)[0]
return times_doy, ids_filter
def _instance_inputs(
self,
eopatch,
feature,
feature_mask="MASK",
threshold=0.2,
):
"""
Initialize inputs to perform curve fitting
Parameters
----------
eopatch : EOPatch
EOPatch which will be processed
feature : str
name of the feature to process
feature_mask : str
mask name in EOPatch to subset field pixels
function : np.function)
function to aggregate pixels at object level
Returns
-------
np.array : day of the year within growing season , np.array : day of the year min/max w.r.t growing season, np.array : aggregated time series witihin growing season)
"""
times_doy, y = self.get_time_series_profile(eopatch, feature, feature_mask)
x = (times_doy - self.range_doy[0]) / (self.range_doy[-1] - self.range_doy[0])
ids = np.where(((x > 0) & (x < 1)))[0]
times_doy_idx = list(times_doy[ids])
x_idx = list(x[ids])
y_idx = list(y.flatten()[ids])
if times_doy_idx[0] > self.range_doy[0]:
times_doy_idx.insert(0, self.range_doy[0])
x_idx.insert(0, 0.0)
y_idx.insert(0, threshold)
if times_doy_idx[-1] < self.range_doy[-1]:
times_doy_idx.append(self.range_doy[-1])
x_idx.append(1.0)
y_idx.append(threshold)
x_idx = np.array(x_idx)
times_doy_idx, y_idx = self._reformat_arrays(times_doy_idx, y_idx)
return times_doy_idx, x_idx, y_idx
def _init_execute(self, eopatch, feature, feature_mask, resampling, threshold=0):
"""
Initialize the inputs prior applying any method of curve_fitting.
It returns the day of th year from acquisition dates, the time series profile and the resampled day of the year given a resampling period
Parameters
----------
eopatch : EOPatch
patch from a given fied
feature : str
name of the feature in FeatureType.DATA
feature_mask : str
name of the mask in FeatureType.MASK_TIMELESS
function : np.function
Function to aggregate pixels at the field level
resampling : int
length of the period for resampling (e.g. 8 day period)
threshold : float
threshold for values outside the season (below or above range_doy) which can affect the interpolation)
Returns
-------
np.array : day of the year within growing season, np.array : aggregated time series np.array : resampled day of the year, np.array : resampled time series
"""
# Get time series profile
times_doy, x, y = self._instance_inputs(
eopatch, feature, feature_mask, threshold
)
y[times_doy < self.range_doy[0]] = threshold
y[times_doy > self.range_doy[1]] = threshold
y[y < threshold] = threshold
if resampling > 0:
times_doy = np.array(range(1, 365, resampling))
new_x = (times_doy - self.range_doy[0]) / (
self.range_doy[-1] - self.range_doy[0]
)
new_x[new_x < 0] = 0
new_x[new_x > 1] = 1
else:
new_x = x
return x, y.flatten(), times_doy, new_x
[docs] def replace_values(
self, eopatch, ts_mean, doy_o, ts_fitted, doy_r, resampling, cloud_coverage=0.05
):
"""
Given an interpolated time series using a curve fitting method,
replace interpolated values with the nearest observed one iif the image had less than a given cloud coverage
Parameters
----------
eopatch : EOPatch
ts_mean :np.array) : the original observed time series at the field-level
doy_o : np.array)
days of the year from the observed scene
ts_fitted : np.array
fitted time series using a cure fitting method
doy_r : np.array
resampled day of the year
resampling : int
period length to resample time series (e.g. 8 for 8-day period starting on the 1st January)
cloud_coverage : float
maximum percentage of cloud given an image (e.g. we keep only values if <5% cloudy)
Returns
-------
Fitted time series mixing interpolated and observed values
"""
# Replace interpolated values by original ones cloud free
corrected_doy_d = doy_r.copy().flatten()
corrected_doubly_res = ts_fitted.copy().flatten()
for idx, i in enumerate(doy_o):
if (
self.range_doy[0] < i < self.range_doy[-1]
and eopatch.scalar["COVERAGE"][idx][0] < cloud_coverage
):
previous_idx = np.where(doy_r <= i)[0][-1]
next_idx = (
np.where(doy_r > i)[0][0] if i < np.max(doy_r) else previous_idx
)
argmin_ = np.argmin(
np.array(
[
i - corrected_doy_d[previous_idx],
corrected_doy_d[next_idx] - i,
]
)
)
last_idx = previous_idx if argmin_ == 0 else next_idx
corrected_doy_d[last_idx] = i
corrected_doubly_res[last_idx] = ts_mean[idx]
if resampling:
cs = Akima1DInterpolator(corrected_doy_d, corrected_doubly_res)
corrected_doy_d = np.arange(1, 365, resampling)
corrected_doubly_res = cs(corrected_doy_d)
nans, x = self._nan_helper(corrected_doubly_res)
corrected_doubly_res[nans] = np.interp(
x(nans), x(~nans), corrected_doubly_res[~nans]
)
return corrected_doy_d, corrected_doubly_res
def _replace_values_iterate(self, patch, y_array, dates, new_y, new_doy):
if len(y_array.shape) == 1:
y_array = y_array.reshape(y_array.shape[0], 1)
replace_y = np.empty((len(new_y), y_array.shape[1]))
for i in range(y_array.shape[1]):
_, replace_y[:, i] = self.replace_values(
patch,
y_array[:, i].flatten(),
dates,
new_y[:, i].flatten(),
new_doy,
resampling=8,
cloud_coverage=0.05,
)
return replace_y
def _crop_values(self, ts_mean, min_threshold, max_threshold):
if max_threshold:
ts_mean[ts_mean > float(max_threshold)] = max_threshold
if min_threshold:
ts_mean[ts_mean < float(min_threshold)] = min_threshold
return ts_mean
@staticmethod
def _nan_helper(y):
return np.isnan(y), lambda z: z.nonzero()[0]
[docs] def resample_ts(
self, doy, ts_mean_, resampling=8, min_threshold=None, max_threshold=None
):
"""
Apply resampling over fixed period before applying whitakker smoothing
"""
nans, x = self._nan_helper(ts_mean_)
ts_mean_[nans] = np.interp(x(nans), x(~nans), ts_mean_[~nans])
ts_mean = self._crop_values(ts_mean_, min_threshold, max_threshold)
# Interpolate the data to a regular time grid
if resampling:
try:
new_doy = np.arange(
self.range_doy[0], self.range_doy[-1] + 1, resampling
)
ts_mean_resampled = np.empty(
(ts_mean.shape[0], len(new_doy), ts_mean.shape[2])
)
for i in range(ts_mean.shape[0]):
for j in range(ts_mean.shape[2]):
cs = Akima1DInterpolator(doy, ts_mean[i, :, j])
ts_mean_resampled[i, :, j] = cs(new_doy)
ts_mean = ts_mean_resampled
except Exception as e:
warnings.WarningMessage(
f"Cannot interpolate over new periods due to : {e}"
)
else:
new_doy = doy
return new_doy, ts_mean
[docs] def fit_whitakker(self, ts_mean, degree_smoothing, weighted=False, min_threshold=0):
"""
Fit whitakker smoothing given a ndarray (1,T,D)
"""
import modape
from modape.whittaker import ws2d, ws2doptv, ws2doptvp
# Apply Whittaker smoothing
w = np.array((ts_mean > min_threshold) * 1, dtype="float")
if weighted:
w = w * ts_mean / np.max(ts_mean)
if len(ts_mean.shape) > 1:
if ts_mean.shape[1] > 1:
ts_mean = np.array(
[
ws2d(
ts_mean[..., i].flatten().astype("float"),
degree_smoothing,
w[..., i].flatten(),
)
for i in range(ts_mean.shape[1])
]
)
ts_mean = np.swapaxes(ts_mean, 0, 1)
else:
ts_mean = np.array(
ws2d(
ts_mean.flatten().astype("float"), degree_smoothing, w.flatten()
)
)
return ts_mean
[docs] def whittaker_zonal(
self,
eopatch,
feature,
feature_mask="MASK",
degree_smoothing=1,
min_threshold=0,
max_threshold=None,
weighted=False,
resampling=8,
):
"""
Apply whitakker smoothing over time series
Parameters
----------
eopatch : EOPatch
eopatch to apply the smoothing method
feature : str
Feature to apply the smoothing (e.g. NDVI)
feature_mask : str
Feature that refers to the pixel masking outside the field polygon
degree_smoothing : float
Degree of smoothing (0.5 works great)
min_threshold : float
Minimum threshold of the time series. If it is below, we mask as np.nan to avoid outlier values
max_threshold : float
Minimum threshold of the time sereis. If it is above, we mask as np.nan to avoid outlier values
weighted : bool
Weight the smoother w.r.t. the maximimum value to better fit the enveloppe over high value data
resampling : int
Resample over fixed periods from the 1st of January
Returns
-------
np.array : day of the year, np.array : aggregated time series
"""
# Check if we do not have duplicates
doy, ts_data = self.get_time_series_profile(
eopatch,
feature,
feature_mask,
)
indices = np.setdiff1d(
np.arange(len(doy)), np.unique(doy, return_index=True)[1]
)
doy = np.array([int(k) for i, k in enumerate(doy) if i not in indices])
ts_data = np.array([k for i, k in enumerate(ts_data) if i not in indices])
ts_data = self._crop_values(ts_data, min_threshold, max_threshold)
# Resampling
doy_resampled, ts_data_resampled = self.resample_ts(
doy=doy,
ts_mean_=ts_data,
resampling=resampling,
min_threshold=min_threshold,
max_threshold=max_threshold,
)
ts_data_resampled[np.where(np.isnan(ts_data_resampled))[0]] = min_threshold
# Replace interpolated values by original ones
doy_resampled, ts_data_resampled = self.replace_values(
eopatch, ts_data, doy, ts_data_resampled, doy_resampled, resampling
)
ts_data_resampled = ts_data_resampled.reshape(ts_data_resampled.shape[0], 1)
ts_data_resampled = self._fit_whittaker(
ts_data_resampled, degree_smoothing, weighted
)
return self._reformat_arrays(doy_resampled, ts_data_resampled)
[docs]class AsymmetricGaussian(CurveFitting):
def __init__(self, **kwargs):
super().__init__(**kwargs)
@staticmethod
def _asym_gaussian(x, initial_value, scale, a1, a2, a3, a4, a5):
return initial_value + scale * np.piecewise(
x,
[x < a1, x >= a1],
[
lambda y: np.exp(-(((a1 - y) / a4) ** a5)),
lambda y: np.exp(-(((y - a1) / a2) ** a3)),
],
)
def _fit_optimize_asym(self, x_axis, y_axis, initial_parameters=None):
bounds_lower = [
np.quantile(y_axis, 0.1),
-np.inf,
x_axis[0],
-np.inf,
-np.inf,
-np.inf,
-np.inf,
]
bounds_upper = [
np.max(y_axis),
np.inf,
x_axis[-1],
np.inf,
np.inf,
np.inf,
np.inf,
]
if initial_parameters is None:
initial_parameters = [
np.mean(y_axis),
0.2,
x_axis[np.argmax(y_axis)],
0.15,
10,
0.2,
5,
]
popt, pcov = curve_fit(
self._asym_gaussian,
x_axis,
y_axis,
initial_parameters,
bounds=(bounds_lower, bounds_upper),
maxfev=10e10,
absolute_sigma=True,
# method = 'lm'
)
self.params = popt
return popt
def _fit_asym_gaussian(self, x, y):
a = self._fit_optimize_asym(x, y)
return self._asym_gaussian(x, *a)
[docs] def get_fitted_values(
self,
eopatch,
feature,
feature_mask="MASK",
resampling=0,
threshold=0,
):
"""
Apply Asymmetric function to do curve fitting from aggregated time series.
It aims to reconstruct, smooth and extract phenological parameters from time series
Parameters
----------
eopatch : EOPatch
feature : str
name of the feature to process
feature_mask : str
name of the polygon mask
function : np.function
function to aggregate pixels at object level
resampling : np.array
dates from reconstructed time series, np.array : aggregated time series)
Returns
-------
np.array : doy, np.array : fitted values
"""
x, y, times_doy, new_x = self._init_execute(
eopatch, feature, feature_mask, resampling, threshold
)
initial_value, scale, a1, a2, a3, a4, a5 = self._fit_optimize_asym(x, y)
fitted = self._asym_gaussian(new_x, initial_value, scale, a1, a2, a3, a4, a5)
fitted[fitted < threshold] = threshold
return times_doy, fitted
[docs] def execute(
self,
eopatch,
feature,
feature_mask="MASK",
resampling=0,
):
"""
Execute A-G curve fitting function
Parameters
----------
eopatch : EOPatch
feature : str
feature_mask : str
resampling : int
Returns
-------
np.array : doy, np.array : fitted values
"""
# Get time series profile
doy_o_s2, ts_mean_s2 = self.get_time_series_profile(
eopatch, feature, feature_mask=feature_mask
)
# Apply curve fitting
doy_d, doubly_res = self.get_fitted_values(
eopatch,
feature=feature,
resampling=resampling,
)
return doy_d, doubly_res
[docs]class DoublyLogistic(CurveFitting):
def __init__(self, **kwargs):
super().__init__(**kwargs)
@staticmethod
def _doubly_logistic(x, vb, ve, k, c, p, d, q):
return (
vb
+ (k / (1 + np.exp(-c * (x - p))))
- ((k + vb + ve) / (1 + np.exp(d * (x - q))))
)
def _fit_optimize_doubly(self, x_axis, y_axis, initial_parameters=None):
popt, _ = curve_fit(
self._doubly_logistic,
x_axis,
y_axis,
# method="lm",
p0=initial_parameters, # np.array([0.5, 6, 0.2, 150, 0.23, 240]),
maxfev=int(10e6),
bounds=[-np.Inf, np.Inf],
)
self.params = popt
return popt
def _fit_logistic(self, x, y):
a = self._fit_optimize_doubly(x, y)
return self._doubly_logistic(x, *a)
[docs] def get_fitted_values(
self,
eopatch,
feature,
feature_mask="MASK",
resampling=0,
threshold=0.2,
):
"""
Apply Doubly Logistic function to do curve fitting from aggregated time series.
It aims to reconstruct, smooth and extract phenological parameters from time series
Parameters
----------
eopatch : EOPatch
feature : str
name of the feature to process
feature_mask : str
name of the polygon mask
function : np.function
function to aggregate pixels at object level
resampling : np.array
dates from reconstructed time series, np.array : aggregated time series)
Returns
-------
np.array : doy, np.array : fitted values
"""
x, y, times_doy, new_x = self._init_execute(
eopatch, feature, feature_mask, resampling, threshold
)
vb, ve, k, c, p, d, q = self._fit_optimize_doubly(x, y)
fitted = self._doubly_logistic(new_x, vb, ve, k, c, p, d, q)
fitted[fitted < threshold] = threshold
return times_doy, fitted
[docs] def execute(
self,
eopatch,
feature,
feature_mask="MASK",
resampling=0,
threshold=0.2,
):
"""
Execute the smoothing at the field level
Parameters
----------
eopatch : EOPatch
feature : str
feature_mask : str
resampling : int
threshold : float
Returns
-------
np.array : doy, np.array : fitted values
"""
# Get time series profile
doy_o_s2, ts_mean_s2 = self.get_time_series_profile(
eopatch,
feature,
feature_mask,
)
doy_d, doubly_res = self.get_fitted_values(
eopatch,
feature=feature,
resampling=resampling,
threshold=threshold,
)
return doy_d, doubly_res
[docs]class FourierDiscrete(CurveFitting):
def __init__(self, omega=1.5, **kwargs):
super().__init__(**kwargs)
self.omega = omega
@staticmethod
def _clean_data(x_axis, y_axis):
good_vals = np.argwhere(~np.isnan(y_axis.flatten()))
y_axis = np.take(y_axis, good_vals, 0)
x_axis = np.take(x_axis, good_vals, 0)
if x_axis[-1] > 1:
x_axis = (x_axis - x_axis[0]) / (x_axis[-1] - x_axis[0])
return x_axis, y_axis
def _hants(self, x, c, omega, a1, a2, b1, b2):
"""
Formula for a 2nd order Fourier series
Parameters
----------
x : Timepoint t on the x-axis
c : Intercept coefficient
omega : Omega coefficient (Wavelength)
a1 : 1st order cosine coefficient
a2 : 2nd order cosine coefficient
b1 : 1st oder sine coefficient
b2 : 2nd order sine coefficient
Returns
-------
Parameters of the 2nd order Fourier series formula to be used in the "fouriersmoother" function
"""
return (
c
+ (a1 * np.cos(2 * np.pi * omega * 1 * x))
+ (b1 * np.sin(2 * np.pi * omega * 1 * x))
+ (a2 * np.cos(2 * np.pi * omega * 2 * x))
+ (b2 * np.sin(2 * np.pi * omega * 2 * x))
)
def _fourier(self, x, *a):
ret = a[0] * np.cos(np.pi / self.omega * x)
for deg in range(1, len(a)):
ret += a[deg] * np.cos((deg + 1) * np.pi / self.omega * x)
return ret
def _fit_fourier(self, x_axis, y_axis, new_x):
x_axis, y_axis = self._clean_data(x_axis, y_axis)
popt, _ = curve_fit(self._fourier, x_axis[:, 0], y_axis[:, 0], [1.0] * 5)
self.params = popt
return self._fourier(new_x, *popt)
def _fit_hants(self, x_axis, y_axis, new_x):
x_axis, y_axis = self._clean_data(x_axis, y_axis)
popt, _ = curve_fit(
self._hants,
x_axis[:, 0],
y_axis[:, 0],
maxfev=1e5,
method="trf",
p0=[np.mean(y_axis), 1, 1, 1, 1, 1],
)
self.params = popt
return self._hants(new_x, *popt)
[docs] def get_fitted_values(
self,
eopatch,
feature,
feature_mask="MASK",
resampling=0,
fft=True,
threshold=0.2,
):
"""
Apply Fourier function to do curve fitting from aggregated time series.
It aims to reconstruct, smooth and extract phenological parameters from time series
Parameters
----------
eopatch : EOPatch
feature : str
name of the feature to process
feature_mask : str
name of the polygon mask
function : np.function
function to aggregate pixels at object level
resampling : np.array
dates from reconstructed time series, np.array : aggregated time series)
Returns
-------
np.array : doy, np.array : fitted values
"""
x, y, times_doy, new_x = self._init_execute(
eopatch, feature, feature_mask, resampling, threshold
)
fitted = self._fit_hants(x, y, new_x) if fft else self._fit_fourier(x, y, new_x)
fitted[fitted < threshold] = threshold
return times_doy, fitted
[docs] def execute(
self,
eopatch,
feature,
feature_mask="MASK",
resampling=0,
fft=True,
threshold=0,
):
"""
Execute FFT
Parameters
----------
eopatch : EOPatch
feature : str
feature_mask : str
resampling : int
fft : bool
threshold : float
Returns
-------
np.array : doy, np.array : fitted values
"""
# Get time series profile
doy_o_s2, ts_mean_s2 = self.get_time_series_profile(
eopatch, feature, feature_mask=feature_mask
)
# Apply curve fitting
doy_d, doubly_res = self.get_fitted_values(
eopatch,
feature=feature,
resampling=resampling,
fft=fft,
threshold=threshold,
)
return doy_d, doubly_res