Source code for eocrops.inputs.vhrs

from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session

from datetime import datetime
import json

import time


from sentinelhub import (
    DataCollection,
)

import eocrops.tasks.vegetation_indices as vegetation_indices

from eolearn.io import SentinelHubEvalscriptTask

import datetime as dt
import geopandas as gpd
import eolearn
import os

import eocrops.utils.base_functions as utils
import eocrops.tasks.preprocessing as preprocessing
import eocrops.tasks.cmd_otb as cmd_otb
import eocrops.inputs.utils_sh as utils_sh
from eolearn.core import (
    SaveTask,
    FeatureType,
    OverwritePermission,
    linearly_connect_tasks,
    EOWorkflow,
    OutputTask,
)


class DownloadVHRSSentinelHub:
    """Order and download commercial VHRS imagery through Sentinel Hub.

    The class wraps the Sentinel Hub third-party data import HTTP API
    (search, order, confirm) and then builds an eo-learn workflow that reads
    the ingested collection back as an EOPatch.

    Typical sequence: ``execute_query`` -> ``confirm_order`` ->
    ``check_status`` -> ``get_data``. Call ``reset_workflow`` before placing
    another order with the same instance.
    """

    # Provider tuples (provider, constellation/item type) handled by this class.
    _SUPPORTED_PROVIDERS = (
        ("AIRBUS", "PHR"),
        ("AIRBUS", "SPOT"),
        ("PLANET", "PSScene4Band"),
    )

    def __init__(
        self,
        shapefile,
        time_stamp,
        config,
        maxCloudCoverage,
        url="https://services.sentinel-hub.com/api/v1/dataimport/",
    ):
        """Download VHRS data from Sentinelhub

        Parameters
        ----------
        shapefile :
            Field boundaries; it is copied and passed to the project helpers
            ``check_crs`` / ``get_bounding_box`` (presumably a GeoDataFrame -
            TODO confirm).
        time_stamp : tuple of str
            (start, end) dates as "yyyy-mm-dd" strings
        config :
            Object exposing ``sh_client_id``, ``sh_client_secret`` and, for
            Planet orders, ``planet_key``
        maxCloudCoverage :
            Cloud coverage threshold forwarded in the Planet search filter
        url : str
            Base url of the data import API. Endpoints are appended directly,
            so it must end with a slash.
        """
        self.shapefile = shapefile
        self.time_stamp = time_stamp
        self.config = config
        self.maxCloudCoverage = maxCloudCoverage
        self.url = url
        # Guards against placing / confirming the same order twice.
        self.request = False
        self.order = False
        # Set once an order has been confirmed (see confirm_order).
        self.running = False

    def get_quotas(self):
        """Get quota from your SH account

        Returns
        -------
        Decoded JSON body of the ``quotas`` endpoint
        """
        # The session must carry an access token, otherwise the request is
        # sent unauthenticated.
        oauth, _ = self._create_token()
        response = oauth.get(url=f"{self.url}quotas")
        response.raise_for_status()
        return response.json()

    def _create_token(self):
        """
        Create a token for the HTTP API from Sentinelhub third party data import

        Returns
        -------
        oauth session and the fetched token
        """
        client = BackendApplicationClient(client_id=self.config.sh_client_id)
        oauth = OAuth2Session(client=client)
        # Get token for the session
        token = oauth.fetch_token(
            token_url="https://services.sentinel-hub.com/oauth/token",
            client_secret=self.config.sh_client_secret,
        )
        # All requests using this session will have an access token automatically added
        resp = oauth.get("https://services.sentinel-hub.com/oauth/tokeninfo")
        print(resp.content)
        return oauth, token

    def _get_input_polygon(self):
        """
        Format the input polygon for the HTTP API request into json format

        Returns
        -------
        tuple (polygon coordinates, bounding box, CRS code as string), where
        the CRS code is the part after the last ":" of ``str(bbox.crs)``
        """
        shapefile_boundary = utils.check_crs(self.shapefile.copy())
        field_bbox = utils.get_bounding_box(shapefile_boundary)
        # Round-trip through GeoJSON to obtain plain nested coordinate lists.
        input_polygon_json = gpd.GeoSeries([field_bbox.geometry]).to_json()
        res = json.loads(input_polygon_json)
        input_polygon = res["features"][0]["geometry"]["coordinates"]
        return input_polygon, field_bbox, str(field_bbox.crs).split(":")[-1]

    def _init_param_query(self, provider=("AIRBUS", "PHR")):
        """
        Initialize query parameters

        Parameters
        ----------
        provider : tuple
            choice between ('AIRBUS', 'PHR'), ('AIRBUS', 'SPOT'),('PLANET', 'PSScene4Band')

        Returns
        -------
        dictionary with your query parameters

        Raises
        ------
        ValueError
            If the provider is not supported or a date of ``time_stamp`` is
            not a "yyyy-mm-dd" string
        """
        if provider not in self._SUPPORTED_PROVIDERS:
            raise ValueError('Available providers are only "PLANET" and "AIRBUS"')

        # Validate both ends of the interval before talking to the API.
        try:
            start, end = self.time_stamp[0], self.time_stamp[1]
            datetime.strptime(start, "%Y-%m-%d")
            datetime.strptime(end, "%Y-%m-%d")
        except (IndexError, TypeError, ValueError) as err:
            raise ValueError(
                "You must provide a date into yyyy-mm-dd string format"
            ) from err

        time_range = {
            "from": f"{start}T00:00:00.000Z",
            "to": f"{end}T00:00:00.000Z",
        }
        input_polygon, field_bbox, CRS = self._get_input_polygon()

        if provider[0] == "PLANET":
            dict_query = dict(
                provider=provider[0],
                maxCloudCoverage=self.maxCloudCoverage,
                data=[
                    {
                        "itemType": provider[-1],
                        "productBundle": "analytic_sr_udm2",
                        "dataFilter": {
                            "timeRange": time_range,
                            # The key is the API field name, not the attribute path.
                            "maxCloudCoverage": self.maxCloudCoverage,
                            "nativeFilter": {
                                "type": "StringInFilter",
                                "field_name": "quality_category",
                                "config": ["standard"],
                            },
                        },
                    }
                ],
            )
        else:  # "AIRBUS" - the only other provider accepted above
            dict_query = dict(
                provider=provider[0],
                data=[
                    {
                        "constellation": provider[1],
                        "dataFilter": {"timeRange": time_range},
                    }
                ],
            )

        dict_query["input_polygon"] = input_polygon
        dict_query["field_bbox"] = field_bbox
        dict_query["CRS"] = CRS
        return dict_query

    @staticmethod
    def _bounds_payload(dict_query):
        """Build the ``bounds`` section shared by the search and order requests."""
        return {
            "properties": {
                "crs": "http://www.opengis.net/def/crs/EPSG/0/" + dict_query["CRS"]
            },
            "geometry": {
                "type": "Polygon",
                "coordinates": dict_query["input_polygon"],
            },
        }

    def _search_query(self, dict_query, oauth):
        """
        Search satellite images available with respect to your input query

        Parameters
        ----------
        dict_query : dictionary
            dictionary with the parameters to search the query
        oauth : oauth client

        Returns
        -------
        JSON with the results and the list of item ids available

        Raises
        ------
        ValueError
            If the provider of ``dict_query`` is neither PLANET nor AIRBUS
        """
        query = {
            "provider": dict_query["provider"],
            "bounds": self._bounds_payload(dict_query),
            "data": dict_query["data"],
        }

        response = oauth.post(f"{self.url}search", json=query)
        response.raise_for_status()
        results = response.json()

        # The product id is stored at a different level for each provider.
        if dict_query["provider"] == "PLANET":
            item_ids = [feature["id"] for feature in results["features"]]
        elif dict_query["provider"] == "AIRBUS":
            item_ids = [feature["properties"]["id"] for feature in results["features"]]
        else:
            raise ValueError("Provider must be set to Planet or Airbus")

        return results, item_ids

    def _order_query(self, dict_query, provider, oauth, item_ids, name_query=""):
        """
        Order the query

        Parameters
        ----------
        dict_query : dictionary
            dictionary from init_param_query() that allows to set all the parameters for your order
        provider : tuple
            choice between ('AIRBUS', 'PHR'), ('AIRBUS', 'SPOT'),('PLANET', 'PSScene4Band')
        oauth : oauth client
        item_ids : list
            list of the item ids returned by SH
        name_query : str
            name appended to the order name so it can be retrieved easily

        Returns
        -------
        order (decoded JSON answer of the ``orders`` endpoint)
        """
        payload = dict(
            name=dict_query["provider"].lower() + " products " + name_query,
            input={
                "provider": dict_query["provider"],
                "bounds": self._bounds_payload(dict_query),
                "data": _payload_data(item_ids, provider),
            },
        )
        if dict_query["provider"] == "PLANET":
            payload["input"]["planetApiKey"] = self.config.planet_key

        response = oauth.post(f"{self.url}orders", json=payload)
        response.raise_for_status()
        order = response.json()

        # Show what has been created so the user can review it before confirming
        print(order["id"])
        print(order["sqkm"])
        print(order["status"])

        return order

    def execute_query(self, provider, name_query=""):
        """
        Execute workflow to get commercial data

        Parameters
        ----------
        provider : tuple
            choice between ('AIRBUS', 'PHR'), ('AIRBUS', 'SPOT'),('PLANET', 'PSScene4Band')
        name_query : str
            name appended to the order name so it can be retrieved easily

        Returns
        -------
        order id and the search results; the order is accessible as well in
        https://apps.sentinel-hub.com/dashboard/#/tpdi

        Raises
        ------
        ValueError
            If a request has already been placed with this instance
        """
        if self.request:
            raise ValueError(
                "You already have a search request running. Please confirm with confirm_order() method, or call the method reset_workflow() to launch a new order"
            )

        self.oauth, _ = self._create_token()
        dict_query = self._init_param_query(provider)
        results, item_ids = self._search_query(dict_query, self.oauth)
        order = self._order_query(
            dict_query, provider, self.oauth, item_ids, name_query
        )
        self.request = True
        return order["id"], results

    def confirm_order(self, order_id):
        """
        Confirm the order after the having ordered the query

        Parameters
        ----------
        order_id : str
            id of the order

        Returns
        -------
        order id and collection_id for SH

        Raises
        ------
        ValueError
            If an order has already been confirmed with this instance
        """
        if self.order:
            raise ValueError(
                "You already have an order confirmed. Please wait a little for the order to be ingested in SH and/or reset the workflow to make a new order using reset_workflow() method"
            )

        # Reuse the session of execute_query() when there is one, so that an
        # existing order id can also be confirmed from a fresh instance.
        if getattr(self, "oauth", None) is None:
            self.oauth, _ = self._create_token()

        # Confirm the order
        response = self.oauth.post(f"{self.url}orders/{order_id}/confirm")
        response.raise_for_status()
        jobStatus = response.json()["status"]
        if jobStatus == "RUNNING":
            time.sleep(5)  # pause

        # Read the order back: it carries the collection id and a fresh status.
        response = self.oauth.get(f"{self.url}orders/{order_id}")
        response.raise_for_status()
        order = response.json()
        jobStatus = order.get("status", jobStatus)
        print(jobStatus)

        if jobStatus == "RUNNING":
            time.sleep(2)  # pause to let query ingested

        # Flags are only raised once the API calls succeeded, so a failed
        # confirmation can simply be retried.
        self.running = True
        self.order = True
        return order_id, order["collectionId"]

    def check_status(self, order_id):
        """
        Get the current status of an order

        Parameters
        ----------
        order_id : str
            id of the order

        Returns
        -------
        value of the "status" field returned by the ``orders`` endpoint
        """
        oauth, _ = self._create_token()
        response = oauth.get(f"{self.url}orders/{order_id}")
        response.raise_for_status()
        return response.json()["status"]

    def reset_workflow(self):
        """Reset workflow to make any new order"""
        self.request = False
        self.order = False
        self.running = False

    def _workflow_vhrs(
        self,
        byoc,
        provider,
        resolution,
        pansharpen=False,
        otb_path=None,
        saving_path=None,
        bands_name="BANDS",
    ):
        """
        Workflow to process third party commercial data

        Parameters
        ----------
        byoc : bring your own collection
        provider : tuple
            Provider
        resolution : int
            Spatial resolution of the output. You should check the native resolution of the constellation
        pansharpen : bool
            Option to know if we apply pansharpening method to resample bands over panchromatic.
            It requires to have OrfeoToolBox installed.
        otb_path : str
            Path of the OrfeoToolBox bin
        saving_path : str
            Path to save the output EOPatch
        bands_name : str
            Name of the feature that is masked and used for the vegetation indices

        Returns
        -------
        EOPatch

        Raises
        ------
        ValueError
            If ``pansharpen`` is requested without ``otb_path``
        """
        evalscript_byoc = _get_evalscript(provider)

        input_task = SentinelHubEvalscriptTask(
            features=[
                (FeatureType.DATA, "BANDS"),
                (FeatureType.MASK, "IS_DATA"),
                (FeatureType.MASK, "CLM"),
            ],
            data_collection=byoc,
            resolution=resolution,
            config=self.config,
            time_difference=dt.timedelta(hours=12),
            evalscript=evalscript_byoc,
        )

        cloud_mask = utils.CloudMaskFromCLM()

        # NOTE(review): the original code used an undefined "stf_tasks" alias
        # (NameError at call time). The tasks are taken from the modules this
        # file imports for that purpose - confirm they expose these classes.
        add_polygon_mask = preprocessing.PolygonMask(self.shapefile)
        pixels_masking = preprocessing.MaskPixels([bands_name], fname="polygon_mask")

        if pansharpen:
            if otb_path is None:
                raise ValueError(
                    "You must provide path to the bin directory of OrfeoToolBox"
                    " https://www.orfeo-toolbox.org/CookBook/Installation.html"
                )
            pansharpen_task = cmd_otb.PanSharpening(otb_path=otb_path)
            # Vegetation indices are then computed on the pansharpened feature.
            bands_name += "-PAN"
        else:
            pansharpen_task = utils.EmptyTask()

        vis = vegetation_indices.VegetationIndicesVHRS(bands_name)

        if saving_path is None:
            save = utils.EmptyTask()
        else:
            os.makedirs(saving_path, exist_ok=True)
            save = SaveTask(
                saving_path, overwrite_permission=OverwritePermission.OVERWRITE_PATCH
            )

        output_task = OutputTask("eopatch")

        workflow_nodes = linearly_connect_tasks(
            input_task,
            cloud_mask,
            add_polygon_mask,
            pixels_masking,
            pansharpen_task,
            vis,
            save,
            output_task,
        )
        workflow = EOWorkflow(workflow_nodes)

        field_bbox = utils.get_bounding_box(self.shapefile)
        result = workflow.execute(
            {workflow_nodes[0]: {"bbox": field_bbox, "time_interval": self.time_stamp}}
        )

        return result.outputs["eopatch"]

    def get_data(
        self,
        order_id,
        collection_id,
        provider,
        resolution,
        pansharpen=False,
        otb_path=None,
        saving_path=None,
    ):
        """
        Download the data from Sentinelhub using eo-learn package

        Parameters
        ----------
        order_id : str
            order id from the 3rd party import https://apps.sentinel-hub.com/dashboard/#/tpdi
        collection_id : str
            collection id that we get when we order the data. You can also copy/paste it
            from your SentinelHub account (see ingested data)
        provider : tuple
            choice between ('AIRBUS', 'PHR'), ('AIRBUS', 'SPOT'),('PLANET', 'PSScene4Band')
        resolution : int
            Spatial resolution of the output
        pansharpen : bool
            Apply pansharpening using OTB
        otb_path : str
            path where you have your OTB bin installed
        saving_path : str
            Optional directory where the EOPatch is saved; nothing is saved when None

        Returns
        -------
        EOPatch
        """
        # Define the byoc
        byoc = DataCollection.define_byoc(
            collection_id=collection_id,
            name=str(order_id),
        )

        return self._workflow_vhrs(
            byoc,
            provider,
            resolution,
            pansharpen=pansharpen,
            otb_path=otb_path,
            saving_path=saving_path,
        )
