
prefect_planetary_computer.credentials

Module handling Microsoft Planetary Computer credentials

Classes

PlanetaryComputerCredentials

Bases: Block

Block used to manage Microsoft Planetary Computer credentials.

The block stores a subscription key to access the full PC data catalog and a JupyterHub API token to instantiate clusters through Dask Gateway.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `subscription_key` | `Optional[SecretStr]` | A subscription key to access the full PC data catalog. | `None` |
| `hub_api_token` | `Optional[SecretStr]` | The JupyterHub API token to instantiate clusters through Dask Gateway. | `None` |

Example

Load stored Planetary Computer credentials:

from prefect_planetary_computer import PlanetaryComputerCredentials
pc_credentials_block = PlanetaryComputerCredentials.load("BLOCK_NAME")

Source code in prefect_planetary_computer/credentials.py
class PlanetaryComputerCredentials(Block):
    """
    Block used to manage Microsoft Planetary Computer credentials.

    The block stores a [subscription key](https://planetarycomputer.microsoft.com/docs/concepts/sas#when-an-account-is-needed)
    to access the full PC data catalog and a
    [JupyterHub API token](https://planetarycomputer.microsoft.com/docs/concepts/computing/#request-a-token-from-jupyterhub)
    to instantiate clusters through Dask Gateway.

    Args:
        subscription_key (str, optional): A subscription key to access the full PC data catalog.
        hub_api_token (str, optional): The JupyterHub API token to instantiate clusters through Dask Gateway.

    Example:
        Load stored Planetary Computer credentials:
        ```python
        from prefect_planetary_computer import PlanetaryComputerCredentials
        pc_credentials_block = PlanetaryComputerCredentials.load("BLOCK_NAME")
        ```
    """  # noqa E501

    _block_type_name = "Planetary Computer Credentials"
    _logo_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/4/44/Microsoft_logo.svg/512px-Microsoft_logo.svg.png"  # noqa
    _documentation_url = "https://giorgiobasile.github.io/prefect-planetary-computer/blocks/#prefect_planetary_computer.blocks.PlanetarycomputerBlock"  # noqa

    subscription_key: Optional[SecretStr] = Field(
        default=None,
        description="A subscription key to access the full PC data catalog.",
        title="PC Subscription Key",
    )
    hub_api_token: Optional[SecretStr] = Field(
        default=None,
        description="The JupyterHub API token to instantiate clusters through Dask Gateway.",  # noqa E501
        title="JupyterHub API Token",
    )

    def __eq__(self, other: Any) -> bool:
        """
        Equality comparison between two `PlanetaryComputerCredentials` instances.
        """
        if not isinstance(other, PlanetaryComputerCredentials):
            return False
        # Compare the fields directly so that unset (None) values are handled too;
        # calling get_secret_value() on a missing token would raise AttributeError.
        return (
            self.subscription_key == other.subscription_key
            and self.hub_api_token == other.hub_api_token
        )

    def get_stac_catalog(
        self, sign_inplace: bool = True, **pystac_kwargs: Dict
    ) -> pystac_client.Client:
        """
        Provides a [PySTAC client](https://pystac-client.readthedocs.io/en/stable/api.html#client) for the PC data catalog,
        optionally signing items automatically as they are retrieved.

        For more information about PC signing, refer to the [docs](https://planetarycomputer.microsoft.com/docs/concepts/sas).

        Args:
            sign_inplace: Whether to automatically sign items through the
                [planetary_computer.sign_inplace](https://github.com/microsoft/planetary-computer-sdk-for-python#automatic-signing) modifier.
            pystac_kwargs: Additional keyword arguments to pass to the
                [`pystac_client.Client.open`](https://pystac-client.readthedocs.io/en/stable/api.html#pystac_client.Client.open) method.

        Returns:
            A PySTAC client for the PC Catalog.

        Example:
            Get a configured PySTAC client with automatic asset signing:
            ```python
            from prefect import flow
            from prefect_planetary_computer import PlanetaryComputerCredentials

            @flow
            def example_get_stac_catalog_flow():
                pc_credentials_block = PlanetaryComputerCredentials.load("BLOCK_NAME")

                catalog = pc_credentials_block.get_stac_catalog()

                # Search STAC catalog for Landsat Collection 2 Level 2 items
                time_range = "2020-12-01/2020-12-31"
                bbox = [-122.2751, 47.5469, -121.9613, 47.7458]

                search = catalog.search(collections=["landsat-c2-l2"], bbox=bbox, datetime=time_range)
                items = search.get_all_items()
                return len(items)

            example_get_stac_catalog_flow()
            ```
        """  # noqa E501

        if self.subscription_key:
            planetary_computer.set_subscription_key(
                self.subscription_key.get_secret_value()
            )

        modifier = planetary_computer.sign_inplace if sign_inplace else None

        return pystac_client.Client.open(
            CATALOG_URL, modifier=modifier, **pystac_kwargs
        )

    def get_gateway(self, **gateway_kwargs: Dict) -> dask_gateway.Gateway:
        """
        Provides a client for the PC Dask Gateway Server,
        setting the proper addresses and Jupyter authentication.

        For examples on how to use the Dask Gateway client, refer to the [PC - Scale with Dask](https://planetarycomputer.microsoft.com/docs/quickstarts/scale-with-dask/) tutorial.

        Args:
            gateway_kwargs: Additional keyword arguments to pass
                to the Dask Gateway client.

        Returns:
            A Dask Gateway client to instantiate clusters.

        Example:
            Get a configured Dask Gateway client:
            ```python
            from prefect import flow
            from prefect_planetary_computer import PlanetaryComputerCredentials

            @flow
            def example_get_gateway_flow():
                pc_credentials_block = PlanetaryComputerCredentials(
                    subscription_key = "sub-key",
                    hub_api_token = "hub-token"
                )
                gateway = pc_credentials_block.get_gateway()

                # List available clusters
                clusters = gateway.list_clusters()
                return len(clusters)

            example_get_gateway_flow()
            ```
        """  # noqa E501
        if self.hub_api_token is None:
            raise ValueError("JupyterHub API Token hasn't been provided.")

        return dask_gateway.Gateway(
            address=GATEWAY_ADDRESS,
            proxy_address=GATEWAY_PROXY_ADDRESS,
            auth=JupyterHubAuth(api_token=self.hub_api_token.get_secret_value()),
            **gateway_kwargs,
        )

    def new_gateway_cluster(
        self,
        worker_cores: Optional[float] = None,
        worker_memory: Optional[float] = None,
        image: Optional[str] = None,
        gpu: Optional[bool] = False,
        environment: Optional[dict] = None,
        **gateway_cluster_kwargs: Dict,
    ) -> dask_gateway.GatewayCluster:
        """
        Instantiate a new cluster from the PC Dask Gateway Server.

        Each argument corresponds to one of the available PC Dask Gateway cluster options.
        PC sets some defaults, but they can be overridden by passing the corresponding argument to this function -
        [see Helm chart](https://github.com/microsoft/planetary-computer-hub/blob/main/helm/chart/config.yaml).

        Args:
            worker_cores: Number of cores per worker, in the 0.1-8 range. Defaults to 1.
            worker_memory: Amount of memory per worker (in GiB) in the 1-64 range. Defaults to 8.
            image: The Docker image to be used for the workers.
                Defaults to [`pangeo/pangeo-notebook:latest`](https://hub.docker.com/layers/pangeo/pangeo-notebook/latest/images/sha256-94e97e24adf14e72c01f18c782b8c4e0efb1e05950a5f2d2e86e67adcbf547f8).
                To use the PC official images, refer to the [`planetary-computer-containers`](https://github.com/Microsoft/planetary-computer-containers) repo.
            gpu: Whether to use GPU workers. Defaults to False.
            environment: Environment variables to set on the workers. Defaults to the GDAL and PYGEOS-related variables set in the PC Hub.
            gateway_cluster_kwargs: Additional keyword arguments to pass to [`dask_gateway.GatewayCluster`](https://gateway.dask.org/api-client.html#dask_gateway.GatewayCluster) constructor.

        Returns:
            A client for the Dask cluster just created.

        Example:
            Instantiate a new cluster using PC Dask Gateway Server:
            ```python
            import dask.array as da
            from prefect import flow
            from prefect_planetary_computer import PlanetaryComputerCredentials

            @flow
            def example_new_cluster_flow():
                pc_credentials_block = PlanetaryComputerCredentials.load("BLOCK_NAME")

                # Create a Dask Gateway cluster with the default configuration
                cluster = pc_credentials_block.new_gateway_cluster()

                # Connect a client so that subsequent Dask computations run on the cluster
                client = cluster.get_client()

                # Scale adaptively between 2 and 10 workers
                cluster.adapt(minimum=2, maximum=10)

                # Create a Dask array with 1 billion elements and sum them
                x = da.random.random(1000000000)
                result = x.sum().compute()

                return result

            example_new_cluster_flow()
            ```
        """  # noqa E501

        gateway_cluster_kwargs.update(
            self._get_cluster_options_dict(
                worker_cores=worker_cores,
                worker_memory=worker_memory,
                image=image,
                gpu=gpu,
                environment=environment,
            )
        )

        return dask_gateway.GatewayCluster(
            address=GATEWAY_ADDRESS,
            proxy_address=GATEWAY_PROXY_ADDRESS,
            auth=JupyterHubAuth(api_token=self.hub_api_token.get_secret_value()),
            **gateway_cluster_kwargs,
        )

    def get_dask_task_runner(
        self,
        worker_cores: Optional[float] = None,
        worker_memory: Optional[float] = None,
        image: Optional[str] = None,
        gpu: Optional[bool] = False,
        environment: Optional[dict] = None,
        cluster_kwargs: Dict = None,
        adapt_kwargs: Dict = None,
        client_kwargs: Dict = None,
    ) -> DaskTaskRunner:

        """
        Provides a [`prefect_dask.DaskTaskRunner`](https://prefecthq.github.io/prefect-dask/task_runners/#prefect_dask.task_runners.DaskTaskRunner)
        with PC-specific configurations.

        This will use the PC Dask Gateway Server to create a new cluster the same way as
        [`PlanetaryComputerCredentials.new_gateway_cluster`](#new_gateway_cluster) does,
        but it will automatically happen at flow submission time.

        Each argument corresponds to one of the available PC Dask Gateway cluster options.
        PC sets some defaults, but they can be overridden by passing the corresponding argument to this function -
        [see Helm chart](https://github.com/microsoft/planetary-computer-hub/blob/main/helm/chart/config.yaml).

        Args:
            worker_cores: Number of cores per worker, in the 0.1-8 range. Defaults to 1.
            worker_memory: Amount of memory per worker (in GiB) in the 1-64 range. Defaults to 8.
            image: The Docker image to be used for the workers.
                Defaults to [`pangeo/pangeo-notebook:latest`](https://hub.docker.com/layers/pangeo/pangeo-notebook/latest/images/sha256-94e97e24adf14e72c01f18c782b8c4e0efb1e05950a5f2d2e86e67adcbf547f8).
                To use the PC official images, refer to the [`planetary-computer-containers`](https://github.com/Microsoft/planetary-computer-containers) repo.
            gpu: Whether to use GPU workers. Defaults to False.
            environment: Environment variables to set on the workers. Defaults to the GDAL and PYGEOS-related variables set in the PC Hub.
            cluster_kwargs: Additional kwargs to pass to
                [`dask_gateway.GatewayCluster`](https://gateway.dask.org/api-client.html#gatewaycluster)
                when creating a temporary dask cluster.
            adapt_kwargs: Additional kwargs to pass to
                [`dask_gateway.Gateway.adapt_cluster`](https://gateway.dask.org/api-client.html#dask_gateway.Gateway.adapt_cluster)
                when creating a temporary cluster.
                Note that adaptive scaling is only enabled if `adapt_kwargs` are provided.
            client_kwargs: Additional kwargs to use when creating a
                [`dask.distributed.Client`](https://distributed.dask.org/en/latest/api.html#client).

        Examples:
            Using a temporary PC Dask Gateway cluster:
            ```python
            from prefect import flow
            from prefect_planetary_computer import PlanetaryComputerCredentials

            pc_credentials = PlanetaryComputerCredentials.load("BLOCK_NAME")

            pc_runner = pc_credentials.get_dask_task_runner()

            @flow(task_runner=pc_runner)
            def my_flow():
                ...
            ```

            Providing additional kwargs to the PC Dask Gateway cluster:
            ```python
            pc_runner = pc_credentials.get_dask_task_runner(
                cluster_kwargs={
                    "image": "pangeo/pangeo-notebook:latest",
                },
                adapt_kwargs={'minimum': 1, 'maximum': 10, 'active': True}
            )
            ```

            Connecting to an existing PC `GatewayCluster` (use `DaskTaskRunner` directly for this):
            ```python
            DaskTaskRunner(
                address=cluster.address,
                client_kwargs={'security': cluster.security}
            )
            ```

        """  # noqa: E501

        if cluster_kwargs is None:
            cluster_kwargs = {}

        cluster_kwargs.update(
            dict(
                address=GATEWAY_ADDRESS,
                proxy_address=GATEWAY_PROXY_ADDRESS,
                auth=JupyterHubAuth(api_token=self.hub_api_token.get_secret_value()),
            )
        )

        cluster_kwargs.update(
            self._get_cluster_options_dict(
                worker_cores=worker_cores,
                worker_memory=worker_memory,
                image=image,
                gpu=gpu,
                environment=environment,
            )
        )

        return DaskTaskRunner(
            cluster_class="dask_gateway.GatewayCluster",
            cluster_kwargs=cluster_kwargs,
            adapt_kwargs=adapt_kwargs,
            client_kwargs=client_kwargs,
        )

    def _get_cluster_options_dict(
        self,
        worker_cores: Optional[float] = None,
        worker_memory: Optional[float] = None,
        image: Optional[str] = None,
        gpu: Optional[bool] = False,
        environment: Optional[dict] = None,
    ) -> Dict[str, Any]:
        """
        Return a dictionary of cluster options accepted by
        the PC `dask_gateway.GatewayCluster` constructor.
        """

        cluster_options = {}

        if worker_cores is not None:
            cluster_options["worker_cores"] = worker_cores
        if worker_memory is not None:
            cluster_options["worker_memory"] = worker_memory
        if image is not None:
            cluster_options["image"] = image
        if gpu is not None:
            cluster_options["gpu"] = gpu
        if environment is not None:
            cluster_options["environment"] = environment

        return cluster_options

Functions

__eq__

Equality comparison between two PlanetaryComputerCredentials instances.

Source code in prefect_planetary_computer/credentials.py
def __eq__(self, other: Any) -> bool:
    """
    Equality comparison between two `PlanetaryComputerCredentials` instances.
    """
    if not isinstance(other, PlanetaryComputerCredentials):
        return False
    # Compare the fields directly so that unset (None) values are handled too;
    # calling get_secret_value() on a missing token would raise AttributeError.
    return (
        self.subscription_key == other.subscription_key
        and self.hub_api_token == other.hub_api_token
    )
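
Because both credential fields are optional, an equality check has to cope with `None` on either side. The following is a minimal, dependency-free sketch of a None-safe comparison. `Secret` and `Credentials` are hypothetical stand-ins for pydantic's `SecretStr` and for the block itself, and `_reveal` is an illustrative helper, not part of the library.

```python
from dataclasses import dataclass
from typing import Any, Optional


class Secret:
    """Hypothetical stand-in for a secret wrapper such as pydantic's SecretStr."""

    def __init__(self, value: str) -> None:
        self._value = value

    def get_secret_value(self) -> str:
        return self._value


def _reveal(secret: Optional[Secret]) -> Optional[str]:
    # Unwrap the secret only when one is set; None stays None.
    return None if secret is None else secret.get_secret_value()


@dataclass(eq=False)
class Credentials:
    """Illustrative stand-in for the block, not the real class."""

    subscription_key: Optional[Secret] = None
    hub_api_token: Optional[Secret] = None

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, Credentials):
            return False
        # Compare the unwrapped values so that two unset fields are equal
        # and a set field never equals an unset one.
        return _reveal(self.subscription_key) == _reveal(
            other.subscription_key
        ) and _reveal(self.hub_api_token) == _reveal(other.hub_api_token)
```

With this shape, two instances without any credentials compare equal instead of raising an error.
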
get_dask_task_runner

Provides a prefect_dask.DaskTaskRunner with PC-specific configurations.

This will use the PC Dask Gateway Server to create a new cluster the same way as PlanetaryComputerCredentials.new_gateway_cluster does, but it will automatically happen at flow submission time.

Each argument corresponds to one of the available PC Dask Gateway cluster options. PC sets some defaults, but they can be overridden by passing the corresponding argument to this function (see the Helm chart).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `worker_cores` | `Optional[float]` | Number of cores per worker, in the 0.1-8 range. Defaults to 1. | `None` |
| `worker_memory` | `Optional[float]` | Amount of memory per worker (in GiB) in the 1-64 range. Defaults to 8. | `None` |
| `image` | `Optional[str]` | The Docker image to be used for the workers. Defaults to `pangeo/pangeo-notebook:latest`. To use the PC official images, refer to the `planetary-computer-containers` repo. | `None` |
| `gpu` | `Optional[bool]` | Whether to use GPU workers. Defaults to False. | `False` |
| `environment` | `Optional[dict]` | Environment variables to set on the workers. Defaults to the GDAL and PYGEOS-related variables set in the PC Hub. | `None` |
| `cluster_kwargs` | `Dict` | Additional kwargs to pass to `dask_gateway.GatewayCluster` when creating a temporary dask cluster. | `None` |
| `adapt_kwargs` | `Dict` | Additional kwargs to pass to `dask_gateway.Gateway.adapt_cluster` when creating a temporary cluster. Note that adaptive scaling is only enabled if `adapt_kwargs` are provided. | `None` |
| `client_kwargs` | `Dict` | Additional kwargs to use when creating a `dask.distributed.Client`. | `None` |
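
The order in which the keyword arguments are merged decides which value wins when the same option is given twice. The sketch below mirrors the merge order of the source shown on this page: user `cluster_kwargs` first, then the connection settings, then any explicitly set option. It is an illustration under assumptions: `build_cluster_kwargs` is a hypothetical helper, the two addresses are placeholders rather than the real gateway constants, the `auth` entry is left out, and the input dictionary is copied instead of updated in place.

```python
from typing import Any, Dict, Optional

# Placeholder values; the real module defines its own gateway constants.
GATEWAY_ADDRESS = "https://gateway.example.invalid/compute"
GATEWAY_PROXY_ADDRESS = "gateway://gateway.example.invalid:80"


def build_cluster_kwargs(
    cluster_kwargs: Optional[Dict[str, Any]] = None,
    worker_cores: Optional[float] = None,
    worker_memory: Optional[float] = None,
    image: Optional[str] = None,
    gpu: Optional[bool] = False,
    environment: Optional[dict] = None,
) -> Dict[str, Any]:
    """Hypothetical helper showing the merge order of the cluster options."""
    # Start from a copy of the user-supplied kwargs.
    merged: Dict[str, Any] = dict(cluster_kwargs or {})

    # Connection settings are applied next and overwrite user values.
    merged.update(address=GATEWAY_ADDRESS, proxy_address=GATEWAY_PROXY_ADDRESS)

    # Explicit options are applied last; options left as None are skipped,
    # so the server-side defaults (or the user kwargs) stay in effect.
    options = {
        "worker_cores": worker_cores,
        "worker_memory": worker_memory,
        "image": image,
        "gpu": gpu,
        "environment": environment,
    }
    merged.update({key: value for key, value in options.items() if value is not None})
    return merged
```

One consequence of this order: because `gpu` defaults to `False` rather than `None`, a `"gpu"` entry placed in `cluster_kwargs` is always overwritten, whereas an `"image"` entry survives as long as the `image` argument is not passed.
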

Examples:

Using a temporary PC Dask Gateway cluster:

from prefect import flow
from prefect_planetary_computer import PlanetaryComputerCredentials

pc_credentials = PlanetaryComputerCredentials.load("BLOCK_NAME")

pc_runner = pc_credentials.get_dask_task_runner()

@flow(task_runner=pc_runner)
def my_flow():
    ...

Providing additional kwargs to the PC Dask Gateway cluster:

pc_runner = pc_credentials.get_dask_task_runner(
    cluster_kwargs={
        "image": "pangeo/pangeo-notebook:latest",
    },
    adapt_kwargs={'minimum': 1, 'maximum': 10, 'active': True}
)

Connecting to an existing PC GatewayCluster (use DaskTaskRunner directly for this):

DaskTaskRunner(
    address=cluster.address,
    client_kwargs={'security': cluster.security}
)

Source code in prefect_planetary_computer/credentials.py
def get_dask_task_runner(
    self,
    worker_cores: Optional[float] = None,
    worker_memory: Optional[float] = None,
    image: Optional[str] = None,
    gpu: Optional[bool] = False,
    environment: Optional[dict] = None,
    cluster_kwargs: Dict = None,
    adapt_kwargs: Dict = None,
    client_kwargs: Dict = None,
) -> DaskTaskRunner:

    """
    Provides a [`prefect_dask.DaskTaskRunner`](https://prefecthq.github.io/prefect-dask/task_runners/#prefect_dask.task_runners.DaskTaskRunner)
    with PC-specific configurations.

    This will use the PC Dask Gateway Server to create a new cluster the same way as
    [`PlanetaryComputerCredentials.new_gateway_cluster`](#new_gateway_cluster) does,
    but it will automatically happen at flow submission time.

    Each argument corresponds to one of the available PC Dask Gateway cluster options.
    PC sets some defaults, but they can be overridden by passing the corresponding argument to this function -
    [see Helm chart](https://github.com/microsoft/planetary-computer-hub/blob/main/helm/chart/config.yaml).

    Args:
        worker_cores: Number of cores per worker, in the 0.1-8 range. Defaults to 1.
        worker_memory: Amount of memory per worker (in GiB) in the 1-64 range. Defaults to 8.
        image: The Docker image to be used for the workers.
            Defaults to [`pangeo/pangeo-notebook:latest`](https://hub.docker.com/layers/pangeo/pangeo-notebook/latest/images/sha256-94e97e24adf14e72c01f18c782b8c4e0efb1e05950a5f2d2e86e67adcbf547f8).
            To use the PC official images, refer to the [`planetary-computer-containers`](https://github.com/Microsoft/planetary-computer-containers) repo.
        gpu: Whether to use GPU workers. Defaults to False.
        environment: Environment variables to set on the workers. Defaults to the GDAL and PYGEOS-related variables set in the PC Hub.
        cluster_kwargs: Additional kwargs to pass to
            [`dask_gateway.GatewayCluster`](https://gateway.dask.org/api-client.html#gatewaycluster)
            when creating a temporary dask cluster.
        adapt_kwargs: Additional kwargs to pass to
            [`dask_gateway.Gateway.adapt_cluster`](https://gateway.dask.org/api-client.html#dask_gateway.Gateway.adapt_cluster)
            when creating a temporary cluster.
            Note that adaptive scaling is only enabled if `adapt_kwargs` are provided.
        client_kwargs: Additional kwargs to use when creating a
            [`dask.distributed.Client`](https://distributed.dask.org/en/latest/api.html#client).

    Examples:
        Using a temporary PC Dask Gateway cluster:
        ```python
        from prefect import flow
        from prefect_planetary_computer import PlanetaryComputerCredentials

        pc_credentials = PlanetaryComputerCredentials.load("BLOCK_NAME")

        pc_runner = pc_credentials.get_dask_task_runner()

        @flow(task_runner=pc_runner)
        def my_flow():
            ...
        ```

        Providing additional kwargs to the PC Dask Gateway cluster:
        ```python
        pc_runner = pc_credentials.get_dask_task_runner(
            cluster_kwargs={
                "image": "pangeo/pangeo-notebook:latest",
            },
            adapt_kwargs={'minimum': 1, 'maximum': 10, 'active': True}
        )
        ```

        Connecting to an existing PC `GatewayCluster` (use `DaskTaskRunner` directly for this):
        ```python
        DaskTaskRunner(
            address=cluster.address,
            client_kwargs={'security': cluster.security}
        )
        ```

    """  # noqa: E501

    if cluster_kwargs is None:
        cluster_kwargs = {}

    cluster_kwargs.update(
        dict(
            address=GATEWAY_ADDRESS,
            proxy_address=GATEWAY_PROXY_ADDRESS,
            auth=JupyterHubAuth(api_token=self.hub_api_token.get_secret_value()),
        )
    )

    cluster_kwargs.update(
        self._get_cluster_options_dict(
            worker_cores=worker_cores,
            worker_memory=worker_memory,
            image=image,
            gpu=gpu,
            environment=environment,
        )
    )

    return DaskTaskRunner(
        cluster_class="dask_gateway.GatewayCluster",
        cluster_kwargs=cluster_kwargs,
        adapt_kwargs=adapt_kwargs,
        client_kwargs=client_kwargs,
    )
get_gateway

Provides a client for the PC Dask Gateway Server, setting the proper addresses and Jupyter authentication.

For examples on how to use the Dask Gateway client, refer to the PC - Scale with Dask tutorial.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `gateway_kwargs` | `Dict` | Additional keyword arguments to pass to the Dask Gateway client. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Gateway` | A Dask Gateway client to instantiate clusters. |

Example

Get a configured Dask Gateway client:

from prefect import flow
from prefect_planetary_computer import PlanetaryComputerCredentials

@flow
def example_get_gateway_flow():
    pc_credentials_block = PlanetaryComputerCredentials(
        subscription_key = "sub-key",
        hub_api_token = "hub-token"
    )
    gateway = pc_credentials_block.get_gateway()

    # List available clusters
    clusters = gateway.list_clusters()
    return len(clusters)

example_get_gateway_flow()
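
`get_gateway` raises a `ValueError` when no JupyterHub API token is stored in the block. A small sketch of that guard as a reusable helper follows. `require_token` is a hypothetical name, and a plain optional string stands in for the secret field.

```python
from typing import Optional


def require_token(hub_api_token: Optional[str]) -> str:
    """Return the token, or fail early with a readable error (illustrative helper)."""
    if hub_api_token is None:
        raise ValueError("JupyterHub API Token hasn't been provided.")
    return hub_api_token
```

The same check could precede `new_gateway_cluster` and `get_dask_task_runner`, which read the token without testing for `None` in the source shown on this page.
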

Source code in prefect_planetary_computer/credentials.py
def get_gateway(self, **gateway_kwargs: Dict) -> dask_gateway.Gateway:
    """
    Provides a client for the PC Dask Gateway Server,
    setting the proper addresses and Jupyter authentication.

    For examples on how to use the Dask Gateway client, refer to the [PC - Scale with Dask](https://planetarycomputer.microsoft.com/docs/quickstarts/scale-with-dask/) tutorial.

    Args:
        gateway_kwargs: Additional keyword arguments to pass
            to the Dask Gateway client.

    Returns:
        A Dask Gateway client to instantiate clusters.

    Example:
        Get a configured Dask Gateway client:
        ```python
        from prefect import flow
        from prefect_planetary_computer import PlanetaryComputerCredentials

        @flow
        def example_get_gateway_flow():
            pc_credentials_block = PlanetaryComputerCredentials(
                subscription_key = "sub-key",
                hub_api_token = "hub-token"
            )
            gateway = pc_credentials_block.get_gateway()

            # List available clusters
            clusters = gateway.list_clusters()
            return len(clusters)

        example_get_gateway_flow()
        ```
    """  # noqa E501
    if self.hub_api_token is None:
        raise ValueError("JupyterHub API Token hasn't been provided.")

    return dask_gateway.Gateway(
        address=GATEWAY_ADDRESS,
        proxy_address=GATEWAY_PROXY_ADDRESS,
        auth=JupyterHubAuth(api_token=self.hub_api_token.get_secret_value()),
        **gateway_kwargs,
    )
get_stac_catalog

Provides a PySTAC client for the PC data catalog, optionally signing items automatically as they are retrieved.

For more information about PC signing, refer to the docs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sign_inplace` | `bool` | Whether to automatically sign items through the `planetary_computer.sign_inplace` modifier. | `True` |
| `pystac_kwargs` | `Dict` | Additional keyword arguments to pass to the `pystac_client.Client.open` method. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Client` | A PySTAC client for the PC Catalog. |

Example

Get a configured PySTAC client with automatic asset signing:

from prefect import flow
from prefect_planetary_computer import PlanetaryComputerCredentials

@flow
def example_get_stac_catalog_flow():
    pc_credentials_block = PlanetaryComputerCredentials.load("BLOCK_NAME")

    catalog = pc_credentials_block.get_stac_catalog()

    # Search STAC catalog for Landsat Collection 2 Level 2 items
    time_range = "2020-12-01/2020-12-31"
    bbox = [-122.2751, 47.5469, -121.9613, 47.7458]

    search = catalog.search(collections=["landsat-c2-l2"], bbox=bbox, datetime=time_range)
    items = search.get_all_items()
    return len(items)

example_get_stac_catalog_flow()
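
The `sign_inplace` flag only selects whether a modifier callable is handed to the client; the client then applies that callable to each item it returns. The sketch below imitates this pattern without any network access. Everything in it is a stand-in: `fake_sign_inplace` appends a placeholder query string instead of a real SAS token, and `fetch_items` plays the role of a catalog search.

```python
from typing import Callable, Dict, List, Optional

Item = Dict[str, str]


def fake_sign_inplace(item: Item) -> Item:
    # Hypothetical signer: appends a placeholder token to the asset URL in place.
    item["href"] = item["href"] + "?sig=placeholder"
    return item


def fetch_items(modifier: Optional[Callable[[Item], Item]] = None) -> List[Item]:
    # Stand-in for a catalog search that returns a single item.
    items = [{"href": "https://example.invalid/asset.tif"}]
    if modifier is not None:
        for item in items:
            modifier(item)
    return items


def get_items(sign_inplace: bool = True) -> List[Item]:
    # Same selection as in get_stac_catalog: a callable, or None to skip signing.
    modifier = fake_sign_inplace if sign_inplace else None
    return fetch_items(modifier)
```

Passing `sign_inplace=False` leaves the URLs untouched, which is useful when items are signed later or not at all.
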

Source code in prefect_planetary_computer/credentials.py
def get_stac_catalog(
    self, sign_inplace: bool = True, **pystac_kwargs: Dict
) -> pystac_client.Client:
    """
    Provides a [PySTAC client](https://pystac-client.readthedocs.io/en/stable/api.html#client) for the PC data catalog,
    optionally signing items automatically as they are retrieved.

    For more information about PC signing, refer to the [docs](https://planetarycomputer.microsoft.com/docs/concepts/sas).

    Args:
        sign_inplace: Whether to automatically sign items through the
            [planetary_computer.sign_inplace](https://github.com/microsoft/planetary-computer-sdk-for-python#automatic-signing) modifier.
        pystac_kwargs: Additional keyword arguments to pass to the
            [`pystac_client.Client.open`](https://pystac-client.readthedocs.io/en/stable/api.html#pystac_client.Client.open) method.

    Returns:
        A PySTAC client for the PC Catalog.

    Example:
        Get a configured PySTAC client with automatic asset signing:
        ```python
        from prefect import flow
        from prefect_planetary_computer import PlanetaryComputerCredentials

        @flow
        def example_get_stac_catalog_flow():
            pc_credentials_block = PlanetaryComputerCredentials.load("BLOCK_NAME")

            catalog = pc_credentials_block.get_stac_catalog()

            # Search STAC catalog for Landsat Collection 2 Level 2 items
            time_range = "2020-12-01/2020-12-31"
            bbox = [-122.2751, 47.5469, -121.9613, 47.7458]

            search = catalog.search(collections=["landsat-c2-l2"], bbox=bbox, datetime=time_range)
            items = search.get_all_items()
            return len(items)

        example_get_stac_catalog_flow()
        ```
    """  # noqa E501

    if self.subscription_key:
        planetary_computer.set_subscription_key(
            self.subscription_key.get_secret_value()
        )

    modifier = planetary_computer.sign_inplace if sign_inplace else None

    return pystac_client.Client.open(
        CATALOG_URL, modifier=modifier, **pystac_kwargs
    )
new_gateway_cluster

Instantiate a new cluster from the PC Dask Gateway Server.

Each argument corresponds to one of the available PC Dask Gateway cluster options. PC sets some defaults, but they can be overridden by passing the corresponding argument to this function (see the Helm chart).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `worker_cores` | `Optional[float]` | Number of cores per worker, in the 0.1-8 range. Defaults to 1. | `None` |
| `worker_memory` | `Optional[float]` | Amount of memory per worker (in GiB) in the 1-64 range. Defaults to 8. | `None` |
| `image` | `Optional[str]` | The Docker image to be used for the workers. Defaults to `pangeo/pangeo-notebook:latest`. To use the PC official images, refer to the `planetary-computer-containers` repo. | `None` |
| `gpu` | `Optional[bool]` | Whether to use GPU workers. Defaults to False. | `False` |
| `environment` | `Optional[dict]` | Environment variables to set on the workers. Defaults to the GDAL and PYGEOS-related variables set in the PC Hub. | `None` |
| `gateway_cluster_kwargs` | `Dict` | Additional keyword arguments to pass to the `dask_gateway.GatewayCluster` constructor. | `{}` |

Returns:

Type Description
GatewayCluster

The newly created Dask Gateway cluster.

Example

Instantiate a new cluster using PC Dask Gateway Server:

import dask.array as da
from prefect import flow
from prefect_planetary_computer import PlanetaryComputerCredentials

@flow
def example_new_cluster_flow():
    pc_credentials_block = PlanetaryComputerCredentials.load("BLOCK_NAME")

    # Create a Dask Gateway cluster with the default configuration
    cluster = pc_credentials_block.new_gateway_cluster()

    # Connect a client; it becomes the default for subsequent Dask computations
    cluster.get_client()

    # Scale adaptively between 2 and 10 workers
    cluster.adapt(minimum=2, maximum=10)

    # Create a Dask array with 1 billion elements and sum them
    x = da.random.random(1000000000)
    result = x.sum().compute()

    return result

example_new_cluster_flow()
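
The documented ranges for `worker_cores` (0.1-8) and `worker_memory` (1-64 GiB) can be checked on the client side before a cluster is requested. The sketch below is one way to do that; `validate_worker_resources` is a hypothetical helper introduced here for illustration and is not part of `prefect_planetary_computer`.

```python
from typing import Optional


def validate_worker_resources(
    worker_cores: Optional[float] = None,
    worker_memory: Optional[float] = None,
) -> dict:
    """Hypothetical helper: check values against the documented ranges.

    Returns only the options that were explicitly set, so unset options
    can fall back to the server-side defaults.
    """
    options = {}
    if worker_cores is not None:
        if not 0.1 <= worker_cores <= 8:
            raise ValueError("worker_cores must be in the 0.1-8 range")
        options["worker_cores"] = worker_cores
    if worker_memory is not None:
        if not 1 <= worker_memory <= 64:
            raise ValueError("worker_memory must be in the 1-64 GiB range")
        options["worker_memory"] = worker_memory
    return options


options = validate_worker_resources(worker_cores=2, worker_memory=16)
```

The resulting dictionary could then be unpacked into the call, for example `pc_credentials_block.new_gateway_cluster(**options)`.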

Source code in prefect_planetary_computer/credentials.py
def new_gateway_cluster(
    self,
    worker_cores: Optional[float] = None,
    worker_memory: Optional[float] = None,
    image: Optional[str] = None,
    gpu: Optional[bool] = False,
    environment: Optional[dict] = None,
    **gateway_cluster_kwargs: Dict,
) -> dask_gateway.GatewayCluster:
    """
    Instantiate a new cluster from the PC Dask Gateway Server.

    Each argument corresponds to one of the available PC Dask Gateway cluster options.
    PC sets some defaults, which can be overridden by passing the corresponding argument to this function
    ([see Helm chart](https://github.com/microsoft/planetary-computer-hub/blob/main/helm/chart/config.yaml)).

    Args:
        worker_cores: Number of cores per worker, in the 0.1-8 range. Defaults to 1.
        worker_memory: Amount of memory per worker (in GiB) in the 1-64 range. Defaults to 8.
        image: The Docker image to be used for the workers.
            Defaults to [`pangeo/pangeo-notebook:latest`](https://hub.docker.com/layers/pangeo/pangeo-notebook/latest/images/sha256-94e97e24adf14e72c01f18c782b8c4e0efb1e05950a5f2d2e86e67adcbf547f8).
            To use the PC official images, refer to the [`planetary-computer-containers`](https://github.com/Microsoft/planetary-computer-containers) repo.
        gpu: Whether to use GPU workers. Defaults to False.
        environment: Environment variables to set on the workers. Defaults to the GDAL and PYGEOS-related variables set in the PC Hub.
        gateway_cluster_kwargs: Additional keyword arguments to pass to the [`dask_gateway.GatewayCluster`](https://gateway.dask.org/api-client.html#dask_gateway.GatewayCluster) constructor.

    Returns:
        The newly created Dask Gateway cluster.

    Example:
        Instantiate a new cluster using PC Dask Gateway Server:
        ```python
        import dask.array as da
        from prefect import flow
        from prefect_planetary_computer import PlanetaryComputerCredentials

        @flow
        def example_new_cluster_flow():
            pc_credentials_block = PlanetaryComputerCredentials.load("BLOCK_NAME")

            # Create a Dask Gateway cluster with the default configuration
            cluster = pc_credentials_block.new_gateway_cluster()

            # Connect a client; it becomes the default for subsequent Dask computations
            cluster.get_client()

            # Scale adaptively between 2 and 10 workers
            cluster.adapt(minimum=2, maximum=10)

            # Create a Dask array with 1 billion elements and sum them
            x = da.random.random(1000000000)
            result = x.sum().compute()

            return result

        example_new_cluster_flow()
        ```
    """  # noqa E501

    gateway_cluster_kwargs.update(
        self._get_cluster_options_dict(
            worker_cores=worker_cores,
            worker_memory=worker_memory,
            image=image,
            gpu=gpu,
            environment=environment,
        )
    )

    return dask_gateway.GatewayCluster(
        address=GATEWAY_ADDRESS,
        proxy_address=GATEWAY_PROXY_ADDRESS,
        auth=JupyterHubAuth(api_token=self.hub_api_token.get_secret_value()),
        **gateway_cluster_kwargs,
    )
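
Because the method body calls `gateway_cluster_kwargs.update(...)` with the explicit options, an explicit option overrides a same-named entry passed through the extra keyword arguments. The sketch below illustrates that merge order. `merge_cluster_options` is a hypothetical function, and dropping unset (`None`) options is an assumption about the private helper `_get_cluster_options_dict`, whose body is not shown here.

```python
from typing import Any, Dict, Optional


def merge_cluster_options(
    gateway_cluster_kwargs: Dict[str, Any],
    worker_cores: Optional[float] = None,
    worker_memory: Optional[float] = None,
) -> Dict[str, Any]:
    """Hypothetical sketch of the merge performed before building the cluster."""
    # Assumption: options left as None are dropped so server defaults apply
    explicit = {
        key: value
        for key, value in {
            "worker_cores": worker_cores,
            "worker_memory": worker_memory,
        }.items()
        if value is not None
    }
    merged = dict(gateway_cluster_kwargs)  # copy to avoid mutating the input
    merged.update(explicit)  # explicit options win over same-named kwargs
    return merged


merged = merge_cluster_options(
    {"worker_cores": 1, "shutdown_on_close": True},
    worker_cores=4,
)
```

Here `worker_cores=4` replaces the value supplied in the extra keyword arguments, while `shutdown_on_close` passes through unchanged.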