Source code for eocrops.climatools.format_data

import dateutil
import datetime as datetime

import numpy as np
import pandas as pd


class WeatherPostprocess:
    """
    Aggregate daily Meteoblue weather extractions into per-location periods.

    Each location of the input file carries its own extraction range. Weather
    observations are matched to the day index ("period") inside that range and
    can then be summarised per period (mean / max / min / sum / cumulated sum)
    or averaged over the whole extraction (static variables).
    """

    def __init__(
        self,
        shapefile,
        id_column,
        timestamp_column,
        start_season=None,
    ):
        """
        Store the input file and, optionally, prepare the start-of-season dates.

        Parameters
        ----------
        shapefile : gpd.GeoDataFrame
            Table with one row per observation/location. A copy is stored, so
            the caller's object is not modified.
        id_column : str
            Column of ``shapefile`` which refers to the identifier of the
            observation. It is matched against the ``location`` column of the
            weather data.
        timestamp_column : str
            Column of ``shapefile`` holding, for each row, a ``(start, end)``
            pair of date strings delimiting the extraction. When
            ``start_season`` is given, the start string must begin with the
            year followed by ``-`` (e.g. ``"2020-01-01"``).
        start_season : str, optional
            Name of the column with the day of the year before which data are
            discarded. It is mostly useful for fields in which we have an idea
            of the beginning of the season.

        Raises
        ------
        ValueError
            If ``start_season`` is given but is not a column of ``shapefile``.
        """
        self.id_column = id_column
        self.start_season_column = start_season
        self.input_file = shapefile.copy()
        self.timestamp_column = timestamp_column

        if self.start_season_column is not None:
            if self.start_season_column not in list(shapefile.columns):
                raise ValueError(
                    "The column "
                    + self.start_season_column
                    + " is not in the input file"
                )
            # Internally the column is always called "start_season".
            self.input_file = self.input_file.rename(
                columns={self.start_season_column: "start_season"}
            )
            self._apply_convert_doy("start_season")

    def _get_descriptive_period(self, df, stat="mean"):
        """
        Compute a descriptive statistic of ``value`` for each
        (variable, period, location, timestamp) group.

        Parameters
        ----------
        df : pd.DataFrame
            Frame with columns ``variable``, ``period``, ``location``,
            ``value`` and ``timestamp``. It is not modified.
        stat : str
            One of ``"mean"``, ``"max"``, ``"min"``, ``"sum"``.

        Returns
        -------
        pd.DataFrame
            One row per group, with the statistic in ``<stat>_value`` and the
            location in the ``id_column`` column, sorted by timestamp/period.

        Raises
        ------
        KeyError
            If ``stat`` is not a supported statistic.
        """
        supported = ("mean", "max", "min", "sum")
        if stat not in supported:
            raise KeyError(stat)

        keys = ["variable", "period", "location", "timestamp"]
        # Work on a copy so the caller's frame keeps its original dtype.
        data = df[keys + ["value"]].copy()
        data["value"] = data["value"].astype("float32")

        # pandas' named aggregations skip NaN, like the np.nan* functions.
        df_agg = data.groupby(keys).agg(stat).reset_index()
        df_agg = df_agg.rename(
            columns={"value": stat + "_value", "location": self.id_column}
        )
        return df_agg.sort_values(by=["timestamp", "period"])

    def _get_cumulated_period(self, df):
        """
        Compute, for every variable, the running sum of ``value`` per location.

        Values are first summed per (location, variable, period, timestamp),
        ordered by timestamp and period, then accumulated within each location.

        Returns
        -------
        pd.DataFrame
            Running sums in ``cumsum_value``, location in the ``id_column``
            column. An empty frame is returned when ``df`` has no variable.
        """
        frames = []
        for var in df.variable.unique():
            df_subset = df[df.variable == var]
            df_agg = (
                df_subset[["location", "period", "variable", "value", "timestamp"]]
                .groupby(["location", "variable", "period", "timestamp"])
                .sum()
            )
            df_agg = df_agg.sort_values(by=["timestamp", "period"])
            # Index level 0 is the location.
            df_agg = df_agg.groupby(level=0).cumsum().reset_index()
            frames.append(
                df_agg.rename(
                    columns={"value": "cumsum_value", "location": self.id_column}
                )
            )

        if not frames:
            return pd.DataFrame()
        # DataFrame.append no longer exists in pandas >= 2.0.
        return pd.concat(frames)

    def _get_resampled_periods(self, timestamp):
        """
        List the days of an extraction range.

        Parameters
        ----------
        timestamp : sequence of str
            ``(start, end)`` date strings.

        Returns
        -------
        list of datetime.datetime
            Consecutive days from ``start`` up to, but not including, ``end``
            (``start`` itself is always present).
        """
        start, end = timestamp[0], timestamp[1]
        start_date = dateutil.parser.parse(start)
        end_date = dateutil.parser.parse(end)
        step = datetime.timedelta(days=1)

        days = [start_date]
        while days[-1] + step < end_date:
            days.append(days[-1] + step)
        return days

    def _format_periods(self, periods):
        """
        Reshape the wide period table (one column per extraction range) into
        a long table with columns ``period``, ``key`` and ``timestamp``.
        """
        df_resampled = pd.melt(periods, id_vars="period").rename(
            columns={"value": "timestamp", "variable": "key"}
        )
        # Ranges shorter than the longest one are padded with missing dates in
        # the wide table; those rows do not describe any real day.
        df_resampled = df_resampled.dropna(subset=["timestamp"])
        df_resampled["timestamp"] = pd.to_datetime(df_resampled["timestamp"])
        return df_resampled

    def _get_periods(self, df_Meteoblue_):
        """
        Assign the periods to the file obtained through Meteoblue.

        Parameters
        ----------
        df_Meteoblue_ : pd.DataFrame
            Weather data with at least ``location`` and ``timestamp`` columns.
            It is not modified.

        Returns
        -------
        tuple of pd.DataFrame
            The weather data with ``period`` and ``Year`` columns added and
            ``timestamp`` parsed into dates, and the table of unique
            (period, timestamp) pairs.
        """

        def _get_year(x):
            return x[:4]

        def _convert_date(x):
            # The last five characters of the raw timestamp are discarded
            # before parsing (presumably the time of day - TODO confirm).
            return dateutil.parser.parse(x[:-5])

        df_Meteoblue = df_Meteoblue_.copy()
        df_Meteoblue["timestamp"] = [str(k) for k in df_Meteoblue["timestamp"]]
        df_Meteoblue["Year"] = df_Meteoblue["timestamp"].apply(_get_year).astype(str)
        df_Meteoblue["timestamp"] = df_Meteoblue["timestamp"].apply(_convert_date)

        # One list of days per distinct extraction range, keyed by the string
        # form of the range so it can be joined back to the input file.
        dict_year = {}
        ranges = self.input_file[self.timestamp_column].drop_duplicates().values
        for timestamp in ranges:
            dict_year[str(timestamp)] = self._get_resampled_periods(timestamp)

        # pd.Series columns let ranges of different lengths coexist.
        periods = pd.DataFrame(
            {key: pd.Series(days) for key, days in dict_year.items()}
        )
        periods = periods.reset_index().rename(columns={"index": "period"})
        df_resampled = self._format_periods(periods)

        copy_input = self.input_file.copy()
        copy_input["key"] = copy_input[self.timestamp_column].astype(str)

        df_Meteoblue = pd.merge(
            df_Meteoblue,
            copy_input[[self.id_column, "key"]],
            left_on="location",
            right_on=self.id_column,
            how="left",
        )

        df = pd.merge(
            df_resampled, df_Meteoblue, on=["timestamp", "key"], how="right"
        ).drop(["key"], axis=1)

        # Observations falling outside the resampled days inherit the last
        # known period of their location.
        df["period"] = df.groupby("location")["period"].ffill()

        return df, df[["period", "timestamp"]].drop_duplicates()

    def _apply_convert_doy(self, doy_column):
        """
        Convert a day-of-year column of the input file into dates, in place.

        The year is read from the start of the extraction range stored in the
        timestamp column (text before the first ``-``).
        """

        def _convert_doy_to_date(doy, year):
            date = datetime.datetime(int(year), 1, 1) + datetime.timedelta(doy - 1)
            return np.datetime64(date)

        self.input_file[doy_column] = [
            _convert_doy_to_date(doy, year[0].split("-")[0])
            for doy, year in zip(
                self.input_file[doy_column], self.input_file[self.timestamp_column]
            )
        ]

    def _add_growing_stage(self, periods_df, feature="start_season"):
        """
        Retrieve, for each row of the input file, the period matching the date
        of a given growing stage.

        The objective is to not take into account observations before sowing
        date or after harvest date in the statistics.

        Returns
        -------
        pd.DataFrame
            Columns ``period_<feature>``, ``<feature>`` and the id column; the
            period is missing when the date is not in ``periods_df``.
        """
        return (
            pd.merge(
                periods_df,
                self.input_file[[feature, self.id_column]].copy(),
                left_on="timestamp",
                right_on=feature,
                how="right",
            )
            .rename(columns={"period": "period_" + feature})
            .drop(["timestamp"], axis=1)
        )

    def _init_df(self, df):
        """
        Initialize weather dataframe into periods to do the period calculations.

        Repeated header rows and duplicated observations are removed,
        locations unknown to the input file are dropped, periods are assigned
        and ``value`` is cast to float32. When a start-of-season column was
        given, values observed before it are set to NaN.

        Note that the id column of the stored input file is converted to
        strings as a side effect.
        """
        # Rows whose variable is literally "variable" are repeated headers.
        df = df[~df.variable.isin(["variable"])]
        df = df.drop_duplicates(subset=["location", "timestamp", "variable"]).copy()

        # Identifiers are compared as strings on both sides.
        self.input_file[self.id_column] = self.input_file[self.id_column].astype(str)
        df["location"] = df["location"].astype(str)
        df = df[df.location.isin(self.input_file[self.id_column].unique())]

        df, periods_df = self._get_periods(df_Meteoblue_=df.copy())

        unique_years = df["Year"].unique()
        if len(unique_years) > 1:
            # Several years: express the lookup periods as days elapsed since
            # the first listed date.
            periods_df = periods_df.copy()
            dates = periods_df["timestamp"].values
            periods_df["period"] = (
                (dates - dates[0]).astype("timedelta64[D]").astype(int)
            )

        df["value"] = df["value"].astype("float32")

        if self.start_season_column is not None:
            periods_sowing = self._add_growing_stage(
                periods_df=periods_df, feature="start_season"
            )
            df = pd.merge(
                df[
                    [
                        "period",
                        "timestamp",
                        "location",
                        "variable",
                        "value",
                        "Year",
                    ]
                ],
                periods_sowing,
                left_on="location",
                right_on=self.id_column,
                how="left",
            )
            # Observations before planting date are assigned to np.nan
            df.loc[df.timestamp < df.start_season, ["value"]] = np.nan

        return df

    def _prepare_output_file(self, df_stats, stat="mean"):
        """
        Prepare output dataframe with associated statistics over the periods.

        The result has one row per identifier and one column per
        (variable, period), named ``<stat>_value-<variable>-<period>``.
        """
        df_pivot = pd.pivot_table(
            df_stats,
            values=[stat + "_value"],
            index=[self.id_column],
            columns=["variable", "period"],
            dropna=False,
        )
        df_pivot.reset_index(inplace=True)

        # Flatten the three-level column labels into "-"-joined strings.
        df_pivot.columns = [
            "-".join([str(x) for x in col]).strip() for col in df_pivot.columns.values
        ]
        # Index columns come out with two empty levels, hence the "--" suffix.
        df_pivot = df_pivot.rename(
            columns={
                self.id_column + "--": self.id_column,
                "Year" + "--": "Year",
            }
        )
        return df_pivot.sort_values(by=[self.id_column]).reset_index(drop=True)

    def _get_temperature_difference(self, min_weather, max_weather):
        """
        Compute difference between maximum and minimum temperature observed
        for each period.

        Parameters
        ----------
        min_weather : pd.DataFrame
            Minimum values; must contain ``period``, ``timestamp``, ``value``
            and the id column.
        max_weather : pd.DataFrame
            Maximum values; only rows whose variable is ``"Temperature"`` are
            used.

        Returns
        -------
        pd.DataFrame
            Copy of ``min_weather`` where ``value`` holds max - min and
            ``variable`` is ``"Temperature difference"``.
        """
        join_keys = ["period", "timestamp", self.id_column]
        tempMax = max_weather.loc[
            max_weather.variable.isin(["Temperature"]),
            join_keys + ["value"],
        ].rename(columns={"value": "value_max"})

        diff_weather = pd.merge(
            min_weather.copy(),
            tempMax,
            on=join_keys,
            how="left",
        )
        diff_weather["value"] = diff_weather["value_max"] - diff_weather["value"]
        diff_weather["variable"] = "Temperature difference"
        return diff_weather

    def format_static_variable(self, df_weather, return_pivot=False):
        """
        Format static features from a given output of Meteoblue.

        Values are averaged per (variable, location) over all timestamps.

        Parameters
        ----------
        df_weather : pd.DataFrame
            Meteoblue dataframe with columns ``variable``, ``location``,
            ``timestamp`` and ``value``. It is not modified.
        return_pivot : bool
            The dataframe is formatted into a pivot table. Weather data are in
            columns (``static_value-<variable>``) and each row = 1 unique
            location.

        Returns
        -------
        pd.DataFrame with weather data ready for machine learning pipeline
        """
        # Rows whose variable is literally "variable" are repeated headers.
        df_weather = df_weather[~df_weather.variable.isin(["variable"])]
        df_weather = df_weather.drop_duplicates(
            subset=["location", "timestamp", "variable"]
        )

        # Compare identifiers as strings, as done for the period statistics.
        known_ids = set(self.input_file[self.id_column].astype(str))
        df_weather = df_weather[
            df_weather.location.astype(str).isin(known_ids)
        ].copy()
        # Values may arrive as text when header rows were mixed into the data.
        df_weather["value"] = pd.to_numeric(df_weather["value"])

        df_agg = (
            df_weather[["variable", "location", "value"]]
            .groupby(["variable", "location"])
            .agg("mean")
            .reset_index()
            .rename(columns={"value": "static_value", "location": self.id_column})
        )

        if return_pivot:
            df_agg = pd.pivot_table(
                df_agg,
                values=["static_value"],
                index=[self.id_column],
                columns=["variable"],
                dropna=False,
            )
            df_agg.reset_index(inplace=True)
            df_agg.columns = [
                "-".join([str(x) for x in col]).strip()
                for col in df_agg.columns.values
            ]
            # The id column comes out of the flattening with a trailing "-".
            df_agg = df_agg.rename(columns={self.id_column + "-": self.id_column})

        return df_agg

    def execute(self, df_weather, stat=None, return_pivot=False):
        """
        Execute the workflow to get the dataframe aggregated into periods from
        Meteoblue data.

        Parameters
        ----------
        df_weather : pd.DataFrame
            Meteoblue dataframe with stat as daily descriptive statistics.
            It is not modified.
        stat : str, optional
            Statistic to compute over each period: ``"mean"``, ``"max"``,
            ``"min"``, ``"sum"`` or ``"cumsum"``. ``None`` is treated as
            ``"mean"``.
        return_pivot : bool
            The dataframe is formatted into a pivot table. Weather data are in
            columns and each row = 1 unique location.

        Returns
        -------
        pd.DataFrame
            Long format (default): one row per identifier, timestamp and
            variable, with the statistic in ``<stat>_value``.
            Pivot format: one row per identifier; every weather column is
            labelled with a tuple ``("<stat>_<variable>", period)`` where
            ``period`` is a float.

        Raises
        ------
        KeyError
            If ``stat`` is not a supported statistic.
        """
        if stat is None:
            stat = "mean"

        init_weather = self._init_df(df=df_weather.copy())

        if stat == "cumsum":
            df_stats = self._get_cumulated_period(df=init_weather)
        else:
            df_stats = self._get_descriptive_period(df=init_weather, stat=stat)

        df_stats = df_stats.sort_values(by=[self.id_column, "timestamp", "variable"])
        if not return_pivot:
            return df_stats

        output = self._prepare_output_file(df_stats=df_stats, stat=stat)

        labels = []
        for column in output.columns:
            # "<stat>_value-<variable>-<period>" -> "<stat>_<variable>-<period>"
            column = "".join(column.split("value-"))
            if column == self.id_column:
                labels.append(column)
                continue
            # Split on the last "-" only: the variable name may contain one.
            parts = tuple(column.rsplit("-", 1))
            if len(parts) > 1:
                labels.append((parts[0], float(parts[1])))
            else:
                labels.append(parts)
        output.columns = labels
        return output