from dateutil.parser import parse as parse_datetime import urllib.parse from pathlib import Path import fiona import folium import geopandas as gpd from matplotlib import pyplot as plt import numpy as np import rasterio as r from rasterio.features import geometry_mask, geometry_window from rasterio.warp import calculate_default_transform, reproject, Resampling from rasterio.windows import Window from shapely.geometry import shape from shapely.geometry.polygon import Polygon from shapely.ops import unary_union from tempfile import TemporaryDirectory from zipfile import ZipFile import warnings def search_osm(place): ''' Returns a GeoDataFrame with results from OpenStreetMap Nominatim for the given search string. ''' urlescaped_place = urllib.parse.quote(place) search_url = ('https://nominatim.openstreetmap.org/search/?q={}' + '&format=geojson&polygon_geojson=1').format(urlescaped_place) return gpd.read_file(search_url) def plot_product_extent(products, area_of_interest, **kwargs): ax = kwargs.get('ax') alpha = kwargs.pop('alpha', 0.1) if not ax: fig, ax = plt.subplots(**kwargs) grey = '#777777' purple = '#988ED5' # allow plotting raw shapely geometries if 'plot' not in dir(products): products = gpd.GeoSeries(products) if 'plot' not in dir(area_of_interest): area_of_interest = gpd.GeoSeries(area_of_interest) # area of interest in background a = area_of_interest.plot(ax=ax, facecolor=grey) # product fill layer b = products.plot(ax=ax, facecolor=purple, alpha=alpha) # product stroke layer products.plot(ax=ax, facecolor='none', edgecolor=purple, alpha=0.4) if not kwargs.get('ax'): ax.set(title='Area of Interest and Available Products') return ax def scihub_band_paths(p, bands, resolution=None): ''' Given a zip file or folder at `p`, returns the paths inside p to the raster files containing information for the given bands. Because some bands are available in more than one resolution, this can be filtered by prodiding a third parameter (e.g. resolution='10m'). - `p` can be a string or a pathlib.Path. - `bands` can be a list of bands or a single band. The returned paths are formatted in the zip scheme as per Apache Commons VFS if necessary and can be directly opened by rasterio. ''' if type(bands) != list: # allow passing in a single band more easily bands = [bands] p = Path(p) # make sure we're dealing with a pathlib.Path if p.suffix == '.zip': # when dealing with zip files we have to read the filenames from the # archive first with ZipFile(p) as f: files = f.namelist() rasters = [Path(f'zip+file://{p.absolute()}!/{f}') for f in files if f.endswith('.jp2')] else: rasters = p.glob('**/*.jp2') # take only the paths that contain one of the given bands rasters = [raster for band in bands for raster in rasters if band in raster.name] # if a resolution is given, further discard the bands we don't need if resolution: rasters = [raster for raster in rasters if resolution in raster.name] return rasters def scihub_bgr_paths(product_path, resolution=None): ''' A convenence function to return the paths to the blue, green and red bands in the downloaded product at `product_path`. ''' return scihub_band_paths(product_path, ['B02', 'B03', 'B04'], resolution) def scihub_cloud_mask(product_path, area=None, cloud_probability=0.75, resolution='10m'): ''' Returns a numpy array with boolean values representing a product's cloud mask. Cloudy pixels are True, non-cloudy pixels are False. ''' # TODO: Subset for area # there is no mask in 10m resolution an we need to manually upsample it; # upsampling code is taken from the rasterio documentation: # https://rasterio.readthedocs.io/en/latest/topics/resampling.html if resolution in ['20m', '60m']: mask_resolution = resolution upscale_factor = 1 else: mask_resolution = '20m' upscale_factor = 2 mask_path = scihub_band_paths(product_path, ['MSK_CLDPRB'], mask_resolution)[0] with r.open(mask_path) as mask: if isinstance(area, gpd.GeoDataFrame): window = geometry_window(mask, area.to_crs(mask.crs)['geometry']) else: window = Window(0, 0, mask.width, mask.height) mask_data = mask.read( out_shape=( mask.count, int(window.height * upscale_factor), int(window.width * upscale_factor) ), window=window, resampling=Resampling.bilinear ) mask_transform = mask.transform * mask.transform.scale( (mask.width / mask_data.shape[-1]), (mask.height / mask_data.shape[-2]) ) # mask_data values range from 0 to 100, cloud_probability from 0 to 1 return mask_data >= (cloud_probability * 100), mask_transform def scihub_band_date(band): ''' Given a string, `pathlib.Path` or `rasterio.DataSetReader`, returns the datetime encoded in the filename. ''' if type(band) is r.DatasetReader: file_name = band.name else: file_name = Path(band).name return parse_datetime(file_name.split('_')[-3]) # See https://book.pythontips.com/en/latest/context_managers.html#implementing-a-context-manager-as-a-class class RasterReaderList(): ''' This class allows opening a list of file paths in a `with` block using rasterio.open. They get automatically closed when the context created by the `with` block is left. ''' def __init__(self, paths): self.open_files = [] self.paths = paths def __enter__(self): for f in self.paths: self.open_files.append(r.open(f)) return self.open_files def __exit__(self, _type, _value, _traceback): for f in self.open_files: # wrapped in a block so we still close other fails if one fails # to close try: f.close() except: pass def geodataframe_on_map(geodataframe): ''' Plot a GeoDataframe or GeoSeries on a Leaflet map; map automatically centers ''' bbox = geodataframe.unary_union.bounds minx, miny, maxx, maxy = bbox m = folium.Map([0, 0], tiles='cartodbpositron', scroll_wheel_zoom=False) folium.GeoJson(geodataframe.to_json()).add_to(m) m.fit_bounds([[miny, minx], [maxy, maxx]]) return m