def _payload_data(item_ids, provider=("PLANET", "PSScene4Band")):
    """
    Define the payload for your HTTP API

    Parameters
    ----------
    item_ids : list
        list of the item ids
    provider : tuple
        choice between ('AIRBUS', 'PHR'), ('AIRBUS', 'SPOT'),('PLANET', 'PSScene4Band')

    Returns
    -------
    list with a single dictionary: the "data" section of the order payload

    Raises
    ------
    ValueError
        If the first element of ``provider`` is neither "AIRBUS" nor "PLANET"
    """
    if provider[0] == "AIRBUS":
        return [
            dict(
                constellation=provider[1],
                products=[{"id": item_id} for item_id in item_ids],
            )
        ]
    if provider[0] == "PLANET":
        return [
            dict(
                itemType=provider[1],
                harmonizeTo="NONE",
                productBundle="analytic_sr_udm2",
                itemIds=list(item_ids),
            )
        ]
    # An explicit error instead of the KeyError('data') raised previously.
    raise ValueError("Provider must be set to Planet or Airbus")


# Evalscript for Planet collections: the four bands are scaled by 2.5 / 10000
# and the cloud mask is read from the UDM2_Cloud band.
_PLANET_EVALSCRIPT = """
        function setup() {
            return {
                input: ["B1", "B2", "B3", "B4", "UDM2_Cloud","dataMask"],
                output: [
                    {id: "BANDS", bands: 4, sampleType: SampleType.FLOAT32},
                    {id:"IS_DATA", bands:1, sampleType: SampleType.UINT8 },
                    {id:"CLM", bands:1, sampleType: SampleType.UINT8 }
                ],
            }
        }

        function evaluatePixel(sample) {
            return {
                BANDS: [2.5 * sample.B1 / 10000, 2.5 * sample.B2 / 10000, 2.5 * sample.B3 / 10000, 2.5 * sample.B4 / 10000],
                CLM : [sample.UDM2_Cloud],
                IS_DATA : [sample.dataMask]
            }
        }
        """

