Source code for agasc_gaia.gaia_model

"""
Model for ACA Magnitudes as a function of Gaia magnitudes.

The model is implemented as two classes:
- `MissingValueFiller`, with `fit` and `fill_missing_values` methods.
- `GaiaModel`, with `fit`, `predict` and `uncertainty` methods.

The missing-value filling and the magnitude model are kept separate because they can be used
on different datasets: to estimate the missing Gaia values, one does not need the star to
have been observed by ACA.

We first fit the missing value filler, using the training sample from the Gaia dataset.
Then we fit the magnitude model using the observed stars training set.
"""

import numpy as np
import scipy
import jinja2
import pandas as pd
import itertools
from sklearn.base import BaseEstimator, RegressorMixin

# from pathlib import Path
from astropy.table import Table

# from astropy.io import fits
# from scipy.optimize import curve_fit
from scipy.interpolate import interp1d
from sklearn.linear_model import LinearRegression
from sklearn.utils.validation import check_is_fitted

try:
    import sympy
    from sympy.matrices.dense import matrix_multiply_elementwise as prod

    with_sympy = True
except ImportError:
    with_sympy = False
from agasc_gaia import FILES
from agasc_gaia import datasets as ds, utils


AGASC_GAIA_SAMPLE = "agasc-gaia"


@utils.cached(name="missing-value-filler")
def get_missing_value_filler():
    filler = MissingValueFiller()
    gaia_data, _ = ds.get_gaia_min_bias_train_test()
    gaia_data = gaia_data.to_pandas()
    filler.fit(gaia_data)
    return filler


@utils.cached(name="gaia_model")
def get_gaia_model():
    # read in the training data
    observed_train = Table.read(FILES[f"{AGASC_GAIA_SAMPLE}-x-match-train-test"][0])
    sel = (
        # these will be extreme outliers, so we will not use them
        (np.abs(observed_train["mag_aca"] - observed_train["ave_gaia_mag"]) < 5)
        # variable stars are not used for training the magnitude model
        & ~observed_train["phot_variable_flag"]
        # only stars with all magnitudes (VERY few stars in the training sample lack any)
        & (observed_train["has_mag"] == 7)
    )
    # also remove...
    if "n_mag_neighbors" in observed_train.colnames:
        # stars with potentially unresolved neighbors that significantly contribute to the magnitude
        sel &= observed_train["n_mag_neighbors"] == 1
    if "neighbor_mag_weight" in observed_train.colnames:
        # stars with potentially unresolved neighbors with large contribution to the magnitude
        sel &= observed_train["neighbor_mag_weight"] > 0.1
    observed_train = observed_train[sel].to_pandas()

    # read in the impute training data
    impute_train, _ = ds.get_gaia_min_bias_train_test()

    gaia_model = GaiaModel(
        missing_value_filler=get_missing_value_filler(), model="simple_color", degree=2
    )
    gaia_model.fit(observed_train, impute_train.to_pandas())
    return gaia_model


class Interp1d:
    """Wrapper around scipy's interp1d that clamps out-of-range inputs to the edge values."""

    def __init__(self, x, y):
        # below-range values get y[0]; above-range values are clamped in __call__
        self.interpolator = interp1d(x, y, fill_value=y[0], bounds_error=False)
        self.x = self.interpolator.x
        self.y = self.interpolator.y

    def __call__(self, X):
        res = self.interpolator(X)
        # clamp values above the interpolation range to the last y value
        res[X > self.interpolator.x[-1]] = self.interpolator.y[-1]
        return res

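# Example of the clamping behavior (a sketch; values are illustrative):
#
#     >>> f = Interp1d([0.0, 1.0, 2.0], [10.0, 11.0, 12.0])
#     >>> f(np.array([-1.0, 0.5, 5.0]))
#     array([10. , 10.5, 12. ])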

class MissingValueFiller:
    """
    Class to fill missing values in Gaia magnitude data.

    Gaia provides magnitudes in three bands: g, bp, and rp. The vast majority of stars have all
    three, but some have only one or two. This class fits linear models to all possibilities, so a
    missing value can be filled in by predicting it from the other values.
    """

    def __init__(
        self, mag_bins=None, uncertainty_nmin=100, columns=("g_mag", "rp_mag", "bp_mag")
    ):
        self.models_ = None
        # these bins need to extend far enough that no magnitudes land in the overflow bin,
        # otherwise the code below needs to be changed to handle the overflow bin.
        self.uncertainty_mag_bins_ = (
            mag_bins if mag_bins is not None else np.linspace(0, 30, 61)
        )
        self.uncertainty_nmin_ = uncertainty_nmin
        self.columns = list(columns)

    def _uncertainty_(self, regress, data, col):
        X = regress.predict(data[regress.x])
        Y = data[col]
        residuals = X[:, regress.y.index(col)] - Y
        i_mag_bin = np.digitize(np.mean(X, axis=1), self.uncertainty_mag_bins_)
        # bin centers: lower edges plus half the bin width
        bin_centers = self.uncertainty_mag_bins_[:-1] + (
            np.diff(self.uncertainty_mag_bins_) / 2
        )
        x = [
            bin_centers[imag - 1]
            for imag in np.unique(i_mag_bin)
            if np.count_nonzero(i_mag_bin == imag) > self.uncertainty_nmin_
        ]
        # half-width of the central quantile range (roughly the 1-sigma width for a Gaussian)
        y = [
            np.diff(np.quantile(residuals[i_mag_bin == imag], [0.136, 0.863])).squeeze()
            / 2
            for imag in np.unique(i_mag_bin)
            if np.count_nonzero(i_mag_bin == imag) > self.uncertainty_nmin_
        ]
        # return interp1d(x, y, fill_value="extrapolate")
        return Interp1d(x, y)

    def fit(self, data):
        # in our case, the vast majority of stars have all magnitudes,
        # so we can fit with those and call it a day.
        impute_train = data[data["has_mag"] == 7][self.columns]
        models = {}
        for col in impute_train.columns:
            cols = list(impute_train.columns)
            cols.remove(col)

            # model that predicts the other two magnitudes from this one
            x, y = [col], cols
            regress = LinearRegression()
            regress.fit(impute_train[x], impute_train[y])
            model = 1 * ("g_mag" in x) + 2 * ("rp_mag" in x) + 4 * ("bp_mag" in x)
            regress.x = x
            regress.y = y
            regress.uncertainty = {
                col: self._uncertainty_(regress, impute_train, col) for col in y
            }
            models[model] = regress

            # model that predicts this magnitude from the other two
            x, y = cols, [col]
            regress = LinearRegression()
            regress.fit(impute_train[x], impute_train[y])
            model = 1 * ("g_mag" in x) + 2 * ("rp_mag" in x) + 4 * ("bp_mag" in x)
            regress.x = x
            regress.y = y
            regress.uncertainty = {
                col: self._uncertainty_(regress, impute_train, col) for col in y
            }
            models[model] = regress
        for i in models:
            models[i].flag = i
        self.models_ = models

    def fill_missing_values(self, data):
        assert self.models_ is not None, "Need to call fit before calling this method"
        result = data.copy()
        for i in np.unique(data["has_mag"]):
            if i not in self.models_:
                continue
            model = self.models_[i]
            sel = data["has_mag"] == i
            X = data[self.columns][sel][model.x]
            Y = model.predict(X)
            for j, col in enumerate(model.y):
                result.loc[sel, col] = Y[:, j]
        return result

    def uncertainty(self, data):
        assert self.models_ is not None, "Need to call fit before calling this method"
        result = pd.DataFrame(
            np.zeros_like(data[self.columns]),
            columns=self.columns,
            index=data.index,
        )
        for i in np.unique(data["has_mag"]):
            if i not in self.models_:
                continue
            sel = (data["has_mag"] == i).values
            model = self.models_[i]
            x = np.mean(data[model.x][sel], axis=1)
            for col in model.y:
                result.loc[sel, col] = model.uncertainty[col](x)
        return result
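
# Usage sketch for the filler (assuming a DataFrame with g_mag/rp_mag/bp_mag and a
# has_mag bit mask, 1*has_g + 2*has_rp + 4*has_bp):
#
#     filler = MissingValueFiller()
#     filler.fit(gaia_data)                       # trains on stars with has_mag == 7
#     filled = filler.fill_missing_values(stars)  # e.g. has_mag == 3 -> bp_mag predicted
#     errors = filler.uncertainty(stars)          # per-column uncertainty of the fill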

# for easier reading, the following formula is transposed from what is actually done in the code:
#     coefficients[0] + coefficients[1:] @ K @ (mag - mean) / scale
# instead of
#     coefficients[0] + (mag - mean) / scale @ K @ coefficients[1:]
# in this description, mag has shape (3, 1) whereas in the code it is (N, 3).
GAIA_MODEL_TEMPLATE = """
Model for ACA Magnitudes as a function of Gaia magnitudes.

The model has the following form:

    linear(data) + broken(b, point, m1, m2, w)

The linear model is fit with stars with mag < {{ mean_mag_threshold }}

Input columns: {{ columns }}

Linear model:
{{ model }}

Broken line model:
{{ broken }}
"""


def _polynomial_features(X, N):
    """
    Polynomial features, including the independent term, for all components.

    Parameters
    ----------
    X : np.array
        The principal components.
    N : int
        The degree of the polynomial.

    Returns
    -------
    np.array
        The polynomial features.
    """
    i = np.arange(X.shape[1])
    res = [np.ones(X.shape[0], dtype=X.dtype)[:, None]]
    for n in range(N):
        res += [
            np.hstack(
                [
                    np.prod(X[:, list(c)], axis=1, keepdims=True)
                    for c in itertools.combinations_with_replacement(i, n + 1)
                ]
            )
        ]
    return np.hstack(res)


def _polynomial_features_2(X, N):
    """
    Polynomial features, including the independent term, for all but the first component.

    The first component enters only linearly; all higher-order terms are built from the
    remaining components.

    Parameters
    ----------
    X : np.array
        The principal components.
    N : int
        The degree of the polynomial.

    Returns
    -------
    np.array
        The polynomial features.
    """
    i = np.arange(1, X.shape[1])
    res = [np.ones(X.shape[0], dtype=X.dtype)[:, None], X[:, 0][:, None]]
    for n in range(N):
        res += [
            np.hstack(
                [
                    np.prod(X[:, list(c)], axis=1, keepdims=True)
                    for c in itertools.combinations_with_replacement(i, n + 1)
                ]
            )
        ]
    return np.hstack(res)


def _outlier_mask(residuals):
    # using 20*IQR is VERY conservative in the gaussian case, but we have tails.
    q1, q2, q3 = np.quantile(residuals, [0.25, 0.5, 0.75])
    iqr = q3 - q1
    return (residuals < q2 - 20 * iqr) | (residuals > q2 + 20 * iqr)
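
# For reference, with two components and degree 2, _polynomial_features_2 lays out the
# features as [1, x0, x1, x1**2], while _polynomial_features would also include x0 terms
# (a sketch; X has shape (N, 2)):
#
#     >>> x = np.array([[2.0, 3.0]])
#     >>> _polynomial_features_2(x, 2)
#     array([[1., 2., 3., 9.]])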

class SimpleColorModel:
    """
    Model for ACA magnitudes as a function of Gaia magnitudes.

    The model predicts mag_aca - g_mag as a polynomial in the Gaia color (bp_mag - rp_mag),
    after centering and scaling the color.

    Parameters
    ----------
    degree : int
        The degree of the polynomial in the color.
    alpha : float
        Ridge regularization strength for the linear fit.
    """

    def __init__(
        self,
        degree=1,
        alpha=0.0,
    ):
        self.degree = degree
        self.alpha = alpha

    def fit(self, data, column="mag_aca_obs"):
        """
        Fit the model to the data.

        Parameters
        ----------
        data : pandas.DataFrame
            The data to fit the model to.
        column : str
            The column with the observed ACA magnitude.
        """
        Y = np.asarray(data[column] - data["g_mag"])
        X = np.atleast_2d(np.asarray(data["bp_mag"] - data["rp_mag"])).T
        self.mean_ = np.mean(X, axis=0, keepdims=True)
        self.std_ = np.std(X, axis=0, keepdims=True)
        X = (X - self.mean_) / self.std_
        X = _polynomial_features(X, self.degree)
        # ridge-regularized normal equations
        XX = np.linalg.inv(X.T @ X + self.alpha * np.eye(X.shape[-1]))
        self.linear_coefficients_ = XX @ X.T @ Y
        Y_1 = X @ self.linear_coefficients_
        mse = np.sum((Y_1 - Y) ** 2) / (X.shape[0] - X.shape[-1])
        self.linear_covariance_ = XX * mse

    def predict(self, data):
        """
        Predict the ACA magnitude for the given data.

        Parameters
        ----------
        data : pandas.DataFrame
            The data to predict the magnitude for.
        """
        X = np.atleast_2d(np.asarray(data["bp_mag"] - data["rp_mag"])).T
        X = (X - self.mean_) / self.std_
        X = _polynomial_features(X, self.degree)
        res = np.asarray(data["g_mag"]) + X @ self.linear_coefficients_
        return res

    def __str__(self):
        template = jinja2.Template(
            """
            g_mag + Pol(X, n={{degree}})

            where:
                Pol(X, n) = p0 + ... pn * X**n
                X = ((bp_mag - rp_mag) - mean) / scale
                mean: {{ mean }}
                scale: {{ scale }}

            Linear model:
                coefficients: {{ linear_coefficients }}
                covariance:
                {%- for cov in linear_covariance %}
                    {{ cov }}
                {%- endfor %}
            """
        )
        myself = template.render(
            degree=self.degree,
            mean=self.mean_.squeeze(),
            scale=self.std_.squeeze(),
            linear_coefficients=self.linear_coefficients_,
            linear_covariance=self.linear_covariance_,
        )
        return myself
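
# Usage sketch (hypothetical DataFrame with g_mag, bp_mag, rp_mag and mag_aca_obs):
#
#     model = SimpleColorModel(degree=2)
#     model.fit(train)                # fits mag_aca_obs - g_mag vs. scaled (bp - rp)
#     mag_aca = model.predict(train)  # g_mag + Pol((bp - rp - mean) / scale)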

class PrincipalComponentLinearModel:
    def __init__(
        self,
        n_components=2,
        degree=1,
        alpha=0.0,
    ):
        self.columns = ["g_mag", "bp_mag", "rp_mag"]
        self.n_components = n_components
        self.degree = degree
        self.alpha = alpha

    def fit(self, data, column="mag_aca_obs", sample_weight=None):
        X = np.asarray(data[self.columns])
        Y = np.asarray(data[column])

        # subtract mean and scale by standard deviation
        self.mean_ = np.mean(X, axis=0, keepdims=True)
        self.std_ = np.std(X, axis=0, keepdims=True)
        x_scaled = (X - self.mean_) / self.std_

        # diagonalize covariance matrix to find principal components
        self.covariance_ = np.cov(x_scaled.T)
        eigenvalues, eigenvectors = np.linalg.eig(self.covariance_)
        # by convention, I choose only positive eigenvalues:
        eigenvectors = eigenvectors @ np.diag(np.sign(eigenvalues))
        eigenvalues = eigenvalues * np.sign(eigenvalues)
        # take the N largest eigenvalues (and their corresponding eigenvectors)
        i_components = np.argsort(eigenvalues)[::-1][: self.n_components]
        self.eigenvectors_ = eigenvectors[:, i_components]
        self.eigenvalues_ = eigenvalues[i_components]

        # project all points on the space generated by the eigenvectors
        X = x_scaled @ self.eigenvectors_

        # linear fit on the polynomial features of the projections
        # (the magnitude-threshold cut is applied by the caller, GaiaModel.fit)
        X = _polynomial_features_2(X, self.degree)
        XX = np.linalg.inv(X.T @ X + self.alpha * np.eye(X.shape[-1]))
        self.linear_coefficients_ = XX @ X.T @ Y
        Y_1 = X @ self.linear_coefficients_
        mse = np.sum((Y_1 - Y) ** 2) / (X.shape[0] - X.shape[-1])
        self.linear_covariance_ = XX * mse

    def predict(self, data):
        """
        Predict the ACA magnitude for the given data.

        Parameters
        ----------
        data : pandas.DataFrame
            The data to predict the magnitude for.
        """
        return self.predict_(self.principal_components(data))

    def predict_(self, pc):
        """
        Predict the ACA magnitude for the given principal components.

        Parameters
        ----------
        pc : np.array
            The principal components.
        """
        X = _polynomial_features_2(np.hstack([pc]), self.degree)
        res = X @ self.linear_coefficients_
        return res

    def principal_components(self, data):
        """
        Project the data on the principal components.

        Parameters
        ----------
        data : pandas.DataFrame
            The data to project.
        """
        X = data[self.columns].values
        x_scaled = (X - self.mean_) / self.std_
        return x_scaled @ self.eigenvectors_

    def __str__(self):
        template = jinja2.Template(
            """
            coefficients[0] + coefficients[1:] @ eigenvectors @ (mag - mean) / scale

            where:
            - `mag` is a 3-vector of Gaia magnitudes [[g_mag], [bp_mag], [rp_mag]]
            - `scale` and `mean` are 3-vectors to scale and center the magnitudes
            - `eigenvectors` is the matrix of eigenvectors of the principal components
            - `coefficients` is the vector of the linear model

            Number of principal components: {{ n_components }}
            mean: [ {% for m in mean[0] %}[ {{ m }} ]{% if not loop.last %}, {% endif %}{% endfor %} ]
            scale: [ {% for m in scale[0] %}[ {{ m }} ]{% if not loop.last %}, {% endif %}{% endfor %} ]

            Principal Components:
                Eigenvalues: {{ eigenvalues }}
                Eigenvectors:
                {%- for eigenvector in eigenvectors.T %}
                    [{% for m in eigenvector %}{{ m }}{% if not loop.last %}, {% endif %}{% endfor %} ],
                {%- endfor %}

            Linear model:
                coefficients: {{ linear_coefficients }}
                covariance:
                {%- for cov in linear_covariance %}
                    {{ cov }}
                {%- endfor %}
            """
        )
        myself = template.render(
            n_components=self.n_components,
            mean=self.mean_,
            scale=self.std_,
            eigenvectors=self.eigenvectors_,
            eigenvalues=self.eigenvalues_,
            linear_coefficients=self.linear_coefficients_,
            linear_covariance=self.linear_covariance_,
        )
        return myself
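
# Sketch: the PCA model first projects scaled magnitudes onto the leading eigenvectors,
# then fits polynomial features of the projections (hypothetical fitted instance):
#
#     pcl = PrincipalComponentLinearModel(n_components=2, degree=2)
#     pcl.fit(train)                        # train has g_mag/bp_mag/rp_mag + mag_aca_obs
#     pc = pcl.principal_components(train)  # shape (N, 2)
#     mag = pcl.predict_(pc)                # same as pcl.predict(train)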

class GaiaModel:
    """
    Model for ACA Magnitudes as a function of Gaia magnitudes.

    The model is fit in the space of the N largest principal components (model="PCA") or in the
    Gaia color bp_mag - rp_mag (model="simple_color"). It is linear below a magnitude threshold,
    and a broken line above that magnitude. The break is smoothed with a cubic segment.

    Parameters
    ----------
    missing_value_filler : MissingValueFiller
        A model that fills missing values in the data.
    mean_mag_threshold : float
        The magnitude threshold above which the model is not linear.
    n_components : int
        The number of principal components to use.
    degree : int
        The degree of the polynomial features.
    alpha : float
        Ridge regularization strength for the linear fit.
    model : str
        Either "PCA" or "simple_color".
    """

    def __init__(
        self,
        missing_value_filler,
        mean_mag_threshold=8,
        n_components=2,
        degree=1,
        alpha=0.0,
        with_instrument_bias=True,
        fit_uncertainty=True,
        use_weights=True,
        model="PCA",
        exclude_outliers=True,
    ):
        self.mean_mag_threshold = mean_mag_threshold
        self.missing_value_filler = missing_value_filler
        self.n_components = n_components if model == "PCA" else 2
        self.columns = ["g_mag", "bp_mag", "rp_mag"]
        self.alpha = alpha
        self.degree = degree
        self.with_instrument_bias = with_instrument_bias
        self.fit_uncertainty = fit_uncertainty
        self.use_weights = use_weights
        # keep the model name separately; self.model holds the model instance
        self.model_name = model
        self.exclude_outliers = exclude_outliers
        if model == "simple_color":
            self.model = SimpleColorModel(degree=degree, alpha=alpha)
        else:
            self.model = PrincipalComponentLinearModel(
                n_components=n_components, degree=degree, alpha=alpha
            )

    def fit(self, data, gaia_data, column="mag_aca_obs"):
        """
        Fit the model to the data.

        Parameters
        ----------
        data : pandas.DataFrame
            The data to fit the model to.
        gaia_data : pandas.DataFrame
            A table of gaia magnitudes (does not require an observed ACA magnitude).
            This is used to estimate the impact of missing magnitudes in the final estimated ACA
            magnitude.
        """
        data = self.missing_value_filler.fill_missing_values(data)
        mean_mag = np.mean(data[self.columns], axis=1)

        if self.use_weights:
            sample_weight = GaiaModel.get_weights(data)
        else:
            sample_weight = np.ones(len(data))

        # Make a linear fit below mag == self.mean_mag_threshold:
        self.model.fit(data[mean_mag < self.mean_mag_threshold])
        if self.exclude_outliers:
            # repeat the fit excluding outliers
            residuals = (
                self.model.predict(data[mean_mag < self.mean_mag_threshold])
                - data[column][mean_mag < self.mean_mag_threshold]
            )
            outlier = _outlier_mask(residuals)
            self.model.fit(data[(mean_mag < self.mean_mag_threshold) & ~outlier])

        if self.with_instrument_bias:
            # fit the residuals of the linear fit to a broken line
            # (two lines joined by a cubic segment, actually)
            Y_1 = self.model.predict(data)
            Y = np.asarray(data[column])
            self.broken_ = Broken()
            # fit the observed-minus-predicted residuals; predict() *adds* this correction
            self.broken_.fit(Y_1, Y - Y_1, sample_weight=sample_weight)
            if self.exclude_outliers:
                # repeat the fit excluding outliers
                outlier = _outlier_mask(Y - Y_1 - self.broken_.predict(Y_1))
                self.broken_.fit(
                    Y_1[~outlier],
                    (Y - Y_1)[~outlier],
                    sample_weight=sample_weight[~outlier],
                )
        else:
            self.broken_ = None

        if self.fit_uncertainty:
            self._fit_uncertainty(data)
            self._fit_missing_mag_uncertainty(gaia_data)

    def predict(self, data, with_instrument_bias=True):
        """
        Predict the ACA magnitude for the given data.

        Parameters
        ----------
        data : pandas.DataFrame
            The data to predict the magnitude for.
        """
        data = self.missing_value_filler.fill_missing_values(data)
        res = self.model.predict(data)
        if self.with_instrument_bias and with_instrument_bias:
            res += self.broken_.predict(res)
        return res

    def __str__(self) -> str:
        template = jinja2.Template(GAIA_MODEL_TEMPLATE)
        myself = template.render(
            mean_mag_threshold=self.mean_mag_threshold,
            columns=self.columns,
            model=self.model,
            broken=self.broken_,
        )
        return myself

    def _fit_uncertainty(self, data):
        """
        Fit the uncertainty of the model.

        Calculates the variance below the mean magnitude threshold. Fits the mean square
        deviation of the data relative to the model. The fit is done by binning the data in
        MAG_ACA bins, calculating the root mean square error for each bin, and fitting a broken
        line to the root mean square error.

        Parameters
        ----------
        data : pandas.DataFrame
            The data to fit the uncertainty.
        """
        data = data[
            ["g_mag", "bp_mag", "rp_mag", "mag_aca_obs", "ave_gaia_mag", "has_mag"]
        ].copy()

        # calculate the residuals
        data["mag_aca_pred"] = self.predict(data, with_instrument_bias=False)
        data["mag_aca_pred_res"] = data["mag_aca_pred"] - data["mag_aca_obs"]
        data["mag_aca_pred_res_2"] = data["mag_aca_pred_res"] ** 2
        data["mag_aca_pred_bin"] = np.digitize(
            data["mag_aca_pred"], np.linspace(-10, 10, 41)
        )

        if self.exclude_outliers:
            # exclude outliers
            # these outliers are due to issues in the agasc supplement generation.
            data = data[~_outlier_mask(data["mag_aca_pred_res"])]

        # calculate the variance below the mean magnitude threshold (large N)
        self._base_variance = np.mean(
            data["mag_aca_pred_res_2"][data["ave_gaia_mag"] < self.mean_mag_threshold]
        )

        # bin the residuals
        residuals = data.groupby("mag_aca_pred_bin")[
            ["ave_gaia_mag", "mag_aca_pred", "mag_aca_pred_res", "mag_aca_pred_res_2"]
        ].mean()
        residuals["d_mag_aca_pred_res"] = data.groupby("mag_aca_pred_bin")[
            ["mag_aca_pred_res"]
        ].std()
        residuals["n"] = data.groupby("mag_aca_pred_bin")["mag_aca_pred_res"].count()
        residuals = residuals[residuals["n"] > 1]
        residuals["mag_aca_pred_res_2"] -= self._base_variance
        residuals.loc[residuals["mag_aca_pred_res_2"] < 0, "mag_aca_pred_res_2"] = 0
        residuals = residuals.reset_index()

        # fit a broken line to the root mean squared binned residuals
        self.uncertainty_model_ = Broken()
        self.uncertainty_model_.fit(
            residuals[["mag_aca_pred"]].values, np.sqrt(residuals["mag_aca_pred_res_2"])
        )
        self.residuals_ = residuals

    def _fit_missing_mag_uncertainty(self, data):
        """
        Determine the contribution of missing magnitudes to the uncertainty.

        To determine the effect of missing magnitudes, this function uses the subset of the data
        that has all magnitudes (the vast majority), imputes the missing magnitudes, and
        calculates the residuals of the imputed data. The residuals are then binned in magnitude
        and the standard deviation of the residuals linearly interpolated.
""" data = data[data["has_mag"] == 7] mag_aca_pred = self.predict(data, with_instrument_bias=True) missing_mag_uncertainty = {} for i in range(1, 7): sel = np.bitwise_and(data["has_mag"], i).astype(bool) imputed_data = data[sel].copy() assert len(imputed_data), f"No data with (has_mag & {i})" model = self.missing_value_filler.models_[i] imputed_data[model.y] = model.predict(imputed_data[model.x]) # pc = self.principal_components(imputed_data).T # pc_names = [f"pc{i}" for i in range(1, self.n_components + 1)] # for i_comp, name in enumerate(pc_names): # imputed_data[name] = pc[i_comp] imputed_data["mag_aca_pred"] = self.predict( imputed_data, with_instrument_bias=True ) imputed_data["mag_aca_pred_res"] = ( imputed_data["mag_aca_pred"] - mag_aca_pred ) imputed_data["mag_aca_pred_res_2"] = imputed_data["mag_aca_pred_res"] ** 2 imputed_data["mag_aca_bin"] = np.digitize( imputed_data["mag_aca_pred"], np.linspace(-10, 10, 21) ) residuals = imputed_data.groupby("mag_aca_bin")[ [ "mag_aca_pred", "ave_gaia_mag", "mag_aca_pred_res", "mag_aca_pred_res_2", ] ].mean() residuals["d_mag_aca_pred_res"] = imputed_data.groupby("mag_aca_bin")[ ["mag_aca_pred_res"] ].std() residuals["n"] = imputed_data.groupby("mag_aca_bin")[ "mag_aca_pred_res" ].count() residuals = residuals[residuals["n"] > 10] residuals = residuals.reset_index() x = residuals["mag_aca_pred"].values y = np.sqrt(residuals["mag_aca_pred_res_2"].values) missing_mag_uncertainty[i] = Interp1d(x, y) self.missing_mag_uncertainty_ = missing_mag_uncertainty def _uncertainty_2(self, data): assert ( self.model == "PCA" ), "using uncertainty incompatible with model other than PCA" # the model before the broken line fit is # (mag - mean) @ diag(1. / scale) @ K @ coef[1:] + coef[0] mag_error = self.missing_value_filler.uncertainty(data) jacobian = ( np.diag(1.0 / self.std_.flatten())
            @ self.model.eigenvectors_
            @ self.model.linear_coefficients_[1:, None]
        )
        # strictly speaking, the variance is jacobian.T @ mag_variance @ jacobian
        # but assuming uncorrelated errors means mag_variance = np.diag(mag_error)**2
        # this is (mag_error @ jacobian).T @ (mag_error @ jacobian)
        uncertainty = np.sqrt(self._base_variance + (mag_error @ jacobian) ** 2)
        return uncertainty

    def uncertainty(
        self,
        data=None,
        mag_aca=None,
        has_mag=None,
        with_base=True,
        with_missing_mag=True,
        with_instrument=True,
        with_variable_stars=True,
    ):
        """
        Predict the uncertainty of the model.

        This function can be called in two ways:

        1. With the data as a parameter (mag_aca and has_mag will be determined from it).
        2. With mag_aca and the has_mag column as parameters (data must be None).

        Parameters
        ----------
        data : pandas.DataFrame
            The data to predict the uncertainty for.
        mag_aca : array-like
            The estimated mag_aca of the data.
        has_mag : array-like
            A column where each entry is a bit mask: 1*has_g_mag + 2*has_rp_mag + 4*has_bp_mag.
            If it is not given, the assumption is that all magnitudes are known.
        with_base : bool
            Include the base uncertainty of the model.
        with_missing_mag : bool
            Include the uncertainty due to missing magnitudes.
        with_instrument : bool
            Include the uncertainty due to the instrument.
        with_variable_stars : bool
            Include the uncertainty due to variable stars.
        """
        assert data is not None or mag_aca is not None, "Must specify data or mag_aca"
        assert data is None or mag_aca is None, "Cannot specify both data and mag_aca"
        if data is not None:
            data = self.missing_value_filler.fill_missing_values(data)
            has_mag = data["has_mag"]
            mag_aca = self.predict(data, with_instrument_bias=False)
        elif has_mag is None:
            has_mag = np.ones(mag_aca.shape[0], dtype=int) * 7

        # This is the variance of the model (below the magnitude threshold)
        base_variance = np.ones(mag_aca.shape[0]) * self._base_variance

        # The uncertainty model (above the magnitude threshold) is a function of predicted mag_aca
        instrument = self.uncertainty_model_.predict(mag_aca)

        # missing magnitudes
        # this is the default case, when all magnitudes are known.
        missing_mag = np.zeros(mag_aca.shape[0])
        # and these are the cases when not all magnitudes are known
        for i in range(1, 7):
            sel = has_mag == i
            missing_mag[sel] = self.missing_mag_uncertainty_[i](mag_aca[sel])

        # The variable-star case
        # In this case all bets are off, but we can try.
        variable = np.zeros(mag_aca.shape[0])
        if data is not None:
            var = data["phot_variable_flag"]
            variable[var] = data["mad_mag_g_fov"][var]

        result = np.sqrt(
            (base_variance if with_base else 0)
            + (instrument**2 if with_instrument else 0)
            + (missing_mag**2 if with_missing_mag else 0)
            + (variable**2 if with_variable_stars else 0)
        )
        return result
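
    # The array-based call is useful when only estimated magnitudes are at hand (a
    # sketch; `gaia_model` is a fitted instance, has_mag == 3 means bp_mag is missing):
    #
    #     mag_aca = np.array([7.5, 9.0, 10.2])
    #     err = gaia_model.uncertainty(mag_aca=mag_aca, has_mag=np.array([7, 7, 3]))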

    @staticmethod
    def get_weights(train):
        """
        Get the weights for the training data.

        The weights have two factors:

        1. A statistical weight that is inversely proportional to the number of stars in a
           magnitude bin.
        2. An uncertainty weight that is inversely proportional to the uncertainty of the ACA
           magnitude.

        The statistical weight is largest at small and large magnitudes, and smallest in the
        middle. Both clips act as caps: bins with fewer than 200 stars get the weight of the
        smallest well-populated bin (about 1/200 before normalization), and bins fainter than
        9.0 mag get the weight at 9.0 mag. This restricts the range of weights. The uncertainty
        weight is usually largest at small magnitudes.

        Parameters
        ----------
        train : pandas.DataFrame
            The training data.

        Returns
        -------
        weights : array-like
            The weights for the training data.
        """
        values, bins = np.histogram(
            (train["g_mag"] + train["bp_mag"] + train["rp_mag"]) / 3,
            bins=np.linspace(0, 20, 241),
        )
        # bins with fewer than 200 stars get the smallest populous-bin count,
        # which caps the statistical weight
        mask = values >= 200
        values[~mask] = values[mask].min()
        weights = 1 / values
        i = np.digitize(train["ave_gaia_mag"], bins) - 1
        train["stat_weight"] = weights[i]
        # clip the weight of stars fainter than thr at the weight at thr
        thr = 9.0
        train.loc[i > np.digitize(thr, bins), "stat_weight"] = weights[
            np.digitize(thr, bins)
        ]
        train["stat_weight"] /= np.max(train["stat_weight"])
        train["weight"] = (
            train["stat_weight"]
            * train["mag_aca_err_obs"].min()
            / train["mag_aca_err_obs"]
        )
        return train["weight"]

    @property
    def symbols(self):
        if not with_sympy:
            return None
        # the fitted coefficients live on the sub-model
        if not hasattr(self.model, "linear_coefficients_"):
            print("Need to call fit before calling this method")
            return None
        return SymbolicGaiaModel(self)

# BaseEstimator adds get_params, set_params, _validate_data and other goodies
# RegressorMixin adds a standard score() member function
class Broken(RegressorMixin, BaseEstimator):
    def __init__(self, axis=0):
        self.axis = axis

    def fit(self, X, y=None, **kwargs):
        if "sample_weight" in kwargs and kwargs["sample_weight"] is not None:
            sigma = 1.0 / kwargs["sample_weight"]
            sigma /= np.mean(sigma)
        else:
            sigma = None
        if len(X.shape) == 1:
            X = X[:, None]
        X, y = self._validate_data(X, y)
        X = X.T[self.axis]
        # self.mean_ = np.array([np.mean(X), np.mean(y)])
        self.mean_ = np.array([np.mean(X), 0])
        self.std_ = np.array([np.std(X), np.std(y)])
        X = (X - self.mean_[0]) / self.std_[0]
        y = (y - self.mean_[1]) / self.std_[1]
        self.params_, self.covariance_ = scipy.optimize.curve_fit(
            self._broken_2, X, y, p0=[0.5, 0.0, 0.5], sigma=sigma
        )
        # self.params_, self.covariance_ = scipy.optimize.curve_fit(
        #     self._broken, X, y, p0=[0.0, 0.5, 0.0, 0.0, 0.5], sigma=sigma
        # )
        return self

    def predict(self, X):
        check_is_fitted(self)
        if len(X.shape) == 1:
            X = X[:, None]
        X = self._validate_data(X)
        X = X.T[self.axis]
        y_res = self.mean_[1] + self.std_[1] * self._broken_2(
            (X - self.mean_[0]) / self.std_[0], *self.params_
        )
        # y_res = self.mean_[1] + self.std_[1] * self._broken(
        #     (X - self.mean_[0]) / self.std_[0], *self.params_
        # )
        return y_res

    @staticmethod
    def _pars_(b, point, m1, m2, w):
        # solve for the cubic a + bx + cx^2 + dx^3 that matches the value (v) and
        # slope (m) of the two lines at the edges of the transition window:
        #     a + bx + cx^2 + dx^3 = v
        #     b + 2cx + 3dx^2 = m
        x1 = point - w / 2
        x2 = point + w / 2
        v1 = b + m1 * x1
        v2 = b + m1 * point + m2 * (x2 - point)
        M = np.array(
            [
                [1, x1, x1**2, x1**3],
                [1, x2, x2**2, x2**3],
                [0, 1, 2 * x1, 3 * x1**2],
                [0, 1, 2 * x2, 3 * x2**2],
            ]
        )
        B = np.array([v1, v2, m1, m2])
        return np.linalg.solve(M, B)

    @staticmethod
    def _broken_2(x, point, m2, w):
        return Broken._broken(x, 0, point, 0, m2, w)

    @staticmethod
    def _broken(x, b, point, m1, m2, w):
        y = np.empty(len(x))
        x1 = point - w / 2
        x2 = point + w / 2
        A, B, C, D = Broken._pars_(b, point, m1, m2, w)
        z = x[(x >= x1) & (x < x2)]
        y[x < x1] = b + m1 * x[x < x1]
        y[x >= x2] = b + m1 * point + m2 * (x[x >= x2] - point)
        y[(x >= x1) & (x < x2)] = A + B * z + C * z**2 + D * z**3
        return y

    def __str__(self) -> str:
        myself = f"""
        axis: {self.axis}
        X: mean={self.mean_[0]}, std={self.std_[0]}
        Y: mean={self.mean_[1]}, std={self.std_[1]}
        Broken:
        - b=0,
        - point={self.params_[0]},
        - m1=0,
        - m2={self.params_[1]},
        - w={self.params_[2]}
        """
        return myself
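
# A sketch of fitting the broken line to synthetic data: flat below the break, sloped
# above it, with a cubic segment of width w smoothing the transition:
#
#     x = np.linspace(0, 10, 200)
#     y = np.where(x < 6, 0.0, 0.5 * (x - 6)) + np.random.default_rng(0).normal(0, 0.01, 200)
#     broken = Broken().fit(x, y)
#     y_pred = broken.predict(x)  # point, m2, w are in scaled units (see __str__)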

# the following fails with some inputs, so I am not checking it
# from sklearn.utils.estimator_checks import check_estimator
# check_estimator(Broken())


class SymbolicGaiaModel:
    def __init__(self, gaia_model):
        self.g_mag, self.bp_mag, self.rp_mag = sympy.symbols("g_mag bp_mag rp_mag")
        self.gaia_mag = sympy.Matrix([[self.g_mag], [self.bp_mag], [self.rp_mag]])
        self.coefficients = sympy.Matrix(gaia_model.model.linear_coefficients_[None]).T
        if isinstance(gaia_model.model, SimpleColorModel):
            self.mean = gaia_model.model.mean_.squeeze()
            self.scale = 1.0 / gaia_model.model.std_.squeeze()
            self.model = "SimpleColorModel"
            self.x = sympy.Matrix(
                [
                    [
                        ((self.bp_mag - self.rp_mag) - gaia_model.model.mean_.squeeze())
                        / gaia_model.model.std_.squeeze()
                    ]
                ]
            )
            self.pol_features = SymbolicGaiaModel._polynomial_features(
                self.x.T, gaia_model.model.degree
            )
            self.mag = self.g_mag + (self.pol_features * self.coefficients)[0]
        elif isinstance(gaia_model.model, PrincipalComponentLinearModel):
            self.mean = sympy.Matrix(gaia_model.model.mean_).T
            self.scale = sympy.Matrix(1.0 / gaia_model.model.std_).T
            self.model = "PrincipalComponentLinearModel"
            self.eigenvectors = sympy.Matrix(gaia_model.model.eigenvectors_).T
            self.x = self.eigenvectors * prod(self.gaia_mag - self.mean, self.scale)
            self.pol_features = SymbolicGaiaModel._polynomial_features_2(
                self.x.T, gaia_model.model.degree
            )
            self.mag = (self.pol_features * self.coefficients)[0]
        else:
            raise Exception(f"Unknown model type {type(gaia_model.model)}")

        self.m1, self.m2, self.m3 = sympy.symbols("M1 M2 M3")
        self.pc1, self.pc2 = sympy.symbols("PC1 PC2")
        self.M = sympy.Matrix([[self.m1], [self.m2], [self.m3]])
        self.PC = sympy.Matrix([[self.pc1], [self.pc2]])
        self.features = sympy.Matrix([1, self.pc1, self.pc2, self.pc2**2]).T

    def print_model(self):
        if self.model == "PrincipalComponentLinearModel":
            print(sympy.latex(prod((self.gaia_mag - self.mean), self.scale)))
        print(sympy.latex(self.mag))
        # sympy.pprint(prod((mag - mean), scale))

    def print_mag(self):
        sympy.pprint(self.mag)

    def print_model_2(self):
        if self.model == "PrincipalComponentLinearModel":
            M = sympy.latex(self.M)
            M_ = sympy.latex(prod((self.gaia_mag - self.mean), self.scale))
            PC = sympy.latex(self.PC)
            PC_ = sympy.latex(self.eigenvectors * self.M)
            mag_aca = sympy.latex(self.features * self.coefficients)
            text = f"${M} = {M_}$"
            text += f"${PC} = {PC_}$"
            text += f"mag_aca = ${mag_aca}$"
            print(text)

    @staticmethod
    def _polynomial_features(x, N):
        features = sympy.Matrix([1])
        i = np.arange(0, len(x))
        res = []
        for n in range(N):
            for c in itertools.combinations_with_replacement(i, n + 1):
                term = x.col(c[0])
                for f in c[1:]:
                    term = term * x.col(f)
                res += [term]
        for f in res:
            features = features.row_join(f)
        return features

    @staticmethod
    def _polynomial_features_2(x, N):
        features = sympy.Matrix([1])
        i = np.arange(1, len(x))
        res = []
        for n in range(N):
            for c in itertools.combinations_with_replacement(i, n + 1):
                term = x.col(c[0])
                for f in c[1:]:
                    term = term * x.col(f)
                res += [term]
        features = features.row_join(x.col(0))
        for f in res:
            features = features.row_join(f)
        return features