Source code for acispy.dataset

from acispy.msids import MSIDs, CombinedMSIDs, ConcatenatedMSIDs
from acispy.states import States, cmd_state_codes
from acispy.model import Model
from acispy.units import APQuantity, APStringArray
from acispy.fields import create_builtin_derived_msids, \
    DerivedField, FieldContainer, OutputFieldFunction, \
    OutputFieldsNotFound, create_builtin_derived_states
from acispy.time_series import TimeSeriesData, EmptyTimeSeries
from acispy.utils import get_display_name, moving_average, \
    ensure_list
from acispy.units import get_units
import numpy as np
import Ska.engarchive.fetch_sci as fetch
from cxotime import CxoTime


[docs]class Dataset(object): def __init__(self, msids, states, model): self.msids = msids self.states = states self.fields = FieldContainer() self.field_list = [] for ftype in ["msids", "states"]: obj = getattr(self, ftype) self._populate_fields(ftype, obj) if isinstance(model, TimeSeriesData): self.model = model self._populate_fields("model", self.model) else: for key, value in model.items(): setattr(self, key, value) self._populate_fields(key, value) if not isinstance(self.msids, EmptyTimeSeries): create_builtin_derived_msids(self) if not isinstance(self.states, EmptyTimeSeries): create_builtin_derived_states(self) self.data = {} self.state_codes = {} if hasattr(self.msids, "state_codes"): for k, v in self.msids.state_codes.items(): self.state_codes["msids", k] = v self.state_codes.update(cmd_state_codes) self._times = {} self._dates = {} self._checked_fields = [] def _populate_fields(self, ftype, obj): for fname in obj.keys(): func = OutputFieldFunction(ftype, fname) unit = str(getattr(obj[fname], "unit", "")) display_name = get_display_name(ftype, fname) df = DerivedField(ftype, fname, func, unit, display_name=display_name) self.fields.output_fields[ftype, fname] = df if ftype not in self.fields.types: self.fields.types.append(ftype) self.field_list.append((ftype, fname)) def __getitem__(self, item): fd = self._determine_field(item) if fd not in self.data: self.data[fd] = self.fields[fd](self) return self.data[fd] def __contains__(self, item): fd = self._determine_field(item) return fd in self.fields def _determine_field(self, field): if field not in self._checked_fields: if isinstance(field, tuple): if len(field) != 2: raise RuntimeError(f"Invalid field specification {format}!") fd = (field[0].lower(), field[1].lower()) if fd in self.fields: checked_field = fd else: raise RuntimeError(f"Cannot find field {field}!") elif isinstance(field, str): fd = field.lower() candidates = [] for ftype in self.fields.types: if (ftype, fd) in self.fields: candidates.append((ftype, fd)) if len(candidates) > 1: msg = f"Multiple field types for field name {field}!\n" for c in candidates: msg += f" {c}\n" raise RuntimeError(msg) elif len(candidates) == 0: raise RuntimeError(f"Cannot find field {field}!") else: checked_field = candidates[0] else: raise RuntimeError(f"Invalid field specification {field}!") else: checked_field = field return checked_field @property def derived_field_list(self): return list(self.fields.derived_fields.keys()) def _check_derived_field(self, field, df): if df.depends is not None: dep_list = [] for fd in df.depends: if fd not in self.fields.list_all_fields(): dep_list.append(fd) if len(dep_list) > 0: raise OutputFieldsNotFound(field, dep_list) @classmethod def from_hdf5(cls, filename): import h5py f = h5py.File(filename, "r") if "msids" in f: msids = MSIDs.from_hdf5(f["msids"]) else: msids = EmptyTimeSeries() if "states" in f: states = States.from_hdf5(f["states"]) else: states = EmptyTimeSeries() if "model" in f: model = Model.from_hdf5(f["model"]) else: model = EmptyTimeSeries() f.close() return cls(msids, states, model) def write_hdf5(self, filename, overwrite=True): import h5py import os if os.path.exists(filename) and not overwrite: raise IOError(f"The file {filename} already exists and overwrite=False!!") f = h5py.File(filename, "w") if not self.msids._is_empty: gmsids = f.create_group("msids") for k, v in self.msids.items(): d = gmsids.create_dataset(k, data=v.value) d.attrs["times"] = v.times if hasattr(v, "mask"): d.attrs["mask"] = v.mask if hasattr(v, "unit"): d.attrs["unit"] = v.unit gmsids.attrs["state_codes"] = self.msids.state_codes gmsids.attrs["derived_msids"] = self.msids.derived_msids if not self.states._is_empty: gstates = f.create_group("states") for k, v in self.states.items(): d = gstates.create_dataset(k, data=v.value) d.attrs["times"] = v.times if hasattr(v, "unit"): d.attrs["unit"] = v.unit if not self.model._is_empty: gmodel = f.create_group("model") for k, v in self.model.items(): d = gmodel.create_dataset(k, data=v.value) d.attrs["times"] = v.times if hasattr(v, "mask"): d.attrs["mask"] = v.mask d.attrs["unit"] = v.unit f.flush() f.close()
[docs] def add_derived_field(self, ftype, fname, function, units, display_name=None, depends=None): """ Add a new derived field. Parameters ---------- ftype : string The type of the field to add. fname : string The name of the field to add. function : function The function which computes the field. units : string The units of the field. times : array or tuple The timing data for the field in seconds from the beginning of the mission. Can supply an array of times or a field specification. If the latter, then the times for this field will be used. display_name : string, optional The name to use when displaying the field in plots. Examples -------- >>> def _dpaa_power(ds): ... return (ds["msids", "1dp28avo"]*ds["msids", "1dpicacu"]).to("W") >>> ds.add_derived_field("msids", "dpa_a_power", _dpaa_power, ... "W", display_name="DPA-A Power") """ df = DerivedField(ftype, fname, function, units, display_name=display_name, depends=depends) self._check_derived_field((ftype, fname), df) self.fields.derived_fields[ftype, fname] = df if ftype not in self.fields.types: self.fields.types.append(ftype)
[docs] def add_averaged_field(self, field, n=10): """ Add a new field from an average of another. Parameters ---------- field : string or (type, name) tuple The field to be averaged. n : integer, optional The number of samples to average over. Default: 5 Examples -------- >>> ds.add_averaged_field(("msids", "1dpicacu"), n=10) """ ftype, fname = self._determine_field(field) def _avg(ds): v = ds[ftype, fname] return APQuantity(moving_average(v.value, n=n), v.times, unit=v.unit, mask=v.mask) display_name = "Average %s" % self.fields[ftype, fname].display_name units = get_units(ftype, fname) self.add_derived_field(ftype, "avg_%s" % fname, _avg, units, display_name=display_name, depends=[(ftype, fname)])
[docs] def map_state_to_msid(self, state, msid, ftype="msids"): """ Create a new derived field by interpolating a state to the times of a MSID or model component. Parameters ---------- state : string The state to be interpolated. msid : string The msid or model component to interpolate the state to. ftype : string, optional The field type to use. "msids" or "model". Default: "msids" Examples -------- >>> ds.map_state_to_msid("ccd_count", "1dpamzt") """ state = state.lower() msid = msid.lower() ftype = ftype.lower() units = get_units("states", state) def _state(ds): msid_times = ds.times(ftype, msid) state_times = ds.times("states", state)[1] indexes = np.searchsorted(state_times, msid_times) v = ds["states", state][indexes].value if v.dtype.char in ['S', 'U']: return APStringArray(v, msid_times) else: return APQuantity(v, msid_times, unit=units) self.add_derived_field(ftype, state, _state, units, display_name=self.fields["states", state].display_name, depends=[(ftype, msid)])
[docs] def add_diff_data_model_field(self, msid, ftype_model="model"): r""" Create a field which gives the difference between the data and the model for a particular MSID. Parameters ---------- msid : string The MSID to take the diff of data and model of. ftype_model : string, optional The model type (e.g., "model", "model0", etc.) of the model field to be diffed with the MSID. """ msid = msid.lower() ftype_model = ftype_model.lower() units = get_units("msids", msid) def _diff(ds): return ds["msids", msid]-ds[ftype_model, msid] display_name = self.fields["msids", msid].display_name.replace('_', '\_') self.add_derived_field(ftype_model, "diff_%s" % msid, _diff, units, display_name="$\mathrm{\Delta(%s)}$" % display_name, depends=[("msids", msid), (ftype_model, msid)])
[docs] def times(self, *args): """ Return the timing information in seconds from the beginning of the mission for a field. Examples -------- >>> ds.times("msids", "1deamzt") """ if len(args) > 1: field = args[0], args[1] else: field = args[0] ftype, fname = self._determine_field(field) if (ftype, fname) not in self._times: self._times[ftype, fname] = self[ftype, fname].times return self._times[ftype, fname]
[docs] def dates(self, *args): """ Return the timing information in date and time for a field. Examples -------- >>> ds.dates("states", "pitch") """ if len(args) > 1: field = args[0], args[1] else: field = args[0] ftype, fname = self._determine_field(field) if (ftype, fname) not in self._dates: self._dates[ftype, fname] = self[ftype, fname].dates return self._dates[ftype, fname]
[docs] def write_msids(self, filename, fields, mask=None, overwrite=False): """ Write MSIDs (or MSID-like quantities such as model values) to an ASCII table file. This assumes that all of the quantities have been interpolated to a common set of times. Parameters ---------- filename : string The filename to write the quantities to. fields : list of (type, name) field specifications The quantities to be written to the ASCII table. overwrite : boolean, optional If True, an existing file with the same name will be overwritten. """ from astropy.table import Table fields = ensure_list(fields) base_times = self.dates(*fields[0]) if mask is None: mask = slice(None, None, None) if len(fields) > 1: for field in fields[1:]: if not np.all(base_times == self.dates(*field)): raise RuntimeError("To write MSIDs, all of the times should be the same," + "but '%s', '%s' does not have the same " % field + "set of times as '%s', '%s'!" % (fields[0][0], fields[0][1])) data = dict(("_".join(k), self[k].value[mask]) for k in fields) data["time"] = self.times(*fields[0]).value[mask] data["date"] = self.dates(*fields[0])[mask] Table(data).write(filename, format='ascii', overwrite=overwrite)
[docs] def write_states(self, filename, overwrite=False): """ Write commanded states to an ASCII table file. An error will be thrown if there are no commanded states present. Parameters ---------- filename : string The filename to write the states to. overwrite : boolean, optional If True, an existing file with the same name will be overwritten. """ from astropy.table import Table if isinstance(self.states, EmptyTimeSeries): raise RuntimeError("There are no commanded states to be written!") Table(dict((k,v.value) for k, v in self.states.items())).write(filename, format='ascii', overwrite=overwrite)
[docs] def plot(self, fields, field2=None, lw=2, ls='-', ls2='-', lw2=2, fontsize=18, color=None, color2='magenta', figsize=(10, 8), plot=None, fig=None, subplot=None, plot_bad=False): r""" Make a single-panel plot of a quantity (or multiple quantities) vs. date and time from this Dataset. Multiple quantities can be plotted on the left y-axis together if they have the same units, otherwise a quantity with different units can be plotted on the right y-axis. Parameters ---------- fields : tuple of strings or list of tuples of strings A single field or list of fields to plot on the left y-axis. field2 : tuple of strings, optional A single field to plot on the right y-axis. Default: None lw : float or list of floats, optional The width of the lines in the plots. If a list, the length of a the list must be equal to the number of fields. If a single number, it will apply to all plots. Default: 2 px. ls : string, optional The line style of the lines plotted on the left y-axis. Can be a single linestyle or more than one for each line. Default: '-' ls2 : string, optional The line style of the line plotted on the right y-axis. Can be a single linestyle or more than one for each line. Default: '-' lw2 : float, optional The width of the line plotted on the right y-axis. fontsize : integer, optional The font size for the labels in the plot. Default: 18 pt. color : list of strings, optional The colors for the lines plotted on the left y-axis. Can be a single color or more than one in a list. Default: Use the default Matplotlib order of colors. color2 : string, optional The color for the line plotted on the right y-axis. Default: "magenta" fig : :class:`~matplotlib.figure.Figure`, optional A Figure instance to plot in. Default: None, one will be created if not provided. figsize : tuple of integers, optional The size of the plot in (width, height) in inches. Default: (10, 8) plot : :class:`~acispy.plots.DatePlot` or :class:`~acispy.plots.CustomDatePlot`, optional An existing DatePlot to add this plot to. Default: None, one will be created if not provided. plot_bad : boolean, optional If True, "bad" values will be plotted but the ranges of bad values will be marked with translucent blue rectangles. If False, bad values will be removed from the plot. Default: False """ from acispy.plots import DatePlot dp = DatePlot(self, fields, field2=field2, lw=lw, ls=ls, ls2=ls2, lw2=lw2, fontsize=fontsize, color=color, color2=color2, figsize=figsize, plot=plot, fig=fig, subplot=subplot, plot_bad=plot_bad) return dp
[docs]class EngArchiveData(Dataset): """ Fetch MSIDs from the engineering archive and states from the commanded states database. Parameters ---------- tstart : string The start time in YYYY:DOY:HH:MM:SS format tstop : string The stop time in YYYY:DOY:HH:MM:SS format msids : list of strings, optional List of MSIDs to pull from the engineering archive. get_states : boolean, optional Whether or not to retrieve commanded states from kadi. Default: True filter_bad : boolean, optional Whether or not to filter out bad values of MSIDs. Default: False. stat : string, optional return 5-minute or daily statistics ('5min' or 'daily'), or None for raw data. Default: '5min' state_keys : list of strings, optional The states to pull from kadi. If not specified, a default set will be pulled. interpolate : string, optional Whether or not to interpolate to a common set of times, either "nearest" or "linear" interpolation. If the *interpolate_times* argument is not set, then the default is to interpolate at 328 second intervals. Default: None, indicating no interpolation. interpolate_times : array_like of times, optional An array-like object of times to interpolate the MSID data to. Default: None, which means that if *interpolate* is not None the MSIDs will be interpolated at 328 second intervals. Examples -------- >>> from acispy import EngArchiveData >>> tstart = "2016:091:12:05:00.100" >>> tstop = "2016:100:13:07:45.234" >>> msids = ["1deamzt", "1pin1at"] >>> ds = EngArchiveData(tstart, tstop, msids) """ def __init__(self, tstart, tstop, msids, get_states=True, filter_bad=False, stat='5min', state_keys=None, interpolate=None, interpolate_times=None): tstart = CxoTime(tstart).date tstop = CxoTime(tstop).date msids = MSIDs.from_database(msids, tstart, tstop=tstop, filter_bad=filter_bad, stat=stat, interpolate=interpolate, interpolate_times=interpolate_times) if get_states: states = States.from_kadi_states(tstart, tstop, state_keys=state_keys) else: states = EmptyTimeSeries() model = EmptyTimeSeries() super(EngArchiveData, self).__init__(msids, states, model)
[docs]class MaudeData(Dataset): """ Fetch MSID data from Maude. Parameters ---------- tstart : string The start time in YYYY:DOY:HH:MM:SS format tstop : string The stop time in YYYY:DOY:HH:MM:SS format msids : list of strings, optional List of MSIDs to pull from the engineering archive. get_states : boolean, optional Whether or not to retrieve commanded states from kadi. Default: True user : string, optional OCCWEB username to access the MAUDE database with. Default: None, which will use the username in the ${HOME}/.netrc file. password : string, optional OCCWEB password to access the MAUDE database with. Default: None, which will use the password in the ${HOME}/.netrc file. state_keys : list of strings, optional The states to pull from kadi. If not specified, a default set will be pulled. """ def __init__(self, tstart, tstop, msids, get_states=True, user=None, password=None, other_msids=None, state_keys=None): tstart = CxoTime(tstart).date tstop = CxoTime(tstop).date msids = MSIDs.from_maude(msids, tstart, tstop=tstop, user=user, password=password) if other_msids is not None: msids2 = MSIDs.from_database(other_msids, tstart, tstop) msids = CombinedMSIDs([msids, msids2]) if get_states: states = States.from_kadi_states(tstart, tstop, state_keys=state_keys) else: states = EmptyTimeSeries() model = EmptyTimeSeries() super(MaudeData, self).__init__(msids, states, model)
def _parse_tracelogs(tbegin, tend, filenames, other_msids): filenames = ensure_list(filenames) if tbegin is not None: tbegin = CxoTime(tbegin).date if tend is not None: tend = CxoTime(tend).date msid_objs = [] for filename in filenames: # Figure out what kind of file this is f = open(filename, "r") line = f.readline() f.close() if line.startswith("TIME"): msids = MSIDs.from_tracelog(filename, tbegin=tbegin, tend=tend) elif line.startswith("#YEAR") or line.startswith("YEAR"): msids = MSIDs.from_mit_file(filename, tbegin=tbegin, tend=tend) else: raise RuntimeError("I cannot parse this file!") msid_objs.append(msids) if other_msids is not None: msid_objs.append(MSIDs.from_database(other_msids, tbegin, tend)) all_msids = CombinedMSIDs(msid_objs) return all_msids
[docs]class TracelogData(Dataset): """ Fetch MSIDs from a tracelog file and states from the commanded states database. Parameters ---------- filenames : string or list of strings The path to the tracelog file or list of tracelog files tbegin : string The start time in YYYY:DOY:HH:MM:SS format. Default: None, which will read from the beginning of the tracelog. tend : string The stop time in YYYY:DOY:HH:MM:SS format. Default: None, which will read from the beginning of the tracelog. get_states : boolean, optional Whether or not to retrieve commanded states from kadi. Default: True state_keys : list of strings, optional The states to pull from kadi. If not specified, a default set will be pulled. Examples -------- >>> from acispy import TracelogData >>> ds = TracelogData("acisENG10d_00985114479.70.tl") """ def __init__(self, filenames, tbegin=None, tend=None, other_msids=None, get_states=True, state_keys=None): msids = _parse_tracelogs(tbegin, tend, filenames, other_msids) tmin = 1.0e55 tmax = -1.0e55 for v in msids.values(): tmin = min(v.times[0].value, tmin) tmax = max(v.times[-1].value, tmax) if get_states: states = States.from_kadi_states(tmin, tmax, state_keys=state_keys) else: states = EmptyTimeSeries() model = EmptyTimeSeries() super(TracelogData, self).__init__(msids, states, model)
[docs]class EngineeringTracelogData(TracelogData): """ Fetch MSIDs from the engineering tracelog file and states from the commanded states database. Parameters ---------- tbegin : string The start time in YYYY:DOY:HH:MM:SS format. Default: None, which will read from the beginning of the tracelog. tend : string The stop time in YYYY:DOY:HH:MM:SS format. Default: None, which will read from the beginning of the tracelog. get_states : boolean, optional Whether or not to retrieve commanded states from kadi. Default: True state_keys : list of strings, optional The states to pull from kadi. If not specified, a default set will be pulled. """ def __init__(self, tbegin=None, tend=None, other_msids=None, get_states=True, state_keys=None): filename = "/data/acis/eng_plots/acis_eng_10day.tl" super(EngineeringTracelogData, self).__init__( filename, tbegin=tbegin, tend=tend, other_msids=other_msids, get_states=get_states, state_keys=state_keys)
[docs]class DEAHousekeepingTracelogData(TracelogData): """ Fetch MSIDs from the DEA housekeeping tracelog file and states from the commanded states database. Parameters ---------- tbegin : string The start time in YYYY:DOY:HH:MM:SS format. Default: None, which will read from the beginning of the tracelog. tend : string The stop time in YYYY:DOY:HH:MM:SS format. Default: None, which will read from the beginning of the tracelog. get_states : boolean, optional Whether or not to retrieve commanded states from kadi. Default: True state_keys : list of strings, optional The states to pull from kadi. If not specified, a default set will be pulled. """ def __init__(self, tbegin=None, tend=None, other_msids=None, get_states=True, state_keys=None): filename = "/data/acis/eng_plots/acis_dea_10day.tl" super(DEAHousekeepingTracelogData, self).__init__( filename, tbegin=tbegin, tend=tend, other_msids=other_msids, get_states=get_states, state_keys=state_keys)
[docs]class TenDayTracelogData(TracelogData): """ Fetch MSIDs from both the engineering and DEA housekeeping tracelog files in one dataset. Parameters ---------- tbegin : string The start time in YYYY:DOY:HH:MM:SS format. Default: None, which will read from the beginning of the tracelog. tend : string The stop time in YYYY:DOY:HH:MM:SS format. Default: None, which will read from the beginning of the tracelog. get_states : boolean, optional Whether or not to retrieve commanded states from kadi. Default: True state_keys : list of strings, optional The states to pull from kadi. If not specified, a default set will be pulled. """ def __init__(self, tbegin=None, tend=None, other_msids=None, get_states=True, state_keys=None): filenames = ["/data/acis/eng_plots/acis_eng_10day.tl", "/data/acis/eng_plots/acis_dea_10day.tl"] super(TenDayTracelogData, self).__init__( filenames, tbegin=tbegin, tend=tend, other_msids=other_msids, get_states=get_states, state_keys=state_keys)
[docs]class TelemData(Dataset): """ Fetch MSID data from the Ska engineering archive as well as either Maude or one of the tracelog files, in order to ensure the most recent data is obtained. Parameters ---------- tstart : string The start time in YYYY:DOY:HH:MM:SS format tstop : string The stop time in YYYY:DOY:HH:MM:SS format msids : list of strings, optional List of MSIDs to pull from the engineering archive. recent_source : string, optional Which source to use to get the most recent tracelog data. Options are "maude" or "tracelog". Default: "tracelog" filter_bad : boolean, optional Whether or not to filter out bad values of MSIDs. Default: False. stat : string, optional return 5-minute or daily statistics ('5min' or 'daily'), or None for raw data. Default: '5min' user : string, optional OCCWEB username to access the MAUDE database with. Default: None, which will use the username in the ${HOME}/.netrc file. password : string, optional OCCWEB password to access the MAUDE database with. Default: None, which will use the password in the ${HOME}/.netrc file. get_states : boolean, optional Whether or not to retrieve commanded states from kadi. Default: True state_keys : list of strings, optional The states to pull from kadi. If not specified, a default set will be pulled. """ def __init__(self, tstart, tstop, msids, recent_source="maude", filter_bad=False, stat='5min', user=None, password=None, get_states=True, state_keys=None): msids = ensure_list(msids) tstart = CxoTime(tstart).secs tstop = CxoTime(tstop).secs tmid = 1.0e99 for msid in msids: tm = fetch.get_time_range(msid, format="secs")[-1] tmid = min(tmid, tm) tmid = CxoTime(tmid).secs if tmid < tstop: msids1 = MSIDs.from_database(msids, tstart, tstop=tmid, filter_bad=filter_bad, stat=stat) if recent_source == "maude": msids2 = MSIDs.from_maude(msids, tmid, tstop=tstop, user=user, password=password) elif recent_source == "tracelog": msids2 = _parse_tracelogs(tmid, tstop, ["/data/acis/eng_plots/acis_eng_10day.tl", "/data/acis/eng_plots/acis_dea_10day.tl"], None) msids = ConcatenatedMSIDs(msids1, msids2) else: msids = MSIDs.from_database(msids, tstart, tstop=tstop, filter_bad=filter_bad, stat=stat) if get_states: states = States.from_kadi_states(tstart, tstop, state_keys=state_keys) else: states = EmptyTimeSeries() model = EmptyTimeSeries() super(TelemData, self).__init__(msids, states, model)