This notebook queries and downloads PlanetScope imagery using the Skywatch EarthCache API within a specified AOI and date range. The notebook performs the following tasks:
1. Queries the Skywatch API for the area and time of interest and returns a dataframe with all images meeting the specified criteria.
2. Filters the results to select the "best" image per day when multiple images exist for a given day.
3. Visualizes the results in a variety of graphs.
4. Downloads the selected images.
Code
import time
import warnings
import math
from pathlib import Path
import requests
import pandas as pd
import geopandas as gpd
import json
import datetime
import plotly.express as px
import plotly.graph_objects as go
import pickle
import numpy as np
import plotly.offline as pyo

pyo.init_notebook_mode()
File Paths and Query Parameters
Run the two cells below first, even if resuming the process at a later stage.
Code
api_key =""# Put skywatch api key hereaoi_name ="BeitBridge"# Name for AOIo_start_date ="2018-01-01"# Overall start date for queryo_end_date ="2023-03-31"# Overall end date for querycc_thresh =25.0# Maximum allowable image cloud cover percentagecoverage_thresh =80.0# Minimum AOI image coverage percentageoutput_graphs =False# Option to output graphs in Stage 3 to filesaoi_file = Path("../../data/processed/Beitbridge_PS_AOI_Final.gpkg") # AOI geometry file used to query API# Overall path to put all outputsout_path = Path(f"../../data/tdi_demo_files/{aoi_name}")out_path = out_path.resolve()out_path.mkdir(parents=True, exist_ok=True)# Path to put search queriesoutpath_search_df = out_path.joinpath('search_queries')outpath_search_df.mkdir(exist_ok=True)outpath_download_df = out_path.joinpath('download_dataframes') # Path to put download dataframesoutpath_images = out_path.joinpath("images") # Base path where image files will be downloaded
Code
aoi_file = aoi_file.resolve()
aoi_data = gpd.read_file(aoi_file)
aoi_json = json.loads(aoi_data.to_json())
headers = {'Content-Type': 'application/json', 'x-api-key': api_key}  # Used to construct queries. Do not change

if not api_key:
    print("No API key found. Beware that querying and downloading imagery will fail.")
Stage 1: Search API
This part of the notebook queries the Earthcache API for the specified AOI and TOI. Because of limitations of the API, long date ranges can fail to return all results. To get around this, the overall user-specified date range is split into multiple more manageable queries, and the results are combined at the end.
Code
# Range of days to split up into separate queries. The Skywatch API fails to return full results
# if too wide a date range is used, so this is used as a workaround.
# The separate queries are combined at the end of the process.
date_interval = 90

o_start_datetime = datetime.datetime.strptime(o_start_date, "%Y-%m-%d")
o_end_datetime = datetime.datetime.strptime(o_end_date, "%Y-%m-%d")
num_intervals = math.floor((o_end_datetime - o_start_datetime).days / date_interval)
date_list = []

if num_intervals > 0:
    starting_date = None
    for i in range(0, num_intervals):
        if not starting_date:
            starting_date = o_start_datetime
        ending_date = starting_date + datetime.timedelta(days=date_interval)
        if ending_date > o_end_datetime:
            ending_date = o_end_datetime
        date_list.append({"start_date": datetime.datetime.strftime(starting_date, "%Y-%m-%d"),
                          "end_date": datetime.datetime.strftime(ending_date, "%Y-%m-%d")})
        starting_date = ending_date + datetime.timedelta(days=1)
    if ending_date < o_end_datetime:
        final_start_date = ending_date + datetime.timedelta(days=1)
        date_list.append({"start_date": datetime.datetime.strftime(final_start_date, "%Y-%m-%d"),
                          "end_date": o_end_date})
else:
    date_list.append({"start_date": o_start_date, "end_date": o_end_date})
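It can be worth eyeballing the generated windows before querying; a minimal check (an optional addition, not part of the original flow):
Code
# Quick look at the generated query windows; the last window should end on o_end_date.
print(f"{len(date_list)} query windows generated")
print("first:", date_list[0])
print("last: ", date_list[-1])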
Code
def get_sat_type(sat_id, source):
    """
    Parses the type of Dove satellite from the image source returned from the Skywatch API as
    well as the satellite ID string. This is performed because the Skywatch API does not
    currently make any distinction between the Dove-Classic and Dove-R satellite platforms.

    Params:
    ---------------------
    sat_id: str
        identification string parsed from the image name
    source: str
        image source returned from the Skywatch API

    Returns:
    ---------------------
    sat_type: str
        satellite type for the input image
    """
    # List of dove-r satellite ID strings
    dove_r_list = ["1047", "1057", "1058", "1059", "105a", "105b", "105d", "105e", "105f",
                   "1060", "1061", "1062", "1063", "1064", "1065", "1066", "1067", "1068",
                   "1069", "106a", "106b", "106c", "106d", "106e", "106f"]
    sat_parse = sat_id.split('_')[-1]
    if source == "PlanetScope-SuperDove":
        sat_type = 'superdove'
    if source == "PlanetScope":
        sat_type = 'dove-c'
    if any(sat in sat_parse for sat in dove_r_list):
        sat_type = "dove-r"
    return sat_type


def create_search_query(start_date, end_date, aoi_json):
    """
    Creates a json query to use for searching the Skywatch API.

    Params:
    ---------------------
    start_date: str
        starting date string for query
    end_date: str
        ending date string for query
    aoi_json: geojson
        geojson geometry for query AOI

    Returns:
    ---------------------
    query: dict
        formatted query dictionary
    """
    query = {
        "location": {
            "type": "Polygon",
            "coordinates": aoi_json["features"][0]["geometry"]["coordinates"][0]
        },
        "start_date": start_date,
        "end_date": end_date,
        "resolution": ["medium"],  # medium resolution = planetscope in skywatch
        "coverage": 50.0,  # default minimum aoi coverage of 50% for initial query, with a more stringent filter applied later
        "interval_length": 7,
        "order_by": "date"
    }
    return query


def search_catalog(query, headers):
    """
    Runs a POST request to create search query.

    Params:
    ---------------------
    query: dict
        formatted query dictionary
    headers: dict
        headers for query

    Returns:
    ---------------------
    search_id: str
        id for search query
    """
    search_url = "https://api.skywatch.co/earthcache/archive/search"
    search_resp = requests.post(url=search_url, data=json.dumps(query), headers=headers).json()
    search_id = search_resp['data']['id']
    return search_id


def get_search_results(search_id, headers):
    """
    GET request to retrieve search results for a specified query.

    Params:
    ---------------------
    search_id: str
        id for search query
    headers: dict
        headers for query

    Returns:
    ---------------------
    search_resp_json: json
        json containing search results for query
    """
    search_url = f"https://api.skywatch.co/earthcache/archive/search/{search_id}/search_results"
    search_resp = requests.get(url=search_url, headers=headers)
    # Query results take a bit of time to populate after the initial POST request. This loop
    # repeats the GET request every 5 seconds until results are available.
    while search_resp.status_code == 202:
        print("Search still processing. Waiting 5 seconds.")
        time.sleep(5)
        search_resp = requests.get(url=search_url, headers=headers)
    search_resp_json = search_resp.json()
    return search_resp_json


def parse_item(item, search_id):
    """
    Builds a single-row dataframe of select image attributes from one search result item.
    """
    datestr = item['start_time'].split("T")[0]
    date_obj = datetime.datetime.strptime(datestr, "%Y-%m-%d")
    year = date_obj.year
    month = date_obj.month
    isodate = date_obj.isocalendar()
    # Dates in early January can fall in ISO week 52 of the prior year; treat them as week 1
    if isodate[0] == (year - 1) and isodate[1] == 52:
        week = 1
    else:
        week = isodate[1]
    sat_type = get_sat_type(item['product_name'], item['source'])
    return pd.DataFrame([{"search_id": search_id, "id": item['id'],
                          "product_name": item['product_name'], "datestr": datestr,
                          "date": date_obj, "year": year, "month": month, "week": week,
                          "source": item['source'], "sat_type": sat_type,
                          "area_sq_km": item['area_sq_km'],
                          "cloud_cover": item['result_cloud_cover_percentage'],
                          "aoi_coverage": item["location_coverage_percentage"],
                          "cost": item['cost'], "preview": item["preview_uri"]}])


def parse_results(search_resp, headers, search_id, df_list):
    """
    Parses through returned query results and creates pandas dataframes from them.

    Params:
    ---------------------
    search_resp: json
        query results json
    headers: dict
        headers for query
    search_id: str
        id for search query
    df_list: list
        list of search dataframes created from earlier search results

    Returns:
    ---------------------
    df_list: list
        list of search dataframes from previous and current search results
    """
    # Parses through search json results and populates dataframes with select image attributes
    for item in search_resp["data"]:
        df_list.append(parse_item(item, search_id))
    # Check for pagination of search results
    try:
        cursor = search_resp['pagination']['cursor']['next']
    except KeyError:
        cursor = None
    # If results are paginated, perform additional GET queries and create new dataframes until no more pages are left
    while cursor:
        search_url3 = f"https://api.skywatch.co/earthcache/archive/search/{search_id}/search_results?cursor={cursor}"
        search_resp_2 = requests.get(url=search_url3, headers=headers).json()
        for item in search_resp_2["data"]:
            df_list.append(parse_item(item, search_id))
        try:
            cursor = search_resp_2['pagination']['cursor']['next']
        except KeyError:
            cursor = None
    return df_list


# This loop iterates through the shorter date ranges created in the previous cell,
# creating new search queries and retrieving results. The returned json is then parsed
# to create Pandas dataframes, which are captured in a list
df_list = []  # Empty list used to hold the separate results dataframes
for entry in date_list:
    start_date = entry['start_date']
    end_date = entry['end_date']
    print(f"running search query for date range: {start_date} to {end_date}")
    query = create_search_query(start_date, end_date, aoi_json)
    search_id = search_catalog(query, headers)
    search_resp = get_search_results(search_id, headers)
    df_list = parse_results(search_resp, headers, search_id, df_list)
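Network hiccups or API rate limits can make an individual POST fail partway through the loop above; a simple retry wrapper (a hypothetical addition, not part of the original notebook) could guard the search step in place of the direct search_catalog call:
Code
# Hypothetical retry wrapper for transient request failures around the search POST.
def search_catalog_with_retry(query, headers, retries=3, delay=10):
    for attempt in range(retries):
        try:
            return search_catalog(query, headers)
        except (requests.RequestException, KeyError) as e:
            print(f"search attempt {attempt + 1} failed ({e}); retrying in {delay}s")
            time.sleep(delay)
    raise RuntimeError("search query failed after all retries")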
Code
# This cell combines the returned data from the separate API queries into a single dataframe
def combine_query_dataframes(df_list):
    """
    Combines separate results dataframes into a single dataframe.

    Params:
    ---------------------
    df_list: list
        list of separate dataframes created from query results

    Returns:
    ---------------------
    search_df: pandas dataframe
        combined dataframe containing all search results
    """
    search_df = pd.concat(df_list)
    search_df.reset_index(drop=True, inplace=True)
    search_df['year'] = search_df['year'].astype('category')
    search_df['month'] = search_df['month'].astype('category')
    search_df['week'] = search_df['week'].astype('category')
    # This line adds a new field with an image preview html link, which is used in one of the
    # interactive graphs in Stage 3
    search_df["preview_html"] = search_df.apply(lambda x: f'<a href="{x["preview"]}">Preview</a>', axis=1)
    return search_df

search_df = combine_query_dataframes(df_list)
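An optional quick inspection of the combined dataframe shows the volume of results and how they break down by satellite type:
Code
# Optional inspection of the combined search results (not in the original flow).
print(f"{len(search_df)} total results")
print(search_df['sat_type'].value_counts())
search_df.head()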
Code
# Put combined search dataframe into pickle file to reload later
search_pickle = outpath_search_df.joinpath(f'{aoi_name}_{o_start_date}_to_{o_end_date}_search_df.pkl')
search_df.to_pickle(search_pickle)
Stage 2: Filter Query Dataframe
This stage of the process filters down the query results from Stage 1. Continue from here if you ran Stage 1 previously.
Code
# Read in query pickle file
search_pickle = outpath_search_df.joinpath(f'{aoi_name}_{o_start_date}_to_{o_end_date}_search_df.pkl')
search_df = pd.read_pickle(search_pickle)
Code
def filter_dates(df):
    """
    If more than one image exists for a given day, this function will select the "best" image
    for the day. First, images with poor AOI coverage are removed. If multiple images remain
    after this step, then the images are filtered by sensor type, with superdove and dove-r
    being prioritized over dove-classic. Finally, if multiple images still remain, the image
    with the highest AOI coverage is selected.

    Params:
    ---------------------
    df: pandas dataframe
        subset dataframe containing only the images for the specific date

    Returns:
    ---------------------
    select_id: str
        image id string for the selected image
    """
    top_coverage = df.nlargest(1, 'aoi_coverage')["aoi_coverage"].tolist()[0]
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        df['cover_percentmax'] = df.apply(lambda row: (row["aoi_coverage"] / top_coverage), axis=1)
    df = df.loc[(df["cover_percentmax"] >= 0.9)]
    if 'superdove' in df['sat_type'].unique() or 'dove-r' in df['sat_type'].unique():
        df = df.loc[(df['sat_type'] == 'superdove') | (df['sat_type'] == 'dove-r')]
    df = df.nlargest(1, 'aoi_coverage')
    select_id = df['id'].values[0]
    return select_id


def filter_dataframe(df, cloud_cover_thresh, coverage_thresh):
    """
    Filters down initial query results based on cloud cover, aoi coverage, and selecting only
    one image per day.

    Params:
    ---------------------
    df: pandas dataframe
        dataframe containing full image search results
    cloud_cover_thresh: float
        maximum cloud cover percent threshold
    coverage_thresh: float
        minimum aoi area coverage percent threshold

    Returns:
    ---------------------
    filtered_df: pandas dataframe
        filtered image dataframe
    """
    filtered_df = df.copy(deep=True)
    filtered_df = filtered_df.loc[(filtered_df["cloud_cover"] <= cloud_cover_thresh) &
                                  (filtered_df["aoi_coverage"] >= coverage_thresh)]
    id_list = []
    unique_dates = filtered_df.datestr.unique().tolist()
    for date_ in unique_dates:
        filtered_df2 = filtered_df.loc[filtered_df["datestr"] == date_]
        if len(filtered_df2) > 1:
            select_id = filter_dates(filtered_df2)
        else:
            select_id = filtered_df2['id'].values[0]
        id_list.append(select_id)
    filtered_df = filtered_df[filtered_df['id'].isin(id_list)]
    filtered_df.reset_index(inplace=True, drop=True)
    return filtered_df

filtered_search = filter_dataframe(search_df, cc_thresh, coverage_thresh)
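A lightweight sanity check (an optional addition) can confirm that the filter left at most one image per day and that the thresholds were respected before saving:
Code
# Verify the filter invariants: one image per day, thresholds respected.
assert filtered_search['datestr'].is_unique, "multiple images remain for at least one day"
assert (filtered_search['cloud_cover'] <= cc_thresh).all()
assert (filtered_search['aoi_coverage'] >= coverage_thresh).all()
print(f"{len(filtered_search)} images remain after filtering")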
Code
# Save filtered dataframe to pickle file for subsequent process stages
filter_search_pickle = outpath_search_df.joinpath(f'{aoi_name}_{o_start_date}_to_{o_end_date}_search_df_filtered.pkl')
filtered_search.to_pickle(filter_search_pickle)
Stage 3: Graph Generation
This stage is entirely optional but can be used to visualize the results of the query prior to download. The results are displayed in a variety of interactive graphs.
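Note that saving graphs as PNG via plotly's write_image() typically requires the kaleido package. An optional early guard (an addition, assuming a standard plotly installation):
Code
# fig.write_image() uses the kaleido backend for static export; fail early if it's absent.
import importlib.util
if output_graphs and importlib.util.find_spec("kaleido") is None:
    raise ImportError("output_graphs=True requires the 'kaleido' package (pip install kaleido)")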
Code
# Path to put figures
outpath_figs = outpath_search_df.joinpath('figures')
outpath_figs.mkdir(exist_ok=True)
Code
# Graph 1: Monthly bar graphs showing the number of images per month broken down by cloud cover percentage
def create_month_cc_graph(df, target_year, output_graph=False):
    """
    Creates interactive bar plots showing the number of images in a given year, broken down by
    month and by cloud cover percentage.

    Params:
    ---------------------
    df: pandas dataframe
        query dataframe containing image information
    target_year: int
        year to produce graph for (will cause an error if no images present for specified year)
    output_graph: bool
        if set to True, graph will be saved to file as an html and png file
    """
    target_df = df.loc[df["year"] == target_year]
    min_date = target_df.iloc[0]['datestr']
    max_date = target_df.iloc[-1]['datestr']
    month_df = pd.DataFrame()
    month_df["0% clouds"] = target_df.loc[target_df['cloud_cover'] == 0]['month'].value_counts()
    month_df["<=2% clouds"] = target_df.loc[(target_df['cloud_cover'] > 0) & (target_df['cloud_cover'] <= 2)]['month'].value_counts()
    month_df["<=5% clouds"] = target_df.loc[(target_df['cloud_cover'] > 2) & (target_df['cloud_cover'] <= 5)]['month'].value_counts()
    month_df["<=10% clouds"] = target_df.loc[(target_df['cloud_cover'] > 5) & (target_df['cloud_cover'] <= 10)]['month'].value_counts()
    month_df[">10% clouds"] = target_df.loc[target_df['cloud_cover'] > 10]['month'].value_counts()
    month_df.reset_index(inplace=True)
    month_df.rename(columns={'index': 'month'}, inplace=True)
    month_df.sort_values('month', inplace=True)
    month_df.reset_index(inplace=True, drop=True)
    month_fig = px.bar(month_df, x='month',
                       y=['0% clouds', '<=2% clouds', '<=5% clouds', '<=10% clouds', '>10% clouds'],
                       color_discrete_map={'0% clouds': 'green', '<=2% clouds': 'yellow',
                                           '<=5% clouds': 'orange', '<=10% clouds': 'red',
                                           '>10% clouds': 'maroon'})
    month_fig.update_layout(title_text=f"PlanetScope Cloud Cover By Month - {target_year} - {aoi_name}",
                            title_x=0.5, yaxis_title="Image Count", xaxis_title="Month of Year")
    filenamestr = f"PlanetScope_Monthly_Image_Count_{min_date}_to_{max_date}_{aoi_name}"
    if output_graph:
        out_png = outpath_figs.joinpath(f"{filenamestr}.png")
        out_html = outpath_figs.joinpath(f"{filenamestr}.html")
        month_fig.write_image(out_png)
        month_fig.write_html(out_html)
    pyo.iplot(month_fig)

create_month_cc_graph(filtered_search, target_year=2022, output_graph=output_graphs)
Code
# Graph 2: Weekly bar graphs showing the number of images per week broken down by cloud cover percentage
def create_week_cc_graph(df, target_year, output_graph=False):
    """
    Creates interactive bar plots showing the number of images in a given year, broken down by
    the week of year and by cloud cover percentage.

    Params:
    ---------------------
    df: pandas dataframe
        query dataframe containing image information
    target_year: int
        year to produce graph for (will cause an error if no images present for specified year)
    output_graph: bool
        if set to True, graph will be saved to file as an html and png file
    """
    target_df = df.loc[df["year"] == target_year]
    min_date = target_df.iloc[0]['datestr']
    max_date = target_df.iloc[-1]['datestr']
    week_df = pd.DataFrame()
    week_df["0% clouds"] = target_df.loc[target_df['cloud_cover'] == 0]['week'].value_counts()
    week_df["<=2% clouds"] = target_df.loc[(target_df['cloud_cover'] > 0) & (target_df['cloud_cover'] <= 2)]['week'].value_counts()
    week_df["<=5% clouds"] = target_df.loc[(target_df['cloud_cover'] > 2) & (target_df['cloud_cover'] <= 5)]['week'].value_counts()
    week_df["<=10% clouds"] = target_df.loc[(target_df['cloud_cover'] > 5) & (target_df['cloud_cover'] <= 10)]['week'].value_counts()
    week_df[">10% clouds"] = target_df.loc[target_df['cloud_cover'] > 10]['week'].value_counts()
    week_df.reset_index(inplace=True)
    week_df.rename(columns={'index': 'week'}, inplace=True)
    week_df.sort_values('week', inplace=True)
    week_df.reset_index(inplace=True, drop=True)
    week_fig = px.bar(week_df, x='week',
                      y=['0% clouds', '<=2% clouds', '<=5% clouds', '<=10% clouds', '>10% clouds'],
                      color_discrete_map={'0% clouds': 'green', '<=2% clouds': 'yellow',
                                          '<=5% clouds': 'orange', '<=10% clouds': 'red',
                                          '>10% clouds': 'maroon'})
    week_fig.update_layout(title_text=f"PlanetScope Cloud Cover By Week - {min_date} to {max_date} - {aoi_name}",
                           title_x=0.5, yaxis_title="Image Count", xaxis_title="Week of Year")
    filenamestr = f"PlanetScope_Weekly_Image_Count_{min_date}_to_{max_date}_{aoi_name}"
    if output_graph:
        out_png = outpath_figs.joinpath(f"{filenamestr}.png")
        out_html = outpath_figs.joinpath(f"{filenamestr}.html")
        week_fig.write_image(out_png)
        week_fig.write_html(out_html)
    pyo.iplot(week_fig)

create_week_cc_graph(filtered_search, target_year=2022, output_graph=output_graphs)
Code
# Graph 3: Annual bar chart showing image counts per year broken down by cloud cover percentage.
# Use this to display the number of images across multiple years in the query, if applicable.
def create_year_cc_graph(df, output_graph=False):
    """
    Creates interactive bar plots showing the number of images per year across multiple years,
    broken down by cloud cover percentage.

    Params:
    ---------------------
    df: pandas dataframe
        query dataframe containing image information
    output_graph: bool
        if set to True, graph will be saved to file as an html and png file
    """
    target_df = df
    min_year = target_df.iloc[0]['year']
    max_year = target_df.iloc[-1]['year']
    avg_price = target_df["cost"].mean()
    year_df = pd.DataFrame()
    year_df["0% clouds"] = target_df.loc[target_df['cloud_cover'] == 0]['year'].value_counts()
    year_df["<=2% clouds"] = target_df.loc[(target_df['cloud_cover'] > 0) & (target_df['cloud_cover'] <= 2)]['year'].value_counts()
    year_df["<=5% clouds"] = target_df.loc[(target_df['cloud_cover'] > 2) & (target_df['cloud_cover'] <= 5)]['year'].value_counts()
    year_df["<=10% clouds"] = target_df.loc[(target_df['cloud_cover'] > 5) & (target_df['cloud_cover'] <= 10)]['year'].value_counts()
    year_df[">10% clouds"] = target_df.loc[(target_df['cloud_cover'] > 10)]['year'].value_counts()
    year_df["0% clouds cost"] = year_df.apply(lambda row: round((row["0% clouds"] * avg_price), 2), axis=1)
    year_df["<=2% clouds cost"] = year_df.apply(lambda row: round((row["<=2% clouds"] * avg_price), 2), axis=1)
    year_df["<=5% clouds cost"] = year_df.apply(lambda row: round((row["<=5% clouds"] * avg_price), 2), axis=1)
    year_df["<=10% clouds cost"] = year_df.apply(lambda row: round((row["<=10% clouds"] * avg_price), 2), axis=1)
    year_df[">10% clouds cost"] = year_df.apply(lambda row: round((row[">10% clouds"] * avg_price), 2), axis=1)
    year_df.reset_index(inplace=True)
    year_df.rename(columns={'index': 'year'}, inplace=True)
    year_df.sort_values('year', inplace=True)
    year_df.reset_index(inplace=True, drop=True)
    h_template = '''Image Count 0% Clouds: %{customdata[0]} Estimated Cost: %{customdata[5]}<br>Image Count <=2% Clouds: %{customdata[1]} Estimated Cost: %{customdata[6]}<br>Image Count <=5% Clouds: %{customdata[2]} Estimated Cost: %{customdata[7]}<br>Image Count <=10% Clouds: %{customdata[3]} Estimated Cost: %{customdata[8]}<br>Image Count >10% Clouds: %{customdata[4]} Estimated Cost: %{customdata[9]}'''
    year_fig = px.bar(year_df, x='year',
                      y=['0% clouds', '<=2% clouds', '<=5% clouds', '<=10% clouds', '>10% clouds'],
                      color_discrete_map={'0% clouds': 'green', '<=2% clouds': 'yellow',
                                          '<=5% clouds': 'orange', '<=10% clouds': 'red',
                                          '>10% clouds': 'maroon'},
                      custom_data=np.stack((year_df['0% clouds'], year_df['<=2% clouds'],
                                            year_df['<=5% clouds'], year_df['<=10% clouds'],
                                            year_df['>10% clouds'], year_df['0% clouds cost'],
                                            year_df['<=2% clouds cost'], year_df['<=5% clouds cost'],
                                            year_df['<=10% clouds cost'], year_df['>10% clouds cost'])))
    year_fig.update_layout(title_text=f"PlanetScope Annual Image Counts and Cost - {min_year} to {max_year} - {aoi_name}",
                           title_x=0.5, yaxis_title="Image Count", xaxis_title="Year")
    year_fig.update_traces(hovertemplate=h_template)
    filenamestr = f"PlanetScope_Annual_Image_Count_and_Cost_{min_year}_to_{max_year}_{aoi_name}"
    if output_graph:
        out_png = outpath_figs.joinpath(f"{filenamestr}.png")
        out_html = outpath_figs.joinpath(f"{filenamestr}.html")
        year_fig.write_image(out_png)
        year_fig.write_html(out_html)
    pyo.iplot(year_fig)
    return year_df

year_df = create_year_cc_graph(filtered_search, output_graph=output_graphs)
Code
# Graph 4: Monthly bar chart showing monthly image counts broken down by year. User must specify a threshold for cloud cover
def create_month_multiyear_graph(df, cc_threshold, output_graph=False):
    """
    Creates a monthly bar chart to display image counts per month, broken down by imaging year.
    Image counts are determined based on the input cloud cover threshold.

    Params:
    ---------------------
    df: pandas dataframe
        query dataframe containing image information
    cc_threshold: float
        cloud cover percentage threshold. Images with cloud cover exceeding this number will not be counted
    output_graph: bool
        if set to True, graph will be saved to file as an html and png file
    """
    min_year = df.iloc[0]['year']
    max_year = df.iloc[-1]['year']
    month_df = pd.DataFrame()
    col_list = []
    for year in df.year.unique().tolist():
        target_df = df.loc[df["year"] == year]
        col_name = f"{year}_counts"
        month_df[col_name] = target_df.loc[target_df['cloud_cover'] <= cc_threshold]['month'].value_counts()
        col_list.append(col_name)
    month_df.reset_index(inplace=True)
    month_df.rename(columns={'index': 'month'}, inplace=True)
    month_df.sort_values('month', inplace=True)
    month_df.reset_index(inplace=True, drop=True)
    month_fig = px.bar(month_df, x='month', y=col_list, barmode='group')
    month_fig.update_layout(title_text=f"PlanetScope Image Count By Year and Month - {cc_threshold}% Cloud Cover - {min_year} to {max_year} - {aoi_name}",
                            title_x=0.5, yaxis_title="Image Count", xaxis_title="Month of Year")
    filenamestr = f"PlanetScope_Image_Count_Annual_Monthly_{cc_threshold}%CC_{min_year}_to_{max_year}_{aoi_name}"
    if output_graph:
        out_png = outpath_figs.joinpath(f"{filenamestr}.png")
        out_html = outpath_figs.joinpath(f"{filenamestr}.html")
        month_fig.write_image(out_png)
        month_fig.write_html(out_html)
    pyo.iplot(month_fig)

create_month_multiyear_graph(filtered_search, cc_threshold=0, output_graph=output_graphs)
Code
# Graph 5: Scatter plot showing individual images, with date on the x-axis and cloud cover percentage
# on the y-axis. Hovering over a point displays image information, including a preview link to display the image thumbnail
def create_image_preview_graph(df, output_graph=False):
    """
    Creates an interactive scatterplot displaying individual images, with the image date on the
    x-axis and cloud cover percentage on the y-axis.

    Params:
    ---------------------
    df: pandas dataframe
        query dataframe containing image information
    output_graph: bool
        if set to True, graph will be saved to file as an html and png file
    """
    layout = go.Layout(title=f'PlanetScope Images - {o_start_date} to {o_end_date} - {aoi_name}',
                       xaxis=dict(title="Image Date"),
                       yaxis=dict(title="Cloud Cover Percentage"))  # , autosize=False, width=1200, height=510)
    fig = go.Figure(layout=layout)
    h_template = 'Image Date: %{customdata[0]}<br>%{customdata[3]}<br>Cloud Cover: %{customdata[2]}%<br>Image Source: %{customdata[1]}'
    fig.add_trace(go.Scatter(
        y=df['cloud_cover'],
        x=df['date'],
        mode='markers',
        orientation='h',
        marker=dict(color='red'),
        customdata=np.stack((df['datestr'], df['source'], df['cloud_cover'], df['preview_html']), axis=-1),
        hovertemplate=h_template
    ))
    fig.update_layout(title={'x': 0.5, 'xanchor': 'center'})
    filenamestr = f"PlanetScope_Image_Previews_{o_start_date}_to_{o_end_date}_{aoi_name}"
    if output_graph:
        out_png = outpath_figs.joinpath(f"{filenamestr}.png")
        out_html = outpath_figs.joinpath(f"{filenamestr}.html")
        fig.write_image(out_png)
        fig.write_html(out_html)
    pyo.iplot(fig)

create_image_preview_graph(filtered_search, output_graph=output_graphs)
Code
# Graph 6: Monthly bar chart showing monthly image counts for a given year broken down by satellite platform
# (dove-classic, dove-r, and superdove). User must specify a threshold for cloud cover
def create_dove_monthly_graph(df, target_year, cc_threshold, output_graph):
    """
    Creates a monthly bar chart to display image counts per month for a specified year, broken
    down by satellite platform. Image counts are determined based on the input cloud cover threshold.

    Params:
    ---------------------
    df: pandas dataframe
        query dataframe containing image information
    target_year: int
        year to produce graph for (will cause an error if no images present for specified year)
    cc_threshold: float
        cloud cover percentage threshold. Images with cloud cover exceeding this number will not be counted
    output_graph: bool
        if set to True, graph will be saved to file as an html and png file
    """
    target_df = df.loc[df["year"] == target_year]
    month_df = pd.DataFrame()
    col_list = []
    for datasource in df.sat_type.unique().tolist():
        if datasource == "dove-c":
            col_name = "Dove-Classic"
        elif datasource == "dove-r":
            col_name = "Dove-R"
        elif datasource == "superdove":
            col_name = "SuperDove"
        month_df[col_name] = target_df.loc[(target_df['cloud_cover'] <= cc_threshold) &
                                           (target_df['sat_type'] == datasource)]['month'].value_counts()
        col_list.append(col_name)
    month_df.reset_index(inplace=True)
    month_df.rename(columns={'index': 'month'}, inplace=True)
    month_df.sort_values('month', inplace=True)
    month_df.reset_index(inplace=True, drop=True)
    month_fig = px.bar(month_df, x='month', y=col_list)
    month_fig.update_layout(title_text=f"PlanetScope Image Count By Month and By Satellite Type - {cc_threshold}% Cloud Cover - {target_year} - {aoi_name}",
                            title_x=0.5, yaxis_title="Image Count", xaxis_title="Month of Year")
    filenamestr = f"PlanetScope_Image_Count_Monthly_{cc_threshold}%CC_SatelliteType_{target_year}_{aoi_name}"
    if output_graph:
        out_png = outpath_figs.joinpath(f"{filenamestr}.png")
        out_html = outpath_figs.joinpath(f"{filenamestr}.html")
        month_fig.write_image(out_png)
        month_fig.write_html(out_html)
    pyo.iplot(month_fig)

create_dove_monthly_graph(filtered_search, target_year=2021, cc_threshold=0, output_graph=output_graphs)
Code
# Graph 7: Annual bar chart showing image counts per year broken down by satellite platform.
# Use this to display the number of images from different satellites across multiple years in the query, if applicable.
def create_dove_year_graph(df, cc_threshold, output_graph):
    """
    Creates a yearly bar chart to display image counts per year over multiple years, broken down
    by satellite platform. Image counts are determined based on the input cloud cover threshold.

    Params:
    ---------------------
    df: pandas dataframe
        query dataframe containing image information
    cc_threshold: float
        cloud cover percentage threshold. Images with cloud cover exceeding this number will not be counted
    output_graph: bool
        if set to True, graph will be saved to file as an html and png file
    """
    target_df = df
    min_year = df.iloc[0]['year']
    max_year = df.iloc[-1]['year']
    year_df = pd.DataFrame()
    year_df["Dove-Classic"] = target_df.loc[(target_df['cloud_cover'] <= cc_threshold) & (target_df["sat_type"] == "dove-c")]['year'].value_counts()
    year_df["Dove-R"] = target_df.loc[(target_df['cloud_cover'] <= cc_threshold) & (target_df["sat_type"] == "dove-r")]['year'].value_counts()
    year_df["SuperDove"] = target_df.loc[(target_df['cloud_cover'] <= cc_threshold) & (target_df["sat_type"] == "superdove")]['year'].value_counts()
    year_df.reset_index(inplace=True)
    year_df.rename(columns={'index': 'year'}, inplace=True)
    year_df.sort_values('year', inplace=True)
    year_df.reset_index(inplace=True, drop=True)
    year_fig = px.bar(year_df, x='year', y=["Dove-Classic", "Dove-R", "SuperDove"])
    year_fig.update_layout(title_text=f"PlanetScope Image Count By Year and By Satellite Type - {cc_threshold}% Cloud Cover - {min_year} to {max_year} - {aoi_name}",
                           title_x=0.5, yaxis_title="Image Count", xaxis_title="Year")
    filenamestr = f"PlanetScope_Annual_Image_Count_{cc_threshold}%CC_SatelliteType_{min_year}_to_{max_year}_{aoi_name}"
    if output_graph:
        out_png = outpath_figs.joinpath(f"{filenamestr}.png")
        out_html = outpath_figs.joinpath(f"{filenamestr}.html")
        year_fig.write_image(out_png)
        year_fig.write_html(out_html)
    pyo.iplot(year_fig)
    return year_df

year_df = create_dove_year_graph(filtered_search, cc_threshold=0, output_graph=output_graphs)
Stage 4: Download Imagery
In this stage, the images returned by the query in Stage 1 are downloaded. The user may input final criteria to further subset the images to retrieve, based on cloud cover, start and end dates, AOI coverage, and satellite type.
Code
# Specify final image download selection criteria here. The original query criteria can also be used by setting all variables to None
start_date_download = None  # Start date for image download (string in the format "YYYY-MM-DD")
end_date_download = None  # End date for image download (string in the format "YYYY-MM-DD")
cc_thresh_download = None  # Maximum cloud cover percentage to use for image download
aoi_coverage_download = None  # Minimum AOI coverage percentage to use for image download
sat_list = None  # List of satellite types to download. Options are 'dove-c', 'dove-r', and 'superdove'. Note: TDI notebook currently only supports superdove

# Examples of user-specified criteria for image download
# cc_thresh_download = 0.0
# start_date_download = "2022-01-01"
# end_date_download = "2023-03-31"
# aoi_coverage_download = 90.0
# sat_list = ['superdove']

# If one of the above download criteria is not specified, the original query parameter is used.
if start_date_download is None:
    start_date_download = o_start_date
if end_date_download is None:
    end_date_download = o_end_date
if cc_thresh_download is None:
    cc_thresh_download = cc_thresh
if aoi_coverage_download is None:
    aoi_coverage_download = coverage_thresh
if sat_list is None:
    sat_list = ['dove-c', 'dove-r', 'superdove']

start_date_final = datetime.datetime.strptime(start_date_download, "%Y-%m-%d")
end_date_final = datetime.datetime.strptime(end_date_download, "%Y-%m-%d")

# Path to put downloaded images
outpath_dl = outpath_images.joinpath(f"{aoi_name}_{start_date_download}_to_{end_date_download}")
outpath_dl.mkdir(parents=True, exist_ok=True)
outpath_download_df.mkdir(parents=True, exist_ok=True)
filtered_download_pickle = outpath_download_df.joinpath(f'{aoi_name}_{start_date_download}_to_{end_date_download}_download_df.pkl')
filtered_download_resp_pickle = outpath_download_df.joinpath(f'{aoi_name}_{start_date_download}_to_{end_date_download}_download_resp_dict.pkl')
pl_resp_dict = {}
Code
# Filter results down to final download list based on above criteria
filtered_search_final = filtered_search.copy(deep=True)
filtered_search_final = filtered_search_final.loc[(filtered_search_final['date'] >= start_date_final) &
                                                  (filtered_search_final['date'] <= end_date_final) &
                                                  (filtered_search_final['cloud_cover'] <= cc_thresh_download) &
                                                  (filtered_search_final['aoi_coverage'] >= aoi_coverage_download) &
                                                  (filtered_search_final['sat_type'].isin(sat_list))]
filtered_search_final['pl_id'] = ""
filtered_search_final['pl_status'] = ""
filtered_search_final.to_pickle(filtered_download_pickle)
total_cost = round(filtered_search_final['cost'].sum(), 2)
num_images = len(filtered_search_final)
print(f"Total number of images to download: {num_images}. Total cost: ${total_cost} USD")
Total number of images to download: 163. Total cost: $1205.88 USD
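Because the pipeline cells below incur charges, it can help to break the estimated cost down by year and satellite type before committing; an optional check:
Code
# Optional: break the estimated cost down before ordering (not in the original flow).
print(filtered_search_final.groupby('year', observed=True)['cost'].agg(['count', 'sum']).round(2))
print(filtered_search_final['sat_type'].value_counts())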
Code
# Query endpoint to get different output types and build dictionary of selectable output types for download
output_id_url = "https://api.skywatch.co/earthcache/outputs"
output_id_resp = requests.get(url=output_id_url, headers=headers).json()
output_type_dict = {}
for output in output_id_resp['data']:
    output_name = output['name']
    output_id = output['id']
    output_type_dict[output_name] = output_id
output_type_dict.keys()
Code
# Set desired output type to download based on the output from the previous cell
# Note: leave as "All Optical Bands" if using the TDI notebook after image download
im_download_product = 'All Optical Bands'
output_id = output_type_dict[im_download_product]
Code
def create_pipeline_query(pipeline_name, search_id, image_id, output_id):
    """
    Creates a json query to use for creating an image download pipeline in the Skywatch API.

    Params:
    ---------------------
    pipeline_name: str
        Name of the image download pipeline
    search_id: str
        id for search query where image appears
    image_id: str
        id for image
    output_id: str
        id string for selected image product to download

    Returns:
    ---------------------
    query: dict
        formatted query dictionary
    """
    query = {
        "name": pipeline_name,
        "search_id": search_id,
        "search_results": image_id,
        "output": {
            "id": output_id,
            "format": "geotiff",
            "mosaic": "off"
        },
    }
    return query


def post_pipeline(query, headers):
    """
    Runs a POST request to create an image download pipeline.

    Params:
    ---------------------
    query: dict
        formatted query dictionary
    headers: dict
        headers for query

    Returns:
    ---------------------
    pipeline_id: str
        id string for download pipeline
    """
    pipeline_url = "https://api.skywatch.co/earthcache/pipelines"
    pipeline_resp = requests.post(url=pipeline_url, data=json.dumps(query), headers=headers).json()
    pipeline_id = pipeline_resp['data']['id']
    return pipeline_id


def get_pipeline(pipeline_id):
    """
    GET request to retrieve the status of an image download pipeline.

    Params:
    ---------------------
    pipeline_id: str
        id string for download pipeline

    Returns:
    ---------------------
    pipeline_get_resp: requests response object
        image download pipeline response object
    """
    pipeline_get_url = f"https://api.skywatch.co/earthcache/interval_results?pipeline_id={pipeline_id}"
    pipeline_get_resp = requests.get(url=pipeline_get_url, headers=headers)
    return pipeline_get_resp
WARNING: Running the below cell will result in charges being applied to your Earthcache account!
Code
# Iterates through all images in image download dataframe and creates new image download pipelines for each.
for index, row in filtered_search_final.iterrows():
    time.sleep(1)
    search_id = row["search_id"]
    image_id = row["id"]
    product_name = row["product_name"]
    print(f"creating download pipeline for image: {product_name}")
    pipeline_name = f"{aoi_name}_{product_name}"  # Pipeline name, which shows up in the Earthcache console. Default combines the AOI name and the image name
    pl_query = create_pipeline_query(pipeline_name, search_id, image_id, output_id)
    pl_id = post_pipeline(pl_query, headers)
    filtered_search_final.loc[index, 'pl_id'] = pl_id  # Appends the image download pipeline ID to the dataframe
Code
# Once the image download dataframe has been updated with the download pipeline IDs,
# we save it to a pickle file in case the notebook terminates before images are downloaded
filtered_search_final.to_pickle(filtered_download_pickle)
Code
# Retrieve filtered download dataframe and dictionary containing json response objects in case
# the notebook was terminated before images could be downloaded. Resume from here in that case.
if filtered_download_pickle.exists():
    filtered_search_final = pd.read_pickle(filtered_download_pickle)
if filtered_download_resp_pickle.exists():
    with open(filtered_download_resp_pickle, 'rb') as f:
        pl_resp_dict = pickle.load(f)
Code
def query_pipeline(pl_id):
    """
    Queries the image download pipeline to check processing status and retrieve the pipeline
    json response containing image download links for completed pipelines.

    Params:
    ---------------------
    pl_id: str
        id string for download pipeline

    Returns:
    ---------------------
    pl_status: str
        processing status of pipeline
    pl_resp_json: dict
        json response object from pipeline query. For completed jobs this will contain links
        for image download.
    """
    pl_resp = get_pipeline(pl_id)
    pl_resp_json = pl_resp.json()
    pl_status = pl_resp_json['data'][0]['status']
    return pl_status, pl_resp_json


# Iterates through all images in the image download dataframe, checks the status of the image
# download pipelines, and saves the response json to a dictionary.
# If any image download pipelines are still processing, the loop will repeat after a delay.
# The initial delay is set to 30 minutes and is progressively reduced, with a minimum delay
# of 1 minute imposed. The loop repeats until the status of all download pipelines has
# changed to complete.
num_iterations = 1
results_processing = True
while results_processing:
    status_list = []
    print('Checking status of image download pipelines.')
    for index, row in filtered_search_final.iterrows():
        time.sleep(1)
        pl_status = row['pl_status']
        pl_id = row['pl_id']
        if pl_status != 'complete':
            pl_status, pl_resp_json = query_pipeline(pl_id)
            filtered_search_final.loc[index, 'pl_status'] = pl_resp_json['data'][0]['status']
            pl_resp_dict[pl_id] = pl_resp_json
        status_list.append(pl_status)
    filtered_search_final.to_pickle(filtered_download_pickle)
    with open(filtered_download_resp_pickle, 'wb') as f:
        pickle.dump(pl_resp_dict, f)
    if 'complete' in set(status_list) and len(set(status_list)) == 1:
        results_processing = False
        print('All image pipelines are finished and ready for download!')
    else:
        wait_time = 1800 / num_iterations
        if wait_time < 60:
            wait_time = 60
        wait_time_mins = round((wait_time / 60), 0)
        print(f"Results still pending for some items. Waiting for {wait_time_mins} mins and trying again.")
        num_iterations += 2
        time.sleep(wait_time)
Code
def download_file(url, out_name):
    """
    Downloads a file using the provided URL.

    Params:
    ---------------------
    url: str
        url path to the file
    out_name: str
        output file name for the local file download
    """
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(out_name, 'wb') as f:
            for chunk in r.iter_content(chunk_size=None):
                f.write(chunk)


# Iterates through all images in image download dataframe and downloads image and metadata
for index, row in filtered_search_final.iterrows():
    product_name = row['product_name']
    pl_id = row['pl_id']
    pl_resp_json = pl_resp_dict[pl_id]
    results = pl_resp_json['data'][0]['results'][0]
    # Dict of image files to download. Currently downloads image tif file and metadata json file.
    # Cloud mask file currently commented out, as it is not used in subsequent processes.
    download_dict = {'image': {'url': results['analytics_url'], 'extension': 'analytic.tif'},
                     'metadata': {'url': results['metadata_url'], 'extension': 'metadata.json'}}
                     # 'cloud_mask': {'url': results['raster_files'][0]['uri'], 'extension': 'mask.tif'}}
    out_basepath = outpath_dl.joinpath(product_name)  # Files for each image are placed in a folder named after the image file
    out_basepath.mkdir(exist_ok=True)
    print(f'downloading all files for image: {product_name}')
    for key, value in download_dict.items():
        dl_url = value['url']
        extension = value['extension']
        out_fname = out_basepath.joinpath(f'{product_name}_{extension}')
        download_file(dl_url, out_fname)
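Once the loop finishes, an optional completeness check (not part of the original notebook) compares the downloaded folders against the download dataframe:
Code
# Optional completeness check: one output folder per image, each containing the image tif.
dl_folders = [p for p in outpath_dl.iterdir() if p.is_dir()]
print(f"{len(dl_folders)} of {len(filtered_search_final)} image folders present")
missing = [p.name for p in dl_folders if not list(p.glob('*analytic.tif'))]
if missing:
    print("folders missing image tifs:", missing)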