import os
import re
import numpy as np
import numpy.ma as ma
import astropy.io.fits as fits
import warnings
from astropy.io.fits.card import VerifyWarning
import astropy.time as time
import pandas as pd
from astropy.table import Table
import pyklip
from pyklip.instruments.Instrument import Data as pyKLIP_Data
from pyklip.instruments.utils.wcsgen import generate_wcs
from scipy.interpolate import LinearNDInterpolator
from astropy import wcs
import copy
import corgidrp
from datetime import datetime, timedelta, timezone
[docs]
typical_bool_keywords = ['DESMEAR', 'CTI_CORR']
[docs]
typical_cal_invalid_keywords = [
# Primary header keywords
'VISITID', 'FILETIME', 'PROGNUM', 'EXECNUM', 'CAMPAIGN',
'SEGMENT', 'OBSNUM', 'VISNUM', 'CPGSFILE', 'AUXFILE',
'VISTYPE', 'TARGET', 'RA', 'DEC', 'RAPM', 'DECPM',
'OPGAIN', 'PHTCNT', 'FRAMET', 'PA_V3', 'PA_APER',
'SVB_1', 'SVB_2', 'SVB_3', 'ROLL', 'PITCH', 'YAW',
'FILENAME', 'OBSNAME', 'WBJ_1', 'WBJ_2', 'WBJ_3',
# Extension header keywords
'BUNIT', 'ISHOWFSC', 'ISACQ', 'SPBAL', 'ISFLAT', 'SATSPOTS',
'EXPTIME', 'EMGAIN_C', 'KGAINPAR', 'BLNKTIME', 'BLNKCYC',
'EXPCYC', 'OVEREXP', 'NOVEREXP', 'PROXET',
'FCMLOOP', 'FCMPOS', 'FSMINNER', 'FSMLOS', 'FSMPRFL', 'FSMRSTR',
'FSMSG1', 'FSMSG2', 'FSMSG3', 'FSMX', 'FSMY',
'EACQ_ROW', 'EACQ_COL', 'SB_FP_DX', 'SB_FP_DY', 'SB_FS_DX', 'SB_FS_DY',
'DMZLOOP', '1SVALID', 'Z2AVG', 'Z2RES', 'Z2VAR', 'Z3AVG', 'Z3RES', 'Z3VAR',
'10SVALID', 'Z4AVG', 'Z4RES', 'Z5AVG', 'Z5RES',
'Z6AVG', 'Z6RES', 'Z7AVG', 'Z7RES', 'Z8AVG', 'Z8RES',
'Z9AVG', 'Z9RES', 'Z10AVG', 'Z10RES', 'Z11AVG', 'Z11RES',
'Z12AVG', 'Z13AVG', 'Z14AVG',
'SPAM_H', 'SPAM_V', 'SPAMNAME', 'SPAMSP_H', 'SPAMSP_V',
'FPAM_H', 'FPAM_V', 'FPAMNAME', 'FPAMSP_H', 'FPAMSP_V',
'LSAM_H', 'LSAM_V', 'LSAMNAME', 'LSAMSP_H', 'LSAMSP_V',
'FSAM_H', 'FSAM_V', 'FSAMNAME', 'FSAMSP_H', 'FSAMSP_V',
'CFAM_H', 'CFAM_V', 'CFAMNAME', 'CFAMSP_H', 'CFAMSP_V',
'DPAM_H', 'DPAM_V', 'DPAMNAME', 'DPAMSP_H', 'DPAMSP_V',
'FTIMEUTC', 'DATATYPE', 'FWC_PP_E', 'FWC_EM_E', 'SAT_DN', 'DATETIME',
]
[docs]
class Dataset():
"""
A sequence of data of the same kind. Can be indexed and looped over
Args:
frames_or_filepaths (list): list of either filepaths or data objects (e.g., Image class)
Attributes:
all_data (np.array): an array with all the data combined together. First dimension is always number of images
frames (np.array): list of data objects (probably corgidrp.data.Image)
"""
def __init__(self, frames_or_filepaths, no_data=False, no_err=False, no_dq=False):
"""
Args:
frames_or_filepaths (list): list of either filepaths or data objects (e.g., Image class)
no_data (bool): If True, only the header information is loaded into the dataset for the frames' data. However, if the input is at the L1 level, the err and dq for
each frame will have the default loaded in (arrays of zeros). Defaults to False.
no_err (bool): If True, no err arrays are loaded in. This overrides the condition concerning err in the no_data description above. Defaults to False.
no_dq (bool): If True, no dq arrays are loaded in. This overrides the condition concerning dq in the no_data description above. Defaults to False.
"""
if len(frames_or_filepaths) == 0:
raise ValueError("Empty list passed in")
# default value
[docs]
self.data_loaded = True
if no_data:
self.data_loaded = False
if isinstance(frames_or_filepaths[0], str):
# list of filepaths
# TODO: do some auto detection of the filetype, but for now assume it is an image file
self.frames = []
for filepath in frames_or_filepaths:
fr = Image(filepath)
if no_data:
fr.data = None
if no_err:
fr.err = None
if no_dq:
fr.dq = None
self.frames.append(fr)
else:
# list of frames
self.frames = frames_or_filepaths
# if one of the input frames has data, set data_loaded to True
self.data_loaded = False
for frame in self.frames:
if frame.data is not None:
self.data_loaded = True
# turn lists into np.array for indexing behavior
if isinstance(self.frames, list):
self.frames = np.array(self.frames) # list of objects
# create 3-D cube of all the data
[docs]
self.all_data = np.array([frame.data for frame in self.frames])
[docs]
self.all_err = np.array([frame.err for frame in self.frames])
[docs]
self.all_dq = np.array([frame.dq for frame in self.frames])
# do a clever thing to point all the individual frames to the data in this cube
# this way editing a single frame will also edit the entire datacube
for i, frame in enumerate(self.frames):
frame.data = self.all_data[i]
frame.err = self.all_err[i]
frame.dq = self.all_dq[i]
[docs]
def __iter__(self):
return self.frames.__iter__()
[docs]
def __getitem__(self, indices):
if isinstance(indices, int):
# return a single element of the data
return self.frames[indices]
else:
# return a subset of the dataset
return Dataset(self.frames[indices])
[docs]
def __len__(self):
return len(self.frames)
[docs]
def save(self, filedir=None, filenames=None, ram_heavy_save=False):
"""
Save each file of data in this dataset into directory
Args:
filedir (str): directory to save the files. Default: the existing filedir for each file
filenames (list): a list of output filenames for each file. Default: unchanged filenames
ram_heavy_save (bool): If True, the input is assumed to have no data loaded into memory. (Only metadata was
manipulated in step leading up to save_data.) The data is loaded from the filepath frame by frame, and
each Image is saved to outputdir. Defaults to False.
"""
# if filenames are not passed, use the default ones
if filenames is None:
filenames = []
for frame in self.frames:
filename = frame.filename
filenames.append(frame.filename)
for filename, frame in zip(filenames, self.frames):
if ram_heavy_save:
temp_frame = Image(frame.filepath)
if frame.data is None:
frame.data = temp_frame.data
if frame.err is None:
frame.err = temp_frame.err
if frame.dq is None:
frame.dq = temp_frame.dq
for name in frame.hdu_names: # by construction hdus other than the usual err and dq
if len(frame.hdu_list) > 0:
if frame.hdu_list[name] is None:
frame.hdu_list[name].data = temp_frame.hdu_list[name].data
##redoing the change to the FILENAME keyword to cover our bases
frame.pri_hdr['FILENAME'] = frame.filename
frame.save(filename=filename, filedir=filedir)
if not ram_heavy_save:
# relink frames with all_data
self.all_data = np.array([frame.data for frame in self.frames])
self.all_err = np.array([frame.err for frame in self.frames])
self.all_dq = np.array([frame.dq for frame in self.frames])
for i, frame in enumerate(self.frames):
frame.data = self.all_data[i]
frame.err = self.all_err[i]
frame.dq = self.all_dq[i]
[docs]
def update_after_processing_step(self, history_entry, new_all_data=None, new_all_err = None, new_all_dq = None, header_entries = None,
update_err_header=True):
"""
Updates the dataset after going through a processing step
Args:
history_entry (str): a description of what processing was done. Mention reference files used.
new_all_data (np.array): (optional) Array of new data. Needs to be the same shape as `all_data`
new_all_err (np.array): (optional) Array of new err. Needs to be the same shape as `all_err` except of second dimension
new_all_dq (np.array): (optional) Array of new dq. Needs to be the same shape as `all_dq`
header_entries (dict): (optional) a dictionary {} of ext_hdr and err_hdr entries to add or update
update_err_header (bool): (optional) whether or not to add the new entry to the error header
"""
# update data if necessary
if new_all_data is not None:
if new_all_data.shape != self.all_data.shape:
raise ValueError("The shape of new_all_data is {0}, whereas we are expecting {1}".format(new_all_data.shape, self.all_data.shape))
self.all_data[:] = new_all_data # specific operation overwrites the existing data rather than changing pointers
if new_all_err is not None:
if new_all_err.shape[-2:] != self.all_err.shape[-2:] or new_all_err.shape[0] != self.all_err.shape[0]:
raise ValueError("The shape of new_all_err is {0}, whereas we are expecting {1}".format(new_all_err.shape, self.all_err.shape))
self.all_err = new_all_err
for i in range(len(self.frames)):
self.frames[i].err = self.all_err[i]
if new_all_dq is not None:
if new_all_dq.shape != self.all_dq.shape:
raise ValueError("The shape of new_all_dq is {0}, whereas we are expecting {1}".format(new_all_dq.shape, self.all_dq.shape))
self.all_dq[:] = new_all_dq # specific operation overwrites the existing data rather than changing pointers
# update history and header entries
for img in self.frames:
img.ext_hdr['HISTORY'] = history_entry
img.err_hdr['HISTORY'] = history_entry
if header_entries:
for key, value in header_entries.items():
img.ext_hdr[key] = value
if update_err_header:
img.err_hdr[key] = value
[docs]
def copy(self, copy_data=True):
"""
Make a copy of this dataset, including all data and headers.
Data copying can be turned off if you only want to modify the headers
Headers should always be copied as we should modify them any time we make new edits to the data
Args:
copy_data (bool): (optional) whether the data should be copied. Default is True
Returns:
corgidrp.data.Dataset: a copy of this dataset
"""
# there's a smarter way to manage memory, but to keep the API simple, we will avoid it for now
new_frames = [frame.copy(copy_data=copy_data) for frame in self.frames]
new_dataset = Dataset(new_frames)
return new_dataset
[docs]
def add_error_term(self, input_error, err_name):
"""
Calls Image.add_error_term() for each frame.
Updates Dataset.all_err.
Args:
input_error (np.array): per-frame or per-dataset error layer
err_name (str): name of the uncertainty layer
"""
if input_error.ndim == self.all_data.ndim:
for i,frame in enumerate(self.frames):
frame.add_error_term(input_error[i], err_name)
elif input_error.ndim == self.all_data.ndim - 1:
for frame in self.frames:
frame.add_error_term(input_error, err_name)
else:
raise ValueError("input_error is not either a 2D or 3D array for 2D data, or a 3D or 4D array for 3D data.")
# Preserve pointer links between Dataset.all_err and Image.err
self.all_err = np.array([frame.err for frame in self.frames])
for i, frame in enumerate(self.frames):
frame.err = self.all_err[i]
[docs]
def rescale_error(self, scale_factor, scale_name):
"""
Calls Image.rescale_errors() for each frame.
Updates Dataset.all_err
Args:
scale_factor (np.array or float): scale factor value or array
scale_name (str): name of the scaling reason
"""
if isinstance(scale_factor, float):
for frame in self.frames:
frame.rescale_error(scale_factor, scale_name)
elif scale_factor.ndim == 2:
for frame in self.frames:
frame.rescale_error(scale_factor, scale_name)
elif scale_factor.ndim == 3:
for i,frame in enumerate(self.frames):
frame.rescale_error(scale_factor[i], scale_name)
else:
raise ValueError("scale_factor is neither a float nor a 2D or 3D array.")
# Preserve pointer links between Dataset.all_err and Image.err
self.all_err = np.array([frame.err for frame in self.frames])
for i, frame in enumerate(self.frames):
frame.err = self.all_err[i]
[docs]
def split_dataset(self, prihdr_keywords=None, exthdr_keywords=None):
"""
Splits up this dataset into multiple smaller datasets that have the same set of header keywords
The code uses all keywords together to determine an unique group
Args:
prihdr_keywords (list of str): list of primary header keywords to split
exthdr_keywords (list of str): list of 1st extension header keywords to split on
Returns:
list of datasets: list of sub datasets
list of tuples: list of each set of unique header keywords. pri_hdr keywords occur before ext_hdr keywords
"""
if prihdr_keywords is None and exthdr_keywords is None:
raise ValueError("No prihdr or exthdr keywords passed in to split dataset")
col_names = []
col_vals = []
if prihdr_keywords is not None:
for key in prihdr_keywords:
dataset_vals = [frame.pri_hdr[key] for frame in self.frames]
col_names.append(key)
col_vals.append(dataset_vals)
if exthdr_keywords is not None:
for key in exthdr_keywords:
dataset_vals = [frame.ext_hdr[key] for frame in self.frames]
col_names.append(key)
col_vals.append(dataset_vals)
all_data = np.array(col_vals).T
# track all combinations
df = pd.DataFrame(data=all_data, columns=col_names)
grouped = df.groupby(col_names)
unique_vals = list(grouped.indices.keys()) # each unique set of values
split_datasets = []
for combo in grouped.indices:
dataset_indices = grouped.indices[combo]
sub_dataset = self[dataset_indices]
split_datasets.append(sub_dataset)
return split_datasets, unique_vals
[docs]
class Image():
"""
Base class for 2-D image data. Data can be created by passing in the data/header explicitly, or
by passing in a filepath to load a FITS file from disk
Args:
data_or_filepath (str or np.array): either the filepath to the FITS file to read in OR the 2D image data
pri_hdr (astropy.io.fits.Header): the primary header (required only if raw 2D data is passed in)
ext_hdr (astropy.io.fits.Header): the image extension header (required only if raw 2D data is passed in)
err (np.array): 2-D/3-D uncertainty data
dq (np.array): 2-D data quality, 0: good. Other values track different causes for bad pixels and other pixel-level effects in accordance with the DRP implementation document.x
err_hdr (astropy.io.fits.Header): the error extension header
dq_hdr (astropy.io.fits.Header): the data quality extension header
hdu_list (astropy.io.fits.HDUList): an astropy HDUList object that contains any other extension types.
Attributes:
data (np.array): 2-D data for this Image
err (np.array): 2-D uncertainty
dq (np.array): 2-D data quality
pri_hdr (astropy.io.fits.Header): primary header
ext_hdr (astropy.io.fits.Header): image extension header. Generally this header will be edited/added to
err_hdr (astropy.io.fits.Header): the error extension header
dq_hdr (astropy.io.fits.Header): the data quality extension header
hdu_list (astropy.io.fits.HDUList): an astropy HDUList object that contains any other extension types.
filename (str): the filename corresponding to this Image
filedir (str): the file directory on disk where this image is to be/already saved.
filepath (str): full path to the file on disk (if it exists)
"""
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, err = None, dq = None, err_hdr = None, dq_hdr = None, input_hdulist = None):
if isinstance(data_or_filepath, str):
# a filepath is passed in
with fits.open(data_or_filepath, ignore_missing_simple=True) as hdulist:
#Pop out the primary header
self.pri_hdr = hdulist.pop(0).header
#Pop out the image extension
first_hdu = hdulist.pop(0)
self.ext_hdr = first_hdu.header
self.data = first_hdu.data
# Upcast float data to float64 for processing precision
if self.data is not None and np.issubdtype(self.data.dtype, np.floating):
self.data = self.data.astype(np.float64)
#A list of extensions
self.hdu_names = [hdu.name for hdu in hdulist]
# we assume that if the err and dq array is given as parameter they supersede eventual err and dq extensions
if err is not None:
if isinstance(err, float):
if np.size(self.data) != 1:
raise ValueError("err can only be a float if data is a float value")
self.err = np.array([err])
elif np.shape(self.data) != np.shape(err)[-self.data.ndim:]:
raise ValueError("The shape of err is {0} while we are expecting shape {1}".format(err.shape[-self.data.ndim:], self.data.shape))
#we want to have an extra dimension in the error array
elif err.ndim == self.data.ndim+1:
self.err = err
else:
self.err = err.reshape((1,)+err.shape)
elif "ERR" in self.hdu_names:
err_hdu = hdulist.pop("ERR")
self.err = err_hdu.data
self.err_hdr = err_hdu.header
# Upcast float err to float64 for processing precision
if self.err is not None and np.issubdtype(self.err.dtype, np.floating):
self.err = self.err.astype(np.float64)
if self.err.ndim != 1 and self.err.ndim == self.data.ndim:
self.err = self.err.reshape((1,)+self.err.shape)
else:
self.err = np.zeros((1,)+self.data.shape)
if dq is not None:
if np.shape(self.data) != np.shape(dq):
raise ValueError("The shape of dq is {0} while we are expecting shape {1}".format(dq.shape, self.data.shape))
self.dq = dq
elif "DQ" in self.hdu_names:
dq_hdu = hdulist.pop("DQ")
self.dq = dq_hdu.data
self.dq_hdr = dq_hdu.header
else:
self.dq = np.zeros(self.data.shape, dtype = int)
if input_hdulist is not None:
this_hdu_list = [hdu.copy() for hdu in input_hdulist]
else:
#After the data, err and dqs are popped out, the rest of the hdulist is stored in hdu_list
this_hdu_list = [hdu.copy() for hdu in hdulist]
# Upcast float data in extra HDUs to float64 for processing precision
for hdu in this_hdu_list:
if hdu.data is not None and np.issubdtype(hdu.data.dtype, np.floating):
hdu.data = hdu.data.astype(np.float64)
self.hdu_list = fits.HDUList(this_hdu_list)
# parse the filepath to store the filedir and filename
filepath_args = data_or_filepath.split(os.path.sep)
if len(filepath_args) == 1:
# no directory info in filepath, so current working directory
self.filedir = "."
self.filename = filepath_args[0]
else:
self.filename = filepath_args[-1]
self.filedir = os.path.sep.join(filepath_args[:-1])
else:
# data has been passed in directly
# creation of a new file in DRP eyes
if isinstance(data_or_filepath, float):
self.data = np.array([data_or_filepath])
if err is not None:
if isinstance(err, float):
self.err = np.array([err])
else:
raise ValueError("err value must be float")
else:
self.err = np.array([0.])
elif hasattr(data_or_filepath, "__len__"):
self.data = data_or_filepath
# Upcast float data to float64 for processing precision
if hasattr(self.data, 'dtype') and np.issubdtype(self.data.dtype, np.floating):
self.data = self.data.astype(np.float64)
if err is not None:
if np.shape(self.data) != np.shape(err)[-self.data.ndim:]:
raise ValueError("The shape of err is {0} while we are expecting shape {1}".format(err.shape[-self.data.ndim:], self.data.shape))
# Upcast float err to float64 for processing precision
if hasattr(err, 'dtype') and np.issubdtype(err.dtype, np.floating):
err = err.astype(np.float64)
#we want to have a 3 dim error array
if err.ndim == self.data.ndim + 1:
self.err = err
else:
self.err = err.reshape((1,)+err.shape)
else:
self.err = np.zeros((1,)+self.data.shape)
else:
raise ValueError("input must be an array or float")
if pri_hdr is None or ext_hdr is None:
raise ValueError("Missing primary and/or extension headers, because you passed in raw data")
self.pri_hdr = pri_hdr
self.ext_hdr = ext_hdr
self.filedir = "."
self.filename = ""
if dq is not None:
if np.shape(self.data) != np.shape(dq):
raise ValueError("The shape of dq is {0} while we are expecting shape {1}".format(dq.shape, self.data.shape))
self.dq = dq
else:
self.dq = np.zeros(self.data.shape, dtype = int)
#The default hdu extensions
self.hdu_names = ["ERR", "DQ"]
#Take the input hdulist or make a blank one.
if input_hdulist is not None:
this_hdu_list = [hdu.copy() for hdu in input_hdulist]
self.hdu_list = fits.HDUList(this_hdu_list)
#Keep track of the names
for hdu in input_hdulist:
self.hdu_names.append(hdu.name)
else:
self.hdu_list = fits.HDUList()
# record when this file was created and with which version of the pipeline
self.ext_hdr.set('DRPVERSN', corgidrp.__version__, "corgidrp version that produced this file")
self.ext_hdr.set('DRPCTIME', time.Time.now().isot, "When this file was saved")
# we assume that if the err_hdr and dq_hdr is given as parameter they supersede eventual existing err_hdr and dq_hdr
if err_hdr is not None:
self.err_hdr = err_hdr
if dq_hdr is not None:
self.dq_hdr = dq_hdr
if not hasattr(self, 'err_hdr'):
self.err_hdr = fits.Header()
self.err_hdr["EXTNAME"] = "ERR"
if not hasattr(self, 'dq_hdr'):
self.dq_hdr = fits.Header()
self.dq_hdr["EXTNAME"] = "DQ"
# discard individual errors if we aren't tracking them but multiple error terms are passed in
if self.err is not None and not corgidrp.track_individual_errors and self.err.shape[0] > 1:
num_errs = self.err.shape[0] - 1
# delete keywords specifying the error of each individual slice
for i in range(num_errs):
del self.err_hdr['Layer_{0}'.format(i + 2)]
self.err = self.err[:1] # only save the total err, preserve 3-D shape
self.err_hdr['TRK_ERRS'] = corgidrp.track_individual_errors # specify whether we are tracing errors
# the DRP needs to make sure certain keywords are set in its reduced products
# check those here, and if not, set them.
# by default, assume desmear and CTI correction are not applied by default
# and they can be toggled to true after their step functions are run
if not 'DESMEAR' in self.ext_hdr:
self.ext_hdr.set('DESMEAR', False, "Was desmear applied to this frame?")
if not 'CTI_CORR' in self.ext_hdr:
self.ext_hdr.set('CTI_CORR', False, "Was CTI correction applied to this frame?")
if not 'IS_BAD' in self.ext_hdr:
self.ext_hdr.set('IS_BAD', False, "Was this frame deemed bad?")
# PHTCNT can come in as an int from L1 data but the DRP expects it as a string
if 'PHTCNT' in self.pri_hdr and not isinstance(self.pri_hdr['PHTCNT'], str):
self.pri_hdr['PHTCNT'] = str(self.pri_hdr['PHTCNT'])
# the DRP has touched this file so it's origin is now this DRP
self.pri_hdr['ORIGIN'] = 'DRP'
# correct header type while preserving the value so that headers conform to standards in mock.py
adjusted_pri_hdr, adjusted_img_hdr = corgidrp.check.hdr_type_conform(self.pri_hdr, self.ext_hdr)
[docs]
self.pri_hdr = adjusted_pri_hdr
[docs]
self.ext_hdr = adjusted_img_hdr
# create this field dynamically
@property
[docs]
def filepath(self):
return os.path.join(self.filedir, self.filename)
[docs]
def save(self, filedir=None, filename=None):
"""
Save file to disk with user specified filepath
Args:
filedir (str): filedir to save to. Use self.filedir if not specified
filename (str): filepath to save to. Use self.filename if not specified
"""
if filename is not None:
self.filename = filename
if filedir is not None:
self.filedir = filedir
if len(self.filename) == 0:
raise ValueError("Output filename is not defined. Please specify!")
# recast header data to the appropriate bit depth as set by the pipeline settings
if self.data is not None:
self.data = self.data.astype(corgidrp.image_dtype, copy=False)
if self.err is not None:
self.err = self.err.astype(corgidrp.image_dtype, copy=False)
if self.dq is not None:
self.dq = self.dq.astype(corgidrp.dq_dtype, copy=False)
prihdu = fits.PrimaryHDU(header=self.pri_hdr)
exthdu = fits.ImageHDU(data=self.data, header=self.ext_hdr)
hdulist = fits.HDUList([prihdu, exthdu])
errhdu = fits.ImageHDU(data=self.err, header = self.err_hdr)
hdulist.append(errhdu)
dqhdu = fits.ImageHDU(data=self.dq, header = self.dq_hdr)
hdulist.append(dqhdu)
# Cast data in additional HDUs to the configured dtype before appending
for hdu in self.hdu_list:
if hdu.data is not None:
hdu.data = hdu.data.astype(corgidrp.image_dtype, copy=False)
hdulist.append(hdu)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=VerifyWarning) # fits save card length truncated warning
# in case some FITS-formatted headers like NAXIS1, NAXIS2, etc cards were put in the wrong place,
# this will position them in the right order and throw an error is something is unfixable
hdulist.verify('silentfix')
hdulist.writeto(self.filepath, overwrite=True)
hdulist.close()
def _record_parent_filenames(self, input_dataset):
"""
Record what input dataset was used to create this Image.
This assumes many Images were used to make this single Image.
Also tracks images that were used to make the images that make this image.
Record is stored in the ext header.
Args:
input_dataset (corgidrp.data.Dataset): the input dataset that were combined together to make this image
"""
parent_filenames = set()
# go through filenames and also check each frames parents
for img in input_dataset:
# Some synthetic products may not have a filename yet, prefer `filename`, fall back to `filepath'
if getattr(img, "filename", None):
parent_filenames.add(img.filename)
elif getattr(img, "filepath", None):
parent_filenames.add(img.filepath)
# also check if this frame has parent frames. keep trakc of them too
if 'DRPNFILE' in img.ext_hdr:
for j in range(img.ext_hdr['DRPNFILE']):
# Support FILE0- and FILE1- conventions, and deal with missing cards (for synthetic calibration products)
key0 = f"FILE{j}"
key1 = f"FILE{j+1}"
if key0 in img.ext_hdr:
parent_filenames.add(img.ext_hdr[key0])
elif key1 in img.ext_hdr:
parent_filenames.add(img.ext_hdr[key1])
for i, filename in enumerate(parent_filenames):
if len(str(i)) > 4:
self.ext_hdr.set('HIERARCH FILE{0}'.format(i), filename, "File #{0} filename used to create this frame".format(i))
else:
self.ext_hdr.set('FILE{0}'.format(i), filename, "File #{0} filename used to create this frame".format(i))
self.ext_hdr.set('DRPNFILE', len(parent_filenames), "# of files used to create this processed frame")
[docs]
def copy(self, copy_data=True):
"""
Make a copy of this image file. including data and headers.
Data copying can be turned off if you only want to modify the headers
Headers should always be copied as we should modify them any time we make new edits to the data
Args:
copy_data (bool): (optional) whether the data should be copied. Default is True
Returns:
corgidrp.data.Image: a copy of this Image
"""
if copy_data:
new_img = copy.deepcopy(self)
else:
new_img = copy.copy(self)
# copy the hdu_list and hdu_names list, but not their pointers
new_img.hdu_list = self.hdu_list.copy()
new_img.hdu_names = copy.copy(self.hdu_names)
# update DRP version tracking
new_img.ext_hdr['DRPVERSN'] = corgidrp.__version__
new_img.ext_hdr['DRPCTIME'] = time.Time.now().isot
return new_img
[docs]
def get_masked_data(self):
"""
Uses the dq array to generate a numpy masked array of the data
Returns:
numpy.ma.MaskedArray: the data masked
"""
mask = self.dq>0
return ma.masked_array(self.data, mask=mask)
[docs]
def add_error_term(self, input_error, err_name):
"""
Add a layer of a specific additive uncertainty on the 3- or 4-dim error array extension
and update the combined uncertainty in the first layer.
Update the error header and assign the error name.
Only tracks individual errors if the "track_individual_errors" setting is set to True
in the configuration file
Args:
input_error (np.array): error layer with same shape as data
err_name (str): name of the uncertainty layer
"""
ndim = self.data.ndim
if not (input_error.ndim==2 or input_error.ndim==3) or input_error.shape != self.data.shape:
raise ValueError("we expect a 2-dimensional or 3-dimensional error layer with dimensions {0}".format(self.data.shape))
#first layer is always the updated combined error
if ndim == 2:
self.err[0,:,:] = np.sqrt(self.err[0,:,:]**2 + input_error**2)
elif ndim == 3:
self.err[0,:,:,:] = np.sqrt(self.err[0,:,:,:]**2 + input_error**2)
self.err_hdr["Layer_1"] = "combined_error"
if corgidrp.track_individual_errors:
#append new error as layer on 3D or 4D cube
self.err=np.append(self.err, [input_error], axis=0)
layer = str(self.err.shape[0])
self.err_hdr["Layer_" + layer] = err_name
# record history since 2-D error map doesn't track individual terms
self.err_hdr['HISTORY'] = "Added error term: {0}".format(err_name)
[docs]
def rescale_error(self, scale_factor, scale_name):
"""
scale the 3-dim error array extension with a factor.
Update the error header with the history and reason of the scaling.
Args:
scale_factor (np.array or float): scale factor value or array
scale_name (str): name of the scaling reason
"""
if isinstance(scale_factor, float):
pass
elif scale_factor.ndim == 2 or scale_factor.shape == self.data.shape:
pass
else:
raise ValueError("we expect a 2-dimensional input array with dimensions {0} or a float value".format(self.data.shape))
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=RuntimeWarning) # catch any invalid value encountered in multiply
self.err = self.err*scale_factor
# record history since 2-D error map doesn't track individual terms
self.err_hdr['HISTORY'] = "Errors rescaled by: {0}".format(scale_name)
[docs]
def get_hash(self):
"""
Computes the hash of the data, err, and dq. Does not use the header information.
Returns:
str: the hash of the data, err, and dq
"""
if self.data is not None:
data_bytes = self.data.data.tobytes()
else:
data_bytes = b''
if self.err is not None:
err_bytes = self.err.data.tobytes()
else:
err_bytes = b''
if self.dq is not None:
dq_bytes = self.dq.data.tobytes()
else:
dq_bytes = b''
total_bytes = data_bytes + err_bytes + dq_bytes
return str(hash(total_bytes))
[docs]
def add_extension_hdu(self, name, data = None, header=None):
"""
Create a new hdu extension and append it to the hdu_list
Args:
name (str): The name of the new extension
data (array, optional): Some kind of data. Defaults to None.
header (astropy.io.fits.Header, optional): _description_. Defaults to None.
"""
new_hdu = fits.ImageHDU(data=data, header=header, name=name)
if name in self.hdu_names:
raise ValueError("Extension name already exists in HDU list")
else:
self.hdu_names.append(name)
self.hdu_list.append(new_hdu)
[docs]
class Dark(Image):
"""
Dark calibration frame for a given exposure time and EM gain.
Args:
data_or_filepath (str or np.array): either the filepath to the FITS file to read in OR the 2D image data
pri_hdr (astropy.io.fits.Header): the primary header (required only if raw 2D data is passed in)
ext_hdr (astropy.io.fits.Header): the image extension header (required only if raw 2D data is passed in)
input_dataset (corgidrp.data.Dataset): the Image files combined together to make this dark (required only if raw 2D data is passed in and if raw data filenames not already archived in ext_hdr)
err (np.array): the error array (required only if raw data is passed in)
err_hdr (astropy.io.fits.Header): the error header (required only if raw data is passed in)
dq (np.array): the DQ array (required only if raw data is passed in)
"""
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, input_dataset=None, err = None, dq = None, err_hdr = None, dq_hdr = None):
# run the image class contructor
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err=err, dq=dq, err_hdr=err_hdr, dq_hdr=dq_hdr)
# if this is a new dark, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new dark
if ext_hdr is not None:
if input_dataset is None and 'DRPNFILE' not in ext_hdr.keys():
# error check. this is required in this case
raise ValueError("This appears to be a new dark. The dataset of input files needs to be passed in to the input_dataset keyword to record history of this dark.")
self.ext_hdr['DATATYPE'] = 'Dark' # corgidrp specific keyword for saving to disk
self.ext_hdr['BUNIT'] = 'detected electron'
# TO-DO: check PC_STAT and whether this will be in L2s
if 'PC_STAT' not in ext_hdr:
self.ext_hdr['PC_STAT'] = 'analog master dark'
# PC dark will have PCTHRESH in it, so make non-PC darks have that as well for consistency
if 'PCTHRESH' not in self.ext_hdr:
self.ext_hdr['PCTHRESH'] = -999. #keeping float format for consistency
# PC dark will have 1 more dimension than other darks due b/c the frames can be binned to make separate PC frames from subsets
# log all the data that went into making this calibration file
if 'DRPNFILE' not in ext_hdr.keys() and input_dataset is not None:
self._record_parent_filenames(input_dataset)
# PC master darks have NUM_FR, which may differ from DRPNFILE b/c of binning of frames (i.e., DRPNFILE would be # of bins * NUM_FR)
# Add in a null NUM_FR if not present for consistency with PC master darks
if 'NUM_FR' not in self.ext_hdr:
self.ext_hdr['NUM_FR'] = -999 #keeping int format for consistency
if "HISTORY" not in self.err_hdr:
self.err_hdr['HISTORY'] = ''
# PC frames (or others if non-standard recipe used) may have undergone frame selection, so add in null values if necessary so that all Dark products have same headers
if 'FRMSEL01' not in self.ext_hdr:
self.ext_hdr['FRMSEL01'] = -999. #keeping float format for consistency
if 'FRMSEL02' not in self.ext_hdr:
self.ext_hdr['FRMSEL02'] = False
if 'FRMSEL03' not in self.ext_hdr:
self.ext_hdr['FRMSEL03'] = -999. #keeping float format for consistency
if 'FRMSEL04' not in self.ext_hdr:
self.ext_hdr['FRMSEL04'] = -999. #keeping float format for consistency
if 'FRMSEL05' not in self.ext_hdr:
self.ext_hdr['FRMSEL05'] = -999. #keeping float format for consistency
if 'FRMSEL06' not in self.ext_hdr:
self.ext_hdr['FRMSEL06'] = -999. #keeping float format for consistency
if 'KGAINPAR' not in self.err_hdr:
if 'KGAINPAR' in self.ext_hdr:
self.err_hdr['KGAINPAR'] = self.ext_hdr['KGAINPAR']
else:
self.err_hdr['KGAINPAR'] = -999. #keeping float format for consistency
if 'KGAIN_ER' not in self.err_hdr:
if 'KGAIN_ER' in self.ext_hdr:
self.err_hdr['KGAIN_ER'] = self.ext_hdr['KGAIN_ER']
else:
self.err_hdr['KGAIN_ER'] = -999. #keeping float format for consistency
if 'RN' not in self.err_hdr:
if 'RN' in self.ext_hdr:
self.err_hdr['RN'] = self.ext_hdr['RN']
else:
self.err_hdr['RN'] = -999. #keeping float format for consistency
if 'RN_ERR' not in self.err_hdr:
if 'RN_ERR' in self.ext_hdr:
self.err_hdr['RN_ERR'] = self.ext_hdr['RN_ERR']
else:
self.err_hdr['RN_ERR'] = -999. #keeping float format for consistency
if 'LAYER_1' not in self.err_hdr:
self.err_hdr['LAYER_1'] = 'combined_error'
# add to history
self.ext_hdr['HISTORY'] = "Dark with exptime = {0} s and commanded EM gain = {1} created from {2} frames".format(self.ext_hdr['EXPTIME'], self.ext_hdr['EMGAIN_C'], self.ext_hdr['DRPNFILE'])
# give it a default filename using the last input file as the base
# strip off everything starting at .fits
if input_dataset is not None:
orig_input_filename = input_dataset[-1].filename.split(".fits")[0]
self.filename = "{0}_drk_cal.fits".format(orig_input_filename)
self.filename = re.sub('_l[0-9].', '', self.filename)
# dnm_cal fed directly into drk_cal when doing build_synthesized_dark, so this will delete that string if it's there:
self.filename = self.filename.replace("_dnm_cal", "")
else:
if self.filename == '':
self.filename = "drk_cal.fits" # we shouldn't normally be here, but we default to something just in case.
else:
self.filename = self.filename.replace("_dnm_cal", "_drk_cal")
self.pri_hdr['FILENAME'] = self.filename
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
if 'PC_STAT' not in self.ext_hdr:
self.ext_hdr['PC_STAT'] = 'analog master dark'
if 'IS_SYNTH' not in self.ext_hdr:
if self.ext_hdr['PC_STAT'] == 'photon-counted master dark':
self.ext_hdr['IS_SYNTH'] = 0 # not relevant in this case
else:
# defaults to analog synthetic, which will standard choice over analog traditional
# int flag for whether this dark is synthesized or not. 0 = no (analog traditional), 1 = yes (analog synthesized).
self.ext_hdr['IS_SYNTH'] = 1
if err_hdr is not None:
self.err_hdr['BUNIT'] = 'detected electron'
# double check that this is actually a dark file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a Dark file.")
if self.ext_hdr['DATATYPE'] != 'Dark':
raise ValueError("File that was loaded was not a Dark file.")
[docs]
class FlatField(Image):
"""
Master flat generated from raster scan of uranus or Neptune.
Args:
data_or_filepath (str or np.array): either the filepath to the FITS file to read in OR the 2D image data
pri_hdr (astropy.io.fits.Header): the primary header (required only if raw 2D data is passed in)
ext_hdr (astropy.io.fits.Header): the image extension header (required only if raw 2D data is passed in)
input_dataset (corgidrp.data.Dataset): the Image files combined together to make this flat file (required only if raw 2D data is passed in)
"""
#def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, input_dataset=None):
# run the image class contructor
#super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr)
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, input_dataset=None):
if input_dataset is not None:
pri_hdr,ext_hdr,err_hdr,dq_hdr=corgidrp.check.merge_headers(
input_dataset, first_frame_keywords= ['MJDSRT','SCTSRT','PROGNUM', 'EXECNUM', 'CAMPAIGN', 'SEGMENT','OBSNUM','VISNUM','CPGSFILE','AUXFILE','VISTYPE',
'TARGET', 'RA','DEC', 'EQUINOX', 'RAPM','DECPM', 'PSFREF', 'OPGAIN','PHTCNT','OPMODE','EXPTIME','EMGAIN_C',
'UNITYG','EMGAINA1','EMGAINA2','EMGAINA3','EMGAINA4','EMGAINA5','EMGAIN_A','KGAINPAR','ISPC','SPAM_H','SPAM_V',
'SPAMNAME','SPAMSP_H','SPAMSP_V','FPAM_H','FPAM_V','FPAMNAME','FPAMSP_H','FPAMSP_V','LSAM_H','LSAM_V',
'LSAMNAME','LSAMSP_H','LSAMSP_V','FSAM_H','FSAM_V','FSAMNAME','FSAMSP_H','FSAMSP_V','CFAM_H','CFAM_V',
'CFAMNAME','CFAMSP_H','CFAMSP_V','DPAM_H','DPAM_V','DPAMNAME','DPAMSP_H','DPAMSP_V','PA_V3','WBJ_1','WBJ_2','WBJ_3','GAINTCAL',
'STATUS','BLNKTIME','BLNKCYC','EXPCYC','OVEREXP','NOVEREXP'],
last_frame_keywords = ['VISITID', 'MJDEND', 'SCTEND', 'NAXIS', 'NAXIS1', 'NAXIS2', 'NAXIS3', 'NAXIS4','EACQ_ROW', 'EACQ_COL','DATATYPE'],
invalid_keywords=['PA_APER','FRAMET','SVB_1','SVB_2','SVB_3','ROLL','PITCH','YAW','ISHOWFSC', 'ISACQ', 'SPBAL', 'SATSPOTS', 'HCVBIAS','EXCAMT','LOCAMT','CYCLES',
'LASTEXP','PROXET','FCMLOOP', 'FCMPOS','FSMINNER', 'FSMLOS', 'FSMPRFL', 'FSMRSTR',
'FSMSG1', 'FSMSG2', 'FSMSG3', 'FSMX', 'FSMY','SB_FP_DX', 'SB_FP_DY', 'SB_FS_DX', 'SB_FS_DY','DMZLOOP','1SVALID',
'Z2AVG','Z2RES','Z2VAR','Z3AVG','Z3RES','Z3VAR','10SVALID','Z4AVG','Z4RES','Z5AVG','Z5RES','Z6AVG','Z6RES',
'Z7AVG','Z7RES','Z8AVG','Z8RES','Z9AVG','Z9RES','Z10AVG','Z10RES','Z11AVG','Z11RES','Z12AVG','Z13AVG', 'Z14AVG'],
calculated_value_keywords = ['DATETIME','FTIMEUTC','FILETIME', 'NUM_FR', 'DRPCTIME', 'DRPNFILE', 'COMMENT', 'HISTORY', 'FILENAME', 'RECIPE']+
[f'FILE{i}' for i in range(100)]
)
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err_hdr=err_hdr, dq_hdr=dq_hdr)
else:
# run the image class contructor
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr)
# if this is a new master flat, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new masterflat
if ext_hdr is not None:
if input_dataset is None:
# error check. this is required in this case
raise ValueError("This appears to be a master flat. The dataset of input files needs to be passed in to the input_dataset keyword to record history of this flat")
self.ext_hdr['DATATYPE'] = 'FlatField' # corgidrp specific keyword for saving to disk
self.ext_hdr['BUNIT'] = '' # flat field is dimensionless
# log all the data that went into making this flat
self._record_parent_filenames(input_dataset)
# add to history
self.ext_hdr['HISTORY'] = "Flat with exptime = {0} s created from {1} frames".format(self.ext_hdr['EXPTIME'], self.ext_hdr['DRPNFILE'])
# give it a default filename using the last input file as the base
self.filename = re.sub('_l[0-9].', '_flt_cal', input_dataset[-1].filename)
self.pri_hdr['FILENAME'] = self.filename
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
# double check that this is actually a masterflat file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a FlatField file.")
if self.ext_hdr['DATATYPE'] != 'FlatField':
raise ValueError("File that was loaded was not a FlatField file.")
[docs]
class SpectroscopyCentroidPSF(Image):
"""
Calibration product that stores fitted PSF centroid (x, y) positions
for a grid of simulated PSFs.
Args:
data_or_filepath (str or np.ndarray): 2D array of (x, y) centroid positions
with shape (N, 2), where N is the number of PSFs.
err (np.ndarray): 2D array of (x,y) errors of centroid positions with shape (N,2)
pri_hdr (fits.Header): Primary header.
ext_hdr (fits.Header): Extension header.
err_hdr (fits.Header): error extension header
input_dataset (Dataset): Dataset of raw PSF images used to generate this calibration.
"""
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, err_hdr = None, err = None, input_dataset=None):
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err_hdr = err_hdr, err = err)
# if this is a new SpectroscopyCentroidPSF, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new SpectroscopyCentroidPSF
if ext_hdr is not None:
if input_dataset is None:
raise ValueError("Must pass `input_dataset` to create new SpectroscopyCentroidPSF.")
self.ext_hdr["EXTNAME"] = "CENTROIDS"
self.ext_hdr['DATATYPE'] = 'SpectroscopyCentroidPSF'
self.ext_hdr['DATALVL'] = 'CAL'
self._record_parent_filenames(input_dataset)
self.ext_hdr['HISTORY'] = "Stored PSF centroid calibration results."
# Generate default output filename
base = input_dataset[-1].filename.split(".fits")[0]
filename = f"{base}_scp_cal.fits"
self.filename = re.sub('_l[0-9].', '', filename)
self.pri_hdr['FILENAME'] = self.filename
if err is None:
self.err = np.zeros(self.data.shape)
self.err_hdr = fits.Header()
if 'DATATYPE' not in self.ext_hdr or self.ext_hdr['DATATYPE'] != 'SpectroscopyCentroidPSF':
raise ValueError("This file is not a valid SpectroscopyCentroidPSF.")
[docs]
self.xfit = self.data[:, 0]
[docs]
self.yfit = self.data[:, 1]
[docs]
self.xfit_err = self.err[0][:, 0]
[docs]
self.yfit_err = self.err[0][:, 1]
[docs]
class LineSpread(Image):
"""
Calibration product that stores a flux profile vs. wavelength of a narrowband observation and the fitted Gaussian parameters
Args:
data_or_filepath (str or np.ndarray): 1D wavelength array (nm)
1D flux profile
with shape (N, 2), where N is the length of the wavelength array.
pri_hdr (fits.Header): Primary header.
ext_hdr (fits.Header): Extension header.
gauss_par (np.ndarray): Gaussian fit parameters + corresponding errors: [amplitude, mean_wavelen, fwhm, amp_err, wave_err, fwhm_err]
input_dataset (Dataset): Dataset used to generate this calibration.
Attr:
wavlens (np.array): wavelengths in nm
flux_profile (np.array): normalized flux
gauss_par (np.array): Gaussian fit parameters: [amplitude, mean_wavelen, fwhm, amp_err, wave_err, fwhm_err]
amplitude (float): Gaussian amplitude
mean_wave (float): mean wavelength
fwhm (float): Gaussian FWHM
amp_err (float): fit error of the amplitude
wave_err (float): fit error of the mean wavelength
fwhm_err (float): fit error of the Gaussian fwhm
"""
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, gauss_par=None, input_dataset=None):
if input_dataset is not None:
pri_hdr, ext_hdr, err_hdr, dq_hdr = corgidrp.check.merge_headers(
input_dataset,
any_true_keywords=['DESMEAR', 'CTI_CORR'],
invalid_keywords=[
#pri header
'OPGAIN', 'PHTCNT', 'FRAMET', 'PA_V3', 'PA_APER', 'SVB_1', 'SVB_2', 'SVB_3', 'ROLL',
'PITCH', 'YAW', 'WBJ_1', 'WBJ_2', 'WBJ_3',
#ext header
'FRMTYPE', 'ISHOWFSC', 'ISACQ', 'SPBAL', 'ISFLAT', 'SATSPOTS', 'STATUS',
'HVCBIAS', 'OPMODE', 'EMGAIN_C', 'BLNKTIME', 'BLNKCYC', 'EXPCYC', 'OVEREXP', 'NOVEREXP',
'PROXET', 'FCMLOOP', 'FCMPOS', 'FSMINNER', 'FSMLOS', 'FSMPRFL', 'FSMRSTR',
'FSMSG1', 'FSMSG2', 'FSMSG3', 'FSMX', 'FSMY',
'EACQ_ROW', 'EACQ_COL', 'SB_FP_DX', 'SB_FP_DY', 'SB_FS_DX', 'SB_FS_DY',
'DMZLOOP', 'ISVALID',
'Z2AVG', 'Z3AVG', 'Z4AVG', 'Z5AVG', 'Z6AVG', 'Z7AVG', 'Z8AVG', 'Z9AVG',
'Z10AVG', 'Z11AVG', 'Z12AVG', 'Z13AVG', 'Z14AVG',
'Z2RES', 'Z3RES', 'Z4RES', 'Z5RES', 'Z6RES', 'Z7RES', 'Z8RES', 'Z9RES',
'Z10RES', 'Z11RES', 'Z2VAR', 'Z3VAR',
'FWC_PP_E', 'FWC_EM_E', 'SAT_DN', 'DATETIME', 'FTIMEUTC'
]
)
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err_hdr=err_hdr, dq_hdr=dq_hdr)
else:
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr)
# if this is a new LineSpread, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new LineSpread
if ext_hdr is not None:
if input_dataset is None:
raise ValueError("Must pass `input_dataset` to create new LineSpread calibration.")
self.ext_hdr['DATATYPE'] = 'LineSpread'
self.ext_hdr['DATALVL'] = 'CAL'
self.ext_hdr['EXTNAME'] = 'FLUX_PROF'
self._record_parent_filenames(input_dataset)
self.ext_hdr['HISTORY'] = "Stored LineSpread fit results."
# Generate default output filename
# Strip level suffix (e.g., _l2b) before adding calibration suffix
base = input_dataset[-1].filename.split(".fits")[0]
self.filename = f"{base}_lsf_cal.fits"
self.filename = re.sub('_l[0-9].', '', self.filename)
if gauss_par is not None:
if not (gauss_par.ndim == 1 and len(gauss_par) == 6):
raise ValueError('The LineSpread calibration gauss_par array must have 6 entries')
else:
self.gauss_par = gauss_par
else:
raise ValueError('The LineSpread calibration must have also the Gaussian parameters')
self.gauss_hdr = fits.Header()
self.gauss_hdr["EXTNAME"] = "GAUSS_PAR"
else:
# a filepath is passed in
with fits.open(data_or_filepath) as hdulist:
#gauss par is in FITS extension
self.gauss_par = hdulist[2].data
if self.gauss_par is not None and np.issubdtype(self.gauss_par.dtype, np.floating):
self.gauss_par = self.gauss_par.astype(np.float64)
self.gauss_hdr = hdulist[2].header
if 'DATATYPE' not in self.ext_hdr or self.ext_hdr['DATATYPE'] != 'LineSpread':
raise ValueError("This file is not a valid LineSpread calibration.")
if self.data.shape[0] != 2:
raise ValueError('The LineSpread calibration array must have a shape of (2,N)')
#convenience attributes
[docs]
self.wavlens = self.data[0, :]
[docs]
self.flux_profile = self.data[1, :]
[docs]
self.amplitude = self.gauss_par[0]
[docs]
self.mean_wave = self.gauss_par[1]
[docs]
self.fwhm = self.gauss_par[2]
[docs]
self.amp_err = self.gauss_par[3]
[docs]
self.wave_err = self.gauss_par[4]
[docs]
self.fwhm_err = self.gauss_par[5]
[docs]
def save(self, filedir=None, filename=None):
"""
Save file to disk with user specified filepath
Args:
filedir (str): filedir to save to. Use self.filedir if not specified
filename (str): filepath to save to. Use self.filename if not specified
"""
if filename is not None:
self.filename = filename
if filedir is not None:
self.filedir = filedir
if len(self.filename) == 0:
raise ValueError("Output filename is not defined. Please specify!")
# Cast data to configured dtype before saving
if self.data is not None:
self.data = self.data.astype(corgidrp.image_dtype, copy=False)
# Cast gauss_par to configured dtype before saving
if self.gauss_par is not None:
self.gauss_par = self.gauss_par.astype(corgidrp.image_dtype, copy=False)
prihdu = fits.PrimaryHDU(header=self.pri_hdr)
exthdu = fits.ImageHDU(data=self.data, header=self.ext_hdr)
hdulist = fits.HDUList([prihdu, exthdu])
gauss_hdu = fits.ImageHDU(data=self.gauss_par, header = self.gauss_hdr)
hdulist.append(gauss_hdu)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=VerifyWarning) # fits save card length truncated warning
hdulist.writeto(self.filepath, overwrite=True)
hdulist.close()
[docs]
class DispersionModel(Image):
"""
Class for dispersion model parameter data structure
Args:
data_or_filepath (str or dict): either the filepath to the FITS file to read in OR the dictionary containing the dispersion data
pri_hdr (fits.Header): Primary header.
ext_hdr (fits.Header): Extension header.
Attributes:
data (dict): table containing the dispersion data
clocking_angle (float): Clocking angle of the dispersion axis, theta,
oriented in the direction of increasing wavelength, measured in degrees
counterclockwise from the positive x-axis on the EXCAM data array
(direction of increasing column index).
clocking_angle_uncertainty (float): Uncertainty of the dispersion axis
clocking angle in degrees.
pos_vs_wavlen_polycoeff (numpy.ndarray): Polynomial fit to the
source displacement on EXCAM along the dispersion axis as a function of
wavelength, relative to the source position at the band reference
wavelength (lambda_c = 730.0 nm for Band 3) in units of millimeters.
pos_vs_wavlen_cov (numpy.ndarray): Covariance matrix of the
polynomial coefficients
wavlen_vs_pos_polycoeff (numpy.ndarray): Polynomial fit to the
wavelength as a function of displacement along the dispersion axis on
EXCAM, relative to the source position at the Band 3 reference
wavelength (x_c at lambda_c = 730.0 nm) in units of nanometers.
wavlen_vs_pos_cov (numpy.ndarray): Covariance matrix of the
polynomial coefficients
params_key (list): key names of the parameters
"""
[docs]
params_key = ['clocking_angle', 'clocking_angle_uncertainty', 'pos_vs_wavlen_polycoeff', 'pos_vs_wavlen_cov', 'wavlen_vs_pos_polycoeff', 'wavlen_vs_pos_cov']
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None):
if isinstance(data_or_filepath, str):
# run the image class constructor
super().__init__(data_or_filepath)
# double check that this is actually a DispersionModel file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a DispersionModel file.")
if self.ext_hdr['DATATYPE'] != 'DispersionModel':
raise ValueError("File that was loaded was not a DispersionModel file.")
else:
if not isinstance(data_or_filepath, dict):
raise ValueError("Input should either be a dictionary or a filepath string")
if pri_hdr == None:
pri_hdr = fits.Header()
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=VerifyWarning)
if ext_hdr == None:
ext_hdr = fits.Header()
ext_hdr['DRPCTIME'] = time.Time.now().isot
ext_hdr['DRPVERSN'] = corgidrp.__version__
self.pri_hdr = pri_hdr
self.ext_hdr = ext_hdr
self.ext_hdr['DATATYPE'] = 'DispersionModel' # corgidrp specific keyword for saving to disk
# add to history
self.ext_hdr['HISTORY'] = "DispersionModel file created"
#check that all parameters are available in the input dict
for key in self.params_key:
if key not in data_or_filepath:
raise ValueError("parameter {0} is missing in the data".format(key))
data_list = Table(rows = [data_or_filepath])
self.data = data_list
self.filedir = "."
# Use the file name of SpectroscopyCentroid
scp_filename = pri_hdr["FILENAME"]
self.filename = re.sub('scp', 'dpm', scp_filename)
self.pri_hdr['FILENAME'] = self.filename
# initialization data passed in
[docs]
self.clocking_angle = self.data["clocking_angle"][0]
[docs]
self.clocking_angle_uncertainty = self.data["clocking_angle_uncertainty"][0]
[docs]
self.pos_vs_wavlen_polycoeff = np.array(self.data["pos_vs_wavlen_polycoeff"][0])
[docs]
self.pos_vs_wavlen_cov = np.array(self.data["pos_vs_wavlen_cov"][0])
[docs]
self.wavlen_vs_pos_polycoeff = np.array(self.data["wavlen_vs_pos_polycoeff"][0])
[docs]
self.wavlen_vs_pos_cov = np.array(self.data["wavlen_vs_pos_cov"][0])
# Add err and dq attributes for walker compatibility (set to None since DispersionModel doesn't have these)
[docs]
def save(self, filedir=None, filename=None):
"""
Save file to disk with user specified filepath
Args:
filedir (str): filedir to save to. Use self.filedir if not specified
filename (str): filepath to save to. Use self.filename if not specified
"""
if filename is not None:
self.filename = filename
if filedir is not None:
self.filedir = filedir
if len(self.filename) == 0:
raise ValueError("Output filename is not defined. Please specify!")
prihdu = fits.PrimaryHDU(header=self.pri_hdr)
exthdu = fits.BinTableHDU(data=self.data, header=self.ext_hdr)
hdulist = fits.HDUList([prihdu, exthdu])
hdulist.writeto(self.filepath, overwrite=True)
hdulist.close()
[docs]
class SpecFilterOffset(Image):
"""
calibration class that contains a dictionary of the x/y offsets of the different used filters.
Args:
data_or_filepath (str or dict): either the filepath to the FITS file to read in OR the dictionary containing
the filter offsets in pixel units as a paired list [&x, &y],
see SpecFilterOffset.default_offsets as an example
date_valid (astropy.time.Time): date after which these offsets are valid
Attributes:
default_offsets (dict): dictionary containing the default filter offsets in pixel units [&x, &y]
data (astropy.Table): table containing the new filter offsets
offsets(dict): dictionary of updated filter offset positions
"""
#default x and y offsets of each filter in a dictionary
[docs]
default_offsets = {
"1" : [0, 0],
"2" : [0, 0],
"3" : [0.725909, -0.09398],
"4" : [0, 0],
"1A": [0, 0],
"1B": [0, 0],
"1C": [0, 0],
"2A": [0, 0],
"2B": [0, 0],
"2C": [0, 0],
"3A": [0.202398, -0.417079],
"3B": [-0.151775, -0.271393],
"3C": [1.115033, 0.086057],
"3G": [0, 0],
"3D": [-0.115639, -0.151316],
"3E": [0.05028, -0.54714],
"4A": [0, 0],
"4B": [0, 0],
"4C": [0, 0]
}
def __init__(self, data_or_filepath, date_valid = None):
if isinstance(data_or_filepath, str):
# run the image class contructor
super().__init__(data_or_filepath)
# double check that this is actually a DispersionModel file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a SpecFilterOffset file.")
if self.ext_hdr['DATATYPE'] != 'SpecFilterOffset':
raise ValueError("File that was loaded was not a SpecFilterOffset file.")
names = self.data.names
self.offsets = {}
for i, key in enumerate(names):
self.offsets[key] = [self.data[0][i], self.data[1][i]]
else:
if not isinstance(data_or_filepath, dict):
raise ValueError("Input should either be a dictionary or a filepath string")
if date_valid is None:
date_valid = time.Time.now()
pri_hdr = fits.Header()
ext_hdr = fits.Header()
ext_hdr['SCTSRT'] = date_valid.isot # use this for validity date
ext_hdr['DRPVERSN'] = corgidrp.__version__
ext_hdr['DRPCTIME'] = time.Time.now().isot
# fill caldb required keywords with dummy data
pri_hdr["OBSNUM"] = 000
ext_hdr["EXPTIME"] = 1.
ext_hdr['OPMODE'] = ""
ext_hdr['EMGAIN_C'] = 1.0
ext_hdr['EXCAMT'] = 40.0
# Enforce data level = CAL
ext_hdr['DATALVL'] = 'CAL'
ext_hdr['DATATYPE'] = 'SpecFilterOffset' # corgidrp specific keyword for saving to disk
# add to history
ext_hdr['HISTORY'] = "SpecFilterOffset file created"
#check that all parameters are available in the input dict
#if not replace the corresponding keys in the default offsets with the new positions
self.offsets = {}
input_dict = {}
for input_key, value in data_or_filepath.items():
input_dict[input_key.upper()] = value
for key in self.default_offsets:
if key not in input_dict:
self.offsets[key] = self.default_offsets.get(key)
else:
if len(input_dict[key]) != 2:
raise ValueError("the offset positions should be a a list of paired x and y offset values")
else:
self.offsets[key] = input_dict.get(key)
self.data = Table(self.offsets)
self.filedir = "."
filename = "SpecFilterOffset_{0}.fits".format(ext_hdr['SCTSRT']).replace(':','.')
self.filename = filename
pri_hdr['FILENAME'] = self.filename
self.pri_hdr = pri_hdr
self.ext_hdr = ext_hdr
[docs]
def get_offsets(self, filter):
return self.offsets.get(filter.upper())
[docs]
def save(self, filedir=None, filename=None):
"""
Save file to disk with user specified filepath
Args:
filedir (str): filedir to save to. Use self.filedir if not specified
filename (str): filepath to save to. Use self.filename if not specified
"""
if filename is not None:
self.filename = filename
if filedir is not None:
self.filedir = filedir
if len(self.filename) == 0:
raise ValueError("Output filename is not defined. Please specify!")
prihdu = fits.PrimaryHDU(header=self.pri_hdr)
exthdu = fits.BinTableHDU(data=self.data, header=self.ext_hdr)
hdulist = fits.HDUList([prihdu, exthdu])
hdulist.writeto(self.filepath, overwrite=True)
hdulist.close()
[docs]
class NonLinearityCalibration(Image):
"""
Class for non-linearity calibration files. Although it's not strictly an image that you might look at, it is a 2D array of data
The required format for calibration data is as follows:
- Minimum 2x2
- First value (top left) must be assigned to nan
- Row headers (dn counts) must be monotonically increasing
- Column headers (EM gains) must be monotonically increasing
- Data columns (relative gain curves) must straddle 1
- The first row will provide the the Gain axis values (accesssed via
gain_ax = non_lin_correction.data[0, 1:])
- The first column will provide the "count" axis value (accessed via
count_ax = non_lin_correction.data[1:, 0])
- The rest of the array will be the calibration data (accessed via
relgains = non_lin_correction.data[1:, 1:])
For example:
[
[nan, 1, 10, 100, 1000 ], <- gain axis
[1, 0.900, 0.950, 0.989, 1.000],
[1000, 0.910, 0.960, 0.990, 1.010],
[2000, 0.950, 1.000, 1.010, 1.050],
[3000, 1.000, 1.001, 1.011, 1.060],
^
count axis
],
where the row headers [1, 1000, 2000, 3000] are dn counts, the column
headers [1, 10, 100, 1000] are EM gains, and the first data column
[0.900, 0.910, 0.950, 1.000] is the first of the four relative gain curves.
Args:
data_or_filepath (str or np.array): either the filepath to the FITS file
to read in OR the 2D calibration data. See above for the required format.
pri_hdr (astropy.io.fits.Header): the primary header (required only if
raw 2D data is passed in)
ext_hdr (astropy.io.fits.Header): the image extension header (required
only if raw 2D data is passed in)
input_dataset (corgidrp.data.Dataset): the Image files combined
together to make this NonLinearityCalibration file (required only if
raw 2D data is passed in)
"""
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None,
input_dataset=None):
# run the image class contructor
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr)
# File format checks - Ported from II&T
nonlin_raw = self.data
if nonlin_raw.ndim < 2 or nonlin_raw.shape[0] < 2 or \
nonlin_raw.shape[1] < 2:
raise ValueError('The non-linearity calibration array must be at'
'least 2x2 (room for x and y axes and one data'
'point)')
if not np.isnan(nonlin_raw[0, 0]):
raise ValueError('The first value of the non-linearity calibration '
'array (upper left) must be set to "nan"')
# additional bookkeeping for a calibration file
# if this is a new calibration file, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if
# it is a new NonLinearityCalibration file
if ext_hdr is not None:
if input_dataset is None:
# error check. this is required in this case
raise ValueError("This appears to be a new Non Linearity "
"Correction. The dataset of input files needs"
"to be passed in to the input_dataset keyword"
"to record history of this calibration file.")
# corgidrp specific keyword for saving to disk
self.ext_hdr['DATATYPE'] = 'NonLinearityCalibration'
# log all the data that went into making this calibration file
self._record_parent_filenames(input_dataset)
# add to history
self.ext_hdr['HISTORY'] = "Non Linearity Calibration file created"
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
# Follow filename convention as of R3.0.2
self.filedir = '.'
self.filename = re.sub('_l[0-9].', '_nln_cal', input_dataset[-1].filename)
self.pri_hdr['FILENAME'] = self.filename
# double check that this is actually a NonLinearityCalibration file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a NonLinearityCalibration file.")
if self.ext_hdr['DATATYPE'] != 'NonLinearityCalibration':
raise ValueError("File that was loaded was not a NonLinearityCalibration file.")
self.dq_hdr['COMMENT'] = 'DQ not meaningful for this calibration; just present for class consistency'
# headers deleted from initial L1 level
leave_out_ext = ['BSCALE', 'BZERO', 'LOCAMT', 'CYCLES', 'LASTEXP']
for key in leave_out_ext:
if key in self.ext_hdr:
del self.ext_hdr[key]
[docs]
class KGain(Image):
"""
Class for KGain calibration file. Until further insights it is just one float value.
Args:
data_or_filepath (str or float): either the filepath to the FITS file to read in OR the calibration data. See above for the required format.
err (float): uncertainty value of kgain factor
ptc (np.array): 2 column array with the photon transfer curve
pri_hdr (astropy.io.fits.Header): the primary header (required only if raw data is passed in)
ext_hdr (astropy.io.fits.Header): the image extension header (required only if raw data is passed in)
err_hdr (astropy.io.fits.Header): the err extension header (required only if raw data is passed in)
ptc_hdr (astropy.io.fits.Header): the ptc extension header (required only if raw data is passed in)
input_dataset (corgidrp.data.Dataset): the Image files combined together to make this KGain file (required only if raw 2D data is passed in)
Attrs:
value: the getter of the kgain value
_kgain (float): the value of kgain
error: the getter of the kgain error value
_kgain_error (float): the value of kgain error
"""
def __init__(self, data_or_filepath, err = None, ptc = None, pri_hdr=None, ext_hdr=None, err_hdr = None, ptc_hdr = None, input_dataset = None):
# run the image class contructor
super().__init__(data_or_filepath, err=err, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err_hdr=err_hdr)
# initialize these headers that have been recently added so that older calib files still contain this keyword when initialized and allow for tests that don't require
# these values to run smoothly
if 'RN' not in self.ext_hdr:
self.ext_hdr['RN'] = -999.
if 'RN_ERR' not in self.ext_hdr:
self.ext_hdr['RN_ERR'] = -999.
# File format checks
if self.data.shape != (1,):
raise ValueError('The KGain calibration data should be just one float value')
self._kgain = self.data[0]
self._kgain_error = self.err[0]
if isinstance(data_or_filepath, str):
# a filepath is passed in
with fits.open(data_or_filepath) as hdulist:
self.ptc_hdr = hdulist[3].header
# ptc data is in FITS extension
self.ptc = hdulist[3].data
if self.ptc is not None and np.issubdtype(self.ptc.dtype, np.floating):
self.ptc = self.ptc.astype(np.float64)
else:
if ptc is not None:
self.ptc = ptc
else:
self.ptc = np.zeros([2,0])
if ptc_hdr is not None:
self.ptc_hdr = ptc_hdr
else:
self.ptc_hdr = fits.Header()
self.ptc_hdr["EXTNAME"] = "PTC"
# additional bookkeeping for a calibration file
# if this is a new calibration file, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new KGain file
if ext_hdr is not None:
if input_dataset is None:
if 'DRPNFILE' not in ext_hdr:
# error check. this is required in this case
raise ValueError("This appears to be a new kgain. The dataset of input files needs to be passed in to the input_dataset keyword to record history of this kgain.")
else:
pass
else:
# log all the data that went into making this calibration file
self._record_parent_filenames(input_dataset)
# give it a default filename using the last input file as the base
self.filename = re.sub('_l[0-9].', '_krn_cal', input_dataset[-1].filename)
self.pri_hdr['FILENAME'] = self.filename
self.ext_hdr['DATATYPE'] = 'KGain' # corgidrp specific keyword for saving to disk
self.ext_hdr['BUNIT'] = 'detected EM electron/DN'
# add to history
self.ext_hdr['HISTORY'] = "KGain Calibration file created"
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
if err_hdr is not None:
self.err_hdr['BUNIT'] = 'detected EM electron/DN'
# double check that this is actually a KGain file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a KGain Calibration file.")
if self.ext_hdr['DATATYPE'] != 'KGain':
raise ValueError("File that was loaded was not a KGain Calibration file.")
# headers deleted from initial L1 level
leave_out_ext = ['BSCALE', 'BZERO', 'LOCAMT', 'CYCLES', 'LASTEXP']
for key in leave_out_ext:
if key in self.ext_hdr:
del self.ext_hdr[key]
@property
[docs]
def value(self):
return self._kgain
@property
[docs]
def error(self):
return self._kgain_error
[docs]
def save(self, filedir=None, filename=None):
"""
Save file to disk with user specified filepath
Args:
filedir (str): filedir to save to. Use self.filedir if not specified
filename (str): filepath to save to. Use self.filename if not specified
"""
if filename is not None:
self.filename = filename
if filedir is not None:
self.filedir = filedir
if len(self.filename) == 0:
raise ValueError("Output filename is not defined. Please specify!")
# use the appropriate bit depth as set by the config file
if self.data is not None:
self.data = self.data.astype(corgidrp.image_dtype, copy=False)
if self.err is not None:
self.err = self.err.astype(corgidrp.image_dtype, copy=False)
if self.ptc is not None:
self.ptc = self.ptc.astype(corgidrp.image_dtype, copy=False)
prihdu = fits.PrimaryHDU(header=self.pri_hdr)
exthdu = fits.ImageHDU(data=self.data, header=self.ext_hdr)
hdulist = fits.HDUList([prihdu, exthdu])
errhdu = fits.ImageHDU(data=self.err, header = self.err_hdr)
hdulist.append(errhdu)
ptchdu = fits.ImageHDU(data=self.ptc, header = self.ptc_hdr)
hdulist.append(ptchdu)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=VerifyWarning) # fits save card length truncated warning
hdulist.writeto(self.filepath, overwrite=True)
hdulist.close()
[docs]
class BadPixelMap(Image):
"""
Class for bad pixel map. The bad pixel map indicates which pixels are hot
pixels and thus unreliable. Note: These bad pixels are bad due to inherent
nonidealities in the detector (applicable to any frame taken) and are
separate from pixels marked per frame as contaminated by cosmic rays.
Args:
data_or_filepath (str or np.array): either the filepath to the FITS file to read in OR the 2D image data
pri_hdr (astropy.io.fits.Header): the primary header (required only if raw 2D data is passed in)
ext_hdr (astropy.io.fits.Header): the image extension header (required only if raw 2D data is passed in)
input_dataset (corgidrp.data.Dataset): the Image files combined together to make this bad pixel map (required only if raw 2D data is passed in)
"""
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, input_dataset=None):
if input_dataset is not None:
# Use the dark frame for the base header set
dark_frames = [
f for f in input_dataset
if getattr(f, "ext_hdr", {}).get("DATATYPE") in ("Dark", "MasterDark")
or "_drk" in str(getattr(f, "filename", "")).lower()
]
if not dark_frames:
raise ValueError(
"BadPixelMap input_dataset must contain at least one dark(-like) frame "
"(DATATYPE='Dark'/'MasterDark' or filename containing '_drk'). "
)
base_dataset = Dataset(dark_frames)
pri_hdr, ext_hdr, err_hdr, dq_hdr = corgidrp.check.merge_headers(
base_dataset,
any_true_keywords=typical_bool_keywords,
invalid_keywords=typical_cal_invalid_keywords
)
## TODO: we shouldn't need to do this manually, and should be done in merge header
# but haven't figured out the bug, so we're hard coding it
if "DESMEAR" not in err_hdr:
err_hdr["DESMEAR"] = bool(ext_hdr.get("DESMEAR", False))
if 'DESMEAR' in err_hdr:
if type(err_hdr['DESMEAR']) == int:
err_hdr['DESMEAR'] = bool(err_hdr['DESMEAR'])
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err_hdr=err_hdr, dq_hdr=dq_hdr)
else:
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr)
# if this is a new bad pixel map, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new bad pixel map
if ext_hdr is not None:
if input_dataset is None and 'DRPNFILE' not in ext_hdr.keys():
# error check. this is required in this case
raise ValueError("This appears to be a new bad pixel map. The dataset of input files needs to be passed in to the input_dataset keyword to record history of this bad pixel map.")
self.ext_hdr['DATATYPE'] = 'BadPixelMap'
# log all the data that went into making this bad pixel map
self._record_parent_filenames(input_dataset)
# add to history
self.ext_hdr['HISTORY'] = "Bad Pixel map created"
# check whether we're making the bpmap from a flat only, or from L1/2 files.
if "_flt_cal" in input_dataset[-1].filename:
self.filename = input_dataset[-1].filename.replace("_flt_cal", "_bpm_cal")
else:
self.filename = re.sub('_l[0-9].', '_bpm_cal', input_dataset[-1].filename)
self.pri_hdr['FILENAME'] = self.filename
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
# double check that this is actually a bad pixel map that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a BadPixelMap file.")
if self.ext_hdr['DATATYPE'] != 'BadPixelMap':
raise ValueError("File that was loaded was not a BadPixelMap file.")
self.dq_hdr['COMMENT'] = 'DQ not meaningful for this calibration; just present for class consistency'
self.err_hdr['COMMENT'] = 'err not meaningful for this calibration; just present for class consistency'
[docs]
class DetectorNoiseMaps(Image):
"""
Class for DetectorNoiseMaps calibration file. The data is a 3-D stack of 3 frames, each of which is a full SCI frame of fitted
values for a given noise type at a given temperature. The 4th calibration product is bias offset, which is stored in the header.
The 3 frames in the stack are in this order:
index 0 for the fixed-pattern noise (FPN) map,
index 1 for the clock-induced charge (CIC) map,
index 2 for the dark current (DC) map.
The input err should be a 4-D stack with first dimension of 1 and the next 3 corresponding to a 3-D stack with this order above.
The input dq should be a 3-D stack corresponding to the order above.
Args:
data_or_filepath (str or np.array): either the filepath to the FITS file to read in OR the 3-D calibration data. See above for the required format.
pri_hdr (astropy.io.fits.Header): the primary header (required only if data is passed in for data_or_filepath)
ext_hdr (astropy.io.fits.Header): the image extension header (required only if data is passed in for data_or_filepath)
input_dataset (corgidrp.data.Dataset): the input data combined together to make the noise maps (required only if data is passed in for data_or_filepath and if the filenames for the raw data used to make the calibration data are not already archived in ext_hdr)
err (np.array): the error 3-D array (required only if data is passed in for data_or_filepath)
dq (np.array): the 3-D DQ array (required only if data is passed in for data_or_filepath)
err_hdr (astropy.io.fits.Header): the error header (required only if data is passed in for data_or_filepath)
"""
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, input_dataset=None, err = None, dq = None, err_hdr = None, dq_hdr=None):
# run the image class contructor
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err=err, dq=dq, err_hdr=err_hdr, dq_hdr=dq_hdr)
# File format checks
if self.data.ndim != 3 or self.data.shape[0] != 3:
raise ValueError('The DetectorNoiseMaps calibration data should be a 3-D array with the first dimension size equal to 3.')
if self.err.ndim != 4 or self.err.shape[0] != 1: # conforms to usual format the Image class expects, with 1 as the first element of the shape for err
raise ValueError('The DetectorNoiseMaps err data should be a 4-D array with the first dimension size equal to 4.')
if self.dq.ndim != 3 or self.dq.shape[0] != 3:
raise ValueError('The DetectorNoiseMaps dq data should be a 3-D array with the first dimension size equal to 3.')
# required inputs, whether or not ext_hdr is None
if "B_O" not in self.ext_hdr.keys() or "B_O_ERR" not in self.ext_hdr.keys():
raise ValueError('Calibrated bias offset and its error should be present in header.')
# additional bookkeeping for a calibration file
# if this is a new calibration file, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new calibration file
if ext_hdr is not None:
if input_dataset is None and 'DRPNFILE' not in ext_hdr.keys():
# error check. this is required in this case
raise ValueError("This appears to be a new DetectorNoiseMaps instance. The dataset of input files needs to be passed in to the input_dataset keyword to record the history of the files that made the calibration products.")
self.ext_hdr['DATATYPE'] = 'DetectorNoiseMaps' # corgidrp specific keyword for saving to disk
self.ext_hdr['BUNIT'] = 'detected electron'
# bias offset
self.ext_hdr['B_O_UNIT'] = 'DN' # err unit is also in DN
# log all the data that went into making this calibration file
if 'DRPNFILE' not in ext_hdr.keys():
self._record_parent_filenames(input_dataset)
# add to history
self.ext_hdr['HISTORY'] = "DetectorNoiseMaps calibration file created"
# give it a default filename
if input_dataset is not None:
orig_input_filename = input_dataset.frames[-1].filename.split(".fits")[0]
else:
#running the calibration code gets the name right (based on last filename in input dataset); this is a standby
orig_input_filename = self.ext_hdr['FILE0'].split(".fits")[0]
self.filename = "{0}_dnm_cal.fits".format(orig_input_filename)
self.filename = re.sub('_l[0-9].', '', self.filename)
self.pri_hdr['FILENAME'] = self.filename
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
if err_hdr is not None:
self.err_hdr['BUNIT'] = 'detected electron'
# double check that this is actually a DetectorNoiseMaps file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a DetectorNoiseMaps Calibration file.")
if self.ext_hdr['DATATYPE'] != 'DetectorNoiseMaps':
raise ValueError("File that was loaded was not a DetectorNoiseMaps Calibration file.")
#convenient attributes
[docs]
self.bias_offset = self.ext_hdr["B_O"]
[docs]
self.bias_offset_err = self.ext_hdr["B_O_ERR"]
[docs]
self.FPN_map = self.data[0]
[docs]
self.CIC_map = self.data[1]
[docs]
self.DC_map = self.data[2]
[docs]
self.FPN_err = self.err[0][0]
[docs]
self.CIC_err = self.err[0][1]
[docs]
self.DC_err = self.err[0][2]
if 'FPN_IMM' not in self.ext_hdr.keys():
fpn_imm = -999. #can't store NaN in FITS header, so do a number that's obviously wrong
self.ext_hdr['FPN_IMM'] = fpn_imm
if 'CIC_IMM' not in self.ext_hdr.keys():
cic_imm = -999. #can't store NaN in FITS header, so do a number that's obviously wrong
self.ext_hdr['CIC_IMM'] = cic_imm
if 'DC_IMM' not in self.ext_hdr.keys():
dc_imm = -999. #can't store NaN in FITS header, so do a number that's obviously wrong
self.ext_hdr['DC_IMM'] = dc_imm
if 'FPN_IMME' not in self.ext_hdr.keys():
fpn_imme = -999. #can't store NaN in FITS header, so do a number that's obviously wrong
self.ext_hdr['FPN_IMME'] = fpn_imme
[docs]
class DetectorParams(Image):
"""
Class containing detector parameters that may change over time.
To create a new instance of DetectorParams, you only need to pass in the values you would like to change from default values:
new_valid_date = astropy.time.Time("2027-01-01")
new_det_params = DetectorParams({'gmax' : 7500.0 }, date_valid=new_valid_date).
Args:
data_or_filepath (dict or str): either a filepath string corresponding to an
existing DetectorParams file saved to disk or a
dictionary of parameters to modify from default values
date_valid (astropy.time.Time): date after which these parameters are valid
Attributes:
params (dict): the values for various detector parameters specified here
default_values (dict): default values for detector parameters (fallback values)
back_compat_mapping (dict): values to make test FITS files comply with new header standard
"""
# default detector params
[docs]
default_values = {
'KGAINPAR' : 8.7,
'FWC_PP_E' : 90000.,
'FWC_EM_E' : 100000.,
'ROWREADT' : 223.5e-6, # seconds
'NEMGAIN': 604, # number of EM gain register stages
'TELRSTRT': -1, # slice of rows that are used for telemetry
'TELREND': None, #goes to the end, in other words
'CRHITRT': 5.0e+04, # cosmic ray hit rate (hits/m**2/sec)
'PIXAREA': 1.69e-10, # pixel area (m**2/pixel)
'GAINMAX': 8000.0, # Maximum allowable EM gain
'DELCNST': 1.0e-4, # tolerance in exposure time calculator
'OVERHEAD': 3, # Overhead time, in seconds, for each collected frame. Used to compute total wall-clock time for data collection
'PCECNTMX': 0.1, # Maximum allowed electrons/pixel/frame for photon counting
'TFACTOR': 5, # number of read noise standard deviations at which to set the photon-counting threshold
}
[docs]
back_compat_mapping = {
"KGAINPAR" : "KGAIN",
"FWC_PP_E" : "FWC_PP",
'FWC_EM_E' : 'FWC_EM',
'ROWREADT' : "rowreadtime",
"NEMGAIN" : "NEM",
"TELRSTRT" : "telem_rows_start",
"TELREND" : "telem_rows_end",
"CRHITRT" : "X",
"PIXAREA" : "A",
"GAINMAX" : "GMAX",
"DELCNST" : "delta_constr",
"PCECNTMX" : "pc_ecount_max",
"TFACTOR" : "T_FACTOR"
}
def __init__(self, data_or_filepath, date_valid=None):
if date_valid is None:
date_valid = time.Time.now()
# if filepath passed in, just load in from disk as usual
if isinstance(data_or_filepath, str):
# run the image class contructor
super().__init__(data_or_filepath)
# double check that this is actually a DetectorParams file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a DetectorParams file.")
if self.ext_hdr['DATATYPE'] != 'DetectorParams':
raise ValueError("File that was loaded was not a DetectorParams file.")
else:
if not isinstance(data_or_filepath, dict):
raise ValueError("Input should either be a dictionary or a filepath string")
pri_hdr = fits.Header()
ext_hdr = fits.Header()
ext_hdr['MJDSRT'] = date_valid.mjd
ext_hdr['DRPVERSN'] = corgidrp.__version__
ext_hdr['DRPCTIME'] = time.Time.now().isot
# fill caldb required keywords with dummy data
pri_hdr["OBSNUM"] = 000
ext_hdr["EXPTIME"] = 1.
ext_hdr['OPMODE'] = ""
ext_hdr['EMGAIN_C'] = 1.0
ext_hdr['EXCAMT'] = 40.0
# Enforce data level = CAL?
ext_hdr['DATALVL'] = 'CAL'
# write default values to headers
for key, value in self.default_values.items():
ext_hdr[key] = value
# overwrite default values
for key, value in data_or_filepath.items():
# to avoid VerifyWarning from fits
ext_hdr[key] = value
self.pri_hdr = pri_hdr
self.ext_hdr = ext_hdr
self.data = np.zeros([1,1])
self.dq = np.zeros([1,1])
self.err = np.zeros([1,1])
self.err_hdr = fits.Header()
self.dq_hdr = fits.Header()
self.hdu_list = fits.HDUList()
# make a dictionary that's easy to use
# load back in all the values from the header
for key in self.default_values:
# if this key is not in the header, try the backwards compatability mapping
new_key = key
if key not in self.ext_hdr:
key = self.back_compat_mapping[key]
if len(key) > 8:
# to avoid VerifyWarning from fits
self.params[new_key] = self.ext_hdr['HIERARCH ' + key]
else:
self.params[new_key] = self.ext_hdr[key]
# for backwards compatability:
if "OBSID" in self.pri_hdr:
self.pri_hdr['OBSNUM'] = self.pri_hdr['OBSID']
if "CMDGAIN" in self.ext_hdr:
self.ext_hdr["EMGAIN_C"] = self.ext_hdr['CMDGAIN']
# if this is a new DetectorParams file, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new DetectorParams file
if isinstance(data_or_filepath, dict):
self.ext_hdr['DATATYPE'] = 'DetectorParams' # corgidrp specific keyword for saving to disk
# add to history
self.ext_hdr['HISTORY'] = "Detector Params file created"
# use the start date for the filename by default
self.filedir = "."
filename = "DetectorParams_{0}.fits".format(self.ext_hdr['MJDSRT']).replace(':','.')
self.filename = filename
self.pri_hdr['FILENAME'] = self.filename
[docs]
def get_hash(self):
"""
Computes the hash of the detector param values
Returns:
str: the hash of the detector parameters
"""
hashing_str = "" # make a string that we can actually hash
for key in self.params:
hashing_str += str(self.params[key])
return str(hash(hashing_str))
[docs]
class AstrometricCalibration(Image):
"""
Class for astrometric calibration file.
Args:
data_or_filepath (str or np.array): either the filepath to the FITS file to read in OR a single array of calibration measurements of the following lengths (boresight: length 2 (RA, DEC),
plate scale: length 1 (float), north angle: length 1 (float), average offset: length 2 (floats) of average boresight offset in RA/DEC [deg],
distortion coeffs: length dependent on order of polynomial fit but the last value should be an int describing the polynomial order). For a
3rd order distortion fit the input array should be length 37.
pri_hdr (astropy.io.fits.Header): the primary header (required only if raw 2D data is passed in)
ext_hdr (astropy.io.fits.Header): the image extension header (required only if raw 2D data is passed in)
Attrs:
boresight (np.array): the corrected RA/DEC [deg] position of the detector center
platescale (float): the platescale value in [mas/pixel]
northangle (float): the north angle value in [deg]
avg_offset (np.array): the average offset [deg] from the detector center
distortion_coeffs (np.array): the array of legendre polynomial coefficients that describe the distortion map, where the last value of the array is the order of polynomial used
"""
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, err=None, input_dataset=None):
if input_dataset is not None:
# Primary header keywords
pri_hdr, _, _, _ = corgidrp.check.merge_headers(
input_dataset,
invalid_keywords=['VISITID', 'FILETIME', 'PROGNUM', 'EXECNUM', 'CAMPAIGN',
'SEGMENT', 'OBSNUM', 'VISNUM', 'CPGSFILE', 'AUXFILE',
'VISTYPE', 'TARGET', 'RA', 'DEC', 'RAPM', 'DECPM',
'OPGAIN', 'PHTCNT', 'FRAMET', 'PA_V3', #'PA_APER',
'SVB_1', 'SVB_2', 'SVB_3', 'ROLL', 'PITCH', 'YAW',
'FILENAME', 'OBSNAME', 'WBJ_1', 'WBJ_2', 'WBJ_3',
'STAR1','STAR2','STAR3','STAR4','STAR5'] + ['STAR{0}'.format(i) for i in range(6, 1000)],
deleted_keywords=['SATSPOTS','ISHOWFSC','HOWFSLNK'])
_, ext_hdr, err_hdr, dq_hdr = corgidrp.check.merge_headers(
input_dataset,
any_true_keywords=['DESMEAR', 'CTI_CORR'],
invalid_keywords=[
# Extension header keywords
'BUNIT', 'ISHOWFSC', 'ISACQ', 'SPBAL', 'ISFLAT', 'SATSPOTS',
'EXPTIME', 'EMGAIN_C', 'KGAINPAR', 'BLNKTIME', 'BLNKCYC',
'EXPCYC', 'OVEREXP', 'NOVEREXP', 'PROXET',
'FCMLOOP', 'FCMPOS', 'FSMINNER', 'FSMLOS', 'FSMPRFL', 'FSMRSTR',
'FSMSG1', 'FSMSG2', 'FSMSG3', 'FSMX', 'FSMY',
'EACQ_ROW', 'EACQ_COL', 'SB_FP_DX', 'SB_FP_DY', 'SB_FS_DX', 'SB_FS_DY',
'DMZLOOP', '1SVALID', 'Z2AVG', 'Z2RES', 'Z2VAR', 'Z3AVG', 'Z3RES', 'Z3VAR',
'10SVALID', 'Z4AVG', 'Z4RES', 'Z5AVG', 'Z5RES',
'Z6AVG', 'Z6RES', 'Z7AVG', 'Z7RES', 'Z8AVG', 'Z8RES',
'Z9AVG', 'Z9RES', 'Z10AVG', 'Z10RES', 'Z11AVG', 'Z11RES',
'Z12AVG', 'Z13AVG', 'Z14AVG',
'SPAM_H', 'SPAM_V', 'SPAMNAME', 'SPAMSP_H', 'SPAMSP_V',
'FPAM_H', 'FPAM_V', 'FPAMNAME', 'FPAMSP_H', 'FPAMSP_V',
'LSAM_H', 'LSAM_V', 'LSAMNAME', 'LSAMSP_H', 'LSAMSP_V',
'FSAM_H', 'FSAM_V', 'FSAMNAME', 'FSAMSP_H', 'FSAMSP_V',
'CFAM_H', 'CFAM_V', 'CFAMNAME', 'CFAMSP_H', 'CFAMSP_V',
'DPAM_H', 'DPAM_V', 'DPAMNAME', 'DPAMSP_H', 'DPAMSP_V',
'FTIMEUTC', 'DATATYPE', 'FWC_PP_E', 'FWC_EM_E', 'SAT_DN', 'DATETIME',
'STAR1','STAR2','STAR3','STAR4','STAR5'] + ['STAR{0}'.format(i) for i in range(6, 1000)],
averaged_keywords=['EXCAMT']
)
# run the image class constructor
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err=err)
else:
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err=err)
# File format checks
if type(self.data) != np.ndarray:
raise ValueError("The AstrometricCalibration data should be an array of calibration measurements")
else:
self.boresight = self.data[:2]
self.platescale = self.data[2]
self.northangle = self.data[3]
self.avg_offset = self.data[4:6]
self.distortion_coeffs = self.data[6:]
# if this is a new astrometric calibration file, bookkeep it in the header
# we need to check if it is new
if ext_hdr is not None:
if input_dataset is None:
raise ValueError("This appears to be a new astrometric calibration file. The dataset of input files needs to be passed in to the input_dataset keyword to record its history.")
self.ext_hdr['DATATYPE'] = 'AstrometricCalibration'
# record all the data that went into making this calibration file
self._record_parent_filenames(input_dataset)
# add to history
self.ext_hdr['HISTORY'] = "Astrometric Calibration file created"
# give it a default filename using the first input file as the base
# strip off everything starting at .fits
orig_input_filename = input_dataset[-1].filename.split(".fits")[0]
self.filename = "{0}_ast_cal.fits".format(orig_input_filename)
self.filename = re.sub('_l[0-9].', '', self.filename)
self.pri_hdr['FILENAME'] = self.filename
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
# check that this is actually an AstrometricCalibration file that was read in
if 'DATATYPE' not in self.ext_hdr or self.ext_hdr['DATATYPE'] != 'AstrometricCalibration':
raise ValueError("File that was loaded was not an AstrometricCalibration file.")
self.dq_hdr['COMMENT'] = 'DQ not meaningful for this calibration; just present for class consistency'
[docs]
def save(self, filedir=None, filename=None):
"""
Save to disk, keeping data as float64 since astrometric coordinates
require more than float32 precision.
Args:
filedir (str): filedir to save to
filename (str): filepath to save to
"""
# Temporarily override image_dtype to preserve float64 for this class
original_dtype = corgidrp.image_dtype
corgidrp.image_dtype = np.float64
try:
super().save(filedir=filedir, filename=filename)
finally:
corgidrp.image_dtype = original_dtype
[docs]
class TrapCalibration(Image):
"""
Class for data related to charge traps that cause charge transfer inefficiency.
The calibration is generated by trap-pumped data.
The format will be [n,10], where each entry will have:
[row, column, sub-electrode location, index numnber of trap at this pixel/electrode,
capture time constant, maximum amplitude of the dipole, energy level of hole,
cross section for holes, R^2 value of fit, release time constant]
Args:
data_or_filepath (str or np.array): either the filepath to the FITS file to read in OR the 2D image data
pri_hdr (astropy.io.fits.Header): the primary header (required only if raw 2D data is passed in)
ext_hdr (astropy.io.fits.Header): the image extension header (required only if raw 2D data is passed in)
input_dataset (corgidrp.data.Dataset): the Image files combined together to make the trap calibration
"""
def __init__(self,data_or_filepath, pri_hdr=None,ext_hdr=None, input_dataset=None):
# run the image class constructor
super().__init__(data_or_filepath,pri_hdr=pri_hdr, ext_hdr=ext_hdr)
# if this is a new calibration, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new cal
if ext_hdr is not None:
if input_dataset is None:
# error check. this is required in this case
raise ValueError("This appears to be a new TrapCalibration. The dataset of input files needs to be "
"passed in to the input_dataset keyword to record history of this TrapCalibration.")
self.ext_hdr['DATATYPE'] = 'TrapCalibration' # corgidrp specific keyword for saving to disk
# log all the data that went into making this dark
self._record_parent_filenames(input_dataset)
# add to history
self.ext_hdr['HISTORY'] = "TrapCalibration created from {0} frames".format(self.ext_hdr['DRPNFILE'])
# give it a default filename using the first input file as the base
# strip off everything starting at .fits
orig_input_filename = input_dataset[-1].filename.split(".fits")[0]
self.filename = "{0}_tpu_cal.fits".format(orig_input_filename)
self.filename = re.sub('_l[0-9].', '', self.filename)
self.pri_hdr['FILENAME'] = self.filename
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
# double check that this is actually a dark file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr or self.ext_hdr['DATATYPE'] != 'TrapCalibration':
raise ValueError("File that was loaded was not a TrapCalibration file.")
self.dq_hdr['COMMENT'] = 'DQ not meaningful for this calibration; just present for class consistency'
[docs]
class FluxcalFactor(Image):
"""
Class containing the flux calibration factor (and corresponding error) for each band in unit erg/(s * cm^2 * AA)/photo-electrons/s.
To create a new instance of FluxcalFactor, you need to pass the value and error and the filter name in the ext_hdr:
Args:
data_or_filepath (str or float): either a filepath string corresponding to an
existing FluxcalFactor file saved to disk or the data and error float values of the
flux cal factor of a certain filter defined in the header
err (float): uncertainty value of fluxcal factor
pri_hdr (astropy.io.fits.Header): the primary header (required only if raw data is passed in)
ext_hdr (astropy.io.fits.Header): the image extension header (required only if raw data is passed in)
err_hdr (astropy.io.fits.Header): the err extension header (required only if raw data is passed in)
input_dataset (corgidrp.data.Dataset): the Image files combined together to make this FluxcalFactor file (required only if raw 2D data is passed in)
Attributes:
filter (str): used filter name
nd_filter (str): used neutral density filter or "No"
fluxcal_fac (float): the value of the flux cal factor for the corresponding filter
fluxcal_err (float): the error of the flux cal factor for the corresponding filter
"""
def __init__(self, data_or_filepath, err = None, pri_hdr=None, ext_hdr=None, err_hdr = None, input_dataset = None):
if input_dataset is not None:
pri_hdr, ext_hdr, err_hdr, dq_hdr = corgidrp.check.merge_headers(
input_dataset,
any_true_keywords=['DESMEAR', 'CTI_CORR'],
invalid_keywords=[
#pri header
'OPGAIN', 'PHTCNT', 'FRAMET', 'PA_V3', 'PA_APER', 'SVB_1', 'SVB_2', 'SVB_3', 'ROLL',
'PITCH', 'YAW', 'WBJ_1', 'WBJ_2', 'WBJ_3',
#ext header
'FRMTYPE', 'ISHOWFSC', 'ISACQ', 'SPBAL', 'ISFLAT', 'SATSPOTS', 'STATUS',
'HVCBIAS', 'OPMODE', 'EMGAIN_C', 'BLNKTIME', 'BLNKCYC', 'EXPCYC', 'OVEREXP', 'NOVEREXP',
'PROXET', 'FCMLOOP', 'FCMPOS', 'FSMINNER', 'FSMLOS', 'FSMPRFL', 'FSMRSTR',
'FSMSG1', 'FSMSG2', 'FSMSG3', 'FSMX', 'FSMY',
'EACQ_ROW', 'EACQ_COL', 'SB_FP_DX', 'SB_FP_DY', 'SB_FS_DX', 'SB_FS_DY',
'DMZLOOP', 'ISVALID',
'Z2AVG', 'Z3AVG', 'Z4AVG', 'Z5AVG', 'Z6AVG', 'Z7AVG', 'Z8AVG', 'Z9AVG',
'Z10AVG', 'Z11AVG', 'Z12AVG', 'Z13AVG', 'Z14AVG',
'Z2RES', 'Z3RES', 'Z4RES', 'Z5RES', 'Z6RES', 'Z7RES', 'Z8RES', 'Z9RES',
'Z10RES', 'Z11RES', 'Z2VAR', 'Z3VAR',
'FWC_PP_E', 'FWC_EM_E', 'SAT_DN'
]
)
# run the image class constructor
super().__init__(data_or_filepath, err=err, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err_hdr=err_hdr, dq_hdr=dq_hdr)
else:
super().__init__(data_or_filepath, err=err, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err_hdr=err_hdr)
# if filepath passed in, just load in from disk as usual
# if this is a new FluxcalFactors file, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new FluxcalFactors file
if ext_hdr is not None:
if input_dataset is None:
raise ValueError("This appears to be a new FluxcalFactor. The dataset of input files needs to be passed \
in to the input_dataset keyword to record history of this FluxcalFactor file.")
else:
# log all the data that went into making this calibration file
self._record_parent_filenames(input_dataset)
# give it a default filename using the first input file as the base
# strip off everything starting at .fits
orig_input_filename = input_dataset[-1].filename.split(".fits")[0]
self.ext_hdr['DATATYPE'] = 'FluxcalFactor' # corgidrp specific keyword for saving to disk
# JM: moved the below to fluxcal.py since it varies depending on the method
#self.ext_hdr['BUNIT'] = 'erg/(s * cm^2 * AA)/(photoelectron/s)'
#self.err_hdr['BUNIT'] = 'erg/(s * cm^2 * AA)/(photoelectron/s)'
# add to history
self.ext_hdr['HISTORY'] = "Flux calibration file created"
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
self.ext_hdr['DRPVERSN'] = corgidrp.__version__
self.ext_hdr['DRPCTIME'] = time.Time.now().isot
# use the start date for the filename by default
self.filedir = "."
# slight hack for old mocks not in the standard filename format
if input_dataset is not None:
self.filename = "{0}_abf_cal.fits".format(orig_input_filename)
self.filename = re.sub('_l[0-9].', '', self.filename)
else:
self.filename = re.sub(r'\.fits$', '_abf_cal.fits', self.pri_hdr['FILENAME'])
self.pri_hdr['FILENAME'] = self.filename
# File format checks
if self.data.shape != (1,):
raise ValueError('The FluxcalFactor calibration data should be just one float value')
#TBC
[docs]
self.nd_filter = "ND0" #no neutral density filter in beam
if 'FPAMNAME' in self.ext_hdr:
name = self.ext_hdr['FPAMNAME']
if name.startswith("ND"):
self.nd_filter = name
elif 'FSAMNAME' in self.ext_hdr:
name = self.ext_hdr['FSAMNAME']
if name.startswith("ND"):
self.nd_filter = name
else:
raise ValueError('The FluxcalFactor calibration has no keyword FPAMNAME or FSAMNAME in the header')
if 'CFAMNAME' in self.ext_hdr:
self.filter = self.ext_hdr['CFAMNAME']
else:
raise ValueError('The FluxcalFactor calibration has no filter keyword CFAMNAME in the header')
if isinstance(data_or_filepath, str):
# double check that this is actually a FluxcalFactor file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a FluxcalFactor file.")
if self.ext_hdr['DATATYPE'] != 'FluxcalFactor':
raise ValueError("File that was loaded was not a FluxcalFactor file.")
# make some attributes to be easier to use
[docs]
self.fluxcal_fac = self.data[0]
[docs]
self.fluxcal_err = self.err[0]
[docs]
class SpecFluxCal(Image):
"""
Class containing the wavelength dependent absolute spectral flux calibration (spectro-photometric calibration)
and corresponding error in unit
erg/(s * cm^2 * AA)/photo-electrons/s/bin (spectral sensitivity) for each broad band.
To create a new instance of SpecFluxCal, you need to pass the 2d array (2, N) of wavelength and flux calibration and the
corresponding error array:
Args:
data_or_filepath (str or np.array): either a filepath string corresponding to an
existing SpecFluxCal file saved to disk or the data and error float values of the
spectral flux cal factors vs. wavelength
err (np.array): 2d array of uncertainties of wavelength and spectral flux calibration
dq (np.array): 2d array of data quality
pri_hdr (astropy.io.fits.Header): the primary header (required only if raw data is passed in)
ext_hdr (astropy.io.fits.Header): the image extension header (required only if raw data is passed in)
err_hdr (astropy.io.fits.Header): the err extension header (required only if raw data is passed in)
input_dataset (corgidrp.data.Dataset): the Image files combined together to make this SpecFluxCal file
(required only if raw 2D data is passed in)
Attributes:
band (str): band name for which this calibration is valid
nd_filter (str): Neutral Density filter name, used if standard is bright
wavelength (np.array): 1d array of wavelengths
specflux (np.array): 1d array of the flux calibration
specflux_err (np.array): 1d array of the error of the spectral flux calibration
wave_err (np.array): 1d array of the error of the wavelengths
specflux_dq (np.array): data quality of spectral flux calibration
"""
def __init__(self, data_or_filepath, err = None, dq = None, pri_hdr=None, ext_hdr=None, err_hdr = None, input_dataset = None):
# run the image class contructor
super().__init__(data_or_filepath, err=err, dq = dq, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err_hdr=err_hdr)
# if this is a new SpecFluxCal file, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new SpecFluxCal file
if ext_hdr is not None:
if input_dataset is None:
raise ValueError("This appears to be a new SpecFluxCal. The dataset of input files needs to be passed \
in to the input_dataset keyword to record history of this SpecFluxCal file.")
else:
# log all the data that went into making this calibration file
self._record_parent_filenames(input_dataset)
# give it a default filename using the first input file as the base
# strip off everything starting at .fits
orig_input_filename = input_dataset[-1].filename.split(".fits")[0]
self.ext_hdr['DATATYPE'] = 'SpecFluxCal' # corgidrp specific keyword for saving to disk
self.ext_hdr['BUNIT'] = 'erg/(s * cm^2 * AA)/(photoelectron/s/bin)'
self.err_hdr['BUNIT'] = 'erg/(s * cm^2 * AA)/(photoelectron/s/bin)'
# add to history
self.ext_hdr['HISTORY'] = "Spectral flux calibration file created"
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
self.ext_hdr['DRPVERSN'] = corgidrp.__version__
self.ext_hdr['DRPCTIME'] = time.Time.now().isot
# use the start date for the filename by default
self.filedir = "."
# slight hack for old mocks not in the standard filename format
self.filename = "{0}_sfl_cal.fits".format(orig_input_filename)
self.filename = re.sub('_l[0-9].', '', self.filename)
self.pri_hdr['FILENAME'] = self.filename
# File format checks
if self.data.shape[0] != 2:
raise ValueError('The spectral flux calibration data should be a 2D array')
if 'CFAMNAME' in self.ext_hdr:
self.band = self.ext_hdr['CFAMNAME']
else:
raise ValueError('The SpecFluxCal calibration has no keyword CFAMNAME in the header')
if isinstance(data_or_filepath, str):
# double check that this is actually a FluxcalFactor file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a SpecFluxCal file.")
if self.ext_hdr['DATATYPE'] != 'SpecFluxCal':
raise ValueError("File that was loaded was not a SpecFluxCal file.")
# make some attributes to be easier to use
[docs]
self.nd_filter = "ND0" #no neutral density filter in beam, TBC
if 'FPAMNAME' in self.ext_hdr:
name = self.ext_hdr['FPAMNAME']
if name.startswith("ND"):
self.nd_filter = name
else:
raise ValueError('The FluxcalFactor calibration has no keyword FPAMNAME in the header')
[docs]
self.wavelength = self.data[0,:]
[docs]
self.specflux = self.data[1,:]
[docs]
self.specflux_err = self.err[0,1,:]
[docs]
self.wave_err = self.err[0,0,:]
[docs]
self.specflux_dq = self.dq[0]
# if this is a new SpecFluxCal file, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new SpecFluxCal file
if ext_hdr is not None:
if input_dataset is None:
raise ValueError("This appears to be a new SpecFluxCal. The dataset of input files needs to be passed \
in to the input_dataset keyword to record history of this SpecFluxCal file.")
else:
# log all the data that went into making this calibration file
self._record_parent_filenames(input_dataset)
# give it a default filename using the first input file as the base
# strip off everything starting at .fits
orig_input_filename = input_dataset[-1].filename.split(".fits")[0]
self.ext_hdr['DATATYPE'] = 'SpecFluxCal' # corgidrp specific keyword for saving to disk
self.ext_hdr['BUNIT'] = 'erg/(s * cm^2 * AA)/(photoelectron/s/bin)'
self.err_hdr['BUNIT'] = 'erg/(s * cm^2 * AA)/(photoelectron/s/bin)'
# add to history
self.ext_hdr['HISTORY'] = "Spectral flux calibration file created"
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
self.ext_hdr['DRPVERSN'] = corgidrp.__version__
self.ext_hdr['DRPCTIME'] = time.Time.now().isot
# use the start date for the filename by default
self.filedir = "."
# slight hack for old mocks not in the standard filename format
self.filename = "{0}_sfl_cal.fits".format(orig_input_filename)
self.filename = re.sub('_l[0-9].', '', self.filename)
self.pri_hdr['FILENAME'] = self.filename
[docs]
class SlitTransmission(Image):
"""
Contains the slit transmission map of a defined slit. This consists of an
2D array with:
1/ Slit transmission map derived at different locations by interpolation.
2/ Corresponding locations along EXCAM +X direction with respect to the
zero-point in (fractional) EXCAM pixels where the slit transmission has
been derived.
3/ Corresponding locations along EXCAM +Y direction with respect to the
zero-point in (fractional) EXCAM pixels where the slit transmission has
been derived.
Args:
data_or_filepath (str or np.array): either a filepath string corresponding to an
existing SlitTransmission file saved to disk or an
2D array with the slit transmission map
x_offset (np.array): 1D array of x positions in slit
y_offset (np.array): 1D array of y positions in slit
pri_hdr (astropy.io.fits.Header): the primary header (required only if raw data is passed in)
ext_hdr (astropy.io.fits.Header): the image extension header (required only if raw data is passed
input_dataset (corgidrp.data.Dataset): the Image files combined together to make this SlitTransmission file
(required only if raw 2D data is passed in)
Attributes:
x_offset (np.array): 1D array, locations along EXCAM +X direction with respect to the
zero-point in (fractional) EXCAM pixels where the slit transmission has
been derived.
y_offset (np.array): 1D array, locations along EXCAM +Y direction with respect to the
zero-point in (fractional) EXCAM pixels where the slit transmission has
been derived.
slitname (str): name of slit with measured transmission
"""
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, x_offset = None, y_offset = None, input_dataset=None):
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr)
# if this is a new SlitTransmission, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new SlitTransmission
if isinstance(data_or_filepath, np.ndarray):
if input_dataset is None:
raise ValueError("Must pass `input_dataset` to create new SlitTransmission calibration.")
self.ext_hdr['DATATYPE'] = 'SlitTransmission'
self.ext_hdr['DATALVL'] = 'CAL'
self._record_parent_filenames(input_dataset)
self.ext_hdr['HISTORY'] = "Stored Slit Transmission map."
# Generate default output filename
# Strip level suffix (e.g., _l2b) before adding calibration suffix
base = input_dataset[0].filename.split(".fits")[0]
self.filename = f"{base}_slt_cal.fits"
self.filename = re.sub('_l[0-9].', '', self.filename)
# File format checks
if self.data.ndim != 2:
raise ValueError('The slit transmission array must have 2 dimensions')
if x_offset is not None and y_offset is not None:
if len (x_offset) != len(y_offset):
raise ValueError('x and y positions must have same array size')
elif len(x_offset) != self.data.shape[0]:
raise ValueError('x and y positions do not fit to slit map size')
else:
self.x_offset = x_offset
self.y_offset = y_offset
else:
raise ValueError('The SlitTransmission calibration must have also the x- and y offset parameters')
self.xoff_hdr = fits.Header()
self.xoff_hdr["EXTNAME"] = "XOFF"
self.yoff_hdr = fits.Header()
self.yoff_hdr["EXTNAME"] = "YOFF"
else:
# a filepath is passed in
with fits.open(data_or_filepath) as hdulist:
#x/y offset is in FITS extension
self.x_offset = hdulist["XOFF"].data
if self.x_offset is not None and np.issubdtype(self.x_offset.dtype, np.floating):
self.x_offset = self.x_offset.astype(np.float64)
self.xoff_hdr = hdulist["XOFF"].header
self.y_offset = hdulist["YOFF"].data
if self.y_offset is not None and np.issubdtype(self.y_offset.dtype, np.floating):
self.y_offset = self.y_offset.astype(np.float64)
self.yoff_hdr = hdulist["YOFF"].header
if 'DATATYPE' not in self.ext_hdr or self.ext_hdr['DATATYPE'] != 'SlitTransmission':
raise ValueError("This file is not a valid SlitTransmission calibration.")
[docs]
self.slitname = self.ext_hdr['FSAMNAME']
[docs]
def select_slit_transmission_curve(self, frame):
"""
Select the slit-transmission curve for the frame from SlitTransmission cal product
Args:
frame (corgidrp.data.Image): L4 spectroscopy frame whose WV0_X/WV0_Y
coordinates identify where the slit correction should be evaluated.
Returns:
numpy.ndarray: 1-D slit throughput curve sampled on the frame's SPEC
wavelength grid.
"""
slit_map, slit_x, slit_y = self.data, self.x_offset, self.y_offset
slit_map = np.asarray(slit_map, dtype=float)
slit_x = np.asarray(slit_x, dtype=float)
slit_y = np.asarray(slit_y, dtype=float)
try:
wv0_x = float(frame.ext_hdr['WV0_X'])
wv0_y = float(frame.ext_hdr['WV0_Y'])
except KeyError as exc:
raise ValueError("Frame must contain WV0_X and WV0_Y for slit correction.") from exc
# Slit map should be (N_positions, N_wave) or already 1-D in wavelength
if slit_map.ndim == 1:
slit_curve = slit_map
elif slit_map.ndim == 2:
if slit_map.shape[0] != slit_x.size or slit_x.size != slit_y.size:
raise ValueError("slit_map first dimension must match slit_x and slit_y length.")
# Find the closest sampled slit position to the spectrum's WV0 location (not interpolating,
# just doing nearest neighbor lookup)
idx = np.argmin(np.hypot(slit_x - wv0_x, slit_y - wv0_y))
slit_curve = slit_map[idx]
else:
raise ValueError("slit_transmission map must be 1-D or 2-D.")
slit_curve = np.asarray(slit_curve, dtype=float).ravel()
# Require that the slit transmission is defined on the same size wavelength grid as SPEC
# note: should spec.slit_transmission() also return a wavelength array to make sure it's
# the same wavelength grid?
spec_wave = frame.hdu_list['SPEC_WAVE'].data
if slit_curve.size != spec_wave.size:
raise ValueError(
f"slit_transmission wavelength axis (len={slit_curve.size}) must match "
f"SPEC_WAVE length (len={spec_wave.size})."
)
return slit_curve
[docs]
def save(self, filedir=None, filename=None):
"""
Save file to disk with user specified filepath
Args:
filedir (str): filedir to save to. Use self.filedir if not specified
filename (str): filepath to save to. Use self.filename if not specified
"""
if filename is not None:
self.filename = filename
if filedir is not None:
self.filedir = filedir
if len(self.filename) == 0:
raise ValueError("Output filename is not defined. Please specify!")
# recast data to the appropriate bit depth as set by the pipeline settings
if self.data is not None:
self.data = self.data.astype(corgidrp.image_dtype, copy=False)
if self.x_offset is not None:
self.x_offset = self.x_offset.astype(corgidrp.image_dtype, copy=False)
if self.y_offset is not None:
self.y_offset = self.y_offset.astype(corgidrp.image_dtype, copy=False)
prihdu = fits.PrimaryHDU(header=self.pri_hdr)
exthdu = fits.ImageHDU(data=self.data, header=self.ext_hdr)
hdulist = fits.HDUList([prihdu, exthdu])
xoff_hdu = fits.ImageHDU(data=self.x_offset, header = self.xoff_hdr)
hdulist.append(xoff_hdu)
yoff_hdu = fits.ImageHDU(data=self.y_offset, header = self.yoff_hdr)
hdulist.append(yoff_hdu)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=VerifyWarning) # fits save card length truncated warning
hdulist.writeto(self.filepath, overwrite=True)
hdulist.close()
[docs]
class FpamFsamCal(Image):
"""
Class containing the FPAM to EXCAM and FSAM to EXCAM transformation matrices.
CGI model was consistent with FFT/TVAC tests. Transformation matrices are
a 2x2 array with real values. Model cases are fpam_to_excam_modelbased and
fsam_to_excam_modelbased, see below.
Args:
data_or_filepath (dict or str): either a filepath string corresponding to an
existing FpamFsamCal file saved to disk or an
array with the FPAM and FSAM rotation matrices
date_valid (astropy.time.Time): date after which these parameters are valid
Attributes:
fpam_to_excam_modelbased (array): default values for FPAM rotation matrices.
fsam_to_excam_modelbased (array): default values for FSAM rotation matrices.
default_trans (array): array collecting fpam_to_excam_modelbased and
fsam_to_excam_modelbased.
"""
# default transformation matrices (model is consistent with FFT/TVAC tests)
# Signs +/- have been double checked against FFT/TVAC data
[docs]
fpam_to_excam_modelbased = np.array([[ 0. , 0.12285012],
[-0.12285012, 0. ]], dtype=float)
# Signs -/- have been double checked against FFT/TVAC data
[docs]
fsam_to_excam_modelbased = np.array([[-0. , -0.09509319],
[-0.09509319, 0. ]], dtype=float)
[docs]
default_trans = np.array([fpam_to_excam_modelbased, fsam_to_excam_modelbased])
###################
### Constructor ###
###################
def __init__(self, data_or_filepath, date_valid=None):
if date_valid is None:
date_valid = time.Time.now()
# if filepath passed in, just load in from disk as usual
if isinstance(data_or_filepath, str):
# run the image class contructor
super().__init__(data_or_filepath)
# double check that this is actually a FpamFsamCal file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError('File that was loaded was not a FpamFsamCal file.')
if self.ext_hdr['DATATYPE'] != 'FpamFsamCal':
raise ValueError('File that was loaded was not a FpamFsamCal file.')
else:
if len(data_or_filepath) == 0:
data_or_filepath = self.default_trans
elif not isinstance(data_or_filepath, np.ndarray):
raise ValueError('Input should either be an array or a filepath string.')
if data_or_filepath.shape != (2,2,2):
raise ValueError('FpamFsamCal must be a 2x2x2 array')
prihdr = fits.Header()
exthdr = fits.Header()
exthdr['SCTSRT'] = date_valid.isot # use this for validity date
exthdr['DRPVERSN'] = corgidrp.__version__
exthdr['DRPCTIME'] = time.Time.now().isot
# fill caldb required keywords with dummy data
prihdr['OBSNUM'] = 0
exthdr["EXPTIME"] = 0.
exthdr['OPMODE'] = ""
exthdr['EMGAIN_C'] = 1.0
exthdr['EXCAMT'] = 40.0
self.pri_hdr = prihdr
self.ext_hdr = exthdr
self.data = data_or_filepath
self.dq = self.data * 0
self.err = self.data * 0
self.err_hdr = fits.Header()
self.dq_hdr = fits.Header()
self.hdu_list = fits.HDUList()
# if this is a new FpamFsamCal file, we need to bookkeep it in the
# header b/c of logic in the super.__init__, we just need to check this
# to see if it is a new FpamFsamCal file
if isinstance(data_or_filepath, np.ndarray):
# corgidrp specific keyword for saving to disk
self.ext_hdr['DATATYPE'] = 'FpamFsamCal'
# add to history
self.ext_hdr['HISTORY'] = 'FPAM/FSAM rotation matrices file created'
# use the start date for the filename by default
self.filedir = '.'
self.filename = "FpamFsamCal_{0}.fits".format(self.ext_hdr['SCTSRT']).replace(':', '.') # compatible with Windows machines
self.pri_hdr['FILENAME'] = self.filename
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
[docs]
class CoreThroughputCalibration(Image):
"""
Class containing a core throughput calibration file
A CoreThroughput calibration file has two main data arrays:
3-d cube of PSF images, e.g, a N1xN1xN array where N1= +/- 3l/D about
PSF's centroid in EXCAM pixels. The N PSF images are the ones in the CT
dataset.
N sets of (x, y, CT measurements). The (x, y) are pixel coordinates of the
PSF images in the CT dataset wrt EXCAM (0,0) pixel during core throughput
observation.
Args:
data_or_filepath (array or str): either a filepath string corresponding
to an existing CoreThroughputCalibration file saved to disk or an array
with the elements of the core throughput calibration file.
hdu_list (astropy.io.fits.HDUList): an astropy HDUList object that
contains the elements of the core throughput calibration file.
"""
###################
### Constructor ###
###################
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, err=None,
dq=None, err_hdr=None, dq_hdr=None, input_hdulist=None,
input_dataset=None):
# run the image class contructor
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr,
err=err, dq=dq, err_hdr=err_hdr, dq_hdr=dq_hdr,
input_hdulist=input_hdulist)
# Verify the extension header corresponds to the PSF cube
if self.ext_hdr['EXTNAME'] != 'PSFCUBE':
raise ValueError('The input data does not seem to contain the PSF '
'cube of measurements')
# CT array measurements on EXCAM
idx_hdu = 0
if self.hdu_list[idx_hdu].name == 'CTEXCAM':
self.ct_excam = self.hdu_list[idx_hdu].data
self.ct_excam_hdr = self.hdu_list[idx_hdu].header
else:
raise ValueError('The HDU list does not seem to contain the CT '
'array of measurements')
# FPAM positions during CT observations
idx_hdu = 1
if self.hdu_list[idx_hdu].name == 'CTFPAM':
self.ct_fpam = self.hdu_list[idx_hdu].data
self.ct_fpam_hdr = self.hdu_list[idx_hdu].header
else:
raise ValueError('The HDU list does not seem to contain the FPAM '
'value during core throughput observations')
# FSAM positions during CT observations
idx_hdu = 2
if self.hdu_list[idx_hdu].name == 'CTFSAM':
self.ct_fsam = self.hdu_list[idx_hdu].data
self.ct_fsam_hdr = self.hdu_list[idx_hdu].header
else:
raise ValueError('The HDU list does not seem to contain the FPAM '
'value during core throughput observations')
# File format checks
# Check PSF basis is a 3D set
if self.data.ndim != 3:
raise ValueError('The PSF basis is an (N,N1,N1) array with N PSFs, '
'each with N1 pixels x N1 pixels.')
# Check CT array is a 3xN set
if len(self.ct_excam) != 3:
raise ValueError('The core throughput array of measurements is a '
'Nx3 array.')
# Check the CT map has the same number of elements as the PSF cube
if self.ct_excam.shape[1] != self.data.shape[0]:
raise ValueError('The core throughput map must have one PSF location '
'and CT value for each PSF.')
if input_dataset is not None:
# Filter to off-axis PSF frames only (exclude pupil images) to check
# that PAM keywords are consistent across all images
offaxis_frames = [f for f in input_dataset
if f.ext_hdr.get('DPAMNAME') != 'PUPIL']
offaxis_dataset = Dataset(offaxis_frames)
pri_hdr, ext_hdr, err_hdr, dq_hdr = corgidrp.check.merge_headers(
offaxis_dataset, averaged_keywords = [
'RA', 'DEC', 'RAPM', 'DECPM', 'PA_V3', 'PA_APER', 'SVB_1', 'SVB_2', 'SVB_3'
'ROLL', 'PITCH', 'YAW', 'EXCAMT', 'NOVEREXP', 'PROXET',
'Z2AVG', 'Z2RES', 'Z2VAR', 'Z3AVG', 'Z3RES', 'Z3VAR',
'Z4AVG', 'Z4RES', 'Z5AVG', 'Z5RES',
'Z6AVG', 'Z6RES', 'Z7AVG', 'Z7RES', 'Z8AVG', 'Z8RES',
'Z9AVG', 'Z9RES', 'Z10AVG', 'Z10RES', 'Z11AVG', 'Z11RES',
'Z12AVG', 'Z13AVG', 'Z14AVG',
],
invalid_keywords = [
'FTIMEUTC', 'PROXET', 'DATETIME', 'FSMSG1',
'FSMSG2', 'FSMSG3', 'FSMX', 'FSMY',
]
)
# Apply merged headers from PSF part of the dataset back to the output
self.pri_hdr = pri_hdr
ext_hdr['EXTNAME'] = 'PSFCUBE'
ext_hdr['BUNIT'] = 'photoelectron/s'
ext_hdr['COMMENT'] = ('Set of PSFs derived from a core throughput '
'observing sequence. PSFs are not normalized. They are the '
'images of the off-axis source. The data cube is centered '
'around each PSF location')
self.ext_hdr = ext_hdr
self.err_hdr = err_hdr
self.dq_hdr = dq_hdr
# Additional bookkeeping for a calibration file:
# If this is a new calibration file, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if
# it is a new CoreThroughputCalibration file
if ext_hdr is not None:
if input_dataset is None:
# error check. this is required in this case
raise ValueError('This appears to be a new Core Throughput calibration'
'File. The dataset of input files needs'
'to be passed in to the input_dataset keyword'
'to record history of this calibration file.')
# corgidrp specific keyword for saving to disk
self.ext_hdr['DATATYPE'] = 'CoreThroughputCalibration'
# log all the data that went into making this calibration file
self._record_parent_filenames(input_dataset)
# add to history if not present
if not 'HISTORY' in self.ext_hdr:
self.ext_hdr['HISTORY'] = ('Core Throughput calibration derived '
f'from a set of frames on {self.ext_hdr["DATETIME"]}')
# Default convention: replace _l3_.fits from the filename of the
# input dataset by _ctp_cal.fits
self.filedir = '.'
self.filename = re.sub('_l[0-9].', '_ctp_cal', input_dataset[-1].filename)
self.pri_hdr['FILENAME'] = self.filename
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
# double check that this is actually a NonLinearityCalibration file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a CoreThroughputCalibration file.")
if self.ext_hdr['DATATYPE'] != 'CoreThroughputCalibration':
raise ValueError("File that was loaded was not a CoreThroughputCalibration file.")
###############
### Methods ###
###############
[docs]
def GetCTFPMPosition(self,
corDataset,
fpamfsamcal):
""" Gets the FPM's center during a Core throughput observing sequence.
The use of the delta FPAM/FSAM positions and the rotation matrices is
based on the prescription provided on 1/14/25: "H/V values to EXCAM
row/column pixels"
delta_pam = np.array([[dh], [dv]]) # fill these in
M = np.array([[ M00, M01], [M10, M11]], dtype=float32)
delta_pix = M @ delta_pam
Args:
corDataset (corgidrp.data.Dataset): a dataset containing some
coronagraphic observations.
fpamfsamcal (corgidrp.data.FpamFsamCal): an instance of the
FpamFsamCal class.
Returns:
Returns the FPM's center during a Core throughput observing sequence.
"""
# Read FPM location during the coronagraphic observations
cor_fpm_center = np.array([corDataset[0].ext_hdr['STARLOCX'],
corDataset[0].ext_hdr['STARLOCY']])
# Read FPAM and FSAM values during the coronagraphic observations
cor_fpam = np.array([corDataset[0].ext_hdr['FPAM_H'],
corDataset[0].ext_hdr['FPAM_V']])
cor_fsam = np.array([corDataset[0].ext_hdr['FSAM_H'],
corDataset[0].ext_hdr['FSAM_V']])
# Compute delta FPAM and delta FSAM
delta_fpam_um = self.ct_fpam - cor_fpam
delta_fsam_um = self.ct_fsam - cor_fsam
# Follow the array prescription from the doc string
delta_fpam_um = np.array([delta_fpam_um]).transpose()
delta_fsam_um = np.array([delta_fsam_um]).transpose()
# Get the shift in EXCAM pixels for FPAM and FSAM
delta_fpam_excam = (fpamfsamcal.data[0] @ delta_fpam_um).transpose()[0]
delta_fsam_excam = (fpamfsamcal.data[1] @ delta_fsam_um).transpose()[0]
# Return the FPAM and FSAM centers during the core throughput observations
return cor_fpm_center + delta_fpam_excam, cor_fpm_center + delta_fsam_excam
[docs]
def InterpolateCT(self,
x_cor,
y_cor,
corDataset,
fpamfsamcal,
logr=False):
""" Interpolate CT value at a desired position of a coronagraphic
observation.
First implementation based on Max Millar-Blanchaer's suggestions
https://collaboration.ipac.caltech.edu/display/romancoronagraph/Max%27s+Interpolation+idea
Here we assume that the core throughput measurements with the star
located along a series of radial spikes at various azimuths.
It throws an error if the radius of the new points is outside the range
of the input radii. If the azimuth is greater than the maximum azimuth
of the core throughput dataset, it will mod the azimuth to be within
the range of the input azimuths.
Assumes that the input core_thoughput is between 0 and 1.
# TODO: review accuracy of the method with simulated data that are more
# representative of future mission data, including error budget and
# expected azimuthal dependence on the CT.
Args:
x_cor (numpy.ndarray): Values of the first dimension of the
target locations where the CT will be interpolated. Locations are
EXCAM pixels measured with respect to the FPM's center.
y_cor (numpy.ndarray): Values of the second dimension of the
target locations where the CT will be interpolated. Locations are
EXCAM pixels measured with respect to the FPM's center.
corDataset (corgidrp.data.Dataset): a dataset containing some
coronagraphic observations.
fpamfsamcal (corgidrp.data.FpamFsamCal): an instance of the
FpamFsamCal calibration class.
logr (bool) (optional): If True, radii are mapped into their logarithmic
values before constructing the interpolant.
Returns:
Returns interpolated value of the CT, first, and positions for valid
locations as a numpy ndarray.
"""
if isinstance(x_cor, np.ndarray) is False:
if isinstance(x_cor, int) or isinstance(x_cor, float):
x_cor = np.array([x_cor])
elif isinstance(x_cor, list):
x_cor = np.array(x_cor)
else:
raise ValueError('Target locations must be a scalar '
'(int or float), list of int or float values, or '
' a numpy.ndarray')
if isinstance(y_cor, np.ndarray) is False:
if isinstance(y_cor, int) or isinstance(y_cor, float):
y_cor = np.array([y_cor])
elif isinstance(y_cor, list):
y_cor = np.array(y_cor)
else:
raise ValueError('Target locations must be a scalar '
'(int or float), list of int or float values, or '
' a numpy.ndarray')
if len(x_cor) != len(y_cor):
raise ValueError('Target locations must have the same number of '
'elements')
# Get FPM's center during CT observations
fpam_ct_pix_out = self.GetCTFPMPosition(
corDataset,
fpamfsamcal)[0]
# Get CT measurements relative to CT FPM's center
x_grid = self.ct_excam[0,:] - fpam_ct_pix_out[0]
y_grid = self.ct_excam[1,:] - fpam_ct_pix_out[1]
core_throughput = self.ct_excam[2,:]
# Algorithm
radii = np.sqrt(x_grid**2 + y_grid**2)
# We'll need to mod the input azimuth, so let's subtract the
# minimum azimuth so we are relative to zero
azimuths = np.arctan2(y_grid, x_grid)
azimuth0 = azimuths.min()
azimuths = azimuths - azimuth0
# Now we can create a 2D array of the radii and azimuths
# Calculate the new datapoint in the right radius and azimuth coordinates
radius_cor = np.sqrt(x_cor**2 + y_cor**2)
# Remove interpolation locations that are outside the radius range
r_good = radius_cor >= radii.min()
if len(x_cor[r_good]) == 0:
raise ValueError('All target radii are less than the minimum '
'radius in the core throughout data: {:.2f} EXCAM pixels'.format(radii.min()))
radius_cor = radius_cor[r_good]
# Update x_cor and y_cor
x_cor = x_cor[r_good]
y_cor = y_cor[r_good]
r_good = radius_cor <= radii.max()
if len(x_cor[r_good]) == 0:
raise ValueError('All target radius are greater than the maximum '
'radius in the core throughout data: {:.2f} EXCAM pixels'.format(radii.max()))
radius_cor = radius_cor[r_good]
# Update x_cor and y_cor
x_cor = x_cor[r_good]
y_cor = y_cor[r_good]
# We'll need to mod the input azimuth, so let's subtract the minimum azimuth so we are relative to zero.
azimuth_cor = np.arctan2(y_cor, x_cor) - azimuth0
# MOD this azimuth so that we're in the right range: all angles will be
# within [0, azimuths.max()), including negative values
azimuth_cor = azimuth_cor % azimuths.max()
if logr:
radii = np.log10(radii)
radius_cor = np.log10(radius_cor)
rad_az = np.c_[radii, azimuths]
# Make the interpolator
interpolator = LinearNDInterpolator(rad_az, core_throughput)
# Now interpolate:
interpolated_values = interpolator(radius_cor, azimuth_cor)
# Raise ValueError if CT < 0, CT> 1
if np.any(interpolated_values < 0) or np.any(interpolated_values > 1):
raise ValueError('Some interpolated core throughput values are '
f'out of bounds (0,1): ({interpolated_values.min():.2f}, '
f'{interpolated_values.max():.2f})')
# Edge case:
# If a target location happens to be part of the CT dataset (i.e., the
# interpolator) and its azimuth is equal to the maximum azimuth in the
# CT dataset, the interpolated CT may sometimes be assigned to NaN, while
# it should simply be the same inout CT value at the same location
idx_az_max = np.argwhere(np.isnan(interpolated_values))
for idx in idx_az_max:
idx_x_arr = np.argwhere(x_cor[idx] == x_grid)
idx_y_arr = np.argwhere(y_cor[idx] == y_grid)
for idx_x in idx_x_arr:
# If and only if the same index is in both, it's the same location
if idx_x in idx_y_arr:
interpolated_values[idx_x] = core_throughput[idx_x]
# Raise ValueError if all CT are NaN
if np.all(np.isnan(interpolated_values)):
raise ValueError('There are no valid target positions within the ' +
'range of input PSF locations')
# Extrapolation: Remove NaN values
is_valid = np.where(np.isnan(interpolated_values) == False)[0]
return np.array([interpolated_values[is_valid],
x_cor[is_valid],
y_cor[is_valid]])
[docs]
def GetPSF(self,
x_cor,
y_cor,
corDataset,
fpamfsamcal,
method='nearest-polar'):
""" Get a PSF at a given (x,y) location on HLC in a coronagraphic
observation given a CT calibration file and the PAM transformation from
encoder values to EXCAM pixels.
First implementation: nearest PSF in a polar sense. See below.
# TODO: Implement an interpolation method that takes into account other
# PSF than the nearest one. Comply with any required precision from the
# functions that will use the interpolated PSF.
Args:
x_cor (numpy.ndarray): Values of the first dimension of the
target locations where the CT will be interpolated. Locations are
EXCAM pixels measured with respect to the FPM's center.
y_cor (numpy.ndarray): Values of the second dimension of the
target locations where the CT will be interpolated. Locations are
EXCAM pixels measured with respect to the FPM's center.
corDataset (corgidrp.data.Dataset): a dataset containing some
coronagraphic observations.
fpamfsamcal (corgidrp.data.FpamFsamCal): an instance of the
FpamFsamCal class. That is, a FpamFsamCal calibration file.
method (str): Interpolation method that will be used:
'polar-nearest': Given an (x,y) position wrt FPM's center, the
associated PSF is the one in the CT calibration dataset whose
radial distance to the FPM's center is the closest to
sqrt(x**2+y**2). If there is more than one CT PSF at the same
radial distance, choose the one whose angular distance to the
(x,y) location is the smallest.
Returns:
psf_interp_list (array): Array of interpolated PSFs for the valid
target locations.
x_interp_list (array): First dimension of the list of valid target positions.
y_interp_list (array): Second dimension of the list of valid target positions.
"""
if isinstance(x_cor, np.ndarray) is False:
if isinstance(x_cor, int) or isinstance(x_cor, float):
x_cor = np.array([x_cor])
elif isinstance(x_cor, list):
x_cor = np.array(x_cor)
else:
raise ValueError('Target locations must be a scalar '
'(int or float), list of int or float values, or '
' a numpy.ndarray')
if isinstance(y_cor, np.ndarray) is False:
if isinstance(y_cor, int) or isinstance(y_cor, float):
y_cor = np.array([y_cor])
elif isinstance(y_cor, list):
y_cor = np.array(y_cor)
else:
raise ValueError('Target locations must be a scalar '
'(int or float), list of int or float values, or '
' a numpy.ndarray')
if len(x_cor) != len(y_cor):
raise ValueError('Target locations must have the same number of '
'elements')
# We need to translate the PSF locations in the CT cal file to be with
# respect to the FPM's center during CT observations:
# Get FPM's center during CT observations
fpam_ct_pix_out = self.GetCTFPMPosition(
corDataset,
fpamfsamcal)[0]
# Get CT measurements relative to CT FPM's center
x_grid = self.ct_excam[0,:] - fpam_ct_pix_out[0]
y_grid = self.ct_excam[1,:] - fpam_ct_pix_out[1]
# Algorithm:
# Radial distances wrt FPM's center
radii = np.sqrt(x_grid**2 + y_grid**2)
# Azimuths
azimuths = np.arctan2(y_grid, x_grid)
# Radial distances of the target locations
radius_cor = np.sqrt(x_cor**2 + y_cor**2)
# Remove interpolation locations that are outside the radius range
r_good = radius_cor >= radii.min()
if len(x_cor[r_good]) == 0:
raise ValueError('All target radius are less than the minimum '
'radius in the core throughout data: {:.2f} EXCAM pixels'.format(radii.min()))
radius_cor = radius_cor[r_good]
# Update x_cor and y_cor
x_cor = x_cor[r_good]
y_cor = y_cor[r_good]
r_good = radius_cor <= radii.max()
if len(x_cor[r_good]) == 0:
raise ValueError('All target radius are either less than the minimum'
' radius or greater than the maximum radius in the core throughout'
' data: {:.2f} EXCAM pixels'.format(radii.max()))
radius_cor = radius_cor[r_good]
# Update x_cor and y_cor
x_cor = x_cor[r_good]
y_cor = y_cor[r_good]
r_cor = np.sqrt(x_cor**2 + y_cor**2)
psf_interp_list = []
x_interp_list = []
y_interp_list = []
if method.lower() == 'nearest-polar':
for i_psf in range(len(x_cor)):
# Agreeement for this nearest method is that radial distances are
# binned at 1/10th of a pixel. This will be unnecessary as soon as
# there's any other interpolation method than the 'nearest' one.
# Find the nearest radial position in the CT file (argmin()
# returns the first occurence only)
diff_r_abs = np.round(10*np.abs(r_cor[i_psf] - radii)/10)
idx_near = np.argwhere(diff_r_abs == diff_r_abs.min())
# If there's more than one case, select that one with the
# smallest angular distance
if len(idx_near) > 1:
print("More than one PSF found with the same radial distance from the FPM's center")
# Difference in angle b/w target and grid
# We want to distinguish PSFs at different quadrants
az_grid = np.arctan2(y_grid[idx_near], x_grid[idx_near])
az_cor = np.arctan2(y_cor[i_psf], x_cor[i_psf])
# Flatten into a 1-D array
diff_az_abs = np.abs(az_cor - az_grid).transpose()[0]
# Azimuth binning consistent with the binning of the radial distance
bin_az_fac = 1/10/r_cor[i_psf]
diff_az_abs = bin_az_fac * np.round(diff_az_abs/bin_az_fac)
# Closest angular location to the target location within equal radius
idx_near_az = np.argwhere(diff_az_abs == diff_az_abs.min())
# If there are two locations (half angle), choose the average (agreement)
if len(idx_near_az) == 2:
psf_interp = np.squeeze(self.data[idx_near[idx_near_az]]).mean(axis=0)
# Otherwise, this is the PSF
elif len(idx_near_az) == 1:
psf_interp = np.squeeze(self.data[idx_near[idx_near_az[0]]])
else:
raise ValueError(f'There are {len(idx_near_az):d} PSFs ',
'equally near the target PSF. This should not happen.')
# Otherwise this is the interpolated PSF (nearest)
elif len(idx_near) == 1:
psf_interp = np.squeeze(self.data[idx_near[0]])
# This should not happen b/c there should always be a closest radius
else:
raise Exception('No closest radial distance found. This should not happen.')
# Add valid case
psf_interp_list += [psf_interp]
x_interp_list += [x_cor[i_psf]]
y_interp_list += [y_cor[i_psf]]
else:
raise ValueError(f'Unidentified method for the interpolation: {method}')
return np.array(psf_interp_list), np.array(x_interp_list), np.array(y_interp_list)
[docs]
class CoreThroughputMap(Image):
""" Class containing a corethroughput map.
The corethroughput map consists of M sets of (x, y, CT estimated). The
(x, y) are pixel coordinates wrt the FPM's center. More details about the
corethroughput map array can be found in the class method create_ct_map().
Args:
data_or_filepath (array or str): either the filepath to the FITS file to
read in OR the 2D image data. The FITS file or data must be from a
coronagraphic observation because the FPM's center is needed during the
creation of the corethroughput map.
"""
###################
### Constructor ###
###################
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, err=None, input_dataset=None):
# run the image class constructor
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err=err)
# Check it has the FPM's center information
if ('STARLOCX' not in self.ext_hdr) or ('STARLOCY' not in self.ext_hdr):
raise ValueError('The input dataset does not contain the information'
'about the FPM center')
# Check data have the expected format (x,y,ct)
if isinstance(data_or_filepath, str) is False:
data_or_filepath.shape[0] == 3
if data_or_filepath[2,:].max() > 1 or data_or_filepath[2,:].min() < 0:
raise ValueError('Corethroughput map values should be within 0 and 1')
if input_dataset is not None:
# Filter to off-axis PSF frames only (exclude pupil images) to check
# that PAM keywords are consistent across all images
offaxis_frames = [f for f in input_dataset
if f.ext_hdr.get('DPAMNAME') != 'PUPIL']
offaxis_dataset = Dataset(offaxis_frames)
pri_hdr, ext_hdr, err_hdr, dq_hdr = corgidrp.check.merge_headers(
offaxis_dataset, averaged_keywords = [
'RA', 'DEC', 'RAPM', 'DECPM', 'PA_V3', 'PA_APER', 'SVB_1', 'SVB_2', 'SVB_3'
'ROLL', 'PITCH', 'YAW', 'EXCAMT', 'NOVEREXP', 'PROXET',
'Z2AVG', 'Z2RES', 'Z2VAR', 'Z3AVG', 'Z3RES', 'Z3VAR',
'Z4AVG', 'Z4RES', 'Z5AVG', 'Z5RES',
'Z6AVG', 'Z6RES', 'Z7AVG', 'Z7RES', 'Z8AVG', 'Z8RES',
'Z9AVG', 'Z9RES', 'Z10AVG', 'Z10RES', 'Z11AVG', 'Z11RES',
'Z12AVG', 'Z13AVG', 'Z14AVG',
],
invalid_keywords = [
'FTIMEUTC', 'PROXET', 'DATETIME', 'FSMSG1',
'FSMSG2', 'FSMSG3', 'FSMX', 'FSMY',
]
)
# Apply merged headers from PSF part of the dataset back to the output
self.pri_hdr = pri_hdr
ext_hdr['BUNIT'] = 'photoelectron/s'
self.ext_hdr = ext_hdr
self.err_hdr = err_hdr
self.dq_hdr = dq_hdr
# Additional bookkeeping for a calibration file:
# If this is a new calibration file, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if
# it is a new CoreThroughputMap file
if ext_hdr is not None:
if input_dataset is None:
# error check. this is required in this case
raise ValueError('This appears to be a new CoreThroughputMap '
'file. The dataset of input files needs '
'to be passed in to the input_dataset keyword '
'to record history of this calibration file.')
# corgidrp specific keyword for saving to disk
self.ext_hdr['DATATYPE'] = 'CoreThroughputMap'
# log all the data that went into making this calibration file
self._record_parent_filenames(input_dataset)
# add to history if not present
if not 'HISTORY' in self.ext_hdr:
self.ext_hdr['HISTORY'] = ('CoreThroughputMap derived '
f'from a set of frames on {self.ext_hdr["DATETIME"]}')
# Generate filename following the calibration file convention
self.filedir = '.'
if input_dataset is not None:
self.filename = re.sub('_l[0-9].', '_ctm_cal', input_dataset[-1].filename)
else:
self.filename = 'corethroughput_map.fits' # fallback
self.pri_hdr['FILENAME'] = self.filename
# Enforce data level = L3
self.ext_hdr['DATALVL'] = 'L3'
# Keep track of the coronagraphic files used to create the CT map
if input_dataset is not None:
self._record_parent_filenames(input_dataset)
# double check that this is actually a NonLinearityCalibration file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a CoreThroughputMap file.")
if self.ext_hdr['DATATYPE'] != 'CoreThroughputMap':
raise ValueError("File that was loaded was not a CoreThroughputMap file.")
[docs]
class PyKLIPDataset(pyKLIP_Data):
"""
A pyKLIP instrument class for Roman Coronagraph Instrument data.
# TODO: Add more bandpasses, modes to self.filter_wavs
# Add wcs header info!
Attrs:
input: Input corgiDRP dataset.
centers: Star center locations.
filenums: file numbers.
filenames: file names.
PAs: position angles.
wvs: wavelengths.
wcs: WCS header information. Currently None.
IWA: inner working angle.
OWA: outer working angle.
psflib: corgiDRP dataset containing reference PSF observations.
output: PSF subtracted pyKLIP dataset
"""
####################
### Constructors ###
####################
def __init__(self,
dataset,
psflib_dataset=None,
highpass=False):
"""
Initialize the pyKLIP instrument class for space telescope data.
# TODO: Determine inner working angle based on PAM positions
- Inner working angle based on Focal plane mask (starts with HLC) + color filter ('1F') for primary mode
- Outer working angle based on field stop? (should be R1C1 or R1C3 for primary mode)
Args:
dataset (corgidrp.data.Dataset):
Dataset containing input science observations.
psflib_dataset (corgidrp.data.Dataset, optional):
Dataset containing input reference observations. The default is None.
highpass (bool, optional):
Toggle to do highpass filtering. Defaults fo False.
"""
# Initialize pyKLIP Data class.
super(PyKLIPDataset, self).__init__()
# Set filter wavelengths
[docs]
self.filter_wavs = {'1F': 575e-9, '4F': 825e-9} # meters
# Read science and reference files.
self.readdata(dataset, psflib_dataset, highpass)
pass
################################
### Instance Required Fields ###
################################
@property
@input.setter
def input(self, newval):
self._input = newval
@property
[docs]
def centers(self):
return self._centers
@centers.setter
def centers(self, newval):
self._centers = newval
@property
[docs]
def filenums(self):
return self._filenums
@filenums.setter
def filenums(self, newval):
self._filenums = newval
@property
[docs]
def filenames(self):
return self._filenames
@filenames.setter
def filenames(self, newval):
self._filenames = newval
@property
[docs]
def PAs(self):
return self._PAs
@PAs.setter
def PAs(self, newval):
self._PAs = newval
@property
[docs]
def wvs(self):
return self._wvs
@wvs.setter
def wvs(self, newval):
self._wvs = newval
@property
[docs]
def wcs(self):
return self._wcs
@wcs.setter
def wcs(self, newval):
self._wcs = newval
@property
[docs]
def IWA(self):
return self._IWA
@IWA.setter
def IWA(self, newval):
self._IWA = newval
@property
[docs]
def OWA(self):
return self._OWA
@OWA.setter
def OWA(self, newval):
self._OWA = newval
@property
[docs]
def psflib(self):
return self._psflib
@psflib.setter
def psflib(self, newval):
self._psflib = newval
@property
[docs]
def output(self):
return self._output
@output.setter
def output(self, newval):
self._output = newval
###############
### Methods ###
###############
[docs]
def readdata(self,
dataset,
psflib_dataset,
highpass=False):
"""
Read the input science observations.
Args:
dataset (corgidrp.data.Dataset):
Dataset containing input science observations.
psflib_dataset (corgidrp.data.Dataset, optional):
Dataset containing input reference observations. The default is None.
highpass (bool, optional):
Toggle to do highpass filtering. Defaults fo False.
"""
# Check input.
if not isinstance(dataset, corgidrp.data.Dataset):
raise UserWarning('Input dataset is not a corgidrp Dataset object.')
if len(dataset) == 0:
raise UserWarning('No science frames in the input dataset.')
if not psflib_dataset is None:
if not isinstance(psflib_dataset, corgidrp.data.Dataset):
raise UserWarning('Input psflib_dataset is not a corgidrp Dataset object.')
# Loop through frames.
input_all = []
centers_all = [] # pix
filenames_all = []
PAs_all = [] # deg
wvs_all = [] # m
wcs_all = []
PIXSCALE = [] # arcsec
psflib_data_all = []
psflib_centers_all = [] # pix
psflib_filenames_all = []
# Iterate over frames in dataset
for i, frame in enumerate(dataset):
phead = frame.pri_hdr
shead = frame.ext_hdr
if 'TELESCOP' in phead:
TELESCOP = phead['TELESCOP']
if TELESCOP != "ROMAN":
raise UserWarning('Data is not from Roman Space Telescope Coronagraph Instrument. TELESCOP = {0}'.format(TELESCOP))
INSTRUME = phead['INSTRUME']
if INSTRUME != "CGI":
raise UserWarning('Data is not from Roman Space Telescope Coronagraph Instrument. INSTRUME = {0}'.format(INSTRUME))
CFAMNAME = shead['CFAMNAME']
data = frame.data
if data.ndim == 2:
data = data[np.newaxis, :]
if data.ndim != 3:
raise UserWarning('Requires 2D/3D data cube')
NINTS = data.shape[0]
pix_scale = shead['PLTSCALE'] * 1000. # arcsec
PIXSCALE += [pix_scale]
# Get centers.
centers = np.array([shead['STARLOCX'], shead['STARLOCY']] * NINTS)
# Get metadata.
input_all += [data]
centers_all += [centers]
filenames_all += [os.path.split(frame.filename)[1] + '_INT%.0f' % (j + 1) for j in range(NINTS)]
PAs_all += [shead['NORTHANG']] * NINTS ##we quietly change PAs_all to use NORTHANG
# Get center wavelengths
try:
CWAVEL = self.filter_wavs[CFAMNAME]
except:
raise UserWarning(f'CFAM position {CFAMNAME} is not configured in corgidrp.data.PyKLIPDataset .')
# Rounding error introduced here?
wvs_all += [CWAVEL] * NINTS
# pyklip will look for wcs.cd, so make sure that attribute exists
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=fits.verify.VerifyWarning)
wcs_obj = wcs.WCS(header=shead)
if not hasattr(wcs_obj.wcs,'cd'):
wcs_obj.wcs.cd = wcs_obj.wcs.pc * wcs_obj.wcs.cdelt
for j in range(NINTS):
wcs_all += [wcs_obj.deepcopy()]
try:
input_all = np.concatenate(input_all)
except ValueError:
raise UserWarning('Unable to concatenate images. Some science files do not have matching image shapes')
centers_all = np.concatenate(centers_all).reshape(-1, 2)
filenames_all = np.array(filenames_all)
filenums_all = np.array(range(len(filenames_all)))
PAs_all = np.array(PAs_all)
wvs_all = np.array(wvs_all).astype(np.float32)
wcs_all = np.array(wcs_all)
PIXSCALE = np.unique(np.array(PIXSCALE))
if len(PIXSCALE) != 1:
raise UserWarning('Some science files do not have matching pixel scales')
iwa_all = np.min(wvs_all) / 6.5 * 180. / np.pi * 3600. / PIXSCALE[0] # pix
owa_all = np.sum(np.array(input_all.shape[1:]) / 2.) # pix
# Recenter science images so that the star is at the center of the array.
new_center = (np.array(data.shape[1:])-1)/ 2.
new_center = new_center[::-1]
for i, image in enumerate(input_all):
recentered_image = pyklip.klip.align_and_scale(image, new_center=new_center, old_center=centers_all[i])
input_all[i] = recentered_image
centers_all[i] = new_center
# Assign pyKLIP variables.
self._input = input_all
self._centers = centers_all
self._filenames = filenames_all
self._filenums = filenums_all
self._PAs = PAs_all
self._wvs = wvs_all
self._wcs = wcs_all
self._IWA = iwa_all
self._OWA = owa_all
# Prepare reference library
if not psflib_dataset is None:
psflib_data_all = []
psflib_centers_all = [] # pix
psflib_filenames_all = []
for i, frame in enumerate(psflib_dataset):
phead = frame.pri_hdr
shead = frame.ext_hdr
data = frame.data
if data.ndim == 2:
data = data[np.newaxis, :]
if data.ndim != 3:
raise UserWarning('Requires 2D/3D data cube')
NINTS = data.shape[0]
pix_scale = shead['PLTSCALE'] * 1000. # arcsec
PIXSCALE += [pix_scale]
# Get centers.
centers = np.array([shead['STARLOCX'], shead['STARLOCY']] * NINTS)
psflib_data_all += [data]
psflib_centers_all += [centers]
psflib_filenames_all += [os.path.split(frame.filename)[1] + '_INT%.0f' % (j + 1) for j in range(NINTS)]
psflib_data_all = np.concatenate(psflib_data_all)
if psflib_data_all.ndim != 3:
raise UserWarning('Some reference files do not have matching image shapes')
psflib_centers_all = np.concatenate(psflib_centers_all).reshape(-1, 2)
psflib_filenames_all = np.array(psflib_filenames_all)
# Recenter reference images.
new_center = (np.array(data.shape[1:])-1)/ 2.
new_center = new_center[::-1]
for i, image in enumerate(psflib_data_all):
recentered_image = pyklip.klip.align_and_scale(image, new_center=new_center, old_center=psflib_centers_all[i])
psflib_data_all[i] = recentered_image
psflib_centers_all[i] = new_center
# Append science data.
psflib_data_all = np.append(psflib_data_all, self._input, axis=0)
psflib_centers_all = np.append(psflib_centers_all, self._centers, axis=0)
psflib_filenames_all = np.append(psflib_filenames_all, self._filenames, axis=0)
# Initialize PSF library.
psflib = pyklip.rdi.PSFLibrary(psflib_data_all, new_center, psflib_filenames_all, compute_correlation=True, highpass=highpass)
# Prepare PSF library.
psflib.prepare_library(self)
# Assign pyKLIP variables.
self._psflib = psflib
else:
self._psflib = None
pass
[docs]
def savedata(self,
filepath,
data,
klipparams=None,
filetype='',
zaxis=None,
more_keywords=None):
"""
Function to save the data products that will be called internally by
pyKLIP.
Args:
filepath (path):
Path of the output FITS file.
data (3D-array):
KLIP-subtracted data of shape (nkl, ny, nx).
klipparams (str, optional):
PyKLIP keyword arguments used for the KLIP subtraction. The default
is None.
filetype (str, optional):
Data type of the pyKLIP product. The default is ''.
zaxis (list, optional):
List of KL modes used for the KLIP subtraction. The default is
None.
more_keywords (dict, optional):
Dictionary of additional header keywords to be written to the
output FITS file. The default is None.
"""
# Make FITS file.
hdul = fits.HDUList()
hdul.append(fits.PrimaryHDU(data))
# Write all used files to header. Ignore duplicates.
filenames = np.unique(self.filenames)
Nfiles = np.size(filenames)
hdul[0].header['DRPNFILE'] = (Nfiles, 'Num raw files used in pyKLIP')
for i, filename in enumerate(filenames):
if i < 1000:
hdul[0].header['FILE_{0}'.format(i)] = filename + '.fits'
else:
print('WARNING: Too many files to be written to header, skipping')
break
# Write PSF subtraction parameters and pyKLIP version to header.
try:
pyklipver = pyklip.__version__
except:
pyklipver = 'unknown'
hdul[0].header['PSFSUB'] = ('pyKLIP', 'PSF Subtraction Algo')
hdul[0].header.add_history('Reduced with pyKLIP using commit {0}'.format(pyklipver))
hdul[0].header['CREATOR'] = 'pyKLIP-{0}'.format(pyklipver)
hdul[0].header['pyklipv'] = (pyklipver, 'pyKLIP version that was used')
if klipparams is not None:
hdul[0].header['PSFPARAM'] = (klipparams, 'KLIP parameters')
hdul[0].header.add_history('pyKLIP reduction with parameters {0}'.format(klipparams))
# Write z-axis units to header if necessary.
if zaxis is not None:
if 'KL Mode' in filetype:
hdul[0].header['CTYPE3'] = 'KLMODES'
for i, klmode in enumerate(zaxis):
hdul[0].header['KLMODE{0}'.format(i)] = (klmode, 'KL Mode of slice {0}'.format(i))
# Write extra keywords to header if necessary.
if more_keywords is not None:
for hdr_key in more_keywords:
hdul[0].header[hdr_key] = more_keywords[hdr_key]
# Update image center.
center = self.output_centers[0]
hdul[0].header.update({'PSFCENTX': center[0], 'PSFCENTY': center[1]})
hdul[0].header.update({'CRPIX1': center[0] + 1, 'CRPIX2': center[1] + 1})
hdul[0].header.add_history('Image recentered to {0}'.format(str(center)))
# Write FITS file.
try:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=VerifyWarning) # fits save card length truncated warning
hdul.writeto(filepath, overwrite=True)
except TypeError:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=VerifyWarning) # fits save card length truncated warning
hdul.writeto(filepath, clobber=True)
hdul.close()
pass
[docs]
class NDFilterSweetSpotDataset(Image):
"""
Class for an ND filter sweet spot dataset product.
Typically stores an N×3 array of data:
[OD, x_center, y_center] for each measurement.
Args:
data_or_filepath (str or np.array): Either the filepath to the FITS file
to read in OR the 2D array of ND filter sweet-spot data (N×3).
pri_hdr (astropy.io.fits.Header): The primary header (required only if
raw 2D data is passed in).
ext_hdr (astropy.io.fits.Header): The image extension header (required
only if raw 2D data is passed in).
input_dataset (corgidrp.data.Dataset): The input dataset used to produce
this calibration file (optional). If this is a new product, you should
pass in the dataset so that the parent filenames can be recorded.
err (np.array): Optional 3D error array for the data.
dq (np.array): Optional 2D data-quality mask for the data.
err_hdr (astropy.io.fits.Header): Optional error extension header.
Attributes:
od_values (np.array): Array of OD measurements (length N).
x_values (np.array): Array of x-centroid positions (length N).
y_values (np.array): Array of y-centroid positions (length N).
"""
def __init__(
self,
data_or_filepath,
pri_hdr=None,
ext_hdr=None,
input_dataset=None,
err=None,
dq=None,
err_hdr=None
):
if input_dataset is not None:
pri_hdr, ext_hdr, err_hdr, dq_hdr = corgidrp.check.merge_headers(
input_dataset,
invalid_keywords=[
# Primary header keywords
'VISITID', 'FILETIME', 'PROGNUM', 'EXECNUM', 'CAMPAIGN',
'SEGMENT', 'OBSNUM', 'VISNUM', 'CPGSFILE', 'AUXFILE',
'VISTYPE', 'TARGET', 'RA', 'DEC', 'RAPM', 'DECPM',
'OPGAIN', 'PHTCNT', 'FRAMET', 'PA_V3', 'PA_APER',
'SVB_1', 'SVB_2', 'SVB_3', 'ROLL', 'PITCH', 'YAW',
'FILENAME', 'OBSNAME', 'WBJ_1', 'WBJ_2', 'WBJ_3',
# Extension header keywords
'BITPIX', 'BUNIT', 'ISHOWFSC', 'ISACQ', 'SPBAL', 'ISFLAT', 'SATSPOTS',
'STATUS', 'HVCBIAS', 'OPMODE',
'EXPTIME', 'EMGAIN_C', 'KGAINPAR',
'BLNKTIME', 'BLNKCYC', 'EXPCYC', 'OVEREXP', 'NOVEREXP',
'PROXET',
'FCMLOOP', 'FCMPOS', 'FSMINNER', 'FSMLOS', 'FSMPRFL', 'FSMRSTR',
'FSMSG1', 'FSMSG2', 'FSMSG3', 'FSMX', 'FSMY',
'EACQ_ROW', 'EACQ_COL', 'SB_FP_DX', 'SB_FP_DY', 'SB_FS_DX', 'SB_FS_DY',
'DMZLOOP',
'1SVALID', 'Z2AVG', 'Z2RES', 'Z2VAR', 'Z3AVG', 'Z3RES', 'Z3VAR',
'10SVALID', 'Z4AVG', 'Z4RES', 'Z5AVG', 'Z5RES',
'Z6AVG', 'Z6RES', 'Z7AVG', 'Z7RES', 'Z8AVG', 'Z8RES',
'Z9AVG', 'Z9RES', 'Z10AVG', 'Z10RES', 'Z11AVG', 'Z11RES',
'Z12AVG', 'Z13AVG', 'Z14AVG',
'FPAM_H', 'FPAM_V', 'FPAMNAME', 'FPAMSP_H', 'FPAMSP_V',
'DATETIME', 'FTIMEUTC', 'DATATYPE',
'FWC_PP_E', 'FWC_EM_E', 'SAT_DN',
'CRPIX1', 'CRPIX2', 'CDELT1', 'CDELT2', 'CD1_1', 'CD1_2', 'CD2_1', 'CD2_2',
],
)
else:
dq_hdr = None
super().__init__(
data_or_filepath,
pri_hdr=pri_hdr,
ext_hdr=ext_hdr,
err=err,
dq=dq,
err_hdr=err_hdr,
dq_hdr=dq_hdr
)
# 1. Check data shape: expect N×3 array for the sweet-spot dataset.
if self.data.ndim != 2 or self.data.shape[1] != 3:
raise ValueError(
"NDFilterSweetSpotDataset data must be a 2D array of shape (N, 3). "
f"Received shape {self.data.shape}."
)
# 2. Parse the columns into convenient attributes.
# Column 0: OD, Column 1: x_center, Column 2: y_center.
[docs]
self.od_values = self.data[:, 0]
[docs]
self.x_values = self.data[:, 1]
[docs]
self.y_values = self.data[:, 2]
# 3. If creating a new product (i.e. ext_hdr was passed in), record metadata.
if ext_hdr is not None:
if input_dataset is not None:
self._record_parent_filenames(input_dataset)
self.filename = re.sub('_l[0-9].', '_ndf_cal', input_dataset[-1].filename)
# if no input_dataset is given, do we want to set the filename manually using
# header values?
self.pri_hdr['FILENAME'] = self.filename
self.ext_hdr['DATATYPE'] = 'NDFilterSweetSpotDataset'
self.ext_hdr['HISTORY'] = (
f"NDFilterSweetSpotDataset created from {self.ext_hdr.get('DRPNFILE','?')} frames"
)
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
# 4. If reading from a file, verify that the header indicates the correct DATATYPE.
if 'DATATYPE' not in self.ext_hdr or self.ext_hdr['DATATYPE'] != 'NDFilterSweetSpotDataset':
raise ValueError("File that was loaded is not labeled as an NDFilterSweetSpotDataset file.")
[docs]
def interpolate_od(self, x, y, method="nearest"):
"""
Interpolates the data to get the OD at the requested x/y location
Args:
x (float): x detector pixel location
y (float): y detector pixel location
method (str): only "nearest" supported currently
Returns:
float: the OD at the requested point
"""
interpolator = LinearNDInterpolator(np.array([self.x_values, self.y_values]).T, self.od_values)
return interpolator(x, y)
[docs]
class NDSpectroscopy(Image):
"""
ND filter calibration product for spectroscopy.
For spectroscopy observations (DPAMNAME=PRISM*) - stores OD(lambda),
the optical depth of the ND filter as a function of wavelength.
Unlike the imaging mode ND filter calibration product, spectroscopy mode
ND filter calibration product is only measured at a single detector location.
Data shape: (2, M)
row 0: wavelengths in nm
row 1: OD(lambda) values (dimensionless)
Error shape: (1, 2, M)
err[0, 0, :]: wavelength uncertainties (nm)
err[0, 1, :]: OD uncertainties
Args:
data_or_filepath (str or np.array): filepath to an existing NDSpectroscopy
FITS file, or a (2, M) numpy array of [wavelengths, OD].
err (np.array): (1, 2, M) error array.
dq (np.array): (2, M) data-quality array.
pri_hdr (fits.Header): primary header (required if raw array passed in).
ext_hdr (fits.Header): extension header (required if raw array passed in).
err_hdr (fits.Header): error extension header.
input_dataset (corgidrp.data.Dataset): input frames used to create this
product (required if raw array passed in without DRPNFILE in ext_hdr).
Attributes:
wavelengths (np.array): length-M wavelength array in nm.
od_spectrum (np.array): length-M OD(lambda) array.
od_err (np.array): length-M OD uncertainty array.
wave_err (np.array): length-M wavelength uncertainty array.
"""
def __init__(self, data_or_filepath, err=None, dq=None,
pri_hdr=None, ext_hdr=None, err_hdr=None, input_dataset=None):
if input_dataset is not None:
pri_hdr, ext_hdr, err_hdr, dq_hdr = corgidrp.check.merge_headers(
input_dataset,
invalid_keywords=[
# Primary header keywords
'VISITID', 'FILETIME', 'PROGNUM', 'EXECNUM', 'CAMPAIGN',
'SEGMENT', 'OBSNUM', 'VISNUM', 'CPGSFILE', 'AUXFILE',
'VISTYPE', 'TARGET', 'RA', 'DEC', 'RAPM', 'DECPM',
'OPGAIN', 'PHTCNT', 'FRAMET', 'PA_V3', 'PA_APER',
'SVB_1', 'SVB_2', 'SVB_3', 'ROLL', 'PITCH', 'YAW',
'FILENAME', 'OBSNAME', 'WBJ_1', 'WBJ_2', 'WBJ_3',
# Extension header keywords
'BITPIX', 'BUNIT', 'ISHOWFSC', 'ISACQ', 'SPBAL', 'ISFLAT', 'SATSPOTS',
'STATUS', 'HVCBIAS', 'OPMODE',
'EXPTIME', 'EMGAIN_C', 'KGAINPAR',
'BLNKTIME', 'BLNKCYC', 'EXPCYC', 'OVEREXP', 'NOVEREXP',
'PROXET',
'FCMLOOP', 'FCMPOS', 'FSMINNER', 'FSMLOS', 'FSMPRFL', 'FSMRSTR',
'FSMSG1', 'FSMSG2', 'FSMSG3', 'FSMX', 'FSMY',
'EACQ_ROW', 'EACQ_COL', 'SB_FP_DX', 'SB_FP_DY', 'SB_FS_DX', 'SB_FS_DY',
'DMZLOOP',
'1SVALID', 'Z2AVG', 'Z2RES', 'Z2VAR', 'Z3AVG', 'Z3RES', 'Z3VAR',
'10SVALID', 'Z4AVG', 'Z4RES', 'Z5AVG', 'Z5RES',
'Z6AVG', 'Z6RES', 'Z7AVG', 'Z7RES', 'Z8AVG', 'Z8RES',
'Z9AVG', 'Z9RES', 'Z10AVG', 'Z10RES', 'Z11AVG', 'Z11RES',
'Z12AVG', 'Z13AVG', 'Z14AVG',
'FPAM_H', 'FPAM_V', 'FPAMNAME', 'FPAMSP_H', 'FPAMSP_V',
'DATETIME', 'FTIMEUTC', 'DATATYPE',
'FWC_PP_E', 'FWC_EM_E', 'SAT_DN',
'CRPIX1', 'CRPIX2', 'CDELT1', 'CDELT2', 'CD1_1', 'CD1_2', 'CD2_1', 'CD2_2',
],
)
else:
dq_hdr = None
super().__init__(
data_or_filepath,
pri_hdr=pri_hdr,
ext_hdr=ext_hdr,
err=err,
dq=dq,
err_hdr=err_hdr,
dq_hdr=dq_hdr,
)
# Shape validation - expect (2, M)
if self.data.ndim != 2 or self.data.shape[0] != 2:
raise ValueError(
"NDSpectroscopy data must be a 2D array of shape (2, M). "
f"Received shape {self.data.shape}."
)
# Class attributes
[docs]
self.wavelengths = self.data[0, :]
[docs]
self.od_spectrum = self.data[1, :]
if self.err is not None and self.err.shape == (1, 2, self.data.shape[1]):
self.wave_err = self.err[0, 0, :]
self.od_err = self.err[0, 1, :]
else:
self.wave_err = np.zeros_like(self.wavelengths)
self.od_err = np.zeros_like(self.od_spectrum)
# Bookkeeping info for new files
if ext_hdr is not None:
if input_dataset is not None:
self._record_parent_filenames(input_dataset)
orig_input_filename = input_dataset[-1].filename.split(".fits")[0]
self.filename = "{0}_nds_cal.fits".format(orig_input_filename)
self.filename = re.sub('_l[0-9].', '', self.filename)
self.ext_hdr['DATATYPE'] = 'NDSpectroscopy'
self.ext_hdr['BUNIT'] = '' # dimensionless OD
self.ext_hdr['DATALVL'] = 'CAL'
self.ext_hdr['HISTORY'] = "NDSpectroscopy OD(lambda) calibration created"
self.pri_hdr['FILENAME'] = self.filename
# Validate DATATYPE when loading from file
if 'DATATYPE' not in self.ext_hdr or self.ext_hdr['DATATYPE'] != 'NDSpectroscopy':
raise ValueError("File that was loaded is not labeled as an NDSpectroscopy file.")
[docs]
class MuellerMatrix(Image):
"""
Class for a Mueller matrix dataset product.
Stores a 4x4 Mueller matrix and its error
Args:
data_or_filepath (str or np.array): either the filepath to the FITS file to read in OR the 2D image data
pri_hdr (astropy.io.fits.Header): the primary header (required only if raw 2D data is passed in)
ext_hdr (astropy.io.fits.Header): the image extension header (required only if raw 2D data is passed in)
input_dataset (corgidrp.data.Dataset): the Image files combined together to make this Mueller Matrix (required only if raw 2D data is passed in)
"""
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, input_dataset=None, err = None, err_hdr = None):
if input_dataset is not None:
pri_hdr, ext_hdr, err_hdr, dq_hdr = corgidrp.check.merge_headers(
input_dataset,
invalid_keywords=[
'VISITID', 'CDMSVERS', 'FSWDVERS', 'FILETIME', 'PROGNUM', 'EXECNUM','CAMPAIGN', 'SEGMENT','OBSNUM',
'VISNUM', 'CPGSFILE', 'AUXFILE', 'TARGET', 'RA', 'DEC', 'EQUINOX', 'RAPM', 'DECPM', 'OPGAIN', 'PHTCNT',
'FRAMET', 'PA_V3', 'PA_APER', 'SVB_1', 'SVB_2', 'SVB_3', 'ROLL', 'PITCH', 'YAW', 'FILENAME', 'OBSNAME',
'WBJ_1', 'WBJ_2', 'WBJ_3',
'SCTSRT', 'SCTEND', 'FRMTYPE','ISHOWSFC','ISACQ','SPBAL','ISFLAT','SATSPOTS','STATUS', 'HVCBIAS',
'OPMODE','EXPTIME','EMGAIN_C','UNITYG','EMGAINA1','EMGAINA2','EMGAINA3','EMGAINA4','EMGAINA5',
'GAINTCAL','EXCAMT','LOCAMT','EMGAIN_A','KGAINPAR','CYCLES','LASTEXP','BLNKTIME',
'BLNKCYC','EXPCYC','OVEREXP','NOVEREXP','ISPC','PROXET','FCMLOOP','FCMPOS','FSMINNER','FSMLOS',
'FSMPRFL','FSMRSTR','FSMSG1','FSMSG2','FSMSG3','FSMX','FSMY','EACQ_ROW','EACQ_COL','SB_FP_DX',
'SB_FP_DY', 'SB_FS_DX', 'SB_FS_DY', 'DMZLOOP', '1SVALID', 'Z2AVG', 'Z2RES', 'Z2VAR','Z3AVG','Z3RES',
'Z3VAR', '10SVALID', 'Z4AVG', 'Z4RES', 'Z5AVG','Z5RES', 'Z6AVG', 'Z6RES', 'Z7AVG', 'Z7RES', 'Z8AVG',
'Z8RES', 'Z9AVG', 'Z9RES', 'Z10AVG', 'Z10RES','Z11AVG', 'Z11RES', 'Z12AVG', 'Z12RES', 'Z13AVG', 'Z13RES', 'Z14AVG',
'DATETIME', 'FTIMEUTC','MJDSRT','MJDEND', 'DESMEAR','CTI_CORR','IS_BAD','FWC_PP_E','MWC_EM_E','SAT_DN',
'FRMSEL01','FRMSEL02','FRMSEL03','FRMSEL04','FRMSEL05','FRMSEL06','KGAIN_ER','RN','RN_ERR',
'DPAMNAME','DPAMSP_H','DPAMSP_V',
]
)
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err=err, err_hdr=err_hdr, dq_hdr=dq_hdr)
else:
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err=err)
# if this is a new Mueller Matrix map, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new Mueller Matrix
if ext_hdr is not None:
if input_dataset is None and 'DRPNFILE' not in ext_hdr.keys():
# error check. this is required in this case
raise ValueError("This appears to be a new Mueller Matrix. The dataset of input files needs to be passed in to the input_dataset keyword to record history of this Mueller Matrix.")
self.ext_hdr['DATATYPE'] = 'MuellerMatrix'
# log all the data that went into making this Mueller Matrix
self._record_parent_filenames(input_dataset)
# add to history
self.ext_hdr['HISTORY'] = "Mueller Matrix created"
# set the filename
self.filename = re.sub('_l[0-9].', '_mmx_cal', input_dataset[-1].filename)
self.pri_hdr['FILENAME'] = self.filename
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
if 'ND225' in self.ext_hdr['FPAMNAME'] or 'ND475' in self.ext_hdr['FPAMNAME']:
raise ValueError("Mueller Matrix cannot be created from ND225 or ND475 data, instead create an NDMuellerMatrix")
# double check that this is actually a bad pixel map that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a MuellerMatrix file.")
if self.ext_hdr['DATATYPE'] != 'MuellerMatrix':
raise ValueError("File that was loaded was not a MuellerMatrix file.")
self.dq_hdr['COMMENT'] = 'DQ not meaningful for this calibration; just present for class consistency'
[docs]
class NDMuellerMatrix(Image):
"""
Class for a Mueller matrix dataset product made with ND filter data.
Stores a 4x4 Mueller matrix and its error.
Args:
data_or_filepath (str or np.array): either the filepath to the FITS file to read in OR the 2D image data
pri_hdr (astropy.io.fits.Header): the primary header (required only if raw 2D data is passed in)
ext_hdr (astropy.io.fits.Header): the image extension header (required only if raw 2D data is passed in)
input_dataset (corgidrp.data.Dataset): the Image files combined together to make this Mueller Matrix (required only if raw 2D data is passed in)
err (astropy.io.fits.Header): the error array (required only if raw 2D data is passed in)
err_hdr (astropy.io.fits.Header): the error header (required only if raw 2D data is passed in)
"""
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, input_dataset=None, err = None, err_hdr = None):
if input_dataset is not None:
pri_hdr, ext_hdr, err_hdr, dq_hdr = corgidrp.check.merge_headers(
input_dataset,
invalid_keywords=[
'VISITID', 'CDMSVERS', 'FSWDVERS', 'FILETIME', 'PROGNUM', 'EXECNUM','CAMPAIGN', 'SEGMENT','OBSNUM',
'VISNUM', 'CPGSFILE', 'AUXFILE', 'TARGET', 'RA', 'DEC', 'EQUINOX', 'RAPM', 'DECPM', 'OPGAIN', 'PHTCNT',
'FRAMET', 'PA_V3', 'PA_APER', 'SVB_1', 'SVB_2', 'SVB_3', 'ROLL', 'PITCH', 'YAW', 'FILENAME', 'OBSNAME',
'WBJ_1', 'WBJ_2', 'WBJ_3',
'SCTSRT', 'SCTEND', 'FRMTYPE','ISHOWSFC','ISACQ','SPBAL','ISFLAT','SATSPOTS','STATUS', 'HVCBIAS',
'OPMODE','EXPTIME','EMGAIN_C','UNITYG','EMGAINA1','EMGAINA2','EMGAINA3','EMGAINA4','EMGAINA5',
'GAINTCAL','EXCAMT','LOCAMT','EMGAIN_A','KGAINPAR','CYCLES','LASTEXP','BLNKTIME',
'BLNKCYC','EXPCYC','OVEREXP','NOVEREXP','ISPC','PROXET','FCMLOOP','FCMPOS','FSMINNER','FSMLOS',
'FSMPRFL','FSMRSTR','FSMSG1','FSMSG2','FSMSG3','FSMX','FSMY','EACQ_ROW','EACQ_COL','SB_FP_DX',
'SB_FP_DY', 'SB_FS_DX', 'SB_FS_DY', 'DMZLOOP', '1SVALID', 'Z2AVG', 'Z2RES', 'Z2VAR','Z3AVG','Z3RES',
'Z3VAR', '10SVALID', 'Z4AVG', 'Z4RES', 'Z5AVG','Z5RES', 'Z6AVG', 'Z6RES', 'Z7AVG', 'Z7RES', 'Z8AVG',
'Z8RES', 'Z9AVG', 'Z9RES', 'Z10AVG', 'Z10RES','Z11AVG', 'Z11RES', 'Z12AVG', 'Z12RES', 'Z13AVG', 'Z13RES', 'Z14AVG',
'DATETIME', 'FTIMEUTC','MJDSRT','MJDEND', 'DESMEAR','CTI_CORR','IS_BAD','FWC_PP_E','MWC_EM_E','SAT_DN',
'FRMSEL01','FRMSEL02','FRMSEL03','FRMSEL04','FRMSEL05','FRMSEL06','KGAIN_ER','RN','RN_ERR',
'DPAMNAME','DPAMSP_H','DPAMSP_V',
]
)
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err=err, err_hdr=err_hdr, dq_hdr=dq_hdr)
else:
super().__init__(data_or_filepath, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err=err)
# if this is a new ND Mueller Matrix map, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new Mueller Matrix
if ext_hdr is not None:
if input_dataset is None and 'DRPNFILE' not in ext_hdr.keys():
# error check. this is required in this case
raise ValueError("This appears to be a new ND Mueller Matrix. The dataset of input files needs to be passed in to the input_dataset keyword to record history of this ND Mueller Matrix.")
self.ext_hdr['DATATYPE'] = 'NDMuellerMatrix'
# log all the data that went into making this Mueller Matrix
self._record_parent_filenames(input_dataset)
# add to history
self.ext_hdr['HISTORY'] = "Mueller Matrix created"
# set the filename
self.filename = re.sub('_l[0-9].', '_ndm_cal', input_dataset[-1].filename)
self.pri_hdr['FILENAME'] = self.filename
# Enforce data level = CAL
self.ext_hdr['DATALVL'] = 'CAL'
if 'ND225' not in self.ext_hdr['FPAMNAME'] and 'ND475' not in self.ext_hdr['FPAMNAME']:
raise ValueError("An ND Mueller Matrix is only for datasets with ND225 or ND475. You may want to instead create a MuellerMatrix instead.")
# double check that this is actually a bad pixel map that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a ND MuellerMatrix file.")
if self.ext_hdr['DATATYPE'] != 'NDMuellerMatrix':
raise ValueError("File that was loaded was not a ND MuellerMatrix file.")
self.dq_hdr['COMMENT'] = 'DQ not meaningful for this calibration; just present for class consistency'
[docs]
class FlatFieldPOL0:
def __init__(self, ref: FlatField):
[docs]
class FlatFieldPOL45:
def __init__(self, ref: FlatField):
[docs]
class FluxcalFactorPOL0:
def __init__(self, ref: FluxcalFactor):
[docs]
class FluxcalFactorPOL45:
def __init__(self, ref: FluxcalFactor):
[docs]
datatypes = { "Image" : Image,
"Dark" : Dark,
"NonLinearityCalibration" : NonLinearityCalibration,
"KGain" : KGain,
"BadPixelMap" : BadPixelMap,
"DetectorNoiseMaps": DetectorNoiseMaps,
"FlatField" : FlatField,
"FlatFieldPOL0" : FlatFieldPOL0,
"FlatFieldPOL45" : FlatFieldPOL45,
"DetectorParams" : DetectorParams,
"AstrometricCalibration" : AstrometricCalibration,
"TrapCalibration" : TrapCalibration,
"FluxcalFactor" : FluxcalFactor,
"FluxcalFactorPOL0" : FluxcalFactorPOL0,
"FluxcalFactorPOL45" : FluxcalFactorPOL45,
"FpamFsamCal" : FpamFsamCal,
"CoreThroughputMap" : CoreThroughputMap,
"CoreThroughputCalibration": CoreThroughputCalibration,
"NDFilterSweetSpotDataset": NDFilterSweetSpotDataset,
"NDSpectroscopy": NDSpectroscopy,
"SpectroscopyCentroidPSF": SpectroscopyCentroidPSF,
"DispersionModel": DispersionModel,
"LineSpread": LineSpread,
"SpecFilterOffset": SpecFilterOffset,
"MuellerMatrix": MuellerMatrix,
"NDMuellerMatrix": NDMuellerMatrix,
"SpecFluxCal": SpecFluxCal,
"SlitTransmission": SlitTransmission }
[docs]
def autoload(filepath):
"""
Loads the supplied FITS file filepath using the appropriate data class
Should be used sparingly to avoid accidentally loading in data of the wrong type
Args:
filepath (str): path to FITS file
Returns:
corgidrp.data.* : an instance of one of the data classes specified here
"""
with fits.open(filepath) as hdulist:
# check the exthdr for datatype
if 'DATATYPE' in hdulist[1].header:
dtype = hdulist[1].header['DATATYPE']
else:
# datatype not specified. Check if it's 2D
if len(hdulist[1].data.shape) == 2:
# a standard image (possibly a science frame)
dtype = "Image"
else:
errmsg = "Could not determine datatype for {0}. Data shape of {1} is not 2-D"
raise ValueError(errmsg.format(filepath, hdulist[1].data.shape))
# if we got here, we have a datatype
data_class = datatypes[dtype]
# use the class constructor to load in the data
frame = data_class(filepath)
return frame
[docs]
def unpackbits_64uint(arr, axis):
"""
Unpacking bits from a 64-bit unsigned integer array
Args:
arr (np.ndarray): the array to unpack
axis (int): axis to unpack
Returns:
np.ndarray of bits
"""
arr = arr.astype('>u8')
n = arr.view('u1')
return np.unpackbits(n, axis=axis, bitorder='big')
[docs]
def packbits_64uint(arr, axis):
"""
Packing bits into a 64-bit unsigned integer array
Args:
arr (np.ndarray): the array to pack
axis (int): axis to pack
Returns:
np.ndarray of 64-bit unsigned integers
"""
return np.packbits(arr, axis=axis, bitorder='big').view('>u8')
[docs]
def get_flag_to_bit_map():
"""
Returns a dictionary mapping flag names to bit positions.
Returns:
dict: A dictionary with flag names as keys and bit positions (int) as values.
"""
return {
"bad_pixel_unspecified": 0,
"data_replaced_by_filled_value": 1,
"bad_pixel": 2,
"hot_pixel": 3,
"not_used": 4,
"full_well_saturated_pixel": 5,
"non_linear_pixel": 6,
"pixel_affected_by_cosmic_ray": 7,
"TBD": 8,
}
[docs]
def get_flag_to_value_map():
"""
Returns a dictionary mapping flag names to their decimal flag values. Example usage is as follows:
FLAG_TO_VALUE_MAP = get_flag_to_value_map()
flag_value = FLAG_TO_VALUE_MAP["TBD"] # Gives the decimal value corresponding to "TBD"
Returns:
dict: A dictionary with flag names as keys and decimal values (int) as values.
"""
return {name: (1 << bit) for name, bit in get_flag_to_bit_map().items()}
[docs]
def get_value_to_flag_map():
"""
Returns a dictionary mapping flag decimal values to flag names. Example usage is as follows:
FLAG_TO_BIT_MAP = get_flag_to_bit_map()
bit_position = FLAG_TO_BIT_MAP["TBD"] # Gives which index it should be in with the key.
Expected position of bit_position of the unpacked array = 63 - bit_position
Returns:
dict: A dictionary with decimal values (int) as keys and flag names as values.
"""
return {value: name for name, value in get_flag_to_value_map().items()}
[docs]
def get_bit_to_flag_map():
"""
Returns a dictionary mapping bit positions to flag names. Example usage is as follows:
BIT_TO_FLAG_MAP = get_bit_to_flag_map()
flag_name_from_bit = BIT_TO_FLAG_MAP[8] # Expected: "TBD"
Returns:
dict: A dictionary with bit positions (int) as keys and flag names as values.
"""
return {bit: name for name, bit in get_flag_to_bit_map().items()}
[docs]
def get_stokes_intensity_image(stokes_image):
"""Return a copy containing only the Stokes I plane for photometry.
Args:
stokes_image (Image): L4 polarimetry Image to extract first plane
(Stokes I) from
Returns:
Image: New Image containing the Stokes I data and all required extensions
"""
intensity_image = stokes_image.copy(copy_data=True)
intensity_image.data = stokes_image.data[0].copy()
err_slice = stokes_image.err[0]
err_layer = err_slice if err_slice.ndim == 3 else np.array([err_slice])
if err_layer.shape[0] != 1:
err_layer = err_layer[:1]
intensity_image.err = err_layer.copy()
intensity_image.dq = stokes_image.dq[0].copy()
intensity_image.ext_hdr.add_history("Extracted Stokes-I plane via get_stokes_intensity_image.")
return intensity_image