Skywatch API Notebook

Author

Sparkgeo

This notebook queries and downloads PlanetScope imagery within a specified AOI and date range using the Skywatch EarthCache API.
The notebook performs the following tasks:

1. Queries the Skywatch API for the area and time of interest, returning a dataframe of all images meeting the specified criteria.
2. Filters the results to select the “best” image per day when multiple images exist for a given day.
3. Visualizes the results in a variety of interactive graphs.
4. Downloads the selected images.

Code
import time
import warnings
import math
from pathlib import Path
import requests
import pandas as pd
import geopandas as gpd
import json
import datetime
import plotly.express as px
import plotly.graph_objects as go
import pickle
import numpy as np
import plotly.offline as pyo
pyo.init_notebook_mode()

File Paths and Query Parameters

Run the two cells below first, even if resuming the process at a later stage.

Code
api_key = "" # Put skywatch api key here
aoi_name = "BeitBridge" # Name for AOI
o_start_date = "2018-01-01" # Overall start date for query
o_end_date = "2023-03-31" # Overall end date for query
cc_thresh = 25.0 # Maximum allowable image cloud cover percentage
coverage_thresh = 80.0 # Minimum AOI image coverage percentage
output_graphs = False # Option to output graphs in Stage 3 to files

aoi_file = Path("../../data/processed/Beitbridge_PS_AOI_Final.gpkg") # AOI geometry file used to query API

# Overall path to put all outputs
out_path = Path(f"../../data/tdi_demo_files/{aoi_name}")
out_path = out_path.resolve()
out_path.mkdir(parents=True, exist_ok=True)

# Path to put search queries
outpath_search_df = out_path.joinpath('search_queries')
outpath_search_df.mkdir(exist_ok=True)

outpath_download_df = out_path.joinpath('download_dataframes') # Path to put download dataframes
outpath_images = out_path.joinpath("images") # Base path where image files will be downloaded
Code
aoi_file = aoi_file.resolve()
aoi_data = gpd.read_file(aoi_file)
aoi_json = json.loads(aoi_data.to_json())

headers = {'Content-Type': 'application/json', 'x-api-key': api_key} # Used to construct queries. Do not change

if not api_key:
    print("No API key found. Beware that querying and downloading imagery will fail.")

Stage 1: Search API

This part of the notebook queries the Earthcache API for the specified AOI and TOI. Because of limitations of the API, long date ranges can fail to return all results. To work around this, the overall user-specified date range is split into multiple shorter queries, and the results are combined at the end.

Code
# Number of days per sub-query. The Skywatch API can fail to return full results when too wide
# a date range is used, so the overall range is split into shorter queries as a workaround.
# The separate queries are combined at the end of the process.
date_interval = 90

o_start_datetime = datetime.datetime.strptime(o_start_date, "%Y-%m-%d")
o_end_datetime = datetime.datetime.strptime(o_end_date, "%Y-%m-%d")
num_intervals = math.floor((o_end_datetime - o_start_datetime).days / date_interval)

date_list = []

if num_intervals > 0:
    starting_date = o_start_datetime
    for i in range(num_intervals):
        ending_date = starting_date + datetime.timedelta(days=date_interval)

        if ending_date > o_end_datetime:
            ending_date = o_end_datetime

        date_list.append({"start_date": datetime.datetime.strftime(starting_date, "%Y-%m-%d"), "end_date": datetime.datetime.strftime(ending_date, "%Y-%m-%d")})
        starting_date = ending_date + datetime.timedelta(days=1)

    # Capture any remaining days not covered by the equal-length intervals
    if ending_date < o_end_datetime:
        final_start_date = ending_date + datetime.timedelta(days=1)
        date_list.append({"start_date": datetime.datetime.strftime(final_start_date, "%Y-%m-%d"), "end_date": o_end_date})

else:
    date_list.append({"start_date": o_start_date, "end_date": o_end_date})
Code
def get_sat_type(sat_id, source):
    """
    Parses the type of Dove satellite from the image source returned from the Skywatch API as well as the satellite ID string.
    This is performed because the Skywatch API does not currently make any distinction between the Dove-Classic and Dove-R
    satellite platforms.

    Params:
    ---------------------
    sat_id: str
        identification string parsed from the image name
    source: str
        image source returned from the Skywatch API

    Returns:
    ---------------------
    sat_type: str
        satellite type for the input image

    """
    # List of dove-r satellite ID strings
    dove_r_list = ["1047", "1057", "1058", "1059", "105a", "105b", "105d", "105e", "105f", "1060", "1061", "1062", "1063", "1064", "1065", "1066", "1067", "1068", "1069", "106a", "106b", "106c", "106d", "106e", "106f"]

    sat_parse = sat_id.split('_')[-1]
    if source == "PlanetScope-SuperDove":
        sat_type = 'superdove'
    elif source == "PlanetScope":
        sat_type = 'dove-c'
    else:
        sat_type = 'unknown'

    # Dove-R images are reported by the API with the same "PlanetScope" source as Dove-Classic,
    # so they are identified by their satellite ID strings instead
    if any(sat in sat_parse for sat in dove_r_list):
        sat_type = "dove-r"

    return sat_type

def create_search_query(start_date, end_date, aoi_json):
    """
    Creates a json query to use for searching the Skywatch API.

    Params:
    ---------------------
    start_date: str
        starting date string for query
    end_date: str
        ending date string for query
    aoi_json: geojson
        geojson geometry for query AOI

    Returns:
    ---------------------
    query: dict
        formatted query dictionary

    """
    query = {
            "location": {
                "type": "Polygon",
                "coordinates": aoi_json["features"][0]["geometry"]["coordinates"][0]
            },
            "start_date": start_date,
            "end_date": end_date,
            "resolution": ["medium"], # medium resolution = planetscope in skywatch
            "coverage": 50.0, # default mimimum aoi coverage of 50% for initial query, with a more stringent filter applied later
            "interval_length": 7,
            "order_by": "date"
        }
    
    return query

def search_catalog(query, headers):
    """
    Runs a POST request to create search query.

    Params:
    ---------------------
    query: dict
        formatted query dictionary
    headers: dict
        headers for query

    Returns:
    ---------------------
    search_id: str
        id for search query

    """
    search_url = "https://api.skywatch.co/earthcache/archive/search"
    search_resp = requests.post(url=search_url, data=json.dumps(query), headers=headers).json()
    search_id = search_resp['data']['id']
    
    return search_id

def get_search_results(search_id, headers):
    """
    GET request to retrieve search results for a specified query

    Params:
    ---------------------
    search_id: str
        id for search query
    headers: dict
        headers for query

    Returns:
    ---------------------
    search_resp_json: json
        json containing search results for query

    """
    search_url = f"https://api.skywatch.co/earthcache/archive/search/{search_id}/search_results"
    search_resp = requests.get(url=search_url, headers=headers)

    # Query results take a bit of time to populate after the initial POST request. This loop
    # repeats the GET request every 5 seconds until results are available.
    while search_resp.status_code == 202:
        print("Search still processing. Waiting 5 seconds.")
        time.sleep(5)
        search_resp = requests.get(url=search_url, headers=headers)
    
    search_resp_json = search_resp.json()
    
    return search_resp_json

def parse_results(search_resp, headers, search_id, df_list):
    """
    Parses through returned query results and creates pandas dataframes from them.

    Params:
    ---------------------
    search_resp: json
        query results json
    headers: dict
        headers for query
    search_id: str
        id for search query
    df_list: list
        list of search dataframes created from earlier search results
    
    Returns:
    ---------------------
    df_list: list
        list of search dataframes from previous and current search results
    """
    def _item_to_df(item):
        """Converts a single search result item into a one-row dataframe of select image attributes."""
        datestr = item['start_time'].split("T")[0]
        date_obj = datetime.datetime.strptime(datestr, "%Y-%m-%d")
        year = date_obj.year
        month = date_obj.month
        isodate = date_obj.isocalendar()
        # Dates in early January can fall in the final ISO week (52 or 53) of the previous
        # ISO year; assign those to week 1 so they group with the correct calendar year.
        if isodate[0] == (year - 1):
            week = 1
        else:
            week = isodate[1]

        sat_type = get_sat_type(item['product_name'], item['source'])
        return pd.DataFrame([{"search_id": search_id, "id": item['id'], "product_name": item['product_name'],
                              "datestr": datestr, "date": date_obj, "year": year, "month": month, "week": week,
                              "source": item['source'], "sat_type": sat_type, "area_sq_km": item['area_sq_km'],
                              "cloud_cover": item['result_cloud_cover_percentage'],
                              "aoi_coverage": item["location_coverage_percentage"],
                              "cost": item['cost'], "preview": item["preview_uri"]}])

    # Parses through the first page of search results and populates dataframes with select image attributes
    for item in search_resp["data"]:
        df_list.append(_item_to_df(item))

    # Check for pagination of search results
    try:
        cursor = search_resp['pagination']['cursor']['next']
    except (KeyError, TypeError):
        cursor = None

    # If results are paginated, perform additional GET requests and parse each page until no pages are left
    while cursor:
        page_url = f"https://api.skywatch.co/earthcache/archive/search/{search_id}/search_results?cursor={cursor}"
        page_resp = requests.get(url=page_url, headers=headers).json()
        for item in page_resp["data"]:
            df_list.append(_item_to_df(item))

        try:
            cursor = page_resp['pagination']['cursor']['next']
        except (KeyError, TypeError):
            cursor = None

    return df_list

# This loop iterates through the shorter date ranges created in the previous cell,
# creating new search queries and retrieving results. The returned json is then parsed
# to create Pandas dataframes, which are captured in a list

df_list = [] # Empty list used to hold the separate results dataframes
for entry in date_list:
    start_date = entry['start_date']
    end_date = entry['end_date']
    print(f"running search query for date range: {start_date} to {end_date}")
    query = create_search_query(start_date, end_date, aoi_json)
    search_id = search_catalog(query, headers)
    search_resp = get_search_results(search_id, headers)
    df_list = parse_results(search_resp, headers, search_id, df_list)
Code
# This cell combines the returned data from the separate API queries into a single dataframe
def combine_query_dataframes(df_list):
    """
    Combines separate results dataframes into a single dataframe.

    Params:
    ---------------------
    df_list: list
        list of separate dataframes created from query results

    Returns:
    ---------------------
    search_df: pandas dataframe
        combined dataframe containing all search results
    """
    search_df = pd.concat(df_list)
    search_df.reset_index(drop=True, inplace=True)
    search_df['year'] = search_df['year'].astype('category')
    search_df['month'] = search_df['month'].astype('category')
    search_df['week'] = search_df['week'].astype('category')

    # this line adds a new field with an image preview html link, which is used in one of the interactive graphs in Stage 3
    search_df["preview_html"] = search_df.apply(lambda x: f'<a href=\"{x["preview"]}\">Preview</a>', axis=1)

    return search_df

search_df = combine_query_dataframes(df_list)
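
Before saving, the combined dataframe can optionally be inspected; the snippet below is a minimal illustrative addition and can be skipped.

Code
# Optional inspection (an illustrative addition): summarize the combined search results
print(f"Total images returned: {len(search_df)}")
print(search_df['sat_type'].value_counts())
search_df.head()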
Code
# Put combined search dataframe into pickle file to reload later
search_pickle = outpath_search_df.joinpath(f'{aoi_name}_{o_start_date}_to_{o_end_date}_search_df.pkl')
search_df.to_pickle(search_pickle)

Stage 2: Filter Query Dataframe

This stage of the process filters down the query results from Stage 1. Continue from here if you ran Stage 1 previously.

Code
# Read in query pickle file
search_pickle = outpath_search_df.joinpath(f'{aoi_name}_{o_start_date}_to_{o_end_date}_search_df.pkl')
search_df = pd.read_pickle(search_pickle)
Code
def filter_dates(df):
    """
    If more than one image exists for a given day, this function will select the "best" image for the day.
    First, images with poor AOI coverage are removed. If multiple images remain after this step, then the
    images are filtered by sensor type, with superdove and dove-r being prioritized over dove-classic.
    Finally, if multiple images still remain, the image with the highest AOI coverage is selected.

    Params:
    ---------------------
    df: pandas dataframe
        subset dataframe containing only the images for the specific date

    Returns:
    ---------------------
    select_id: str
        image id string for the selected image
    """

    # Drop images with AOI coverage more than 10% below the best image for the day
    top_coverage = df['aoi_coverage'].max()
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        df['cover_percentmax'] = df['aoi_coverage'] / top_coverage

    df = df.loc[df["cover_percentmax"] >= 0.9]

    # Prefer the newer sensor generations when any are available
    if df['sat_type'].isin(['superdove', 'dove-r']).any():
        df = df.loc[df['sat_type'].isin(['superdove', 'dove-r'])]

    df = df.nlargest(1,'aoi_coverage')
    select_id = df['id'].values[0]

    return select_id

def filter_dataframe(df, cloud_cover_thresh, coverage_thresh):
    """
    Filters down initial query results based on cloud cover, aoi coverage, and selecting only one
    image per day.

    Params:
    ---------------------
    df: pandas dataframe
        dataframe containing full image search results
    cloud_cover_thresh: float
        maximum cloud cover percent threshold
    coverage_thresh: float
        minimum aoi area coverage percent threshold

    Returns:
    ---------------------
    filtered_df: pandas dataframe
        filtered image dataframe
    """
    filtered_df = df.copy(deep=True)
    filtered_df = filtered_df.loc[(filtered_df["cloud_cover"] <= cloud_cover_thresh) & (filtered_df["aoi_coverage"] >= coverage_thresh)]

    id_list = []
    unique_dates = filtered_df.datestr.unique().tolist()
    for date_ in unique_dates:
        filtered_df2 = filtered_df.loc[filtered_df["datestr"]==date_]
        if len(filtered_df2) > 1:
            select_id = filter_dates(filtered_df2)
        else:
            select_id = filtered_df2['id'].values[0]
        
        id_list.append(select_id)

    filtered_df = filtered_df[filtered_df['id'].isin(id_list)]

    filtered_df.reset_index(inplace=True, drop=True)
    return filtered_df

filtered_search = filter_dataframe(search_df, cc_thresh, coverage_thresh)
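
As an optional check (an addition for illustration, not part of the original workflow), the snippet below summarizes how many images the cloud cover, AOI coverage, and one-image-per-day filters removed.

Code
# Optional check (an illustrative addition): compare image counts before and after filtering
print(f"Images before filtering: {len(search_df)}")
print(f"Images after filtering: {len(filtered_search)}")
print(filtered_search['sat_type'].value_counts())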
Code
# Save filtered dataframe to pickle file for subsequent process stages
filter_search_pickle = outpath_search_df.joinpath(f'{aoi_name}_{o_start_date}_to_{o_end_date}_search_df_filtered.pkl')
filtered_search.to_pickle(filter_search_pickle)

Stage 3: Graph Generation

This stage is entirely optional, but it can be used to visualize the results of the query prior to download. The query results are displayed in a variety of interactive graphs.

Code
# Path to put figures
outpath_figs = outpath_search_df.joinpath('figures')
outpath_figs.mkdir(exist_ok=True)
Code
# Read filtered search dataframe from pickle file
filter_search_pickle = outpath_search_df.joinpath(f'{aoi_name}_{o_start_date}_to_{o_end_date}_search_df_filtered.pkl')
filtered_search = pd.read_pickle(filter_search_pickle)
Code
# Graph 1: monthly bar graphs showing the number of images per month broken down by cloud cover percentage
def create_month_cc_graph(df, target_year, output_graph=False):
    """
    Creates interactive bar plots showing the number of images in a given year, broken down by month
    and by cloud cover percentage.

    Params:
    ---------------------
    df: pandas dataframe
        query dataframe containing image information
    target_year: int
        year to produce graph for (will cause an error if no images present for specified year)
    output_graph: bool
        if set to True, graph will be saved to file as an html and png file
    """
    
    target_df = df.loc[df["year"] == target_year]
    min_date = target_df.iloc[0]['datestr']
    max_date = target_df.iloc[-1]['datestr']

    month_df = pd.DataFrame()
    month_df["0% clouds"] = target_df.loc[target_df['cloud_cover'] == 0]['month'].value_counts()
    month_df["<=2% clouds"] = target_df.loc[(target_df['cloud_cover'] > 0) & (target_df['cloud_cover'] <= 2)]['month'].value_counts()
    month_df["<=5% clouds"] = target_df.loc[(target_df['cloud_cover'] > 2) & (target_df['cloud_cover'] <= 5)]['month'].value_counts()
    month_df["<=10% clouds"] = target_df.loc[(target_df['cloud_cover'] > 5) & (target_df['cloud_cover'] <= 10)]['month'].value_counts()
    month_df[">10% clouds"] = target_df.loc[target_df['cloud_cover'] > 10]['month'].value_counts()

    month_df.reset_index(inplace=True)
    month_df.rename(columns={'index': 'month'}, inplace=True)
    month_df.sort_values('month', inplace=True)
    month_df.reset_index(inplace=True, drop=True)

    month_fig = px.bar(month_df, x='month', y=['0% clouds', '<=2% clouds', '<=5% clouds', '<=10% clouds', '>10% clouds'], color_discrete_map={'0% clouds': 'green','<=2% clouds': 'yellow', "<=5% clouds": "orange", "<=10% clouds": "red", ">10% clouds": "maroon"})
    month_fig.update_layout(title_text=f"PlanetScope Cloud Cover By Month - {target_year} - {aoi_name}", title_x=0.5, yaxis_title="Image Count", xaxis_title="Month of Year")

    filenamestr = f"PlanetScope_Monthly_Image_Count_{min_date}_to_{max_date}_{aoi_name}"
    if output_graph:
        out_png = outpath_figs.joinpath(f"{filenamestr}.png")
        out_html = outpath_figs.joinpath(f"{filenamestr}.html")
        month_fig.write_image(out_png)
        month_fig.write_html(out_html)

    pyo.iplot(month_fig)

create_month_cc_graph(filtered_search, target_year=2022, output_graph=output_graphs)
Code
# Graph 2: Weekly bar graphs showing the number of images per week broken down by cloud cover percentage
def create_week_cc_graph(df, target_year, output_graph=False):
    """
    Creates interactive bar plots showing the number of images in a given year, broken down by the week of year
    and by cloud cover percentage.

    Params:
    ---------------------
    df: pandas dataframe
        query dataframe containing image information
    target_year: int
        year to produce graph for (will cause an error if no images present for specified year)
    output_graph: bool
        if set to True, graph will be saved to file as an html and png file
    """
    target_df = df.loc[df["year"] == target_year]
    min_date = target_df.iloc[0]['datestr']
    max_date = target_df.iloc[-1]['datestr']
    week_df = pd.DataFrame()
    week_df["0% clouds"] = target_df.loc[target_df['cloud_cover'] == 0]['week'].value_counts()
    week_df["<=2% clouds"] = target_df.loc[(target_df['cloud_cover'] > 0) & (target_df['cloud_cover'] <= 2)]['week'].value_counts()
    week_df["<=5% clouds"] = target_df.loc[(target_df['cloud_cover'] > 2) & (target_df['cloud_cover'] <= 5)]['week'].value_counts()
    week_df["<=10% clouds"] = target_df.loc[(target_df['cloud_cover'] > 5) & (target_df['cloud_cover'] <= 10)]['week'].value_counts()
    week_df[">10% clouds"] = target_df.loc[target_df['cloud_cover'] > 10]['week'].value_counts()

    week_df.reset_index(inplace=True)
    week_df.rename(columns={'index': 'week'}, inplace=True)
    week_df.sort_values('week', inplace=True)
    week_df.reset_index(inplace=True, drop=True)

    week_fig = px.bar(week_df, x='week', y=['0% clouds', '<=2% clouds', '<=5% clouds', '<=10% clouds', '>10% clouds'], color_discrete_map={'0% clouds': 'green','<=2% clouds': 'yellow', "<=5% clouds": "orange", "<=10% clouds": "red", ">10% clouds": "maroon"})
    week_fig.update_layout(title_text=f"PlanetScope Cloud Cover By Week - {min_date} to {max_date} - {aoi_name}", title_x=0.5, yaxis_title="Image Count", xaxis_title="Week of Year")

    filenamestr = f"PlanetScope_Weekly_Image_Count_{min_date}_to_{max_date}_{aoi_name}"
    if output_graph:
        out_png = outpath_figs.joinpath(f"{filenamestr}.png")
        out_html = outpath_figs.joinpath(f"{filenamestr}.html")
        week_fig.write_image(out_png)
        week_fig.write_html(out_html)

    pyo.iplot(week_fig)

create_week_cc_graph(filtered_search, target_year=2022, output_graph=output_graphs)
Code
# Graph 3: Annual bar chart showing image counts per year broken down by cloud cover percentage.
# Use this to display the number of images across multiple years in the query, if applicable.
def create_year_cc_graph(df, output_graph=False):
    """
    Creates interactive bar plots showing the number of images in a given year across multiple years,
    broken down by cloud cover percentage.

    Params:
    ---------------------
    df: pandas dataframe
        query dataframe containing image information
    output_graph: bool
        if set to True, graph will be saved to file as an html and png file
    """
    target_df = df
    min_year = target_df.iloc[0]['year']
    max_year = target_df.iloc[-1]['year']
    avg_price = target_df["cost"].mean()
    year_df = pd.DataFrame()
    year_df["0% clouds"] = target_df.loc[target_df['cloud_cover'] == 0]['year'].value_counts()
    year_df["<=2% clouds"] = target_df.loc[(target_df['cloud_cover'] > 0) & (target_df['cloud_cover'] <= 2)]['year'].value_counts()
    year_df["<=5% clouds"] = target_df.loc[(target_df['cloud_cover'] > 2) & (target_df['cloud_cover'] <= 5)]['year'].value_counts()
    year_df["<=10% clouds"] = target_df.loc[(target_df['cloud_cover'] > 5) & (target_df['cloud_cover'] <= 10)]['year'].value_counts()
    year_df[">10% clouds"] = target_df.loc[(target_df['cloud_cover'] > 10)]['year'].value_counts()
    year_df["0% clouds cost"] = year_df.apply(lambda row: round((row["0% clouds"] * avg_price), 2), axis=1)
    year_df["<=2% clouds cost"] = year_df.apply(lambda row: round((row["<=2% clouds"] * avg_price), 2), axis=1)
    year_df["<=5% clouds cost"] = year_df.apply(lambda row: round((row["<=5% clouds"] * avg_price), 2), axis=1)
    year_df["<=10% clouds cost"] = year_df.apply(lambda row: round((row["<=10% clouds"] * avg_price), 2), axis=1)
    year_df[">10% clouds cost"] = year_df.apply(lambda row: round((row[">10% clouds"] * avg_price), 2), axis=1)

    year_df.reset_index(inplace=True)
    year_df.rename(columns={'index': 'year'}, inplace=True)
    year_df.sort_values('year', inplace=True)
    year_df.reset_index(inplace=True, drop=True)

    h_template='''Image Count 0% Clouds: %{customdata[0]} Estimated Cost: %{customdata[5]}<br>
                Image Count <=2% Clouds: %{customdata[1]} Estimated Cost: %{customdata[6]}<br>
                Image Count <=5% Clouds: %{customdata[2]} Estimated Cost: %{customdata[7]}<br>
                Image Count <=10% Clouds: %{customdata[3]} Estimated Cost: %{customdata[8]}<br>
                Image Count >10% Clouds: %{customdata[4]} Estimated Cost: %{customdata[9]}'''
    year_fig = px.bar(year_df, x='year', y=['0% clouds', '<=2% clouds', '<=5% clouds', '<=10% clouds', '>10% clouds'], 
                      color_discrete_map={'0% clouds': 'green','<=2% clouds': 'yellow', "<=5% clouds": "orange", "<=10% clouds": "red", ">10% clouds": "maroon"}, 
                      custom_data=np.stack((year_df['0% clouds'], year_df['<=2% clouds'], year_df['<=5% clouds'], year_df['<=10% clouds'], year_df['>10% clouds'], 
                                            year_df['0% clouds cost'], year_df['<=2% clouds cost'], year_df['<=5% clouds cost'], year_df['<=10% clouds cost'], year_df['>10% clouds cost'])))
    
    year_fig.update_layout(title_text=f"PlanetScope Annual Image Counts and Cost - {min_year} to {max_year} - {aoi_name}", title_x=0.5, yaxis_title="Image Count", xaxis_title="Year")
    year_fig.update_traces(hovertemplate=h_template)

    filenamestr = f"PlanetScope_Annual_Image_Count_and_Cost_{min_year}_to_{max_year}_{aoi_name}"
    if output_graph:
        out_png = outpath_figs.joinpath(f"{filenamestr}.png")
        out_html = outpath_figs.joinpath(f"{filenamestr}.html")
        year_fig.write_image(out_png)
        year_fig.write_html(out_html)

    pyo.iplot(year_fig)

    return year_df

year_df = create_year_cc_graph(filtered_search, output_graph=output_graphs)
Code
# Graph 4: Monthly bar chart showing monthly image counts broken down by year. User must specify a threshold for cloud cover
def create_month_multiyear_graph(df, cc_threshold, output_graph=False):
    """
    Creates a monthly bar chart to display image counts per month, broken down by imaging year.
    Image counts are determined based on the input cloud cover threshold.

    Params:
    ---------------------
    df: pandas dataframe
        query dataframe containing image information
    cc_threshold: float
        cloud cover percentage threshold. Images with cloud cover exceeding this number will not be counted
    output_graph: bool
        if set to True, graph will be saved to file as an html and png file
    """
    
    min_year = df.iloc[0]['year']
    max_year = df.iloc[-1]['year']

    month_df = pd.DataFrame()
    col_list = []
    for year in df.year.unique().tolist():
        target_df = df.loc[df["year"] == year]
        col_name = f"{year}_counts"
        month_df[col_name] = target_df.loc[target_df['cloud_cover'] <= cc_threshold]['month'].value_counts()
        col_list.append(col_name)

    month_df.reset_index(inplace=True)
    month_df.rename(columns={'index': 'month'}, inplace=True)
    month_df.sort_values('month', inplace=True)
    month_df.reset_index(inplace=True, drop=True)

    month_fig = px.bar(month_df, x='month', y=col_list, barmode='group')
    month_fig.update_layout(title_text=f"PlanetScope Image Count By Year and Month - {cc_threshold}% Cloud Cover - {min_year} to {max_year} - {aoi_name}", title_x=0.5, yaxis_title="Image Count", xaxis_title="Month of Year")

    filenamestr = f"PlanetScope_Image_Count_Annual_Monthly_{cc_threshold}%CC_{min_year}_to_{max_year}_{aoi_name}"
    if output_graph:
        out_png = outpath_figs.joinpath(f"{filenamestr}.png")
        out_html = outpath_figs.joinpath(f"{filenamestr}.html")
        month_fig.write_image(out_png)
        month_fig.write_html(out_html)

    pyo.iplot(month_fig)

create_month_multiyear_graph(filtered_search, cc_threshold=0, output_graph=output_graphs)
Code
# Graph 5: Scatter plot showing individual images, with date on the x-axis and cloud cover percentage
# on the y-axis. Hovering over a point displays image information, including a preview link to display the image thumbnail
def create_image_preview_graph(df, output_graph=False):
    """
    Creates an interactive scatterplot displaying individual images, with the image date on the 
    x-axis and cloud cover percentage on the y-axis.

    Params:
    ---------------------
    df: pandas dataframe
        query dataframe containing image information
    output_graph: bool
        if set to True, graph will be saved to file as an html and png file
    """
    layout = go.Layout(title=f'PlanetScope Images - {o_start_date} to {o_end_date} - {aoi_name}', xaxis=dict(title="Image Date"), yaxis=dict(title="Cloud Cover Percentage"))#, autosize=False, width=1200, height=510)

    fig = go.Figure(layout=layout)

    h_template='Image Date: %{customdata[0]}<br>%{customdata[3]}<br>Cloud Cover: %{customdata[2]}%<br>Image Source: %{customdata[1]}'

    fig.add_trace(go.Scatter(
        y=df['cloud_cover'],
        x=df['date'],
        mode='markers',
        orientation='h',
        marker=dict(
            color='red'),
        customdata=np.stack((df['datestr'], df['source'], df['cloud_cover'], df['preview_html']), axis=-1),
        hovertemplate=h_template
    ))

    fig.update_layout(title={'x': 0.5, 'xanchor': 'center'})

    filenamestr = f"PlanetScope_Image_Previews_{o_start_date}_to_{o_end_date}_{aoi_name}"
    if output_graph:
        out_png = outpath_figs.joinpath(f"{filenamestr}.png")
        out_html = outpath_figs.joinpath(f"{filenamestr}.html")
        fig.write_image(out_png)
        fig.write_html(out_html)

    pyo.iplot(fig)

create_image_preview_graph(filtered_search, output_graph=output_graphs)
Code
# Graph 6: Monthly bar chart showing monthly image counts for a given year broken down by satellite platform (dove-classic, dove-r, and superdove). 
# User must specify a threshold for cloud cover
def create_dove_monthly_graph(df, target_year, cc_threshold, output_graph):
    """
    Creates a monthly bar chart to display image counts per month for a specified year, broken down by satellite platform
    Image counts are determined based on the input cloud cover threshold.

    Params:
    ---------------------
    df: pandas dataframe
        query dataframe containing image information
    target_year: int
        year to produce graph for (will cause an error if no images present for specified year)
    cc_threshold: float
        cloud cover percentage threshold. Images with cloud cover exceeding this number will not be counted
    output_graph: bool
        if set to True, graph will be saved to file as an html and png file
    """
    
    target_df = df.loc[df["year"] == target_year]

    month_df = pd.DataFrame()
    col_list = []
    for datasource in df.sat_type.unique().tolist():
        if datasource == "dove-c":
            col_name = "Dove-Classic"
        elif datasource == "dove-r":
            col_name = "Dove-R"
        elif datasource == "superdove":
            col_name = "SuperDove"
        else:
            col_name = datasource

        month_df[col_name] = target_df.loc[(target_df['cloud_cover'] <= cc_threshold) & (target_df['sat_type'] == datasource)]['month'].value_counts()
        col_list.append(col_name)

    month_df.reset_index(inplace=True)
    month_df.rename(columns={'index': 'month'}, inplace=True)
    month_df.sort_values('month', inplace=True)
    month_df.reset_index(inplace=True, drop=True)

    month_fig = px.bar(month_df, x='month', y=col_list)
    month_fig.update_layout(title_text=f"PlanetScope Image Count By Month and By Satellite Type - {cc_threshold}% Cloud Cover - {target_year} - {aoi_name}", title_x=0.5, yaxis_title="Image Count", xaxis_title="Month of Year")

    filenamestr = f"PlanetScope_Image_Count_Monthly_{cc_threshold}%CC_SatelliteType_{target_year}_{aoi_name}"
    if output_graph:
        out_png = outpath_figs.joinpath(f"{filenamestr}.png")
        out_html = outpath_figs.joinpath(f"{filenamestr}.html")
        month_fig.write_image(out_png)
        month_fig.write_html(out_html)

    pyo.iplot(month_fig)

create_dove_monthly_graph(filtered_search, target_year=2021, cc_threshold=0, output_graph=output_graphs)
Code
# Graph 7: Annual bar chart showing image counts per year broken down by satellite platform.
# Use this to display the number of images from different satellites across multiple years in the query, if applicable.
def create_dove_year_graph(df, cc_threshold, output_graph):
    """
    Creates a yearly bar chart to display image counts per year over multiple years, broken down by satellite platform
    Image counts are determined based on the input cloud cover threshold.

    Params:
    ---------------------
    df: pandas dataframe
        query dataframe containing image information
    cc_threshold: float
        cloud cover percentage threshold. Images with cloud cover exceeding this number will not be counted
    output_graph: bool
        if set to True, graph will be saved to file as an html and png file
    """
    target_df = df
    min_year = df.iloc[0]['year']
    max_year = df.iloc[-1]['year']
  
    year_df = pd.DataFrame()
    year_df["Dove-Classic"] = target_df.loc[(target_df['cloud_cover'] <= 5) & (target_df["sat_type"] == "dove-c")]['year'].value_counts()
    year_df["Dove-R"] = target_df.loc[(target_df['cloud_cover'] <= 5) & (target_df["sat_type"] == "dove-r")]['year'].value_counts()
    year_df["SuperDove"] = target_df.loc[(target_df['cloud_cover'] <= 5) & (target_df["sat_type"] == "superdove")]['year'].value_counts()
    
    year_df.reset_index(inplace=True)
    year_df.rename(columns={'index': 'year'}, inplace=True)
    year_df.sort_values('year', inplace=True)
    year_df.reset_index(inplace=True, drop=True)

    year_fig = px.bar(year_df, x='year', y=["Dove-Classic", "Dove-R", "SuperDove"])
    year_fig.update_layout(title_text=f"PlanetScope Image Count By Year and By Satellite Type - {cc_threshold}% Cloud Cover - {min_year} to {max_year} - {aoi_name}", title_x=0.5, yaxis_title="Image Count", xaxis_title="Year")

    filenamestr = f"PlanetScope_Annual_Image_Count_{cc_threshold}_%CC_SatelliteType_{min_year}_to_{max_year}_{aoi_name}"
    if output_graph:
        out_png = outpath_figs.joinpath(f"{filenamestr}.png")
        out_html = outpath_figs.joinpath(f"{filenamestr}.html")
        year_fig.write_image(out_png)
        year_fig.write_html(out_html)

    pyo.iplot(year_fig)

    return year_df

year_df = create_dove_year_graph(filtered_search, cc_threshold=0, output_graph=output_graphs)

Stage 4: Download Imagery

In this stage, the images returned by the query in Stage 1 are downloaded. The user may input final criteria to further subset the images to retrieve based on cloud cover, start and end dates, AOI coverage, and satellite type.

Code
# Read filtered search dataframe from pickle file
filter_search_pickle = outpath_search_df.joinpath(f'{aoi_name}_{o_start_date}_to_{o_end_date}_search_df_filtered.pkl')
filtered_search = pd.read_pickle(filter_search_pickle)
Code
# Specify final image download selection criteria here. The original query criteria can also be used by setting all variables to None
start_date_download = None # Start date for image download (string in the format "YYYY-MM-DD")
end_date_download = None # End date for image download (string in the format "YYYY-MM-DD")
cc_thresh_download = None # Maximum cloud cover percentage to use for image download
aoi_coverage_download = None # Minimum AOI coverage percentage to use for image download
sat_list = None # List of satellite types to download. Options are 'dove-c', 'dove-r', and 'superdove'. Note: TDI notebook currently only supports superdove

# Examples of user-specified criteria for image download
# cc_thresh_download = 0.0
# start_date_download = "2022-01-01"
# end_date_download = "2023-03-31"
# aoi_coverage_download = 90.0
# sat_list = ['superdove']

# If one of the above download criteria is not specified, the original query parameter is used.
if start_date_download is None:
    start_date_download = o_start_date
if end_date_download is None:
    end_date_download = o_end_date
if cc_thresh_download is None:
    cc_thresh_download = cc_thresh
if aoi_coverage_download is None:
    aoi_coverage_download = coverage_thresh
if sat_list is None:
    sat_list = ['dove-c', 'dove-r', 'superdove']
    

start_date_final = datetime.datetime.strptime(start_date_download, "%Y-%m-%d")
end_date_final = datetime.datetime.strptime(end_date_download, "%Y-%m-%d")

# Path to put downloaded images
outpath_dl = outpath_images.joinpath(f"{aoi_name}_{start_date_download}_to_{end_date_download}")
outpath_dl.mkdir(parents=True, exist_ok=True)

outpath_download_df.mkdir(parents=True, exist_ok=True)

filtered_download_pickle = outpath_download_df.joinpath(f'{aoi_name}_{start_date_download}_to_{end_date_download}_download_df.pkl')
filtered_download_resp_pickle = outpath_download_df.joinpath(f'{aoi_name}_{start_date_download}_to_{end_date_download}_download_resp_dict.pkl')

pl_resp_dict = {}
Code

#Filter results down to final download list based on above criteria
filtered_search_final = filtered_search.copy(deep=True)
filtered_search_final = filtered_search_final.loc[(filtered_search_final['date'] >= start_date_final) 
                                                  & (filtered_search_final['date'] <= end_date_final) 
                                                  & (filtered_search_final['cloud_cover'] <= cc_thresh_download)
                                                  & (filtered_search_final['aoi_coverage'] >= aoi_coverage_download)
                                                  & (filtered_search_final['sat_type'].isin(sat_list))]

filtered_search_final['pl_id'] = ""
filtered_search_final['pl_status'] = ""

filtered_search_final.to_pickle(filtered_download_pickle)

total_cost = round(filtered_search_final['cost'].sum(), 2)
num_images = len(filtered_search_final)
print(f"Total number of images to download: {num_images}. Total cost: ${total_cost} USD")
Total number of images to download: 163. Total cost: $1205.88 USD
Code
# Query endpoint to get different output types and build dictionary of selectable output types for download
output_id_url = "https://api.skywatch.co/earthcache/outputs"
output_id_resp = requests.get(url=output_id_url, headers=headers).json()
output_type_dict = {}
for output in output_id_resp['data']:
    output_name = output['name']
    output_id = output['id']
    output_type_dict[output_name] = output_id

output_type_dict.keys()
dict_keys(['NDVI', 'False Colour Infrared', 'Panchromatic', 'Near-infrared', 'MSAVI2', 'EVI', 'False Colour Urban', 'True Colour Image', 'SAR Terrain Flattened', 'RGB+NIR', 'NDWI2', 'All Optical Bands', 'SAR Terrain Corrected'])
Code
# Set desired output type to download based on the output from the previous cell
# Note: leave as "All Optical Bands" if using the TDI notebook after image download
im_download_product = 'All Optical Bands'
output_id = output_type_dict[im_download_product]
Code
def create_pipeline_query(pipeline_name, search_id, image_id, output_id):
    """
    Creates a json query to use for creating an image download pipeline in the Skywatch API.

    Params:
    ---------------------
    pipeline_name: str
        Name of the image download pipeline
    search_id: str
        id for search query where image appears
    image_id: str
        id for image
    output_id: str
        id string for selected image product to download
    

    Returns:
    ---------------------
    query: dict
        formatted query dictionary
    """
    query = {
            "name": pipeline_name,
            "search_id": search_id,
            "search_results": image_id,
            "output": {
                "id": output_id,
                "format": "geotiff",
                "mosaic": "off"
            }
        }
    
    return query

def post_pipeline(query, headers):
    """
    Runs a POST request to create image download pipeline.

    Params:
    ---------------------
    query: dict
        formatted query dictionary
    headers: dict
        headers for query
        
    Returns:
    ---------------------
    pipeline_id: str
        id string for download pipeline

    """
    pipeline_url = "https://api.skywatch.co/earthcache/pipelines"
    pipeline_resp = requests.post(url=pipeline_url, data=json.dumps(query), headers=headers).json()
    pipeline_id = pipeline_resp['data']['id']
    
    return pipeline_id

def get_pipeline(pipeline_id):
    """
    GET request to get the status of an image download pipeline. Note: uses the global `headers` defined earlier.

    Params:
    ---------------------
    pipeline_id: str
        id string for download pipeline

    Returns:
    ---------------------
    pipeline_get_resp: requests response object
        image download pipeline response object

    """
    pipeline_get_url = f"https://api.skywatch.co/earthcache/interval_results?pipeline_id={pipeline_id}"
    pipeline_get_resp = requests.get(url=pipeline_get_url, headers=headers)

    return pipeline_get_resp

WARNING: Running the cell below will result in charges being applied to your Earthcache account!
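
One optional safeguard, added here as an illustration rather than part of the original notebook, is to require explicit confirmation before creating the pipelines, since each pipeline incurs a charge. A minimal sketch:

Code
# Optional safeguard (an illustrative addition): prompt for confirmation before creating
# download pipelines, since each pipeline incurs charges on the Earthcache account.
num_images = len(filtered_search_final)
total_cost = round(filtered_search_final['cost'].sum(), 2)
confirm = input(f"Create {num_images} pipelines for approximately ${total_cost} USD? (yes/no): ")
if confirm.strip().lower() != "yes":
    raise RuntimeError("Pipeline creation aborted by user.")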

Code
# Iterates through all images in image download dataframe and creates new image download pipelines for each.
for index, row in filtered_search_final.iterrows():
    time.sleep(1)
    search_id = row["search_id"]
    image_id = row["id"]
    product_name = row["product_name"]
    print(f"creating download pipeline for image: {product_name}")
    pipeline_name = f"{aoi_name}_{product_name}" #Pipeline name, which shows up in the Earthcache console. Default combines the AOI name and the image name
    pl_query = create_pipeline_query(pipeline_name, search_id, image_id, output_id)
    pl_id = post_pipeline(pl_query, headers)
    filtered_search_final.loc[index, 'pl_id'] = pl_id # Appends the image download pipeline ID to the dataframe
Code
# Once the image download dataframe has been updated with the download pipeline IDs, 
# we save it to a pickle file in case notebook terminates before images are downloaded
filtered_search_final.to_pickle(filtered_download_pickle)
Code
# Retrieve filtered download dataframe and dictionary containing json response objects in case
# notebook was terminated before images could be downloaded. Resume from here in that case.

if filtered_download_pickle.exists():
    filtered_search_final = pd.read_pickle(filtered_download_pickle)

if filtered_download_resp_pickle.exists():
    with open(filtered_download_resp_pickle, 'rb') as f:
        pl_resp_dict = pickle.load(f)
Code
def query_pipeline(pl_id):
    """
    Queries the image download pipeline to check processing status and retrieve pipeline json
    response containing image download links for completed pipelines.

    Params:
    ---------------------
    pl_id: str
        id string for download pipeline

    Returns:
    ---------------------
    pl_status: str
        processing status of pipeline
    pl_resp_json: dict
        json response object from pipeline query. For completed jobs this will contain
        links for image download.
    """
    pl_resp = get_pipeline(pl_id)
    pl_resp_json = pl_resp.json()
    pl_status = pl_resp_json['data'][0]['status']
    return pl_status, pl_resp_json

# Iterates through all images in the image download dataframe, checks the status of each
# image download pipeline, and saves the response json to a dictionary.
# If any pipelines are still processing, the loop repeats after a delay that gets
# progressively shorter: the initial delay is 30 minutes, reduced on each iteration down
# to a minimum of 1 minute. The loop repeats until all pipelines report "complete".

num_iterations = 1
results_processing = True
while results_processing:
    status_list = []
    print('Checking status of image download pipelines.')
    for index, row in filtered_search_final.iterrows():
        time.sleep(1)
        pl_status = row['pl_status']
        pl_id = row['pl_id']
        if pl_status != 'complete':
            pl_status, pl_resp_json = query_pipeline(pl_id)
            filtered_search_final.loc[index, 'pl_status'] = pl_resp_json['data'][0]['status']
            pl_resp_dict[pl_id] = pl_resp_json
        
        status_list.append(pl_status)

    filtered_search_final.to_pickle(filtered_download_pickle)
    with open(filtered_download_resp_pickle, 'wb') as f:
        pickle.dump(pl_resp_dict, f)
    
    if set(status_list) == {'complete'}:
        results_processing = False
        print('All image pipelines are finished and ready for download!')
    else:
        wait_time = max(1800 / num_iterations, 60)
        wait_time_mins = round(wait_time / 60)
        print(f"Results still pending for some items. Waiting for {wait_time_mins} mins and trying again.")
        num_iterations += 2
        time.sleep(wait_time)
Code
def download_file(url, out_name):
    """
    Downloads files using the provided URL.

    Params:
    ---------------------
    url: str
        url path to the file
    out_name: str
        output file name for the local file download

    Returns:
    ---------------------

    """
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(out_name, 'wb') as f:
            for chunk in r.iter_content(chunk_size=None):
                f.write(chunk)

# Iterates through all images in image download dataframe and downloads image and metadata
for index, row in filtered_search_final.iterrows():
    product_name = row['product_name']
    pl_id = row['pl_id']
    pl_resp_json = pl_resp_dict[pl_id]
    results = pl_resp_json['data'][0]['results'][0]

    # Dict of image files to download. Currently downloads image tif file and metadata json file.
    # Cloud mask file currently commented out, as it is not used in subsequent processes.
    download_dict = {'image': {'url': results['analytics_url'], 'extension': 'analytic.tif'},
                    'metadata': {'url': results['metadata_url'], 'extension': 'metadata.json'}}
                    # 'cloud_mask': {'url': results['raster_files'][0]['uri'], 'extension': 'mask.tif'}}

    out_basepath = outpath_dl.joinpath(product_name) # Files for each image are placed in a folder named after the image file
    out_basepath.mkdir(exist_ok=True)

    print(f'downloading all files for image: {product_name}')
    for key, value in download_dict.items():
        dl_url = value['url']
        extension = value['extension']
        out_fname = out_basepath.joinpath(f'{product_name}_{extension}')
        download_file(dl_url, out_fname)
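
After the download loop finishes, a quick verification pass can confirm that both expected files exist for every image. This is an optional illustrative addition, assuming the same file naming convention used in the loop above.

Code
# Optional verification (an illustrative addition): confirm the image tif and metadata
# json were downloaded for every image in the download dataframe.
missing = []
for product_name in filtered_search_final['product_name']:
    for extension in ('analytic.tif', 'metadata.json'):
        expected = outpath_dl.joinpath(product_name, f'{product_name}_{extension}')
        if not expected.exists():
            missing.append(expected)

if missing:
    print(f"{len(missing)} expected files are missing:")
    for path in missing:
        print(path)
else:
    print("All expected files downloaded successfully.")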