prefect-planetary-computer
Warning
The Planetary Computer has retired its computing environment, which this extension helped Prefect users work with. As a result, this repository has been archived, and we recommend using pystac_client directly to interact with the Data Catalog, which remains openly available.
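Because the Data Catalog is still available, the asset lookups in the examples below can be reproduced without this extension. The sketch below uses only the standard library. It shows how the href, xarray:open_kwargs and xarray:storage_options fields of a STAC collection asset map onto the arguments of xr.open_zarr. With pystac_client, the same dictionary would come from Client.open(PC_STAC_ROOT).get_collection(...).to_dict(). PC_STAC_ROOT is the public Planetary Computer STAC API root. The helper names and the sample collection in the test are hypothetical and for illustration only.

```python
from typing import Any, Dict, Tuple
from urllib.parse import quote

# Public STAC API root of the Planetary Computer Data Catalog
PC_STAC_ROOT = "https://planetarycomputer.microsoft.com/api/stac/v1"


def collection_url(collection_id: str, root: str = PC_STAC_ROOT) -> str:
    """Build the STAC API URL of a single collection (/collections/{id})."""
    return f"{root.rstrip('/')}/collections/{quote(collection_id)}"


def zarr_open_args(collection: Dict[str, Any], asset_key: str) -> Tuple[str, Dict[str, Any]]:
    """Hypothetical helper returning (href, kwargs) for xr.open_zarr(href, **kwargs).

    The asset's "xarray:open_kwargs" are copied and its "xarray:storage_options"
    are added under the storage_options key, as in the README examples.
    """
    asset = collection["assets"][asset_key]
    kwargs = dict(asset.get("xarray:open_kwargs", {}))
    kwargs["storage_options"] = asset.get("xarray:storage_options", {})
    return asset["href"], kwargs
```

Keeping the mapping in a plain function makes it easy to test without network access. The same function can then be applied to a live collection dictionary.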
Prefect integrations with the Microsoft Planetary Computer (PC).
Overview
This collection includes a Credentials Block 🔑 to store and retrieve a PC subscription key and JupyterHub token, with convenience methods to interact easily with the PC Data Catalog 🌍 and Dask Gateway 🚀 server.
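The following is a hypothetical, standard-library-only sketch of what such a credentials container holds. It is not the block's actual implementation, which builds on Prefect's Block class. The two secrets are masked in the object's repr so they do not leak into logs. The subscription key is exposed as an Ocp-Apim-Subscription-Key request header, which, to our understanding, is how the planetary-computer package sends it to the PC API. The class and field names are assumptions.

```python
from dataclasses import dataclass
from typing import Dict, Optional


def _mask(secret: Optional[str]) -> str:
    """Render a secret as a fixed placeholder (or None if unset)."""
    return "None" if secret is None else "'**********'"


@dataclass(repr=False)
class PCCredentialsSketch:
    """Hypothetical stand-in for the PlanetaryComputerCredentials block."""

    subscription_key: Optional[str] = None
    hub_api_token: Optional[str] = None

    def __repr__(self) -> str:
        # never print the raw secrets
        return (
            f"PCCredentialsSketch(subscription_key={_mask(self.subscription_key)}, "
            f"hub_api_token={_mask(self.hub_api_token)})"
        )

    def request_headers(self) -> Dict[str, str]:
        """Headers for PC API calls; empty if no subscription key is stored."""
        if self.subscription_key is None:
            return {}
        return {"Ocp-Apim-Subscription-Key": self.subscription_key}
```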
For more information about:
- using Azure services with Prefect and the Planetary Computer, check out the prefect-azure collection.
- the integration between Prefect and Dask, check out the prefect-dask collection.
- taking advantage of the Planetary Computer data catalog and compute resources, check out the Planetary Computer documentation.
Resources
For more tips on how to use tasks and flows in a Collection, check out Using Collections!
Installation
Install prefect-planetary-computer with pip:
pip install prefect-planetary-computer
Requires an installation of Python 3.8+.
We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.
These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.
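The sketch below offers a quick way to confirm that an environment meets these requirements before running the examples. It uses only the standard library: sys.version_info and importlib.metadata, the latter available since Python 3.8. The helper names are hypothetical, and treating any 2.x release as "Prefect 2.0" is an assumption.

```python
import sys
from importlib.metadata import PackageNotFoundError, version
from typing import List, Optional, Tuple


def installed_version(package: str) -> Optional[str]:
    """Return the installed version string of a distribution, or None."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


def major(version_string: str) -> int:
    """Leading integer of a version string, e.g. '2.10.3' -> 2."""
    return int(version_string.split(".")[0])


def check_environment(py: Tuple[int, int] = sys.version_info[:2]) -> List[str]:
    """Return human-readable problems with the environment (empty if none)."""
    problems = []
    if py < (3, 8):
        problems.append(f"Python {py[0]}.{py[1]} found, 3.8+ required")
    prefect_version = installed_version("prefect")
    if prefect_version is None:
        problems.append("prefect is not installed")
    elif major(prefect_version) != 2:
        problems.append(f"prefect {prefect_version} found, Prefect 2 expected")
    return problems
```

Calling check_environment() with no arguments inspects the current interpreter. An empty list means no problems were found.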
Usage
Note
- The following examples are adapted from Planetary Computer - Scale with Dask.
- They require the following additional packages:
  pip install xarray zarr adlfs netcdf4 prefect_azure
- Make sure to share the same Python dependencies (in particular dask and distributed) between the flow execution environment, the Dask Scheduler and the Workers, as explained in the Dask docs.
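On a running cluster, distributed.Client.get_versions(check=True) compares package versions across the client, scheduler and workers and flags mismatches. The underlying idea can be sketched with the standard library alone. Here, each environment is described by a plain dictionary of package versions. The function name and the versions in the test are hypothetical and were not collected from a real cluster.

```python
from typing import Dict, Mapping, Optional, Sequence


def find_mismatches(
    envs: Mapping[str, Mapping[str, Optional[str]]],
    packages: Sequence[str] = ("dask", "distributed"),
) -> Dict[str, Dict[str, Optional[str]]]:
    """Return, per package, the version seen in each environment when they differ."""
    mismatches = {}
    for pkg in packages:
        # None stands for "package not installed in this environment"
        seen = {name: env.get(pkg) for name, env in envs.items()}
        if len(set(seen.values())) > 1:
            mismatches[pkg] = seen
    return mismatches
```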
Computing Dask Collections
Computations on Dask collections, such as Dask DataFrames or the Dask arrays that back an xarray dataset, can be run from within a Prefect task by creating a Dask Gateway cluster with the credentials block inside the flow or task itself.
# Prefect tasks are executed using the default ConcurrentTaskRunner
# Dask collection computations are executed on a new temporary Dask cluster
import xarray as xr
from prefect import flow, task, get_run_logger
from prefect_planetary_computer import PlanetaryComputerCredentials
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_upload

pc_credentials = PlanetaryComputerCredentials.load("PC_BLOCK_NAME")
bs_credentials = AzureBlobStorageCredentials.load("BS_BLOCK_NAME")


@task
def compute_mean(asset):
    logger = get_run_logger()

    # the temporary Dask Gateway cluster only lives inside this block
    with pc_credentials.new_gateway_cluster(
        name="test-cluster",
        image="pangeo/pangeo-notebook:latest"
    ) as cluster:
        cluster.adapt(minimum=2, maximum=10)
        # the client becomes the default scheduler used by .compute()
        client = cluster.get_client()

        ds = xr.open_zarr(
            asset.href,
            **asset.extra_fields["xarray:open_kwargs"],
            storage_options=asset.extra_fields["xarray:storage_options"]
        )
        logger.info(f"Daymet dataset info:\n{ds}")

        timeseries = ds["tmin"].mean(dim=["x", "y"]).compute()
        logger.info(f"Mean timeseries info:\n{timeseries}")

    return timeseries


@flow
def pc_dask_flow():
    # get a configured PySTAC client
    catalog = pc_credentials.get_stac_catalog()

    # compute the minimum daily temperature averaged over all of Hawaii,
    # using the Daymet dataset
    asset = catalog.get_collection("daymet-daily-hi").assets["zarr-abfs"]

    prefect_future = compute_mean.submit(asset)
    timeseries = prefect_future.result()

    # save NetCDF timeseries file
    timeseries.to_netcdf("timeseries.nc")

    # upload to 'my-container' blob storage container
    with open("timeseries.nc", "rb") as f:
        blob = blob_storage_upload(
            data=f.read(),
            container="my-container",
            blob="timeseries.nc",
            blob_storage_credentials=bs_credentials,
            overwrite=False,
        )

    # return the blob name of the uploaded timeseries object
    return blob


if __name__ == "__main__":
    pc_dask_flow()
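Conceptually, ds["tmin"].mean(dim=["x", "y"]) collapses each daily grid to a single number. For floating-point data, xarray skips missing values (NaN) by default, so grid cells without data do not distort the average. The pure-Python sketch below reproduces that reduction on a toy (time, y, x) cube. The values in the test are made up for illustration. The real computation runs lazily on Dask and is triggered by .compute().

```python
import math
from typing import List, Sequence

Grid = Sequence[Sequence[float]]  # one time step: rows (y) of columns (x)


def spatial_mean(grid: Grid) -> float:
    """Mean over x and y, skipping NaN like xarray's default skipna=True."""
    values = [v for row in grid for v in row if not math.isnan(v)]
    return sum(values) / len(values) if values else math.nan


def timeseries_mean(cube: Sequence[Grid]) -> List[float]:
    """Reduce a (time, y, x) cube to a time series of spatial means."""
    return [spatial_mean(grid) for grid in cube]
```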
Using the Dask Task Runner
Prefect's prefect_dask.DaskTaskRunner automatically instantiates a temporary Dask cluster at flow execution time, enabling submission of both Prefect tasks and Dask collection computations.
Warning
- prefect-dask requires:
  - distributed==2022.2.0; python_version < '3.8'
  - distributed>=2022.5.0,<=2023.3.1
- It requires less configuration on the Dask Workers side when using Prefect Cloud; you can get started for free.
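The second distributed constraint in the warning above is an inclusive version range. The sketch below checks a distributed version against it using only the standard library. It assumes purely numeric dotted versions, such as the calendar versions distributed uses. For general PEP 440 specifiers, packaging.specifiers.SpecifierSet is the more robust choice. The function names are hypothetical.

```python
from typing import Tuple


def parse(version: str) -> Tuple[int, ...]:
    """'2023.3.1' -> (2023, 3, 1); assumes purely numeric components."""
    return tuple(int(part) for part in version.split("."))


def satisfies_range(version: str, lower: str = "2022.5.0", upper: str = "2023.3.1") -> bool:
    """True if lower <= version <= upper (both bounds inclusive)."""
    return parse(lower) <= parse(version) <= parse(upper)
```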
# Both Prefect tasks and Dask collection computations are executed
# on a new temporary Dask cluster
import xarray as xr
from prefect import flow, task, get_run_logger
from prefect_planetary_computer import PlanetaryComputerCredentials
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_upload
from prefect_dask import get_dask_client

pc_credentials = PlanetaryComputerCredentials.load("PC_BLOCK_NAME")
bs_credentials = AzureBlobStorageCredentials.load("BS_BLOCK_NAME")

# DaskTaskRunner backed by a temporary Dask Gateway cluster
pc_runner = pc_credentials.get_dask_task_runner(
    cluster_kwargs={
        "image": "pangeo/pangeo-notebook:latest",
    },
    adapt_kwargs={'minimum': 1, 'maximum': 10, 'active': True}
)


@task
def compute_mean(asset):
    logger = get_run_logger()

    # reuse the task runner's cluster for the Dask collection computation
    with get_dask_client() as client:
        ds = xr.open_zarr(
            asset.href,
            **asset.extra_fields["xarray:open_kwargs"],
            storage_options=asset.extra_fields["xarray:storage_options"]
        )
        logger.info(f"Daymet dataset info:\n{ds}")

        timeseries = ds["tmin"].mean(dim=["x", "y"]).compute()
        logger.info(f"Mean timeseries info:\n{timeseries}")

    return timeseries


@flow(task_runner=pc_runner)
def pc_dask_flow():
    # get a configured PySTAC client
    catalog = pc_credentials.get_stac_catalog()

    # compute the minimum daily temperature averaged over all of Hawaii,
    # using the Daymet dataset
    asset = catalog.get_collection("daymet-daily-hi").assets["zarr-abfs"]

    mean_task = compute_mean.submit(asset)
    timeseries = mean_task.result()

    # save NetCDF timeseries file
    timeseries.to_netcdf("timeseries.nc")

    # upload to 'my-container' blob storage container
    with open("timeseries.nc", "rb") as f:
        blob = blob_storage_upload(
            data=f.read(),
            container="my-container",
            blob="timeseries.nc",
            blob_storage_credentials=bs_credentials,
            overwrite=False,
        )

    # return the blob name of the uploaded timeseries object
    return blob


if __name__ == "__main__":
    pc_dask_flow()
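The adapt_kwargs passed to get_dask_task_runner above bound how far the cluster may scale. The sketch below is a simplified model only. Dask's adaptive scaling derives its target from the observed workload, which is not reproduced here. In this model, minimum and maximum clamp whatever worker count the scheduler would like into that range. The desired input and the function name are hypothetical, and the active flag is not modelled.

```python
from typing import Mapping, Optional


def target_workers(desired: int, adapt_kwargs: Mapping[str, int]) -> int:
    """Clamp a desired worker count into [minimum, maximum] taken from adapt_kwargs."""
    minimum = adapt_kwargs.get("minimum", 0)
    maximum: Optional[int] = adapt_kwargs.get("maximum")  # None means unbounded
    if maximum is not None and minimum > maximum:
        raise ValueError("minimum must not exceed maximum")
    capped = desired if maximum is None else min(desired, maximum)
    return max(minimum, capped)
```

With the runner settings above, an idle cluster keeps one worker and a busy one is capped at ten.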
Feedback
If you encounter any bugs while using prefect-planetary-computer, feel free to open an issue in the prefect-planetary-computer repository.
If you have any questions or issues while using prefect-planetary-computer, you can find help in either the Prefect Discourse forum or the Prefect Slack community.
Feel free to star or watch prefect-planetary-computer for updates too!
Contributing
If you'd like to help fix an issue or add a feature to prefect-planetary-computer, please propose changes through a pull request from a fork of the repository.
Here are the steps:
- Fork the repository
- Clone the forked repository
- Install the repository and its dependencies:
  pip install -e ".[dev]"
- Make desired changes
- Add tests
- Add an entry to CHANGELOG.md
- Install pre-commit to perform quality checks prior to commit:
  pre-commit install
- git commit, git push, and create a pull request