import os
import numpy as np
from astropy.time import Time
from astropy.io import fits, ascii
import corgidrp
from corgidrp import astrom, data
[docs]
here = os.path.abspath(os.path.dirname(__file__))
[docs]
def get_cfam(
cfam_name='1F',
cfam_version=0,
):
""" Read CFAM filter wavelength in nm and transmission.
Args:
cfam_name (string): Filter in CFAM. For instance, '1F', '4A', '3B' or '2C'.
cfam_version (int): version number of the filters (CFAM, pupil, imaging
lens).
Returns:
CFAM filter wavelength in nm and transmission.
"""
datadir = os.path.join(here, 'data', 'filter_curves')
filter_names = os.listdir(datadir)
filter_name = [name for name in filter_names if name.find(cfam_name) >= 0]
if filter_name == []:
raise ValueError(f'there is no filter available with name {cfam_name}')
filter_name = [name for name in filter_name if f'v{cfam_version}' in name]
if filter_name == []:
raise ValueError(f'there is no filter {cfam_name} available with version {cfam_version}')
tab = ascii.read(os.path.join(datadir,filter_name[0]), format='csv',
header_start = 3, data_start = 4)
lambda_nm_filter = tab['lambda_nm'].data
trans_filter = tab['%T'].data / tab['%T'].data.max()
return lambda_nm_filter, trans_filter
[docs]
def di_over_pil_transmission(
cfam_name='1F',
cfam_version=0,
):
""" Derives the relative transmission between the pupil lens and the imaging
lens: trans_imaging/trans_pupil.
Multiplying the counts of the pupil image by this factor translates them
into equivalent counts of the direct imaging lens.
Args:
cfam_name (string): Filter in CFAM. For instance, '1F', '4A', '3B' or '2C'.
cfam_version (int): version number of the filters (CFAM, pupil, imaging
lens).
Returns:
Ratio trans_imaging/trans_pupil.
"""
# Read pupil and direct imaging lenses
try:
lambda_pupil_A, trans_pupil = np.loadtxt(os.path.join(here, 'data',
'filter_curves', f'pupil_lens_v{cfam_version}.txt'),
delimiter=',', unpack=True)
lambda_pupil_nm = lambda_pupil_A / 10
except:
raise Exception('* File with the transmission of the pupil lens not found')
try:
lambda_imaging_A, trans_imaging = np.loadtxt(os.path.join(here, 'data',
'filter_curves', f'imaging_lens_v{cfam_version}.txt'),
delimiter=',', unpack=True)
lambda_imaging_nm = lambda_imaging_A / 10
except:
raise Exception('* File with the transmission of the imaging lens not found')
# Get CFAM filter wavelength and transmission
lambda_nm_filter, trans_lambda_filter = get_cfam(cfam_name=cfam_name,
cfam_version=cfam_version)
# Linear interpolation
trans_lambda_pupil_band = np.interp(
lambda_nm_filter,
lambda_pupil_nm,
trans_pupil)
trans_lambda_imaging_band = np.interp(
lambda_nm_filter,
lambda_imaging_nm,
trans_imaging)
# Ratio of both transmissions:
ratio_imaging_pupil_trans = (np.sum(trans_lambda_imaging_band*trans_lambda_filter)/
np.sum(trans_lambda_pupil_band*trans_lambda_filter))
return ratio_imaging_pupil_trans
[docs]
def get_psf_pix(
dataset,
roi_radius=3,
):
""" Estimate the PSF positions of a set of PSF images.
Args:
dataset (corgidrp.data.Dataset): a collection of off-axis PSFs.
roi_radius (int or float): Half-size of the box around the peak,
in pixels. Adjust based on desired λ/D.
Returns:
Array of pair of values with PSFs position in (fractional) EXCAM pixels
with respect to the pixel (0,0) in the PSF images
"""
psf_pix = []
for psf in dataset:
psf_pix += [astrom.centroid_with_roi(psf.data,roi_radius=roi_radius)]
return np.array(psf_pix)
[docs]
def get_psf_ct(
dataset,
unocc_psf_norm=1,
):
""" Estimate the core throughput of a set of PSF images.
Definition of core throughput: The numerator in CT (counts above 50% peak)
is measured with pupil masks (Lyot stop, SPC pupil mask) in place, DMs at
dark hole solution, but no FPM. The denominator (total stellar flux) is
measured without any masks in place and an infinite aperture.
NOTE: The FPM are kept in place while measuring the CT because near the
region of 6 lam/D, the FPM effects are negligible and the CT data set allows
one to quantify the effect of the FPM in other areas, near the IWA and OWA,
respectively.
See Journal of Astronomical Telescopes, Instruments, and Systems, Vol. 9,
Issue 4, 045002 (October 2023). https://doi.org/10.1117/1.JATIS.9.4.045002
and figures 9-13 for details.
Args:
dataset (corgidrp.data.Dataset): a collection of off-axis PSFs.
unocc_psf_norm (float): sum of the 2-d array corresponding to the
unocculted psf. Default: off-axis PSF are normalized to the unocculted
PSF already. That is, unocc_psf_norm equals 1.
Returns:
Array of core throughput values between 0 and 1.
"""
psf_ct = []
for psf in dataset:
psf_ct += [psf.data[psf.data >= psf.data.max()/2].sum()/unocc_psf_norm]
psf_ct = np.array(psf_ct, dtype=float)
return psf_ct
[docs]
def estimate_psf_pix_and_ct(
dataset_in,
roi_radius=3,
cfam_version=0,
):
"""
1090881 - Given a core throughput dataset consisting of M clean frames
(nominally 1024x1024) taken at different FSM positions, the CTC GSW shall
estimate the pixel location and core throughput of each PSF.
NOTE: the list of M clean frames may be a subset of the frames collected during
core throughput data collection, to allow for the removal of outliers.
Some of the images are pupil images of the unocculted source.
Args:
dataset_in (corgidrp.data.Dataset): A core throughput dataset consisting of
M clean frames (nominally 1024x1024) taken at different FSM positions.
It includes some pupil images of the unocculted source. photoelectrons / second / pixel.
roi_radius (int or float): Half-size of the box around the peak,
in pixels. Adjust based on desired λ/D.
cfam_version (int): version number of the filters (CFAM, pupil, imaging
lens).
Returns:
psf_pix (array): Array with PSF's pixel positions. Units: EXCAM pixels
referred to the (0,0) pixel.
psf_ct (array): Array with PSF's core throughput values. Units:
dimensionless (Values must be within 0 and 1).
"""
dataset = dataset_in.copy()
# All frames must have the same CFAM filter
cfam_list = []
for frame in dataset:
try:
cfam_list += [frame.ext_hdr['CFAMNAME']]
except:
raise Exception('Frame w/o CFAM specification. All frames must have CFAM specified')
if len(set(cfam_list)) != 1:
raise Exception('All frames must have the same CFAM filter')
# identify the pupil images in the dataset
pupil_img_frames = []
for frame in dataset:
try:
# Pupil images of the unocculted source satisfy:
# DPAM=PUPIL, LSAM=OPEN, FSAM=OPEN and FPAM=OPEN_12
exthd = frame.ext_hdr
if (exthd['DPAMNAME']=='PUPIL' and exthd['LSAMNAME']=='OPEN' and
exthd['FSAMNAME']=='OPEN' and exthd['FPAMNAME']=='OPEN_12'):
pupil_img_frames += [frame]
except:
pass
if pupil_img_frames:
print(f'Found {len(pupil_img_frames)} pupil images for the core throughput estimation.')
else:
raise Exception('No pupil image found. At least there must be one pupil image.')
# mean combine the total values (photo-electrons/sec) of the pupil images
unocc_psf_norm = 0
for frame in pupil_img_frames:
unocc_psf_norm += frame.data.sum()
unocc_psf_norm /= len(pupil_img_frames)
# Transform pupil counts into direct imaging counts. Recall all frames have
# the same cfam filter or an Exception is raised
unocc_psf_norm *= di_over_pil_transmission(cfam_name=cfam_list[0],
cfam_version=cfam_version)
# Remove pupil frames
offaxis_frames = []
for frame in dataset:
if frame not in pupil_img_frames:
offaxis_frames += [frame]
dataset_offaxis = corgidrp.data.Dataset(offaxis_frames)
if len(dataset_offaxis):
print(f'Found {len(dataset_offaxis)} off-axis PSFs for the core throughput estimation.')
else:
raise Exception('No off-axis PSF found. At least there must be one off-axis PSF.')
# find the PSF positions of the off-axis PSFs
psf_pix = get_psf_pix(
dataset_offaxis,
roi_radius=roi_radius)
# find the PSF corethroughput of the off-axis PSFs
psf_ct = get_psf_ct(
dataset_offaxis,
unocc_psf_norm = unocc_psf_norm)
# same number of estimates. One per PSF
if len(psf_pix) != len(psf_ct) or len(psf_pix) != len(dataset_offaxis):
raise Exception('PSF positions and CT values are inconsistent')
return psf_pix, psf_ct
[docs]
def generate_psf_cube(
dataset_in,
psf_loc,
cfam_name='1F',
cfam_version=0,
):
"""
Function that derives a 3-d cube of PSF images from a core throughput dataset.
# TODO: error data cubes will be added in a release after R3.0.2
Args:
dataset_in (corgidrp.data.Dataset): A core throughput dataset consisting of
M clean frames (nominally 1024x1024) taken at different FSM positions.
It includes some pupil images of the unocculted source.
psf_loc (array): Array of pair of values with PSFs position in (fractional)
EXCAM pixels with respect to the pixel (0,0) in the PSF images.
cfam_name (string): Filter in CFAM. For instance, '1F', '4A', '3B' or '2C'.
cfam_version (int): version number of the filters (CFAM, pupil, imaging
lens).
Returns:
3-d PSF cube of PSF images from a core throughput dataset, including their
data quality, and corresponding headers as HDU units.
"""
dataset = dataset_in.copy()
# 3-d cube of PSF images cut around the PSF's location
psf_cube = []
dq_cube = []
# Pixels arounf PSF's location +/- n_pix_psf in both dimensions that
# correspond to 3 lambda/D in units of EXCAM pixels:
# 3 * lambda_mean_nm * 1e-9 / D * rad_to_mas / EXCAM_pixel_pitch in mas
n_pix_psf = int(np.ceil(3*get_cfam(cfam_name=cfam_name,
cfam_version=cfam_version)[0].mean()*1e-9/2.36*180/np.pi*3600e3/21.8))
i_psf = 0
for frame in dataset:
# Skip pupil images of the unocculted source, which satisfy:
# DPAM=PUPIL, LSAM=OPEN, FSAM=OPEN and FPAM=OPEN_12
try:
# Pupil images of the unocculted source satisfy:
# DPAM=PUPIL, LSAM=OPEN, FSAM=OPEN and FPAM=OPEN_12
exthd = frame.ext_hdr
if (exthd['DPAMNAME']=='PUPIL' and exthd['LSAMNAME']=='OPEN' and
exthd['FSAMNAME']=='OPEN' and exthd['FPAMNAME']=='OPEN_12'):
continue
except:
pass
idx_0_0 = max(int(np.round(psf_loc[i_psf][1])) - n_pix_psf,0)
idx_0_1 = min(frame.data.shape[0],
int(np.round(psf_loc[i_psf][1])) + n_pix_psf + 1)
idx_1_0 = max(int(np.round(psf_loc[i_psf][0])) - n_pix_psf,0)
idx_1_1 = min(frame.data.shape[1],
int(np.round(psf_loc[i_psf][0])) + n_pix_psf + 1)
psf_cube += [frame.data[idx_0_0:idx_0_1, idx_1_0:idx_1_1]]
dq_cube += [frame.dq[idx_0_0:idx_0_1, idx_1_0:idx_1_1]]
i_psf += 1
psf_cube = np.array(psf_cube)
dq_cube = np.array(dq_cube)
# Check
if len(psf_cube) != len(psf_loc):
raise Exception(('The number of PSFs does not match the number of PSF '+
' locations.'))
# PSF cube header: use first off-axis frame so pupil headers don't propagate
first_offaxis_frame = None
for frame in dataset:
exthd = frame.ext_hdr
if not (exthd['DPAMNAME'] == 'PUPIL' and exthd['LSAMNAME'] == 'OPEN' and
exthd['FSAMNAME'] == 'OPEN' and exthd['FPAMNAME'] == 'OPEN_12'):
first_offaxis_frame = frame
break
if first_offaxis_frame is None:
raise Exception('No off-axis PSF frame found in dataset')
ext_hdr = first_offaxis_frame.ext_hdr
# Add EXTNAME
psf_hdu = fits.ImageHDU(data=psf_cube, header=ext_hdr, name='PSFCUBE')
# Data quality cube
dq_hdr = first_offaxis_frame.dq_hdr
# Add specific information
dq_hdr['COMMENT'] = 'Data quality for each image'
# Add EXTNAME
dq_hdu = fits.ImageHDU(data=dq_cube, header=dq_hdr, name='DQCUBE')
return psf_hdu, dq_hdu
[docs]
def generate_ct_cal(
dataset_in,
roi_radius=3,
cfam_version=0,
):
"""
Generate the elements needed to create a core throughput calibration file.
A CoreThroughput calibration file has two main data arrays:
3-d cube of PSF images, e.g, a N1xN1xN array where N1= +/- 3l/D about
PSF's centroid in EXCAM pixels. The N PSF images are the ones in the CT
dataset.
N sets of (x, y, CT measurements). The (x, y) are pixel coordinates of the
PSF images in the CT dataset wrt EXCAM (0,0) pixel during core throughput
observation.
Args:
dataset_in (corgidrp.data.Dataset): A core throughput dataset consisting of
M clean frames (nominally 1024x1024) taken at different FSM positions.
It includes some pupil images of the unocculted source.
roi_radius (int or float): Half-size of the box around the peak,
in pixels. Adjust based on desired λ/D.
cfam_version (int): version number of the filters (CFAM, pupil, imaging
lens).
Returns:
PSF cube, data quality cube, HDU list with the CT array measurements,
including the PSF locations, FPAM/FSAM positions, and corrresponding
headers.
"""
dataset = dataset_in.copy()
# All frames must have the same CFAM filter
cfam_list = []
for frame in dataset:
try:
cfam_list += [frame.ext_hdr['CFAMNAME']]
except:
raise Exception('Frame w/o CFAM specification. All frames must have CFAM specified')
if len(set(cfam_list)) != 1:
raise Exception('All frames must have the same CFAM filter')
# Get estimated PSF centers and CT
psf_loc_est, ct_est = \
corgidrp.corethroughput.estimate_psf_pix_and_ct(dataset,
roi_radius=roi_radius,
cfam_version=cfam_version)
psf_hdu, dq_hdu = generate_psf_cube(dataset, psf_loc_est,
cfam_name=cfam_list[0], cfam_version=cfam_version)
# N sets of (x,y, CT measurements)
# x, y: PSF centers wrt EXCAM's (0,0) pixel
ct_excam = np.array([psf_loc_est[:,0], psf_loc_est[:,1], ct_est])
ct_hdr = fits.Header()
# Core throughput values on EXCAM wrt pixel (0,0) (not a "CT map", which is
# wrt FPM's center
ct_hdr['COMMENT'] = ('PSF location with respect to EXCAM (0,0) pixel. '
'Core throughput value for each PSF. (x,y,ct)=(data[0], data[1], data[2])')
ct_hdr['UNITS'] = 'PSF location: EXCAM pixels. Core throughput: values between 0 and 1.'
ct_hdu_list = [fits.ImageHDU(data=ct_excam, header=ct_hdr, name='CTEXCAM')]
# Values of FPAM during CT observations (needed to derive the FPM's center
# during CT observations given a coronagraphic dataset). The values do not
# change during CT observations
fpam_hv = [dataset_in[0].ext_hdr['FPAM_H'], dataset_in[0].ext_hdr['FPAM_V']]
fpam_hdr = fits.Header()
fpam_hdr['COMMENT'] = 'FPAM H and V values during the core throughput observations'
fpam_hdr['UNITS'] = 'micrometer'
ct_hdu_list += [fits.ImageHDU(data=fpam_hv, header=fpam_hdr, name='CTFPAM')]
# Values of FSAM during CT observations (needed to derive the FPM's center
# during CT observations given a coronagraphic dataset). The values do not
# change during CT observations
fsam_hv = [dataset_in[0].ext_hdr['FSAM_H'], dataset_in[0].ext_hdr['FSAM_V']]
fsam_hdr = fits.Header()
fsam_hdr['COMMENT'] = 'FSAM H and V values during the core throughput observations'
fsam_hdr['UNITS'] = 'micrometer'
ct_hdu_list += [fits.ImageHDU(data=fsam_hv, header=fsam_hdr, name='CTFSAM')]
# Generate core throughput calibration file
ct_cal = data.CoreThroughputCalibration(psf_hdu.data,
pri_hdr=dataset[0].pri_hdr,
ext_hdr=psf_hdu.header,
input_hdulist=ct_hdu_list,
dq=dq_hdu.data,
dq_hdr=dq_hdu.header,
input_dataset=dataset)
return ct_cal
[docs]
def get_1d_ct(ct_cal,frame,seps,
method='nearest'):
"""Fetches core throughput values at specific separations from the mask center.
Currently only the 'nearest' method is configured.
Args:
ct_cal (corgidrp.data.CoreThroughputCalibration): the core throughput calibration
object.
frame (corgidrp.data.Image): data frame containing mask location and detector 0,0 coordinate
in the header
seps (np.array of float): separations (pixels from the mask center) at which to sample
the CT curve.
method (str, optional): Method of calculating CT at a given separation. Defaults to 'nearest'.
'nearest': grabs the core throughput measured at a location nearest to the desired
separation and assumes CT is radially symmetric.
Returns:
np.array: Array of shape (2,len(seps)), where the first row is the list of separations
sampled, and the second row is the ct value for each separation.
"""
x, y, ct = ct_cal.ct_excam
# Get location of mask center in CT coordinates
xcen = frame.ext_hdr['STARLOCX'] + frame.ext_hdr.get("DETPIX0X",0.) + 0.5
ycen = frame.ext_hdr['STARLOCY'] + frame.ext_hdr.get("DETPIX0Y",0.) + 0.5
ct_seps = np.sqrt((x-xcen)**2 + (y-ycen)**2)
if method == 'nearest':
cts_out = []
for sep in seps:
argmin = np.argmin(np.abs(sep-ct_seps))
ct_out = ct[argmin]
cts_out.append(ct_out)
ct_arr_out = np.array([seps,cts_out])
return ct_arr_out
else:
raise NotImplementedError
[docs]
def create_ct_map(
corDataset,
fpamfsamcal,
ct_cal,
x_range=[-23,23],
y_range=[-23,23],
n_gridx=47,
n_gridy=47,
target_pix=None,
logr=False,
filepath=None,
save=False):
"""
Create a core throughput map: Given a core throughput calibration file and
a coronagraphic dataset, derive 3-D list (x,y,ct) where (x,y) are some
target locations on EXCAM relative to the FPM's center and with valid
values of the throughput.
The core throughmap may be saved, optionally, as a CSV file.
The creation of the core throughput map relies on InterpolateCT(), a
method of the CoreThroughputCalibration class in data.py. Valid core
throughput values are within the minimum and maxium radial distance from
the FPM's center in the core throughput dataset used to generate the
core throughput calibration file. Its options are inluded in the call of
this method too.
If an external list of locations is not provided, a default grid of points
is condidered.
Args:
corDataset (corgidrp.data.Dataset): a dataset containing some
coronagraphic observations.
fpamfsamcal (corgidrp.data.FpamFsamCal): an instance of the
FpamFsamCal class. That is, a FpamFsamCal calibration.
ct_cal (corgidrp.data.CoreThroughputCalibration): an instance of the
CoreThroughputCalibration class. That is, a core throughput calibration
file.
x_range (array): Two values [xmin, xmax] specifying the range of pixels to
be considered. Units are EXCAM pixels measured with respect the center
of the FPM. Notice that [-23,23] is approx. +/-10 l/D in band 1.
y_range (array): Two values [ymin, ymax] specifying the range of pixels to
be considered. Units are EXCAM pixels measured with respect the center
of the FPM. Notice that [-23,23] is approx. +/-10 l/D in band 1.
n_gridx (int) (optional): Number of x gridpoints.
n_gridy (int) (optional): Number of y gridpoints.
target_pix (array) (optional): a user-defined Mx2 array containing the pixel
positions for M target pixels where the core throughput will be derived
by interpolation. The target pixels are measured with respect the center
of the focal plane mask in (fractional) EXCAM pixels. Default is None.
In this case, a rectangular grid of pixel positions is used. Using
matplotlib.pyplot, target_pix[0] is the horizontal axis (x), and
target_pix[1] is the vertical axis (y).
logr (bool) (optional): If True, radii are mapped into their logarithmic
values before constructing the interpolant.
filepath (string) (optional): String with the path and filename of the
file that will store the core throughput map as a CSV file.
save (bool) (optionla): Whether the core throughput map will be stored or not.
Returns:
A core throughput map with (x,y,ct) where x and y are locations
on EXCAM relative to the FPM's center with valid interpolated values of
the core throughput.
"""
# If no target pixels are provided, create a grid:
if target_pix is None:
x_tmp = np.linspace(x_range[0], x_range[1], n_gridx)
y_tmp = np.linspace(y_range[0], y_range[1], n_gridy)
target_pix = np.array(np.meshgrid(x_tmp, y_tmp)).reshape(2, n_gridx*n_gridy)
# Get interpolated CT values at valid positions
ct_interp = ct_cal.InterpolateCT(
target_pix[0], target_pix[1], corDataset, fpamfsamcal, logr=logr)
# Generate the core throughput map object
# Re-order output to match the required order: (x,y,ct)
ct_map = data.CoreThroughputMap(ct_interp[[1,2,0]],
pri_hdr=corDataset[0].pri_hdr,
ext_hdr=corDataset[0].ext_hdr,
input_dataset=corDataset)
return ct_map