import warnings
import glob
import numpy as np
import scipy.ndimage as ndi
import scipy.optimize as optimize
from scipy.interpolate import interp1d, LinearNDInterpolator
from corgidrp.data import Dataset, SpectroscopyCentroidPSF, DispersionModel, LineSpread, SpecFluxCal, SpecFilterOffset, SlitTransmission
import os
from astropy.io import ascii, fits
from astropy.table import Table
import astropy.modeling.models as models
import astropy.modeling.fitting as fitting
import corgidrp
from corgidrp.fluxcal import get_filter_name, read_cal_spec, read_filter_curve, get_calspec_file
[docs]
def gauss2d(x0, y0, sigma_x, sigma_y, peak):
"""
2d gaussian function for gaussfit2d
Args:
x0: center of gaussian
y0: center of gaussian
peak: peak amplitude of gaussian
sigma_x: stddev in x direction
sigma_y: stddev in y direction
Returns:
function evaluated at coordinate tuple y,x
"""
return lambda y,x: peak * np.exp(-( ((x - x0) / sigma_x) ** 2 + ((y - y0) / sigma_y) **2 ) / 2)
[docs]
def gaussfit2d_pix(frame, xguess, yguess, xfwhm_guess=3, yfwhm_guess=6,
halfwidth=3, halfheight=5, guesspeak=1, oversample=5, refinefit=True):
"""
Fits a 2-d Gaussian to the data at point (xguess, yguess), with pixel integration.
Args:
frame: the data - Array of size (y,x)
xguess: location to fit the 2d gaussian to (should be within +/-1 pixel of true peak)
yguess: location to fit the 2d gaussian to (should be within +/-1 pixel of true peak)
xfwhm_guess: approximate x-axis fwhm to fit to
yfwhm_guess: approximate y-axis fwhm to fit to
halfwidth: 1/2 the width of the box used for the fit
halfheight: 1/2 the height of the box used for the fit
guesspeak: approximate flux in peak pixel
oversample: odd integer >= 3; to represent detector pixels, oversample and then bin the model by this factor
refinefit: whether to refine the fit of the position of the guess
Returns:
xfit: x position (only changed if refinefit is True)
yfit: y position (only changed if refinefit is True)
xfwhm: x-axis fwhm of the PSF in pixels
yfwhm: y-axis fwhm of the PSF in pixels
peakflux: the peak value of the gaussian
fitbox: 2-d array of the fitted region from the data array
model: 2-d array of the best-fit model
residual: 2-d array of residuals (data - model)
"""
if not isinstance(halfwidth, int):
raise ValueError("halfwidth must be an integer")
if not isinstance(halfheight, int):
raise ValueError("halfheight must be an integer")
if not isinstance(oversample, int) or (oversample % 2 != 1) or (oversample < 3):
raise ValueError("oversample must be an odd integer >= 3")
x0 = np.rint(xguess).astype(int)
y0 = np.rint(yguess).astype(int)
fitbox = np.copy(frame[y0 - halfheight:y0 + halfheight + 1,
x0 - halfwidth:x0 + halfwidth + 1])
nrows = fitbox.shape[0]
ncols = fitbox.shape[1]
fitbox[np.where(np.isnan(fitbox))] = 0
oversampled_ycoord = np.linspace(-(oversample // 2) / oversample,
nrows - 1 + (oversample // 2) / oversample + 1./oversample,
nrows * oversample)
oversampled_xcoord = np.linspace(-(oversample // 2) / oversample,
ncols - 1 + (oversample // 2) / oversample + 1./oversample,
ncols * oversample)
oversampled_grid = np.meshgrid(oversampled_ycoord, oversampled_xcoord, indexing='ij')
if refinefit:
errorfunction = lambda p: np.ravel(
np.reshape(gauss2d(*p)(*oversampled_grid),
(nrows, oversample, ncols, oversample)).mean(
axis = 1).mean(axis = 2) - fitbox)
guess = (halfwidth, halfheight,
xfwhm_guess/(2 * np.sqrt(2*np.log(2))),
yfwhm_guess/(2 * np.sqrt(2*np.log(2))),
guesspeak)
p, success = optimize.leastsq(errorfunction, guess)
xfit = p[0] + (x0 - halfwidth)
yfit = p[1] + (y0 - halfheight)
xfwhm = p[2] * (2 * np.sqrt(2*np.log(2)))
yfwhm = p[3] * (2 * np.sqrt(2*np.log(2)))
peakflux = p[4]
model = np.reshape(gauss2d(*p)(*oversampled_grid),
(nrows, oversample, ncols, oversample)).mean(
axis = 1).mean(axis = 2)
residual = fitbox - model
else:
model = np.reshape(gauss2d(*guess)(*oversampled_grid),
(nrows, oversample, ncols, oversample)).mean(
axis = 1).mean(axis = 2)
residual = fitbox - model
xfit = xfit
yfit = yfit
xfwhm = xfwhm_guess
yfwhm = yfwhm_guess
peakflux = guesspeak
return xfit, yfit, xfwhm, yfwhm, peakflux, fitbox, model, residual
[docs]
def psf_registration_costfunc(p, template, data):
"""
Cost function for a least-squares fit to register a PSF with a fitting template.
Args:
p (tuple): shift and scale parameters:
(x-axis shift in pixels, y-axis shift in pixels,
amplitude scale factor)
template (numpy.ndarray): PSF tempate array, 2d
data (numpy.ndarray): PSF data array, 2d
Returns:
The sum of squares of differences between the data array and the shifted
and scaled template.
"""
xshift = p[0]
yshift = p[1]
amp = p[2]
shifted_template = amp * ndi.shift(template, (yshift, xshift), order=1, prefilter=False)
return np.sum((data - shifted_template)**2)
[docs]
def get_center_of_mass(frame):
"""
Finds the center coordinates for a given frame.
Args:
frame (np.ndarray): 2D array to compute centering
Returns:
tuple:
xcen (float): X centroid coordinate
ycen (float): Y centroid coordinate
"""
y, x = np.indices(frame.shape)
ycen = np.sum(y * frame)/np.sum(frame)
xcen = np.sum(x * frame)/np.sum(frame)
return xcen, ycen
[docs]
def rotate_points(points, angle_rad, pivot_point):
"""
Rotate an array of (x,y) coordinates by an angle about a pivot point.
Args:
points (tuple): Two-element tuple of (x,y) coordinates.
The first element is an array of x values;
the second element is an array of y values.
angle_rad (float): Rotation angle in radians
pivot_point (tuple): Tuple of (x,y) coordinates of the pivot point.
Returns:
Two-element tuple of rotated (x,y) coordinates.
"""
rotated_points = (points[0] - pivot_point[0], points[1] - pivot_point[1])
rotated_points = (rotated_points[0] * np.cos(angle_rad) - rotated_points[1] * np.sin(angle_rad),
rotated_points[0] * np.sin(angle_rad) + rotated_points[1] * np.cos(angle_rad))
rotated_points = (rotated_points[0] + pivot_point[0], rotated_points[1] + pivot_point[1])
return rotated_points
[docs]
def fit_psf_centroid(psf_data, psf_template,
xcent_template = None, ycent_template = None,
xcent_guess = None, ycent_guess = None,
halfwidth = 10, halfheight = 10,
fwhm_major_guess = 3, fwhm_minor_guess = 6,
gauss2d_oversample = 9):
"""
Fit the centroid of a PSF image with a template.
Args:
psf_data (np.ndarray): PSF data, 2D array
psf_template (np.ndarray): PSF template, 2D array
xcent_template (float): true x centroid of the template PSF; for accurate
results this must be determined in advance.
ycent_template (float): true y centroid of the template PSF; for accurate
results this must be determined in advance.
xcent_guess (int): Estimate of the x centroid of the data array, pixels
ycent_guess (int): Estimate of the y centroid of the data array, pixels
halfwidth (int): Half-width of the fitting region, pixels
halfheight (int): Half-height of the fitting region, pixels
fwhm_major_guess (float): guess for FWHM value along major axis of PSF, pixels
fwhm_minor_guess (float): guess for FWHM value along minor axis of PSF, pixels
gauss2d_oversample (int): upsample factor for 2-D Gaussian PSF fit;
this must be an odd number.
Returns:
xfit (float): Data PSF x centroid obtained from the template fit,
array pixels
yfit (float): Data PSF y centroid obtained from the template fit,
array pixels
gauss2d_xfit (float): Data PSF x centroid estimated by a 2-D Gaussian fit to
the main lobe of the PSF
gauss2d_yfit (float): Data PSF y centroid estimated by a 2-D Gaussian fit to
the main lobe of the PSF
peakpix_snr (float): Peak-pixel signal-to-noise ratio
x_precis (float): Statistical precision of the x centroid fit, estimated from
peak-pixel S/N ratio
y_precis (float): Statistical precision of the y centroid fit, estimated from
peak-pixel S/N ratio
"""
if not isinstance(halfheight, int):
raise ValueError("halfheight must be an integer")
# Use the center of mass as a starting point if positions were not provided.
if xcent_template is None or ycent_template is None:
xcom_template, ycom_template = np.rint(get_center_of_mass(psf_template))
xcent_template, ycent_template = xcom_template, ycom_template
else:
xcom_template, ycom_template = (np.rint(xcent_template), np.rint(ycent_template))
if xcent_guess is None or ycent_guess is None:
median_filt_psf = ndi.median_filter(psf_data, size=2)
xcom_data, ycom_data = np.rint(get_center_of_mass(median_filt_psf))
else:
xcom_data, ycom_data = (np.rint(xcent_guess), np.rint(ycent_guess))
xmin_template_cut, xmax_template_cut = (int(xcom_template) - halfwidth, int(xcom_template) + halfwidth)
ymin_template_cut, ymax_template_cut = (int(ycom_template) - halfheight, int(ycom_template) + halfheight)
xmin_data_cut, xmax_data_cut = (int(xcom_data) - halfwidth, int(xcom_data) + halfwidth)
ymin_data_cut, ymax_data_cut = (int(ycom_data) - halfheight, int(ycom_data) + halfheight)
template_stamp = psf_template[ymin_template_cut:ymax_template_cut+1, xmin_template_cut:xmax_template_cut+1]
data_stamp = psf_data[ymin_data_cut:ymax_data_cut+1, xmin_data_cut:xmax_data_cut+1]
xoffset_guess, yoffset_guess = (0.0, 0.0)
amp_guess = np.sum(psf_data) / np.sum(psf_template)
guess_params = (xoffset_guess, yoffset_guess, amp_guess)
registration_result = optimize.minimize(psf_registration_costfunc, guess_params,
args=(template_stamp, data_stamp),
method='Powell')
if not registration_result.success:
print(f"Warning: Registration optimization did not converge: {registration_result.message}")
xfit = xcent_template + (xcom_data - xcom_template) + registration_result.x[0]
yfit = ycent_template + (ycom_data - ycom_template) + registration_result.x[1]
psf_data_bkg = psf_data.copy()
psf_data_bkg[ymin_data_cut:ymax_data_cut+1, xmin_data_cut:xmax_data_cut+1] = np.nan
psf_peakpix_snr = np.max(psf_data) / np.nanstd(psf_data_bkg)
(gauss2d_xfit, gauss2d_yfit, xfwhm, yfwhm, gauss2d_peakfit,
fitted_data_stamp, model, residual) = gaussfit2d_pix(psf_data,
xguess = xfit,
yguess = yfit,
xfwhm_guess = fwhm_minor_guess,
yfwhm_guess = fwhm_major_guess,
halfwidth = 1, halfheight = halfheight,
guesspeak = np.max(psf_data), oversample = gauss2d_oversample,
refinefit = True)
(x_precis, y_precis) = (np.abs(xfwhm) / (2 * np.sqrt(2 * np.log(2))) / psf_peakpix_snr,
np.abs(yfwhm) / (2 * np.sqrt(2 * np.log(2))) / psf_peakpix_snr)
return xfit, yfit, gauss2d_xfit, gauss2d_yfit, psf_peakpix_snr, x_precis, y_precis
[docs]
def get_template_dataset(dataset):
"""
return the default template dataset from the data/spectroscopy/templates files
Args:
dataset (Dataset): Dataset containing 2D PSF images. Each image must include pri_hdr and ext_hdr.
Returns:
Dataset: template dataset
boolean: filtersweep true or false
"""
template_dir = os.path.join(os.path.dirname(__file__), "data", "spectroscopy", "templates")
filtersweep = False
cfamname = []
slits = []
for frames in dataset.frames:
dpamname = frames.ext_hdr['DPAMNAME']
fsamname = frames.ext_hdr['FSAMNAME']
if dpamname != "PRISM3":
raise AttributeError("currently we only have template files for PRISM3, not for "+ dpamname)
cfamname.append (frames.ext_hdr['CFAMNAME'])
slits.append (fsamname)
if len(np.unique(slits)) != 1:
raise AttributeError("currently we only have template files for no slit or R1C2, not for "+ slits)
if len(np.unique(cfamname)) == 1:
band = cfamname[0]
if not band.startswith ("3"):
raise AttributeError("currently we only have template files for the filter band 3, not for "+ band)
slit = slits[0]
if slit == "R1C2":
filenames = sorted(glob.glob(os.path.join(template_dir,"spec_unocc_r1c2slit_offset_prism3_3d_*.fits")))
elif slit == "OPEN":
filenames = sorted(glob.glob(os.path.join(template_dir,"spec_unocc_noslit_offset_prism3_3d_*.fits")))
else:
raise AttributeError("we do not (yet) have template files for slit " + slit)
else:
#filtersweep
filenames = sorted(glob.glob(os.path.join(template_dir, "spec_unocc_noslit_prism3_filtersweep_*.fits")))
filtersweep = True
return Dataset(filenames), filtersweep
[docs]
def compute_psf_centroid(dataset, template_dataset = None, initial_cent = None, filtersweep = False, halfwidth=10, halfheight=10, verbose = False):
"""
Compute PSF centroids for a grid of PSFs and return them as a calibration object.
Args:
dataset (Dataset): Dataset containing 2D PSF images. Each image must include pri_hdr and ext_hdr.
template_dataset (Dataset): dataset of the template PSF, if None, a simulated PSF from the data/spectroscopy/template path is taken
initial_cent (dict): Dictionary with initial guesses for PSF centroids.
Must include keys 'xcent' and 'ycent', each mapping to an array of shape (N,).
filtersweep (bool): If True, it uses a filter sweep/scan dataset, this parameter is only relevant if template_dataset is not None.
halfwidth (int): Half-width of the PSF fitting box.
halfheight (int): Half-height of the PSF fitting box.
verbose (bool): If True, prints fitted centroid values for each frame.
Returns:
SpectroscopyCentroidPSF: Calibration object with fitted (x, y) centroids.
"""
if not isinstance(dataset, Dataset):
raise TypeError("Input must be a corgidrp.data.Dataset object.")
pri_hdr_centroid, ext_hdr_centroid, _ , _ = corgidrp.check.merge_headers(
dataset,
any_true_keywords=['DESMEAR', 'CTI_CORR'],
invalid_keywords=[
#pri header
'OPGAIN', 'PHTCNT', 'FRAMET', 'PA_V3', 'PA_APER', 'SVB_1', 'SVB_2', 'SVB_3', 'ROLL',
'PITCH', 'YAW', 'WBJ_1', 'WBJ_2', 'WBJ_3', 'VISITID',
#ext header
'DATETIME', 'FTIMEUTC', 'FRMTYPE', 'ISHOWFSC', 'ISACQ', 'SPBAL', 'ISFLAT', 'SATSPOTS', 'STATUS',
'HVCBIAS', 'OPMODE', 'EMGAIN_C', 'BLNKTIME', 'BLNKCYC', 'EXPCYC', 'OVEREXP', 'NOVEREXP',
'PROXET', 'FCMLOOP', 'FCMPOS', 'FSMINNER', 'FSMLOS', 'FSMPRFL', 'FSMRSTR',
'FSMSG1', 'FSMSG2', 'FSMSG3', 'FSMX', 'FSMY',
'EACQ_ROW', 'EACQ_COL', 'SB_FP_DX', 'SB_FP_DY', 'SB_FS_DX', 'SB_FS_DY',
'DMZLOOP', 'ISVALID',
'Z2AVG', 'Z3AVG', 'Z4AVG', 'Z5AVG', 'Z6AVG', 'Z7AVG', 'Z8AVG', 'Z9AVG',
'Z10AVG', 'Z11AVG', 'Z12AVG', 'Z13AVG', 'Z14AVG',
'Z2RES', 'Z3RES', 'Z4RES', 'Z5RES', 'Z6RES', 'Z7RES', 'Z8RES', 'Z9RES',
'Z10RES', 'Z11RES', 'Z2VAR', 'Z3VAR',
'FWC_PP_E', 'FWC_EM_E', 'SAT_DN',
'CFAM_H', 'CFAM_V', 'CFAMNAME', 'CFAMSP_H', 'CFAMSP_V', 'DATETIME', 'FTIMEUTC'
]
)
if initial_cent is None:
xcent, ycent = None, None
else:
xcent = np.asarray(initial_cent.get("xcent"))
ycent = np.asarray(initial_cent.get("ycent"))
if xcent is None or ycent is None:
raise ValueError("initial_cent dictionary must contain 'xcent' and 'ycent' arrays.")
if len(dataset) != len(xcent) or len(dataset) != len(ycent):
raise ValueError("Mismatch between dataset length and centroid guess arrays.")
if template_dataset is None:
template_dataset, filtersweep = get_template_dataset(dataset)
xcent_temp = []
ycent_temp = []
for frame in template_dataset:
if "XCENT" in frame.ext_hdr:
xcent_temp.append(frame.ext_hdr["XCENT"])
else:
xcent_temp.append(None)
if "YCENT" in frame.ext_hdr:
ycent_temp.append(frame.ext_hdr["YCENT"])
else:
ycent_temp.append(None)
xcent_temp = np.asarray(xcent_temp)
ycent_temp = np.asarray(ycent_temp)
centroids = np.zeros((len(dataset), 2))
centroids_err = np.zeros((len(dataset), 2))
filters = []
for idx, frame in enumerate(dataset):
cfam = frame.ext_hdr['CFAMNAME']
filters.append(cfam)
psf_data = frame.data
if xcent is None:
xguess, yguess = None, None
else:
xguess = xcent[idx]
yguess = ycent[idx]
if filtersweep:
found_cfam = False
for k, temp_frame in enumerate(template_dataset):
cfam_temp = temp_frame.ext_hdr['CFAMNAME']
if cfam == cfam_temp:
temp_psf_data = temp_frame.data
temp_x = xcent_temp[k]
temp_y = ycent_temp[k]
found_cfam = True
break
if found_cfam == False:
raise AttributeError("no template image found for filter: "+cfam)
else:
if idx < len(template_dataset):
temp_psf_data = template_dataset[idx].data
temp_x = xcent_temp[idx]
temp_y = ycent_temp[idx]
else:
temp_psf_data = template_dataset[-1].data
temp_x = xcent_temp[-1]
temp_y = ycent_temp[-1]
# larger fitting stamp needed for broadband filter
if cfam == '2' or cfam == '3':
halfheight = 30
xfit, yfit, gauss2d_xfit, gauss2d_yfit, psf_peakpix_snr, x_precis, y_precis = fit_psf_centroid(
psf_data, temp_psf_data,
xcent_template=temp_x,
ycent_template=temp_y,
xcent_guess=xguess,
ycent_guess=yguess,
halfwidth=halfwidth,
halfheight=halfheight
)
centroids[idx] = [xfit, yfit]
centroids_err[idx] = [x_precis, y_precis]
if verbose:
print(f"Slice {idx}: x = {xfit:.3f}, y = {yfit:.3f}")
if filtersweep:
ext_hdr_centroid['FILTERS'] = ",".join(filters)
else:
ext_hdr_centroid['FILTERS'] = filters[0]
# set CFAMNAME to the PRISM band
ext_hdr_centroid['CFAMNAME'] = cfam[0]
calibration = SpectroscopyCentroidPSF(
centroids,
pri_hdr=pri_hdr_centroid.copy(),
ext_hdr=ext_hdr_centroid.copy(),
err_hdr = fits.Header(),
err = centroids_err,
input_dataset=dataset
)
return calibration
[docs]
def read_cent_wave(band, filter_file = None):
"""
read the csv filter file containing the band names and the central wavelength in nm.
There are 6 columns: the CFAM filter name, the (Phase C) center wavelength, the TVAC TV-40b measured center wavelength
and the FWHM for the 4 broad bands, and xoffset, yoffset between the bands
The TVAC wavelengths are not measured for all filters, but are the preferred value if available.
Args:
filter_file (str): file name of the filter file
band (str): name of the filter band
Returns:
list: [central wavelength of the filter band, fwhm, xoffset, yoffset]
"""
if filter_file is None:
filter_file = os.path.join(os.path.dirname(__file__), "data", "spectroscopy", "CGI_bandpass_centers.csv")
data = ascii.read(filter_file, format = 'csv', data_start = 1)
filter_names = data.columns[0]
band = band.upper()
if band not in filter_names:
raise ValueError("{0} is not in table band names {1}".format(band, filter_names))
ret_list = []
if data.columns[2][filter_names == band]:
cen_wave = data.columns[2][filter_names == band][0]
else:
cen_wave = data.columns[1][filter_names == band][0]
ret_list.append(cen_wave)
for i in range(3, 6):
ret_list.append(data.columns[i][filter_names == band][0])
return ret_list
[docs]
def estimate_dispersion_clocking_angle(xpts, ypts, weights):
"""
Estimate the clocking angle of the dispersion axis based on the centroids of
the sub-band filter PSFs.
Args:
xpts (numpy.ndarray): Array of x coordinates in EXCAM pixels
ypts (numpy.ndarray): Array of y coordinates in EXCAM pixels
weights (numpy.ndarray): Array of weights for line fit
Returns:
clocking_angle, clocking_angle_uncertainty
"""
linear_fit, V = np.polyfit(ypts, xpts, deg=1, w=weights, cov=True)
theta = np.arctan(1/linear_fit[0])
if theta > 0:
clocking_angle = np.rad2deg(theta - np.pi)
else:
clocking_angle = np.rad2deg(theta)
clocking_angle_uncertainty = np.abs(np.rad2deg(np.arctan(linear_fit[0] + np.sqrt(V[0,0]))) -
np.rad2deg(np.arctan(linear_fit[0] - np.sqrt(V[0,0])))) / 2
return clocking_angle, clocking_angle_uncertainty
[docs]
def fit_dispersion_polynomials(wavlens, xpts, ypts, cent_errs, clock_ang, ref_wavlen, pixel_pitch_um=13.0):
"""
Given arrays of wavelengths and positions, fit two polynomials:
1. Displacement from a reference wavelength along the dispersion axis,
in millimeters as a function of wavelength
2. Wavelength as a function of displacement along the dispersion axis
Args:
wavlens (numpy.ndarray): Array of wavelengths corresponding to the
centroid data points
xpts (numpy.ndarray): Array of x coordinates in EXCAM pixels
ypts (numpy.ndarray): Array of y coordinates in EXCAM pixels
cent_errs (numpy.ndarray): Array of centroid uncertainties in EXCAM pixels
clock_ang (float): Clocking angle of the dispersion axis in degrees
ref_wavlen (float): Reference wavelength of the bandpass, in nanometers
pixel_pitch_um (float): EXCAM pixel pitch in microns
Returns:
pfit_pos_vs_wavlen (numpy.ndarray): polynomial coefficients for the
position vs wavelength fit
cov_pos_vs_wavlen (numpy.ndarray): covariance matrix of the polynomial
coefficients for the position vs wavelength fit
pfit_wavlen_vs_pos (numpy.ndarray): polynomial coefficients for the
wavelength vs position fit
cov_wavlen_vs_pos (numpy.ndarray): covariance matrix of the polynomial
coefficients for the wavelength vs position fit
"""
pixel_pitch_mm = pixel_pitch_um * 1E-3
# Rotate the centroid coordinates so the dispersion axis is horizontal
# to define a rotation pivot point, select the filter closest to the nominal
# zero deviation wavelength
refidx = np.argmin(np.abs(wavlens - ref_wavlen))
(x_rot, y_rot) = rotate_points((xpts, ypts), -np.deg2rad(clock_ang),
pivot_point = (xpts[refidx], ypts[refidx]))
# Fit an intermediate polynomial to wavelength versus position
delta_x = (x_rot - x_rot[refidx]) * pixel_pitch_mm
pos_err = cent_errs * pixel_pitch_mm
weights = 1 / pos_err
lambda_func_x = np.poly1d(np.polyfit(x = delta_x, y = wavlens, deg = 2, w = weights))
# Determine the position at the reference wavelength
poly_roots = (np.poly1d(lambda_func_x) - ref_wavlen).roots
real_roots = poly_roots[np.isreal(poly_roots)]
root_select_ind = np.argmin(np.abs(poly_roots[np.isreal(poly_roots)]))
pos_ref = np.real(real_roots[root_select_ind])
np.testing.assert_almost_equal(lambda_func_x(pos_ref), ref_wavlen)
displacements_mm = delta_x - pos_ref
# Fit two polynomials:
# 1. Displacement from the band center along the dispersion axis as a
# function of wavelength
# 2. Wavelength as a function of displacement along the dispersion axis
(pfit_pos_vs_wavlen,
cov_pos_vs_wavlen) = np.polyfit(x = (wavlens - ref_wavlen) / ref_wavlen,
y = displacements_mm, deg = 3, w = weights, cov=True)
(pfit_wavlen_vs_pos,
cov_wavlen_vs_pos) = np.polyfit(x = displacements_mm, y = wavlens, deg = 3,
w = weights, cov=True)
return pfit_pos_vs_wavlen, cov_pos_vs_wavlen, pfit_wavlen_vs_pos, cov_wavlen_vs_pos
[docs]
def calibrate_dispersion_model(centroid_psf, spec_filter_offset, band_center_file = None, pixel_pitch_um = 13.0):
"""
Generate a DispersionModel of the spectral dispersion profile of the CGI ZOD prism.
Args:
centroid_psf (data.SpectroscopyCentroidPsf): instance of SpectroscopyCentroidPsf calibration class
spec_filter_offset (data.SpecFilterOffset): instance of SpecFilterOffset calibration class
band_center_file (str): file name of the band centers, optional, default is in data/spectroscopy
pixel_pitch_um (float): EXCAM pixel pitch in micron, default 13 micron
Returns:
data.DispersionModel: DispersionModel calfile object with the fit results including errors of the spectral trace and the dispersion
"""
prism = centroid_psf.ext_hdr['DPAMNAME']
if prism not in ['PRISM2', 'PRISM3']:
raise ValueError("prism must be PRISM2 or PRISM3")
#PRISM2 not yet available
if prism == 'PRISM2':
subband_list = ['2A', '2B', '2C', '2F']
ref_cfam = '2'
ref_wavlen = 660.
else:
subband_list = ['3A', '3B', '3C', '3D', '3E', '3G']
ref_cfam = '3'
ref_wavlen = 730.
##bandpass_frac = fwhm/cen_wave, needed for the wavelength calibration
band_center, fwhm, _, _ = read_cent_wave(ref_cfam, filter_file = band_center_file)
#needed to consider the position offsets between different filters
offset_band = spec_filter_offset.get_offsets(ref_cfam)
xoff_band = offset_band[0]
yoff_band = offset_band[1]
bandpass_frac = fwhm/band_center
if 'FILTERS' not in centroid_psf.ext_hdr:
raise AttributeError("there should be a FILTERS header keyword in the filtersweep SpectroscopyCentroidPsf")
filters = centroid_psf.ext_hdr['FILTERS'].upper().split(",")
center_wavel = []
xoff = []
yoff = []
for band in filters:
band_str = band.strip()
if band_str == ref_cfam:
pass
elif band_str not in subband_list:
warnings.warn("measured band {0} is not in the sub band list {1} of the used prism".format(band_str, subband_list))
else:
cen_wave = read_cent_wave(band_str, filter_file = band_center_file)
center_wavel.append(cen_wave[0])
offset_sub = spec_filter_offset.get_offsets(band_str)
xoff.append(offset_sub[0] - xoff_band)
yoff.append(offset_sub[1] - yoff_band)
if len(center_wavel) < 4:
raise ValueError ("number of measured sub-bands {0} is too small to model the dispersion".format(len(center_wavel)))
if len(center_wavel) != len(centroid_psf.xfit) -1:
raise ValueError ("number of measured sub-bands {0} does not fit to the measured number of centroids {1}".format(len(center_wavel), len(centroid_psf.xfit)))
center_wavel = np.array(center_wavel)
xfit = centroid_psf.xfit[:-1] - np.array(xoff)
yfit = centroid_psf.yfit[:-1] - np.array(yoff)
xfit_err = centroid_psf.xfit_err[:-1]
yfit_err = centroid_psf.yfit_err[:-1]
(clocking_angle,
clocking_angle_uncertainty) = estimate_dispersion_clocking_angle(xfit, yfit, weights = 1. / yfit_err)
(pfit_pos_vs_wavlen, cov_pos_vs_wavlen,
pfit_wavlen_vs_pos, cov_wavlen_vs_pos) = fit_dispersion_polynomials(
center_wavel, xfit, yfit,
yfit_err, clocking_angle, ref_wavlen, pixel_pitch_um = pixel_pitch_um
)
disp_dict = {"clocking_angle": clocking_angle,
"clocking_angle_uncertainty": clocking_angle_uncertainty,
"pos_vs_wavlen_polycoeff": pfit_pos_vs_wavlen,
"pos_vs_wavlen_cov": cov_pos_vs_wavlen,
"wavlen_vs_pos_polycoeff": pfit_wavlen_vs_pos,
"wavlen_vs_pos_cov": cov_wavlen_vs_pos}
pri_hdr = centroid_psf.pri_hdr.copy()
ext_hdr = centroid_psf.ext_hdr.copy()
ext_hdr["REFWAVE"] = ref_wavlen
ext_hdr["BAND"] = ref_cfam
ext_hdr["BANDFRAC"] = bandpass_frac
ext_hdr["CFAMNAME"] = ref_cfam
corgi_dispersion_profile = DispersionModel(
disp_dict, pri_hdr = pri_hdr, ext_hdr = ext_hdr
)
return corgi_dispersion_profile
[docs]
def create_wave_cal(disp_model, wave_zeropoint, pixel_pitch_um=13.0, ntrials = 1000):
"""
Create a wavelength calibration map and a wavelength-position lookup table,
given a dispersion model and a wavelength zero-point
Args:
disp_model (data.DispersionModel): Dispersion model calibration object
wave_zeropoint (dict): Wavelength zero-point dictionary
pixel_pitch_um (float): EXCAM pixel pitch in microns
ntrials (int): number of trials when applying a Monte Carlo error propagation to estimate the uncertainties of the
values in the wavelength calibration map
Returns:
wavlen_map (numpy.ndarray): 2-D wavelength calibration map. Each image
pixel value is a wavelength in units of nanometers, computed for the
dispersion profile, zero-point position, coordinates, and image shape
specified in the input wavelength zero-point object.
wavlen_uncertainty (numpy.ndarray): 2-D array of wavelength calibration map
uncertainty values in units of nanometers.
pos_lookup_table (astropy.table.Table): Wavelength-to-position
lookup table, computed for the dispersion profile, zero-point position,
coordinates, and image shape specified in the input wavelength
zero-point object. The table contains 5 columns: wavelength, x, x
uncertainty, y, y uncertainty.
x_refwav, y_refwave (float): coordinates of the source at the reference wavelength
"""
ref_wavlen = disp_model.ext_hdr["REFWAVE"]
#bandpass_frac = fwhm/cen_wave
bandpass_frac = disp_model.ext_hdr["BANDFRAC"]
pos_vs_wavlen_poly = np.poly1d(disp_model.pos_vs_wavlen_polycoeff)
wavlen_vs_pos_poly = np.poly1d(disp_model.wavlen_vs_pos_polycoeff)
wavlen_c = ref_wavlen
d_zp_mm = pos_vs_wavlen_poly((wave_zeropoint.get('wavlen') - wavlen_c) / wavlen_c)
pixel_pitch_mm = pixel_pitch_um * 1E-3
theta = np.deg2rad(disp_model.clocking_angle)
x_refwav, y_refwav = (wave_zeropoint.get('x') - d_zp_mm * np.cos(theta) / pixel_pitch_mm,
wave_zeropoint.get('y') - d_zp_mm * np.sin(theta) / pixel_pitch_mm)
yy, xx = np.indices((wave_zeropoint.get('shapex'), wave_zeropoint.get('shapey')))
dd_mm = (xx - x_refwav) * np.cos(theta) + (yy - y_refwav) * np.sin(theta) * pixel_pitch_mm
wavlen_map = wavlen_vs_pos_poly(dd_mm)
delta_wav = 0.5
n_wav = int(ref_wavlen * bandpass_frac / delta_wav)
n_wav_odd = n_wav + (n_wav % 2 == 0) # force odd array length
wavlen_beg = ref_wavlen - n_wav_odd // 2 * delta_wav
wavlen_end = ref_wavlen + n_wav_odd // 2 * delta_wav
wavlens = np.linspace(wavlen_beg, wavlen_end, n_wav_odd)
#np.testing.assert_almost_equal(wavlens[n_wav_odd // 2], ref_wavlen)
# Use a Monte Carlo error propagation to estimate the uncertainties of the
# values in the wavelength calibration map and the position lookup table.
polyfit_order = len(disp_model.pos_vs_wavlen_polycoeff) - 1
prand_wavlen_pos = np.zeros((ntrials, polyfit_order + 1))
prand_pos_wavlen = np.zeros((ntrials, polyfit_order + 1))
# Add the wavelength zero-point position error to the dispersion profile uncertainty
d_zp_err_mm = np.sqrt((wave_zeropoint.get('xerr') * np.cos(theta))**2 + (wave_zeropoint.get('yerr') * np.sin(theta))**2) * pixel_pitch_mm
# To translate the position uncertainty to wavelength uncertainty, use the second coefficient of the
# the wavelength(x) polynomial, which is the linear dispersion coefficient (units nm/mm).
d_zp_err_nm = disp_model.wavlen_vs_pos_polycoeff[2] * d_zp_err_mm
disp_model.pos_vs_wavlen_cov[polyfit_order, polyfit_order] += d_zp_err_mm**2
disp_model.wavlen_vs_pos_cov[polyfit_order, polyfit_order] += d_zp_err_nm**2
# Generate random polynomial coefficients consistent with the covariance
# matrix in the dispersion profile.
for ii in range(ntrials):
prand_wavlen_pos[ii] = np.random.multivariate_normal(disp_model.wavlen_vs_pos_polycoeff, cov=disp_model.wavlen_vs_pos_cov)
prand_pos_wavlen[ii] = np.random.multivariate_normal(disp_model.pos_vs_wavlen_polycoeff, cov=disp_model.pos_vs_wavlen_cov)
ws = (wavlens - wavlen_c) / wavlen_c
ds_mm = pos_vs_wavlen_poly(ws)
ds_vander = np.vander(ds_mm, N=polyfit_order+1, increasing=False)
ws_vander = np.vander(ws, N=polyfit_order+1, increasing=False)
wavlen_rand_eval = prand_wavlen_pos.dot(ds_vander.T)
pos_rand_eval = prand_pos_wavlen.dot(ws_vander.T)
pos_eval_std = np.std(pos_rand_eval, axis=0)
wavlen_eval_std = np.std(wavlen_rand_eval, axis=0)
pos_vs_wavlen_err_func = interp1d(wavlens, pos_eval_std, fill_value="extrapolate")
wavlen_vs_pos_err_func = interp1d(ds_mm, wavlen_eval_std, fill_value="extrapolate")
# Wavelength uncertainty map
wavlen_uncertainty_map = wavlen_vs_pos_err_func(dd_mm)
# Build the position lookup table
ds_eval = pos_vs_wavlen_poly((wavlens - wavlen_c) / wavlen_c) / pixel_pitch_mm
xs_eval, ys_eval = (x_refwav + ds_eval * np.cos(theta),
y_refwav + ds_eval * np.sin(theta))
xs_uncertainty, ys_uncertainty = (np.abs(pos_vs_wavlen_err_func(wavlens) / pixel_pitch_mm * np.cos(theta)),
np.abs(pos_vs_wavlen_err_func(wavlens) / pixel_pitch_mm * np.sin(theta)))
pos_lookup_table = Table((wavlens, xs_eval, xs_uncertainty, ys_eval, ys_uncertainty),
names=('Wavelength (nm)', 'x (column)', 'x uncertainty', 'y (row)', 'y uncertainty'))
return wavlen_map, wavlen_uncertainty_map, pos_lookup_table, x_refwav, y_refwav
[docs]
def get_shift_correlation(
img_data,
img_template,
):
""" Find the array shift that maximizes the phase correlation between two
images.
Args:
img_data (array): first two dimensional array.
img_template (array): second two dimensional array. Its size must be the same or
less than img1, because img2 is the noiseless template used to find the
spectrum on the L2b data and it is a cropped frame.
Returns:
Image shift in image pixels that maximizes the phase correlation of the
first image with the second one.
"""
if np.any(img_data.shape < img_template.shape):
raise Exception('The template image cannot have a larger size then the data one')
# Pad img_template to be the same size as img_data
img2 = np.zeros_like(img_data)
img2[img_data.shape[0]//2-img_template.shape[0]//2:img_data.shape[0]//2+img_template.shape[0]//2 + 1,
img_data.shape[1]//2-img_template.shape[1]//2:img_data.shape[1]//2+img_template.shape[1]//2 + 1] = img_template
dft1 = np.fft.fftshift(np.fft.fft2(img_data))
dft2 = np.fft.fftshift(np.fft.fft2(img2))
# Cross-power spectrum (Add epsilon to avoid division by zero, from Google Labs)
R = (dft1 * np.conj(dft2)) / (np.abs(dft1 * np.conj(dft2)) + 1e-10)
# Inverse FFT: Imaginary part are numerical residuals. The original data are real.
poc_real = np.real(np.fft.ifft2(R))
# Find the peak location (shift)
shift = np.unravel_index(np.argmax(np.abs(poc_real)), poc_real.shape)
return shift
[docs]
def star_spec_registration(
dataset_fsm,
pathfiles_template,
slit_align_err=0,
halfheight=40):
""" This function addresses:
CGI-REQT-5465 – Given (1) a series of cleaned images of a prism-dispersed
unocculted star observed through the FSAM slit mask, observed with the
same CFAM filter, and acquired over a grid of FSM offsets and (2) an
estimate of the spectroscopic target source position on EXCAM and its
alignment error from the FSAM slit, the CTC GSW should identify the
dispersed star image whose PSF-to-FSAM slit alignment most closely matches
that of the target source.
NOTE: This calibration is repeated for each roll angle in the observation
campaign
Args:
dataset_fsm (Dataset): Dataset containing a series of L2b cleaned images of a
prism-dispersed unocculted star observed through the FSAM slit mask,
observed with the same CFAM filter, and acquired over a grid of FSM
offsets. By default, the grid of FSM offsets spans a 3×3 FSM offset grid.
Each of the L2b images must have the following header keywords:
– FSMX, FSMY (float64)
– CFAMNAME (same for all images)
– FSAMNAME = OPEN, R1C2, R6C5, R3C1
pathfiles_template (array): array of path and filenames containing the
simulated star spectrum that are used as a template to find the image
in dataset_fsm that best matches it.
slit_align_err (float64): Distance between the source and the center of
the slit aperture, measured along the narrow axis of the slit aperture,
in units of mas. It is determined after each observation by
looking at the data.
halfheight: 1/2 the height of the box used for the fit.
Returns:
Filenames with the star image whose PSF-to-FSAM slit alignment most
closely matches that of the target source.
"""
# Confirm spectroscopy configuration for different PAMs
# CFAM
cfam_name = dataset_fsm[0].ext_hdr['CFAMNAME'].upper()
if cfam_name.find('3') != -1:
dpam_name = 'PRISM3'
# fsam_name = []
elif cfam_name.find('2') != -1:
dpam_name = 'PRISM2'
# fsam_name = []
else:
raise ValueError(f'{cfam_name} is not a spectroscopy filter')
# DPAM
if dataset_fsm[0].ext_hdr['DPAMNAME'] != dpam_name:
raise ValueError(f'DPAMNAME should be {dpam_name}')
# FPAM
fpam_name = dataset_fsm[0].ext_hdr['FPAMNAME'].upper()
if (fpam_name != 'OPEN' and fpam_name != 'ND225' and fpam_name != 'ND475'):
raise ValueError('FPAMNAME should be either OPEN, ND225 or ND475')
# SPAM
spam_name = dataset_fsm[0].ext_hdr['SPAMNAME'].upper()
if spam_name[0:4] != 'SPEC':
raise ValueError('SPAMNAME should be SPEC')
# LSAM
lsam_name = dataset_fsm[0].ext_hdr['LSAMNAME'].upper()
if lsam_name[0:4] != 'SPEC':
raise ValueError('LSAMNAME should be SPEC')
# FSAM
fsam_name = dataset_fsm[0].ext_hdr['FSAMNAME'].upper()
if (fsam_name != 'OPEN' and fsam_name != 'R1C2' and fsam_name != 'R6C5' and
fsam_name != 'R3C1'):
raise ValueError('FSAMNAME should be either OPEN, R1C2, R6C5 or R3C1')
# All images must have the same setup
for img in dataset_fsm:
exthdr = img.ext_hdr
assert exthdr['CFAMNAME'].upper() == cfam_name, f"CFAMNAME={exthdr['CFAMNAME']} differs from expected value: {cfam_name}"
assert exthdr['DPAMNAME'].upper() == dpam_name, f"DPAMNAME={exthdr['DPAMNAME']} differs from expected value: {dpam_name}"
assert exthdr['FPAMNAME'].upper() == fpam_name, f"FPAMNAME={exthdr['FPAMNAME']} differs from expected value: {fpam_name}"
assert exthdr['SPAMNAME'].upper() == spam_name, f"SPAMNAME={exthdr['SPAMNAME']} differs from expected value: {spam_name}"
assert exthdr['LSAMNAME'].upper() == lsam_name, f"LSAMNAME={exthdr['LSAMNAME']} differs from expected value: {lsam_name}"
assert exthdr['FSAMNAME'].upper() == fsam_name, f"FSAMNAME={exthdr['FSAMNAME']} differs from expected values: {fsam_name}"
# Confirm presence of FSMX, FSMY
assert 'FSMX' in exthdr.keys() and 'FSMY' in exthdr.keys(), 'Missing FSMX/Y'
# Templates
yoffset_arr = []
for file in pathfiles_template:
# Make sure that all template files exist
if os.path.exists(file) == False:
raise Exception(f'Template file {file} not found.')
# Collect FSAM offsets
yoffset_arr += [fits.open(file)[0].header['FSM_OFF']]
# Find closest template offset to the one measured in the data
slit_idx = int(np.abs(slit_align_err - yoffset_arr).argmin())
# Template data
temp = fits.open(pathfiles_template[slit_idx])[0]
temp_data = temp.data
# Associated zeropoint
try:
wv0_x = temp.header['WV0_X']
except:
raise ValueError(f'WV0_X missing from {pathfiles_template[slit_idx]:s}')
try:
wv0_y = temp.header['WV0_Y']
except:
raise ValueError(f'WV0_Y missing from {pathfiles_template[slit_idx]:s}')
# Split FSM dataset according to their FSM values
dataset_list, fsm_values = dataset_fsm.split_dataset(exthdr_keywords=['FSMY'])
# Combine frames with the same FSMY to increase SNR before the analysis
fsm_combined = []
for dataset in dataset_list:
fsm_tmp = []
for img in dataset:
fsm_tmp += [img.data]
fsm_combined += [np.median(np.array(fsm_tmp), axis=0)]
# Find best PSF centroid fit for each image compared to the template
# Cost function: Start with any large value that cannot happen. P.S. Units
# are EXCAM pixels
zeropt_dist = 1e8
idx_best = None
# cross-correlate data with expected slit error with template
shift = get_shift_correlation(fsm_combined[slit_idx], temp_data)
for idx_img, img in enumerate(fsm_combined):
# Bring img_data on top of img_template
img = np.roll(img, (-shift[0], -shift[1]), axis=(0,1))
# Crop it to match img2 size
img_cropped = img[img.shape[0]//2-temp_data.shape[0]//2:img.shape[0]//2+temp_data.shape[0]//2+1,
img.shape[1]//2-temp_data.shape[1]//2:img.shape[1]//2+temp_data.shape[1]//2+1]
# Find best centroid
x_fit, y_fit = fit_psf_centroid(img_cropped, temp_data,
xcent_template = wv0_x,
ycent_template = wv0_y,
halfheight = halfheight)[0:2]
# best-matching image is wrt zero-point
zeropt_dist_img = np.sqrt((x_fit - wv0_x)**2 + (y_fit - wv0_y)**2)
# Keep track of absolute minimum
if zeropt_dist_img < zeropt_dist:
zeropt_dist = zeropt_dist_img
idx_best = idx_img
# Check that there's at least one solution
assert idx_best != None, 'No suitable best image found.'
# List of filenames of frames with the same FSM value that best matches the
# stellar template
list_of_best_fsm = []
for img in dataset_list[idx_best]:
list_of_best_fsm += [img.filename]
return list_of_best_fsm
[docs]
def fit_line_spread_function(dataset, halfwidth = 2, halfheight = 9, guess_fwhm = 15.):
"""
Fit the line spread function to a wavelength calibrated (averaged) dataset, by reading
the wavelength map extension and wavelength zeropoint header
Args:
dataset (corgidrp.data.Dataset): dataset containing a narrowband filter + prism PSF
halfwidth (int): The width of the fitting region is 2 * halfwidth + 1 pixels.
halfheight (int): The height of the fitting region is 2 * halfheight + 1 pixels.
guess_fwhm (float): guess value of the fwhm of the line
Returns:
corgidrp.data.LineSpread: LineSpread object containing
wavlens (numpy.ndarray)
flux_profile (numpy.ndarray)
fwhm_fit (float)
mean_fit (float)
peak_fit (float)
"""
# Assumed that only narrowband filter (includes sat spots) frames are taken to fit the line spread function LSF
narrow_dataset, band = dataset.split_dataset(exthdr_keywords=["CFAMNAME"])
band = np.array([s.upper() for s in band])
if "3D" in band:
nar_dataset = narrow_dataset[int(np.nonzero(band == "3D")[0].item())]
elif "2C" in band:
nar_dataset = narrow_dataset[int(np.nonzero(band == "2C")[0].item())]
else:
raise AttributeError("No narrowband frames found in input dataset")
wave = []
wave_err = []
fwhm = []
fwhm_err = []
peak = []
peak_err = []
wavlens = []
flux_profile = []
for image in nar_dataset:
xcent_round, ycent_round = (int(np.rint(image.ext_hdr["WV0_X"])), int(np.rint(image.ext_hdr["WV0_Y"])))
image_cutout = image.data[ycent_round - halfheight:ycent_round + halfheight + 1,
xcent_round - halfwidth:xcent_round + halfwidth + 1]
dq_cutout = image.dq[ycent_round - halfheight:ycent_round + halfheight + 1,
xcent_round - halfwidth:xcent_round + halfwidth + 1]
wave_cal_map_cutout = image.hdu_list["WAVE"].data[ycent_round - halfheight:ycent_round + halfheight + 1,
xcent_round - halfwidth:xcent_round + halfwidth + 1]
bad_ind = np.where(dq_cutout > 0)
image_cutout[bad_ind] = np.nan
flux_p = np.nansum(image_cutout, axis=1) / np.nansum(image_cutout)
wav = np.mean(wave_cal_map_cutout, axis=1)
flux_profile.append(flux_p)
wavlens.append(wav)
g_init = models.Gaussian1D(amplitude = np.max(flux_p),
mean = wav[halfheight],
stddev = guess_fwhm/(2 * np.sqrt(2*np.log(2))))
fit_g = fitting.LevMarLSQFitter(calc_uncertainties=True)
g_func = fit_g(g_init, x = wav, y = flux_p)
fwhm.append(2 * np.sqrt(2*np.log(2)) * g_func.stddev.value)
wave.append(g_func.mean.value)
peak.append(g_func.amplitude.value)
errors = np.diagonal(fit_g.fit_info.get("param_cov"))
peak_err.append(errors[0])
wave_err.append(errors[1])
fwhm_err.append(errors[2] * 8 * np.log(2))
mean_peak = np.mean(np.array(peak))
mean_fwhm = np.mean(np.array(fwhm))
mean_wave = np.mean(np.array(wave))
mean_peak_err = np.sqrt(np.sum(np.array(peak_err)))
mean_wave_err = np.sqrt(np.sum(np.array(wave_err)))
mean_fwhm_err = np.sqrt(np.sum(np.array(fwhm_err)))
mean_flux_profile = np.mean(np.array(flux_profile), axis = 0)
mean_wavlens = np.mean(np.array(wavlens), axis = 0)
prihdr = nar_dataset[0].pri_hdr.copy()
exthdr = nar_dataset[0].ext_hdr.copy()
ls_data = np.array([mean_wavlens, mean_flux_profile])
gauss_profile = np.array([mean_peak, mean_wave, mean_fwhm, mean_peak_err, mean_wave_err, mean_fwhm_err])
line_spread = LineSpread(ls_data, pri_hdr = prihdr, ext_hdr = exthdr, gauss_par = gauss_profile, input_dataset = nar_dataset)
return line_spread
[docs]
def slit_transmission(
dataset_slit,
dataset_open,
target_pix=None,
x_range=[40.,42],
y_range=[32.,34],
n_gridx=10,
n_gridy=10,
kind='linear',
average='mean',
):
""" This step function addresses:
CGI-REQT-5475 – Given (1) a series of cleaned images of a prism-dispersed
unocculted star observed through the FSAM slit mask, observed with a CFAM
filter, and acquired over one or more FSM offsets, (2) a series of cleaned
images of the same prism-dispersed unocculted star observed with the FSAM
slit mask removed (FSAM in OPEN position), the same CFAM filter, and
acquired over one or more FSM offsets, the CTC GSW should compute the slit
transmission map.
Args:
dataset_slit (Dataset): Dataset containing a set of extracted spectra for
some set of FSM positions with the FSAM slit in its position. There can
be a different number of frames for each FSM position.
dataset_open (Dataset): Dataset containing a set of extracted spectra for
some set of FSM positions with the FSAM slit in OPEN position. There can
be a different number of frames for each FSM position.
target_pix (array) (optional): a user-defined Mx2 array containing the
pixel positions for M target pixels where the slit transmission will be
derived by interpolation. The target pixels are measured with respect
the zero-point in (fractional) EXCAM pixels. Default is None. In this
case, a rectangular grid of pixel positions is used.
x_range (array): Two values [xmin, xmax] specifying the range of pixels to
be considered. Units are EXCAM pixels measured with respect the zero-point
solution along EXCAM +X direction.
y_range (array): Two values [ymin, ymax] specifying the range of pixels to
be considered. Units are EXCAM pixels measured with respect the zero-point
solution along EXCAM +Y direction.
n_gridx (int) (optional): Number of positions when pos_range is set.
n_gridy (int) (optional): Number of positions when pos_range is set.
kind (string): Specifies the kind of interpolation. See scipy documentation.
Default is piecewise linear.
average (str): The type of average (first momentum) applied to each subset
of spectra. The slitless spectra are all averaged at once regardless of
their FSMX, FSMY values. The spectra with the slit in are averaged over
subsets with the same FSMX, FSMY values. Options are 'mean' and 'median'.
Returns:
SlitTransmission calibration product containing:
1/ Slit transmission map derived at different locations by interpolation.
2/ Corresponding locations along EXCAM +X direction with respect to the
zero-point in (fractional) EXCAM pixels where the slit transmission has
been derived.
3/ Corresponding locations along EXCAM +Y direction with respect to the
zero-point in (fractional) EXCAM pixels where the slit transmission has
been derived.
"""
# Confirm spectroscopy configuration for different PAMs
# CFAM
cfam_name = dataset_slit[0].ext_hdr['CFAMNAME'].upper()
if cfam_name.find('3') != -1:
dpam_name = 'PRISM3'
# fsam_name = []
elif cfam_name.find('2') != -1:
dpam_name = 'PRISM2'
# fsam_name = []
else:
raise ValueError(f'{cfam_name} is not a spectroscopy filter')
# DPAM
if dataset_slit[0].ext_hdr['DPAMNAME'] != dpam_name:
raise ValueError(f'DPAMNAME should be {dpam_name}')
# FPAM
fpam_name = dataset_slit[0].ext_hdr['FPAMNAME'].upper()
if (fpam_name != 'OPEN' and fpam_name != 'ND225' and fpam_name != 'ND475'):
raise ValueError('FPAMNAME should be either OPEN, ND225 or ND475')
# SPAM
spam_name = dataset_slit[0].ext_hdr['SPAMNAME'].upper()
if spam_name[0:4] != 'SPEC':
raise ValueError('SPAMNAME should be SPEC')
# LSAM
lsam_name = dataset_slit[0].ext_hdr['LSAMNAME'].upper()
if lsam_name[0:4] != 'SPEC':
raise ValueError('LSAMNAME should be SPEC')
# FSAM: slit in
fsam_name = dataset_slit[0].ext_hdr['FSAMNAME'].upper()
if (fsam_name != 'R1C2' and fsam_name != 'R6C5' and fsam_name != 'R3C1'):
raise ValueError('FSAMNAME with the slit in must be either R1C2, R6C5 or R3C1')
# FSAM: slitless
if dataset_open[0].ext_hdr['FSAMNAME'] != 'OPEN':
raise ValueError('FSAMNAME must be OPEN for slitless observations.')
# All images with the slit in must have the same setup
for image in dataset_slit:
exthdr = image.ext_hdr
assert exthdr['CFAMNAME'].upper() == cfam_name, f"CFAMNAME={exthdr['CFAMNAME']} differs from expected value: {cfam_name}"
assert exthdr['DPAMNAME'].upper() == dpam_name, f"DPAMNAME={exthdr['DPAMNAME']} differs from expected value: {dpam_name}"
assert exthdr['FPAMNAME'].upper() == fpam_name, f"FPAMNAME={exthdr['FPAMNAME']} differs from expected value: {fpam_name}"
assert exthdr['SPAMNAME'].upper() == spam_name, f"SPAMNAME={exthdr['SPAMNAME']} differs from expected value: {spam_name}"
assert exthdr['LSAMNAME'].upper() == lsam_name, f"LSAMNAME={exthdr['LSAMNAME']} differs from expected value: {lsam_name}"
assert exthdr['FSAMNAME'].upper() == fsam_name, f"FSAMNAME={exthdr['FSAMNAME']} differs from expected value: {fsam_name}"
# All images without the slit must have the same setup as with the slit, but
# for FSAMNAME=OPEN
for image in dataset_open:
exthdr = image.ext_hdr
assert exthdr['CFAMNAME'].upper() == cfam_name, f"CFAMNAME={exthdr['CFAMNAME']} differs from expected value: {cfam_name}"
assert exthdr['DPAMNAME'].upper() == dpam_name, f"DPAMNAME={exthdr['DPAMNAME']} differs from expected value: {dpam_name}"
assert exthdr['FPAMNAME'].upper() == fpam_name, f"FPAMNAME={exthdr['FPAMNAME']} differs from expected value: {fpam_name}"
assert exthdr['SPAMNAME'].upper() == spam_name, f"SPAMNAME={exthdr['SPAMNAME']} differs from expected value: {spam_name}"
assert exthdr['LSAMNAME'].upper() == lsam_name, f"LSAMNAME={exthdr['LSAMNAME']} differs from expected value: {lsam_name}"
# It can only be OPEN
assert exthdr['FSAMNAME'].upper() == 'OPEN', f"FSAMNAME={exthdr['FSAMNAME']} differs from expected value: OPEN"
# Split each subset with the slit in by FSMX/Y values (FSM values are not used)
dataset_slit_subsets = []
dataset_slit_y = dataset_slit.split_dataset(exthdr_keywords=['FSMY'])[0]
for ds_1 in dataset_slit_y:
dataset_slit_subsets += ds_1.split_dataset(exthdr_keywords=['FSMX'])[0]
# Average all spectra of the images with FSAM=OPEN
if average.lower() == 'mean':
spec_open = np.mean([ds.hdu_list["SPEC"].data for ds in dataset_open], axis=0)
elif average.lower() == 'median':
spec_open = np.median([ds.hdu_list["SPEC"].data for ds in dataset_open], axis=0)
else:
raise ValueError(f'Averaging method {average} not recognized.')
# Average all spectra of the images with the slit in by FSM position and get
# the wavelength zero-point solution for each one
slit_trans_fsm = []
slit_pos_x = []
slit_pos_y = []
for subset in dataset_slit_subsets:
slit_pos_x += [subset[0].ext_hdr['WV0_X']]
slit_pos_y += [subset[0].ext_hdr['WV0_Y']]
if average.lower() == 'mean':
slit_trans_fsm += [np.mean([ds.hdu_list["SPEC"].data/spec_open for ds in subset], axis=0)]
# At this point average can only take 'mean' and 'median' values
else:
slit_trans_fsm += [np.median([ds.hdu_list["SPEC"].data/spec_open for ds in subset], axis=0)]
# Double check they all have the same length
slit_pos_x = np.array(slit_pos_x)
slit_pos_y = np.array(slit_pos_y)
slit_trans_fsm = np.array(slit_trans_fsm)
if not (len(slit_pos_x) == len(slit_pos_y) == len(slit_trans_fsm)):
raise ValueError('The lengths of distinct FSM positions and averaged spectra is different.')
# If there's only one position, there's no interpolation
if len(np.unique(slit_pos_y)) == len(np.unique(slit_pos_x)) == 1:
print('Only one unique position in the data. Returning slit transmission at that position.')
return (slit_trans_fsm,
slit_pos_x,
slit_pos_y)
# If no target pixels are provided, create a series
if target_pix == None:
# If the FSM images are along one direction:
if len(np.unique(slit_pos_x)) == 1:
x_tmp = np.ones(n_gridy) * np.unique(slit_pos_x)
y_tmp = np.linspace(y_range[0], y_range[1], n_gridy)
elif len(np.unique(slit_pos_y)) == 1:
x_tmp = np.linspace(x_range[0], x_range[1], n_gridx)
y_tmp = np.ones(n_gridx) * np.unique(slit_pos_y)
# If the FSM images span a 2-d grid:
else:
x_tmp = np.linspace(x_range[0], x_range[1], n_gridx)
y_tmp = np.linspace(y_range[0], y_range[1], n_gridy)
target_pix = np.array(np.meshgrid(x_tmp, y_tmp)).reshape(2, n_gridx*n_gridy)
# Derive slit transmission at desired locations
# 1-d cases: The positions along one of the slit dimensions is constant.
# P.S. scipy takes care of raising exceptions if there's any extrapolation
if len(np.unique(slit_pos_y)) == 1:
interpolant = interp1d(slit_pos_x, slit_trans_fsm, axis=0, kind=kind)
slit_trans_interp = interpolant(target_pix[0])
elif len(np.unique(slit_pos_x)) == 1:
interpolant = interp1d(slit_pos_y, slit_trans_fsm, axis=0, kind=kind)
slit_trans_interp = interpolant(target_pix[1])
else:
# 2-d grid:
try:
if kind.lower() != 'linear':
raise ValueError('Only linear interpolation is available for',
'two dimensional scattered data.')
else:
interpolator = LinearNDInterpolator(np.c_[slit_pos_x, slit_pos_y],
slit_trans_fsm)
slit_trans_interp = interpolator(target_pix[0], target_pix[1])
# If there's some extrapolation, redefine target points to be within limits
if np.sum(np.isnan(slit_trans_interp) == True):
raise ValueError('Some target points require extrapolation.'
'Make sure all target points are within the interpolator support.')
except:
raise ValueError('Not enough independent values to derive a slit transmission map')
# Raise ValueError if all values are NaN
if np.all(np.isnan(slit_trans_interp)):
raise ValueError('There are no valid target positions within the ' +
'range of input PSF locations')
pri_hdr, ext_hdr, _, _ = corgidrp.check.merge_headers(
dataset_slit,
any_true_keywords=['DESMEAR', 'CTI_CORR'],
invalid_keywords=[
'FRMTYPE',
'EACQ_ROW', 'EACQ_COL', 'SB_FP_DX', 'SB_FP_DY', 'SB_FS_DX', 'SB_FS_DY',
'Z2AVG', 'Z3AVG', 'Z4AVG', 'Z5AVG', 'Z6AVG', 'Z7AVG', 'Z8AVG', 'Z9AVG',
'Z10AVG', 'Z11AVG', 'Z12AVG', 'Z13AVG', 'Z14AVG',
'Z2RES', 'Z3RES', 'Z4RES', 'Z5RES', 'Z6RES', 'Z7RES', 'Z8RES', 'Z9RES',
'Z10RES', 'Z11RES',
'Z2VAR', 'Z3VAR',
'FWC_PP_E', 'FWC_EM_E', 'WV0_X', 'WV0_Y'
]
)
input_dataset = Dataset([frame for frame in dataset_slit] + [frame for frame in dataset_open])
slit_trans = SlitTransmission(slit_trans_interp, pri_hdr = pri_hdr, ext_hdr = ext_hdr, x_offset = target_pix[0], y_offset = target_pix[1], input_dataset = input_dataset)
return slit_trans
[docs]
def star_pos_spec(
dataset,
r_lamD=3,
phi_deg=0,
):
""" Find the position of the star using the information from the satellite
spot. The position of the satellite spot on EXCAM is given by the
zero-point solution. Using the information of the commanded position of
the satellite spot with respect the occulted star, one can infer the
location of the occulted star.
The relative of the satellite spot with respect the occulted star is given
in polar coordinates. The radial distance of the satellite spot is measured
in units lambda/D, with lambda the band reference wavelength, either 730 nm
(band 3) or 660 nm (band 2), and D=2.4 m. The polar angle is measured in
degrees, with 0 degrees meaning +X and 90 degrees meaning +Y. The polar
coordinates of the satellite spot are translated into (X,Y) EXCAM pixel
coordinates, which can then be subtracted from the zero-point solution to
infer the location of the occulted star.
Args:
dataset (Dataset): A Dataset with L3 spectroscopy frames.
r_lamD (float): Radial distance of the satellite spot on EXCAM with respect
the occulted star in units of lambda/D.
phi_deg (float): Polar angle of the satellite spot on EXCAM with respect
the occulted star in degrees, with 0 degrees meaning +X and 90 degrees
meaning +Y.
Returns:
Input Dataset with updated keywords recording the satellite position
in EXCAM pixels.
"""
# Primary diameter of Roman Space Telescope in meters
D_m=2.4
# Basic checks
if r_lamD < 0:
raise ValueError('r_lamD cannot be negative. Usual range is 3-20.')
dataset_cp = dataset.copy()
for img in dataset_cp:
# Check it is L3
if img.ext_hdr['DATALVL'] != 'L3':
raise ValueError(f"The data level must be L3 and it is {img.ext_hdr['DATALVL']}")
# Extract satellite spot wavelength from L3 extended header (it must be present)
try:
lam_sat_nm = float(img.ext_hdr['WAVLEN0'])
except:
raise ValueError(f'WAVLEN0 keyword missing in L3 frame.')
vistype = img.pri_hdr['VISTYPE']
# shift of star location only for coronagraphic observations
if vistype != 'CGIVST_CAL_SPEC_TGTREF':
# Conversion from EXCAM pixels to milliarsec
plate_scale_mas = img.ext_hdr['PLTSCALE']
# Conversion from radians to milliarsec (mas/rad)
rad2mas = 180/np.pi*3600*1e3
# lam/D in radians
lamDrad = 1e-9*lam_sat_nm/D_m
# lam/D to EXCAM pixels
r_pix = r_lamD*lamDrad*rad2mas/plate_scale_mas
# EXCAM (X,Y) coordinates
X_pix = r_pix * np.cos(phi_deg*np.pi/180)
Y_pix = r_pix * np.sin(phi_deg*np.pi/180)
# Update estimated location of the occulted star
img.ext_hdr['STARLOCX'] = img.ext_hdr['WV0_X'] - X_pix
img.ext_hdr['STARLOCY'] = img.ext_hdr['WV0_Y'] - Y_pix
else:
img.ext_hdr['STARLOCX'] = img.ext_hdr['WV0_X']
img.ext_hdr['STARLOCY'] = img.ext_hdr['WV0_Y']
return dataset_cp
[docs]
def spec_fluxcal(dataset_or_image, calspec_file = None):
"""
generates the SpecFluxCal calibration product for one band,
calculates the spectral flux calibration or spectro-photometric calibration, that describes the
sensitivity of the spectrometer, i.e. how an input power is
converted into how many photoelectrons per wavelength, with the final unit erg/(s * cm^2 * AA)/(photoelectron/s/bin).
The input is expected to be the dataset of an extracted spectrum of a
CALSPEC photometric standard star with units photoelectron/s/bin.
The band flux values of the input calspec data files are divided by
the spectral extracted photoelectrons/s/bin interpolated on the available wavelengths.
Propagates also errors to the spectral flux calibration file.
Parameters:
dataset_or_image (corgidrp.data.Dataset or corgidrp.data.Image): Image(s) to compute
the calibration factor. Should already be normalized for exposure time. Output of extract_spec.
calspec_file (str, optional): file path to the calspec fits file of the observed star.
If None, it is downloaded from the calspec database
Returns:
SpecFluxCal (corgidrp.data.SpecFluxCal): A calibration object containing the computed
flux calibration factor in units erg/(s * cm^2 * AA)/(photoelectron/s/bin)
"""
d_or_i = dataset_or_image.copy()
if isinstance(d_or_i, Dataset):
image = d_or_i[0]
dataset = d_or_i
else:
image = d_or_i
dataset = Dataset([image])
if image.ext_hdr['BUNIT'] != "photoelectron/s":
raise ValueError("input dataset must have unit photoelectron/s for the calibration, not {0}".format(image.ext_hdr['BUNIT']))
if "SPEC" not in image.hdu_names:
raise AttributeError("input dataset has no spectral extracted data and has not run through extract_spec")
filter_name = image.ext_hdr["CFAMNAME"]
filter_file = get_filter_name(image)
# Read filter and CALSPEC data.
wave, filter_trans = read_filter_curve(filter_file)
if calspec_file is not None:
calspec_filepath = calspec_file
calspec_filename = os.path.basename(calspec_file)
else:
star_name = image.pri_hdr["TARGET"]
calspec_filepath, calspec_filename = get_calspec_file(star_name)
flux_ref = read_cal_spec(calspec_filepath, wave)
#is this correct, do we need to consider the filter transmission at all?
flux = flux_ref #* filter_trans
if len(dataset) == 1:
spec = image.hdu_list["SPEC"].data
spec_dq = image.hdu_list["SPEC_DQ"].data
spec_err = image.hdu_list["SPEC_ERR"].data
spec_wave = image.hdu_list["SPEC_WAVE"].data
spec_wave_err = image.hdu_list["SPEC_WAVE_ERR"].data
else:
spec = []
spec_dq = []
spec_err = []
spec_wave = []
spec_wave_err = []
for frame in dataset:
spec.append(frame.hdu_list["SPEC"].data)
spec_dq.append(frame.hdu_list["SPEC_DQ"].data)
spec_err.append(frame.hdu_list["SPEC_ERR"].data)
spec_wave.append(frame.hdu_list["SPEC_WAVE"].data)
spec_wave_err.append(frame.hdu_list["SPEC_WAVE_ERR"].data)
spec = np.mean(np.array(spec),0)
spec_err = np.mean(np.array(spec_err),0)
spec_wave = np.mean(np.array(spec_wave), 0)
spec_wave_err = np.mean(np.array(spec_wave_err), 0)
spec_dq = np.bitwise_or.reduce(np.array(spec_dq),axis = 0)
#interpolate on the extracted wavelength in nm (AA/10)
wave_nm = wave/10.
inter = interp1d(wave_nm, flux, fill_value="extrapolate")
spec_flux = inter(spec_wave)/spec
spec_flux_err = spec_flux / spec**2 * spec_err[0]
data = np.array([spec_wave, spec_flux])
error = np.array([spec_wave_err, spec_flux_err])
spec_fluxcal_obj = SpecFluxCal(
data,
err=error,
dq = np.tile(spec_dq, (2,1)),
pri_hdr=image.pri_hdr,
ext_hdr=image.ext_hdr,
input_dataset=dataset
)
# Append to or create a HISTORY entry in the header.
history_entry = "spectral flux calibration was determined by spectral extraction using SED file {0}".format(calspec_filename)
spec_fluxcal_obj.ext_hdr.add_history(history_entry)
return spec_fluxcal_obj
[docs]
def generate_filter_offset(offset_file = None):
"""
read the csv filter file containing at least the band names and the pixel x/y offsets
between the filters and generate a new SpecFilterOffset product.
Args:
offset_file (str): file name of the filter file, if none it takes data/spectroscopy/CGI_bandpass_centers.csv
Returns:
corgidrp.data.SpecFilterOffset: SpecFilterOffset product
"""
if offset_file is None:
offset_file = os.path.join(os.path.dirname(__file__), "data", "spectroscopy", "CGI_bandpass_centers.csv")
table = ascii.read(offset_file, format = 'csv')
offset_dict = {}
for i, col in enumerate(table.colnames):
if "filter" in col:
filter_name = table.columns[i].value
if "xoffset" in col:
xoffset = table.columns[i].value
if "yoffset" in col:
yoffset = table.columns[i].value
for i, filter in enumerate(filter_name):
offset_dict[str(filter)] = [float(xoffset[i]), float(yoffset[i])]
return SpecFilterOffset(offset_dict)