Source code for sospice.catalog.catalog

from dataclasses import dataclass
from pathlib import Path
from itertools import cycle
import matplotlib.colors as mcolors
import pandas as pd
import numpy as np
import warnings

from parfive import Downloader
from astropy.utils.data import download_file

from .release import Release
from .file_metadata import FileMetadata, required_columns


[docs]@dataclass class Catalog(pd.DataFrame): """ A SPICE catalog, initialized (in that order) either from a filename, a release tag, or a pandas.DataFrame. Parameters ---------- filename: str A file name (or URL) for the catalog release_tag: str A release tag. The catalog is fetched online and downloaded to the astropy cache. data_frame: pandas.DataFrame A pandas DataFrame to be used as SPICE catalog. Some basic checks are made to ensure that is can be used as a SPICE catalog. update_cache: bool Update cached catalog for the given release tag """ filename: str = None release_tag: str = None data_frame: pd.DataFrame = None update_cache: bool = False def __post_init__(self): """ Read catalog and update object """ self._normalize_arguments() if self.release_tag is not None: self._cache_release_catalog() if self.filename is not None: super().__init__(self.read_catalog()) else: if self.data_frame is None: self.data_frame = pd.DataFrame() self._validate_data_frame() super().__init__(self.data_frame) del self.data_frame # needed for memory usage? self.data_frame = None def _normalize_arguments(self): """ Prioritize filename then release tag then data frame """ if self.filename is not None: self.release_tag = None self.data_frame = None elif self.release_tag is not None: self.data_frame = None def _cache_release_catalog(self): """ Used cached catalog or download release catalog to astropy cache """ if self.release_tag is None: return if self.release_tag == "latest": self.release_tag = None release = Release(self.release_tag) assert release.exists cache = "update" if self.update_cache else True self.filename = download_file(release.catalog_url, cache=cache) self.release_tag = None def _validate_data_frame(self): """ Check that the data_frame argument can be considered a valid SPICE catalog (or raise an exception) """ assert self.data_frame is not None if self.data_frame.empty: return True # an empty data frame is valid assert required_columns.issubset(self.data_frame.columns) def read_catalog(self): """ Read SPICE FITS files catalog Return ------ pandas.DataFrame Catalog """ if not Path(self.filename).exists(): raise RuntimeError(f"File {self.filename} does not exist") df = pd.read_csv( self.filename, low_memory=False, ) date_columns = ["DATE-BEG", "DATE", "TIMAQUTC"] for date_column in date_columns: df.loc[df[date_column] == "MISSING", date_column] = "NaT" df[date_column] = pd.to_datetime(df[date_column], format="ISO8601") return df @classmethod def build_query_from_keywords(cls, **kwargs): """ Build a query from the provided parameters: exact keyword matches Parameters ---------- kwargs: dict Parameters and their values Return ------ str Query string for `pandas.DataFrame.query()` Notes: * does not work for dates * keywords are converted to upper case (FITS keywords) * ignores keywords with value None """ queries = list() for key in kwargs: value = kwargs[key] if value is None: continue if isinstance(value, str): query = f'{key.upper()} == "{kwargs[key]}"' else: query = f"{key.upper()} == {kwargs[key]}" queries.append(query) return " and ".join(queries) def find_files_by_keywords(self, **kwargs): """ Find files according to criteria on metadata: exact keyword matches Parameters ---------- kwargs: dict Parameters and their values Return ------ Catalog Matching files """ if self.empty or not kwargs: return self query = Catalog.build_query_from_keywords(**kwargs) if query != "": df = self.query(query) return Catalog(data_frame=df) else: return self def find_files_by_date_range(self, date_min=None, date_max=None): """ Find files with DATE-BEG in some date range. Parameters ---------- date_min: Minimum date of a date range date_max: Maximum date of a date range Return ------ Catalog Matching files """ if self.empty: return self df = self if date_min is not None: if type(date_min) is str: date_min = pd.Timestamp(date_min) df = df[df["DATE-BEG"] >= date_min] if date_max is not None: if type(date_max) is str: date_max = pd.Timestamp(date_max) df = df[df["DATE-BEG"] <= date_max] return Catalog(data_frame=df) def find_file_closest_to_date(self, date, level="L2"): """ Find file closest to some given date Parameters ---------- date: datetime.datetime, pandas.Timestamp... Date (compared to DATE-BEG) level: str Data level Return ------ pandas.Series Matching file """ if date is None: return pd.Series() if type(date) is str: date = pd.Timestamp(date) df = self[self.LEVEL == level] df.set_index("DATE-BEG", inplace=True) index = df.index.get_indexer([date], method="nearest") df.reset_index(inplace=True) return df.iloc[index[0]] def find_files( self, query=None, date_min=None, date_max=None, closest_to_date=None, **kwargs ): """ Find files according to different criteria on metadata. Parameters ---------- query: str Generic pandas.DataFrame.query() string date_min: Minimum date of a date range date_max: Maximum date of a date range closest_to_date: datetime.datetime, pandas.Timestamp... Find the file closest to a date. kwargs: dict Other parameters and their values Return ------ pandas.DataFrame Matching files Notes: * Filtering is done by keyword exact match (LEVEL, SOOPNAME, MISOSTDU...), then using the generic query string, then by date range, then by closest date. * Keywords are converted to upper case (FITS keywords), so they can be passed as lowercase arguments * Selects LEVEL="L2" by default; if you want all levels, please specify LEVEL=None * Date arguments are compared to DATE-BEG. """ if self.empty: return self if "LEVEL" not in [k.upper() for k in kwargs.keys()]: kwargs["LEVEL"] = "L2" df = self.find_files_by_keywords(**kwargs) if query is not None: df = Catalog(data_frame=df.query(query)) df = df.find_files_by_date_range(date_min, date_max) if closest_to_date is not None: df = ( df.find_file_closest_to_date(closest_to_date, level=kwargs["LEVEL"]) .to_frame() .T ) return df def mid_time(self, method=None): """ Find "middle time" for observations in catalog Parameters ---------- method: str Method for determining middle time. Can be * "midrange" (default): middle of time range, from beginning of first observation to end of last observation * "mean": mean of observation times (not weighted by observations durations) * "barycenter": barycenter of the middle times of all observations (weighted by observations durations) """ if method is None or method == "midrange": begin_min = self["DATE-BEG"].min() begin_max = self["DATE-BEG"].max() last_telapse = self[self["DATE-BEG"] == begin_max].TELAPSE.max() end_max = begin_max + pd.Timedelta(seconds=last_telapse) return begin_min + (end_max - begin_min) / 2 elif method == "mean": begin_mean = self["DATE-BEG"].mean() telapse_half_mean = pd.Timedelta(seconds=self.TELAPSE.mean() / 2) return begin_mean + telapse_half_mean elif method == "barycenter": mid_observation = self["DATE-BEG"] + self.apply( lambda row: pd.Timedelta(seconds=row.TELAPSE / 2), axis=1 ) weight = self.TELAPSE t0 = mid_observation.iloc[0] return t0 + ((mid_observation - t0) * weight).sum() / weight.sum() else: raise RuntimeError("Invalid method") @classmethod def _format_time_range(cls, row, timespec="minutes"): """ Format time range for observation Parameters ---------- row: pd.Series Catalog row timespec: str Time terms specification for pandas.Timestamp.isoformat() Return ------ str Formatted time range The end of the time range is known from the single observation in `row` thanks to an additional element `last_DATE-BEG' in the Series. All dates are DATE-BEG, we don't compute a DATE-END. """ t = [row["DATE-BEG"]] is_range = ("last_DATE-BEG" in row.index) and (row["last_DATE-BEG"] != t[0]) if is_range: t.append(row["last_DATE-BEG"]) t_str = [tt.isoformat(timespec=timespec) for tt in t] if is_range and t_str[0][:10] == t_str[1][:10]: t_str[1] = t_str[1][10:] return " - ".join(t_str) def plot_fov(self, ax, **kwargs): """ Plot SPICE FOVs on a background map Parameters ---------- ax: matplotlib.axes.Axes Axes (with relevant projection) kwargs: dict Keyword arguments, passed to FileMetadata.plot_fov(), except: * color: str or list Color(s) cycle for drawing the FOVs (one color per type of study) * merge_by_spiobsid: bool Merge FOV plots by SPIOBSID, drawing the first FOV of the observations of each SPIOBSID with a plain line and the last FOV with a dashed line """ time_range_length = self["DATE-BEG"].max() - self["DATE-BEG"].min() n_fovs = len(self) if (time_range_length > pd.Timedelta(days=60)) or (n_fovs > 1000): print( f"Time range length is {time_range_length} and number of observations is {n_fovs}, " "this is a lot and probably not what you want; aborting" ) return fovs = self.sort_values(by=["DATE"]) merge_by_spiobsid = kwargs.pop("merge_by_spiobsid", True) assert type(merge_by_spiobsid) is bool if merge_by_spiobsid: groups = self.groupby("SPIOBSID") fovs = groups.first() fovs_last = groups.last() fovs["last_DATE-BEG"] = fovs_last["DATE-BEG"] fovs.reset_index(inplace=True) else: fovs = Catalog(data_frame=self[list(required_columns)]) # label at the position of the FOV plot fovs["fov_contour_label"] = fovs.apply(Catalog._format_time_range, axis=1) # label at the level of the plot legend (will be de-duplicated afterwards) fovs["fov_label"] = fovs.apply( lambda row: f"{row.STUDY} ({row.MISOSTUD})", axis=1 ) # color(s) for the different study types color = kwargs.pop("color", None) studies = sorted(list(self.STUDY.unique())) colors = ( mcolors.TABLEAU_COLORS if color is None else color if type(color) is list else [color] ) study_color = dict(zip(studies, cycle(colors))) fovs["fov_color"] = fovs.apply(lambda row: study_color[row.STUDY], axis=1) fovs.apply( lambda row: FileMetadata(row).plot_fov(ax, **kwargs), axis=1, ) if merge_by_spiobsid: # also plot last FOV, with dashes fovs_last.reset_index(inplace=True) fovs_last["fov_color"] = fovs.fov_color fovs_last["fov_linestyle"] = ":" fovs_last = fovs_last[fovs_last.RASTERNO != 0] fovs_last.apply( lambda row: FileMetadata(row).plot_fov(ax, **kwargs), axis=1, ) # De-duplicate labels for legend (an alternative would be # to provide labels only to the first instance of each study) handles, labels = ax.get_legend_handles_labels() unique_indices = [labels.index(x) for x in sorted(set(labels))] handles = list(np.array(handles)[unique_indices]) ax.legend(handles=handles) def download_files( self, base_dir=".", base_url=None, release=None, keep_tree=True, downloader=None, max_download=None, ): """ Download all files from Catalog., Parameters ---------- base_dir: Path or str Base directory to download file to base_url: str Base URL for file release: Release or str Release to download file from keep_tree: bool Keep tree directory structure (by level and date) downloader: parfive.Downloader If provided, enqueue file for download instead of downloading it. To download enqueued files, run `downloader.download()` max_download: int Maximum number of files to be downloaded. Return ------ parfive.Result Download result (or None if file has only been enqueued) """ default_max_download = 1000 if max_download is None: max_download = default_max_download elif max_download > default_max_download: warnings.warn( "You are overriding the default max_download: This might cause performance issues." ) do_download = False if downloader is None: downloader = Downloader(overwrite=False) do_download = True self.iloc[:max_download].apply( lambda row: FileMetadata(row).download_file( base_dir=base_dir, base_url=base_url, release=release, keep_tree=keep_tree, downloader=downloader, ), axis=1, ) if do_download: result = downloader.download() return result return