Source code for acispy.load_review

import os
from acispy.thermal_models import ThermalModelFromLoad
from acispy.plots import DatePlot
from acispy.utils import mylog, find_load, \
    lr_root, cti_simodes
from collections import defaultdict
from Ska.Matplotlib import cxctime2plotdate
import numpy as np
from datetime import datetime, timezone
import bisect
from cxotime import CxoTime

lr_file = "ACIS-LoadReview.txt"

colors = {"perigee": "dodgerblue",
          "apogee": "dodgerblue",
          "sim_trans": "brown",
          "radmon_disable": "orange",
          "radmon_enable": "orange",
          "start_cti": "darkgreen",
          "end_cti": "darkgreen"}

styles = {"perigee": "--",
          "apogee": ":",
          "sim_trans": "-",
          "radmon_enable": "--",
          "radmon_disable": "--",
          "start_cti": '--',
          "end_cti": '--'}

pretty_names = {"comm_ends": "End of Comm",
                "comm_begins": "Beginning of Comm",
                "perigee": "Perigee",
                "apogee": "Apogee",
                "letg_in": "LETG Inserted",
                "letg_out": "LETG Retracted",
                "hetg_in": "HETG Inserted",
                "hetg_out": "HETG Retracted",
                "radmon_enable": "Enable Radiation Monitor",
                "radmon_disable": "Disable Radiation Monitor",
                "start_cti": "Start CTI Run",
                "end_cti": "End CTI Run",
                "obsid_change": "Change of OBSID",
                "sim_trans": "SIM Translation",
                "enter_belts": "Enter Radiation Belts",
                "exit_belts": "Exit Radiation Belts",
                "fmt_change": "Change of Telemetry Format"}

offsets = {"sim_trans": 0.75}


class LoadReviewEvent(object):
    def __init__(self, name, event):
        self.event = event
        self.name = name

    def __str__(self):
        return self.name

    def __repr__(self):
        return pretty_names[self.name]

    def __getattr__(self, item):
        return self.event[item]


