"""
A collection of methods to submit queries to Gaia.
"""
import os
import numpy as np
import yaml
from pathlib import Path
from tqdm import tqdm
import astropy.units as u
from astropy.table import Table, join
# from astropy.units import Quantity
from astroquery.gaia import Gaia
from astropy.coordinates import SkyCoord
from agasc import get_stars
from agasc_gaia.gaia_mag_models import mag_v_gaia_mag
[docs]def get_gaia_cone(agasc_id=None, radius=1.5 / 3600, debug=False, name=None):
"""
Convenience method to get Gaia stars in a cone. NOT used in the final analysis.
"""
name = name or "get_gaia_cone"
stars = get_stars([agasc_id])
lower_case_names = [(c, c.lower()) for c in stars.colnames if c != c.lower()]
if lower_case_names:
stars.rename_columns(*zip(*lower_case_names))
star = stars[0]
ra = star["ra"]
dec = star["dec"]
query = f"""SELECT
g.source_id as gaia_id,
g.ra, g.dec, g.ra_error, g.dec_error,
g.pmra as pm_ra, g.pmdec as pm_dec, g.ref_epoch as epoch,
g.phot_g_mean_mag as g_mag, g.phot_bp_mean_mag as bp_mag, g.phot_rp_mean_mag as rp_mag
FROM gaiaedr3.gaia_source AS g
WHERE 1 = CONTAINS(
POINT({ra}, {dec}),
CIRCLE(g.ra, g.dec, {radius})
)"""
if debug:
print(query)
try:
assert Gaia._TapPlus__isLoggedIn, "You must login to Gaia first"
job = Gaia.launch_job_async(query, dump_to_file=False, name=name)
except Exception as e:
print(f"FAILED: {e}")
neighbors = job.get_results()
# neighbors = join(stars, neighbors, keys="agasc_id", table_names=["agasc", "gaia"])
for col in ["agasc_id", "mag_catid", "mag_band"]:
neighbors[col] = star[col]
neighbors["mag_agasc"] = star["mag"]
neighbors["ra_agasc"] = star["ra"]
neighbors["dec_agasc"] = star["dec"]
neighbors["epoch_agasc"] = star["epoch"]
neighbors["pm_ra_agasc"] = star["pm_ra"]
neighbors["pm_dec_agasc"] = star["pm_dec"]
ra = np.array(neighbors["ra_agasc"])
dec = np.array(neighbors["dec_agasc"])
neighbors["coord_agasc"] = SkyCoord(ra=ra, dec=dec, unit="deg")
ra = np.array(neighbors["ra"])
dec = np.array(neighbors["dec"])
pm = ~neighbors["pm_ra"].mask & ~neighbors["pm_dec"].mask
ra[pm] += (
neighbors["pm_ra"][pm]
* (neighbors["epoch_agasc"][pm] - neighbors["epoch"][pm])
/ 1000
/ 3600
/ np.cos(np.deg2rad(neighbors["dec"][pm]))
)
dec[pm] += (
neighbors["pm_dec"][pm]
* (neighbors["epoch_agasc"][pm] - neighbors["epoch"][pm])
/ 1000
/ 3600
)
coord = SkyCoord(ra=ra, dec=dec, unit="deg")
neighbors["d2d"] = coord.separation(neighbors["coord_agasc"])
neighbors["mag_pred"] = mag_v_gaia_mag(neighbors)
return neighbors
[docs]def match_probability(d_mag, d2d):
"""
Rough estimate of the matching probability, as a function of magnitude and position differences.
This is NOT used in the final analysis. It was used in an intermediate stage.
The probability is given by two Gaussian factors:
where:
- The radial factor is a 2d Gaussian ~ r exp(-r^2/sigma^2), with sigma = 1.5 arcsec
- The magnitude factor is a Gaussian with sigma = 0.2. This value comes from the magnitude model fit.
"""
# .. math::
# \begin{eqnarray}
# \frac{r}{ {2 \pi} \sigma_{r} \sigma_{mag}} \exp(-\frac{1}{2}(\Delta mag / \sigma_{mag})^2 - \frac{1}{2}( r/\sigma_{r})^2)
# \end{eqnarray}
# \begin{eqnarray}
# \frac{r}{ {2 \pi} \sigma_{r} \sigma_{mag}} \exp(-\frac{1}{2}(\Delta mag / \sigma_{mag})^2 - \frac{1}{2}( r/\sigma_{r})^2)
# \end{eqnarray}
sigma_mag = 0.2
sigma_d2d = 1.5
p_value = (
np.exp(-0.5 * (d_mag / sigma_mag) ** 2)
/ (np.sqrt(2 * np.pi) * sigma_mag)
* d2d
* np.exp(-0.5 * (d2d / sigma_d2d) ** 2)
/ (np.sqrt(2 * np.pi) * sigma_d2d)
)
return p_value
QUERY = """
SELECT
a.agasc_id, g.source_id as gaia_id,
g.ra, g.dec, g.ra_error, g.dec_error, g.pmra as pm_ra, g.pmdec as pm_dec, g.ref_epoch as epoch,
g.phot_g_mean_mag as g_mag, g.phot_bp_mean_mag as bp_mag, g.phot_rp_mean_mag as rp_mag
FROM agasc as a, {gaia_table} AS g
WHERE
a.agasc_oid >= {oid_min}
a.agasc_oid < {oid_max}
and 1 = CONTAINS(
POINT(a.ra, a.dec),
CIRCLE(g.ra, g.dec, {neighborhood_radius})
)
and (tycho_id IS NOT NULL OR gsc2_3 != '0.0')
"""
QUERY_2 = """
SELECT
a.agasc_id, g.source_id as gaia_id
FROM {agasc_table} as a, {gaia_table} AS g
WHERE
a.{agasc_table}_oid >= {oid_min}
and a.{agasc_table}_oid < {oid_max}
and 1 = CONTAINS(
POINT(a.ra, a.dec),
CIRCLE(g.ra, g.dec, {neighborhood_radius})
)
and (tycho_id IS NOT NULL OR gsc2_3 != '0.0')
"""
QUERY_AGASC_ID = """SELECT
a.agasc_id, g.source_id as gaia_id,
g.ra, g.dec, g.ra_error, g.dec_error, g.pmra as pm_ra, g.pmdec as pm_dec, g.ref_epoch as epoch,
g.phot_g_mean_mag as g_mag, g.phot_bp_mean_mag as bp_mag, g.phot_rp_mean_mag as rp_mag
FROM agasc as a, {gaia_table} AS g
WHERE 1 = CONTAINS(
POINT(a.ra, a.dec),
CIRCLE(g.ra, g.dec, {neighborhood_radius})
) and a.agasc_id in ({agasc_ids})
"""
HIGH_PM_QUERY = """
SELECT *
FROM (
SELECT *
from gaiaedr3.gaia_source AS g2
WHERE g2.pmdec > {pm:.2f}/1.5
or g2.pmdec < -{pm:.2f}/1.5
or g2.pmra > {pm:.2f}/1.5
or g2.pmra < -{pm:.2f}/1.5
) as high_pm
WHERE SQRT(POWER(high_pm.pmra/ cos(high_pm.dec), 2) + POWER(high_pm.pmdec, 2)) > {pm:.2f}
"""
def gaia_login():
"""Login to Gaia archive server.
This is required to submit queries. Requires a file `.gaia_credentials` with credentials in
the current directory.
"""
Gaia.login(credentials_file=".gaia_credentials")
[docs]def get_query(oid_min, oid_max, agasc_table="agasc"):
"""Get the query for a preliminary AGASC-Gaia match, restricted to a range of AGASC OIDs.
Parameters
----------
oid_min : int
The minimum AGASC OID to include in the match. OID is the primary key in the AGASC table.
oid_max : int
The maximum AGASC OID to include in the match. OID is the primary key in the AGASC table.
agasc_table : str, optional
Agasc table to use, by default "agasc". It can be a sub-query.
Returns
-------
str
a query for Gaia
"""
query = QUERY_2.format(
oid_min=oid_min,
oid_max=oid_max,
agasc_table=agasc_table,
gaia_table="gaiaedr3.gaia_source",
neighborhood_radius=15 / 3600,
)
high_pm_query = QUERY_2.format(
oid_min=oid_min,
oid_max=oid_max,
agasc_table=agasc_table,
gaia_table=f"({HIGH_PM_QUERY.format(pm=300)})",
neighborhood_radius=1200 / 3600,
)
query = f"{query} UNION ({high_pm_query})"
return query
def get_gaia_matches_(
agasc_ids, radius=1.5, neighborhood_radius=None, debug=False, name=None, jobid=None
):
assert AGASC_1P8 is not None
if jobid is not None:
jobs = [job for job in Gaia.list_async_jobs() if job.jobid == jobid]
assert jobs, f"Can't find job {jobid}"
job = jobs[0]
gaia_cone = job.get_data()
else:
assert Gaia._TapPlus__isLoggedIn, "You must login to Gaia first"
name = name or "get_gaia_matches"
query = QUERY_AGASC_ID.format(
gaia_table="gaiaedr3.gaia_source",
neighborhood_radius=15 / 3600,
agasc_ids=",".join([str(a) for a in agasc_ids]),
)
high_pm_query = QUERY_AGASC_ID.format(
gaia_table=f"({HIGH_PM_QUERY.format(pm=300)})",
neighborhood_radius=1200 / 3600,
agasc_ids=",".join([str(a) for a in agasc_ids]),
)
query = f"{query} UNION {high_pm_query}"
if debug:
print(query)
job = Gaia.launch_job_async(query=query, name=name)
gaia_cone = job.results
agasc_file = str(Path(os.environ["SKA"], "data", "agasc", "agasc1p7.h5"))
agasc_cone = get_stars(agasc_ids, agasc_file=agasc_file, use_supplement=False)
lower_case_names = [(c, c.lower()) for c in agasc_cone.colnames if c != c.lower()]
if lower_case_names:
agasc_cone.rename_columns(*zip(*lower_case_names))
data = join(agasc_cone, gaia_cone, keys="agasc_id", table_names=["agasc", "gaia"])
if np.any(
(data["xref_id3"] != -9999)
& ((data["epoch_agasc"] == -9999) | (data["epoch_agasc"] == 2000.0))
):
data["epoch_agasc"][
(data["xref_id3"] != -9999)
& ((data["epoch_agasc"] == -9999) | (data["epoch_agasc"] == 2000.0))
] = 1991.5
ra = np.array(data["ra_agasc"])
dec = np.array(data["dec_agasc"])
data["coord_agasc"] = SkyCoord(ra=ra, dec=dec, unit="deg")
ra = np.array(data["ra_gaia"])
dec = np.array(data["dec_gaia"])
pm = ~data["pm_ra_gaia"].mask & ~data["pm_dec_gaia"].mask
ra[pm] += (
data["pm_ra_gaia"][pm]
* (data["epoch_agasc"][pm] - data["epoch_gaia"][pm])
/ 1000
/ 3600
/ np.cos(np.deg2rad(data["dec_gaia"][pm]))
)
dec[pm] += (
data["pm_dec_gaia"][pm]
* (data["epoch_agasc"][pm] - data["epoch_gaia"][pm])
/ 1000
/ 3600
)
data["coord_gaia"] = SkyCoord(ra=ra, dec=dec, unit="deg")
data["d2d"] = data["coord_gaia"].separation(data["coord_agasc"])
data = data[data["d2d"] < radius * u.arcsec]
data["d2d"] = data["d2d"].to(u.arcsec)
cols = [
"gaia_id",
"ra_gaia",
"dec_gaia",
"pm_ra_gaia",
"pm_dec_gaia",
"g_mag",
"rp_mag",
"bp_mag",
"coord_gaia",
"agasc_id",
"ra_agasc",
"dec_agasc",
"pm_ra_agasc",
"pm_dec_agasc",
"mag",
"mag_catid",
"mag_band",
"mag_aca",
"coord_agasc",
"d2d",
]
return data[cols]
[docs]def get_gaia_matches(
agasc_ids, radius=1.5, neighborhood_radius=None, debug=False, jobid=None
):
"""
Convenience method to make a rough match between AGASC and Gaia. NOT used in the final analysis.
Useful for debugging.
"""
matches = get_gaia_matches_(
agasc_ids, radius=radius, neighborhood_radius=neighborhood_radius, jobid=jobid
)
matches["mag_pred"] = mag_v_gaia_mag(matches)
# matches['d_mag'] = matches['mag_pred'] - matches['mag_1p7']
matches["d_mag"] = matches["mag_pred"] - matches["mag"]
sigma_mag = 0.2
sigma_d2d = 1.5
matches["p_value"] = (
np.exp(-0.5 * (matches["d_mag"] / sigma_mag) ** 2)
/ (np.sqrt(2 * np.pi) * sigma_mag)
* matches["d2d"]
* np.exp(-0.5 * (matches["d2d"] / sigma_d2d) ** 2)
/ (np.sqrt(2 * np.pi) * sigma_d2d)
)
matches["log_p_value"] = np.inf
matches["log_p_value"][matches["p_value"] > 0] = -np.log10(
matches["p_value"][matches["p_value"] > 0]
)
fmts = {
"d2d": "0.5f",
"mag_1p7": "0.2f",
"mag_1p8": "0.2f",
"g_mag": "0.2f",
"pm_ra_gaia": "0.2f",
"pm_dec_gaia": "0.2f",
"pm_ra_agasc": "0.2f",
"pm_dec_agasc": "0.2f",
"mag_aca_1p7": "0.2f",
"mag_aca_1p8": "0.2f",
"mag_pred": "0.2f",
"p_value": "0.5f",
"log_p_value": "0.5f",
"d_mag": "0.2f",
}
matches.rename_columns(["mag"], ["mag_1p7"])
matches = join(
matches,
AGASC_1P8[["agasc_id", "mag", "mag_aca", "gaia_id_1p8"]],
keys=["agasc_id"],
join_type="left",
table_names=["1p7", "1p8"],
)
matches.rename_columns(["mag"], ["mag_1p8"])
matches["d_mag"] = matches["mag_pred"] - matches["mag_1p7"]
for name, fmt in fmts.items():
if name in matches.colnames:
matches[name].format = fmt
g = matches.group_by("agasc_id")
i = g.groups.indices[:-1] + np.array(
[np.argmin(gr["log_p_value"]) for gr in g.groups]
)
final_matches = g[i]
if debug:
return matches, final_matches
return final_matches
AGASC_1P8 = None
def load_agasc_1p8(filename):
import tables
global AGASC_1P8
with tables.open_file(filename) as h5:
AGASC_1P8 = Table(h5.root.data[:])
AGASC_1P8.rename_columns(
*zip(*[(c, c.lower()) for c in AGASC_1P8.colnames if c != c.lower()])
)
AGASC_1P8.rename_columns(["xref_id5"], ["gaia_id_1p8"])
class CachedDict(dict):
def __init__(self, *args, filename=None, clear=True, **kwargs):
super().__init__(*args, **kwargs)
if filename is None:
raise Exception('Argument "filename" is required')
self.filename = Path(filename)
if clear and self.filename.exists():
self.filename.unlink()
if self.filename.exists():
with open(self.filename) as fh:
self.update(yaml.load(fh, Loader=yaml.SafeLoader))
if self:
self._cache()
def __setitem__(self, key, value):
super().__setitem__(key, value)
self._cache()
def __delitem__(self, key):
super().__delitem__(key)
self._cache()
def _cache(self):
with open(self.filename, "w") as fh:
yaml.dump(dict(self), fh)
[docs]def cleanup(agasc_table, remove_index=False):
"""Cleanup intermediate file and jobs from a preliminary cross-match between AGASC and Gaia.
Parameters
----------
agasc_table : str
table name
remove_index : bool, optional
whether to remove the YAML file with job IDs, by default False
"""
import re
tables = Gaia.load_tables(only_names=True)
table_names = [t.name.split(".")[1] for t in tables if "user_" in t.name]
# for intermediate tables, we query Gaia to see if they are still there and remove them
intermediate_tables = [
name for name in table_names if re.match(f"{agasc_table}_[0-9]+$", name)
]
for name in tqdm(intermediate_tables):
Gaia.delete_user_table(name)
# for intermediate jobs we can't query Gaia because it does not set the job name,
# so the best we can do is use the cache file if it's there.
info_file = Path(f"gaia_job_info_{agasc_table}.yaml")
if info_file.exists():
with open(info_file, "r") as fh:
info = yaml.load(fh, Loader=yaml.SafeLoader)
Gaia.remove_jobs(
[job.jobid for job in Gaia.list_async_jobs() if job.jobid in info["jobids"]]
)
if remove_index and f"{agasc_table}_gaia_index" in table_names:
Gaia.delete_user_table(f"{agasc_table}_gaia_index")
[docs]def run_full_preliminary_cross_match(
agasc_table="agasc",
resume=False,
n_batches=100,
login=True,
):
"""
Do all the steps to make a preliminary cross-match between AGASC and Gaia DR3.
This process is repeated exactly for the normal "agasc" table, and for the background
"agasc_bkg" table (which has randomly shifted positions).
This function will:
- launch a number (n_batches) of cross-match jobs between {agasc_table} and gaiadr3.gaia_source
- wait for all jobs to finish
- upload the intermediate result tables to the Gaia archive (tables named {agasc_table}_{n})
- submit a query to join the intermediate tables
- wait for this job, and upload the result to the Gaia archive (table {agasc_table}_gaia_index)
- remove the intermediate tables
- submit a final query with get_gaia_matches, which can be downloaded.
This function was written mostly to document all the steps. It is not really intended to be run
on its own. It needs some oversight, because things can fail. For this reason, it has the
"resume" option, which will skip steps that have already been done.
Admitedly this is not ideal. I would rather have a DAG of tasks, but the infrastructure is not
setup for that, so the process evolved into this.
Parameters
----------
agasc_table : str
The AGASC table to use. This is either "agasc" or "agasc_bkg".
resume : bool
If True, skip steps that have already been done.
n_batches : int
Number of batches to split the AGASC catalog into. Each batch will be cross-matched
separately, and the results will be joined at the end.
login : bool
If True, login to Gaia before starting.
"""
import time
import logging
logger = logging.getLogger("astroquery")
log_level = logging.INFO
# logger.setLevel(logging.WARNING)
# agasc_table = 'agasc_bkg'
if login:
gaia_login()
gaia_job_info = CachedDict(
{
"jobids": None,
"union": None,
"final": None,
# 'uploaded': [],
},
filename=f"gaia_job_info_{agasc_table}.yaml",
clear=not resume,
)
# print("Loading AGASC summary")
# agasc_summary = datasets.get_agasc_summary()
# N = len(agasc_summary)
N = 18865970
dN = N // n_batches
oid = [(m, m + dN) for m in np.arange(0, N, dN)]
if gaia_job_info["jobids"] is None:
print("Launching cross-match jobs")
queries = [get_query(m, M, agasc_table) for m, M in oid]
jobs = [
Gaia.launch_job_async(
query=query, name=f"{agasc_table}_{i}", background=True
)
for i, query in enumerate(queries)
]
gaia_job_info["jobids"] = {
job.jobid: f"{agasc_table}_{i}" for i, job in enumerate(jobs)
}
print("Waiting for cross-match jobs")
n_finished = 0
jobs = [
job for job in Gaia.list_async_jobs() if job.jobid in gaia_job_info["jobids"]
]
progress = tqdm(total=len(jobs))
while n_finished < len(jobs):
jobs = [
job
for job in Gaia.list_async_jobs()
if job.jobid in gaia_job_info["jobids"]
]
n_finished = np.count_nonzero([job.is_finished() for job in jobs])
progress.update(n_finished - progress.n)
time.sleep(2)
# no need to do anything on intermediate table is the union is already there
if gaia_job_info["union"] is None:
print("Uploading intermediate tables")
tables = Gaia.load_tables(only_names=True)
table_names = [t.name.split(".")[1] for t in tables if "user_" in t.name]
existing = []
for job in tqdm(jobs):
name = gaia_job_info["jobids"][job.jobid]
if name not in table_names:
Gaia.upload_table_from_job(job=job, table_name=name, verbose=False)
else:
existing.append(name)
print("The following tables were already there:")
for name in existing:
print(f" {name}")
print(
"Waiting for intermediate tables to be uploaded to Gaia archive user space"
)
names = sorted(gaia_job_info["jobids"].values())
logger.setLevel(logging.WARNING)
progress = tqdm(total=len(names))
while names:
tables = Gaia.load_tables(only_names=True)
table_names = [t.name.split(".")[1] for t in tables if "user_" in t.name]
for name in table_names:
if name in names:
progress.update()
names.remove(name)
time.sleep(2)
logger.setLevel(log_level)
print("Joining intermediate tables")
union_job = Gaia.launch_job_async(
query="\nUNION ".join(
[f"(SELECT * FROM {agasc_table}_{i})" for i in range(len(oid))]
),
name=f"{agasc_table}_gaia_index",
background=True,
)
gaia_job_info["union"] = union_job.jobid
print("Waiting for union process")
while True:
union_job = [
job for job in Gaia.list_async_jobs() if job.jobid == gaia_job_info["union"]
]
if not union_job:
raise Exception(f"Can't find union job: {gaia_job_info['union']}")
if union_job[0].is_finished():
break
time.sleep(1)
print("Removing intermediate tables")
logger.setLevel(logging.WARNING)
tables = Gaia.load_tables(only_names=True)
table_names = [t.name.split(".")[1] for t in tables if "user_" in t.name]
logger.setLevel(log_level)
for name in tqdm(
[name for name in gaia_job_info["jobids"].values() if name in table_names]
):
Gaia.delete_user_table(name)
print(f"Uploading final table ({agasc_table}_gaia_index)")
tables = Gaia.load_tables(only_names=True)
table_names = [t.name.split(".")[1] for t in tables if "user_" in t.name]
if f"{agasc_table}_gaia_index" not in table_names:
Gaia.upload_table_from_job(
job=gaia_job_info["union"],
table_name=f"{agasc_table}_gaia_index",
verbose=False,
)
else:
print(f"Table {agasc_table}_gaia_index already exists")
print(f"Waiting for final table ({agasc_table}_gaia_index) to be uploaded")
logger.setLevel(logging.WARNING)
while True:
tables = Gaia.load_tables(only_names=True)
table_names = [t.name.split(".")[1] for t in tables if "user_" in t.name]
if f"{agasc_table}_gaia_index" in table_names:
break
time.sleep(1)
logger.setLevel(log_level)
if gaia_job_info["final"] is None:
query = f"""
SELECT
a.agasc_id, a.gaia_id,
g.ra, g.dec, g.ra_error, g.dec_error,
g.pmra as pm_ra, g.pmdec as pm_dec, g.ref_epoch as epoch,
g.phot_g_mean_mag as g_mag, g.phot_bp_mean_mag as bp_mag, g.phot_rp_mean_mag as rp_mag,
1./phot_g_mean_flux_over_error as g_mag_error, 1./phot_bp_mean_flux_over_error as bp_mag_error, 1./phot_rp_mean_flux_over_error as rp_mag_error,
v.range_mag_g_fov, v.std_dev_mag_g_fov, v.mad_mag_g_fov,
g.phot_variable_flag, g.phot_proc_mode, g.non_single_star
FROM {agasc_table}_gaia_index as a
LEFT JOIN gaiadr3.gaia_source as g ON a.gaia_id = g.source_id
LEFT JOIN gaiadr3.vari_summary as v ON a.gaia_id = v.source_id
"""
final_job = Gaia.launch_job_async(
query=query, name=f"{agasc_table}_gaia", background=True
)
gaia_job_info["final"] = final_job.jobid