Source code for corgidrp.l2a_to_l2b

# A file that holds the functions that transmogrify l2a data to l2b data
import numpy as np
from scipy.interpolate import interp1d
import copy
import warnings
import corgidrp.data as data
from corgidrp.darks import build_synthesized_dark
from corgidrp.detector import detector_areas, ENF


[docs] def add_shot_noise_to_err(input_dataset, kgain, detector_params): """ Propagate the Poisson/shot noise determined from the image signal to the error map. Also includes the effect of read noise. Estimation of photon/poisson/shot noise by interpolation of the photon transfer curve, added excess noise in case of em_gain > 1. Especially useful when a dataset has very few frames so that the shot noise is not accurately obtained by averaging the frames, nor is the read noise effectively mitigated through averaging. Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (L2a-level) kgain (corgidrp.data.KGain): kgain calibration object detector_params (corgidrp.data.DetectorParams): detector parameters calibration object Returns: corgidrp.data.Dataset: shot noise propagated to the image error extensions of the input dataset """ # you should make a copy the dataset to start phot_noise_dataset = input_dataset.copy() #get the noise from the ptc curve ptc = kgain.ptc nem = detector_params.params['NEMGAIN'] for i, frame in enumerate(phot_noise_dataset.frames): # use measured gain if available TODO change hdr name if necessary try: em_gain = frame.ext_hdr["EMGAIN_M"] except: # use EM applied EM gain if available em_gain = frame.ext_hdr.get("EMGAIN_A", 0) if em_gain > 0: em_gain = em_gain else : # otherwise use commanded EM gain em_gain = frame.ext_hdr.get("EMGAIN_C", 0) #estimate of photon/poisson/shot noise by interpolation of the photon transfer curve interp_func = interp1d(ptc[:,0], ptc[:,1], kind='linear', fill_value='extrapolate') phot_err = interp_func(frame.data) #add excess noise in case of em_gain > 1 if em_gain > 1: phot_err *= ENF(em_gain, nem) frame.add_error_term(phot_err, "shotnoise_error") new_all_err = np.array([frame.err for frame in phot_noise_dataset.frames]) history_msg = "Poisson/shot noise propagated to error map" # update the output dataset phot_noise_dataset.update_after_processing_step(history_msg, new_all_err = new_all_err) return phot_noise_dataset
[docs] def dark_subtraction(input_dataset, noisemaps=None, dark=None, detector_regions=None, outputdir=None): """ Perform dark subtraction of a dataset using the corresponding dark frame. The dark frame can be either a synthesized master dark (made for any given EM gain and exposure time) or for a traditional master dark (average of darks taken at the EM gain and exposure time of the corresponding observation). The function gives preference for the traditional master dark if provided. If the frames are photon-counted (PC) and a PC dark is provided, the subtraction will not happen here but instead in the PC function pc_mean(), later in the pipeline. The master dark is also saved if it is of the synthesized type after it is built. When noisemaps is provided (and dark is not), the dataset may contain frames with multiple distinct EMGAIN and exposure time configurations; a separate synthesized dark is built for each configuration. Args: input_dataset (corgidrp.data.Dataset): a dataset of Images that need dark subtraction (L2a-level) noisemaps (corgidrp.data.DetectorNoiseMaps): If not None and if dark input not provided, a synthesized master dark is created using this DetectorNoiseMaps object for the EM gain and exposure time used in the frames in input_dataset. Multiple EMGAIN/EXPTIME configurations within input_dataset are supported in this mode. dark (corgidrp.data.Dark or corgidrp.data.DetectorNoiseMaps): If dark is of the corgidrp.data.Dark type, dark subtraction will be done with this input whether noisemaps is specified or not. detector_regions: (dict): A dictionary of detector geometry properties. Keys should be as found in detector_areas in detector.py. Defaults to detector_areas in detector.py. outputdir (string): Filepath for output directory where to save the master dark if it is a synthesized master dark. Defaults to current directory. Returns: corgidrp.data.Dataset: a dark-subtracted version of the input dataset including error propagation """ if noisemaps is None and dark is None: raise Exception('Either noisemaps or dark must be provided.') if input_dataset[0].ext_hdr['BUNIT'] != "detected electron": if input_dataset[0].ext_hdr['ISPC'] == 0: #i.e., analog raise ValueError ("input dataset must have unit 'detected electron' for dark subtraction, not {0}".format(input_dataset[0].ext_hdr['BUNIT'])) sub_datasets, unique_vals = input_dataset.split_dataset(exthdr_keywords=['EXPTIME', 'EMGAIN_C', 'KGAINPAR']) # When synthesizing darks from noisemaps, multiple detector configurations are supported: # split by configuration, process each subset, and recombine. if type(noisemaps) is data.DetectorNoiseMaps and dark is None and len(unique_vals) > 1: output_frames = [] for sub_dataset in sub_datasets: sub_result = dark_subtraction(sub_dataset, noisemaps=noisemaps, detector_regions=detector_regions, outputdir=outputdir) output_frames.extend(sub_result.frames) return data.Dataset(output_frames) if len(unique_vals) > 1: raise Exception('Input dataset should contain frames of the same exposure time, commanded EM gain, and k gain.') if detector_regions is None: detector_regions = detector_areas # you should make a copy the dataset to start darksub_dataset = input_dataset.copy() rows = detector_regions['SCI']['frame_rows'] cols = detector_regions['SCI']['frame_cols'] im_rows = detector_regions['SCI']['image']['rows'] im_cols = detector_regions['SCI']['image']['cols'] if type(noisemaps) is data.DetectorNoiseMaps and dark is None: if input_dataset.frames[0].data.shape == (rows, cols): full_frame = True elif input_dataset.frames[0].data.shape == (im_rows, im_cols): full_frame = False else: raise Exception('Frames in input_dataset do not have valid SCI full-frame or image dimensions.') if noisemaps.ext_hdr['DRPNFILE'] < len(input_dataset): warnings.warn('Number of frames which made the DetectorNoiseMaps product is less than the number of frames in input_dataset.') dark = build_synthesized_dark(input_dataset, noisemaps, detector_regions=detector_regions, full_frame=full_frame) if outputdir is None: outputdir = '.' #current directory # Base the dark filename on the input science frame, replacing the data-level # marker so the dark can be traced back to the data it was built from. input_filename = input_dataset[-1].filename or input_dataset[-1].pri_hdr.get('FILENAME', '') if '_l2a.' in input_filename: dark.filename = input_filename.replace('_l2a.', '_drk_cal.') dark.pri_hdr['FILENAME'] = dark.filename dark.save(filedir=outputdir) elif type(dark) is data.Dark: # dark is used whether noisemaps is provided or not if 'PC_STAT' in dark.ext_hdr: if dark.ext_hdr['PC_STAT'] == 'photon-counted master dark': if input_dataset[0].ext_hdr['ISPC'] == 0: #i.e., analog raise Exception('The input \'dark\' is a photon-counted master dark and cannot be used in the dark_subtraction step function with analog frames.') else: #i.e., photon-counted return darksub_dataset else: #traditional analog master dark pass #proceed with dark subtraction here, whether frames input_dataset is PC or not if dark.ext_hdr['DRPNFILE'] < len(input_dataset): warnings.warn('Number of frames which made the Dark product is less than the number of frames in input_dataset.') # In this case, the Dark loaded in should already match the array dimensions # of input_dataset, specified by full_frame argument of build_trad_dark # when this Dark was built if (dark.ext_hdr['EXPTIME'], dark.ext_hdr['EMGAIN_C'], dark.ext_hdr['KGAINPAR']) != unique_vals[0]: raise Exception('Dark should have the same EXPTIME, EMGAIN_C, and KGAINPAR as input_dataset.') else: raise Exception('No appropriate calibration input found for dark subtraction.') darksub_cube = darksub_dataset.all_data - dark.data # propagate the error of the dark frame if hasattr(dark, "err"): darksub_dataset.add_error_term(dark.err[0], "dark_error") else: warnings.warn("no error attribute in the dark frame") if hasattr(dark, "dq"): new_all_dq = np.bitwise_or(darksub_dataset.all_dq, dark.dq) else: new_all_dq = None history_msg = "Dark subtracted using dark {0}. Units changed from detected electrons to photoelectrons.".format(dark.filename) # update the output dataset with this new dark subtracted data and update the history darksub_dataset.update_after_processing_step(history_msg, new_all_data=darksub_cube, new_all_dq = new_all_dq, header_entries = {"BUNIT":"photoelectron"}) return darksub_dataset
[docs] def flat_division(input_dataset, flat_field): """ Divide the dataset by the master flat field. Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (L2a-level) flat_field (corgidrp.data.FlatField): a master flat field to divide by Returns: corgidrp.data.Dataset: a version of the input dataset with the flat field divided out """ # copy of the dataset flatdiv_dataset = input_dataset.copy() #Divide by the master flat with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) # catch divide by zero flatdiv_cube = flatdiv_dataset.all_data / flat_field.data #Find where the flat_field is 0 and set a DQ flag: where_zero = np.where(flat_field.data == 0) flatdiv_dq = copy.deepcopy(flatdiv_dataset.all_dq) for i in range(len(flatdiv_dataset)): flatdiv_dq[i][where_zero] = np.bitwise_or(flatdiv_dataset[i].dq[where_zero], 4) # propagate the error of the master flat frame if hasattr(flat_field, "err"): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) # catch divide by zero flatdiv_dataset.rescale_error(1/flat_field.data, "FlatField") flatdiv_dataset.add_error_term(flatdiv_dataset.all_data*flat_field.err[0]/(flat_field.data**2), "FlatField_error") else: raise Warning("no error attribute in the FlatField") history_msg = "Flat calibration done using Flat field {0}".format(flat_field.filename) # update the output dataset with this new flat calibrated data and update the history flatdiv_dataset.update_after_processing_step(history_msg,new_all_data=flatdiv_cube, new_all_dq = flatdiv_dq) return flatdiv_dataset
[docs] def flat_division_pol(input_dataset, flat_fieldPOL0, flat_fieldPOL45): """ Divide the dataset by the master flat field. Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (L2a-level) flat_fieldPOL0 (corgidrp.data.FlatField): a master flat field to divide by flat_fieldPOL45 (corgidrp.data.FlatField): a master flat field to divide by Returns: corgidrp.data.Dataset: a version of the input dataset with the flat field divided out """ # copy of the dataset flatdiv_dataset0 = input_dataset.copy() split_dataset_list, unique_vals = flatdiv_dataset0.split_dataset(exthdr_keywords=['DPAMNAME']) #Ouptut file container. output_frames = [] for flatdiv_dataset, val in zip(split_dataset_list, unique_vals): #Get the right flat: if val == 'POL0': flat_field = flat_fieldPOL0 elif val == 'POL45': flat_field = flat_fieldPOL45 else: #Throw error raise Exception('DPAMNAME value in input dataset should be either POL0 or POL45 for the flat division step function with polarization-dependent flats.') #Divide by the master flat with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) # catch divide by zero flatdiv_cube = flatdiv_dataset.all_data / flat_field.data #Find where the flat_field is 0 and set a DQ flag: where_zero = np.where(flat_field.data == 0) flatdiv_dq = copy.deepcopy(flatdiv_dataset.all_dq) for i in range(len(flatdiv_dataset)): flatdiv_dq[i][where_zero] = np.bitwise_or(flatdiv_dataset[i].dq[where_zero], 4) # propagate the error of the master flat frame if hasattr(flat_field, "err"): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) # catch divide by zero flatdiv_dataset.rescale_error(1/flat_field.data, "FlatField") flatdiv_dataset.add_error_term(flatdiv_dataset.all_data*flat_field.err[0]/(flat_field.data**2), "FlatField_error") else: raise Warning("no error attribute in the FlatField") history_msg = "Flat calibration done using Flat field {0}".format(flat_field.filename) # update the output dataset with this new flat calibrated data and update the history flatdiv_dataset.update_after_processing_step(history_msg,new_all_data=flatdiv_cube, new_all_dq = flatdiv_dq) output_frames.extend(flatdiv_dataset.frames) output_dataset = data.Dataset(output_frames) return output_dataset
[docs] def frame_select(input_dataset, bpix_frac=1., allowed_bpix=0, overexp=False, tt_rms_thres=None, tt_bias_thres=None, discard_bad=True): """ Selects the frames that we want to use for further processing. Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (L2a-level) bpix_frac (float): greater than fraction of the image needs to be bad to discard. Default: 1.0 (not used) allowed_bpix (int): sum of DQ values that are allowed and not counted towards to bpix fraction (e.g., 6 means 2 and 4 are not considered bad). Default is 0 (all nonzero DQ flags are considered bad) overexp (bool): if True, removes frames where the OVEREXP keyword is True. Default: False tt_rms_thres (float): maximum allowed RMS tip or tilt in image to be considered good. Default: None (not used) tt_bias_thres (float): maximum allowed bias in tip/tilt over the course of an image to be consdiered good. Default: None (not used) discard_bad (bool): if True, drops the bad frames rather than keeping them through processing Returns: corgidrp.data.Dataset: a version of the input dataset with only the frames we want to use """ pruned_dataset = input_dataset.copy() reject_flags = np.zeros(len(input_dataset)) reject_reasons = {} disallowed_bits = np.invert(allowed_bpix) # invert the mask for i, frame in enumerate(pruned_dataset.frames): reject_reasons[i] = [] # list of rejection reasons if bpix_frac < 1: masked_dq = np.bitwise_and(frame.dq, disallowed_bits) # handle allowed_bpix values numbadpix = np.size(np.where(masked_dq > 0)[0]) frame_badpix_frac = numbadpix / np.size(masked_dq) # if fraction of bad pixel over threshold, mark is as bad if frame_badpix_frac > bpix_frac: reject_flags[i] += 1 reject_reasons[i].append("bad pix frac {0:.5f} > {1:.5f}" .format(frame_badpix_frac, bpix_frac)) frame.ext_hdr['FRMSEL01'] = (bpix_frac, "Bad Pixel Fraction < This Value. Doesn't include DQflags summed to {0}".format(allowed_bpix)) # record selection criteria if overexp: if frame.ext_hdr['OVEREXP']: reject_flags[i] += 2 # use distinct bits in case it's useful reject_reasons[i].append("OVEREXP = T") frame.ext_hdr['FRMSEL02'] = (overexp, "Are we selecting on the OVEREXP flag?") # record selection criteria if tt_rms_thres is not None: if frame.ext_hdr['Z2VAR'] > tt_rms_thres: reject_flags[i] += 4 # use distinct bits in case it's useful reject_reasons[i].append("tip rms (Z2VAR) {0:.1f} > {1:.1f}" .format(frame.ext_hdr['Z2VAR'], tt_rms_thres)) if frame.ext_hdr['Z3VAR'] > tt_rms_thres: reject_flags[i] += 8 # use distinct bits in case it's useful reject_reasons[i].append("tilt rms (Z3VAR) {0:.1f} > {1:.1f}" .format(frame.ext_hdr['Z3VAR'], tt_rms_thres)) if tt_rms_thres is None: tt_rms_thres_hdr = -999. # to keep type consistency else: tt_rms_thres_hdr = tt_rms_thres frame.ext_hdr['FRMSEL03'] = (tt_rms_thres_hdr, "tip rms (Z2VAR) threshold") # record selection criteria frame.ext_hdr['FRMSEL04'] = (tt_rms_thres_hdr, "tilt rms (Z3VAR) threshold") # record selection criteria if tt_bias_thres is not None: if frame.ext_hdr['Z2RES'] > tt_bias_thres: reject_flags[i] += 16 # use distinct bits in case it's useful reject_reasons[i].append("tip bias (Z2RES) {0:.1f} > {1:.1f}" .format(frame.ext_hdr['Z2RES'], tt_bias_thres)) if frame.ext_hdr['Z3RES'] > tt_bias_thres: reject_flags[i] += 32 # use distinct bits in case it's useful reject_reasons[i].append("tilt bias (Z3RES) {0:.1f} > {1:.1f}" .format(frame.ext_hdr['Z3RES'], tt_bias_thres)) if tt_bias_thres is None: tt_bias_thres_hdr = -999. # to keep type consistency else: tt_bias_thres_hdr = tt_bias_thres frame.ext_hdr['FRMSEL05'] = (tt_bias_thres_hdr, "tip bias (Z2RES) threshold") # record selection criteria frame.ext_hdr['FRMSEL06'] = (tt_bias_thres_hdr, "tilt bias (Z3RES) threshold") # record selection criteria # if rejected, mark as bad in the header if reject_flags[i] > 0: frame.ext_hdr['IS_BAD'] = True good_frames = np.where(reject_flags == 0) bad_frames = np.where(reject_flags > 0) # check that we didn't remove all of the good frames if np.size(good_frames) == 0: raise ValueError("No good frames were selected. Unable to continue") # if we need to discard bad, do that here. if discard_bad: pruned_frames = pruned_dataset.frames[good_frames] pruned_dataset = data.Dataset(pruned_frames) # history message of which frames were removed and why history_msg = "Removed {0} frames as bad:".format(np.size(bad_frames)) else: # history message of which frames were marked and why history_msg = "Marked {0} frames as bad:".format(np.size(bad_frames)) for bad_index in bad_frames[0]: bad_frame = input_dataset.frames[bad_index] bad_reasons = "; ".join(reject_reasons[bad_index]) history_msg += " {0} ({1}),".format(bad_frame.filename, bad_reasons) history_msg = history_msg[:-1] # remove last comma or : pruned_dataset.update_after_processing_step(history_msg) return pruned_dataset
[docs] def convert_to_electrons(input_dataset, k_gain): """ Convert the data from ADU to electrons. TODO: Establish the interaction with the CalDB for obtaining gain calibration Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (L2a-level) k_gain (corgidrp.data.KGain): KGain calibration file Returns: corgidrp.data.Dataset: a version of the input dataset with the data in electrons """ if input_dataset[0].ext_hdr['BUNIT'] != "DN": raise ValueError("input dataset must have unit DN for the conversion, not {0}".format(input_dataset[0].ext_hdr['BUNIT'])) # you should make a copy the dataset to start kgain_dataset = input_dataset.copy() kgain_cube = kgain_dataset.all_data kgain = k_gain.value #extract from caldb error_frame = kgain_cube * k_gain.error kgain_cube *= kgain #scale also the old error with kgain and propagate the error kgain_dataset.rescale_error(kgain, "kgain") kgain_dataset.add_error_term(error_frame, "kgain_error") history_msg = "data converted to detected EM electrons by kgain {0}".format(str(kgain)) # update the output dataset with this converted data and update the history kgain_dataset.update_after_processing_step(history_msg, new_all_data=kgain_cube, header_entries = {"BUNIT":"detected EM electron", "KGAINPAR":kgain, "KGAIN_ER": k_gain.error, "RN":k_gain.ext_hdr['RN'], "RN_ERR":k_gain.ext_hdr["RN_ERR"]}) return kgain_dataset
[docs] def em_gain_division(input_dataset): """ Convert the data from detected EM electrons to detected electrons by dividing by the EM gain. Update the change in units in the header [detected electrons]. Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (L2a-level) Returns: corgidrp.data.Dataset: a version of the input dataset with the data in units "detected electrons" """ if input_dataset[0].ext_hdr['BUNIT'] != "detected EM electron": raise ValueError("input dataset must have unit 'detected EM electron' for the conversion, not {0}".format(input_dataset[0].ext_hdr['BUNIT'])) # you should make a copy the dataset to start emgain_dataset = input_dataset.copy() emgain_cube = emgain_dataset.all_data emgain_error = emgain_dataset.all_err for i in range(len(emgain_dataset)): try: # use measured gain if available emgain = emgain_dataset[i].ext_hdr["EMGAIN_M"] except: emgain = emgain_dataset[i].ext_hdr["EMGAIN_A"] if emgain > 0: # use EM applied EM gain if available emgain = emgain_dataset[i].ext_hdr["EMGAIN_A"] else: # otherwise use commanded EM gain emgain = emgain_dataset[i].ext_hdr["EMGAIN_C"] emgain_cube[i] /= emgain emgain_error[i] /= emgain dataset_list, _ = emgain_dataset.split_dataset(exthdr_keywords=['EMGAIN_C']) if len(dataset_list) > 1: history_msg = "data divided by EM gain for dataset with frames with different commanded EM gains" else: history_msg = "data divided by EM gain for dataset with frames with the same commanded EM gain" # update the output dataset with this EM gain divided data and update the history emgain_dataset.update_after_processing_step(history_msg, new_all_data=emgain_cube, new_all_err=emgain_error, header_entries = {"BUNIT":"detected electron"}) return emgain_dataset
[docs] def cti_correction(input_dataset, pump_trap_cal): """ Apply the CTI correction to the dataset. Currently a no-op step function. Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (L2a-level) pump_trap_cal (corgidrp.data.TrapCalibration): Pump trap calibration file Returns: corgidrp.data.Dataset: a version of the input dataset with the CTI correction applied """ # also remember to update CTI_CORR ext header keyword return input_dataset.copy()
[docs] def correct_bad_pixels(input_dataset, bp_mask): """ Correct for bad pixels: Bad pixels are identified as part of the data calibration. This function replaces bad pixels by NaN values. It also updates its DQ storing the type of bad pixel at each bad pixel location, and it records the fact that the pixel has been replaced by NaN. Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (L2a-level) bp_mask (corgidrp.data.BadPixelMap): Bad-pixel mask built from the bad pixel calibration file. Returns: corgidrp.data.Dataset: a version of the input dataset with bad detector pixels and cosmic rays replaced by NaNs """ data = input_dataset.copy() data_cube = data.all_data dq_cube = data.all_dq.astype(np.uint8) for i in range(data_cube.shape[0]): # combine DQ and BP masks bp_dq_mask = np.bitwise_or(dq_cube[i],bp_mask.data.astype(np.uint8)) # mask affected pixels with NaN bp = np.where(bp_dq_mask != 0) data_cube[i, bp[0], bp[1]] = np.nan # Update DQ to keep track of replaced bad pixel values bp_dq_mask[bp[0], bp[1]]=np.bitwise_or(bp_dq_mask[bp[0], bp[1]], 2) dq_cube[i] = bp_dq_mask history_msg = "removed pixels affected by bad pixels" data.update_after_processing_step(history_msg, new_all_data=data_cube, new_all_dq=dq_cube) return data
[docs] def desmear(input_dataset, detector_params): """ EXCAM has no shutter, and so continues to illuminate the detector during readout. This creates a "smearing" effect into the resulting images. The desmear function corrects for this effect. There are a small number of use cases for not desmearing data (e.g. time-varying raster data). Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (L2a-level) detector_params (corgidrp.data.DetectorParams): a calibration file storing detector calibration values Returns: corgidrp.data.Dataset: a version of the input dataset with desmear applied """ data = input_dataset.copy() data_cube = data.all_data rowreadtime_sec = detector_params.params['ROWREADT'] for i in range(data_cube.shape[0]): exptime_sec = float(data[i].ext_hdr['EXPTIME']) smear = np.zeros_like(data_cube[i]) m = len(smear) for r in range(m): columnsum = 0 for s in range(r+1): columnsum = columnsum + rowreadtime_sec/exptime_sec*((1 + rowreadtime_sec/exptime_sec)**((s+1)-(r+1)-1))*data_cube[i,s,:] smear[r,:] = columnsum data_cube[i] -= smear history_msg = "Desmear applied to data" header_update = {'DESMEAR' : True} data.update_after_processing_step(history_msg, new_all_data=data_cube, header_entries=header_update) return data
[docs] def update_to_l2b(input_dataset): """ Updates the data level to L2b. Only works on L2a data. Currently only checks that data is at the L2a level Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (L2a-level) Returns: corgidrp.data.Dataset: same dataset now at L2b-level """ # check that we are running this on L1 data for orig_frame in input_dataset: if orig_frame.ext_hdr['DATALVL'] != "L2a": err_msg = "{0} needs to be L2a data, but it is {1} data instead".format(orig_frame.filename, orig_frame.ext_hdr['DATALVL']) raise ValueError(err_msg) # we aren't altering the data updated_dataset = input_dataset.copy(copy_data=False) for frame in updated_dataset: # update header frame.ext_hdr['DATALVL'] = "L2b" # update filename convention. The file convention should be # "CGI_[dataleel_*]" so we should be same just replacing the just instance of L1 frame.filename = frame.filename.replace("_l2a", "_l2b", 1) #updating filename in the primary header frame.pri_hdr['FILENAME'] = frame.filename history_msg = "Updated Data Level to L2b" updated_dataset.update_after_processing_step(history_msg) return updated_dataset