[docs]class ACISLoadReview(object): """ Parse data from a particular load review for access and plotting of data. Parameters ---------- load_name : string The name of the load to examine. Can be the full load specification, e.g. "AUG2717A", or the last letter can be omitted for the latest iteration, e.g. "MAY0216". get_msids : boolean, optional Whether or not to load MSID data as well as model data for temperatures. Default: False interpolate_msids : boolean, optional If True, MSIDs are interpolated to a time sequence that is common with the model data. Default: False tl_file : string, optional If MSID data is to be loaded, load from this tracelog file rather than the engineering archive. Default: None """ def __init__(self, load_name, get_msids=True, tl_file=None): self.load_name = find_load(load_name) self.load_letter = self.load_name[-1] self.load_week = self.load_name[:7] self.load_year = f"20{self.load_week[5:7]}" self.next_year = str(int(self.load_year)+1) loaddir = os.path.join(lr_root, self.load_year, self.load_week) oflsdir = os.path.join(loaddir, f"ofls{self.load_letter.lower()}") self.load_file = os.path.join(oflsdir, lr_file) self.events = defaultdict(dict) self.start_status = self._get_start_status() self.begin_radzone = int(self.start_status['radmon_status'] == "OORMPDS") self.lines, self.line_times = self._populate_event_times() self.ds = ThermalModelFromLoad(self.load_name, get_msids=get_msids, tl_file=tl_file) self._find_cti_runs() def __repr__(self): return self.load_name def __str__(self): return self.load_name def _get_start_status(self): j = -1 find_first_time = True time = None with open(self.load_file, "r") as f: for i, line in enumerate(f.readlines()): words = line.strip().split() if len(words) > 0: if line.startswith(self.load_year) or \ line.startswith(self.next_year): time = words[0] if find_first_time and time is not None: self.first_time = time find_first_time = False if line.startswith("CHANDRA STATUS ARRAY"): j = i+2 if i == j: status = line.strip().split()[-1] self.last_time = time status_values = status.strip("()").split(",") status = {"instrument": status_values[0], "hetg_status": status_values[1], "letg_status": status_values[2], "current_obsid": status_values[3], "radmon_status": status_values[4], "telemetry_format": status_values[5], "dither_status": status_values[6]} return status def _populate_event_times(self): lines = [] line_times = [] time = self.first_time comm_durations = [] with open(self.load_file, "r") as f: for i, line in enumerate(f.readlines()): words = line.strip().split() if len(words) > 0: event = None state = None if line.startswith(self.load_year) or \ line.startswith(self.next_year): time = words[0] if "MP_OBSID" in line: event = "obsid_change" state = words[-1] if "SIMTRANS" in line: event = "sim_trans" state = (int(words[-2]), words[-1].strip("()")) if "HETGIN" in line: event = "hetg_in" if "HETGRE" in line: event = "hetg_out" if "LETGIN" in line: event = "letg_in" if "LETGRE" in line: event = "letg_out" if "CSELFMT" in line and "COMMAND_HW" in line: event = "fmt_change" state = int(words[-1][-1]) if "EPERIGEE" in line and "ORBPOINT" in line: event = "perigee" if "APOGEE" in line and "ORBPOINT" in line: event = "apogee" if "COMM BEGINS" in line: event = "comm_begins" if "COMM ENDS" in line: event = "comm_ends" if "EEF1000" in line and "ORBPOINT" in line: event = "enter_belts" if "XEF1000" in line and "ORBPOINT" in line: event = "exit_belts" if "OORMPDS" in line and "COMMAND_SW" in line: event = "radmon_disable" if "OORMPEN" in line and "COMMAND_SW" in line: event = "radmon_enable" if event is not None: if event not in self.events: self.events[event] = {"times": []} if event == "comm_ends": time = CxoTime(CxoTime(words[0]).secs-1800.0).date self.events[event]["times"].append(time) if state is not None: if "state" not in self.events[event]: self.events[event]["state"] = [] self.events[event]["state"].append(state) if "REAL-TIME COMM" in line: continue if "COMM DURATION" in line: comm_durations.append(float(words[-2])-30.0) continue if line.startswith(self.load_year) or \ line.startswith(self.next_year) or \ "WSPOW COMMAND LOADS" in line or \ "CHANDRA STATUS ARRAY" in line or \ "ACIS integration time" in line or \ "requested time" in line or \ "ObsID change" in line or \ "THERE IS A Z-SIM" in line or \ "==> DITHER" in line: lines.append(line) line_times.append(time) line_times = CxoTime(line_times).secs if len(self.events["comm_begins"]) > 0: lines, line_times = self._fix_comm_times(lines, line_times, comm_durations) return lines, line_times def _find_cti_runs(self): self.events["start_cti"] = {"times": [], "state": []} self.events["end_cti"] = {"times": [], "state": []} si_modes = self.ds["si_mode"] power_cmds = self.ds["power_cmd"] for mode in cti_simodes: where_mode = np.logical_and(si_modes == mode, power_cmds == "XTZ0000005") idxs = np.concatenate([[False], where_mode, [False]]) idxs = np.flatnonzero(idxs[1:] != idxs[:-1]).reshape(-1, 2) for ii, jj in idxs: self.events["start_cti"]["times"].append(si_modes.dates[0,ii]) self.events["end_cti"]["times"].append(si_modes.dates[0,jj+1]) self.events["start_cti"]["state"].append(mode) self.events["end_cti"]["state"].append(mode) def list_attributes(self): for key in self.events.keys(): print("%s: %s" % (key, pretty_names[key])) def get_updated_dsn_comms(self): dsnfile = "/data/acis/dsn_summary.dat" if os.path.getsize(dsnfile) == 0: mylog.warning("DSN summary file is empty. Ignoring.") return tstart = CxoTime(self.first_time).secs tstop = CxoTime(self.last_time).secs bots = [] eots = [] new_durations = [] with open(dsnfile) as f: for line in f.readlines()[2:]: words = line.strip().split() bot = datetime.strptime("%s:%s:00:00:00" % (words[-4], words[-3].split(".")[0]), "%Y:%j:%H:%M:%S") eot = datetime.strptime("%s:%s:00:00:00" % (words[-2], words[-1].split(".")[0]), "%Y:%j:%H:%M:%S") time_bot = CxoTime(bot.strftime("%Y:%j:%H:%M:%S")).secs+86400.0*(float(words[-3]) % 1) time_eot = CxoTime(eot.strftime("%Y:%j:%H:%M:%S")).secs+86400.0*(float(words[-1]) % 1) new_durations.append((time_eot-time_bot)/60.0) if tstart <= time_bot <= tstop: bots.append(time_bot) if tstart <= time_eot <= tstop: eots.append(time_eot) self.events["comm_begins"]["times"] = CxoTime(bots).date self.events["comm_ends"]["times"] = CxoTime(eots).date self.lines, self.line_times = self._fix_comm_times(self.lines, self.line_times, new_durations) def _fix_comm_times(self, lines, line_times, comm_durations): new_lines = [] new_times = [] for i, line in enumerate(lines): if not "REAL-TIME COMM" in line and not "COMM DURATION" in line: new_lines.append(line) new_times.append(line_times[i]) for time in self.events["comm_begins"]["times"]: local_time = datetime.strptime(time, "%Y:%j:%H:%M:%S.%f").replace(tzinfo=timezone.utc).astimezone(tz=None) t = CxoTime(time).secs idx = bisect.bisect_right(new_times, t) new_times.insert(idx, t) new_lines.insert(idx, "%s REAL-TIME COMM BEGINS %s EDT" % (time, local_time.strftime("%Y:%j:%H:%M:%S"))) for i, time in enumerate(self.events["comm_ends"]["times"]): local_time = datetime.strptime(time, "%Y:%j:%H:%M:%S.%f").replace(tzinfo=timezone.utc).astimezone(tz=None) t = CxoTime(time).secs idx = bisect.bisect_right(new_times, t) new_times.insert(idx, t) new_lines.insert(idx, "%s REAL-TIME COMM ENDS %s EDT" % (time, local_time.strftime("%Y:%j:%H:%M:%S"))) new_times.insert(idx+1, t) new_lines.insert(idx+1, "==> COMM DURATION: %g mins." % comm_durations[i]) return new_lines, new_times def __getattr__(self, item): if item in self.events: return LoadReviewEvent(item, self.events[item]) else: raise AttributeError(f"'LoadReview' object has no attribute '{item}'") def _add_annotations(self, plot, annotations, tbegin, tend): for i, line in enumerate(plot.ax.lines): line.set_zorder(100-i) plot_comms = False plot_belts = False if "cti_runs" in annotations: annotations.append("start_cti") annotations.append("end_cti") annotations.remove("cti_runs") for key in annotations: if key == "comms": plot_comms = True continue if key == "belts": plot_belts = True continue color = colors[key] ls = styles[key] for i, t in enumerate(self.events[key]["times"]): tt = CxoTime(t).secs if tt < tbegin or tt > tend: continue plot.add_vline(t, color=color, ls=ls) if "state" in self.events[key] and key in offsets: text = self.events[key]["state"][i] if isinstance(text, tuple): text = text[-1] tdt = CxoTime(tt + 1800.0).date ymin, ymax = plot.ax.get_ylim() y = (1.0-offsets[key])*ymin+offsets[key]*ymax plot.add_text(tdt, y, text, fontsize=15, rotation='vertical', color=color, zorder=100) if plot_belts: self._plot_bands(tbegin, tend, plot, ["radmon_disable", "radmon_enable"], "mediumpurple", alpha=0.333333) if plot_comms: self._plot_bands(tbegin, tend, plot, ["comm_begins", "comm_ends"], "pink", alpha=1.0) def _plot_bands(self, tbegin, tend, plot, events, color, alpha=1.0): tc_start = list(self.events[events[0]]["times"]) tc_end = list(self.events[events[1]]["times"]) if tc_end[0] < tc_start[0]: tc_start.insert(0, self.first_time) if tc_start[-1] > tc_end[-1]: tc_end.append(self.last_time) assert len(tc_start) == len(tc_end) tc_start = CxoTime(tc_start).secs tc_end = CxoTime(tc_end).secs ybot, ytop = plot.ax.get_ylim() t = np.linspace(tbegin, tend, 500) tplot = cxctime2plotdate(t) for tcs, tce in zip(tc_start, tc_end): in_evt = (t >= tcs) & (t <= tce) plot.ax.fill_between(tplot, ybot, ytop, where=in_evt, color=color, alpha=alpha)
[docs] def plot(self, fields, field2=None, lw=1.5, fontsize=18, color=None, color2='magenta', figsize=(10, 8), plot=None, tbegin=None, tend=None, annotations=None, ymin=None, ymax=None, ymin2=None, ymax2=None): """ Plot temperature and state data from a load review. Parameters ---------- fields : tuple of strings or list of tuples of strings A single field or list of fields to plot on the left y-axis. field2 : tuple of strings, optional A single field to plot on the right y-axis. Default: None lw : float, optional The width of the lines in the plots. Default: 1.5 px. fontsize : integer, optional The font size for the labels in the plot. Default: 18 pt. color : list of strings, optional The color for the lines plotted on the left y-axis. Default: ["blue", "red", "green", "black"] color2 : string, optional The color for the line plotted on the right y-axis. Default: "magenta" figsize : tuple of integers, optional The size of the plot in (width, height) in inches. Default: (10, 8) plot : :class:`~acispy.plots.DatePlot` or :class:`~acispy.plots.CustomDatePlot`, optional An existing DatePlot to add this plot to. Default: None, one will be created if not provided. tbegin : string, float, or DateTime object, optional The start time of the plot. Default is to plot from the beginning of the load. tend : string, float, or DateTime object, optional The end time of the plot. Default is to plot to the ending of the load. annotations : list of strings, optional Additional annotations to add to the plot. Available options are "cti_runs", "comms", "belts", "perigee", "sim_trans", and "apogee". Default: None ymin : float, optional Set the minimum value of the y-axis on the left side of the plot. ymax : float, optional Set the maximum value of the y-axis on the left side of the plot. ymin2 : float, optional Set the minimum value of the y-axis on the right side of the plot. ymax2 : float, optional Set the maximum value of the y-axis on the right side of the plot. """ dp = DatePlot(self.ds, fields, field2=field2, lw=lw, fontsize=fontsize, color=color, color2=color2, figsize=figsize, plot=plot) ylimits = dp.ax.get_ylim() if ymin is None: ymin = ylimits[0] if ymax is None: ymax = ylimits[1] dp.set_ylim(ymin, ymax) if field2 is not None: ylimits2 = dp.ax2.get_ylim() if ymin2 is None: ymin2 = ylimits2[0] if ymax2 is None: ymax2 = ylimits2[1] dp.set_ylim2(ymin2, ymax2) if tbegin is None: tbegin = self.first_time if tend is None: tend = self.last_time tbegin = CxoTime(tbegin).secs tend = CxoTime(tend).secs if annotations is not None: self._add_annotations(dp, annotations.copy(), tbegin, tend) dp.set_xlim(CxoTime(tbegin).date, CxoTime(tend).date) return dp