Source code for agasc_gaia.gaia_queries

"""
A collection of methods to submit queries to Gaia.
"""

import os
import numpy as np
import yaml
from pathlib import Path
from tqdm import tqdm

import astropy.units as u
from astropy.table import Table, join

from astroquery.gaia import Gaia
from astropy.coordinates import SkyCoord

from agasc import get_stars

from agasc_gaia.gaia_mag_models import mag_v_gaia_mag


def get_gaia_cone(agasc_id=None, radius=1.5 / 3600, debug=False, name=None):
    """
    Convenience method to get Gaia stars in a cone. NOT used in the final analysis.
    """
    name = name or "get_gaia_cone"
    stars = get_stars([agasc_id])
    lower_case_names = [(c, c.lower()) for c in stars.colnames if c != c.lower()]
    if lower_case_names:
        stars.rename_columns(*zip(*lower_case_names))
    star = stars[0]
    ra = star["ra"]
    dec = star["dec"]
    query = f"""SELECT
        g.source_id as gaia_id,
        g.ra, g.dec, g.ra_error, g.dec_error,
        g.pmra as pm_ra, g.pmdec as pm_dec,
        g.ref_epoch as epoch,
        g.phot_g_mean_mag as g_mag,
        g.phot_bp_mean_mag as bp_mag,
        g.phot_rp_mean_mag as rp_mag
    FROM gaiaedr3.gaia_source AS g
    WHERE 1 = CONTAINS(
        POINT({ra}, {dec}),
        CIRCLE(g.ra, g.dec, {radius})
    )"""
    if debug:
        print(query)
    try:
        assert Gaia._TapPlus__isLoggedIn, "You must login to Gaia first"
        job = Gaia.launch_job_async(query, dump_to_file=False, name=name)
    except Exception as e:
        print(f"FAILED: {e}")
        raise
    neighbors = job.get_results()

    # Carry the AGASC star's properties along with each Gaia neighbor.
    for col in ["agasc_id", "mag_catid", "mag_band"]:
        neighbors[col] = star[col]
    neighbors["mag_agasc"] = star["mag"]
    neighbors["ra_agasc"] = star["ra"]
    neighbors["dec_agasc"] = star["dec"]
    neighbors["epoch_agasc"] = star["epoch"]
    neighbors["pm_ra_agasc"] = star["pm_ra"]
    neighbors["pm_dec_agasc"] = star["pm_dec"]

    ra = np.array(neighbors["ra_agasc"])
    dec = np.array(neighbors["dec_agasc"])
    neighbors["coord_agasc"] = SkyCoord(ra=ra, dec=dec, unit="deg")

    # Propagate the Gaia positions to the AGASC epoch where proper motions
    # are known (pm is in mas/yr, hence the factor 1000 * 3600).
    ra = np.array(neighbors["ra"])
    dec = np.array(neighbors["dec"])
    pm = ~neighbors["pm_ra"].mask & ~neighbors["pm_dec"].mask
    ra[pm] += (
        neighbors["pm_ra"][pm]
        * (neighbors["epoch_agasc"][pm] - neighbors["epoch"][pm])
        / 1000
        / 3600
        / np.cos(np.deg2rad(neighbors["dec"][pm]))
    )
    dec[pm] += (
        neighbors["pm_dec"][pm]
        * (neighbors["epoch_agasc"][pm] - neighbors["epoch"][pm])
        / 1000
        / 3600
    )
    coord = SkyCoord(ra=ra, dec=dec, unit="deg")
    neighbors["d2d"] = coord.separation(neighbors["coord_agasc"])
    neighbors["mag_pred"] = mag_v_gaia_mag(neighbors)
    return neighbors
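

# Usage sketch (illustrative; requires a Gaia login, and the AGASC ID below is a
# placeholder, not a real catalog entry):
#
#     gaia_login()
#     neighbors = get_gaia_cone(agasc_id=9941, radius=3 / 3600, debug=True)
#     print(neighbors["gaia_id", "g_mag", "d2d", "mag_pred"])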


def match_probability(d_mag, d2d):
    r"""
    Rough estimate of the matching probability, as a function of magnitude and
    position differences.

    This is NOT used in the final analysis. It was used in an intermediate stage.

    The probability is the product of a radial and a magnitude Gaussian factor:

    .. math::

        \frac{r}{2 \pi \sigma_{r} \sigma_{mag}}
        \exp\left(
            -\frac{1}{2} (\Delta mag / \sigma_{mag})^2
            - \frac{1}{2} (r / \sigma_{r})^2
        \right)

    where:

    - The radial factor is a 2d Gaussian ~ r exp(-r^2 / (2 sigma_r^2)), with
      sigma_r = 1.5 arcsec.
    - The magnitude factor is a Gaussian with sigma_mag = 0.2. This value comes
      from the magnitude model fit.
    """
    sigma_mag = 0.2
    sigma_d2d = 1.5
    p_value = (
        np.exp(-0.5 * (d_mag / sigma_mag) ** 2)
        / (np.sqrt(2 * np.pi) * sigma_mag)
        * d2d
        * np.exp(-0.5 * (d2d / sigma_d2d) ** 2)
        / (np.sqrt(2 * np.pi) * sigma_d2d)
    )
    return p_value
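

# Worked example (illustrative): the radial factor r * exp(-r^2 / (2 sigma_r^2))
# peaks at r = sigma_r, so a perfect magnitude match at d2d = 1.5 arcsec sits at
# the mode of the distribution:
#
#     match_probability(d_mag=0.0, d2d=1.5)   # ~0.48
#     match_probability(d_mag=0.0, d2d=0.0)   # 0.0 (the radial Jacobian vanishes at r=0)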


QUERY = """
SELECT
    a.agasc_id,
    g.source_id as gaia_id,
    g.ra, g.dec, g.ra_error, g.dec_error,
    g.pmra as pm_ra, g.pmdec as pm_dec,
    g.ref_epoch as epoch,
    g.phot_g_mean_mag as g_mag,
    g.phot_bp_mean_mag as bp_mag,
    g.phot_rp_mean_mag as rp_mag
FROM agasc as a, {gaia_table} AS g
WHERE
    a.agasc_oid >= {oid_min}
    and a.agasc_oid < {oid_max}
    and 1 = CONTAINS(
        POINT(a.ra, a.dec),
        CIRCLE(g.ra, g.dec, {neighborhood_radius})
    )
    and (tycho_id IS NOT NULL OR gsc2_3 != '0.0')
"""

QUERY_2 = """
SELECT
    a.agasc_id,
    g.source_id as gaia_id
FROM {agasc_table} as a, {gaia_table} AS g
WHERE
    a.{agasc_table}_oid >= {oid_min}
    and a.{agasc_table}_oid < {oid_max}
    and 1 = CONTAINS(
        POINT(a.ra, a.dec),
        CIRCLE(g.ra, g.dec, {neighborhood_radius})
    )
    and (tycho_id IS NOT NULL OR gsc2_3 != '0.0')
"""

QUERY_AGASC_ID = """SELECT
    a.agasc_id,
    g.source_id as gaia_id,
    g.ra, g.dec, g.ra_error, g.dec_error,
    g.pmra as pm_ra, g.pmdec as pm_dec,
    g.ref_epoch as epoch,
    g.phot_g_mean_mag as g_mag,
    g.phot_bp_mean_mag as bp_mag,
    g.phot_rp_mean_mag as rp_mag
FROM agasc as a, {gaia_table} AS g
WHERE
    1 = CONTAINS(
        POINT(a.ra, a.dec),
        CIRCLE(g.ra, g.dec, {neighborhood_radius})
    )
    and a.agasc_id in ({agasc_ids})
"""

HIGH_PM_QUERY = """
SELECT * FROM (
    SELECT * from gaiaedr3.gaia_source AS g2
    WHERE
        g2.pmdec > {pm:.2f}/1.5 or g2.pmdec < -{pm:.2f}/1.5
        or g2.pmra > {pm:.2f}/1.5 or g2.pmra < -{pm:.2f}/1.5
) as high_pm
WHERE SQRT(POWER(high_pm.pmra / cos(high_pm.dec), 2) + POWER(high_pm.pmdec, 2)) > {pm:.2f}
"""


def gaia_login():
    """Login to the Gaia archive server. This is required to submit queries.

    Requires a file `.gaia_credentials` with credentials in the current directory.
    """
    Gaia.login(credentials_file=".gaia_credentials")
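

# Usage sketch (illustrative): HIGH_PM_QUERY is meant to be embedded as a
# sub-query in place of the Gaia table, widening the search radius for high
# proper-motion stars:
#
#     high_pm_table = f"({HIGH_PM_QUERY.format(pm=300)})"
#     adql = QUERY_2.format(
#         oid_min=0, oid_max=1000, agasc_table="agasc",
#         gaia_table=high_pm_table, neighborhood_radius=1200 / 3600,
#     )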


def get_query(oid_min, oid_max, agasc_table="agasc"):
    """Get the query for a preliminary AGASC-Gaia match, restricted to a range of
    AGASC OIDs.

    Parameters
    ----------
    oid_min : int
        The minimum AGASC OID to include in the match. OID is the primary key in
        the AGASC table.
    oid_max : int
        The maximum AGASC OID to include in the match. OID is the primary key in
        the AGASC table.
    agasc_table : str, optional
        AGASC table to use, by default "agasc". It can also be a sub-query.

    Returns
    -------
    str
        A query for Gaia.
    """
    # Regular stars are matched within 15 arcsec; high proper-motion stars
    # (> 300 mas/yr) within 20 arcmin (radii are in degrees).
    query = QUERY_2.format(
        oid_min=oid_min,
        oid_max=oid_max,
        agasc_table=agasc_table,
        gaia_table="gaiaedr3.gaia_source",
        neighborhood_radius=15 / 3600,
    )
    high_pm_query = QUERY_2.format(
        oid_min=oid_min,
        oid_max=oid_max,
        agasc_table=agasc_table,
        gaia_table=f"({HIGH_PM_QUERY.format(pm=300)})",
        neighborhood_radius=1200 / 3600,
    )
    query = f"{query} UNION ({high_pm_query})"
    return query
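

# Usage sketch (illustrative; the batch bounds and job name are arbitrary):
# build the ADQL for one OID batch and submit it as an asynchronous job:
#
#     adql = get_query(0, 188659, agasc_table="agasc")
#     job = Gaia.launch_job_async(query=adql, name="agasc_0", background=True)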


def get_gaia_matches_(
    agasc_ids, radius=1.5, neighborhood_radius=None, debug=False, name=None, jobid=None
):
    assert AGASC_1P8 is not None, "AGASC 1.8 is not loaded (call load_agasc_1p8 first)"

    if jobid is not None:
        # Reuse the results of a previously submitted async job.
        jobs = [job for job in Gaia.list_async_jobs() if job.jobid == jobid]
        assert jobs, f"Can't find job {jobid}"
        job = jobs[0]
        gaia_cone = job.get_data()
    else:
        assert Gaia._TapPlus__isLoggedIn, "You must login to Gaia first"
        name = name or "get_gaia_matches"
        query = QUERY_AGASC_ID.format(
            gaia_table="gaiaedr3.gaia_source",
            neighborhood_radius=15 / 3600,
            agasc_ids=",".join([str(a) for a in agasc_ids]),
        )
        high_pm_query = QUERY_AGASC_ID.format(
            gaia_table=f"({HIGH_PM_QUERY.format(pm=300)})",
            neighborhood_radius=1200 / 3600,
            agasc_ids=",".join([str(a) for a in agasc_ids]),
        )
        query = f"{query} UNION {high_pm_query}"
        if debug:
            print(query)
        job = Gaia.launch_job_async(query=query, name=name)
        gaia_cone = job.get_results()

    agasc_file = str(Path(os.environ["SKA"], "data", "agasc", "agasc1p7.h5"))
    agasc_cone = get_stars(agasc_ids, agasc_file=agasc_file, use_supplement=False)
    lower_case_names = [(c, c.lower()) for c in agasc_cone.colnames if c != c.lower()]
    if lower_case_names:
        agasc_cone.rename_columns(*zip(*lower_case_names))

    data = join(agasc_cone, gaia_cone, keys="agasc_id", table_names=["agasc", "gaia"])

    # Tycho stars (xref_id3 set) with a missing or default epoch get the Tycho epoch.
    tycho_default_epoch = (data["xref_id3"] != -9999) & (
        (data["epoch_agasc"] == -9999) | (data["epoch_agasc"] == 2000.0)
    )
    if np.any(tycho_default_epoch):
        data["epoch_agasc"][tycho_default_epoch] = 1991.5

    ra = np.array(data["ra_agasc"])
    dec = np.array(data["dec_agasc"])
    data["coord_agasc"] = SkyCoord(ra=ra, dec=dec, unit="deg")

    # Propagate the Gaia positions to the AGASC epoch where proper motions are known.
    ra = np.array(data["ra_gaia"])
    dec = np.array(data["dec_gaia"])
    pm = ~data["pm_ra_gaia"].mask & ~data["pm_dec_gaia"].mask
    ra[pm] += (
        data["pm_ra_gaia"][pm]
        * (data["epoch_agasc"][pm] - data["epoch_gaia"][pm])
        / 1000
        / 3600
        / np.cos(np.deg2rad(data["dec_gaia"][pm]))
    )
    dec[pm] += (
        data["pm_dec_gaia"][pm]
        * (data["epoch_agasc"][pm] - data["epoch_gaia"][pm])
        / 1000
        / 3600
    )
    data["coord_gaia"] = SkyCoord(ra=ra, dec=dec, unit="deg")

    data["d2d"] = data["coord_gaia"].separation(data["coord_agasc"])
    data = data[data["d2d"] < radius * u.arcsec]
    data["d2d"] = data["d2d"].to(u.arcsec)

    cols = [
        "gaia_id",
        "ra_gaia",
        "dec_gaia",
        "pm_ra_gaia",
        "pm_dec_gaia",
        "g_mag",
        "rp_mag",
        "bp_mag",
        "coord_gaia",
        "agasc_id",
        "ra_agasc",
        "dec_agasc",
        "pm_ra_agasc",
        "pm_dec_agasc",
        "mag",
        "mag_catid",
        "mag_band",
        "mag_aca",
        "coord_agasc",
        "d2d",
    ]
    return data[cols]


def get_gaia_matches(
    agasc_ids, radius=1.5, neighborhood_radius=None, debug=False, jobid=None
):
    """
    Convenience method to make a rough match between AGASC and Gaia.
    NOT used in the final analysis. Useful for debugging.
    """
    matches = get_gaia_matches_(
        agasc_ids, radius=radius, neighborhood_radius=neighborhood_radius, jobid=jobid
    )
    matches["mag_pred"] = mag_v_gaia_mag(matches)
    matches["d_mag"] = matches["mag_pred"] - matches["mag"]

    # Same matching probability as in match_probability.
    sigma_mag = 0.2
    sigma_d2d = 1.5
    matches["p_value"] = (
        np.exp(-0.5 * (matches["d_mag"] / sigma_mag) ** 2)
        / (np.sqrt(2 * np.pi) * sigma_mag)
        * matches["d2d"]
        * np.exp(-0.5 * (matches["d2d"] / sigma_d2d) ** 2)
        / (np.sqrt(2 * np.pi) * sigma_d2d)
    )
    matches["log_p_value"] = np.inf
    matches["log_p_value"][matches["p_value"] > 0] = -np.log10(
        matches["p_value"][matches["p_value"] > 0]
    )

    fmts = {
        "d2d": "0.5f",
        "mag_1p7": "0.2f",
        "mag_1p8": "0.2f",
        "g_mag": "0.2f",
        "pm_ra_gaia": "0.2f",
        "pm_dec_gaia": "0.2f",
        "pm_ra_agasc": "0.2f",
        "pm_dec_agasc": "0.2f",
        "mag_aca_1p7": "0.2f",
        "mag_aca_1p8": "0.2f",
        "mag_pred": "0.2f",
        "p_value": "0.5f",
        "log_p_value": "0.5f",
        "d_mag": "0.2f",
    }

    # Bring in the AGASC 1.8 magnitudes and Gaia IDs for comparison.
    matches.rename_columns(["mag"], ["mag_1p7"])
    matches = join(
        matches,
        AGASC_1P8[["agasc_id", "mag", "mag_aca", "gaia_id_1p8"]],
        keys=["agasc_id"],
        join_type="left",
        table_names=["1p7", "1p8"],
    )
    matches.rename_columns(["mag"], ["mag_1p8"])
    matches["d_mag"] = matches["mag_pred"] - matches["mag_1p7"]

    for name, fmt in fmts.items():
        if name in matches.colnames:
            matches[name].format = fmt

    # Keep the most likely counterpart (smallest -log10(p)) per AGASC star.
    g = matches.group_by("agasc_id")
    i = g.groups.indices[:-1] + np.array(
        [np.argmin(gr["log_p_value"]) for gr in g.groups]
    )
    final_matches = g[i]
    if debug:
        return matches, final_matches
    return final_matches
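

# Usage sketch (illustrative; requires a Gaia login, the $SKA data tree, and a
# local AGASC 1.8 file; the file name and AGASC IDs below are placeholders):
#
#     gaia_login()
#     load_agasc_1p8("agasc1p8.h5")   # populates AGASC_1P8, required here
#     matches = get_gaia_matches([9941, 9957], radius=1.5)
#     print(matches["agasc_id", "gaia_id", "d2d", "d_mag", "log_p_value"])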


AGASC_1P8 = None


def load_agasc_1p8(filename):
    """Load the AGASC 1.8 table into the module-level AGASC_1P8 variable."""
    import tables

    global AGASC_1P8
    with tables.open_file(filename) as h5:
        AGASC_1P8 = Table(h5.root.data[:])
    AGASC_1P8.rename_columns(
        *zip(*[(c, c.lower()) for c in AGASC_1P8.colnames if c != c.lower()])
    )
    AGASC_1P8.rename_columns(["xref_id5"], ["gaia_id_1p8"])


class CachedDict(dict):
    """A dict that mirrors its contents to a YAML file on every item update.

    Note that dict.update() bypasses __setitem__, so bulk updates are only
    persisted at construction or on the next item assignment/deletion.
    """

    def __init__(self, *args, filename=None, clear=True, **kwargs):
        super().__init__(*args, **kwargs)
        if filename is None:
            raise Exception('Argument "filename" is required')
        self.filename = Path(filename)
        if clear and self.filename.exists():
            self.filename.unlink()
        if self.filename.exists():
            with open(self.filename) as fh:
                self.update(yaml.load(fh, Loader=yaml.SafeLoader))
        if self:
            self._cache()

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self._cache()

    def __delitem__(self, key):
        super().__delitem__(key)
        self._cache()

    def _cache(self):
        with open(self.filename, "w") as fh:
            yaml.dump(dict(self), fh)
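

# Usage sketch (illustrative; the file name is arbitrary): every item assignment
# is mirrored to YAML, so a later run with clear=False resumes from disk:
#
#     info = CachedDict({"jobids": None}, filename="demo_info.yaml", clear=False)
#     info["jobids"] = {"12345": "agasc_0"}   # written to demo_info.yaml
#     resumed = CachedDict(filename="demo_info.yaml", clear=False)
#     assert resumed["jobids"] == {"12345": "agasc_0"}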


def cleanup(agasc_table, remove_index=False):
    """Clean up intermediate tables and jobs from a preliminary cross-match
    between AGASC and Gaia.

    Parameters
    ----------
    agasc_table : str
        table name
    remove_index : bool, optional
        whether to also remove the final "{agasc_table}_gaia_index" table from
        the Gaia archive, by default False
    """
    import re

    tables = Gaia.load_tables(only_names=True)
    table_names = [t.name.split(".")[1] for t in tables if "user_" in t.name]

    # For intermediate tables, we query Gaia to see if they are still there
    # and remove them.
    intermediate_tables = [
        name for name in table_names if re.match(f"{agasc_table}_[0-9]+$", name)
    ]
    for name in tqdm(intermediate_tables):
        Gaia.delete_user_table(name)

    # For intermediate jobs we can't query Gaia because it does not set the job
    # name, so the best we can do is use the cache file if it's there.
    info_file = Path(f"gaia_job_info_{agasc_table}.yaml")
    if info_file.exists():
        with open(info_file, "r") as fh:
            info = yaml.load(fh, Loader=yaml.SafeLoader)
        Gaia.remove_jobs(
            [job.jobid for job in Gaia.list_async_jobs() if job.jobid in info["jobids"]]
        )

    if remove_index and f"{agasc_table}_gaia_index" in table_names:
        Gaia.delete_user_table(f"{agasc_table}_gaia_index")
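

# Usage sketch (illustrative): remove leftover per-batch tables and jobs from a
# previous run, keeping the final index table on the archive:
#
#     gaia_login()
#     cleanup("agasc", remove_index=False)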


def run_full_preliminary_cross_match(
    agasc_table="agasc",
    resume=False,
    n_batches=100,
    login=True,
):
    """
    Do all the steps to make a preliminary cross-match between AGASC and Gaia DR3.

    This process is repeated exactly for the normal "agasc" table, and for the
    background "agasc_bkg" table (which has randomly shifted positions).

    This function will:

    - launch a number (n_batches) of cross-match jobs between {agasc_table} and
      gaiadr3.gaia_source
    - wait for all jobs to finish
    - upload the intermediate result tables to the Gaia archive (tables named
      {agasc_table}_{n})
    - submit a query to join the intermediate tables
    - wait for this job, and upload the result to the Gaia archive (table
      {agasc_table}_gaia_index)
    - remove the intermediate tables
    - submit a final query joining the index with gaiadr3.gaia_source and
      gaiadr3.vari_summary, whose result can be downloaded.

    This function was written mostly to document all the steps. It is not really
    intended to be run on its own. It needs some oversight, because things can
    fail. For this reason, it has the "resume" option, which will skip steps that
    have already been done. Admittedly this is not ideal. I would rather have a
    DAG of tasks, but the infrastructure is not set up for that, so the process
    evolved into this.

    Parameters
    ----------
    agasc_table : str
        The AGASC table to use. This is either "agasc" or "agasc_bkg".
    resume : bool
        If True, skip steps that have already been done.
    n_batches : int
        Number of batches to split the AGASC catalog into. Each batch will be
        cross-matched separately, and the results will be joined at the end.
    login : bool
        If True, login to Gaia before starting.
    """
    import time
    import logging

    logger = logging.getLogger("astroquery")
    log_level = logging.INFO

    if login:
        gaia_login()

    gaia_job_info = CachedDict(
        {
            "jobids": None,
            "union": None,
            "final": None,
        },
        filename=f"gaia_job_info_{agasc_table}.yaml",
        clear=not resume,
    )

    # Total number of AGASC entries (N = len(agasc_summary) at the time of writing).
    N = 18865970
    dN = N // n_batches
    oid = [(m, m + dN) for m in np.arange(0, N, dN)]

    if gaia_job_info["jobids"] is None:
        print("Launching cross-match jobs")
        queries = [get_query(m, M, agasc_table) for m, M in oid]
        jobs = [
            Gaia.launch_job_async(
                query=query, name=f"{agasc_table}_{i}", background=True
            )
            for i, query in enumerate(queries)
        ]
        gaia_job_info["jobids"] = {
            job.jobid: f"{agasc_table}_{i}" for i, job in enumerate(jobs)
        }

    print("Waiting for cross-match jobs")
    n_finished = 0
    jobs = [
        job for job in Gaia.list_async_jobs() if job.jobid in gaia_job_info["jobids"]
    ]
    progress = tqdm(total=len(jobs))
    while n_finished < len(jobs):
        jobs = [
            job
            for job in Gaia.list_async_jobs()
            if job.jobid in gaia_job_info["jobids"]
        ]
        n_finished = np.count_nonzero([job.is_finished() for job in jobs])
        progress.update(n_finished - progress.n)
        time.sleep(2)

    # No need to do anything with the intermediate tables if the union is
    # already there.
    if gaia_job_info["union"] is None:
        print("Uploading intermediate tables")
        tables = Gaia.load_tables(only_names=True)
        table_names = [t.name.split(".")[1] for t in tables if "user_" in t.name]
        existing = []
        for job in tqdm(jobs):
            name = gaia_job_info["jobids"][job.jobid]
            if name not in table_names:
                Gaia.upload_table_from_job(job=job, table_name=name, verbose=False)
            else:
                existing.append(name)
        if existing:
            print("The following tables were already there:")
            for name in existing:
                print(f"  {name}")

        print(
            "Waiting for intermediate tables to be uploaded to Gaia archive user space"
        )
        names = sorted(gaia_job_info["jobids"].values())
        logger.setLevel(logging.WARNING)
        progress = tqdm(total=len(names))
        while names:
            tables = Gaia.load_tables(only_names=True)
            table_names = [t.name.split(".")[1] for t in tables if "user_" in t.name]
            for name in table_names:
                if name in names:
                    progress.update()
                    names.remove(name)
            time.sleep(2)
        logger.setLevel(log_level)

        print("Joining intermediate tables")
        union_job = Gaia.launch_job_async(
            query="\nUNION ".join(
                [f"(SELECT * FROM {agasc_table}_{i})" for i in range(len(oid))]
            ),
            name=f"{agasc_table}_gaia_index",
            background=True,
        )
        gaia_job_info["union"] = union_job.jobid

    print("Waiting for union process")
    while True:
        union_job = [
            job
            for job in Gaia.list_async_jobs()
            if job.jobid == gaia_job_info["union"]
        ]
        if not union_job:
            raise Exception(f"Can't find union job: {gaia_job_info['union']}")
        if union_job[0].is_finished():
            break
        time.sleep(1)

    print("Removing intermediate tables")
    logger.setLevel(logging.WARNING)
    tables = Gaia.load_tables(only_names=True)
    table_names = [t.name.split(".")[1] for t in tables if "user_" in t.name]
    logger.setLevel(log_level)
    for name in tqdm(
        [name for name in gaia_job_info["jobids"].values() if name in table_names]
    ):
        Gaia.delete_user_table(name)

    print(f"Uploading final table ({agasc_table}_gaia_index)")
    tables = Gaia.load_tables(only_names=True)
    table_names = [t.name.split(".")[1] for t in tables if "user_" in t.name]
    if f"{agasc_table}_gaia_index" not in table_names:
        Gaia.upload_table_from_job(
            job=gaia_job_info["union"],
            table_name=f"{agasc_table}_gaia_index",
            verbose=False,
        )
    else:
        print(f"Table {agasc_table}_gaia_index already exists")

    print(f"Waiting for final table ({agasc_table}_gaia_index) to be uploaded")
    logger.setLevel(logging.WARNING)
    while True:
        tables = Gaia.load_tables(only_names=True)
        table_names = [t.name.split(".")[1] for t in tables if "user_" in t.name]
        if f"{agasc_table}_gaia_index" in table_names:
            break
        time.sleep(1)
    logger.setLevel(log_level)

    if gaia_job_info["final"] is None:
        query = f"""
        SELECT
            a.agasc_id,
            a.gaia_id,
            g.ra, g.dec, g.ra_error, g.dec_error,
            g.pmra as pm_ra, g.pmdec as pm_dec,
            g.ref_epoch as epoch,
            g.phot_g_mean_mag as g_mag,
            g.phot_bp_mean_mag as bp_mag,
            g.phot_rp_mean_mag as rp_mag,
            1./phot_g_mean_flux_over_error as g_mag_error,
            1./phot_bp_mean_flux_over_error as bp_mag_error,
            1./phot_rp_mean_flux_over_error as rp_mag_error,
            v.range_mag_g_fov,
            v.std_dev_mag_g_fov,
            v.mad_mag_g_fov,
            g.phot_variable_flag,
            g.phot_proc_mode,
            g.non_single_star
        FROM {agasc_table}_gaia_index as a
        LEFT JOIN gaiadr3.gaia_source as g ON a.gaia_id = g.source_id
        LEFT JOIN gaiadr3.vari_summary as v ON a.gaia_id = v.source_id
        """
        final_job = Gaia.launch_job_async(
            query=query, name=f"{agasc_table}_gaia", background=True
        )
        gaia_job_info["final"] = final_job.jobid
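

# Usage sketch (illustrative): run the full preliminary cross-match for the
# background table, resuming any previously completed steps, then clean up:
#
#     gaia_login()
#     run_full_preliminary_cross_match("agasc_bkg", resume=True, login=False)
#     cleanup("agasc_bkg")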