# Evalscript for Airbus collections: the four bands and PAN are divided by
# 4000; the CLM output is filled with dataMask (no cloud band is requested).
_AIRBUS_EVALSCRIPT = """
        function setup() {
            return {
                input: ["B0", "B1", "B2", "B3","PAN", "dataMask", "dataMask"],
                output: [
                    {id: "BANDS", bands: 5, sampleType: SampleType.FLOAT32},
                    {id:"IS_DATA", bands:1, sampleType: SampleType.UINT8},
                    {id:"CLM", bands:1, sampleType: SampleType.UINT8}
                ],
            }
        }

        function evaluatePixel(sample) {
            v = 4000
            return {
                BANDS: [sample.B0/v, sample.B1/v, sample.B2/v, sample.B3/v,sample.PAN/v],
                IS_DATA : [sample.dataMask],
                CLM : [sample.dataMask]
            }
        }
        """


def _get_evalscript(provider):
    """
    Get evaluation script to download the data

    Parameters
    ----------
    provider : tuple
        choice between ('AIRBUS', 'PHR'), ('AIRBUS', 'SPOT'),('PLANET', 'PSScene4Band')
        w.r.t the execution order

    Returns
    -------
    SH evaluation script

    Raises
    ------
    ValueError
        If the first element of ``provider`` is neither "AIRBUS" nor "PLANET"
    """
    if provider[0] == "PLANET":
        return _PLANET_EVALSCRIPT
    if provider[0] == "AIRBUS":
        return _AIRBUS_EVALSCRIPT
    # No module-level state is kept any more, so an unknown provider can no
    # longer silently return the script of a previous call.
    raise ValueError("Provider must be set to Planet or Airbus")