from dateutil.parser import parse as parse_datetime import urllib.parse from pathlib import Path import fiona import folium import geopandas as gpd from matplotlib import pyplot as plt import numpy as np import rasterio as r from rasterio.features import geometry_mask from rasterio.warp import calculate_default_transform, reproject, Resampling from shapely.geometry import shape from shapely.geometry.polygon import Polygon from shapely.ops import unary_union from tempfile import TemporaryDirectory from zipfile import ZipFile import warnings def search_osm(place): ''' Returns a GeoDataFrame with results from OpenStreetMap Nominatim for the given search string. ''' urlescaped_place = urllib.parse.quote(place) search_url = ('https://nominatim.openstreetmap.org/search/?q={}' + '&format=geojson&polygon_geojson=1').format(urlescaped_place) return gpd.read_file(search_url) def plot_downloaded_products(products, area_of_interest, **kwargs): ax = kwargs.get('ax') alpha = kwargs.pop('alpha', 0.1) if not ax: fig, ax = plt.subplots(**kwargs) grey = '#777777' purple = '#988ED5' # allow plotting raw shapely geometries if 'plot' not in dir(products): products = gpd.GeoSeries(products) if 'plot' not in dir(area_of_interest): area_of_interest = gpd.GeoSeries(area_of_interest) # area of interest in background a = area_of_interest.plot(ax=ax, facecolor=grey) # product fill layer b = products.plot(ax=ax, facecolor=purple, alpha=alpha) # product stroke layer products.plot(ax=ax, facecolor='none', edgecolor=purple, alpha=0.4) if not kwargs.get('ax'): ax.set(title='Area of Interest and Available Products') return ax def scihub_band_paths(p, bands, resolution=None): ''' Given a zip file or folder at `p`, returns the paths inside p to the raster files containing information for the given bands. Because some bands are available in more than one resolution, this can be filtered by prodiding a third parameter (e.g. resolution='10m'). - `p` can be a string or a pathlib.Path. - `bands` can be a list of bands or a single band. The returned paths are formatted in the zip scheme as per Apache Commons VFS if necessary and can be directly opened by rasterio. ''' if type(bands) != list: # allow passing in a single band more easily bands = [bands] p = Path(p) # make sure we're dealing with a pathlib.Path if p.suffix == '.zip': # when dealing with zip files we have to read the filenames from the # archive first with ZipFile(p) as f: files = f.namelist() rasters = [Path(f'zip+file://{p.absolute()}!/{f}') for f in files if f.endswith('.jp2')] else: rasters = p.glob('**/*.jp2') # take only the paths that contain one of the given bands rasters = [raster for band in bands for raster in rasters if band in raster.name] # if a resolution is given, further discard the bands we don't need if resolution: rasters = [raster for raster in rasters if resolution in raster.name] return rasters def scihub_bgr_paths(product_path, resolution=None): ''' A convenence function to return the paths to the blue, green and red bands in the downloaded product at `product_path`. ''' return scihub_band_paths(product_path, ['B02', 'B03', 'B04'], resolution) def scihub_cloud_mask(product_path, **kwargs): ''' Given a `product_path` pointing to a product downlaoded from the Copernicus Open Access Hub, returns a shapely geometry representing the included cloud mask. If an additional parameter, `rasterize=True` is given, the returned cloud mask will be a rasterized numpy ndarray instead of a vector geometry. Two additional parameters, `target_path` and `target_transform` are needed to determine the size of this array. In this array, pixels with clouds are `False` and pixels without clouds are `True`. ''' with TemporaryDirectory() as tmp_dir: # we need the temporary directory to work around a problem with reading # vector files from zip archives p = Path(product_path) if p.suffix == '.zip': # when dealing with zip files we have to read the filenames from the # archive first with ZipFile(p) as f: files = f.namelist() file = [f for f in files if f.endswith('MSK_CLOUDS_B00.gml')][0] f.extract(file, tmp_dir) file = Path(tmp_dir) / file else: file = list(p.glob('**/MSK_CLOUDS_B00.gml'))[0] try: with fiona.open(file) as features: with warnings.catch_warnings(): warnings.simplefilter("ignore") # this returns a warning because the iterator has to be # rewound; while this is a performance issue, we can ignore it mask = unary_union([shape(f['geometry']) for f in features]) except ValueError: # empty cloud mask mask = Polygon([]) if kwargs.get('rasterize'): # return raster version of the vector geometry we found above target_shape = kwargs.get('target_shape') target_transform = kwargs.get('target_transform') if not target_transform or not target_shape: error_msg = 'target_transform and target_shape need to be set ' + \ 'to construct a rasterized cloud mask.' raise ValueError(error_msg) # completely empty cloud masks have to be handled separately if mask.is_empty: return np.full(target_shape, True) return geometry_mask(mask, out_shape=target_shape, transform=target_transform) else: return mask def scihub_band_date(band): ''' Given a string, `pathlib.Path` or `rasterio.DataSetReader`, returns the datetime encoded in the filename. ''' if type(band) is r.DatasetReader: file_name = band.name else: file_name = Path(band).name return parse_datetime(file_name.split('_')[-3]) # See https://book.pythontips.com/en/latest/context_managers.html#implementing-a-context-manager-as-a-class class RasterReaderList(): ''' This class allows opening a list of file paths in a `with` block using rasterio.open. They get automatically closed when the context created by the `with` block is left. ''' def __init__(self, paths): self.open_files = [] self.paths = paths def __enter__(self): for f in self.paths: self.open_files.append(r.open(f)) return self.open_files def __exit__(self, _type, _value, _traceback): for f in self.open_files: # wrapped in a block so we still close other fails if one fails # to close try: f.close() except: pass def geodataframe_on_map(geodataframe): ''' Plot a GeoDataframe or GeoSeries on a Leaflet map; map automatically centers ''' bbox = geodataframe.unary_union.bounds minx, miny, maxx, maxy = bbox m = folium.Map([0, 0], tiles='cartodbpositron', scroll_wheel_zoom=False) folium.GeoJson(geodataframe.to_json()).add_to(m) m.fit_bounds([[miny, minx], [maxy, maxx]]) return m