import xija
from astropy.units import Quantity
from astropy.io import ascii
from acispy.dataset import Dataset
from acispy.plots import DatePlot
import numpy as np
from cxotime import CxoTime
from acispy.states import States
from acispy.model import Model
from acispy.msids import MSIDs
from acispy.time_series import EmptyTimeSeries
from acispy.utils import mylog, \
ensure_list, plotdate2cxctime, \
dict_to_array
import Ska.Numpy
import Ska.engarchive.fetch_sci as fetch
import matplotlib.pyplot as plt
from kadi import events, commands
import importlib
from matplotlib import font_manager
from pathlib import Path
from acis_thermal_check import get_acis_limits
short_name = {"1deamzt": "dea",
"1dpamzt": "dpa",
"1pdeaat": "psmc",
"fptemp_11": "acisfp",
"tmp_fep1_mong": "fep1_mong",
"tmp_fep1_actel": "fep1_actel",
"tmp_fep1_fb": "fep1_fb",
"tmp_bep_pcb": "bep_pcb",
"aacccdpt": "aca",
"pftank2t": "pftank2t",
"fwdblkhd": "4rt700t",
"minusyz": "minusyz"}
short_name_rev = {v: k for k, v in short_name.items()}
full_name = {"1deamzt": "DEA",
"1dpamzt": "DPA",
"1pdeaat": "PSMC",
"fptemp_11": "Focal Plane",
"tmp_fep1_mong": "FEP1 Mongoose",
"tmp_fep1_actel": "FEP1 Actel",
"tmp_fep1_fb": "FEP1 FB",
"tmp_bep_pcb": "BEP PCB"}
acis_models = ["1deamzt",
"1dpamzt",
"1pdeaat",
"fptemp_11",
"tmp_fep1_mong",
"tmp_fep1_actel",
"tmp_bep_pcb"]
model_classes = {
"dpa": "DPACheck",
"dea": "DEACheck",
"psmc": "PSMCCheck",
"acisfp": "ACISFPCheck",
"fep1_mong": "FEP1MongCheck",
"fep1_actel": "FEP1ActelCheck",
"bep_pcb": "BEPPCBCheck"
}
limits_map = {
"odb.caution.high": "yellow_hi",
"odb.caution.low": "yellow_lo",
"safety.caution.high": "yellow_hi",
"safety.caution.low": "yellow_lo",
"planning.warning.high": "planning_hi",
"planning.warning.low": "planning_lo",
"planning.data_quality.high.acisi": "acis_i",
"planning.data_quality.high.aciss": "acis_s",
"planning.data_quality.high.aciss_hot": "acis_hot",
"planning.data_quality.high.cold_ecs": "cold_ecs"
}
def find_json(name, model_spec, repo_path, version):
from xija.get_model_spec import get_xija_model_spec
msg = f"The JSON file {model_spec} does not exist! Please " \
f"specify a JSON file using the 'model_spec' keyword argument."
if model_spec is None:
name = short_name.get(name, name)
try:
model_spec, version = get_xija_model_spec(name,
repo_path=repo_path,
version=version)
except ValueError:
raise IOError(msg)
mylog.info("Using model for %s from chandra_models version = %s", name, version)
else:
model_path = Path(model_spec).resolve()
if not model_path.exists():
raise IOError(msg)
mylog.info("model_spec = %s", model_path)
return model_spec
class ModelDataset(Dataset):
def __init__(self, msids, states, model):
super(ModelDataset, self).__init__(msids, states, model)
def write_model(self, filename, overwrite=False):
"""
Write the model data vs. time to an ASCII text file.
Parameters
----------
filename : string
The filename to write the data to.
overwrite : boolean, optional
If True, an existing file with the same name will be overwritten.
"""
if Path(filename).exists() and not overwrite:
raise IOError(f"File {filename} already exists, but overwrite=False!")
names = []
arrays = []
for i, msid in enumerate(self.model.keys()):
if i == 0:
times = self.times("model", msid).value
dates = self.dates("model", msid)
names += ['time', 'date']
arrays += [times, dates]
names.append(msid)
arrays.append(self["model", msid].value)
temp_array = np.rec.fromarrays(arrays, names=names)
fmt = {(name, '%.2f') for name in names if name != "date"}
out = open(filename, 'w')
Ska.Numpy.pprint(temp_array, fmt, out)
out.close()
def write_model_and_data(self, filename, overwrite=False,
mask_radzones=False, mask_fmt1=False,
mask_badtimes=True, tstart=None,
tstop=None):
"""
Write the model, telemetry, and states data vs. time to
an ASCII text file. The state data is interpolated to the
times of the model so that everything is at a common set
of times.
Parameters
----------
filename : string
The filename to write the data to.
overwrite : boolean, optional
If True, an existing file with the same name will be overwritten.
"""
states_to_map = ["vid_board", "pitch", "clocking", "simpos",
"ccd_count", "fep_count", "off_nom_roll"]
out = []
for i, msid in enumerate(self.model.keys()):
if i == 0:
if self.states._is_empty:
out += [("model", state) for state in states_to_map]
else:
for state in states_to_map:
self.map_state_to_msid(state, msid)
out.append(("msids", state))
out.append(("model", msid))
if ("msids", msid) in self.field_list:
self.add_diff_data_model_field(msid)
out += [("msids", msid), ("model", f"diff_{msid}")]
msid = list(self.model.keys())[0]
telem = self["msids", msid]
mask = np.ones_like(telem.value, dtype='bool')
if tstart is not None:
tstart = CxoTime(tstart).secs
mask[telem.times.value < tstart] = False
if tstop is not None:
tstop = CxoTime(tstop).secs
mask[telem.times.value > tstop] = False
if mask_radzones:
rad_zones = events.rad_zones.filter(start=telem.dates[0],
stop=telem.dates[-1])
for rz in rad_zones:
idxs = np.logical_and(telem.times.value >= rz.tstart,
telem.times.value <= rz.tstop)
mask[idxs] = False
if mask_fmt1:
which = self["msids", "ccsdstmf"] == "FMT1"
mask[which] = False
self.write_msids(filename, out, overwrite=overwrite, mask=mask)
def _get_msids(self, model, comps, tl_file):
comps = [comp.lower() for comp in comps]
times = model[comps[0]].times.value
tstart = times[0] - 700.0
tstop = times[-1] + 700.0
start = CxoTime(tstart).date
stop = CxoTime(tstop).date
if tl_file is not None:
msids = MSIDs.from_tracelog(tl_file, tbegin=tstart, tend=tstop)
else:
if "earth_solid_angle" in comps:
comps.remove("earth_solid_angle")
comps.append("ccsdstmf")
tlast = 1.0e99
for comp in comps:
tlast = min(fetch.get_time_range(comp, format='secs')[1], tlast)
if tstop > tlast:
raise RuntimeError("The model extends past the the last date in the "
"engineering archive. Please set get_msids=False.")
msids = MSIDs.from_database(comps, start, tstop=stop, filter_bad=True,
interpolate='nearest', interpolate_times=times)
return msids
def make_dashboard_plots(self, msid, tstart=None, tstop=None, yplotlimits=None,
errorplotlimits=None, fig=None, figfile=None,
bad_times=None, mask_radzones=False, plot_limits=True,
mask_fmt1=False):
"""
Make dashboard plots for the particular thermal model.
Parameters
----------
msid : string
The MSID name to plot in the dashboard.
tstart : string, optional
The start time of the data for the dashboard plot. If not specified,
the beginning of the thermal model run is used.
tstop : string, optional
The stop time of the data for the dashboard plot. If not specified,
the end of the thermal model run is used.
yplotlimits : two-element array_like, optional
The (min, max) bounds on the temperature to use for the
temperature vs. time plot. Default: Determine the min/max
bounds from the telemetry and model prediction and
decrease/increase by degrees to determine the plot limits.
errorplotlimits : two-element array_like, optional
The (min, max) error bounds to use for the error plot.
Default: [-15, 15]
fig : :class:`~matplotlib.figure.Figure`, optional
A Figure instance to plot in. Default: None, one will be
created if not provided.
figfile : string, optional
The file to write the dashboard plot to. One will be created
if not provided.
bad_times : list of tuples, optional
Provide a set of times to exclude from the creation of the
dashboard plot.
mask_radzones : boolean, optional
If True, mask out radzone periods for dashboard plots of the
focal plane model. Default: False
plot_limits : boolean, optional
If True, plot the yellow caution and planning limits on the
dashboard plots. Default: True
"""
from xijafit import dashboard as dash
if fig is None:
fig = plt.figure(figsize=(20,10))
if ("msids", msid) not in self.field_list:
raise RuntimeError("You must include the real data if you want to make a "
"dashboard plot! Set get_msids=True when creating the"
"thermal model!")
telem = self["msids", msid]
pred = self["model", msid]
mask = np.logical_and(telem.mask, pred.mask)
if tstart is not None:
tstart = CxoTime(tstart).secs
mask[telem.times.value < tstart] = False
if tstop is not None:
tstop = CxoTime(tstop).secs
mask[telem.times.value > tstop] = False
if bad_times is not None:
for (left, right) in bad_times:
idxs = np.logical_and(telem.times.value >= CxoTime(left).secs,
telem.times.value <= CxoTime(right).secs)
mask[idxs] = False
if msid == "fptemp_11" and mask_radzones:
rad_zones = events.rad_zones.filter(start=telem.dates[0],
stop=telem.dates[-1])
for rz in rad_zones:
idxs = np.logical_and(telem.times.value >= rz.tstart,
telem.times.value <= rz.tstop)
mask[idxs] = False
if mask_fmt1:
which = self["msids", "ccsdstmf"] == "FMT1"
mask[which] = False
times = telem.times.value[mask]
if yplotlimits is None:
ymin = min(telem.value[mask].min(), pred.value[mask].min())-2
ymax = min(telem.value[mask].max(), pred.value[mask].max())+2
yplotlimits = [ymin, ymax]
if errorplotlimits is None:
errorplotlimits = [-5, 5]
if plot_limits:
m = "fptemp" if msid == "fptemp_11" else msid
mylimits = self.limits[m]
else:
mylimits = {"units": "C"}
dash.dashboard(pred.value[mask], telem.value[mask], times, mylimits,
msid=msid, modelname=full_name.get(msid, msid),
errorplotlimits=errorplotlimits, yplotlimits=yplotlimits,
fig=fig, savefig=False)
if figfile is not None:
fig.savefig(figfile)
return fig
[docs]class ThermalModelFromRun(ModelDataset):
"""
Fetch multiple temperature models and their associated commanded states
from ASCII table files generated by xija or model check tools. If MSID
data will be added, it will be interpolated to the times of the model
data.
Parameters
----------
loc : string or list of strings
Path to the directory where the model and state data are stored.
get_msids : boolean, optional
Whether or not to load the MSIDs corresponding to the
temperature models for the same time period from the
engineering archive. Default: False.
tl_file : string
Path to the location of the tracelog file to get the MSID data from.
Default: None, which means the engineering archive will be queried
if get_msids=True.
Examples
--------
>>> from acispy import ThermalModelFromRun
>>> ds = ThermalModelFromRun("/data/acis/LoadReviews/2019/MAY2019/ofls/out_dpa",
... get_msids=True)
"""
def __init__(self, loc, get_msids=False, tl_file=None):
loc = Path(loc)
temp_file = loc / "temperatures.dat"
state_file = loc / "states.dat"
esa_file = loc / "earth_solid_angle.dat"
if not state_file.exists():
state_file = None
if not esa_file.exists():
esa_file = None
model = Model.from_load_file(temp_file, esa_file=esa_file)
comps = list(model.keys())
if state_file is not None:
states = States.from_load_file(state_file)
else:
states = EmptyTimeSeries()
if get_msids:
msids = self._get_msids(model, comps, tl_file)
else:
msids = EmptyTimeSeries()
super(ThermalModelFromRun, self).__init__(msids, states, model)
[docs]class ThermalModelFromLoad(ModelDataset):
"""
Fetch a temperature model and its associated commanded states
from a load review. Optionally get MSIDs for the same time period.
If MSID data will be added, it will be interpolated to the times
of the model data.
Parameters
----------
load : string
The load review to get the model from, i.e. "JAN2516A".
comps : list of strings, optional
List of temperature components to get from the load models. If
not specified all four components will be loaded.
get_msids : boolean, optional
Whether or not to load the MSIDs corresponding to the
temperature models for the same time period from the
engineering archive. Default: False.
states_comp : string, optional
The thermal model page to use to get the states. "DEA", "DPA",
"PSMC", or "FP". Default: "DPA"
Examples
--------
>>> from acispy import ThermalModelFromLoad
>>> comps = ["1deamzt", "1pdeaat", "fptemp_11"]
>>> ds = ThermalModelFromLoad("APR0416C", comps, get_msids=True)
"""
def __init__(self, load, comps=None, get_msids=False,
tl_file=None, states_comp="DPA"):
if comps is None:
comps = ["1deamzt", "1dpamzt", "1pdeaat", "fptemp_11",
"tmp_fep1_mong", "tmp_fep1_actel", "tmp_bep_pcb"]
comps = ensure_list(comps)
model = Model.from_load_page(load, comps)
states = States.from_load_page(load, comp=states_comp)
if get_msids:
msids = self._get_msids(model, comps, tl_file)
else:
msids = EmptyTimeSeries()
super(ThermalModelFromLoad, self).__init__(msids, states, model)
[docs]class ThermalModelRunner(ModelDataset):
"""
Class for running Xija thermal models.
Parameters
----------
name : string
The name of the MSID to simulate, e.g. "1dpamzt"
tstart : string
The start time in YYYY:DOY:HH:MM:SS format.
tstop : string
The stop time in YYYY:DOY:HH:MM:SS format.
states : dict, optional
A dictionary of modeled commanded states required for the model. The
states can either be a constant value or NumPy arrays. If not supplied,
the thermal model will be run with states from the commanded states
database.
T_init : float, optional
The initial temperature for the thermal model run. If None,
an initial temperature will be determined from telemetry.
Default: None
other_init : dict, optional
A dictionary of names of nodes (such as pseudo-nodes) and initial
values, which can be supplied to initialize these nodes for the
start of the model run. Default: None
get_msids : boolean, optional
Whether or not to pull data from the engineering archive.
Default: False
dt : float, optional
The timestep to use for this run. Default is 328 seconds or is provided
by the model specification file.
model_spec : string, optional
Path to the model spec JSON file for the model. Default: None, the
standard model path will be used.
mask_bad_times : boolean, optional
If set, bad times from the data are included in the array masks
and plots. Default: False
ephem_file : string, optional
An AstroPy ASCII table containing a custom ephemeris. Must have
the following columns:
times: CXC seconds
orbitephem0_x: Chandra orbit ephemeris x-component in units of m
orbitephem0_y: Chandra orbit ephemeris y-component in units of m
orbitephem0_z: Chandra orbit ephemeris z-component in units of m
solarephem0_x: Solar orbit ephemeris x-component in units of m
solarephem0_y: Solar orbit ephemeris y-component in units of m
solarephem0_z: Solar orbit ephemeris z-component in units of m
Default : None, which means the ephemeris will be taken from the
cheta archive.
evolve_method : integer, optional
Whether to use the old xija core solver (1) or the new one (2).
Default: None, which defaults to the value in the model spec
file.
rk4 : integer, optional
Whether to use 4th-order Runge-Kutta (1) instead of 2nd order (0).
Only works with evolve_method=2. Default: None, which defaults
to the value in the model spec file.
tl_file : string, optional
The path to a tracelog file which will supply MSID information
if ``use_msids=True``. Default: None
compute_model_supp : callable, optional
A function which takes the model name, tstart, tstop,
and a XijaModel object, and allows the user to
perform custom operations on the model.
chandra_models_path : str, optional
The path to the chandra_models repository to be used
when obtaining a model specification file. Default:
$SKA/data/chandra_models
chandra_models_version : str, optional
The version of the chandra_models repository to be used
when obtaining a model specification file. Can be a
version number or a named branch. Default is to use the
latest tagged version.
Examples
--------
>>> states = {"ccd_count": np.array([5, 6, 1]),
... "pitch": np.array([150.0] * 3),
... "fep_count": np.array([5, 6, 1]),
... "clocking": np.array([1] * 3),
... "vid_board": np.array([1] * 3),
... "off_nom_roll": np.array([0.0] * 3),
... "simpos": np.array([-99616.0] * 3),
... "datestart": np.array(["2015:002:00:00:00", "2015:002:12:00:00", "2015:003:12:00:00"]),
... "datestop": np.array(["2015:002:12:00:00", "2015:003:12:00:00", "2015:005:00:00:00"])}
>>> dpa_model = ThermalModelRunner("1dpamzt", "2015:002:00:00:00",
... "2015:005:00:00:00", states=states,
... T_init=10.1)
"""
def __init__(self, name, tstart, tstop, states=None, T_init=None,
other_init=None, get_msids=False, dt=328.0, model_spec=None,
mask_bad_times=False, ephem_file=None, evolve_method=None,
rk4=None, tl_file=None, compute_model_supp=None,
chandra_models_path=None, chandra_models_version=None):
if name in short_name_rev:
name = short_name_rev[name]
self.name = name.lower()
self.sname = short_name.get(name, name)
if self.name in acis_models:
self.model_check = importlib.import_module(
f"acis_thermal_check.apps.{self.sname}_check")
else:
self.model_check = None
model_spec = find_json(name, model_spec,
repo_path=chandra_models_path,
version=chandra_models_version)
self.ephem_file = ephem_file
self.compute_model_supp = compute_model_supp
tstart = CxoTime(tstart).date
tstop = CxoTime(tstop).date
tstart_secs = CxoTime(tstart).secs
tstop_secs = CxoTime(tstop).secs
self.datestart = tstart
self.datestop = tstop
self.tstart = Quantity(tstart_secs, "s")
self.tstop = Quantity(tstop_secs, "s")
last_ecl_time = fetch.get_time_range("aoeclips", format='secs')[1]
self.no_eclipse = tstop_secs > last_ecl_time
self.no_earth_heat = getattr(self, "no_earth_heat", False)
if states is not None:
if isinstance(states, States):
states = states.as_array()
elif isinstance(states, dict):
if "tstart" not in states:
states["tstart"] = CxoTime(states["datestart"]).secs
if "tstop" not in states:
states["tstop"] = CxoTime(states["datestop"]).secs
num_states = states["tstart"].size
if "letg" not in states:
states["letg"] = np.array(["RETR"] * num_states)
if "hetg" not in states:
states["hetg"] = np.array(["RETR"] * num_states)
states = dict_to_array(states)
if T_init is None:
last_tlm_date = fetch.get_time_range(self.name, format='secs')[1]
if tstart_secs+700.0 > last_tlm_date:
raise RuntimeError(f"T_init=None, but the start time of {tstart} "
"is ahead of the last time in telemetry. "
"Please specify T_init or choose a different "
"time.")
T_init = fetch.MSID(self.name, tstart_secs-700., tstart_secs).vals[-1]
self.T_init = Quantity(T_init, "deg_C")
if self.name in acis_models and states is not None:
self.xija_model = self._compute_acis_model(self.name, tstart, tstop,
states, dt, T_init, model_spec,
rk4=rk4, other_init=other_init,
evolve_method=evolve_method)
else:
self.xija_model = self._compute_model(name, tstart, tstop, dt, T_init,
states, model_spec,
other_init=other_init,
evolve_method=evolve_method,
rk4=rk4)
self.model_spec = self.xija_model.model_spec
self.limits = self.xija_model.limits
if states is None:
states = self.xija_model.cmd_states
states_obj = States(states)
self.bad_times = getattr(self.xija_model, "bad_times", None)
self.bad_times_indices = getattr(self.xija_model, "bad_times_indices", None)
if isinstance(states, dict):
states.pop("dh_heater", None)
components = [self.name]
if 'dpa_power' in self.xija_model.comp:
components.append('dpa_power')
if 'earthheat__fptemp' in self.xija_model.comp:
components.append('earthheat__fptemp')
if states is None:
for c in ["pitch", "roll", "fep_count", "vid_board", "clocking",
"ccd_count", "sim_z"]:
if c in self.xija_model.comp:
components.append(c)
masks = {}
if mask_bad_times and self.bad_times is not None:
masks[self.name] = np.ones(self.xija_model.times.shape, dtype='bool')
for (left, right) in self.bad_times_indices:
masks[self.name][left:right] = False
model_obj = Model.from_xija(self.xija_model, components, masks=masks)
if get_msids:
msids_obj = self._get_msids(model_obj, [self.name], tl_file)
else:
msids_obj = EmptyTimeSeries()
super(ThermalModelRunner, self).__init__(msids_obj, states_obj, model_obj)
def _get_ephemeris(self, tstart, tstop, times):
msids = [f"orbitephem0_{axis}" for axis in "xyz"]
msids += [f"solarephem0_{axis}" for axis in "xyz"]
ephem = {}
if self.ephem_file is None:
e = fetch.MSIDset(msids, tstart - 2000.0, tstop + 2000.0)
for msid in msids:
ephem[msid] = Ska.Numpy.interpolate(e[msid].vals, e[msid].times,
times)
else:
e = ascii.read(self.ephem_file)
idxs = np.logical_and(e["times"] >= tstart - 2000.0,
e["times"] <= tstop + 2000.0)
for msid in msids:
ephem[msid] = Ska.Numpy.interpolate(e[msid][idxs],
e["times"][idxs], times)
return ephem
def _compute_model(self, name, tstart, tstop, dt, T_init, states,
model_spec, other_init=None, evolve_method=None,
rk4=None):
if name == "fptemp_11":
name = "fptemp"
model = xija.XijaModel(name, start=tstart, stop=tstop, dt=dt,
model_spec=model_spec,
evolve_method=evolve_method, rk4=rk4)
model.comp[name].set_data(T_init)
for t in ["dea0", "dpa0"]:
if t in model.comp:
model.comp[t].set_data(T_init)
if other_init is not None:
for k, v in other_init.items():
model.comp[k].set_data(v)
if states is not None:
if isinstance(states, np.ndarray):
state_names = states.dtype.names
else:
state_names = list(states.keys())
state_times = CxoTime(
np.array([states["datestart"], states["datestop"]])).secs
for k in state_names:
if k in model.comp:
model.comp[k].set_data(states[k], state_times)
if self.no_eclipse:
model.comp["eclipse"].set_data(False)
if self.compute_model_supp is not None:
self.compute_model_supp(name, tstart, tstop, model)
model.make()
model.calc()
return model
def _compute_acis_model(self, name, tstart, tstop, states, dt, T_init,
model_spec, other_init=None, evolve_method=None,
rk4=None):
import re
from acis_thermal_check.utils import calc_pitch_roll
check_obj = getattr(self.model_check, model_classes[self.sname])()
if name == "fptemp_11":
name = "fptemp"
model = xija.XijaModel(name, start=tstart, stop=tstop, dt=dt,
model_spec=model_spec, rk4=rk4,
evolve_method=evolve_method)
ephem = self._get_ephemeris(model.tstart, model.tstop, model.times)
if states is None:
state_times = model.times
state_names = ["ccd_count", "fep_count", "vid_board",
"clocking", "pitch", "roll"]
if 'aoattqt1' in model.comp:
state_names += ["q1", "q2", "q3", "q4"]
states = {}
pattern = re.compile("q[1-4]")
for n in state_names:
nstate = n
ncomp = n
if pattern.match(n):
ncomp = f'aoattqt{n[-1]}'
elif name == "roll":
nstate = "off_nom_roll"
states[nstate] = np.array(model.comp[ncomp].dvals)
else:
if isinstance(states, np.ndarray):
state_names = states.dtype.names
else:
state_names = list(states.keys())
state_times = np.array([states["tstart"], states["tstop"]])
model.comp['sim_z'].set_data(np.array(states['simpos']), state_times)
if 'pitch' in state_names:
model.comp['pitch'].set_data(np.array(states['pitch']), state_times)
else:
pitch, roll = calc_pitch_roll(model.times, ephem, states)
model.comp['pitch'].set_data(pitch, model.times)
model.comp['roll'].set_data(roll, model.times)
for st in ('ccd_count', 'fep_count', 'vid_board', 'clocking'):
model.comp[st].set_data(np.array(states[st]), state_times)
if 'dh_heater' in model.comp:
dhh = states["dh_heater"] if "dh_heater" in state_names else 0
model.comp['dh_heater'].set_data(dhh, state_times)
if "off_nom_roll" in state_names:
roll = np.array(states["off_nom_roll"])
model.comp["roll"].set_data(roll, state_times)
if 'dpa_power' in model.comp:
# This is just a hack, we're not
# really setting the power to zero.
# But this value has no effect on
# model evolution.
model.comp['dpa_power'].set_data(0.0)
model.comp[name].set_data(T_init)
if self.no_eclipse:
model.comp["eclipse"].set_data(False)
check_obj._calc_model_supp(model, state_times, states, ephem, None)
if self.name == "fptemp_11" and self.no_earth_heat:
model.comp["earthheat__fptemp"].k = 0.0
if other_init is not None:
for k, v in other_init.items():
model.comp[k] = v
if self.compute_model_supp is not None:
self.compute_model_supp(name, tstart, tstop, model)
model.make()
model.calc()
return model
[docs] @classmethod
def from_states_file(cls, name, states_file, **kwargs):
"""
Run a xija thermal model using a states.dat file.
Parameters
----------
name : string
The name of the MSID to simulate, e.g.
"1dpamzt"
states_file : string
A file containing commanded states, in the same
format as "states.dat" which is outputted by ACIS
thermal model runs for loads.
All other keyword arguments which are passed to the main
:class:`~acispy.thermal_models.ThermalModelRunner`
constructor can be passed to this method as well.
"""
states = States.from_load_file(states_file)
tstart = CxoTime(states['tstart'].value[0]).date
tstop = CxoTime(states['tstop'].value[-1]).date
return cls(name, tstart, tstop, states=states, **kwargs)
[docs] @classmethod
def from_commands(cls, name, cmds, **kwargs):
"""
Parameters
----------
name : string
The name of the MSID to simulate, e.g. "1dpamzt"
cmds : list of commands or CommandTable
The commands from which to derive states.
All other keyword arguments which are passed to the main
:class:`~acispy.thermal_models.ThermalModelRunner`
constructor can be passed to this method as well.
"""
if not isinstance(cmds, commands.CommandTable):
cmds = commands.CommandTable(cmds)
states = States.from_commands(cmds)
return cls(name, states["datestart"][0], states["datestop"][-1],
states=states, **kwargs)
[docs] @classmethod
def from_backstop(cls, name, backstop_file, days=3, T_init=None,
other_cmds=None, **kwargs):
"""
Run a thermal model using states derived from a backstop
file. Continuity with previous states will be automatically
handled.
Parameters
----------
name : string
The name of the MSID to simulate, e.g. "1dpamzt"
backstop_file : string
The path to the backstop file.
days : float
T_init : float, optional
The initial temperature for the thermal model run. If None,
an initial temperature will be determined from telemetry.
Default: None
other_cmds : list of commands or CommandTable
Other commands to be included in the list.
All other keyword arguments which are passed to the main
:class:`~acispy.thermal_models.ThermalModelRunner`
constructor can be passed to this method as well.
"""
bs_cmds = commands.get_cmds_from_backstop(backstop_file)
bs_dates = bs_cmds["date"]
bs_cmds['time'] = CxoTime(bs_cmds['date']).secs
last_tlm_date = fetch.get_time_range(name, format='date')[1]
last_tlm_time = CxoTime(last_tlm_date).secs
tstart = min(last_tlm_time-3600.0, bs_cmds['time'][0]-days*86400.)
if T_init is None:
T_init = fetch.MSID(name, tstart-700., tstart).vals[-1]
ok = bs_cmds['event_type'] == 'RUNNING_LOAD_TERMINATION_TIME'
if np.any(ok):
rltt = CxoTime(bs_dates[ok][0])
else:
# Handle the case of old loads (prior to backstop 6.9) where there
# is no RLTT. If the first command is AOACRSTD this indicates the
# beginning of a maneuver ATS which may overlap by 3 mins with the
# previous loads because of the AOACRSTD command. So move the RLTT
# forward by 3 minutes (exactly 180.0 sec). If the first command is
# not AOACRSTD then that command time is used as RLTT.
if bs_cmds['tlmsid'][0] == 'AOACRSTD':
rltt = CxoTime(bs_cmds['time'][0] + 180)
else:
rltt = CxoTime(bs_cmds['date'][0])
# Get non-backstop commands for continuity
cmds = commands.get_cmds(tstart, rltt, inclusive_stop=True)
# Add backstop commands
cmds = cmds.add_cmds(bs_cmds)
if other_cmds is not None:
if not isinstance(other_cmds, commands.CommandTable):
other_cmds = commands.CommandTable(other_cmds)
cmds = cmds.add_cmds(other_cmds)
return cls.from_commands(name, cmds, T_init=T_init, **kwargs)
[docs] def make_solarheat_plot(self, node, figfile=None, fig=None):
"""
Make a plot which shows the solar heat value vs. pitch.
Parameters
----------
node : string
The xija node which has the solar heating applied to it
in the model. Can be an real node on the spacecraft like
1DEAMZT or a pseudo-node like "dpa0" in the 1DPAMZT model.
figfile : string, optional
The file to write the solar heating plot to. One will be created
if not provided.
fig : :class:`~matplotlib.figure.Figure`, optional
A Figure instance to plot in. Default: None, one will be
created if not provided.
"""
if fig is None:
fig, ax = plt.subplots(figsize=(15, 10))
else:
ax = fig.add_subplot(111)
try:
comp = self.xija_model.comp[f"solarheat__{node}"]
except KeyError:
raise KeyError(f"{node} does not have a SolarHeat component!")
comp.plot_solar_heat__pitch(fig, ax)
ax.set_xlabel("Pitch (deg)", fontsize=18)
ax.set_ylabel("SolarHeat", fontsize=18)
ax.lines[1].set_label("P")
ax.lines[2].set_label("P+dP")
ax.legend(fontsize=18)
ax.tick_params(width=2, length=6)
for axis in ['top', 'bottom', 'left', 'right']:
ax.spines[axis].set_linewidth(2)
fontProperties = font_manager.FontProperties(size=18)
for label in ax.get_xticklabels():
label.set_fontproperties(fontProperties)
for label in ax.get_yticklabels():
label.set_fontproperties(fontProperties)
ax.title.set_fontsize(18)
if figfile is not None:
fig.savefig(figfile)
return fig
[docs] def make_power_plot(self, figfile=None, fig=None, use_ccd_count=False):
"""
Make a plot which shows the ACIS state power coefficients,
vs. either FEP or CCD count.
Parameters
----------
figfile : string, optional
The file to write the power coefficient plot to.
One will be created if not provided.
fig : :class:`~matplotlib.figure.Figure`, optional
A Figure instance to plot in. Default: None, one
will be created if not provided.
use_ccd_count : boolean, optional
If True, plot the CCD count on the x-axis. Primarily
useful for the 1DEAMZT model. Default: False
"""
if fig is None:
fig, ax = plt.subplots(figsize=(10, 10))
else:
ax = fig.add_subplot(111)
xm = self.xija_model
dtype = [('x', 'int'), ('y', 'float'), ('name', '<U32')]
clocking = []
not_clocking = []
either = []
for i, parname in enumerate(xm.parnames):
name = parname.split("__")[-1]
if name.startswith("pow"):
coeff = name.split("_")[-1]
if use_ccd_count:
count = int(coeff[1])
else:
count = int(coeff[0])
if name.endswith("x"):
either.append((count, xm.parvals[i], coeff))
elif name.endswith("1"):
clocking.append((count, xm.parvals[i], coeff))
elif name.endswith("0"):
not_clocking.append((count, xm.parvals[i], coeff))
clocking = np.array(clocking, dtype=dtype)
not_clocking = np.array(not_clocking, dtype=dtype)
either = np.array(either, dtype=dtype)
ax.scatter(clocking["x"], clocking["y"], label="Clocking", s=40,
color="C0")
for i, txt in enumerate(clocking["name"]):
ax.text(clocking["x"][i] + 0.25, clocking["y"][i], txt, color="C0",
fontsize=18)
ax.scatter(not_clocking["x"], not_clocking["y"], label="Not Clocking",
s=40, color="C1")
for i, txt in enumerate(not_clocking["name"]):
ax.text(not_clocking["x"][i] + 0.25, not_clocking["y"][i], txt,
color="C1", fontsize=18)
ax.scatter(either["x"], either["y"], label="Either", s=40, color="C2")
for i, txt in enumerate(either["name"]):
ax.text(either["x"][i] + 0.25, either["y"][i], txt, color="C2",
fontsize=18)
ax.tick_params(width=2, length=6)
ax.set_xlabel("{} Count".format("CCD" if use_ccd_count else "FEP"), fontsize=18)
ax.set_ylabel("Coefficient Value", fontsize=18)
ax.set_xticks(np.arange(7))
ax.set_xlim(-0.25, 7.0)
ax.legend(fontsize=18)
for axis in ['top', 'bottom', 'left', 'right']:
ax.spines[axis].set_linewidth(2)
fontProperties = font_manager.FontProperties(size=18)
for label in ax.get_xticklabels():
label.set_fontproperties(fontProperties)
for label in ax.get_yticklabels():
label.set_fontproperties(fontProperties)
if figfile is not None:
fig.savefig(figfile)
return fig
def find_text_time(time, hours=1.0):
return CxoTime(CxoTime(time).secs+hours*3600.0).date
def make_default_states():
return {
"ccd_count": np.array([0], dtype='int'),
"fep_count": np.array([0], dtype='int'),
"clocking": np.array([0], dtype='int'),
"vid_board": np.array([0], dtype='int'),
"pitch": np.array([90.0]),
"simpos": np.array([-99616.0]),
"hetg": np.array(["RETR"]),
"letg": np.array(["RETR"]),
"off_nom_roll": np.array([0.0]),
"dh_heater": np.array([0], dtype='int'),
"q1": np.array([1.0]),
"q2": np.array([0.0]),
"q3": np.array([0.0]),
"q4": np.array([0.0])
}
[docs]class SimulateSingleState(ThermalModelRunner):
"""
Class for simulating thermal models under constant conditions.
Parameters
----------
name : string
The name of the model to simulate.
tstart : string or float
The start time of the single-state run.
tstop : string or float
The stop time of the single-state run.
states : dict
A dictionary of modeled commanded states required for the single-state
run. All states must be single values. Any particular states which are
not included in this dict will be filled in using the "default" states,
which assume zero ACIS CCDs and FEPs, normal Sun, zero off-nominal roll,
HRC-S sim position, and no gratings inserted.
T_init : float
The starting temperature for the model in degrees C or F.
model_spec : string, optional
Path to the model spec JSON file for the model. Default: None, the
standard model path will be used.
dt : float, optional
The timestep to use for this run. Default is 328 seconds or is provided
by the model specification file.
evolve_method : integer, optional
Whether to use the old xija core solver (1) or the new one (2).
Default: None, which defaults to the value in the model spec
file.
rk4 : integer, optional
Whether to use 4th-order Runge-Kutta (1) instead of 2nd order (0).
Only works with evolve_method=2. Default: None, which defaults
to the value in the model spec file.
no_earth_heat : boolean, optional
Ignore the effect of earthshine in the ACIS radiator field of view.
This really only might be useful for the ACIS focal plane
temperature model. Default: False
other_init : dict, optional
A dictionary of names of nodes (such as pseudo-nodes) and initial
values, which can be supplied to initialize these nodes for the
start of the model run. Default: None
compute_model_supp : callable, optional
A function which takes the model name, tstart, tstop,
and a XijaModel object, and allows the user to
perform custom operations on the model.
Examples
--------
>>> states = {"pitch": 75.0, "off_nom_roll": -6.0, "clocking": 1,
... "ccd_count": 6, "dh_heater": 1, "simpos": 75624.0,}
>>> dea_run = SimulateSingleState("1deamzt", "2016:201:05:12:03",
... "2016:202:05:12:03", states, 15.0)
"""
def __init__(self, name, tstart, tstop, states, T_init, model_spec=None,
dt=328.0, evolve_method=None, rk4=None, no_earth_heat=False,
other_init=None, compute_model_supp=None):
_states = make_default_states()
if "ccd_count" in states and "fep_count" not in states:
states["fep_count"] = states["ccd_count"]
if "fep_count" in states and "ccd_count" not in states:
states["ccd_count"] = states["fep_count"]
for k in list(states.keys()):
if k in _states:
_states[k][0] = states[k]
else:
raise KeyError(f"You input a state ('{k}') which does not exist!")
for i in range(1, 5):
_states[f"targ_q{i}"] = _states[f"q{i}"]
if name in short_name_rev:
name = short_name_rev[name]
tstart = CxoTime(tstart).secs
datestart = CxoTime(tstart).date
tstop = CxoTime(tstop).secs
datestop = CxoTime(tstop).date
_states["datestart"] = np.array([datestart])
_states["datestop"] = np.array([datestop])
_states["tstart"] = np.array([tstart])
_states["tstop"] = np.array([tstop])
self.no_earth_heat = no_earth_heat
super().__init__(name, datestart, datestop, states=_states,
T_init=T_init, dt=dt, evolve_method=evolve_method,
rk4=rk4, model_spec=model_spec, get_msids=False,
other_init=other_init,
compute_model_supp=compute_model_supp)
[docs] def write_msids(self, filename, fields, mask_field=None, overwrite=False):
raise NotImplementedError
[docs] def write_states(self, states_file, overwrite=False):
raise NotImplementedError
[docs] def make_dashboard_plots(self, yplotlimits=None, errorplotlimits=None, fig=None):
raise NotImplementedError
[docs] def write_model_and_data(self, filename, overwrite=False):
raise NotImplementedError
[docs]class SimulateECSRun(ThermalModelRunner):
"""
Class for simulating thermal models for ECS measurements.
name : string
The msid of the model to simulate.
tstart : string or float
The start time of the single-state run.
hours : integer or float
The length of the ECS measurement in hours. NOTE that the
actual length of the ECS run is hours + 10 ks + 12 s, as
per the ECS CAP.
T_init : float
The starting temperature for the model in degrees C.
attitude : array_like
The input attitude for this simulated ECS run. Can be one of three
types:
* (pitch, roll) combination, e.g. (155.0, 5.0)
* Attitude quaternion, e.g [1.0, 0.0, 0.0, 0.0]
* Load name (for vehicle load): SEP0917C
If the (pitch, roll) combination is chosen, note that a default
quaternion of [1.0, 0.0, 0.0, 0.0] will be used, which means that the
focal plane prediction may be inaccurate since the earth solid angle
depends on the quaternion.
ccd_count : integer
The number of CCDs to clock.
dh_heater: integer, optional
Flag to set whether (1) or not (0) the detector housing heater is on.
Default: 0
dt : float, optional
The timestep to use for this run. Default is 328 seconds or is provided
by the model specification file.
evolve_method : integer, optional
Whether to use the old xija core solver (1) or the new one (2).
Default: None, which defaults to the value in the model spec
file.
rk4 : integer, optional
Whether to use 4th-order Runge-Kutta (1) instead of 2nd order (0).
Only works with evolve_method=2. Default: None, which defaults
to the value in the model spec file.
model_spec : string, optional
Path to the model spec JSON file for the model. Default: None, the
standard model path will be used.
no_earth_heat : boolean, optional
Ignore the effect of earthshine in the ACIS radiator field of view.
This really only might be useful for the ACIS focal plane
temperature model. Default: False
other_init : dict, optional
A dictionary of names of nodes (such as pseudo-nodes) and initial
values, which can be supplied to initialize these nodes for the
start of the model run. Default: None
compute_model_supp : callable, optional
A function which takes the model name, tstart, tstop,
and a XijaModel object, and allows the user to
perform custom operations on the model.
Examples
--------
>>> dea_run = SimulateECSRun("1deamzt", "2016:201:05:12:03", 24, 14.0,
... (150.-6.0), 5, dh_heater=1)
"""
def __init__(self, name, tstart, hours, T_init, attitude, ccd_count,
dh_heater=0, dt=328.0, evolve_method=None,
rk4=None, model_spec=None, no_earth_heat=False,
instrument=None, other_init=None,
compute_model_supp=None):
if name in short_name_rev:
name = short_name_rev[name]
if name.lower() == "fptemp_11" and instrument is None:
raise RuntimeError("Please specify instrument='ACIS-I' or "
"instrument='ACIS-S' when running the FP model!")
tstart = CxoTime(tstart).secs
tend = tstart+hours*3600.0+10012.0
tstop = tend+0.5*(tend-tstart)
datestart = CxoTime(tstart).date
datestop = CxoTime(tstop).date
if isinstance(attitude, str):
self.vehicle_load = attitude
else:
self.vehicle_load = None
self.hours = hours
self.no_earth_heat = no_earth_heat
self.instrument = instrument
if self.vehicle_load is not None:
mylog.info(f"Modeling a {ccd_count}-chip state concurrent with "
f"the {self.vehicle_load} vehicle loads.")
states = dict((k, state.value) for (k, state) in
States.from_load_page(self.vehicle_load).table.items())
run_idxs = states["tstart"] < tstop
states["ccd_count"][run_idxs] = ccd_count
states["fep_count"][run_idxs] = ccd_count
states["clocking"][run_idxs] = 1
states["vid_board"][run_idxs] = 1
states["simpos"][run_idxs] = -99616.0
states["hetg"][run_idxs] = "RETR"
states["letg"][run_idxs] = "RETR"
else:
states = {
"ccd_count": np.array([ccd_count], dtype='int'),
"fep_count": np.array([ccd_count], dtype='int'),
"clocking": np.array([1], dtype='int'),
"vid_board": np.array([1], dtype='int'),
"simpos": np.array([-99616.0]),
"datestart": np.array([datestart]),
"datestop": np.array([datestop]),
"tstart": np.array([tstart]),
"tstop": np.array([tstop]),
"hetg": np.array(["RETR"]),
"letg": np.array(["RETR"]),
"dh_heater": np.array([dh_heater], dtype='int')
}
if len(attitude) == 2:
if name.lower() == "fptemp_11":
mylog.warning("For the ACIS FP model, an attitude quaternion "
"is required to accurately capture the effects of "
"earth solid angle. With only the pitch and "
"off-nominal roll specified, temperatures "
"near radiation zones may be inaccurate.")
states["pitch"] = np.array([attitude[0]])
states["off_nom_roll"] = np.array([attitude[1]])
q = [1.0, 0.0, 0.0, 0.0]
elif len(attitude) == 4:
q = attitude
else:
raise RuntimeError("Invalid format for 'attitude'! Either "
"supply [pitch, off_nom_roll], an "
"attitude quaternion, or a load name!")
for i in range(4):
states[f"q{i+1}"] = np.array([q[i]])
super().__init__(name, tstart, tstop, states=states, T_init=T_init,
dt=dt, evolve_method=evolve_method, rk4=rk4,
model_spec=model_spec, other_init=other_init,
compute_model_supp=compute_model_supp)
mylog.info("Run Parameters")
mylog.info("--------------")
mylog.info(f"Modeled Temperature: {name}")
mylog.info(f"Start Datestring: {datestart}")
mylog.info(f"Length of state in hours: {self.hours}")
mylog.info(f"Stop Datestring: {datestop}")
mylog.info(f"Initial Temperature: {T_init} degrees C")
mylog.info(f"CCD/FEP Count: {ccd_count}")
if self.vehicle_load is None:
if len(attitude) == 2:
pitch, roll = attitude
else:
pitch = self.xija_model.comp["pitch"].dvals[0]
roll = self.xija_model.comp["roll"].dvals[0]
mylog.info(f"Quaternion: {attitude}")
mylog.info(f"Pitch: {pitch}")
mylog.info(f"Off-nominal Roll: {roll}")
dhh = {0: "OFF", 1: "ON"}[dh_heater]
mylog.info(f"Detector Housing Heater: {dhh}")
self.tend = tend
self.dateend = CxoTime(tend).date
mylog.info("Model Result")
mylog.info("------------")
self.limits = get_acis_limits(self.name, self.model_spec,
limits_map=limits_map)
self.limit_time = None
self.limit_date = None
self.duration = None
self.violate = False
self.hours = hours
if self.name.lower() == "fptemp_11":
cold = self.mvals.value < self.limits["cold_ecs"].value
cold &= self.xija_model.times < self.tend
cold_time = np.sum(cold)
if cold_time == 0:
msg = "The focal plane is never cold for this ECS measurement."
else:
msg = f"The focal plane is cold for " \
f"{cold_time*self.xija_model.dt_ksec} ks."
mylog.info(msg)
limit_name = self.instrument.lower().replace("-", "_")
else:
limit_name = "planning_hi"
self.limit = self.limits[limit_name]
viols = self.mvals.value > self.limit.value
if np.any(viols):
idx = np.where(viols)[0][0]
self.limit_time = self.times('model', self.name)[idx]
self.limit_date = CxoTime(self.limit_time).date
self.duration = Quantity((self.limit_time.value-tstart)*0.001, "ks")
msg = f"The limit of {self.limit.value} degrees C will be " \
f"reached at {self.limit_date}, after {self.duration.value} ksec."
mylog.info(msg)
if self.limit_time.value < self.tend:
self.violate = True
viol_time = "before"
else:
self.violate = False
viol_time = "after"
mylog.info(f"The limit is reached {viol_time} the end of the observation.")
else:
mylog.info(f"The limit of {self.limit.value} degrees C "
f"is never reached.")
if name.lower() != "fptemp_11":
if self.violate:
mylog.warning("This observation is NOT safe from a thermal perspective.")
else:
mylog.info("This observation is safe from a thermal perspective.")
def _time_ticks(self, dp, ymax, fontsize):
from matplotlib.ticker import AutoMinorLocator
axt = dp.ax.twiny()
mtimes = self.xija_model.times
xmin, xmax = (plotdate2cxctime(dp.ax.get_xlim())-mtimes[0])*1.0e-3
axt.plot((mtimes-mtimes[0])*1.0e-3,
ymax*np.ones_like(mtimes))
axt.set_xlim(xmin, xmax)
axt.xaxis.set_minor_locator(AutoMinorLocator(5))
axt.set_xlabel("Time (ks)", fontdict={"size": fontsize})
fontProperties = font_manager.FontProperties(size=fontsize)
for label in axt.get_xticklabels():
label.set_fontproperties(fontProperties)
for label in axt.get_yticklabels():
label.set_fontproperties(fontProperties)
axt.tick_params(which="major", width=2, length=6)
axt.tick_params(which="minor", width=2, length=3)
[docs] def plot_model(self, plot=None, fontsize=18, **kwargs):
"""
Plot the simulated ECS run.
Parameters
----------
plot : :class:`~acispy.plots.DatePlot` or :class:`~acispy.plots.CustomDatePlot`, optional
An existing DatePlot to add this plot to. Default: None, one
will be created if not provided.
fontsize : integer, optional
The font size for the labels in the plot. Default: 18 pt.
"""
if self.vehicle_load is None:
field2 = None
else:
field2 = "pitch"
viol_text = "NOT SAFE" if self.violate else "SAFE"
dp = DatePlot(self, [("model", self.name)], field2=field2, plot=plot,
fontsize=fontsize, **kwargs)
dp.add_hline(self.limit.value, ls='-', lw=2, color=self.limit.color)
if self.name.lower() != "fptemp_11":
dp.add_text(find_text_time(self.dateend, hours=4.0),
self.T_init.value + 2.0, viol_text, fontsize=22,
color='black')
dp.add_hline(self.limits["yellow_hi"].value, ls='-', lw=2,
color=self.limits["yellow_hi"].color)
else:
dp.add_hline(self.limits["cold_ecs"].value, ls='-', lw=2,
color=self.limits["cold_ecs"].color)
dp.add_vline(self.datestart, ls='--', lw=2, color='b')
dp.add_text(find_text_time(self.datestart), self.limit.value - 2.0,
"START", color='blue', rotation="vertical")
dp.add_vline(self.dateend, ls='--', lw=2, color='b')
dp.add_text(find_text_time(self.dateend), self.limit.value - 2.0,
"END", color='blue', rotation="vertical")
if self.limit_date is not None:
dp.add_vline(self.limit_date, ls='--', lw=2, color='r')
dp.add_text(find_text_time(self.limit_date), self.limit.value-2.0,
"VIOLATION", color='red', rotation="vertical")
dp.set_xlim(find_text_time(self.datestart, hours=-1.0), self.datestop)
ymin = min(self.T_init.value, self.mvals.value.min())-2.0
if self.name.lower() != "fptemp_11":
ymax = self.limits["yellow_hi"].value
else:
ymax = self.limit.value
ymax = max(ymax, self.mvals.value.max())+3.0
self._time_ticks(dp, ymax, fontsize)
dp.set_ylim(ymin, ymax)
return dp
[docs] def get_temp_at_time(self, t):
"""
Get the model temperature at a time *t* seconds
past the beginning of the single-state run.
"""
t += self.tstart.value
return Quantity(np.interp(t, self['model', self.name].times.value,
self['model', self.name].value), "deg_C")
@property
def mvals(self):
return self['model', self.name]
[docs] def write_msids(self, filename, fields, mask_field=None, overwrite=False):
raise NotImplementedError
[docs] def write_states(self, states_file, overwrite=False):
raise NotImplementedError
[docs] def make_dashboard_plots(self, yplotlimits=None, errorplotlimits=None, fig=None):
raise NotImplementedError
[docs] def write_model_and_data(self, filename, overwrite=False):
raise NotImplementedError