New raster cloud mask implementation

This commit is contained in:
heyarne 2021-03-06 16:41:24 +00:00
commit 6387dc28bc
8 changed files with 914 additions and 933 deletions

View file

@@ -8,8 +8,9 @@ import geopandas as gpd
from matplotlib import pyplot as plt
import numpy as np
import rasterio as r
from rasterio.features import geometry_mask
from rasterio.features import geometry_mask, geometry_window
from rasterio.warp import calculate_default_transform, reproject, Resampling
from rasterio.windows import Window
from shapely.geometry import shape
from shapely.geometry.polygon import Polygon
@@ -101,63 +102,45 @@ def scihub_bgr_paths(product_path, resolution=None):
return scihub_band_paths(product_path, ['B02', 'B03', 'B04'], resolution)
def scihub_cloud_mask(product_path, area=None, cloud_probability=0.75, resolution='10m'):
    '''
    Given a `product_path` pointing to a product downloaded from the Copernicus
    Open Access Hub, returns a tuple `(cloud_mask, transform)`.

    `cloud_mask` is a numpy array with boolean values representing the
    product's cloud mask, derived from its `MSK_CLDPRB` cloud probability
    raster. Cloudy pixels are True, non-cloudy pixels are False. The array
    has the shape `(bands, rows, columns)`.

    `transform` is the affine transform that maps pixel coordinates of
    `cloud_mask` to coordinates in the coordinate reference system of the
    mask raster. It accounts for both the subset given by `area` and the
    upsampling described below.

    If `area` is a GeoDataFrame, only the window of the raster covering its
    geometries is read; the geometries are reprojected to the raster's CRS
    first. Any other value of `area` results in the whole raster being read.

    `cloud_probability` is the threshold (between 0 and 1) at or above which a
    pixel is considered cloudy.

    `resolution` selects the resolution of the returned mask. '20m' and '60m'
    are read as they are; every other value is treated as '10m', for which
    the 20m raster is read and upsampled by a factor of 2.
    '''
    # there is no mask in 10m resolution and we need to manually upsample it;
    # upsampling code is taken from the rasterio documentation:
    # https://rasterio.readthedocs.io/en/latest/topics/resampling.html
    if resolution in ['20m', '60m']:
        mask_resolution = resolution
        upscale_factor = 1
    else:
        mask_resolution = '20m'
        upscale_factor = 2

    mask_path = scihub_band_paths(product_path, ['MSK_CLDPRB'], mask_resolution)[0]
    with r.open(mask_path) as mask:
        if isinstance(area, gpd.GeoDataFrame):
            window = geometry_window(mask, area.to_crs(mask.crs)['geometry'])
        else:
            window = Window(0, 0, mask.width, mask.height)

        mask_data = mask.read(
            out_shape=(
                mask.count,
                int(window.height * upscale_factor),
                int(window.width * upscale_factor)
            ),
            window=window,
            resampling=Resampling.bilinear
        )

        # the transform has to start at the origin of the window that was
        # actually read and has to be scaled by the ratio between the window
        # size and the size of the resampled data; using the transform and
        # size of the whole raster would misplace every subset given by `area`
        window_transform = mask.window_transform(window)
        mask_transform = window_transform * window_transform.scale(
            (window.width / mask_data.shape[-1]),
            (window.height / mask_data.shape[-2])
        )

    # mask_data values range from 0 to 100, cloud_probability from 0 to 1
    return mask_data >= (cloud_probability * 100), mask_transform
def scihub_band_date(band):