from datetime import datetime
import ee
from func_timeout import func_set_timeout
import pandas as pd
from PIL import Image
import requests
import tempfile
import io
from tqdm import tqdm
import functools
import re  # Used by DataLoader._regex
from typing import Any, List, Union


class DataLoader:
    """
    Main class for loading and exploring data from satellite images.
    The goal is to load an ImageCollection and filter it as needed with methods such as
    filter, filterDate, filterBounds and select. These work just like the Earth Engine methods of the same names.

    Like Earth Engine, this class loads and computes lazily: calling filterBounds does not actually
    filter the image collection until the result is required, e.g. when counting the images through
    the .count property.
    The required information is loaded only once and cached, unless additional filtering is applied.

    The caching relies on the signal_change decorator. If you develop a new filtering method for this class,
    you need to decorate it with @signal_change.
    In addition, if you develop a new method that needs getInfo to actually load data from
    Google Earth Engine, call _get_timeout_info(your object before getInfo) instead. This runs
    getInfo with a timeout (currently set to 10 seconds).
    Using a timeout is important to avoid unexpectedly long run times.

    Usage:
    >>> dl = DataLoader(satellite_name="COPERNICUS/S2_SR",  \
        start_date='2021-01-01',                            \
        end_date='2021-01-15',                              \
        bands=["TCI_R", "TCI_G", "TCI_B"],                  \
        geographic_bounds=ee.Geometry.Point(*[5.238728194366604, 44.474864056855935]).buffer(500) \
               )

    Get a pandas dataframe with all pixel values as a timeseries:
    >>> dl.getRegion(dl.bounds, 500)
    >>> dl.region.head(2)
    [Out]
            id	longitude	latitude	time	B1	B2	B3	B4	B5	B6	...	WVP	SCL	TCI_R	TCI_G	TCI_B	MSK_CLDPRB	MSK_SNWPRB	QA10	QA20	QA60
    0	20210102T104441_20210102T104435_T31TFK	5.234932	44.473344	2021-01-02 10:48:36.299	6297	5955	5768	5773	5965	5883	...	393	8	255	255	255	0	95	0	0	1024
    1	20210104T103329_20210104T103331_T31TFK	5.234932	44.473344	2021-01-04 10:38:38.304	5547	5355	5184	5090	5254	5229	...	314	9	255	255	255	29	9	0	0	1024

    >>> dl.date_range
    [Out]
    {'max': datetime.datetime(2021, 1, 14, 11, 38, 39, 208000),
     'min': datetime.datetime(2021, 1, 2, 11, 48, 36, 299000)}

    >>> dl.count
    [Out]
    6

    >>> dl.collection_info  # contains an HTML description of the dataset in "description"

    >>> dl.image_ids
    [Out]
    ['COPERNICUS/S2_SR/20210102T104441_20210102T104435_T31TFK',
     'COPERNICUS/S2_SR/20210104T103329_20210104T103331_T31TFK',
     'COPERNICUS/S2_SR/20210107T104329_20210107T104328_T31TFK',
     'COPERNICUS/S2_SR/20210109T103421_20210109T103431_T31TFK',
     'COPERNICUS/S2_SR/20210112T104411_20210112T104438_T31TFK',
     'COPERNICUS/S2_SR/20210114T103309_20210114T103305_T31TFK']

    # Download the image
    >>> img = dl.download_image(dl.image_ids[3])

    # Download all images as a list
    >>> imgs = dl.download_all_images(scale=1)

    """
    def __init__(self,
                 satellite_name: str,
                 bands: Union[List, str] = None,
                 start_date: str = None,
                 end_date: str = None,
                 geographic_bounds: ee.Geometry = None,
                 scale: int = 10,
                 crs: str = "EPSG:32630"
                 ):
        """

        Args:
            satellite_name: satellite to use. Examples: COPERNICUS/S2_SR, COPERNICUS/CORINE/V20/100m.
                See https://developers.google.com/earth-engine/datasets for the full list.
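            bands: band name or list of band names to load.
            start_date: earliest possible date. May be earlier than the actual date of the first image.
            end_date: latest possible date.
            geographic_bounds: region of interest.
            scale: default scale used when reprojecting images (see reproject).
            crs: default coordinate reference system used when reprojecting images (see reproject).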
        """
        self.satellite_name = satellite_name
        if isinstance(bands, str):
            bands = [bands]
        self.bands = bands if bands is not None else list()
        # Dates are optional, but a date filter needs both ends.
        if (start_date is None) != (end_date is None):
            raise ValueError("start_date and end_date must be provided together")
        self.start_date = start_date
        self.end_date = end_date
        self.bounds = geographic_bounds

        # Lazy computed
        self._available_images = None

        # Start getting info from google cloud
        if satellite_name:
            self.image_collection = ee.ImageCollection(self.satellite_name)
        if self.bounds is not None:
            self.filterBounds(self.bounds)
        if self.start_date is not None:
            self.filterDate(self.start_date, self.end_date)
        self.scale = scale
        self.crs = crs
        self.image_list = None
        self._df_image_list = None
        self.image_collection_info = None
        self._date_range = None
        self.date_filter_change = False
        self._count = None

        # Bool for caching
        self.filter_change = True
        self._describe = None

    def signal_change(func):
        """Signals that additional filtering was performed. To be used
        as a decorator."""
        @functools.wraps(func)
        def wrap(self, *args, **kwargs):
            self.filter_change = True
            self.date_filter_change = True
            return func(self, *args, **kwargs)
        return wrap

    @staticmethod
    @func_set_timeout(10)
    def _get_timeout_info(instance: Any):
        """Runs getInfo on anything that is passed, with a timeout."""
        return instance.getInfo()

    @staticmethod
    def _authenticate_gee():
        """Authenticates earth engine if needed, and initializes."""
        try:
            ee.Initialize()
        except Exception:
            # Trigger the authentication flow.
            ee.Authenticate()
            # Initialize the library.
            ee.Initialize()

    @signal_change
    def filter(self, ee_filter: ee.Filter):
        """Applies a filter to the image_collection attribute. This can be useful, for example,
        to filter out clouds.

        Args:
            ee_filter: Filter to apply, must be an instance of ee.Filter.

        Returns: self, for operation chaining as possible with the earth engine API.

        """
        self.image_collection = self.image_collection.filter(ee_filter)

        return self

    @property
    def count(self):
        """Number of images in the ImageCollection"""
        if self.filter_change or self._count is None:
            self._count = self._get_timeout_info(self.image_collection.size())
            self.filter_change = False
        return self._count

    @property
    def available_images(self):
        """Gets the ImageCollection info"""
        if self.filter_change or self._available_images is None:
            self._available_images = self._get_timeout_info(self.image_collection)
        return self._available_images

    @signal_change
    def filterDate(self, *args, **kwargs):
        """Wrapper for the filterDate method in earth engine on the ImageCollection"""
        self.image_collection = self.image_collection.filterDate(*args, **kwargs)
        return self

    @signal_change
    def getRegion(self, *args, **kwargs):
        """Wrapper for the getRegion method in earth engine on the ImageCollection.
        Caveat! getRegion does not return an image collection, so the image_list attribute gets
        updated instead of the image_collection attribute. However, the instance of the DataLoader class
        is still returned, so this could be chained with another method on ImageCollection, which wouldn't be
        possible using earth engine.
        """
        self.image_list = self.image_collection.getRegion(*args, **kwargs)
        return self

    @signal_change
    def filterBounds(self, geometry, *args, **kwargs):
        """Wrapper for the filterBounds method in earth engine on the ImageCollection"""
        self.image_collection = self.image_collection.filterBounds(geometry, *args, **kwargs)
        self.bounds = geometry
        return self

    @signal_change
    def select(self, *bands, **kwargs):
        """Wrapper for the select method in earth engine on the ImageCollection"""
        self.image_collection = self.image_collection.select(*bands, **kwargs)
        self.bands = list(set(self.bands) | set(bands))  # Unique bands
        return self

    @property
    def date_range(self):
        """Gets the actual date range of the images in the image collection."""
        if self.date_filter_change or self._date_range is None:
            # Go through _get_timeout_info so this call is bounded by the timeout as well.
            date_range = self._get_timeout_info(
                self.image_collection.reduceColumns(ee.Reducer.minMax(), ["system:time_start"])
            )
            self._date_range = {key: datetime.fromtimestamp(value/1e3) for key, value in date_range.items()}
            self.date_filter_change = False
        return self._date_range

    @property
    def region(self):
        """Gets a time series as a pandas DataFrame of the band values for the specified region."""
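        # Also compute when nothing is cached yet: accessing count resets filter_change.
        if self.filter_change or self._df_image_list is None:
            if self.image_list is None:
                # getRegion requires a geometry; default to the loader's bounds and scale.
                self.getRegion(self.bounds, self.scale)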
            res_list = self._get_timeout_info(self.image_list)
            df = pd.DataFrame(res_list[1:], columns=res_list[0])
            df.loc[:, "time"] = pd.to_datetime(df.loc[:, "time"], unit="ms")
            self._df_image_list = df
            self.filter_change = False
        return self._df_image_list

    @property
    def collection_info(self):
        """Runs getInfo on the image collection the first time it is accessed; afterwards the
        cached result is returned until additional filtering is applied."""
        # Read the flag first: accessing count below resets filter_change.
        changed = self.filter_change
        if self.count > 5000:
            raise Exception("Too many images to load. Try filtering more")
        if changed or self.image_collection_info is None:
            self.image_collection_info = self._get_timeout_info(self.image_collection)
        return self.image_collection_info

    @property
    def image_ids(self):
        """list of names of available images in the image collection"""
        return [i["id"] for i in self.collection_info["features"]]

    def __repr__(self):
        try:
            return f"""
Size: {self.count}

Dataset date ranges:
From: {self.date_range["min"]}
To: {self.date_range["max"]}

Selected bands:
{self.bands}
            
            """
        except Exception as e:
            # Chain the original error so the root cause stays visible in the traceback.
            raise Exception("Impossible to represent the dataset. Try filtering more. Error handling to do.") from e

    def reproject(self, image, **kwargs):
        """Reprojects an image. crs and scale are taken from kwargs when given,
        otherwise from the instance attributes of the same names."""
        def resolve(name: str):
            # Prefer the keyword argument, then fall back to the instance attribute.
            if name in kwargs:
                return kwargs[name]
            return getattr(self, name, None) or None
        crs = resolve("crs")
        scale = resolve("scale")
        if crs is not None or scale is not None:
            image = image.reproject(crs, None, scale)
        return image

    def download_image(self, image_id: str, **kwargs):
        """Downloads an image based on its id / name. The additional arguments are passed
        to getThumbUrl, and could be scale, max, min...
        """
        img = ee.Image(image_id).select(*self.bands)
        img = self.reproject(img, **kwargs)
        input_args = {'region': self.bounds}
        input_args.update(**kwargs)
        all_bands = self.collection_info["features"][0]["bands"]
        selected_bands = [band for band in all_bands if band["id"] in self.bands]
        if "min" not in input_args:
            input_args.update({"min": selected_bands[0]["data_type"]["min"]})
        if "max" not in input_args:
            input_args.update({"max": selected_bands[0]["data_type"]["max"]})
        url = img.getThumbUrl(input_args)
        r = requests.get(url, stream=True)
        # Fail loudly on HTTP errors instead of silently returning the ee.Image.
        r.raise_for_status()
        buffer = io.BytesIO()
        for chunk in r.iter_content(chunk_size=1024):
            buffer.write(chunk)
        buffer.seek(0)
        return Image.open(buffer)

    @staticmethod
    def _regex(regex: str, im_id_list: List[str], include: bool) -> list:
        """
        Filters the im_id_list based on a regular expression. This is useful before downloading
        a collection of images. For example, using (.*)TXT with include=True will only keep image ids
        that contain TXT (re.match anchors at the start of the id only), which for Nantes means
        filtering out empty or half-empty images.
        Args:
            regex: Python regex as a string
            im_id_list: list, image id list
            include: whether to include or exclude elements that match the regex.

        Returns: filtered list.

        """
        # Call re directly instead of eval: no code injection, and quotes in the regex are safe.
        pattern = re.compile(regex)
        # Keep an id when "it matches" agrees with the include flag.
        return [im_id for im_id in im_id_list if (pattern.match(im_id) is not None) == include]

    def download_all_images(self, regex_exclude: str = None, regex_include: str = None, **kwargs):
        """
        Runs download_image in a for loop over the available images.
        Makes it possible to filter images to download based on a regex.
        Args:
            regex_exclude: any image that matches this regex will be excluded.
            regex_include: any image that matches this regex will be included
            **kwargs: arguments to be passed to getThumbUrl

        Returns: list of PIL images
        """
        images = list()
        image_ids = self.image_ids
        if regex_exclude is not None:
            image_ids = self._regex(regex_exclude, image_ids, include=False)
        if regex_include is not None:
            image_ids = self._regex(regex_include, image_ids, include=True)
        for image_id in tqdm(image_ids):
            images.append(self.download_image(image_id, **kwargs))
        return images