"""
Calibration tracking system. Modified from kpicdrp caldb implmentation (Copyright (c) 2024, KPIC Team)
"""
import os
import numpy as np
import pandas as pd
import corgidrp
import corgidrp.data as data
import corgidrp.mocks as mocks
import corgidrp.spec as spec
import corgidrp.flat as flat
import corgidrp.detector as detector
import astropy.time as time
import astropy.io.fits as fits
from astropy.table import Table
import datetime
[docs]
column_dtypes = {
"Filepath": str,
"Type": str,
"MJD": float,
"EXPTIME": float,
"Files Used": int,
"Date Created": float,
"Hash": str,
"DRPVERSN": str,
"OBSNUM": str,
"NAXIS1": int,
"NAXIS2": int,
"OPMODE": str,
"EMGAIN_C": float,
"EXCAMT": float,
"CFAMNAME": str,
"DPAMNAME": str,
"FPAMNAME": str,
"FSAMNAME": str,
"SPAMNAME": str,
"PC_STAT": str
}
[docs]
default_values = {
str : "",
float : np.nan,
int : -1
}
[docs]
column_names = list(column_dtypes.keys())
[docs]
labels = {data.Dark: "Dark",
data.NonLinearityCalibration: "NonLinearityCalibration",
data.KGain : "KGain",
data.BadPixelMap: "BadPixelMap",
data.DetectorNoiseMaps: "DetectorNoiseMaps",
data.FlatField : "FlatField",
data.FlatFieldPOL0 : "FlatField",
data.FlatFieldPOL45 : "FlatField",
data.DetectorParams : "DetectorParams",
data.AstrometricCalibration : "AstrometricCalibration",
data.TrapCalibration : "TrapCalibration",
data.FluxcalFactor : "FluxcalFactor",
data.FluxcalFactorPOL0 : "FluxcalFactor",
data.FluxcalFactorPOL45 : "FluxcalFactor",
data.FpamFsamCal : "FpamFsamCal",
data.CoreThroughputCalibration: "CoreThroughputCalibration",
data.NDFilterSweetSpotDataset: "NDFilterSweetSpot",
data.NDSpectroscopy: "NDSpectroscopy",
data.SpectroscopyCentroidPSF: "SpectroscopyCentroidPSF",
data.DispersionModel: "DispersionModel",
data.MuellerMatrix: "MuellerMatrix",
data.NDMuellerMatrix: "NDMuellerMatrix",
data.SpecFilterOffset: "SpecFilterOffset",
data.SpecFluxCal: "SpecFluxCal",
data.SlitTransmission: "SlitTransmission",
data.LineSpread: "LineSpread"
}
[docs]
class CalDB:
"""
Database for tracking calibration files saved to disk. Modified from the kpicdrp version
Note that database is not parallelism-safe, but should be ok in most cases.
(Jason: look at using posix_ipc to guarantee thread safety if we really need it)
Args:
filepath (str): [optional] filepath to a CSV file with an existing database
Fields:
columns (list): column names of dataframe
filepath(str): full filepath to data
"""
def __init__(self, filepath=""):
"""
Args:
filepath (str): [optional] filepath to a CSV file with an existing database
"""
# If filepath is not passed in, use the default (majority of case)
if len(filepath) == 0:
self.filepath = corgidrp.caldb_filepath
else:
# possibly edge case where we want to use specialized caldb
self.filepath = filepath
# if database does't exist, create a blank one
if not os.path.exists(self.filepath):
# new database
self.columns = column_names
self._db = pd.DataFrame(columns=self.columns)
self.save()
else:
# already a database exists
self.load()
self.columns = list(self._db.columns.values)
[docs]
def load(self):
"""
Load/update db from filepath
"""
self._db = pd.read_csv(self.filepath)
# Scan the database for any columns that might be missing, fill in missing columns with default values if necessary
for col in column_names:
if col not in self._db.columns:
self._db[col] = default_values[column_dtypes[col]]
# fill any NA values with defaults and enforce column types
for col, dtype in column_dtypes.items():
if col in self._db.columns:
self._db[col] = self._db[col].fillna(default_values[dtype]).astype(dtype)
[docs]
def save(self):
"""
Save file without numbered index to disk with user specified filepath as a CSV file
"""
self._db.to_csv(self.filepath, index=False)
def _get_values_from_entry(self, entry, is_calib=True):
"""
Extract the properties from this data entry to ingest them into the database
Args:
entry (corgidrp.data.Image subclass): calibration frame to add to the database
is_calib (bool): is a calibration frame. if Not, it won't look up filetype.
Only used in get_calib() to grab metadata for science frames
Returns:
tuple:
row (list):
List of data entry properties
row_dict (dict):
Dictionary of data entry properties keyed by column names
"""
# return a dummy entry if nothing is passed in
if entry is None:
time_now = time.Time.now()
row_dict = {
"Filepath" : "",
"Type" : "Sci",
"MJD" : time_now.mjd,
"EXPTIME" : 0.,
"Files Used" : 0,
"Date Created" : time_now.mjd,
"Hash" : hash(time_now),
"DRPVERSN" : "0.0",
"OBSNUM" : "000",
"NAXIS1": 0,
"NAXIS2" : 0,
"OPMODE" : "",
"EMGAIN_C" : 0.,
"EXCAMT" : 0,
"CFAMNAME": ""
}
return list(row_dict.values()), row_dict
filepath = os.path.abspath(entry.filepath)
if is_calib:
datatype = labels[entry.__class__] # get the database str representation
else:
datatype = "Sci"
mjd = float(entry.ext_hdr["MJDSRT"])
exptime = entry.ext_hdr["EXPTIME"]
if exptime is None:
exptime = np.nan
# check if this exists. will be a keyword written by corgidrp
if "DRPNFILE" in entry.ext_hdr:
files_used = entry.ext_hdr["DRPNFILE"]
if files_used is None:
files_used = -1
else:
files_used = -1
if "DRPCTIME" in entry.ext_hdr:
date_created = time.Time(entry.ext_hdr["DRPCTIME"]).mjd
if date_created is None:
date_created = np.nan
else:
date_created = np.nan
if "DRPVERSN" in entry.ext_hdr:
drp_version = entry.ext_hdr["DRPVERSN"]
if drp_version is None:
drp_version = ""
else:
drp_version = ""
if "OBSNUM" in entry.pri_hdr:
obsid = str(entry.pri_hdr["OBSNUM"]) # force to be str
if obsid is None:
obsid = ""
else:
obsid = ""
hash_val = entry.get_hash()
# this only works for 2D images. may need to adapt for non-2D calibration frames
# import IPython; IPython.embed()
entry_shape = entry.data.shape
if len(entry_shape) < 2:
naxis1 = entry.data.shape[-1]
naxis2 = 0
else:
naxis1 = entry.data.shape[-1]
naxis2 = entry.data.shape[-2]
# naxis1 = entry.data.shape[-1]
# naxis2 = entry.data.shape[-2]
row = [
filepath,
datatype,
mjd,
exptime,
files_used,
date_created,
hash_val,
drp_version,
obsid,
naxis1,
naxis2,
]
# rest are ext_hdr keys we can copy
start_index = len(row)
for i in range(start_index, len(self.columns)):
if self.columns[i] not in entry.ext_hdr:
row.append(default_values[column_dtypes[self.columns[i]]])
else:
val = entry.ext_hdr[self.columns[i]]
if val is not None:
row.append(val) # add value staright from header
else:
# if value is not in header, use default value
row.append(default_values[column_dtypes[self.columns[i]]])
row_dict = {}
for key, val in zip(self.columns, row):
row_dict[key] = val
return row, row_dict
[docs]
def create_entry(self, entry, to_disk=True):
"""
Add a new entry to or update an existing one in the database. Note that function by default will load and save db to disk
Args:
entry (corgidrp.data.Image subclass): calibration frame to add to the database
to_disk (bool): True by default, will update DB from disk before adding entry and saving it back to disk
"""
if not os.path.exists(entry.filepath):
raise FileNotFoundError("Calibration file {0} does not exist on disk; save it before calling create_entry.".format(entry.filepath))
new_row, row_dict = self._get_values_from_entry(entry)
# update database from disk in case anything changed
if to_disk:
self.load()
# use filepath as key to see if it's already in database
if row_dict["Filepath"] in self._db.values:
row_index = self._db[
self._db["Filepath"] == row_dict["Filepath"]
].index.values
self._db.loc[row_index, self.columns] = new_row
# otherwise create new entry
else:
new_entry = pd.DataFrame([new_row], columns=self.columns)
if len(self._db) == 0:
self._db = new_entry
else:
self._db = pd.concat([self._db, new_entry], ignore_index=True)
# save to disk to update changes
if to_disk:
self.save()
[docs]
def remove_entry(self, entry, to_disk=True):
"""
Remove an entry from the database. Removes the entire row
Args:
entry (corgidrp.data.Image subclass): calibration frame to add to the database
to_disk (bool): True by default, will update DB from disk before adding entry and saving it back to disk
"""
new_row, row_dict = self._get_values_from_entry(entry)
# update database from disk in case anything changed
if to_disk:
self.load()
if row_dict["Filepath"] in self._db.values:
entry_index = self._db[
self._db["Filepath"] == row_dict["Filepath"]
].index.values
self._db = self._db.drop(self._db.index[entry_index])
self._db = self._db.reset_index(drop=True)
else:
raise ValueError("No filepath found so could not remove.")
# save to disk to update changes
if to_disk:
self.save()
[docs]
def get_calib(self, frame, dtype, to_disk=True):
"""
Outputs the best calibration file of the given type for the input science frame.
Args:
frame (corgidrp.data.Image): an image frame to request a calibration for. If None is passed in, looks for the
most recently created calibration.
dtype (corgidrp.data Class): for example: corgidrp.data.Dark (TODO: document the entire list of options)
to_disk (bool): True by default, will update DB from disk before matching
Returns:
corgidrp.data.*: an instance of the appropriate calibration type (Exact type depends on calibration type)
"""
if dtype not in labels:
raise ValueError(
"Requested calibration dtype of {0} not a valid option".format(dtype)
)
dtype_label = labels[dtype]
# get values for this science frame
_, frame_dict = self._get_values_from_entry(frame, is_calib=False)
# update database from disk in case anything changed
if to_disk:
self.load()
# downselect to only calibs of this type
calibdf = self._db[self._db["Type"] == dtype_label]
if len(calibdf) == 0:
raise ValueError("No valid {0} calibration in caldb located at {1}".format(dtype_label, self.filepath))
# different logic for different cases
# each if/else statement sets options_sorted: candidates ordered by preference (best first)
if frame is None:
# no frame is passed in, get the most recently created
options_sorted = calibdf.sort_values("Date Created", ascending=False)
elif dtype_label in ["Dark"]:
# match on exposure time and EM gain
options = self.filter_calib(calibdf, "EXPTIME", frame_dict["EXPTIME"], err_if_none=True)
options = self.filter_calib(options, "EMGAIN_C", frame_dict["EMGAIN_C"], err_if_none=True)
# for analog frames, exclude PC master darks. for PC frames, prefer them
is_pc = frame.ext_hdr.get('ISPC', 0)
if is_pc:
# prefer PC master dark if available, otherwise fall back to any matching dark
pc_options = options[options['PC_STAT'] == 'photon-counted master dark']
if len(pc_options) > 0:
options = pc_options
else:
# analog frame: do not select a matching PC master dark
options = options[options['PC_STAT'] != 'photon-counted master dark']
if len(options) == 0:
raise ValueError(
"No valid analog Dark calibration found in caldb located at {0} "
"(no analog master darks with EXPTIME={1} and EMGAIN_C={2}).".format(
self.filepath, frame_dict["EXPTIME"], frame_dict["EMGAIN_C"]
)
)
# sort by closest in time
options_sorted = options.iloc[np.argsort(np.abs(options["MJD"] - frame_dict["MJD"]))]
elif dtype_label in ['FluxcalFactor']:
# filter by color filter and DPAM
# FluxcalFactorPOL0/FluxcalFactorPOL45 force the DPAMNAME for lookup
if dtype == data.FluxcalFactorPOL0:
dpamname = "POL0"
elif dtype == data.FluxcalFactorPOL45:
dpamname = "POL45"
else:
dpamname = frame_dict['DPAMNAME']
options = self.filter_calib(calibdf, "CFAMNAME", frame_dict['CFAMNAME'], err_if_none=True)
options = self.filter_calib(options, "DPAMNAME", dpamname, err_if_none=True)
# sort by closest in time
options_sorted = options.iloc[np.argsort(np.abs(options["MJD"] - frame_dict["MJD"]))]
# FluxcalFactorPOL0/FluxcalFactorPOL45 are looked up as FluxcalFactor entries on disk
dtype = data.FluxcalFactor
elif dtype_label in ['SpecFluxCal']:
# filter by color filter and DPAM
if frame_dict['CFAMNAME'] in ['2F', '3F', '2A', '2B', '2C', '3A', '3B', '3C', '3D', '3E', '3G']:
value = list(frame_dict['CFAMNAME'])[0] + 'F'
options = self.filter_calib(calibdf, "CFAMNAME", value, err_if_none=True)
else:
options = self.filter_calib(calibdf, "CFAMNAME", frame_dict['CFAMNAME'], err_if_none=True)
options = self.filter_calib(options, "DPAMNAME", frame_dict['DPAMNAME'], err_if_none=True)
# sort by closest in time
options_sorted = options.iloc[np.argsort(np.abs(options["MJD"] - frame_dict["MJD"]))]
dtype = data.SpecFluxCal
elif dtype_label in ['CoreThroughputCalibration']:
# filter by focal plane mask
options = self.filter_calib(calibdf, "FPAMNAME", frame_dict['FPAMNAME'], err_if_none=True)
options = self.filter_calib(options, "CFAMNAME", frame_dict['CFAMNAME'], err_if_none=True)
# sort by closest in time
options_sorted = options.iloc[np.argsort(np.abs(options["MJD"] - frame_dict["MJD"]))]
elif dtype_label in ['FlatField']:
# DPAM: IMAGING, POL0, or POL45 only. All other DPAM settings, including PUPIL, use flat = ones
# CFAM: spectroscopy bands (2F/3F and their sub-filters) and CLEAR use flat = ones
# all other non-imaging CFAM positions should use a custom recipe, not auto-selection
# Sub-bands 1A/1B/1C and 4A/4B/4C are mapped to their corresponding broadband (1F, 4F)
# FPAM: FPM-in data uses the open-substrate flat for the appropriate band pair (OPEN_12 or OPEN_34)
spectroscopy_cfams = {'2F', '3F', '2A', '2B', '2C', '3A', '3B', '3C', '3D', '3E', '3G'}
imaging_cfams = {'1F', '1A', '1B', '1C', '4F', '4A', '4B', '4C'}
cfam_subband_map = {'1A': '1F', '1B': '1F', '1C': '1F',
'4A': '4F', '4B': '4F', '4C': '4F'}
ones_flat_cfams = spectroscopy_cfams | {'CLEAR'}
if frame_dict['DPAMNAME'] not in ['IMAGING', 'POL0', 'POL45'] or frame_dict['CFAMNAME'] in ones_flat_cfams:
# Use an all-ones flat file identified with FPAMNAME='ONES'
options = calibdf[calibdf["FPAMNAME"] == "ONES"]
if len(options) == 0:
raise ValueError("No ones-flat FlatField calibration found in caldb at {0}".format(self.filepath))
# Any ones-flat is equivalent, choose the most recently created
options_sorted = options.sort_values("MJD", ascending=False)
elif frame_dict['CFAMNAME'] not in imaging_cfams:
raise ValueError(
"No flat defined for CFAMNAME={0}, use a custom recipe".format(frame_dict['CFAMNAME'])
)
else:
# sub-bands map to their corresponding broadband flat
cfam_lookup = cfam_subband_map.get(frame_dict['CFAMNAME'], frame_dict['CFAMNAME'])
options = self.filter_calib(calibdf, "CFAMNAME", cfam_lookup, err_if_none=True)
options = self.filter_calib(options, "DPAMNAME", frame_dict['DPAMNAME'], err_if_none=True)
# FPAM should be either OPEN_12 for bands 1&2, or OPEN_34 for bands 3&4
options = options[options["FPAMNAME"].str.startswith("OPEN")]
if len(options) == 0:
raise ValueError("No FlatField with OPEN FPAM found in caldb at {0} for CFAMNAME={1}, DPAMNAME={2}".format(
self.filepath, cfam_lookup, frame_dict['DPAMNAME']))
options_sorted = options.iloc[np.argsort(np.abs(options["MJD"] - frame_dict["MJD"]))]
# FlatFieldPOL0/FlatFieldPOL45 are looked up as FlatField entries on disk
dtype = data.FlatField
elif dtype_label in ['MuellerMatrix']:
# filter by color filter
options = self.filter_calib(calibdf, "CFAMNAME", frame_dict['CFAMNAME'], err_if_none=True)
# sort by closest in time
options_sorted = options.iloc[np.argsort(np.abs(options["MJD"] - frame_dict["MJD"]))]
elif dtype_label in ['SlitTransmission']:
# filter by slit mask (FSAM), prism (DPAM), and color filter (CFAM)
options = self.filter_calib(calibdf, "FSAMNAME", frame_dict['FSAMNAME'], err_if_none=True)
options = self.filter_calib(options, "DPAMNAME", frame_dict['DPAMNAME'], err_if_none=True)
options = self.filter_calib(options, "CFAMNAME", frame_dict['CFAMNAME'], err_if_none=True)
# sort by closest in time
options_sorted = options.iloc[np.argsort(np.abs(options["MJD"] - frame_dict["MJD"]))]
elif dtype_label in ['LineSpread']:
# filter by prism (DPAM) and color filter (CFAM)
options = self.filter_calib(calibdf, "DPAMNAME", frame_dict['DPAMNAME'], err_if_none=True)
options = self.filter_calib(options, "CFAMNAME", frame_dict['CFAMNAME'], err_if_none=True)
# sort by closest in time
options_sorted = options.iloc[np.argsort(np.abs(options["MJD"] - frame_dict["MJD"]))]
elif dtype_label in ['SpectroscopyCentroidPSF']:
# filter by prism (DPAM) and color filter (CFAM)
options = self.filter_calib(calibdf, "DPAMNAME", frame_dict['DPAMNAME'], err_if_none=True)
options = self.filter_calib(options, "CFAMNAME", frame_dict['CFAMNAME'], err_if_none=True)
# sort by closest in time
options_sorted = options.iloc[np.argsort(np.abs(options["MJD"] - frame_dict["MJD"]))]
else:
# sort by closest in time
options_sorted = calibdf.iloc[np.argsort(np.abs(calibdf["MJD"] - frame_dict["MJD"]))]
# load the object from disk and return it
calib_filepath = self._pick_existing(options_sorted, dtype_label)
return dtype(calib_filepath)
[docs]
def scan_dir_for_new_entries(self, filedir, look_in_subfolders=True, to_disk=True):
"""
Scan a folder and subfolder for calibration files and add them all to the caldb
Args:
filedir (str): path to folder to scan (includes all subfolders by default)
look_in_subfolders (bool): whether to look in subfolders for files. True by default
to_disk (bool): True by default, will update DB from disk before adding entry and saving it back to disk
"""
calib_frames = []
# walk the directory to find all the calibration files
for dirpath, subfolders, filenames in os.walk(filedir):
for filename in filenames:
# hard coded check only for files that end in .fits
if filename[-5:] != ".fits":
continue
filepath = os.path.join(dirpath, filename)
try:
frame = data.autoload(filepath)
except Exception:
continue
# check what class it has been loaded as. only save frames that fall into calibration classes
if frame.__class__ in labels:
calib_frames.append(frame)
# the first iteration looks in the basedir
# if we don't wnat to look in subdirs now, we should break
if not look_in_subfolders:
break
# load all these files into the caldb
for calib_frame in calib_frames:
self.create_entry(calib_frame, to_disk=to_disk)
def _pick_existing(self, options, dtype_label):
"""Walk candidates in preference order and return the first filepath that exists on disk.
Args:
options (pd.DataFrame): candidate calibration rows sorted in preference order (best first)
dtype_label (str): calibration type label used in error messages
Returns:
str: filepath of the best existing calibration
Raises:
ValueError: if no candidate file exists on disk
"""
for filepath in options["Filepath"]:
if os.path.exists(filepath):
return filepath
print("Calibration file {0} no longer exists on disk, trying next best option.".format(filepath))
raise ValueError(
"No valid {0} calibration in caldb located at {1}: "
"all matching entries reference files that no longer exist on disk".format(
dtype_label, self.filepath
)
)
[docs]
def filter_calib(self, calibdf, col_name, value, err_if_none=True):
'''
Takes in a calibration dataframe, filters them so that
only the files with matching header values are returned. If none is found,
this function is omitted and the original list is returned or an error is
thrown depending on the err_if_none parameter.
Args:
calibdf (pd.DataFrame): database containing the potential calibration files
col_name (string): name of the column that we want to look for matches in
value (string/float/int): value of the column entry to filter by
err_if_none (optional, boolean): tells the function whether to throw an error
or not if no matches are found.
Returns:
filtered_calibdf (pd.DataFrame): database containing only the calibration files
with matching values, or the original database if no matches are found and
err_if_none is set to false.
'''
filtered_calibdf = calibdf.loc[
(
(calibdf[col_name] == value)
)
]
if len(filtered_calibdf) == 0:
# throws an error if err_if_none=True
if err_if_none:
raise ValueError(f"No valid calibration with {col_name}={value})")
else:
print(f"No valid calibration with {col_name}={value}")
return calibdf
return filtered_calibdf
[docs]
def initialize():
"""
Creates default calibrations and caldb if it doesn't exist
"""
global initialized
### Create set of default calibrations
rescan_needed = False
# Add default detector_params calibration file if it doesn't exist
if not os.path.exists(os.path.join(corgidrp.default_cal_dir, "DetectorParams_2023-11-01T00.00.00.000.fits")):
date_valid = time.Time("2023-11-01 00:00:00", scale='utc')
sctsrt_str = date_valid.to_datetime().strftime("%Y-%m-%dT%H:%M:%S")
default_detparams = data.DetectorParams({}, date_valid=date_valid)
default_detparams.ext_hdr['SCTSRT'] = sctsrt_str
default_detparams.ext_hdr['SCTEND'] = sctsrt_str
default_detparams.save(filedir=corgidrp.default_cal_dir, filename="DetectorParams_2023-11-01T00.00.00.000.fits")
rescan_needed = True
# Add default FpamFsamCal calibration file if it doesn't exist
if not os.path.exists(os.path.join(corgidrp.default_cal_dir, "FpamFsamCal_2024-02-10T00.00.00.000.fits")):
date_valid = time.Time("2024-02-10 00:00:00", scale='utc')
sctsrt_str = date_valid.to_datetime().strftime("%Y-%m-%dT%H:%M:%S")
fpamfsam_2excam = data.FpamFsamCal([], date_valid=date_valid)
fpamfsam_2excam.ext_hdr['MJDSRT'] = float(date_valid.mjd)
fpamfsam_2excam.ext_hdr['SCTSRT'] = sctsrt_str
fpamfsam_2excam.ext_hdr['SCTEND'] = sctsrt_str
fpamfsam_2excam.save(filedir=corgidrp.default_cal_dir)
rescan_needed = True
# Add default SpecFilterOffset calibration file if it doesn't exist
if not os.path.exists(os.path.join(corgidrp.default_cal_dir, "SpecFilterOffset_2025-12-10T00.00.00.000.fits")):
date_valid = time.Time("2025-12-10 00:00:00", scale='utc')
sctsrt_str = date_valid.to_datetime().strftime("%Y-%m-%dT%H:%M:%S")
spec_filter = data.SpecFilterOffset({}, date_valid=date_valid)
spec_filter.ext_hdr['MJDSRT'] = float(date_valid.mjd)
spec_filter.ext_hdr['SCTSRT'] = sctsrt_str
spec_filter.ext_hdr['SCTEND'] = sctsrt_str
spec_filter.save(filedir=corgidrp.default_cal_dir)
rescan_needed = True
# Add default DispersionModel calibration file if it doesn't exist
if not os.path.exists(os.path.join(corgidrp.default_cal_dir, 'cgi_0200001001001001001_20240210t0000000_dpm_cal.fits')):
spec_datadir = os.path.join(os.path.split(corgidrp.__file__)[0], "data", "spectroscopy")
output_dir = corgidrp.default_cal_dir
prihdr, exthdr, errhdr, dqhdr, biashdr = mocks.create_default_L2b_headers()
dt_time = time.Time("2024-02-10 00:00:00", scale='utc')
dt = dt_time.to_datetime()
dt_str = dt.strftime("%Y-%m-%dT%H:%M:%S")
ftime = dt.strftime("%Y%m%dt%H%M%S%f")[:-5]
disp_filename = f"cgi_{prihdr['VISITID']}_{ftime}_dpm_cal.fits"
prihdr['FILETIME'] = dt_str
prihdr['FILENAME'] = disp_filename
exthdr['DATETIME'] = dt_str
exthdr['FTIMEUTC'] = dt_str
exthdr['MJDSRT'] = float(dt_time.mjd)
exthdr['SCTSRT'] = dt_str
exthdr['SCTEND'] = dt_str
# not physically relevant since we are just constructing the calibration product for the dispersion model, not
# the observations that produced it, but just to avoid confusion, we set the values to something sensible
exthdr['DPAMNAME'] = 'PRISM3'
exthdr['CFAMNAME'] = '3F'
exthdr['FSAMNAME'] = 'OPEN'
# these below, however, are needed for the DispersionModel calibration
exthdr["REFWAVE"] = 730.
exthdr["BAND"] = '3'
band_list = spec.read_cent_wave('3')
band_center = band_list[0]
fwhm = band_list[1]
bandpass_frac = fwhm/band_center
exthdr["BANDFRAC"] = bandpass_frac
disp_file_path = os.path.join(spec_datadir, "TVAC_PRISM3_dispersion_profile.npz")
disp_params = np.load(disp_file_path)
disp_dict = {'clocking_angle': disp_params['clocking_angle'],
'clocking_angle_uncertainty': disp_params['clocking_angle_uncertainty'],
'pos_vs_wavlen_polycoeff': disp_params['pos_vs_wavlen_polycoeff'],
'pos_vs_wavlen_cov' : disp_params['pos_vs_wavlen_cov'],
'wavlen_vs_pos_polycoeff': disp_params['wavlen_vs_pos_polycoeff'],
'wavlen_vs_pos_cov': disp_params['wavlen_vs_pos_cov']}
disp_model = data.DispersionModel(disp_dict, pri_hdr = prihdr, ext_hdr = exthdr)
disp_model.save(output_dir, disp_model.filename)
rescan_needed = True
# Add default ones-flat FlatField calibration file
fixed_ones_flat_filename = "cgi_0000000000000000000_20000101t0000000_flt_cal.fits"
fixed_ones_flat_filepath = os.path.join(corgidrp.default_cal_dir, fixed_ones_flat_filename)
if not os.path.exists(fixed_ones_flat_filepath):
ones_flat_dataset = mocks.create_flatfield_dummy(numfiles=1)
ones_flat_dataset[0].data[:] = 1.0
ones_flat = flat.create_flatfield(ones_flat_dataset)
ones_flat.ext_hdr["FPAMNAME"] = "ONES"
ones_flat.ext_hdr["MJDSRT"] = float(time.Time("2026-01-01").mjd)
# Write to a fixed CGI-formatted filename so there's no need to search for the latest ones-flat
ones_flat.filename = fixed_ones_flat_filename
ones_flat.pri_hdr["FILENAME"] = fixed_ones_flat_filename
ones_flat.save(filedir=corgidrp.default_cal_dir, filename=fixed_ones_flat_filename)
rescan_needed = True
# Add TVAC default calibration files built from raw TVAC data packaged in corgidrp/data/default_calibs/.
# Fixed filenames encode the synthetic timestamp (20260101) so the existence check is stable across runs.
tvac_raw_dir = os.path.join(os.path.split(corgidrp.__file__)[0], "data", "default_calibs")
tvac_nln_filename = "cgi_0000000000000000000_20260101t0000000_nln_cal.fits"
tvac_krn_filename = "cgi_0000000000000000000_20260101t0000001_krn_cal.fits"
tvac_dnm_filename = "cgi_0000000000000000000_20260101t0000002_dnm_cal.fits"
tvac_flt_filename = "cgi_0000000000000000000_20260101t0000003_flt_cal.fits"
tvac_bpm_filename = "cgi_0000000000000000000_20260101t0000004_bpm_cal.fits"
tvac_pol0_flt_filename = "cgi_0000000000000000000_20260101t0000005_flt_cal.fits"
tvac_pol45_flt_filename = "cgi_0000000000000000000_20260101t0000006_flt_cal.fits"
tvac_flt4_filename = "cgi_0000000000000000000_20260101t0000007_flt_cal.fits"
tvac_pol0_flt4_filename = "cgi_0000000000000000000_20260101t0000008_flt_cal.fits"
tvac_pol45_flt4_filename = "cgi_0000000000000000000_20260101t0000009_flt_cal.fits"
tvac_cal_filenames = [tvac_nln_filename, tvac_krn_filename, tvac_dnm_filename,
tvac_flt_filename, tvac_bpm_filename,
tvac_pol0_flt_filename, tvac_pol45_flt_filename,
tvac_flt4_filename, tvac_pol0_flt4_filename, tvac_pol45_flt4_filename]
tvac_cals_missing = any(
not os.path.exists(os.path.join(corgidrp.default_cal_dir, f))
for f in tvac_cal_filenames
)
if tvac_cals_missing:
pri_hdr, ext_hdr, _, _ = mocks.create_default_calibration_product_headers()
mjd_2026 = float(time.Time("2026-01-01").mjd)
isot_2026 = "2026-01-01T00:00:00.000"
ext_hdr["MJDSRT"] = mjd_2026
ext_hdr["DATETIME"] = isot_2026
ext_hdr["FTIMEUTC"] = isot_2026
ext_hdr["DRPCTIME"] = isot_2026
ext_hdr["DRPVERSN"] = corgidrp.__version__
# Minimal mock input dataset for bookkeeping requirements in calibration constructors
mock_dataset = mocks.create_flatfield_dummy(numfiles=2)
# NonLinearityCalibration from packaged TVAC nonlinearity table
if not os.path.exists(os.path.join(corgidrp.default_cal_dir, tvac_nln_filename)):
nonlin_path = os.path.join(tvac_raw_dir, "nonlin_table_240322.txt")
nonlin_dat = np.genfromtxt(nonlin_path, delimiter=",")
nonlinear_cal = data.NonLinearityCalibration(
nonlin_dat, pri_hdr=pri_hdr.copy(), ext_hdr=ext_hdr.copy(),
input_dataset=mock_dataset,
)
nonlinear_cal.ext_hdr["MJDSRT"] = mjd_2026
nonlinear_cal.ext_hdr["DATETIME"] = isot_2026
nonlinear_cal.ext_hdr["FTIMEUTC"] = isot_2026
nonlinear_cal.save(filedir=corgidrp.default_cal_dir, filename=tvac_nln_filename)
# KGain — 8.7 e/DN and read noise from TVAC measurements
if not os.path.exists(os.path.join(corgidrp.default_cal_dir, tvac_krn_filename)):
kgain_val = 8.7
signal_array = np.linspace(0, 50)
noise_array = np.sqrt(signal_array)
ptc = np.column_stack([signal_array, noise_array])
ext_hdr_krn = ext_hdr.copy()
ext_hdr_krn["RN"] = 121.76 # read noise in electrons from TVAC measurement
ext_hdr_krn["RN_ERR"] = 2.0 # read noise uncertainty in electrons
kgain = data.KGain(
kgain_val, ptc=ptc, pri_hdr=pri_hdr.copy(), ext_hdr=ext_hdr_krn,
input_dataset=mock_dataset,
)
kgain.ext_hdr["MJDSRT"] = mjd_2026
kgain.ext_hdr["DATETIME"] = isot_2026
kgain.ext_hdr["FTIMEUTC"] = isot_2026
kgain.save(filedir=corgidrp.default_cal_dir, filename=tvac_krn_filename)
# DetectorNoiseMaps from packaged TVAC noise component files.
# The 1024x1024 science-image data is embedded in the full detector frame.
if not os.path.exists(os.path.join(corgidrp.default_cal_dir, tvac_dnm_filename)):
with fits.open(os.path.join(tvac_raw_dir, "fpn_20240322.fits")) as hdulist:
fpn_dat = hdulist[0].data.astype(float)
with fits.open(os.path.join(tvac_raw_dir, "cic_20240322.fits")) as hdulist:
cic_dat = hdulist[0].data.astype(float)
with fits.open(os.path.join(tvac_raw_dir, "dark_current_20240322.fits")) as hdulist:
dark_dat = hdulist[0].data.astype(float)
frame_rows = detector.detector_areas["SCI"]["frame_rows"]
frame_cols = detector.detector_areas["SCI"]["frame_cols"]
img_rows, img_cols, r0c0 = detector.unpack_geom("SCI", "image")
r0, c0 = r0c0
noise_map_dat = np.zeros((3, frame_rows, frame_cols))
noise_map_dat[0, r0:r0 + img_rows, c0:c0 + img_cols] = fpn_dat
noise_map_dat[1, r0:r0 + img_rows, c0:c0 + img_cols] = cic_dat
noise_map_dat[2, r0:r0 + img_rows, c0:c0 + img_cols] = dark_dat
noise_map_err = np.zeros((1,) + noise_map_dat.shape)
noise_map_dq = np.zeros(noise_map_dat.shape, dtype=int)
noise_err_hdr = fits.Header()
noise_err_hdr["BUNIT"] = "detected electron"
ext_hdr_dnm = ext_hdr.copy()
ext_hdr_dnm["B_O"] = 0.0
ext_hdr_dnm["B_O_ERR"] = 0.0
noise_map = data.DetectorNoiseMaps(
noise_map_dat,
pri_hdr=pri_hdr.copy(),
ext_hdr=ext_hdr_dnm,
input_dataset=mock_dataset,
err=noise_map_err,
dq=noise_map_dq,
err_hdr=noise_err_hdr,
)
noise_map.ext_hdr["MJDSRT"] = mjd_2026
noise_map.ext_hdr["DATETIME"] = isot_2026
noise_map.ext_hdr["FTIMEUTC"] = isot_2026
noise_map.ext_hdr["DRPNFILE"] = 96
noise_map.save(filedir=corgidrp.default_cal_dir, filename=tvac_dnm_filename)
# FlatField — all-ones array for the default imaging configuration.
# FPAMNAME='OPEN_12' and CFAMNAME='1F' match the caldb lookup for
# standard DPAMNAME='IMAGING' observations in bands 1 & 2.
if not os.path.exists(os.path.join(corgidrp.default_cal_dir, tvac_flt_filename)):
flat_dat = np.ones((1024, 1024))
flat_mock_dataset = mocks.create_flatfield_dummy(numfiles=1)
tvac_flat = data.FlatField(flat_dat, pri_hdr=pri_hdr.copy(), ext_hdr=ext_hdr.copy(),
input_dataset=flat_mock_dataset)
tvac_flat.ext_hdr["FPAMNAME"] = "OPEN_12"
tvac_flat.ext_hdr["CFAMNAME"] = "1F"
tvac_flat.ext_hdr["DPAMNAME"] = "IMAGING"
tvac_flat.ext_hdr["MJDSRT"] = mjd_2026
tvac_flat.ext_hdr["DATETIME"] = isot_2026
tvac_flat.ext_hdr["FTIMEUTC"] = isot_2026
tvac_flat.save(filedir=corgidrp.default_cal_dir, filename=tvac_flt_filename)
# BadPixelMap from packaged TVAC bad pixel data.
# A synthetic Dark frame is required by BadPixelMap as the base header source.
if not os.path.exists(os.path.join(corgidrp.default_cal_dir, tvac_bpm_filename)):
with fits.open(os.path.join(tvac_raw_dir, "bad_pix.fits")) as hdulist:
bp_dat = hdulist[0].data.astype(float)
bp_dark_pri, bp_dark_ext, _, _ = mocks.create_default_calibration_product_headers()
bp_dark_ext["EXPTIME"] = 1.0
bp_dark_ext["EMGAIN_C"] = 1.0
bp_dark_ext["DRPNFILE"] = 1
bp_dark_ext["MJDSRT"] = mjd_2026
bp_dark_ext["DATETIME"] = isot_2026
bp_dark_ext["FTIMEUTC"] = isot_2026
bp_dark = data.Dark(
np.zeros_like(bp_dat, dtype=float),
pri_hdr=bp_dark_pri,
ext_hdr=bp_dark_ext,
input_dataset=mock_dataset,
err=np.zeros((1,) + bp_dat.shape, dtype=float),
dq=np.zeros(bp_dat.shape, dtype="uint16"),
err_hdr=fits.Header(),
)
bp_map = data.BadPixelMap(
bp_dat,
pri_hdr=pri_hdr.copy(),
ext_hdr=ext_hdr.copy(),
input_dataset=data.Dataset([bp_dark]),
)
bp_map.ext_hdr["MJDSRT"] = mjd_2026
bp_map.ext_hdr["DATETIME"] = isot_2026
bp_map.ext_hdr["FTIMEUTC"] = isot_2026
bp_map.save(filedir=corgidrp.default_cal_dir, filename=tvac_bpm_filename)
# POL0 FlatField — all-ones for the default polarimetry configuration.
# FPAMNAME='OPEN_12', CFAMNAME='1F', DPAMNAME='POL0' for caldb lookup.
if not os.path.exists(os.path.join(corgidrp.default_cal_dir, tvac_pol0_flt_filename)):
flat_dat = np.ones((1024, 1024))
flat_mock_dataset = mocks.create_flatfield_dummy(numfiles=1)
pol0_flat = data.FlatField(flat_dat, pri_hdr=pri_hdr.copy(), ext_hdr=ext_hdr.copy(),
input_dataset=flat_mock_dataset)
pol0_flat.ext_hdr["FPAMNAME"] = "OPEN_12"
pol0_flat.ext_hdr["CFAMNAME"] = "1F"
pol0_flat.ext_hdr["DPAMNAME"] = "POL0"
pol0_flat.ext_hdr["MJDSRT"] = mjd_2026
pol0_flat.ext_hdr["DATETIME"] = isot_2026
pol0_flat.ext_hdr["FTIMEUTC"] = isot_2026
pol0_flat.save(filedir=corgidrp.default_cal_dir, filename=tvac_pol0_flt_filename)
# POL45 FlatField — all-ones for the default polarimetry configuration.
# FPAMNAME='OPEN_12', CFAMNAME='1F', DPAMNAME='POL45' for caldb lookup.
if not os.path.exists(os.path.join(corgidrp.default_cal_dir, tvac_pol45_flt_filename)):
flat_dat = np.ones((1024, 1024))
flat_mock_dataset = mocks.create_flatfield_dummy(numfiles=1)
pol45_flat = data.FlatField(flat_dat, pri_hdr=pri_hdr.copy(), ext_hdr=ext_hdr.copy(),
input_dataset=flat_mock_dataset)
pol45_flat.ext_hdr["FPAMNAME"] = "OPEN_12"
pol45_flat.ext_hdr["CFAMNAME"] = "1F"
pol45_flat.ext_hdr["DPAMNAME"] = "POL45"
pol45_flat.ext_hdr["MJDSRT"] = mjd_2026
pol45_flat.ext_hdr["DATETIME"] = isot_2026
pol45_flat.ext_hdr["FTIMEUTC"] = isot_2026
pol45_flat.save(filedir=corgidrp.default_cal_dir, filename=tvac_pol45_flt_filename)
# Band-4 IMAGING FlatField — all-ones, FPAMNAME='OPEN_34', CFAMNAME='4F', DPAMNAME='IMAGING'.
if not os.path.exists(os.path.join(corgidrp.default_cal_dir, tvac_flt4_filename)):
flat_dat = np.ones((1024, 1024))
flat_mock_dataset = mocks.create_flatfield_dummy(numfiles=1)
flt4 = data.FlatField(flat_dat, pri_hdr=pri_hdr.copy(), ext_hdr=ext_hdr.copy(),
input_dataset=flat_mock_dataset)
flt4.ext_hdr["FPAMNAME"] = "OPEN_34"
flt4.ext_hdr["CFAMNAME"] = "4F"
flt4.ext_hdr["DPAMNAME"] = "IMAGING"
flt4.ext_hdr["MJDSRT"] = mjd_2026
flt4.ext_hdr["DATETIME"] = isot_2026
flt4.ext_hdr["FTIMEUTC"] = isot_2026
flt4.save(filedir=corgidrp.default_cal_dir, filename=tvac_flt4_filename)
# Band-4 POL0 FlatField — all-ones, FPAMNAME='OPEN_34', CFAMNAME='4F', DPAMNAME='POL0'.
if not os.path.exists(os.path.join(corgidrp.default_cal_dir, tvac_pol0_flt4_filename)):
flat_dat = np.ones((1024, 1024))
flat_mock_dataset = mocks.create_flatfield_dummy(numfiles=1)
pol0_flt4 = data.FlatField(flat_dat, pri_hdr=pri_hdr.copy(), ext_hdr=ext_hdr.copy(),
input_dataset=flat_mock_dataset)
pol0_flt4.ext_hdr["FPAMNAME"] = "OPEN_34"
pol0_flt4.ext_hdr["CFAMNAME"] = "4F"
pol0_flt4.ext_hdr["DPAMNAME"] = "POL0"
pol0_flt4.ext_hdr["MJDSRT"] = mjd_2026
pol0_flt4.ext_hdr["DATETIME"] = isot_2026
pol0_flt4.ext_hdr["FTIMEUTC"] = isot_2026
pol0_flt4.save(filedir=corgidrp.default_cal_dir, filename=tvac_pol0_flt4_filename)
# Band-4 POL45 FlatField — all-ones, FPAMNAME='OPEN_34', CFAMNAME='4F', DPAMNAME='POL45'.
if not os.path.exists(os.path.join(corgidrp.default_cal_dir, tvac_pol45_flt4_filename)):
flat_dat = np.ones((1024, 1024))
flat_mock_dataset = mocks.create_flatfield_dummy(numfiles=1)
pol45_flt4 = data.FlatField(flat_dat, pri_hdr=pri_hdr.copy(), ext_hdr=ext_hdr.copy(),
input_dataset=flat_mock_dataset)
pol45_flt4.ext_hdr["FPAMNAME"] = "OPEN_34"
pol45_flt4.ext_hdr["CFAMNAME"] = "4F"
pol45_flt4.ext_hdr["DPAMNAME"] = "POL45"
pol45_flt4.ext_hdr["MJDSRT"] = mjd_2026
pol45_flt4.ext_hdr["DATETIME"] = isot_2026
pol45_flt4.ext_hdr["FTIMEUTC"] = isot_2026
pol45_flt4.save(filedir=corgidrp.default_cal_dir, filename=tvac_pol45_flt4_filename)
rescan_needed = True
if rescan_needed:
# add default caldb entries
default_caldb = CalDB()
default_caldb.scan_dir_for_new_entries(corgidrp.default_cal_dir)
# set initialization
initialized = True
if not os.environ.get('CORGIDRP_DO_NOT_AUTO_INIT_CALDB', False):
initialize()