Source code for corgidrp.calibrate_kgain

import os
import warnings
import numpy as np
import pandas as pd
from pathlib import Path
from scipy.optimize import curve_fit

from corgidrp import check
import corgidrp.data as data
from corgidrp.data import Image
from corgidrp.detector import slice_section, detector_areas
from corgidrp.sorting import extract_datetime
from corgidrp.calibrate_nonlin import nonlin_kgain_dataset_2_stack

# Dictionary with constant kgain calibration parameters
[docs] kgain_params_default= { # ROI constants 'rowroi1': 9, 'rowroi2': 1000, 'colroi1': 1199, 'colroi2': 2000, # read noise bins range limits 'rn_bins1': -200, 'rn_bins2': 201, # maximum DN value to be included in PTC 'max_DN_val': 13000, # number of bins in the signal variables 'signal_bins_N': 400, }
[docs] def check_kgain_params(kgain_params): """ Checks integrity of kgain parameters in the dictionary kgain_params. Args: kgain_params (dict): Dictionary of parameters used for calibrating the k gain. """ if 'rowroi1' not in kgain_params: raise ValueError('Missing parameter: rowroi1.') if 'rowroi2' not in kgain_params: raise ValueError('Missing parameter: rowroi2.') if 'colroi1' not in kgain_params: raise ValueError('Missing parameter: colroi1.') if 'colroi2' not in kgain_params: raise ValueError('Missing parameter: colroi2.') if 'rn_bins1' not in kgain_params: raise ValueError('Missing parameter: rn_bins1.') if 'rn_bins2' not in kgain_params: raise ValueError('Missing parameter: rn_bins2.') if 'max_DN_val' not in kgain_params: raise ValueError('Missing parameter: max_DN_val.') if 'signal_bins_N' not in kgain_params: raise ValueError('Missing parameter: signal_bins_N.') if not isinstance(kgain_params['rowroi1'], (float, int)): raise TypeError('rowroi1 is not a number') if not isinstance(kgain_params['rowroi2'], (float, int)): raise TypeError('rowroi2 is not a number') if not isinstance(kgain_params['colroi1'], (float, int)): raise TypeError('colroi1 is not a number') if not isinstance(kgain_params['colroi2'], (float, int)): raise TypeError('colroi2 is not a number') if not isinstance(kgain_params['rn_bins1'], (float, int)): raise TypeError('rn_bins1 is not a number') if not isinstance(kgain_params['rn_bins2'], (float, int)): raise TypeError('rn_bins2 is not a number') if not isinstance(kgain_params['max_DN_val'], (float, int)): raise TypeError('max_DN_val is not a number') if not isinstance(kgain_params['signal_bins_N'], (float, int)): raise TypeError('signal_bins_N is not a number')
[docs] class CalKgainException(Exception): """Exception class for calibrate_kgain."""
################### function defs #####################
[docs] def ptc_bin2(frame_in, mean_frame, binwidth, max_DN): """ frame_in is a bias-corrected frame trimmed to the ROI. mean_frame is the scaled high SNR mean frame made from the >=30 frames of uniform exposure time, binwidth is an integer equal to the width of each bin, max_DN is an integer equal to the maximum DN value to be included in the bins. Args: frame_in (np.array): bias corrected frame mean_frame (np.array): mean frame of uniform exposure time binwidth (int): width of each bin max_DN (int): maximum DN value Returns: np.array: mean array np.array: mean array """ # calculate the size of output arrays rows, cols = frame_in.shape out_rows, out_cols = rows // binwidth, cols // binwidth local_mean_array = np.zeros((out_rows, out_cols)) local_noise_array = np.zeros((out_rows, out_cols)) # Define the bin edges row_bins = np.arange(1, rows + 1, binwidth) col_bins = np.arange(1, cols + 1, binwidth) tot_bins = len(row_bins) * len(col_bins) DN_bin = max_DN / tot_bins # Flatten the arrays for easier indexing mean_flat = mean_frame.flatten() frame_in_flat = frame_in.flatten() DN_val = 0 for m in range(len(row_bins) - 1): for n in range(len(col_bins) - 1): DN_val += DN_bin # Create masks based on DN values mean_idx = (mean_flat >= DN_val) & (mean_flat < (DN_val + DN_bin)) # Ensure that there is at least one element to calculate mean and std if np.any(mean_idx): with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning) local_mean_array[m, n] = np.nanmean(frame_in_flat[mean_idx]) local_noise_array[m, n] = np.nanstd(frame_in_flat[mean_idx]) else: local_mean_array[m, n] = 0 local_noise_array[m, n] = 0 return local_mean_array, local_noise_array
[docs] def diff2std(diff_frame, detector_regions=None): """ calculate the standard deviation of the frame difference, diff_frame within the ROI row and column boundaries. Args: diff_frame (np.array): frame diffference detector_regions (dict): a dictionary of detector geometry properties. Keys should be as found in detector_areas in detector.py. Defaults to that dictionary. Returns: np.array: standard deviation of frame difference """ selected_area = slice_section(diff_frame, 'SCI', 'prescan_reliable', detector_regions) std_value = np.nanstd(selected_area.reshape(-1), ddof=1) # dividing by sqrt(2) since we want std of one frame return std_value / np.sqrt(2)
[docs] def gauss(x, A, mean, sigma): """ Gauss function. Called by sgaussfit. Args: x (np.array): input array A (float): amplitude mean (float): mean value sigma (float): sigma Returns: np.array: Gauss function """ return A * np.exp(-0.5 * ((x - mean) ** 2) / (sigma ** 2))
[docs] def sgaussfit(xdata, ydata, gaussinp): """ Find fitting parameters to Gauss function. Called by Single_peakfit. gaussinp is an array containing initial values of A, mean, sigma. Args: xdata (np.array): input x array ydata (np.array): input y array gaussinp (np.array): initial guess array Returns: np.array: fit parameters np.array: fit parameters """ popt, pcov = curve_fit(gauss, xdata, ydata, p0=gaussinp) sse = np.sum((ydata - gauss(xdata, *popt)) ** 2) return popt, lambda params: (sse, gauss(xdata, *params))
[docs] def Single_peakfit(xdata, ydata): """ Fit Gauss function to x, y data. Returns mean and sigma. Only sigma is used in main code call. Args: xdata (np.array): x data ydata (np.array): y data Returns: float: sigma """ astart = np.nanmax(ydata) mustart = xdata[np.argmax(ydata)] sigmastart = 10 sgaussinp = [astart, mustart, sigmastart] estimates, model = sgaussfit(xdata, ydata, sgaussinp) a1, mu1, sigma = estimates return sigma
[docs] def histc_roi(frame, rn_bins, detector_regions=None): """ Histogram of pixel values of frame within the ROI and bins defined in rn_bins. Returns the counts in each bin. Args: frame (np.array): frame rn_bins (int): histogram bins detector_regions (dict): a dictionary of detector geometry properties. Keys should be as found in detector_areas in detector.py. Defaults to that dictionary. Returns: np.array: counts in each bin """ selected_area = slice_section(frame, 'SCI', 'prescan_reliable', detector_regions) data_reshaped = selected_area.ravel() counts, _ = np.histogram(data_reshaped, bins=rn_bins) return counts
[docs] def calculate_mode(arr): """ calculates histogram of an array Args: arr (np.array): input array Returns: np.array: bin center values np.array: bin center counts """ counts, bin_edges = np.histogram(arr) # Calculate bin centers (values) values = (bin_edges[:-1] + bin_edges[1:]) / 2 max_count_index = np.argmax(counts) return values[max_count_index], counts[max_count_index]
[docs] def sigma_clip(data, sigma=2.5, max_iters=6): """ Perform sigma-clipping on the data. Args: data (np.array): The input data to be sigma-clipped. sigma (float): The number of standard deviations to use for clipping. max_iters (int): The maximum number of iterations to perform. Returns: np.ndarray: The sigma-clipped data. np.ndarray: A boolean mask where True indicates a clipped value. """ data = np.asarray(data) clipped_data = data.copy() for i in range(max_iters): mean = np.nanmean(clipped_data) std = np.nanstd(clipped_data) mask = np.abs(clipped_data - mean) > sigma * std if not np.any(mask): break clipped_data = clipped_data[~mask] return clipped_data, mask
######################### start of main code #############################
[docs] def calibrate_kgain(dataset_kgain, n_cal=10, n_mean=30, min_val=800, max_val=3000, binwidth=68, make_plot=False,plot_outdir='figures', show_plot=False, logspace_start=-1, logspace_stop=4, logspace_num=200, verbose=False, detector_regions=None, kgain_params=None, apply_dq = True, dataset_copy=True): """ kgain (e-/DN) is calculated from the means and variances within the defined minimum and maximum mean values. A photon transfer curve is plotted from the std dev and mean values from the bins. Args: dataset_kgain (corgidrp.Dataset): The frames in the dataset are bias-subtracted. The dataset contains frames belonging to two different sets -- Mean frame and a large array of unity gain frames. Mean frame: Unity gain frames with constant exposure time. These frames are used to create a mean pupil image. The mean frame is used to select pixels in each frame of the large array of unity gain frames (see next) to calculate its mean signal. In general, it is expected that at least 30 frames or more will be taken for this set. In TVAC, 30 frames, each with an exposure time of 5.0 sec were taken. Large array of unity gain frames: Set of unity gain frames with subsets of equal exposure times. Data for each subset should be taken sequentially: Each subset must have at least 5 frames. All frames for a subset are taken before moving to the next subset. Two of the subsets have the same (repeated) exposure time. These two subsets are not contiguous: The first subset is taken near the start of the data collection and the second one is taken at the end of the data collection (see TVAC example below). Two subsets with the same exposure time are not strictly needed for k gain calibration, but they are needed for nonlinearity calibration, and the same visit will be used to cover both k gain and nonlinearity calibrations, so there should be two unity gain frame subsets with the same exposure time. The mean signal of these two subsets is used to correct for illumination brightness/sensor sensitivity drifts in nonlinearity calibration for all the frames in the whole set, depending on when the frames were taken. There should be no other repeated exposure time among the subsets. In TVAC, a total of 110 frames were taken within this category. The 110 frames consisted of 22 subsets, each with 5 frames. All 5 frames had the same exposure time. The exposure times in TVAC in seconds were, each repeated 5 times to collect 5 frames in each subset -- 0.077, 0.770, 1.538, 2.308, 3.077, 3.846, 4.615, 5.385, 6.154, 6.923, 7.692, 8.462, 9.231, 10.000, 11.538, 10.769, 12.308, 13.077, 13.846, 14.615, 15.385, and 1.538 (again). n_cal (int): Minimum number of sub-stacks used to calibrate K-Gain. The default value is 10. n_mean (int): Minimum number of frames used to generate the mean frame. The default value is 30. min_val (int): Minimum value (in DN) of mean values from sub-stacks to use in calculating kgain. (> 400 recommended) max_val (int): Maximum value (in DN) of mean values from sub-stacks to use in calculating kgain. Choose value that avoids large deviations from linearity at high counts. (< 6,000 recommended) binwidth (int): (Optional) Width of each bin for calculating std devs and means from each sub-stack in dataset_kgain. Maximum value of binwidth is 800. Notice that small values increase computation time. (minimum 10; binwidth between 65 and 75 recommended) make_plot (bool): (Optional) generate and store plots. Default is True. plot_outdir (str): (Optional) Output directory to store figues. Default is 'figures'. The default directory is not tracked by git. show_plot (bool): (Optional) display the plots. Default is False. logspace_start (int): log plot min value in np.logspace. logspace_stop (int): log plot max value in np.logspace. logspace_num (int): Number of elements in np.logspace. verbose (bool): (Optional) display various diagnostic print messages. Default is False. detector_regions (dict): a dictionary of detector geometry properties. Keys should be as found in detector_areas in detector.py. Defaults to that dictionary. kgain_params (dict): (Optional) Dictionary containing row and col specifications for the region of interest (indicated by 'rowroi1','rowroi2','colroi1',and 'colroi2'). The 'roi' needs one square region specified, and 'back' needs two square regions, where a '1' ending indicates the smaller of two values, and a '2' ending indicates the larger of two values. The coordinates of the square region are specified by matching up as follows: (rowroi1, colroi1), (rowroi2, colroi1), etc. Also must contain: 'rn_bins1': lower bound of counts histogram for fitting or read noise 'rn_bins2': upper bound of counts histogram for fitting or read noise 'max_DN_val': maximum DN value to be included in photon transfer curve (PTC) 'signal_bins_N': number of bins in the signal variables of PTC curve Defaults to kgain_params_default included in this file. apply_dq (bool): consider the dq mask (from cosmic ray detection) or not dataset_copy (bool): flag indicating whether the input dataset will be preserved after this function is executed or not. If False, the output dataset will be the input dataset modified, and the input and output datasets will be identical. This is useful when handling a large dataset and when the input dataset is not needed afterwards. Defaults to True. Returns: corgidrp.data.KGain: kgain estimate from the least-squares fit to the photon transfer curve (in e-/DN). The expected value of kgain for EXCAM with flight readout sequence should be between 8 and 9 e-/DN """ if kgain_params is None: kgain_params = kgain_params_default check_kgain_params(kgain_params) if detector_regions is None: detector_regions = detector_areas # cast dataset objects into np arrays for convenience #cal_list, mean_frame_list, actual_gain = kgain_dataset_2_list(dataset_kgain, apply_dq = apply_dq) cal_list, mean_frame_list, _, _, _, actual_gains, _, truncated_set_len = nonlin_kgain_dataset_2_stack(dataset_kgain, apply_dq = apply_dq, cal_type='kgain', dataset_copy=dataset_copy) split_arr = np.arange(0,len(cal_list[0]), truncated_set_len)[1:] cal_list = np.array(np.split(cal_list[0], split_arr)) actual_gain = np.nanmean(actual_gains) # check number of frames, unique EM value, exposure times and datetimes tmp = cal_list[0] for idx in range(4): try: tmp = tmp[idx] except: pass if idx != 3: raise CalKgainException('cal_list must be 4-D (i.e., a stack of ' '3-D sub-stacks)') if len(cal_list) < n_cal: raise CalKgainException(f'Number of sub-stacks in cal_list must ' 'be more than {n_cal}.') if len(cal_list) < 20 : print('Number of sub-stacks in cal_list is less than 20, ' 'which is the recommended minimum number for a good fit ') for i in range(len(cal_list)): check.threeD_array(cal_list[i], 'cal_list['+str(i)+']', TypeError) if len(cal_list[i]) < 5: raise CalKgainException(f'A sub-stack in cal_list was found with less than 5 ' 'frames, which is the required minimum number per sub-stack') if i > 0: if len(cal_list[i-1]) != len(cal_list[i]): raise CalKgainException('All sub-stacks must have the ' 'same number of frames and frame shape.') tmp = mean_frame_list[0] for idx in range(3): try: tmp = tmp[idx] except: pass if idx != 2: raise CalKgainException('mean_frame_list must be 3-D (i.e., a stack of ' '2-D sub-stacks') if len(mean_frame_list) < n_mean: raise CalKgainException(f'Number of sub-stacks in mean_frame_list must ' 'be equal to or greater than {n_mean}.') check.real_positive_scalar(actual_gain, 'actual_gain', TypeError) if actual_gain != 1: raise CalKgainException('Actual gain must equal 1.') check.positive_scalar_integer(min_val, 'min_val', TypeError) check.positive_scalar_integer(max_val, 'max_val', TypeError) if min_val >= max_val: raise CalKgainException('max_val must be greater than min_val') check.positive_scalar_integer(binwidth, 'binwidth', TypeError) if binwidth < 10: raise CalKgainException('binwidth must be >= 10.') if binwidth > 800: raise CalKgainException('binwidth must be < 800.') if not isinstance(logspace_start, (float, int)): raise TypeError('logplot1 is not a number') if not isinstance(logspace_stop, (float, int)): raise TypeError('logplot2 is not a number') if not isinstance(logspace_num, (float, int)): raise TypeError('logplot3 is not a number') # get relevant constants rowroi1 = kgain_params['rowroi1'] rowroi2 = kgain_params['rowroi2'] colroi1 = kgain_params['colroi1'] colroi2 = kgain_params['colroi2'] rn_bins1 = kgain_params['rn_bins1'] rn_bins2 = kgain_params['rn_bins2'] max_DN_val = kgain_params['max_DN_val'] signal_bins_N = kgain_params['signal_bins_N'] if make_plot is True: # Avoid issues with importing matplotlib on headless servers without GUI # support without proper configuration import matplotlib.pyplot as plt # Output directory if os.path.exists(plot_outdir) is False: os.mkdir(plot_outdir) if verbose: print('Output directory for figures created in ', os.getcwd()) # NOTE: binwidth must be <= rowroi and colroi rowroi = slice(rowroi1,rowroi2) colroi = slice(colroi1,colroi2) averages = [] deviations = [] read_noise = [] deviations_shot = [] #mean_signals = [] rn_bins = np.array(range(rn_bins1, rn_bins2)) bin_locs = rn_bins[0:-1] max_DN = max_DN_val; # maximum DN value to be included in PTC nrow = len(cal_list[0][0]) ncol = len(cal_list[0][0][0]) # prepare "good mean frame" good_mean_frame = np.zeros((nrow, ncol)) nFrames2 = len(mean_frame_list) for mean_frame_count in range(nFrames2): good_mean_frame += mean_frame_list[mean_frame_count] good_mean_frame = good_mean_frame / nFrames2 # Slice the good_mean_frame array frame_slice = good_mean_frame[rowroi, colroi] # If requested, create a figure and plot the sliced frame if make_plot: fname = 'kgain_mean_frame' plt.figure() # 'viridis' is a common colormap plt.imshow(frame_slice, aspect='equal', cmap='viridis') plt.colorbar() plt.title('Good quality mean frame') plt.savefig(f'{plot_outdir}/{fname}') if verbose: print(f'Figure {fname} stored in {plot_outdir}') if show_plot: plt.show() plt.close() nFrames = len(cal_list[0]) # number of frames in an exposure set nSets = len(cal_list) # number of exposure sets # Start with specific offset indices for list comprehension in jj loop index_offsets = [2, 3, 1, 4, 5] # Start with specific index pairs for list comprehension in jj loop index_pairs = [(0, 1), (0, 2), (1, 2), (2, 3), (3, 4)] # if nFrames>5, then append additional terms to index_pairs if nFrames > 5: # append offsets index_offsets += [i for i in range(4, nFrames - 1)] # append index pairs (i, i + 1) for i from 4 to len(frames) - 2 index_pairs += [(i, i + 1) for i in range(4, nFrames - 1)] for jj in range(nSets): if verbose: print(jj) # multi-frame analysis method frames = [cal_list[jj][nFrames - offset] for offset in index_offsets] # Calculate frame differences frames_diff = [frames[j] - frames[k] for j, k in index_pairs] # calculate read noise with std from prescan rn_std = [diff2std(frames_diff[x], detector_regions) for x in range(len(frames_diff))] counts_diff = [histc_roi(frames_diff[x], rn_bins, detector_regions) for x in range(len(frames_diff))] # calculate read noise from prescan with Gaussian fit rn_gauss = [Single_peakfit(bin_locs,counts_diff[x])/np.sqrt(2) for x in range(len(frames_diff))] # split each frame up into bins, take std and mean of each region with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning) mean_frames_mean_curr0 = np.nanmean(frames, axis=0) mean_frames_mean_curr = np.nanmean(mean_frames_mean_curr0[rowroi,colroi]) # Calculate the means mean_good_mean_frame_roi = np.nanmean(good_mean_frame[rowroi,colroi]) # Compute the scaling factor scaling_factor = mean_frames_mean_curr / mean_good_mean_frame_roi # Apply the scaling to the selected ROI in good_mean_frame good_mean_frame_scaled = scaling_factor * good_mean_frame[rowroi,colroi] # calculate means and stdevs using good_mean_frame_scaled logical indexing local_mean_arrays = [ptc_bin2(frame[rowroi, colroi], good_mean_frame_scaled, binwidth, max_DN)[0] for frame in frames] local_noise_arrays = [ptc_bin2(frame[rowroi, colroi], good_mean_frame_scaled, binwidth, max_DN)[1] for frame in frames_diff] std_diffs = local_noise_arrays / np.sqrt(2) averages0 = [array.reshape(-1, 1) for array in local_mean_arrays] averages.extend(averages0) deviations0 = [array.reshape(-1, 1) for array in std_diffs] deviations.extend(deviations0) with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=DeprecationWarning) added_deviations_shot_arr = [ np.sqrt(np.square(np.reshape(std_diffs[x], shape=(-1, 1))) - complex(rn_std[x])**2) for x in range(len(rn_std)) ] deviations_shot.extend(added_deviations_shot_arr) read_noise.extend(rn_std) averages = np.concatenate([arr.flatten() for arr in averages]) deviations = np.concatenate([arr.flatten() for arr in deviations]) deviations_shot = np.concatenate([arr.flatten() for arr in deviations_shot]) averages_deviations_vector = np.column_stack((averages, np.abs(deviations_shot))) # Generate linearly spaced bins signal_bins = np.linspace(np.nanmin(averages), np.nanmax(averages), signal_bins_N) signal_bins = np.insert(signal_bins, 0, 0) # Insert 0 at the beginning # Initialize containers for the results binned_averages_compiled = [] binned_averages_error = [] binned_shot_deviations_compiled = [] binned_deviations_error = [] binned_total_deviations = [] # Loop through each bin, excluding the first which is just 0 for b in range(1, len(signal_bins)): average_bool = (averages_deviations_vector[:, 0] > signal_bins[b-1]) & (averages_deviations_vector[:, 0] < signal_bins[b]) # Filter data within the current bin current_binned_averages = averages_deviations_vector[:, 0][average_bool] current_binned_deviations = averages_deviations_vector[:, 1][average_bool] current_binned_total_deviations = deviations[average_bool] # Compute statistics if there are data points within the bin if current_binned_averages.size > 0: binned_averages_compiled.append(np.mean(current_binned_averages)) with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning) binned_averages_error.append(np.std(current_binned_averages, ddof=1) / np.sqrt(current_binned_averages.size)) binned_shot_deviations_compiled.append(np.mean(current_binned_deviations)) with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning) binned_deviations_error.append(np.std(current_binned_deviations, ddof=1) / np.sqrt(current_binned_averages.size)) binned_total_deviations.append(np.mean(current_binned_total_deviations)) else: # Append NaN or some other placeholder if no data points in the bin binned_averages_compiled.append(np.nan) binned_averages_error.append(np.nan) binned_shot_deviations_compiled.append(np.nan) binned_deviations_error.append(np.nan) binned_total_deviations.append(np.nan) # Convert lists to numpy arrays compiled_binned_averages = np.array(binned_averages_compiled) compiled_binned_shot_deviations = np.array(binned_shot_deviations_compiled) compiled_binned_total_deviations = np.array(binned_total_deviations) compiled_binned_deviations_error = np.array(binned_deviations_error) # define bounds for linearity fit # lower bound indices should include as many data points as possible # upper bound indices should avoid nonlin region lower_bound = np.nanmin(np.where(compiled_binned_averages > min_val)[0]) upper_bound = np.nanmax(np.where(compiled_binned_averages < max_val)[0]) # Logarithmic transformation of specific array segments logged_averages = np.log10(compiled_binned_averages[lower_bound:upper_bound]) logged_deviations = np.log10(compiled_binned_shot_deviations[lower_bound:upper_bound]) # Find indices of NaN's nan_dev = np.isnan(logged_deviations) nan_avg = np.isnan(logged_averages) # Filter out NaN's from the data logged_deviations = logged_deviations[~nan_dev & ~nan_avg] logged_averages = logged_averages[~nan_dev & ~nan_avg] # Calculate the error factor logged_deviations_error_factor = compiled_binned_deviations_error / compiled_binned_shot_deviations logged_deviations_error_factor = logged_deviations_error_factor[lower_bound:upper_bound] logged_deviations_error_factor = logged_deviations_error_factor[~nan_dev & ~nan_avg] # Calculate the logged deviations error logged_deviations_error = logged_deviations_error_factor * logged_deviations # Create a mask for non-NaN values non_nan_mask = ~np.isnan(logged_deviations_error) # Use the mask to filter out NaN values y_err = logged_deviations_error[non_nan_mask] x_vals = logged_averages[non_nan_mask] y_vals = logged_deviations[non_nan_mask] # make array of kgain values from variance and mean values # equation: k_gain = mean / signal variance kgain_arr = 10**(x_vals)/(10**y_vals)**2 if make_plot: fname = 'kgain_histogram' plt.figure() # 'auto' lets matplotlib decide the number of bins plt.hist(kgain_arr, bins='auto', log=True) plt.title('Histogram of kgain values') plt.savefig(f'{plot_outdir}/{fname}') if verbose: print(f'Figure {fname} stored in {plot_outdir}') if show_plot: plt.show() plt.close() if make_plot: fname = 'kgain_vs_mean' plt.figure() plt.plot(10**x_vals, kgain_arr, marker='o', linestyle='-', color='b', label='kgain') # Set y-axis range plt.ylim(6, 11) # Add a horizontal line at y = 10 plt.axhline(y=8.7, color='r', linestyle='--', label='y = 8.7') # Add a grid plt.grid(True) plt.title('kgain versus mean') plt.savefig(f'{plot_outdir}/{fname}') if verbose: print(f'Figure {fname} stored in {plot_outdir}') if show_plot: plt.show() plt.close() kgain_clipped, _ = sigma_clip(kgain_arr) mode_kgain,_ = calculate_mode(kgain_clipped) # adopt 'mode_kgain' as the final value to return kgain = mode_kgain if make_plot: fname = 'kgain_clipped_histogram' plt.figure() # 'auto' lets matplotlib decide the number of bins plt.hist(kgain_clipped, bins='auto', log=True) plt.title('Histogram of clipped kgain values') plt.savefig(f'{plot_outdir}/{fname}') if verbose: print(f'Figure {fname} stored in {plot_outdir}') if show_plot: plt.show() plt.close() # parameter for plot parm1 = -0.5*np.log10(kgain) # Gaussian read noise value in DN mean_rn_gauss_DN = np.nanmean(rn_gauss) mean_rn_gauss_e = mean_rn_gauss_DN * kgain mean_rn_std_DN = np.nanmean(read_noise) mean_rn_std_e = mean_rn_std_DN * kgain # If requested, plotting if make_plot: fname = 'kgain_ptc' # Create log-spaced averages for plotting full_range_averages = np.logspace(logspace_start, logspace_stop, logspace_num) plt.figure(num=3, figsize=(20, 10)) # Main data plots plt.plot(compiled_binned_averages, compiled_binned_total_deviations, label='Shot Noise + Read Noise') # Assuming `lower_bound` is 6 plt.plot(compiled_binned_averages, compiled_binned_shot_deviations, label='Shot Noise') plt.errorbar(10**logged_averages, 10**logged_deviations, yerr=10**(logged_deviations_error), color='gray', label='Fitted Points') # Extra lines plt.plot(10**full_range_averages, 10**(full_range_averages * 0.50 + parm1), 'k-', label='Forced 0.50 Gradient') # Reference lines and text annotations plt.axhline(y=mean_rn_std_DN, color='k', linestyle='--') plt.axvline(x=95000, color='k', linestyle='--') plt.text(2, 1.5, f'Fitted k gain = {np.round(kgain,1)} e/DN', fontsize=15) plt.text(2, 10, f'Read noise (Gaussian fit) = {np.round(mean_rn_gauss_DN,1)} DN = {mean_rn_gauss_e} e', fontsize=15) plt.text(2, 8, f'Read noise (Std) = {np.round(mean_rn_std_DN,1)} DN = {np.round(mean_rn_std_e,1)} e', fontsize=15) # Scaling and labeling plt.xscale('log') plt.yscale('log') plt.xlabel('Signal (DN)') plt.ylabel('Noise (DN)') plt.title('PTC curve fit') plt.grid(True) # Setting axis limits plt.xlim([1, 20000]) plt.ylim([1, 1000]) # Legend and aesthetics plt.legend(loc='upper left') plt.gca().tick_params(axis='both', which='major', labelsize=18) # Adjust figure position plt.gcf().set_dpi(100) plt.gcf().set_size_inches(20, 10) plt.savefig(f'{plot_outdir}/{fname}') if verbose: print(f'Figure {fname} stored in {plot_outdir}') if show_plot: plt.show() plt.close() # prepare PTC array ptc_list = [compiled_binned_averages, compiled_binned_total_deviations] ptc = np.column_stack(ptc_list) invalid_krn_keywords = data.typical_cal_invalid_keywords + ['EXPTIME', 'EMGAIN_C', 'EMGAIN_A', 'HVCBIAS'] # Remove specific keywords for key in ['PROGNUM', 'EXECNUM', 'CAMPAIGN', 'SEGMENT', 'OBSNUM']: if key in invalid_krn_keywords: invalid_krn_keywords.remove(key) prihdr, exthdr, errhdr, dqhdr = check.merge_headers(dataset_kgain, invalid_keywords=invalid_krn_keywords) # Read noise and error exthdr['RN'] = mean_rn_gauss_e # rn err depends on spread of data that determines rn and the error in kgain, # so use error propagation to find error in (rn in DN)*kgain kgain_err = np.nanstd(kgain_clipped) rn_err_DN = np.nanstd(rn_gauss) rn_err_e = np.sqrt((kgain*rn_err_DN)**2 + (mean_rn_gauss_DN*kgain_err)**2) exthdr['RN_ERR'] = rn_err_e exthdr['RN_UNIT'] = 'detected electron' # Update history exthdr['HISTORY'] = f"Kgain and read noise derived from a set of frames on {exthdr['DATETIME']}" k_gain = data.KGain(kgain, err = kgain_err, ptc = ptc, pri_hdr = prihdr, ext_hdr = exthdr, input_dataset=dataset_kgain) return k_gain
[docs] def kgain_dataset_2_list(dataset, apply_dq = True): """ Casts the CORGIDRP Dataset object for K-gain calibration into a list of numpy arrays sharing the same exposure time. It also returns the list of unique EM values and set of exposure times used with each EM. Note: EM gain is the commanded values: EMGAIN_C. This function also performs a set of tests about the data type and values in dataset. Args: dataset (corgidrp.Dataset): Dataset with a set of of EXCAM illuminated pupil L1 SCI frames (counts in DN) apply_dq (bool): consider the dq mask (from cosmic ray detection) or not Returns: list with stack of stacks of data array associated with each frame array of exposure times associated with each frame array of datetimes associated with each frame """ # Split Dataset dataset_cp = dataset.copy() split = dataset_cp.split_dataset(exthdr_keywords=['EXPTIME']) # Data stack = [] # Mean frame data mean_frame_stack = [] # Exposure times exp_times = [] # Datetimes datetimes = [] # EM gains (There can only be an EM gain in the data used to calibrate K-gain) em_gains = [] # Size of each sub stack len_sstack = [] # Record measured gain of each substack of calibration frames gains = [] latest_datetime = 0 #initialize for idx_set, data_set in enumerate(split[0]): # Second layer (array of different exposure times) sub_stack = [] record_exp_time = True record_len = True record_gain = True for frame in data_set.frames: if apply_dq: bad = np.where(frame.dq > 0) frame.data[bad] = np.nan if record_exp_time: exp_time_mean_frame = frame.ext_hdr['EXPTIME'] record_exp_time = False if frame.ext_hdr['EXPTIME'] != exp_time_mean_frame: raise Exception('Frames in the same data set must have the same exposure time') if frame.pri_hdr['OBSNAME'] == 'MNFRAME': if frame.ext_hdr['EMGAIN_C'] != 1: raise Exception('The commanded gain used to build the mean frame must be unity') mean_frame_stack.append(frame.data) else: if record_len: len_sstack.append(len(data_set.frames)) record_len = False sub_stack.append(frame.data) exp_time = frame.ext_hdr['EXPTIME'] if isinstance(exp_time, float) is False: raise Exception('Exposure times must be float') if exp_time <=0: raise Exception('Exposure times must be positive') exp_times.append(exp_time) sctsrt = frame.ext_hdr['SCTSRT'] date_number = extract_datetime(sctsrt) if date_number > latest_datetime: latest_datetime = date_number repeat_ind = len(sub_stack) - 1 if isinstance(sctsrt, str) is False: raise Exception('SCTSRT must be a string') datetimes.append(sctsrt) em_gain = frame.ext_hdr['EMGAIN_C'] if em_gain < 1: raise Exception('Commanded EM gain must be >= 1') em_gains.append(em_gain) if record_gain: try: # if EM gain measured directly from frame gains.append(frame.ext_hdr['EMGAIN_M']) except: if frame.ext_hdr['EMGAIN_A'] > 0: # use applied EM gain if available gains.append(frame.ext_hdr['EMGAIN_A']) else: # use commanded gain otherwise gains.append(frame.ext_hdr['EMGAIN_C']) record_gain = False # Calibration data may have different subsets if len(sub_stack) != 0: stack.append(np.stack(sub_stack)) # Need to split substacks with the same exposure times stack_cp = [] # Get expected size of substacks len_sub = min(len_sstack) rep_len_sub = 2*len_sub # Length of substack must be at least 1 if len(len_sstack) == 0: raise Exception('Substacks must have at least one element') for sub in stack: if stack.index(sub) == repeat_ind: continue if len(sub) == len_sub: stack_cp.append(sub) else: # Add extra care confirming all collected substacks have same # frames if len(sub)/len_sub != len(sub)//len_sub: # truncate substack to the nearest multiple of len_sub sub = sub[:len(sub)//len_sub*len_sub] idx_0 = 0 for rep in range(len(sub)//len_sub): stack_cp.append(sub[idx_0:idx_0+len_sub]) idx_0 += len_sub if len(stack[repeat_ind]) == rep_len_sub: stack_cp.append(sub) else: rep_sub = stack[repeat_ind] # Add extra care confirming all collected substacks have same # frames if len(rep_sub)/rep_len_sub != len(rep_sub)//rep_len_sub: # truncate substack to the nearest multiple of len_sub rep_sub = rep_sub[:len(rep_sub)//rep_len_sub*rep_len_sub] idx_0 = 0 for rep in range(len(rep_sub)//rep_len_sub): stack_cp.append(rep_sub[idx_0:idx_0+rep_len_sub]) idx_0 += rep_len_sub stack = stack_cp # All elements of datetimes must be unique if len(datetimes) != len(set(datetimes)): raise Exception('SCTSRTs cannot be duplicated') # There can only be an EM gain in the data used to calibrate K-gain if len(set(em_gains)) != 1: raise Exception('There can only be one commanded EM gain when calibrating K-Gain') if np.any(np.array(gains) < 1): raise Exception('Actual EM gains must be greater than or equal to 1') # When measuring k_gain, there can only be one gain for all exposure times actual_gain = np.nanmean(gains) # not actually used in k gain calibration since frames already gain-divided return stack, mean_frame_stack, actual_gain