import os
import json
import astropy.time as time
import warnings
import xml.etree.ElementTree as ET
import corgidrp
import corgidrp.astrom
import corgidrp.bad_pixel_calibration
import corgidrp.calibrate_kgain
import corgidrp.combine
import corgidrp.data as data
import corgidrp.caldb as caldb
import corgidrp.l1_to_l2a
import corgidrp.l2a_to_l2b
import corgidrp.l2b_to_l3
import corgidrp.l3_to_l4
import corgidrp.nd_filter_calibration
import corgidrp.photon_counting
import corgidrp.pump_trap_calibration
import corgidrp.calibrate_nonlin
import corgidrp.detector
import corgidrp.flat
import corgidrp.darks
import corgidrp.sorting
import corgidrp.fluxcal
import corgidrp.spec
[docs]
all_steps = {
"prescan_biassub" : corgidrp.l1_to_l2a.prescan_biassub,
"discard_setup_frames" : corgidrp.l1_to_l2a.discard_setup_frames,
"detect_cosmic_rays" : corgidrp.l1_to_l2a.detect_cosmic_rays,
"calibrate_nonlin": corgidrp.calibrate_nonlin.calibrate_nonlin,
"correct_nonlinearity" : corgidrp.l1_to_l2a.correct_nonlinearity,
"update_to_l2a" : corgidrp.l1_to_l2a.update_to_l2a,
"add_shot_noise_to_err" : corgidrp.l2a_to_l2b.add_shot_noise_to_err,
"dark_subtraction" : corgidrp.l2a_to_l2b.dark_subtraction,
"flat_division" : corgidrp.l2a_to_l2b.flat_division,
"flat_division_pol" : corgidrp.l2a_to_l2b.flat_division_pol,
"frame_select" : corgidrp.l2a_to_l2b.frame_select,
"convert_to_electrons" : corgidrp.l2a_to_l2b.convert_to_electrons,
"em_gain_division" : corgidrp.l2a_to_l2b.em_gain_division,
"cti_correction" : corgidrp.l2a_to_l2b.cti_correction,
"correct_bad_pixels" : corgidrp.l2a_to_l2b.correct_bad_pixels,
"desmear" : corgidrp.l2a_to_l2b.desmear,
"update_to_l2b" : corgidrp.l2a_to_l2b.update_to_l2b,
"boresight_calibration": corgidrp.astrom.boresight_calibration,
"calibrate_trap_pump": corgidrp.pump_trap_calibration.tpump_analysis,
"create_bad_pixel_map" : corgidrp.bad_pixel_calibration.create_bad_pixel_map,
"calibrate_kgain" : corgidrp.calibrate_kgain.calibrate_kgain,
"calibrate_darks" : corgidrp.darks.calibrate_darks_lsq,
"create_onsky_flatfield" : corgidrp.flat.create_onsky_flatfield,
"create_onsky_pol_flatfield" : corgidrp.flat.create_onsky_pol_flatfield,
"combine_subexposures" : corgidrp.combine.combine_subexposures,
"build_trad_dark" : corgidrp.darks.build_trad_dark,
"sort_pupilimg_frames" : corgidrp.sorting.sort_pupilimg_frames,
"get_pc_mean" : corgidrp.photon_counting.get_pc_mean,
"divide_by_exptime" : corgidrp.l2b_to_l3.divide_by_exptime,
"crop" : corgidrp.l2b_to_l3.crop,
"northup" : corgidrp.l3_to_l4.northup,
"calibrate_fluxcal_aper": corgidrp.fluxcal.calibrate_fluxcal_aper,
"calibrate_pol_fluxcal_aper": corgidrp.fluxcal.calibrate_pol_fluxcal_aper,
"update_to_l3": corgidrp.l2b_to_l3.update_to_l3,
"create_wcs": corgidrp.l2b_to_l3.create_wcs,
"replace_bad_pixels": corgidrp.l3_to_l4.replace_bad_pixels,
"distortion_correction": corgidrp.l3_to_l4.distortion_correction,
"find_star": corgidrp.l3_to_l4.find_star,
"find_spec_star" : corgidrp.l3_to_l4.find_spec_star,
"do_psf_subtraction": corgidrp.l3_to_l4.do_psf_subtraction,
"spec_psf_subtraction": corgidrp.l3_to_l4.spec_psf_subtraction,
"determine_wave_zeropoint": corgidrp.l3_to_l4.determine_wave_zeropoint,
"add_wavelength_map": corgidrp.l3_to_l4.add_wavelength_map,
"extract_spec": corgidrp.l3_to_l4.extract_spec,
"update_to_l4": corgidrp.l3_to_l4.update_to_l4,
"update_to_l4_pol": corgidrp.l3_to_l4.update_to_l4_pol,
"generate_ct_cal": corgidrp.corethroughput.generate_ct_cal,
"create_ct_map": corgidrp.corethroughput.create_ct_map,
"create_nd_filter_cal": corgidrp.nd_filter_calibration.create_nd_filter_cal,
"create_nd_filter_cal_spec": corgidrp.nd_filter_calibration.create_nd_filter_cal_spec,
"compute_psf_centroid": corgidrp.spec.compute_psf_centroid,
"calibrate_dispersion_model": corgidrp.spec.calibrate_dispersion_model,
"fit_line_spread_function": corgidrp.spec.fit_line_spread_function,
"split_image_by_polarization_state": corgidrp.l2b_to_l3.split_image_by_polarization_state,
"calc_stokes_unocculted": corgidrp.pol.calc_stokes_unocculted,
"generate_mueller_matrix_cal": corgidrp.pol.generate_mueller_matrix_cal,
"align_polarimetry_frames": corgidrp.l3_to_l4.align_polarimetry_frames,
"combine_polarization_states": corgidrp.l3_to_l4.combine_polarization_states,
"subtract_stellar_polarization": corgidrp.l3_to_l4.subtract_stellar_polarization,
"align_2d_frames": corgidrp.l3_to_l4.align_2d_frames,
"combine_spec": corgidrp.l3_to_l4.combine_spec,
"spec_fluxcal": corgidrp.spec.spec_fluxcal
}
[docs]
recipe_dir = os.path.join(os.path.dirname(__file__), "recipe_templates")
[docs]
def walk_corgidrp(filelist, CPGS_XML_filepath, outputdir, template=None):
"""
Automatically create a recipe and process the input filelist.
Does both the `autogen_recipe` and `run_recipe` steps.
Args:
filelist (list of str): list of filepaths to files
CPGS_XML_filepath (str): path to CPGS XML file for this set of files in filelist
outputdir (str): output directory folderpath
template (str or json): custom template. It can be one of three things
* the full json object,
* a filename of a template that's already in the recipe_templates folder
* a filepath to a template on disk somewhere
Returns:
json or list: the JSON recipe (or list of JSON recipes) that was used for processing
"""
if isinstance(template, str):
if os.path.sep not in template:
# this is just a template name in the recipe_templates folder
recipe_filepath = os.path.join(recipe_dir, template)
else:
recipe_filepath = template
template = json.load(open(recipe_filepath, 'r'))
# generate recipe
recipes = autogen_recipe(filelist, outputdir, template=template)
if not isinstance(recipes, list):
recipes = [recipes]
# accommodate a list of chains
if not isinstance(recipes[0], list):
list_of_recipe_chains = [recipes]
else:
list_of_recipe_chains = recipes
for recipes in list_of_recipe_chains:
# process recipes
output_filelist = None
for i, recipe in enumerate(recipes):
# check for recipe chaining
if i > 0 and len(recipe['inputs']) == 0:
recipe["inputs"] = []
for filename in output_filelist:
recipe["inputs"].append(filename)
# check for functions that require CPGS XML info
for step in recipe['steps']:
if step['name'].lower() == 'find_spec_star':
if not 'keywords' in step:
read_cpgs = True
step['keywords'] = {}
elif "r_lamD" not in step['keywords']:
read_cpgs = True
else:
read_cpgs = False
if read_cpgs: # if not already specified.
# need to populate satellite spot info from XML
cpgs_xml = ET.parse(CPGS_XML_filepath)
sat_spot_info = _get_satellite_spot_info_from_xml(cpgs_xml)
step['keywords']['r_lamD'] = sat_spot_info['spot1_sep']
step['keywords']['phi_deg'] = sat_spot_info['spot1_angle']
output_filelist = run_recipe(recipe)
# return just the recipe if there was only one
if len(list_of_recipe_chains) == 1:
if len(list_of_recipe_chains[0]) == 1:
return list_of_recipe_chains[0][0]
else:
return list_of_recipe_chains[0]
else:
return list_of_recipe_chains
[docs]
def autogen_recipe(filelist, outputdir, template=None):
"""
Automatically creates a recipe (or recipes) by identifyng and populating a template.
Returns a single recipe unless there are multiple recipes that should be produced.
Args:
filelist (list of str): list of filepaths to files
outputdir (str): output directory folderpath
template (json): enables passing in of custom template, if desired
Returns:
json list: the JSON recipe (or list of recipes) that the input filelist will be processed with
"""
# Handle the case where filelist is empty
if not filelist:
print("Input filelist is empty, using default handling to create recipe.")
first_frame = None
else:
# load the data to check what kind of recipe it is
dataset0 = data.Dataset([filelist[0]])
first_frame = dataset0[0]
# don't need the actual data, especially if it would take up a lot of RAM just to hold it in cache
dataset = data.Dataset(filelist, no_data=True, no_err=True, no_dq=True)
# if user didn't pass in template
if template is None:
recipe_filename, chained = guess_template(dataset)
# handle it as a list of lists moving forward
if isinstance(recipe_filename, list):
recipe_filename_list = recipe_filename
else:
recipe_filename_list = [recipe_filename]
if not isinstance(recipe_filename_list[0], list):
recipe_filename_list_list = [recipe_filename_list]
else:
recipe_filename_list_list = recipe_filename_list
for l in recipe_filename_list_list:
if not isinstance(l, list):
raise TypeError("Each element of recipe_filename_list should be a list, but got {0}".format(type(l)))
recipe_template_list_list = []
for recipe_filename_list in recipe_filename_list_list:
recipe_template_list = []
for recipe_filename in recipe_filename_list:
# load the template recipe
recipe_filepath = os.path.join(recipe_dir, recipe_filename)
template = json.load(open(recipe_filepath, 'r'))
recipe_template_list.append(template)
recipe_template_list_list.append(recipe_template_list)
else:
# user passed in a single template
recipe_template_list = [template]
recipe_template_list_list = [recipe_template_list]
chained = False
recipe_list_list = []
for recipe_template_list in recipe_template_list_list:
recipe_list = []
for i, template in enumerate(recipe_template_list):
# create the personalized recipe
recipe = template.copy()
recipe["template"] = False
# for chained recipes, don't put the input in yet since we don't know it
if i > 0 and chained:
pass
else:
for filename in filelist:
recipe["inputs"].append(filename)
recipe["outputdir"] = outputdir
## Populate default values
## This includes calibration files that need to be automatically determined
## This also includes the dark subtraction outputdir for synthetic darks
this_caldb = caldb.CalDB()
for step in recipe["steps"]:
# by default, identify all the calibration files needed, unless jit setting is turned on
# two cases where we should be identifying the calibration recipes now
if "jit_calib_id" in recipe['drpconfig'] and (not recipe['drpconfig']["jit_calib_id"]):
_fill_in_calib_files(step, this_caldb, first_frame)
elif ("jit_calib_id" not in recipe['drpconfig']) and (not corgidrp.jit_calib_id):
_fill_in_calib_files(step, this_caldb, first_frame)
if step["name"].lower() == "dark_subtraction":
if step["keywords"]["outputdir"].upper() == "AUTOMATIC":
step["keywords"]["outputdir"] = recipe["outputdir"]
if step["name"].lower() == "create_nd_filter_cal_spec":
if "keywords" in step and step["keywords"].get("outputdir", "").upper() == "AUTOMATIC":
step["keywords"]["outputdir"] = recipe["outputdir"]
recipe_list.append(recipe)
recipe_list_list.append(recipe_list)
# if list of chains, return that. If single list, return that. If single
# recipe, return that.
if len(recipe_list_list) > 1: # list of chains
return recipe_list_list
else:
if len(recipe_list_list[0]) > 1: # single list
return recipe_list_list[0]
else: #single recipe
return recipe_list_list[0][0]
def _fill_in_calib_files(step, this_caldb, ref_frame):
"""
Fills in calibration files defined as "AUTOMATIC" in a recipe
By default, throws an error if there are no available cal files of a certian type.
Exceptional case is when the pipeline setting `skip_missing_cal_steps = True` is set:
in this case, it will mark this step to be skipped, but continue processing the recipe.
Args:
step (dict): the portion of a recipe for this step
this_caldb (corgidrp.CalDB): calibration database conection
ref_frame (corgidrp.Image): a reference frame to use to determine the optimal calibration
Returns:
dict: the step, but with calibration files filled in
"""
if "calibs" not in step:
return step # don't have to do anything if no calibrations
for calib in step["calibs"]:
# order matters, so only one calibration file per dictionary
if "AUTOMATIC" in step["calibs"][calib].upper():
calib_dtype = data.datatypes[calib]
# try to look up the best calibration, but it could raise an error
try:
best_cal_file = this_caldb.get_calib(ref_frame, calib_dtype)
best_cal_filepath = best_cal_file.filepath
except ValueError as e:
if "OPTIONAL" in step["calibs"][calib].upper():
# couldn't find a good cal but this one is optional, so we are going to put nothing in there
# this means the step function can run without this calibration file
best_cal_filepath = None
elif corgidrp.skip_missing_cal_steps:
step["skip"] = True # skip this step but continue
step["calibs"][calib] = None
warnings.warn("Skipping {0} because no {1} in caldb and skip_missing_cal_steps is True".format(step['name'], calib))
continue # continue on the for loop
else:
raise # reraise exception
# set calibration file to this one
step["calibs"][calib] = best_cal_filepath
return step
[docs]
def guess_template(dataset):
"""
Guesses what template should be used to process a specific image
Args:
dataset (corgidrp.data.Dataset): a Dataset to process
Returns:
str or list: the best template filename, a list of multiple template filenames, or a list of template chains
bool: whether multiple recipes are chained together. If True, the output of the first recipe
should be used as the input to the second recipe. If False, the same input should be used
for all recipes. This keyworkd is irrelevant if only a single recipe is returned.
"""
image = dataset[0] # first image for convenience
chained = False # whether multiiple recipes are chained together
# L1 -> L2a data processing
if image.ext_hdr['DATALVL'] == "L1":
if 'VISTYPE' not in image.pri_hdr:
# this is probably IIT test data. Do generic processing
recipe_filename = "l1_to_l2b.json"
# elif image.pri_hdr['VISTYPE'][:11] == "CGIVST_ENG_":
# # if this is an ENG calibration visit
# # for either pupil or image
# recipe_filename = "l1_to_l2a_eng.json"
elif image.pri_hdr['VISTYPE'] == "CGIVST_CAL_BORESIGHT":
recipe_filename = ["l1_to_l2a_basic.json", "l2a_to_l2b.json", 'l2b_to_boresight.json'] #"l1_to_boresight.json"
chained = True
elif image.pri_hdr['VISTYPE'] == "CGIVST_CAL_FLAT":
if image.ext_hdr.get('DPAMNAME', '') in ('POL0', 'POL45'):
recipe_filename = ["l1_to_l2a_basic.json", "l2a_to_polflat.json"]
chained = True
else:
recipe_filename = "l1_flat_and_bp.json"
elif image.pri_hdr['VISTYPE'] == "CGIVST_CAL_DRK":
_, unique_vals = dataset.split_dataset(exthdr_keywords=['EXPTIME', 'EMGAIN_C', 'KGAINPAR'])
if image.ext_hdr['ISPC'] == 1:
recipe_filename = ["l1_to_l2b_pc_dark_1.json", "l1_to_l2b_pc_dark_2.json"]# "l1_to_l2b_pc_dark.json"
chained = True
elif len(unique_vals) > 1: # darks for noisemap creation
recipe_filename = ["l1_to_l2a_noisemap_1.json", "l1_to_l2a_noisemap_2.json"]#"l1_to_l2a_noisemap.json"
chained = True
else: # then len(unique_vals) is 1 and not PC: traditional darks
recipe_filename = ["build_trad_dark_image_1.json", "build_trad_dark_image_2.json"] #"build_trad_dark_image.json"
chained = True
elif image.pri_hdr['VISTYPE'] == "CGIVST_CAL_PUPIL_IMAGING":
recipe_filename = [["l1_to_l2a_nonlin_1.json", "l1_to_l2a_nonlin_2.json", "l1_to_l2a_nonlin_3.json"],
["l1_to_kgain_1.json", "l1_to_kgain_2.json"]] # ["l1_to_l2a_nonlin.json","l1_to_kgain.json"]
chained = True # in this case, each sub-list is chained
elif image.pri_hdr['VISTYPE'] in ("CGIVST_CAL_ABSFLUX_FAINT", "CGIVST_CAL_ABSFLUX_BRIGHT"):
is_spec_mode = image.ext_hdr.get('DPAMNAME', '').startswith('PRISM')
has_nd_filter = any(img.ext_hdr.get('FPAMNAME', '').startswith('ND') for img in dataset)
if is_spec_mode:
if has_nd_filter:
recipe_filename = ["l1_to_l2a_basic.json", "l2a_to_l2b_spec.json", "l2b_to_nd_filter_spec.json"]
chained = True
else:
recipe_filename = ["l1_to_l2a_basic.json", "l2a_to_l2b_spec.json", "l2b_to_spec_flux.json"]
chained = True
else:
_, fsm_unique = dataset.split_dataset(exthdr_keywords=['FSMX', 'FSMY'])
if len(fsm_unique) > 1:
recipe_filename = ["l1_to_l2a_basic.json", "l2a_to_l2b.json", "l2b_to_nd_filter.json"]
chained = True
else:
recipe_filename = ["l1_to_l2a_basic.json", "l2a_to_l2b.json", "l2b_to_fluxcal_factor.json"]
chained = True
elif image.pri_hdr['VISTYPE'] == 'CGIVST_CAL_CORETHRPT':
recipe_filename = ["l1_to_l2a_basic.json", "l2a_to_l2b.json", 'l2b_to_corethroughput.json']
chained = True
elif image.pri_hdr['VISTYPE'] == 'CGIVST_CAL_SPEC_LINESPREAD':
recipe_filename = ["l1_to_l2a_basic.json", "l2a_to_l2b_spec.json", 'l2b_to_spec_linespread.json']
chained = True
elif image.pri_hdr['VISTYPE'] == 'CGIVST_CAL_SPEC_TGTREF':
if image.ext_hdr['FPAMNAME'] == 'OPEN': #L1 -> spec dispersion calibration
recipe_filename = ["l1_to_l2a_basic.json","l2a_to_l2b_spec.json","l2b_to_spec_prism_disp.json"]
chained = True
else:
recipe_filename = ["l1_to_l2a_basic.json","l2a_to_l2b_spec.json","l2b_to_l3.json","l3_to_l4_noncoron_spec.json"]
chained = True
elif image.pri_hdr['VISTYPE'] == 'CGIVST_CAL_TGTREF_PHOT' and image.ext_hdr['DPAMNAME'] not in ['POL0','POL45']:
recipe_filename = ["l1_to_l2a_basic.json","l2a_to_l2b.json","l2b_to_l3.json","l3_to_l4_nopsfsub.json"]
chained = True
elif image.pri_hdr['VISTYPE'] == 'CGIVST_CAL_TPUMP':
recipe_filename = ['trap_pump_cal_1.json', 'trap_pump_cal_2.json']
chained = True
else:
recipe_filename = "l1_to_l2a_basic.json" # science data and all else (including photon counting)
# L2a -> L2b data processing
elif image.ext_hdr['DATALVL'] == "L2a":
if image.pri_hdr['VISTYPE'] == "CGIVST_CAL_DRK":
_, unique_vals = dataset.split_dataset(exthdr_keywords=['EXPTIME', 'EMGAIN_C', 'KGAINPAR'])
if image.ext_hdr['ISPC'] == 1:
recipe_filename = ["l2a_to_l2b_pc_dark_1.json", "l2a_to_l2b_pc_dark_2.json"]#"l2a_to_l2b_pc_dark.json"
chained = True
elif len(unique_vals) > 1: # darks for noisemap creation
recipe_filename = ["l2a_to_l2a_noisemap_1.json", "l2a_to_l2a_noisemap_2.json"] # "l2a_to_l2a_noisemap.json"
chained = True
else: # then len(unique_vals) is 1 and not PC: traditional darks
recipe_filename = ["l2a_build_trad_dark_image_1.json", "l2a_build_trad_dark_image_2.json"] #"l2a_build_trad_dark_image.json"
chained = True
else:
# Check if this is spectroscopy data (DPAMNAME == PRISM3, not sure of VISTYPE yet)
is_spectroscopy = image.ext_hdr.get('DPAMNAME', '') == 'PRISM3'
is_polarimetry = image.ext_hdr.get('DPAMNAME', '') in ['POL0', 'POL45']
if is_spectroscopy:
if image.ext_hdr['ISPC'] == 1:
recipe_filename = ["l2a_to_l2b_pc_spec_1.json", "l2a_to_l2b_pc_spec_2.json", "l2a_to_l2b_pc_spec_3.json"] #"l2a_to_l2b_pc_spec.json"
else:
recipe_filename = "l2a_to_l2b_spec.json"
elif is_polarimetry:
if image.ext_hdr['ISPC'] == 1:
recipe_filename = ["l2a_to_l2b_pc_1.json", "l2a_to_l2b_pc_2.json", "l2a_to_l2b_pol_pc_3.json"] #"l2a_to_l2b_pc_pol.json"
chained = True
else:
recipe_filename = "l2a_to_l2b_pol.json"
else:
if image.ext_hdr['ISPC'] == 1:
recipe_filename = ["l2a_to_l2b_pc_1.json", "l2a_to_l2b_pc_2.json", "l2a_to_l2b_pc_3.json"] #l2a_to_l2b_pc.json
chained = True
else:
recipe_filename = "l2a_to_l2b.json" # science data and all else
# L2b -> L3 data processing
elif image.ext_hdr['DATALVL'] == "L2b":
if image.pri_hdr['VISTYPE'] in ("CGIVST_CAL_ABSFLUX_FAINT", "CGIVST_CAL_ABSFLUX_BRIGHT"):
is_spec_mode = image.ext_hdr.get('DPAMNAME', '').startswith('PRISM')
has_nd_filter = any(img.ext_hdr.get('FPAMNAME', '').startswith('ND') for img in dataset)
if is_spec_mode:
if has_nd_filter:
recipe_filename = "l2b_to_nd_filter_spec.json"
else:
recipe_filename = "l2b_to_spec_flux.json"
else:
_, fsm_unique = dataset.split_dataset(exthdr_keywords=['FSMX', 'FSMY'])
if len(fsm_unique) > 1:
recipe_filename = "l2b_to_nd_filter.json"
else:
if image.ext_hdr['DPAMNAME'] == 'POL0' or image.ext_hdr['DPAMNAME'] == 'POL45':
recipe_filename = 'l2b_to_fluxcal_factor_pol.json'
else:
recipe_filename = "l2b_to_fluxcal_factor.json"
elif image.pri_hdr['VISTYPE'] == 'CGIVST_CAL_CORETHRPT':
recipe_filename = 'l2b_to_corethroughput.json'
elif image.pri_hdr['VISTYPE'] == "CGIVST_CAL_POL_SETUP":
recipe_filename = "l2b_to_polcal.json"
elif image.ext_hdr['DPAMNAME'] == 'POL0' or image.ext_hdr['DPAMNAME'] == 'POL45':
recipe_filename = "l2b_to_l3_pol.json"
elif 'TDD' not in image.pri_hdr['VISTYPE']:
warnings.warn("Only VISTYPE TDD and certain cal frames should be processed beyond L2b. Double-check which frames are being processed from L2b -> L3.")
recipe_filename = "l2b_to_l3.json"
else:
recipe_filename = "l2b_to_l3.json"
# L3 -> L4 data processing
elif image.ext_hdr['DATALVL'] == "L3":
if image.ext_hdr['DPAMNAME'] == 'POL0' or image.ext_hdr['DPAMNAME'] == 'POL45':
recipe_filename = "l3_to_l4_pol.json"
elif image.ext_hdr['DPAMNAME'] == 'PRISM3':
if image.pri_hdr['VISTYPE'] != 'CGIVST_CAL_SPEC_TGTREF':
# coronagraphic spec obs - PSF subtraction
recipe_filename = "l3_to_l4_psfsub_spec.json"
else:
# noncoronagraphic spec obs - no PSF subtraction
recipe_filename = "l3_to_l4_noncoron_spec.json"
else:
if image.pri_hdr['VISTYPE'] != 'CGIVST_CAL_TGTREF_PHOT':
# coronagraphic obs - PSF subtraction
recipe_filename = "l3_to_l4.json"
else:
# noncoronagraphic obs - no PSF subtraction
recipe_filename = "l3_to_l4_nopsfsub.json"
else:
raise NotImplementedError("Cannot automatically guess the input dataset with 'DATALVL' = {0}".format(image.ext_hdr['DATALVL']))
return recipe_filename, chained
[docs]
def save_data(dataset_or_image, outputdir, suffix="", ram_heavy_save=False):
"""
Saves the dataset or image that has currently been outputted by the last step function.
Records calibration frames into the caldb during the process
Args:
dataset_or_image (corgidrp.data.Dataset or corgidrp.data.Image): data to save
outputdir (str): path to directory where files should be saved
suffix (str): optional suffix to tack onto the filename.
E.g.: `test.fits` with `suffix="dark"` becomes `test_dark.fits`
ram_heavy_save (bool): If True, the input is assumed to have no data loaded into memory. (Only metadata was
manipulated in step leading up to save_data.) The data is loaded from the filepath frame by frame, and
each Image is saved to outputdir. Defaults to False.
"""
# convert everything to dataset to make life easier
if isinstance(dataset_or_image, data.Image):
dataset = data.Dataset([dataset_or_image])
else:
dataset = dataset_or_image
# add suffix to ending if necessary
if len(suffix) > 0:
filenames = []
suffix = suffix.strip("_") # user doesn't need to pass underscores
for image in dataset:
# grab everything before .FITS
fits_index = image.filename.lower().rfind(".fits")
filename_base = image.filename[:fits_index]
new_filename = "{0}_{1}.fits".format(filename_base, suffix)
filenames.append(new_filename)
else:
filenames = None
# save!
dataset.save(filedir=outputdir, filenames=filenames, ram_heavy_save=ram_heavy_save)
# add calibration data to caldb as necessary
for image in dataset:
if type(image) in caldb.labels:
# this is a calibration frame!
this_caldb = caldb.CalDB()
this_caldb.create_entry(image)
[docs]
def run_recipe(recipe, save_recipe_file=True):
"""
Run the specified recipe
Args:
recipe (dict or str): either the filepath to the recipe or the already loaded in recipe
save_recipe_file (bool): saves the recipe as a JSON file in the outputdir (true by default)
Returns:
list: list of filepaths to the saved files, or None if no files were saved
"""
if isinstance(recipe, str):
# need to load in
recipe = json.load(open(recipe, "r"))
# configure pipeline as needed
# these settings should only apply to this recipe, so we will restore old settings later
old_settings = {}
for setting in recipe.get("drpconfig", {}):
# capture previous value before overriding
old_settings[setting] = getattr(corgidrp, setting, None)
setattr(corgidrp, setting, recipe["drpconfig"][setting])
# save recipe before running recipe
if save_recipe_file:
recipe_filename = "{0}_{1}_recipe.json".format(recipe["name"], time.Time.now().isot)
recipe_filename = recipe_filename.replace(":", ".") # replace colons with periods for compatibility with Windows machines
recipe_filepath = os.path.join(recipe["outputdir"], recipe_filename)
with open(recipe_filepath, "w") as json_file:
json.dump(recipe, json_file, indent=4)
# determine if this is a RAM-heavy recipe which needs crop-stack processing
#sort_pupilimg_frames included here b/c it sorts through a large number of frame (>700) b/c
# EM gain cal files (sorted here and excluded, processed by SSC) are included in the visits
if "ram_heavy" in recipe:
ram_heavy_bool = bool(recipe["ram_heavy"])
else:
ram_heavy_bool = False
if "process_in_chunks" in recipe:
ram_increment_bool = bool(recipe["process_in_chunks"])
else:
ram_increment_bool = False
if ram_heavy_bool and ram_increment_bool:
warnings.warn('\'ram_heavy\' supercedes \'process_in_chunks\', so frames will be read in all at once with no data loaded.')
# read in data, if not doing bp map
if not recipe["inputs"]:
curr_dataset = []
ram_heavy_bool = False
filelist_chunks = [0] #anything of length 1
else:
filelist = recipe["inputs"]
if ram_increment_bool and not ram_heavy_bool: #ram_heavy_bool supercedes ram_increment_bool
# how many frames to process at a time (before getting the RAM-heaviest function in the recipe) if RAM-heavy
filelist_chunks = [filelist[n:n+corgidrp.chunk_size] for n in range(0, len(filelist), corgidrp.chunk_size)]
else:
filelist_chunks = [filelist]
try:
tot_steps = len(recipe["steps"])
save_step = False
output_filepaths = []
for filelist in filelist_chunks:
if recipe["inputs"]:
if ram_heavy_bool:
curr_dataset = data.Dataset(filelist, no_data=True, no_err=True, no_dq=True)
recipe_temp = recipe.copy()
# don't want to keep all ~26000 filepaths in all ~26000 ext headers b/c that's a lot of memory
recipe_temp["inputs"] = "See RECIPE header value in {0}".format(curr_dataset[-1].filepath)
else:
curr_dataset = data.Dataset(filelist)
recipe_temp = recipe
# write the recipe into the image extension header
curr_dataset[-1].ext_hdr["RECIPE"] = json.dumps(recipe)
if len(curr_dataset) > 1:
for frame in curr_dataset[:-1]:
frame.ext_hdr["RECIPE"] = json.dumps(recipe_temp)
# execute each pipeline step
print('Executing recipe: {0}'.format(recipe['name']))
if isinstance(filelist, list):
print('number of frames: ', len(filelist))
if ram_increment_bool and len(filelist_chunks) > 1:
print('Processing frames in chunks of {0} frames'.format(corgidrp.chunk_size))
if ram_heavy_bool:
print('Processing frames in RAM-heavy mode (data not loaded into memory until necessary, one frame at a time)')
for i, step in enumerate(recipe["steps"]):
print("Walker step {0}/{1}: {2}".format(i+1, tot_steps, step["name"]))
if step["name"].lower() == "save":
# special save instruction
# see if suffix is specified as a keyword
if "keywords" in step and "suffix" in step["keywords"]:
suffix = step["keywords"]["suffix"]
else:
suffix = ''
if "keywords" in step and "ram_heavy_save" in step["keywords"]:
ram_heavy_save = step["keywords"]["ram_heavy_save"]
else:
ram_heavy_save = False
save_data(curr_dataset, recipe["outputdir"], suffix=suffix, ram_heavy_save=ram_heavy_save)
if isinstance(curr_dataset, data.Dataset):
output_filepaths += [frame.filepath for frame in curr_dataset]
else:
output_filepaths += [curr_dataset.filepath]
save_step = True
else:
step_func = all_steps[step["name"]]
# edge case if this step has been specified to be skipped
if "skip" in step and step["skip"]:
continue
other_args = ()
if "calibs" in step:
# if JIT calibration resolving is toggled, figure out the calibrations here
# by default, this is false
if (corgidrp.jit_calib_id and ("jit_calib_id" not in recipe['drpconfig'])) or (("jit_calib_id" in recipe['drpconfig']) and recipe['drpconfig']["jit_calib_id"]) :
this_caldb = caldb.CalDB()
# dataset may have turned into a single image. handle this case.
if isinstance(curr_dataset, data.Dataset):
ref_image = curr_dataset[0]
list_of_frames = curr_dataset
else:
ref_image = curr_dataset
list_of_frames = [curr_dataset]
if ram_heavy_bool:
ref_image = data.Image(ref_image.filepath) #load in data for calibration matching
_fill_in_calib_files(step, this_caldb, ref_image)
# also update the recipe we used in the headers
if ram_heavy_bool:
recipe_temp = recipe.copy()
# don't want to keep all ~26000 filepaths in all ~26000 ext headers b/c that's a lot of memory
recipe_temp["inputs"] = "See RECIPE header value in {0}".format(curr_dataset[-1].filepath)
else:
recipe_temp = recipe
list_of_frames[-1].ext_hdr["RECIPE"] = json.dumps(recipe)
if len(list_of_frames) > 1:
for frame in list_of_frames[:-1]:
frame.ext_hdr["RECIPE"] = json.dumps(recipe_temp)
# load the calibration files in from disk
for calib in step["calibs"]:
if step["calibs"][calib] is not None:
# special case for pol flat because it has multiple files
if calib == "FlatFieldPOL0" or calib == "FlatFieldPOL45":
calib_dtype = data.datatypes['FlatField']
elif calib == "FluxcalFactorPOL0" or calib == "FluxcalFactorPOL45":
calib_dtype = data.datatypes['FluxcalFactor']
else:
calib_dtype = data.datatypes[calib]
cal_file = calib_dtype(step["calibs"][calib])
else:
cal_file = None
other_args += (cal_file,)
kwargs = step.get("keywords", {})
# run the step!
curr_dataset = step_func(curr_dataset, *other_args, **kwargs)
# make sure RECIPE header is propagated to output
if isinstance(curr_dataset, data.Dataset):
for frame in curr_dataset:
if "RECIPE" not in frame.ext_hdr:
frame.ext_hdr["RECIPE"] = json.dumps(recipe)
elif hasattr(curr_dataset, 'ext_hdr') and "RECIPE" not in curr_dataset.ext_hdr:
curr_dataset.ext_hdr["RECIPE"] = json.dumps(recipe)
if not save_step:
output_filepaths = None
return output_filepaths
finally:
# restore old pipeline settings that this recipe overwrote (even if a step raises)
for setting, old_val in old_settings.items():
setattr(corgidrp, setting, old_val)
def _get_satellite_spot_info_from_xml(xml_tree):
"""
Extracts satellite spot information from the CPGS XML file
Args:
xml_tree (ElementTree): loaded in CPGS XML file
Returns:
dict: dictionary with satellite spot information
"num_spots": int, number of satellite spots
"spot1_contrast": float, contrast of spot 1
"spot1_sep": float, separation of spot 1 in lam/D
"spo1_angle": float, angle of spot 1 in degrees
"spot2_contrast": float, contrast of spot 2
"spot2_sep": float, separation of spot 2 in lam/D
"spo2_angle": float, angle of spot 2 in degrees
"""
obs_specification = xml_tree.getroot()
sat_spot_info = obs_specification.find("satellite_spots")
sat_spot_output = {}
sat_spot_output['num_spots'] = 0
for i, pair in enumerate(sat_spot_info.findall("pair")):
sat_spot_output['num_spots'] += 1
if i == 0:
sat_spot_output['spot1_contrast'] = float(pair.find("intensity").text)
sat_spot_output['spot1_sep'] = float(pair.find("radial_distance").text)
sat_spot_output['spot1_angle'] = float(pair.find("clocking_angle").text)
elif i == 1:
sat_spot_output['spot2_contrast'] = float(pair.find("intensity").text)
sat_spot_output['spot2_sep'] = float(pair.find("radial_distance").text)
sat_spot_output['spot2_angle'] = float(pair.find("clocking_angle").text)
return sat_spot_output