# A file that holds the functions that transmogrify l4 data to TDA (Technical Demo Analysis) data
import os
import numpy as np
from astropy.io import fits
from scipy.interpolate import interp1d
import warnings
from photutils.psf import fit_2dgaussian
from corgidrp.data import Dataset, Image, FluxcalFactor, SpecFluxCal
import corgidrp.fluxcal as fluxcal
from corgidrp import check
from corgidrp.klip_fm import measure_noise
from corgidrp.find_source import make_snmap, psf_scalesub
from scipy.stats import gaussian_kde
[docs]
def determine_app_mag(input_data, source_star, scale_factor = 1.):
"""
Determine the apparent Vega magnitude by comparing CALSPEC SEDs.
This function integrates the CALSPEC spectrum given by source_star through the
filter bandpass of the input image(s), integrates the Vega CALSPEC spectrum through
the same bandpass, and writes APP_MAG = -2.5 log10(source_irr / vega_irr) into
the header. The dataset is only inspected to ensure all frames share the same
filter and target. This function does not use the measured photometry/flux in input_data.
To convert a measured flux into a Vega magnitude, use fluxcal.calculate_vega_mag or determine_flux.
Args:
input_data (corgidrp.data.Dataset or corgidrp.data.Image):
A dataset of Images (L2b-level) or a single Image. Must be all of the same source with same filter.
source_star (str): either the fits file path of the flux model of the observed source in
CALSPEC units erg/(s * cm^2 * AA) and format or the (SIMBAD) name of a CALSPEC star
scale_factor (float): factor applied to the flux of the calspec standard source, so that you can apply it
if you have a different source with similar spectral type, but no calspec standard.
Defaults to 1.
Returns:
mag_data (corgidrp.data.Dataset): A version of the input with an updated header including the apparent
magnitude.
"""
# If input is a dataset, process each image
if isinstance(input_data, Dataset):
mag_data = input_data.copy()
# Make sure all frames in dataset have the same filter and target
filter_name = fluxcal.get_filter_name(mag_data[0])
target_name = mag_data[0].pri_hdr["TARGET"]
for img in mag_data:
img_filter = fluxcal.get_filter_name(img)
img_target = img.pri_hdr["TARGET"]
if img_filter != filter_name:
raise ValueError(f"All images in dataset must be taken with the same CFAMNAME for calculating"
f"apparent magnitude. Found {img_filter}, expected {filter_name}."
)
if img_target != target_name:
raise ValueError(f"All images in dataset must be taken of the same TARGET for calculating"
f"apparent magnitude. Found {img_target}, expected {target_name}."
)
elif isinstance(input_data, Image):
mag_data = Dataset([input_data.copy()])
filter_name = fluxcal.get_filter_name(mag_data[0])
# read the transmission curve from the color filter file
wave, filter_trans = fluxcal.read_filter_curve(filter_name)
if source_star.split(".")[-1] == "fits":
source_filepath = source_star
source_filename = os.path.basename(source_star)
else:
source_filepath, source_filename = fluxcal.get_calspec_file(source_star)
vega_filepath, vega_filename = fluxcal.get_calspec_file('Vega')
# calculate the flux of VEGA and the source star from the user given CALSPEC file binned on the wavelength grid of the filter
vega_sed = fluxcal.read_cal_spec(vega_filepath, wave)
source_sed = fluxcal.read_cal_spec(source_filepath, wave) * scale_factor
#Calculate the irradiance of vega and the source star in the filter band
vega_irr = fluxcal.calculate_band_irradiance(filter_trans, vega_sed, wave)
source_irr = fluxcal.calculate_band_irradiance(filter_trans, source_sed, wave)
#calculate apparent magnitude
app_mag = -2.5 * np.log10(source_irr/vega_irr)
# write the reference wavelength and the color correction factor to the header (keyword names tbd)
history_msg = "the apparent Vega magnitude is calculated and added to the header {0} applying source SED file {1} and VEGA SED file {2}".format(str(app_mag), source_filename, vega_filename)
# update the header of the output dataset and update the history
mag_data.update_after_processing_step(history_msg, header_entries = {"APP_MAG": app_mag})
return mag_data
[docs]
def determine_color_cor(input_dataset, ref_star, source_star):
"""
determine the color correction factor of the observed source
at the reference wavelength of the used filter band and put it into the header.
We assume that each frame in the dataset was observed with the same color filter.
Args:
input_dataset (corgidrp.data.Dataset): a dataset of Images (L2b-level)
ref_star (str): either the fits file path of the known reference flux (usually CALSPEC),
or the (SIMBAD) name of a CALSPEC star
source_star (str): either the fits file path of the flux model of the observed source in
CALSPEC units erg/(s * cm^2 * AA) and format or the (SIMBAD) name of a CALSPEC star
Returns:
corgidrp.data.Dataset: a version of the input dataset with updated header including
the reference wavelength and the color correction factor
"""
color_dataset = input_dataset.copy()
# get the filter name from the header keyword 'CFAMNAME'
filter_name = fluxcal.get_filter_name(color_dataset[0])
# read the transmission curve from the color filter file
wave, filter_trans = fluxcal.read_filter_curve(filter_name)
# calculate the reference wavelength of the color filter
lambda_ref = fluxcal.calculate_pivot_lambda(filter_trans, wave)
# ref_star/source_star is either the star name or the file path to fits file
if ref_star.split(".")[-1] == "fits":
calspec_filepath = ref_star
calspec_ref_name = os.path.basename(ref_star)
else:
calspec_filepath, calspec_ref_name = fluxcal.get_calspec_file(ref_star)
if source_star.split(".")[-1] == "fits":
source_filepath = source_star
source_filename = os.path.basename(source_star)
else:
source_filepath, source_filename = fluxcal.get_calspec_file(source_star)
# calculate the flux from the user given CALSPEC file binned on the wavelength grid of the filter
flux_ref = fluxcal.read_cal_spec(calspec_filepath, wave)
# we assume that the source spectrum is a calspec standard or its
# model data is in a file with the same format and unit as the calspec data
source_sed = fluxcal.read_cal_spec(source_filepath, wave)
#Calculate the color correction factor
k = fluxcal.compute_color_cor(filter_trans, wave, flux_ref, lambda_ref, source_sed)
# write the reference wavelength and the color correction factor to the header (keyword names tbd)
history_msg = "the color correction is calculated and added to the header: {0}, source SED file: {1}, reference SED file: {2}".format(str(k), source_filename, calspec_ref_name)
# update the header of the output dataset and update the history
color_dataset.update_after_processing_step(history_msg, header_entries = {"LAM_REF": lambda_ref, "COL_COR": k})
return color_dataset
[docs]
def convert_spec_to_flux(input_dataset, fluxcal):
"""
Flux calibrate 1-D spectroscopy spectra stored in the L4 SPEC extension.
The function propagates calibration uncertainties. It applies the spectral flux calibration,
if not available it can apply the fluxcal factor and color correction.
Requires the input dataset to have already been core-throughput
(ie SPEC header contains CTCOR=True) and
slit transmission corrected (ie SPEC header contains SLITCOR=True).
Args:
input_dataset (corgidrp.data.Dataset): L4 dataset containing SPEC,
SPEC_ERR, SPEC_DQ, SPEC_WAVE, and SPEC_WAVE_ERR extensions.
fluxcal (corgidrp.data.SpecFluxCal or corgidrp.data.FluxcalFactor): wavelength dependent flux calibration
or absolute flux calibration product used to convert the spectrum to absolute flux units erg/(s*cm^2*Å).
Returns:
corgidrp.data.Dataset: copy of the input dataset with the
SPEC/SPEC_ERR data converted to erg/(s*cm^2*Å) and
headers/history updated.
"""
specflux = False
#check that the correct flux calibration file is used
if isinstance(fluxcal, SpecFluxCal):
specflux = True
else:
if not isinstance(fluxcal, FluxcalFactor):
raise TypeError("fluxcal must be a corgidrp.data.FluxcalFactor or SpecFluxCal instance.")
spec_dataset = input_dataset.copy()
for frame in spec_dataset:
if 'SPEC' not in frame.hdu_list:
raise ValueError("Input dataset does not contain a 'SPEC' extension.")
is_coron = frame.pri_hdr.get('VISTYPE') not in ['CGIVST_CAL_SPEC_TGTREF','CGIVST_CAL_TGTREF_PHOT'] # using VISTYPE keyword to check if the image is coronagraphic
if is_coron:
if not frame.hdu_list['SPEC'].header.get('CTCOR', False):
raise ValueError("Core throughput correction must be applied before convert_spec_to_flux for coronagraphic images (missing CTCOR flag).")
spec = frame.hdu_list['SPEC'].data.astype(float, copy=True)
spec_header = frame.hdu_list['SPEC'].header
spec_err = frame.hdu_list['SPEC_ERR'].data.astype(float, copy=True)
if spec_header.get('BUNIT', '').strip().lower() != "photoelectron/s/bin":
raise ValueError("SPEC extension must have BUNIT 'photoelectron/s/bin' before flux calibration.")
if specflux:
# check that the correct flux calibration file is used
if frame.ext_hdr["CFAMNAME"] != fluxcal.ext_hdr["CFAMNAME"]:
raise ValueError(f"the spec_fluxcal has another filter name {fluxcal.ext_hdr['CFAMNAME']}, "
f"than the observation {frame.ext_hdr['CFAMNAME']}.")
# Convert to flux units and propagate uncertainties
factor = fluxcal.specflux
factor_error = fluxcal.specflux_err
else:
# Apply flux calibration factor and color correction
color_cor_fac = frame.ext_hdr.get('COL_COR', 1.0)
factor = fluxcal.fluxcal_fac / color_cor_fac
factor_error = fluxcal.fluxcal_err / color_cor_fac
# Convert to flux units and propagate uncertainties
spec_flux = spec * factor
spec_flux_err = np.sqrt((spec_err * factor) ** 2 + (spec * factor_error) ** 2)
frame.hdu_list['SPEC'].data[:] = spec_flux
frame.hdu_list['SPEC_ERR'].data[:] = spec_flux_err
spec_header['BUNIT'] = "erg/(s*cm^2*AA)"
frame.hdu_list['SPEC_ERR'].header['BUNIT'] = "erg/(s*cm^2*AA)"
if specflux:
history_message = f"Calibrated 1D spectrum applying spectral flux calibration file:{fluxcal.filename}."
spec_dataset.update_after_processing_step(
history_message,
header_entries={"SPECUNIT": "erg/(s*cm^2*AA)"}
)
else:
history_message = f"Calibrated 1D spectrum by applying a broad band fluxcal_factor={fluxcal.fluxcal_fac} determined by aperture photometry, COL_COR={color_cor_fac}."
spec_dataset.update_after_processing_step(
history_message,
header_entries={"SPECUNIT": "erg/(s*cm^2*AA)", "FLUXFAC": fluxcal.fluxcal_fac}
)
return spec_dataset
[docs]
def apply_slit_transmission(input_dataset, slit_transmission):
"""
applies the slit transmission and the algorithm throughput correction (if available)
to a spectrometer input dataset of a corresponding slit observation.
Args:
input_dataset (corgidrp.data.Dataset): L4 dataset containing SPEC,
SPEC_ERR, SPEC_DQ, SPEC_WAVE, and SPEC_WAVE_ERR extensions.
slit_transmission (corgidrp.data.SlitTransmission): slit throughput
calibration product, contains the (slit_map, slit_x, slit_y) tuple of a corresponding slit.
Returns:
corgidrp.data.Dataset: copy of the input dataset with the
SPEC/SPEC_ERR data corrected by slit transmission and headers/history updated.
"""
spec_dataset = input_dataset.copy()
history_messages = []
for frame in spec_dataset:
if 'SPEC' not in frame.hdu_list:
raise ValueError("Input dataset does not contain a 'SPEC' extension.")
if frame.ext_hdr["FSAMNAME"] != slit_transmission.slitname:
raise ValueError("Input dataset is not compatible to slit_transmission, different slits.")
spec = frame.hdu_list['SPEC'].data.astype(float, copy=True)
spec_header = frame.hdu_list['SPEC'].header
spec_err = frame.hdu_list['SPEC_ERR'].data.astype(float, copy=True)
if spec_header.get('BUNIT', '').strip().lower() != "photoelectron/s/bin":
raise ValueError("SPEC extension must have BUNIT 'photoelectron/s/bin' before flux calibration.")
# Apply algorithm throughput correction (ALGO_THRU) if present (PSF-subtracted frames)
if 'ALGO_THRU' in frame.hdu_list:
algo_thru = frame.hdu_list['ALGO_THRU'].data.astype(float)
if algo_thru.shape != spec.shape:
raise ValueError(
f"ALGO_THRU shape {algo_thru.shape} must match SPEC shape {spec.shape}."
)
# Divide by algorithm throughput, accounting for zeros/non-finite
valid = np.isfinite(algo_thru) & (algo_thru != 0)
spec = np.divide(spec, algo_thru, out=np.full_like(spec, np.nan), where=valid)
spec_err = np.divide(spec_err, algo_thru, out=np.full_like(spec_err, np.nan), where=valid)
spec_header['ALGOCOR'] = True
history_messages.append("Applied algorithm throughput correction (ALGO_THRU).")
# Apply slit transmission correction
slit_curve = np.asarray(slit_transmission.select_slit_transmission_curve(frame), dtype=float)
if slit_curve.shape != spec.shape:
raise ValueError(
f"slit_transmission curve shape {slit_curve.shape} must match SPEC shape {spec.shape}."
)
# Divide by wavelength-dependent slit transmission, accounting for zeros/non-finite
valid = np.isfinite(slit_curve) & (slit_curve != 0)
spec = np.divide(spec, slit_curve, out=np.full_like(spec, np.nan), where=valid)
spec_err = np.divide(spec_err, slit_curve, out=np.full_like(spec_err, np.nan), where=valid)
frame.hdu_list['SPEC'].data[:] = spec
frame.hdu_list['SPEC_ERR'].data[:] = spec_err
spec_header['SLITFAC'] = float(np.nanmean(slit_curve))
spec_header['SLITCOR'] = True
history_messages.append(f"spectrum is slit transmission corrected with mean factor={float(np.nanmean(slit_curve))}.")
spec_dataset.update_after_processing_step(
".".join(history_messages)
)
return spec_dataset
[docs]
def apply_core_throughput_correction(frame,
core_throughput_cal,
fpam_fsam_cal,
logr=False):
"""
Apply a core-throughput correction to a single L4 spectroscopy frame.
Args:
frame (corgidrp.data.Image): L4 spectroscopy frame containing SPEC/SPEC_ERR HDUs.
core_throughput_cal (corgidrp.data.CoreThroughputCalibration): calibration product
providing InterpolateCT().
fpam_fsam_cal (corgidrp.data.FpamFsamCal): calibration relating FPAM/FSAM motions
to EXCAM coordinates.
logr (bool): passed through to InterpolateCT (logarithmic radii interpolation).
Returns:
tuple: (ct_value, corrected_frame) where:
- ct_value (float): core-throughput factor applied to the frame.
- corrected_frame (corgidrp.data.Image): the corrected frame (same object as input, modified in place).
"""
if fpam_fsam_cal is None:
raise ValueError("fpam_fsam_cal is required for core throughput correction.")
try:
wv0_x = float(frame.ext_hdr['WV0_X'])
wv0_y = float(frame.ext_hdr['WV0_Y'])
except KeyError as exc:
raise ValueError("Frame is missing WV0_X/WV0_Y required for core throughput correction.") from exc
# Convert WV0_X/Y (absolute EXCAM pixels) to FPM-relative coordinates
# STARLOCX/Y is the FPM center during the coronagraphic observation
try:
fpm_center_x = float(frame.ext_hdr['STARLOCX'])
fpm_center_y = float(frame.ext_hdr['STARLOCY'])
except KeyError as exc:
raise ValueError("Frame is missing STARLOCX/STARLOCY required for core throughput correction.") from exc
wv0_x_relative = wv0_x - fpm_center_x
wv0_y_relative = wv0_y - fpm_center_y
# InterpolateCT expects coordinates relative to the FPM center
ct_values = core_throughput_cal.InterpolateCT(
wv0_x_relative,
wv0_y_relative,
Dataset([frame]),
fpam_fsam_cal,
logr=logr,
)
ct_value = float(np.atleast_1d(ct_values).ravel()[0])
if not np.isfinite(ct_value) or ct_value <= 0:
raise ValueError(f"Invalid core throughput value {ct_value}.")
frame.hdu_list['SPEC'].data[:] /= ct_value
if 'SPEC_ERR' in frame.hdu_list:
frame.hdu_list['SPEC_ERR'].data[:] /= ct_value
frame.hdu_list['SPEC_ERR'].header['CTCOR'] = True
frame.hdu_list['SPEC_ERR'].header['CTFAC'] = ct_value
frame.hdu_list['SPEC'].header['CTCOR'] = True
frame.hdu_list['SPEC'].header['CTFAC'] = ct_value
return ct_value, frame
[docs]
def combine_spectra(input_dataset):
"""
Combine multiple 1-D spectra in a Dataset into a single spectrum.
This function takes a Dataset of images that each contain SPEC,
SPEC_ERR and SPEC_WAVE extensions, interpolates all spectra onto
a common wavelength grid, and combines them using inverse-variance
weighting (1/σ^2).
Args:
input_dataset (corgidrp.data.Dataset): Dataset of Images with 1-D
spectra and SPEC/SPEC_ERR/SPEC_WAVE extensions.
Returns:
tuple: (combined_spec, wavelength, combined_err, rotations) where:
- combined_spec (ndarray): weighted spectrum on the reference grid
- wavelength (ndarray): reference wavelength grid
- combined_err (ndarray): 1σ uncertainty of the combined spectrum
- rotations (list): list of PA_APER angles
"""
# Collect per-frame spectra, uncertainties, and wavelength grids
spec_list = []
err_list = []
wave_list = []
rotations = []
for img in input_dataset:
spec = np.array(img.hdu_list['SPEC'].data, dtype=float)
spec_err = np.array(img.hdu_list['SPEC_ERR'].data, dtype=float)
spec_err = np.squeeze(spec_err)
wave = np.array(img.hdu_list['SPEC_WAVE'].data, dtype=float)
if spec.shape != wave.shape:
raise ValueError(f"SPEC shape {spec.shape} must match SPEC_WAVE shape {wave.shape}.")
if spec_err.shape != spec.shape:
raise ValueError(f"SPEC_ERR shape {spec_err.shape} must match SPEC shape {spec.shape}.")
spec_list.append(spec)
err_list.append(spec_err)
wave_list.append(wave)
rotations.append(img.pri_hdr.get('PA_APER'))
reference_wave = wave_list[0]
ref_decreasing = reference_wave[0] > reference_wave[-1]
reference_wave_ordered = reference_wave[::-1] if ref_decreasing else reference_wave
spectra_per_frame = {}
errs_per_frame = {}
weighted_numer = np.zeros_like(reference_wave, dtype=float)
weighted_denom = np.zeros_like(reference_wave, dtype=float)
for idx in range(len(spec_list)):
spec = spec_list[idx]
spec_err = err_list[idx]
wavelength = wave_list[idx]
# Align each spectrum/err onto the reference grid if needed
if np.allclose(wavelength, reference_wave):
aligned_spec = spec
aligned_err = spec_err
else:
# Regrid spectrum onto the reference wavelength grid
src_decreasing = wavelength[0] > wavelength[-1]
wave_ordered = wavelength[::-1] if src_decreasing else wavelength
spec_ordered = spec[::-1] if src_decreasing else spec
interp = np.interp(
reference_wave_ordered,
wave_ordered,
spec_ordered,
left=spec_ordered[0],
right=spec_ordered[-1],
)
aligned_spec = interp[::-1] if ref_decreasing else interp
# Regrid uncertainties in the same way
err_ordered = spec_err[::-1] if src_decreasing else spec_err
err_interp = np.interp(
reference_wave_ordered,
wave_ordered,
err_ordered,
left=err_ordered[0],
right=err_ordered[-1],
)
aligned_err = err_interp[::-1] if ref_decreasing else err_interp
spectra_per_frame[idx] = aligned_spec
errs_per_frame[idx] = aligned_err
# Inverse-variance weighting
weights = 1.0 / (aligned_err ** 2)
weighted_numer += aligned_spec * weights
weighted_denom += weights
# Final combined spectrum and uncertainty on the reference grid
combined_spec = weighted_numer / weighted_denom
combined_err = np.sqrt(1.0 / weighted_denom)
return combined_spec, reference_wave, combined_err, rotations
[docs]
def compute_spec_flux_ratio(host_image, companion_image):
"""
Compute the flux ratio of a single companion image relative to a single
host image. Input should already be flux calibrated and slit transmission corrected.
Args:
host_image (corgidrp.data.Image): L4 image containing the host spectrum (can be a combined/
weighted spectrum).
companion_image (corgidrp.data.Image): L4 image containing the companion spectrum (can be
a combined/ weighted spectrum).
Returns:
tuple: (flux_ratio, wavelength, metadata) where:
- flux_ratio (numpy.ndarray): companion/host spectrum R(λ).
- wavelength (numpy.ndarray): wavelength array in nm.
- metadata (dict): contains:
- 'rotation': host PA_APER angle
- 'companion_rotation': companion PA_APER angle
- 'ratio_err': 1σ uncertainty on the flux ratio R(λ).
"""
host_spec = np.array(host_image.hdu_list['SPEC'].data, dtype=float)
comp_spec = np.array(companion_image.hdu_list['SPEC'].data, dtype=float)
host_err = np.array(host_image.hdu_list['SPEC_ERR'].data, dtype=float)
comp_err = np.array(companion_image.hdu_list['SPEC_ERR'].data, dtype=float)
host_err = np.squeeze(host_err)
comp_err = np.squeeze(comp_err)
host_wave = host_image.hdu_list['SPEC_WAVE'].data
comp_wave = companion_image.hdu_list['SPEC_WAVE'].data
# Align wavelength grids if the host/companion spectra were sampled in opposite directions
# (np.interp requires increasing x-coordinates).
if not np.allclose(host_wave, comp_wave):
host_decreasing = host_wave[0] > host_wave[-1]
comp_decreasing = comp_wave[0] > comp_wave[-1]
host_wave_interp = host_wave[::-1] if host_decreasing else host_wave
if comp_decreasing:
comp_wave_interp = comp_wave[::-1]
comp_spec_interp = comp_spec[::-1]
comp_err_interp = comp_err[::-1]
else:
comp_wave_interp = comp_wave
comp_spec_interp = comp_spec
comp_err_interp = comp_err
comp_spec_aligned = np.interp(
host_wave_interp,
comp_wave_interp,
comp_spec_interp,
left=comp_spec_interp[0],
right=comp_spec_interp[-1]
)
comp_err_aligned = np.interp(
host_wave_interp,
comp_wave_interp,
comp_err_interp,
left=comp_err_interp[0],
right=comp_err_interp[-1]
)
comp_spec = comp_spec_aligned[::-1] if host_decreasing else comp_spec_aligned
comp_err = comp_err_aligned[::-1] if host_decreasing else comp_err_aligned
# Ratio calculation - invalid host/companion values become NaN rather than raising warnings.
with np.errstate(divide='ignore', invalid='ignore'):
flux_ratio = comp_spec / host_spec
invalid_mask = (host_spec == 0) | ~np.isfinite(host_spec) | ~np.isfinite(comp_spec)
flux_ratio[invalid_mask] = np.nan
# Propagate uncertainties for comp/host division:
# σ_R^2 = (σ_C / H)^2 + (C * σ_H / H^2)^2
ratio_unc = np.full_like(flux_ratio, np.nan, dtype=float)
valid_unc = (
(host_spec != 0) &
np.isfinite(host_spec) &
np.isfinite(comp_spec) &
np.isfinite(host_err) &
np.isfinite(comp_err)
)
if np.any(valid_unc):
comp_term = np.zeros_like(flux_ratio, dtype=float)
host_term = np.zeros_like(flux_ratio, dtype=float)
comp_term[valid_unc] = (comp_err[valid_unc] / host_spec[valid_unc]) ** 2
host_term[valid_unc] = (
(comp_spec[valid_unc] * host_err[valid_unc]) / (host_spec[valid_unc] ** 2)
) ** 2
variance = comp_term + host_term
ratio_unc[valid_unc] = np.sqrt(variance[valid_unc])
metadata = {
'rotation': host_image.pri_hdr.get('PA_APER'),
'companion_rotation': companion_image.pri_hdr.get('PA_APER'),
'ratio_err': ratio_unc,
}
return flux_ratio, host_wave, metadata
[docs]
def convert_to_flux(input_dataset, fluxcal_factor):
"""
Convert the data from photoelectron unit to flux unit erg/(s * cm^2 * AA).
Args:
input_dataset (corgidrp.data.Dataset): a dataset of Images
fluxcal_factor (corgidrp.data.FluxcalFactor): flux calibration file
Returns:
corgidrp.data.Dataset: a version of the input dataset with the data in flux units
"""
# you should make a copy the dataset to start
if input_dataset[0].ext_hdr['BUNIT'] != "photoelectron/s":
raise ValueError("input dataset must have unit photoelectron/s for the conversion, not {0}".format(input_dataset[0].ext_hdr['BUNIT']))
flux_dataset = input_dataset.copy()
flux_cube = flux_dataset.all_data
flux_error = flux_dataset.all_err
if "COL_COR" in flux_dataset[0].ext_hdr:
color_cor_fac = flux_dataset[0].ext_hdr['COL_COR']
else:
warnings.warn("There is no COL_COR keyword in the header, color correction was not done, it is set to 1")
color_cor_fac = 1
factor = fluxcal_factor.fluxcal_fac/color_cor_fac
factor_error = fluxcal_factor.fluxcal_err/color_cor_fac
error_frame = flux_cube * factor_error
flux_cube *= factor
#scale also the old error with the flux_factor and propagate the error
# err = sqrt(err_flux^2 * flux_fac^2 + fluxfac_err^2 * flux^2)
flux_dataset.rescale_error(factor, "fluxcal_factor")
flux_dataset.add_error_term(error_frame, "fluxcal_error")
history_msg = "data converted to flux unit erg/(s * cm^2 * AA) by fluxcal_factor {0} plus color correction".format(fluxcal_factor.fluxcal_fac)
# update the output dataset with this converted data and update the history
flux_dataset.update_after_processing_step(history_msg, new_all_data=flux_cube, header_entries = {"BUNIT":"erg/(s*cm^2*AA)", "FLUXFAC":fluxcal_factor.fluxcal_fac})
return flux_dataset
[docs]
def compute_flux_ratio_noise(input_dataset, NDcalibration, unocculted_star_dataset, unocculted_star_loc=None, requested_separations=None, halfwidth=None,
nsigma=1, small_sample_correction=False):
'''
Uses the PSF-subtracted frame and its algorithm throughput vs separation to
produce a calibrated n-sigma flux ratio noise curve, also accounting for the throughput of the coronagraph.
It calculates flux ratio noise curve value for each radial separation from the subtracted star location, interpolating KLIP and core throughput values at these input separations.
It uses a dataset of unocculted stars and ND transmission to determine the integrated flux of the Gaussian-fit star (where each frame in the dataset is assumed to correspond to the frames
in the input_dataset), and an estimate of planet flux per frame of input_dataset is made by calculating the integrated flux of a Gaussian with amplitude equal to
the annular noise and FWHM equal to that used for KLIP algorithm througput for each radial separation.
Args:
input_dataset (corgidrp.data.Dataset): a dataset of PSF-subtracted Images
NDcalibration (corgidrp.data.NDFilterSweetSpotDataset): ND filter calibration
unocculted_star_dataset (corgidrp.data.Dataset): a dataset of unocculted star Images corresponding to the Images in input_dataset. Should have the same number of frames as input_dataset (1-to-1 correspondence).
unocculted_star_loc (2-D float array, optional): array of coordinates of the unocculted stars according to the order given in the unocculted_star_dataset.
The first row of the array is for row position, and the second row is for column position.
If None, the peak pixel location is used for each frame. Defaults to None.
requested_separations (float array, optional): separations at which to compute the flux ratio noise curve. If None, the separations used for
the core throughput are used (e.g., no interpolation needed). Defaults to None.
halfwidth (float, optional): halfwidth of the annulus to use for noise calculation. If None, half
of the minimum spacing between separation distances (if it isn't uniform spacing) is used. Defaults to None.
nsigma (float, optional): Sigma multiplier for the noise curve. E.g. nsigma=5 produces a 5-sigma
contrast curve. Defaults to 1.
small_sample_correction (bool, optional): If True, apply the small sample statistics correction
from Mawet et al. (2014) using the Student's t-distribution. Defaults to False.
Returns:
corgidrp.data.Dataset: input dataset with an additional extension header 'FRN_CRV' for every frame, containing the
calibrated flux ratio noise curve as a function of radial separation. The data in that extension for a given frame is a (2+M)xN array,
where:
--the first row contains the separation radii in pixels
--the second row containts the separation radii in milli-arcseconds (mas)
--and the M rows contain the corresponding flux ratio noise curve values for the M KL mode truncations (maintaining the KL index ordering).
TODO: Add uncertainty to flux ratio noise curve based on uncertainties in core throughput and algorithm throughput if those are implemented in the future.
'''
output_dataset = input_dataset.copy()
if len(input_dataset) != len(unocculted_star_dataset):
raise ValueError('The number of frames in input_dataset and unocculted_star_dataset must be the same.')
for i, frame in enumerate(output_dataset.frames):
pixscale_mas = frame.ext_hdr['PLTSCALE']
klip_tp = frame.hdu_list['KL_THRU'].data[1:,:,0]
core_tp = frame.hdu_list['CT_THRU'].data[1]
klip_seps = frame.hdu_list['KL_THRU'].data[0,:,0]
ct_seps = frame.hdu_list['CT_THRU'].data[0]
klip_fwhms = frame.hdu_list['KL_THRU'].data[1:,:,1]
min_sep = np.max([np.min(klip_seps), np.min(ct_seps)])
max_sep = np.min([np.max(klip_seps), np.max(ct_seps)])
if requested_separations is None:
requested_separations = klip_seps
if np.any(requested_separations < min_sep) or np.any(requested_separations > max_sep):
warnings.warn('Not all requested_separations are within the range of the separations used for the KLIP and core throughputs. Extrapolation will be used.')
ct_spacings = ct_seps - np.roll(ct_seps, 1)
# ignore the meaningless first entry (b/c of looping around with np.roll)
min_ct_spacing = np.min(ct_spacings[1:])
klip_spacings = klip_seps - np.roll(klip_seps, 1)
# ignore the meaningless first entry (b/c of looping around with np.roll)
min_klip_spacing = np.min(klip_spacings[1:])
min_spacing = np.min([min_ct_spacing, min_klip_spacing])
if halfwidth is None:
halfwidth = min_spacing/2
check.real_positive_scalar(halfwidth, 'halfwidth', ValueError)
if halfwidth > min_spacing/2:
warnings.warn('Halfwidth is wider than half the minimum spacing between separation values.')
# Interpolate FWHMs (before measure_noise so they're available for small sample correction)
interp_fwhms = np.zeros((len(klip_fwhms), len(requested_separations)))
for j in range(len(klip_fwhms)):
fwhms_func = interp1d(klip_seps, klip_fwhms[j], kind='linear', fill_value='extrapolate')
interp_fwhms[j] = fwhms_func(requested_separations)
# Mean FWHM across KL modes for small sample correction
mean_fwhm = np.mean(interp_fwhms, axis=0) if small_sample_correction else None
annular_noise = measure_noise(frame, requested_separations, halfwidth,
nsigma=nsigma, fwhm=mean_fwhm,
small_sample_correction=small_sample_correction) # in photoelectrons/s
# now need to get Fp/Fs
# For star flux, Fs: integrated flux of star modeled as analytic formula for volume under 2-D Gaussian defined
# by amplitude and FWHM used for KLIP throughput calculation. Amplitude found by doing Gaussian fit.
star_fr = unocculted_star_dataset.frames[i]
if unocculted_star_loc is None:
peak_row, peak_col = np.where(star_fr.data == star_fr.data.max())
pos = (peak_row[0], peak_col[0])
else:
pos = (unocculted_star_loc[0][i], unocculted_star_loc[1][i])
if pos[0] > star_fr.data.shape[0] or pos[1] > star_fr.data.shape[1]:
raise ValueError('The guess centroid pixel location for the unocculted star is outside the image bounds.')
# fit_shape inupt below must have odd numbers:
if np.mod(star_fr.data.shape[0], 2) == 0:
row_shape = star_fr.data.shape[0] - 1
else:
row_shape = star_fr.data.shape[0]
if np.mod(star_fr.data.shape[1], 2) == 0:
col_shape = star_fr.data.shape[1] - 1
else:
col_shape = star_fr.data.shape[1]
fit_shape = (row_shape, col_shape)
data = star_fr.data[:row_shape, :col_shape]
mask = star_fr.dq.astype(bool)[:row_shape, :col_shape]
guess_row, guess_col = pos
# Get the value at the max_row and max_col position
half_value = data[guess_row, guess_col] / 2
# Calculate the absolute difference from half_value for all pixels
abs_diff = np.abs(data - half_value)
# Get the indices of the 20 smallest differences
closest_indices = np.unravel_index(np.argsort(abs_diff.ravel())[:20], data.shape)
# to estimate a guess FWHM over a large frame (to ensure a decent fit),
# Find the highest-density location among the 20 pixels closest to half the guess star amplitude
positions = np.vstack([closest_indices[0], closest_indices[1]])
# Perform kernel density estimation
kde = gaussian_kde(positions)
density = kde(positions)
# Find the index of the highest density
highest_density_index = np.argmax(density)
# Get the row and column of the highest-density location
median_row = closest_indices[0][highest_density_index]
median_col = closest_indices[1][highest_density_index]
fwhm_guess = 2*np.sqrt((median_row-guess_row)**2 + (median_col-guess_col)**2)
psf_phot = fit_2dgaussian(data, xypos=pos, fit_shape=fit_shape, fwhm=fwhm_guess, fix_fwhm=False,
mask=mask, error=None)
star_xs = psf_phot.results['x_fit']
star_ys = psf_phot.results['y_fit']
# in case more than 1 PSF found:
star_ind = np.argmin(np.sqrt((star_xs-guess_col)**2+(star_ys-guess_row)**2))
star_x = star_xs[star_ind]
star_y = star_ys[star_ind]
#TODO perhaps incorporate into ERR in future, and incorporate error in psf_phot argument above (must be non-zero, though)
star_err = psf_phot.results['flux_err'][star_ind]
# OD (optical density) of the ND filter at the star location:
ND_od = NDcalibration.interpolate_od(star_x, star_y)
# integral under the fitted 2-D Gaussian for the unocculted star,
# corrected for ND filter attenuation: true_flux = measured_flux * 10**OD
Fs = 10**ND_od * psf_phot.results['flux_fit'][star_ind]
# For planet flux, Fp: treat the annular noise value as the amplitude of a 2-D Gaussian and use the
# same FWHM used for KLIP throughput calculation. The analytic formula for volume under the Gaussian is the integrated flux.
noise_amp = annular_noise.T
Fp = np.pi*noise_amp*interp_fwhms**2/(4*np.log(2)) #integral of 2-D Gaussian
# Interpolate/extrapolate the algorithm and core throughputs at the desired separations
klip_interp_func = interp1d(klip_seps, klip_tp, kind='linear', fill_value='extrapolate')
klip_tp = klip_interp_func(requested_separations)
ct_interp_func = interp1d(ct_seps, core_tp, kind='linear', fill_value='extrapolate')
core_tp = ct_interp_func(requested_separations)
frn_vals = (Fp/Fs)/(klip_tp*core_tp)
# include row for separations in milli-arcseconds (mas)
requested_mas = requested_separations * pixscale_mas
flux_ratio_noise_curve = np.vstack([requested_separations, requested_mas, frn_vals])
hdr = fits.Header()
hdr['BUNIT'] = "Fp/Fs"
hdr['NSIGMA'] = (nsigma, "Sigma level of noise curve")
hdr['SSCORR'] = (small_sample_correction, "Mawet+2014 small sample correction applied")
hdr['COMMENT'] = "Flux ratio noise curve as a function of radial separation. First row: separation radii in pixels. Second row: separation radii in mas. Remaining rows: flux ratio noise curve values for KL mode truncations."
frame.add_extension_hdu('FRN_CRV', data = flux_ratio_noise_curve, header=hdr)
history_msg = 'Added FRN_CRV (nsigma={0}, small_sample_correction={1}).'.format(nsigma, small_sample_correction)
output_dataset.update_after_processing_step(history_msg)
return output_dataset
[docs]
def determine_flux(input_dataset, fluxcal_factor, photo = "aperture", phot_kwargs = None):
"""
Calculates the total number of photoelectrons/s of a point source and convert them to the flux in erg/(s * cm^2 * AA).
Write the flux and corresponding error in the header. Convert the flux to Vega magnitude and write it in the header.
We assume that the source is the brightest point source in the field close to the center.
Args:
input_dataset (corgidrp.data.Dataset): a dataset of Images with the source
fluxcal_factor (corgidrp.data.FluxcalFactor): flux calibration file
photo (String): do either aperture photometry ("aperture") or 2DGaussian fit of the point source ("2dgauss")
phot_kwargs (dict): parameters of the photometry method, for details see fluxcal.aper_phot and fluxcal.phot_by_gauss_2dfit
Returns:
corgidrp.data.Dataset: a version of the input dataset with the data in flux units
"""
# you should make a copy the dataset to start
if input_dataset[0].ext_hdr['BUNIT'] != "photoelectron/s":
raise ValueError("input dataset must have unit photoelectron/s for the flux determination, not {0}".format(input_dataset[0].ext_hdr['BUNIT']))
flux_dataset = input_dataset.copy()
if "COL_COR" in flux_dataset[0].ext_hdr:
color_cor_fac = flux_dataset[0].ext_hdr['COL_COR']
else:
warnings.warn("There is no COL_COR keyword in the header, color correction was not done, it is set to 1")
color_cor_fac = 1
factor = fluxcal_factor.fluxcal_fac/color_cor_fac
factor_error = fluxcal_factor.fluxcal_err/color_cor_fac
if photo == "aperture":
if phot_kwargs == None:
#set default values
phot_kwargs = {
'encircled_radius': 5,
'frac_enc_energy': 1.0,
'method': 'subpixel',
'subpixels': 5,
'background_sub': False,
'r_in': 5,
'r_out': 10,
'centering_method': 'xy',
'centroid_roi_radius': 5
}
phot_values = [fluxcal.aper_phot(image, **phot_kwargs) for image in flux_dataset]
elif photo == "2dgauss":
if phot_kwargs == None:
#set default values
phot_kwargs = {
'fwhm': 3,
'fit_shape': None,
'background_sub': False,
'r_in': 5,
'r_out': 10,
'centering_method': 'xy',
'centroid_roi_radius': 5
}
phot_values = [fluxcal.phot_by_gauss2d_fit(image, **phot_kwargs) for image in flux_dataset]
else:
raise ValueError(photo + " is not a valid photo parameter, choose aperture or 2dgauss")
if phot_kwargs.get('background_sub', False):
ap_sum, ap_sum_err, back = np.mean(np.array(phot_values),0)
else:
ap_sum, ap_sum_err = np.mean(np.array(phot_values),0)
back = 0
flux = ap_sum * factor
flux_err = np.sqrt(ap_sum_err**2 * factor**2 + factor_error**2 *ap_sum**2)
#Also determine the apparent Vega magnitude
filter_file = fluxcal.get_filter_name(flux_dataset[0])
vega_mag = fluxcal.calculate_vega_mag(flux, filter_file)
#calculate the magnitude error from the flux error and put it in MAGERR header
vega_mag_err = 2.5/np.log(10) * flux_err/flux
history_msg = "star {0} flux calculated as {1} erg/(s * cm^2 * AA) corresponding to {2} vega magnitude".format(flux_dataset[0].pri_hdr["TARGET"],flux, vega_mag)
# update the output dataset with this converted data and update the history
flux_dataset.update_after_processing_step(history_msg, header_entries = {"FLUXFAC": fluxcal_factor.fluxcal_fac, "LOCBACK": back, "FLUX": flux, "FLUXERR": flux_err, "APP_MAG": vega_mag, "MAGERR": vega_mag_err})
return flux_dataset
[docs]
def update_to_tda(input_dataset):
"""
Updates the data level to TDA (Technical Demo Analysis). Only works on L4 data.
Currently only checks that data is at the L4 level
Args:
input_dataset (corgidrp.data.Dataset): a dataset of Images (L4-level)
Returns:
corgidrp.data.Dataset: same dataset now at TDA-level
"""
# check that we are running this on L1 data
for orig_frame in input_dataset:
if orig_frame.ext_hdr['DATALVL'] != "L4":
err_msg = "{0} needs to be L4 data, but it is {1} data instead".format(orig_frame.filename, orig_frame.ext_hdr['DATALVL'])
raise ValueError(err_msg)
# we aren't altering the data
updated_dataset = input_dataset.copy(copy_data=False)
for frame in updated_dataset:
# update header
frame.ext_hdr['DATALVL'] = "TDA"
# update filename convention. The file convention should be
# "CGI_[datalevel_*]" so we should be same just replacing the just instance of L1
frame.filename = frame.filename.replace("_l4_", "_tda_", 1)
history_msg = "Updated Data Level to TDA"
updated_dataset.update_after_processing_step(history_msg)
return updated_dataset
[docs]
def find_source(input_image, psf=None, fwhm=2.8, nsigma_threshold=5.0,
image_without_planet=None):
"""
Detects sources in an image based on a specified SNR threshold and save their approximate pixel locations and SNRs into the header.
Args:
input_image (corgidrp.data.Image): The input image to search for sources (L4-level).
psf (ndarray, optional): The PSF used for detection. If None, a Gaussian approximation is created.
fwhm (float, optional): Full-width at half-maximum of the PSF in pixels.
nsigma_threshold (float, optional): The SNR threshold for source detection.
image_without_planet (ndarray, optional): An image without any sources (~noise map) to make snmap more accurate.
Returns:
corgidrp.data.Image: A copy of the input image with the detected sources and their SNRs saved in the header.
"""
new_image = input_image.copy()
# Ensure an odd-sized box for PSF convolution
boxsize = int(fwhm * 3)
boxsize += 1 if boxsize % 2 == 0 else 0 # Ensure an odd box size
# Create coordinate grids
y, x = np.indices((boxsize, boxsize))
y -= boxsize // 2 ; x -= boxsize // 2
if psf is None:
sigma = fwhm / (2.0 * np.sqrt(2.0 * np.log(2))) # Convert FWHM to sigma
psf = np.exp(-(x**2 + y**2) / (2.0 * sigma**2))
# Generate a binary mask for PSF convolution
distance_map = np.sqrt(x**2 + y**2)
idx = np.where( (distance_map <= fwhm*0.5) )
psf_binarymask = np.zeros_like(psf) ; psf_binarymask[idx] = 1
# Compute the SNR map using cross-correlation
image_residual = np.zeros_like(new_image.data) + new_image.data
image_snmap = make_snmap(image_residual, psf_binarymask, image_without_planet=image_without_planet)
sn_source, xy_source = [], []
# Iteratively detect sources above the SNR threshold
while np.nanmax(image_snmap) >= nsigma_threshold:
sn = np.nanmax(image_snmap)
xy = np.unravel_index(np.nanargmax(image_snmap), image_snmap.shape)
if sn > nsigma_threshold:
sn_source.append(sn)
xy_source.append(xy)
# Scale and subtract the detected PSF from the image
image_residual = psf_scalesub(image_residual, xy, psf, fwhm)
# Update the SNR map after source removal
image_snmap = make_snmap(image_residual, psf_binarymask, image_without_planet=image_without_planet)
# Store detected sources in FITS header
for i in range(len(sn_source)):
new_image.ext_hdr[f'snyx{i:03d}'] = f'{sn_source[i]:5.1f},{xy_source[i][0]:4d},{xy_source[i][1]:4d}'
# names of the header keywords are tentative
return new_image
[docs]
def calculate_zero_point(image, star_name, encircled_radius, phot_kwargs=None):
"""
Calculate the photometric zero point for a given star image.
Computes the zero point by comparing the measured photon flux from the image with the apparent magnitude
determined for the star relative to Vega. If no photometry keyword arguments are provided, default values
are used for the aperture photometry parameters.
Args:
image (corgidrp.data.Image): An image containing the star.
star_name (str): The name of the star, used to determine its apparent magnitude.
encircled_radius (float): The radius within which to sum the counts for aperture photometry.
phot_kwargs (dict, optional): A dictionary of keyword arguments for photometry, including parameters
like 'frac_enc_energy', 'method', 'subpixels', 'background_sub', 'r_in', 'r_out',
'centering_method', and 'centroid_roi_radius'. Defaults to a predefined set of parameters if None.
Returns:
zp (float): The computed zero point based on the apparent magnitude and the measured counts sum.
"""
if phot_kwargs is None:
phot_kwargs = {
'frac_enc_energy': 1.0,
'method': 'subpixel',
'subpixels': 5,
'background_sub': False,
'r_in': 5,
'r_out': 10,
'centering_method': 'xy',
'centroid_roi_radius': 5
}
# Compute apparent magnitude of this star compared to Vega in this band
mag_dataset = determine_app_mag(image, star_name)
# Get the apparent magnitude from the updated dataset
app_mag = mag_dataset[0].ext_hdr["APP_MAG"]
# Compute zero point using real measured photons
ap_sum = fluxcal.aper_phot(image, encircled_radius, **phot_kwargs, centering_initial_guess=None)
zp = app_mag + 2.5 * np.log10(ap_sum)
return zp
[docs]
def calc_pol_p_and_pa_image(input_Image):
"""Compute polarization intensity, fractional polarization, and EVPA from Stokes maps.
Args:
input_Image (Image): Object containing Stokes maps and uncertainties.
Returns:
Image: Image object containing
- data: stacked [P, p, evpa] (shape: 3 x H x W)
- ext_hdr: Header with updated HISTORY
- err, dq: arrays reflecting Perr, perr and evpa_err
Raises:
AttributeError: If `input_Image` is missing `data` or `err` attributes.
ValueError: If Stokes maps I, Q, U have inconsistent shapes or insufficient slices.
"""
# --- Extract Stokes parameters ---
try:
I, Q, U = input_Image.data[0:3]
Ierr, Qerr, Uerr = input_Image.err[0][0:3]
Idq, Qdq, Udq = input_Image.dq[0:3]
# V, Qphi, Uphi = Image.data[3:6] # unused
except AttributeError as e:
raise AttributeError("Image object must have 'data' and 'err' attributes.") from e
except IndexError as e:
raise ValueError("Image.data and Image.err must have at least [0..2] slices.") from e
# --- Polarized intensity and error ---
P = np.sqrt(Q**2 + U**2)
Perr = np.sqrt((Q * Qerr)**2 + (U * Uerr)**2) / np.maximum(P, 1e-10)
# --- Fractional polarization and its error ---
p = P / np.maximum(I, 1e-10)
perr = np.sqrt((Perr / np.maximum(I, 1e-10))**2 +
(P * Ierr / np.maximum(I, 1e-10)**2)**2)
# --- Polarization angle (EVPA) and uncertainty ---
evpa = 0.5 * np.arctan2(U, Q) # radians
evpa_err = 0.5 * np.sqrt((U * Qerr)**2 + (Q * Uerr)**2) / np.maximum(Q**2 + U**2, 1e-10)
evpa = np.degrees(evpa)
evpa_err = np.degrees(evpa_err)
# --- Data quality propagation ---
dq = np.bitwise_or(np.bitwise_or(Idq, Qdq), Udq)
# --- Stack results ---
data_out = np.stack([P, p, evpa], axis=0)
err_out = np.stack([Perr, perr, evpa_err], axis=0)
dq_out = np.stack([dq, dq, dq], axis=0)
# --- Headers ---
pri_hdr = input_Image.pri_hdr
ext_hdr = input_Image.ext_hdr
err_hdr = input_Image.err_hdr
dq_hdr = input_Image.dq_hdr
ext_hdr.add_history(
"Derived polarization products: data=[P, p, EVPA]; "
"err=[Perr, perr, EVPA_err]; dq propagated from I,Q,U."
)
# --- Construct output Image ---
Image_out = Image(
data_out,
pri_hdr=pri_hdr,
ext_hdr=ext_hdr,
err=err_out,
dq=dq_out,
err_hdr=err_hdr,
dq_hdr=dq_hdr
)
return Image_out
[docs]
def compute_QphiUphi(image, x_center=None, y_center=None):
"""
Compute Q_phi and U_phi from Stokes Q and U, returning an Image with shape [6, n, m]:
[I, Q, U, V, Q_phi, U_phi].
Args:
image: Image
Input image whose `data` is shaped [4, n, m] as [I, Q, U, V]. If the extension
header contains 'STARLOCX' and 'STARLOCY', these are used as the stellar center.
x_center: float or None
Optional override for the x-coordinate of the center. Used only when the header
does not provide 'STARLOCX'. If both header and this argument are missing, the
image center ( (m-1)/2 ) is used.
y_center: float or None
Optional override for the y-coordinate of the center. Used only when the header
does not provide 'STARLOCY'. If both header and this argument are missing, the
image center ( (n-1)/2 ) is used.
Returns:
Image: A copy of the input image with data expanded to [6, n, m] as
[I, Q, U, V, Q_phi, U_phi]. The `err` array is expanded to match and the
uncertainties for Q_phi/U_phi are propagated from Q and U (assuming no covariance).
The `dq` planes are expanded to match; if I/Q/U/V have identical dq, that mask is
copied to both Q_phi and U_phi, otherwise Q_phi inherits Q’s dq and U_phi inherits
U’s dq.
"""
# copy of the input image
out = image.copy(copy_data=True)
data = out.data
I, Q, U, V = data
n, m = I.shape
ext_hdr = getattr(out, "ext_hdr", None)
# Determine center coordinates: header > input args > image center (print which is used)
if ext_hdr is not None and ("STARLOCX" in ext_hdr) and ("STARLOCY" in ext_hdr):
cx = float(ext_hdr["STARLOCX"])
cy = float(ext_hdr["STARLOCY"])
elif (x_center is not None) and (y_center is not None):
cx = float(x_center)
cy = float(y_center)
else:
cx = (m - 1) * 0.5
cy = (n - 1) * 0.5
# Polar angle φ and rotation (use float64 to reduce numerical errors)
y_idx, x_idx = np.mgrid[0:n, 0:m]
phi = np.arctan2(y_idx - cy, x_idx - cx)
c2 = np.cos(2.0 * phi, dtype=np.float64)
s2 = np.sin(2.0 * phi, dtype=np.float64)
Qf = Q.astype(np.float64, copy=False)
Uf = U.astype(np.float64, copy=False)
Q_phi = -Qf * c2 - Uf * s2
U_phi = Qf * s2 - Uf * c2
# Cast back to the same dtype as input data
Q_phi = Q_phi.astype(data.dtype, copy=False)
U_phi = U_phi.astype(data.dtype, copy=False)
# Expand data to [6,n,m]
out.data = np.concatenate([data, Q_phi[None, ...], U_phi[None, ...]], axis=0)
# Ensure err / dq match the data (create if missing, or add 2 new planes)
nplanes = out.data.shape[0]
# --- err propagation ---
if out.err is None:
out.err = np.zeros((nplanes, n, m), dtype=out.data.dtype)
else:
if out.err.shape[0] >= 3 and out.err.shape[1:] == (n, m):
sigma_Q = out.err[1].astype(np.float64)
sigma_U = out.err[2].astype(np.float64)
var_Qphi = (c2**2) * sigma_Q**2 + (s2**2) * sigma_U**2
var_Uphi = (s2**2) * sigma_Q**2 + (c2**2) * sigma_U**2
err_Qphi = np.sqrt(var_Qphi).astype(out.data.dtype)
err_Uphi = np.sqrt(var_Uphi).astype(out.data.dtype)
out.err = np.concatenate([out.err,
err_Qphi[None, ...],
err_Uphi[None, ...]], axis=0)
else:
out.err = np.zeros((nplanes, n, m), dtype=out.data.dtype)
# --- dq alignment & inheritance ---
if getattr(out, "dq", None) is None:
# dq is nothing -> create zeros with correct shape
out.dq = np.zeros((nplanes, n, m), dtype=np.uint16)
elif out.dq.ndim != 3 or out.dq.shape[1:] != (n, m):
# shape mismatch -> reset
out.dq = np.zeros((nplanes, n, m), dtype=np.uint16)
else:
# if we still have only I,Q,U,V planes, append two new planes
if out.dq.shape[0] == nplanes - 2 and out.dq.shape[0] >= 4:
dq_Q = out.dq[1].astype(np.uint16, copy=False)
dq_U = out.dq[2].astype(np.uint16, copy=False)
dq_or = (dq_Q | dq_U).astype(np.uint16)
out.dq = np.concatenate([out.dq, dq_or[None, ...], dq_or[None, ...]], axis=0)
# if already 6 planes, (re)compute Q_phi/U_phi dq for correctness
elif out.dq.shape[0] == nplanes:
out.dq[4] = (out.dq[1].astype(np.uint16) | out.dq[2].astype(np.uint16))
out.dq[5] = (out.dq[1].astype(np.uint16) | out.dq[2].astype(np.uint16))
else:
# anything else -> reset
out.dq = np.zeros((nplanes, n, m), dtype=np.uint16)
# Add HISTORY record
msg = f"Computed Q_phi/U_phi with center=({cx:.6f},{cy:.6f}); output data shape {out.data.shape}."
if ext_hdr is not None:
try:
ext_hdr['HISTORY'] = msg
except Exception:
pass
return out