# Source code for sospice.catalog.catalog

from dataclasses import dataclass
from pathlib import Path
from itertools import cycle

import matplotlib.colors as mcolors
import pandas as pd
import numpy as np

from astropy.utils.data import download_file

from .release import Release
from .file_metadata import FileMetadata, required_columns


@dataclass
class Catalog(pd.DataFrame):
    """
    A SPICE catalog, initialized (in that order) either from a filename,
    a release tag, or a pandas.DataFrame.

    Parameters
    ----------
    filename: str
        A file name (or URL) for the catalog
    release_tag: str
        A release tag. The catalog is fetched online and downloaded to the
        astropy cache.
    data_frame: pandas.DataFrame
        A pandas DataFrame to be used as SPICE catalog. Some basic checks
        are made to ensure that it can be used as a SPICE catalog.
    update_cache: bool
        Update cached catalog for the given release tag
    """

    filename: str = None
    release_tag: str = None
    data_frame: pd.DataFrame = None
    update_cache: bool = False

    def __post_init__(self):
        """
        Read catalog and update object
        """
        self._normalize_arguments()
        if self.release_tag is not None:
            # Sets self.filename to the cached copy of the release catalog
            self._cache_release_catalog()
        if self.filename is not None:
            super().__init__(self.read_catalog())
            return
        if self.data_frame is None:
            self.data_frame = pd.DataFrame()
        self._validate_data_frame()
        super().__init__(self.data_frame)
        # The data now lives in the DataFrame part of this object; drop the
        # extra reference to the input frame.
        self.data_frame = None

    def _normalize_arguments(self):
        """
        Prioritize filename then release tag then data frame
        """
        if self.filename is not None:
            self.release_tag = None
            self.data_frame = None
        elif self.release_tag is not None:
            self.data_frame = None

    def _cache_release_catalog(self):
        """
        Used cached catalog or download release catalog to astropy cache

        On return, ``filename`` points to the downloaded file and
        ``release_tag`` is reset to None.

        Raises
        ------
        AssertionError
            If the ``exists`` attribute of the release is false
        """
        if self.release_tag is None:
            return
        if self.release_tag == "latest":
            # Release is built from None for the "latest" tag
            self.release_tag = None
        release = Release(self.release_tag)
        if not release.exists:
            # Explicit raise (instead of ``assert``) so that the check is
            # not stripped by ``python -O``; exception type is unchanged.
            raise AssertionError("Requested release does not exist")
        cache = "update" if self.update_cache else True
        self.filename = download_file(release.catalog_url, cache=cache)
        self.release_tag = None

    def _validate_data_frame(self):
        """
        Check that the data_frame argument can be considered a valid SPICE
        catalog (or raise an exception)

        Return
        ------
        bool or None
            True for an empty data frame, None for a valid non-empty one

        Raises
        ------
        AssertionError
            If there is no data frame, or if some of the required columns
            are missing from a non-empty data frame
        """
        # Explicit raises (instead of ``assert``) so that validation is not
        # stripped by ``python -O``; exception type is unchanged.
        if self.data_frame is None:
            raise AssertionError("No data frame to validate")
        if self.data_frame.empty:
            return True  # an empty data frame is valid
        missing = set(required_columns) - set(self.data_frame.columns)
        if missing:
            raise AssertionError(
                f"Missing required columns: {sorted(missing, key=str)}"
            )

    def read_catalog(self):
        """
        Read SPICE FITS files catalog

        Return
        ------
        pandas.DataFrame
            Catalog

        Raises
        ------
        RuntimeError
            If ``filename`` is a local path that does not exist
        """
        # The existence check only makes sense for local paths: URLs are
        # handed directly to pandas.read_csv, which can read them.
        is_url = "://" in str(self.filename)
        if not is_url and not Path(self.filename).exists():
            raise RuntimeError(f"File {self.filename} does not exist")
        df = pd.read_csv(
            self.filename,
            low_memory=False,
        )
        for date_column in ("DATE-BEG", "DATE", "TIMAQUTC"):
            # "MISSING" entries become NaT so that the column can be parsed
            df.loc[df[date_column] == "MISSING", date_column] = "NaT"
            df[date_column] = pd.to_datetime(df[date_column], format="ISO8601")
        return df

    @classmethod
    def build_query_from_keywords(cls, **kwargs):
        """
        Build a query from the provided parameters: exact keyword matches

        Parameters
        ----------
        kwargs: dict
            Parameters and their values

        Return
        ------
        str
            Query string for `pandas.DataFrame.query()`

        Notes:

        * does not work for dates
        * keywords are converted to upper case (FITS keywords)
        * ignores keywords with value None
        """
        queries = []
        for key, value in kwargs.items():
            if value is None:
                continue
            keyword = key.upper()
            if isinstance(value, str):
                # String values have to be quoted in the query
                queries.append(f'{keyword} == "{value}"')
            else:
                queries.append(f"{keyword} == {value}")
        return " and ".join(queries)

    def find_files_by_keywords(self, **kwargs):
        """
        Find files according to criteria on metadata: exact keyword matches

        Parameters
        ----------
        kwargs: dict
            Parameters and their values

        Return
        ------
        Catalog
            Matching files
        """
        if self.empty or not kwargs:
            return self
        query = Catalog.build_query_from_keywords(**kwargs)
        if query == "":
            # All values were None: nothing to filter on
            return self
        return Catalog(data_frame=self.query(query))

    def find_files_by_date_range(self, date_min=None, date_max=None):
        """
        Find files with DATE-BEG in some date range.

        Parameters
        ----------
        date_min:
            Minimum date of a date range
        date_max:
            Maximum date of a date range

        Return
        ------
        Catalog
            Matching files
        """
        if self.empty:
            return self
        df = self
        if date_min is not None:
            if isinstance(date_min, str):
                date_min = pd.Timestamp(date_min)
            df = df[df["DATE-BEG"] >= date_min]
        if date_max is not None:
            if isinstance(date_max, str):
                date_max = pd.Timestamp(date_max)
            df = df[df["DATE-BEG"] <= date_max]
        return Catalog(data_frame=df)

    def find_file_closest_to_date(self, date, level="L2"):
        """
        Find file closest to some given date

        Parameters
        ----------
        date: datetime.datetime, pandas.Timestamp...
            Date (compared to DATE-BEG)
        level: str
            Data level; None means that files of all levels are considered

        Return
        ------
        pandas.Series
            Matching file (empty Series if `date` is None or if there is
            no candidate file)
        """
        if date is None or self.empty:
            return pd.Series()
        if isinstance(date, str):
            date = pd.Timestamp(date)
        # No filtering on level when level is None (all levels)
        candidates = self if level is None else self[self.LEVEL == level]
        if candidates.empty:
            return pd.Series()
        # DATE-BEG becomes the first column and rows are renumbered from 0
        candidates = candidates.set_index("DATE-BEG").reset_index()
        # Plain distance computation: unlike an index-based "nearest"
        # lookup, this does not require DATE-BEG values to be sorted or
        # unique (files of different levels can share the same DATE-BEG).
        distance = (candidates["DATE-BEG"] - date).abs()
        closest = np.flatnonzero((distance == distance.min()).to_numpy())
        if closest.size == 0:
            # Only missing (NaT) dates among candidates
            return pd.Series()
        # In case of a tie, take the last of the closest rows
        return candidates.iloc[closest[-1]]

    def find_files(
        self, query=None, date_min=None, date_max=None, closest_to_date=None, **kwargs
    ):
        """
        Find files according to different criteria on metadata.

        Parameters
        ----------
        query: str
            Generic pandas.DataFrame.query() string
        date_min:
            Minimum date of a date range
        date_max:
            Maximum date of a date range
        closest_to_date: datetime.datetime, pandas.Timestamp...
            Find the file closest to a date.
        kwargs: dict
            Other parameters and their values

        Return
        ------
        pandas.DataFrame
            Matching files

        Notes:

        * Filtering is done by keyword exact match (LEVEL, SOOPNAME,
          MISOSTDU...), then using the generic query string, then by date
          range, then by closest date.
        * Keywords are converted to upper case (FITS keywords), so they can
          be passed as lowercase arguments
        * Selects LEVEL="L2" by default; if you want all levels, please
          specify LEVEL=None
        * Date arguments are compared to DATE-BEG.
        """
        if self.empty:
            return self
        # The level can be given with any case (e.g. ``level="L1"``), so it
        # has to be looked up in a case-insensitive way.
        level_keys = [key for key in kwargs if key.upper() == "LEVEL"]
        if level_keys:
            level = kwargs[level_keys[-1]]
        else:
            level = kwargs["LEVEL"] = "L2"
        df = self.find_files_by_keywords(**kwargs)
        if query is not None:
            df = Catalog(data_frame=df.query(query))
        df = df.find_files_by_date_range(date_min, date_max)
        if closest_to_date is not None:
            closest = df.find_file_closest_to_date(closest_to_date, level=level)
            if closest.empty:
                # No file matches the other criteria
                return pd.DataFrame(columns=df.columns)
            # Single-row data frame built from the selected file
            df = closest.to_frame().T
        return df

    def mid_time(self, method=None):
        """
        Find "middle time" for observations in catalog

        Parameters
        ----------
        method: str
            Method for determining middle time. Can be

            * "midrange" (default): middle of time range, from beginning of
              first observation to end of last observation
            * "mean": mean of observation times (not weighted by
              observations durations); computed as mean of DATE-BEG plus
              mean of TELAPSE
            * "barycenter": mean of the middle times of observations
              (DATE-BEG + TELAPSE / 2), weighted by TELAPSE

        Return
        ------
        pandas.Timestamp
            Middle time

        Raises
        ------
        RuntimeError
            If `method` is not one of the methods above
        """
        if method is None or method == "midrange":
            begin_min = self["DATE-BEG"].min()
            begin_max = self["DATE-BEG"].max()
            # Longest duration among the observations starting last
            last_telapse = self[self["DATE-BEG"] == begin_max].TELAPSE.max()
            end_max = begin_max + pd.Timedelta(seconds=last_telapse)
            return begin_min + (end_max - begin_min) / 2
        elif method == "mean":
            begin_mean = self["DATE-BEG"].mean()
            telapse_mean = pd.Timedelta(seconds=self.TELAPSE.mean())
            # NOTE(review): this adds the full mean TELAPSE, while
            # "barycenter" uses TELAPSE / 2 -- confirm which is intended.
            return begin_mean + telapse_mean
        elif method == "barycenter":
            mid_observation = self["DATE-BEG"] + pd.to_timedelta(
                self.TELAPSE / 2, unit="s"
            )
            weight = self.TELAPSE
            # Weighted mean of offsets relative to a reference time
            t0 = mid_observation.iloc[0]
            return t0 + ((mid_observation - t0) * weight).sum() / weight.sum()
        else:
            raise RuntimeError("Invalid method")

    @classmethod
    def _format_time_range(cls, row, timespec="minutes"):
        """
        Format time range for observation

        Parameters
        ----------
        row: pd.Series
            Catalog row
        timespec: str
            Time terms specification for pandas.Timestamp.isoformat()

        Return
        ------
        str
            Formatted time range

        The end of the time range is known from the single observation in
        `row` thanks to an additional element `last_DATE-BEG` in the Series.
        All dates are DATE-BEG, we don't compute a DATE-END.
        """
        t = [row["DATE-BEG"]]
        is_range = ("last_DATE-BEG" in row.index) and (row["last_DATE-BEG"] != t[0])
        if is_range:
            t.append(row["last_DATE-BEG"])
        t_str = [tt.isoformat(timespec=timespec) for tt in t]
        if is_range and t_str[0][:10] == t_str[1][:10]:
            # Same day: don't repeat the date (first 10 characters)
            t_str[1] = t_str[1][10:]
        return " - ".join(t_str)

    def plot_fov(self, ax, **kwargs):
        """
        Plot SPICE FOVs on a background map

        Nothing is plotted for an empty catalog, or (after printing a
        message) if DATE-BEG values span more than 60 days.

        Parameters
        ----------
        ax: matplotlib.axes.Axes
            Axes (with relevant projection)
        color: str or list
            Color(s) cycle for drawing the FOVs
        kwargs: dict
            Keyword arguments, passed to FileMetadata.plot_fov()
        """
        if self.empty:
            return
        time_range_length = self["DATE-BEG"].max() - self["DATE-BEG"].min()
        if time_range_length > pd.Timedelta(days=60):
            print(
                f"Time range length is {time_range_length}, this is long, and probably not what you want; aborting"
            )
            return
        merge_by_spiobsid = True
        if merge_by_spiobsid:
            # One FOV per SPIOBSID (first file of each group), keeping
            # DATE-BEG of the last file of the group for the time range
            groups = self.groupby("SPIOBSID")
            fovs = groups.first()
            fovs_last = groups.last()
            fovs["last_DATE-BEG"] = fovs_last["DATE-BEG"]
            fovs.reset_index(inplace=True)
        else:
            fovs = Catalog(data_frame=self[list(required_columns)])
        # label at the position of the plot
        fovs["fov_text"] = fovs.apply(Catalog._format_time_range, axis=1)
        # label at the level of the plot (will be de-duplicated afterwards)
        fovs["fov_label"] = fovs.apply(
            lambda row: f"{row.STUDY} ({row.MISOSTUD})", axis=1
        )
        # color(s): one color per study, cycling over available colors
        color = kwargs.pop("color", None)
        studies = sorted(self.STUDY.unique())
        if color is None:
            colors = mcolors.TABLEAU_COLORS
        elif isinstance(color, list):
            colors = color
        else:
            colors = [color]
        study_color = dict(zip(studies, cycle(colors)))
        fovs["fov_color"] = fovs.apply(lambda row: study_color[row.STUDY], axis=1)
        fovs.apply(
            lambda row: FileMetadata(row).plot_fov(ax, **kwargs),
            axis=1,
        )
        if merge_by_spiobsid:
            # also plot last FOV, with dashes
            fovs_last.reset_index(inplace=True)
            fovs_last["fov_color"] = fovs.fov_color
            fovs_last["fov_linestyle"] = ":"
            fovs_last = fovs_last[fovs_last.RASTERNO != 0]
            fovs_last.apply(
                lambda row: FileMetadata(row).plot_fov(ax, **kwargs),
                axis=1,
            )
        # De-duplicate labels for legend (an alternative would be
        # to provide labels only to the first instance of each study)
        handles, labels = ax.get_legend_handles_labels()
        unique_indices = [labels.index(x) for x in sorted(set(labels))]
        # Plain list indexing: handles are arbitrary objects and should not
        # go through a numpy array conversion
        handles = [handles[i] for i in unique_indices]
        ax.legend(handles=handles)