# calibrate nonlin
import io
import os
import warnings
import numpy as np
import pandas as pd
from pathlib import Path
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from statsmodels.nonparametric.smoothers_lowess import lowess
try:
from numpy.exceptions import RankWarning
except:
from numpy import RankWarning
from corgidrp import check
import corgidrp.data as data
# Dictionary with constant non-linearity calibration parameters
[docs]
nonlin_params_default = {
# ROI constants
'rowroi1': 305,
'rowroi2': 736,
'colroi1': 1385,
'colroi2': 1846,
# background ROIs
'rowback11': 20,
'rowback12': 301,
'rowback21': 740,
'rowback22': 1001,
'colback11': 1200,
'colback12': 2001,
'colback21': 1200,
'colback22': 2001,
# minimum exposure time, s
'min_exp_time': 0,
# histogram bin parameters; min_bin is in DN
'num_bins': 50,
'min_bin': 200,
# factor to mutiply bin_edge values when making mask
'min_mask_factor': 1.1,
}
[docs]
def check_nonlin_params(nonlin_params):
""" Checks integrity of kgain parameters in the dictionary nonlin_params.
Args:
nonlin_params (dict): Dictionary of parameters used for calibrating nonlinearity.
"""
if 'rowroi1' not in nonlin_params:
raise ValueError('Missing parameter: rowroi1.')
if 'rowroi2' not in nonlin_params:
raise ValueError('Missing parameter: rowroi2.')
if 'colroi1' not in nonlin_params:
raise ValueError('Missing parameter: colroi1.')
if 'colroi2' not in nonlin_params:
raise ValueError('Missing parameter: colroi2.')
if 'rowback11' not in nonlin_params:
raise ValueError('Missing parameter: rowback11.')
if 'rowback12' not in nonlin_params:
raise ValueError('Missing parameter: rowback12.')
if 'rowback21' not in nonlin_params:
raise ValueError('Missing parameter: rowback21.')
if 'rowback22' not in nonlin_params:
raise ValueError('Missing parameter: rowback22.')
if 'colback11' not in nonlin_params:
raise ValueError('Missing parameter: colback11.')
if 'colback12' not in nonlin_params:
raise ValueError('Missing parameter: colback12.')
if 'colback21' not in nonlin_params:
raise ValueError('Missing parameter: colback21.')
if 'colback22' not in nonlin_params:
raise ValueError('Missing parameter: colback22.')
if 'min_exp_time' not in nonlin_params:
raise ValueError('Missing parameter: min_exp_time.')
if 'num_bins' not in nonlin_params:
raise ValueError('Missing parameter: num_bins.')
if 'min_bin' not in nonlin_params:
raise ValueError('Missing parameter: min_bin.')
if 'min_mask_factor' not in nonlin_params:
raise ValueError('Missing parameter: min_mask_factor.')
if not isinstance(nonlin_params['rowroi1'], (float, int)):
raise TypeError('rowroi1 is not a number')
if not isinstance(nonlin_params['rowroi2'], (float, int)):
raise TypeError('rowroi2 is not a number')
if not isinstance(nonlin_params['colroi1'], (float, int)):
raise TypeError('colroi1 is not a number')
if not isinstance(nonlin_params['colroi2'], (float, int)):
raise TypeError('colroi2 is not a number')
if not isinstance(nonlin_params['rowback11'], (float, int)):
raise TypeError('rowback11 is not a number')
if not isinstance(nonlin_params['rowback12'], (float, int)):
raise TypeError('rowback12 is not a number')
if not isinstance(nonlin_params['rowback21'], (float, int)):
raise TypeError('rowback21 is not a number')
if not isinstance(nonlin_params['rowback22'], (float, int)):
raise TypeError('rowback22 is not a number')
if not isinstance(nonlin_params['colback11'], (float, int)):
raise TypeError('colback11 is not a number')
if not isinstance(nonlin_params['colback12'], (float, int)):
raise TypeError('colback12 is not a number')
if not isinstance(nonlin_params['colback21'], (float, int)):
raise TypeError('colback21 is not a number')
if not isinstance(nonlin_params['colback22'], (float, int)):
raise TypeError('colback22 is not a number')
if not isinstance(nonlin_params['min_exp_time'], (float, int)):
raise TypeError('min_exp_time is not a number')
if not isinstance(nonlin_params['num_bins'], (float, int)):
raise TypeError('num_bins is not a number')
if not isinstance(nonlin_params['min_bin'], (float, int)):
raise TypeError('min_bin is not a number')
if not isinstance(nonlin_params['min_mask_factor'], (float, int)):
raise TypeError('min_mask_factor is not a number')
[docs]
class CalNonlinException(Exception):
"""Exception class for calibrate_nonlin."""
[docs]
def calibrate_nonlin(dataset_nl,
n_cal=20, n_mean=30, norm_val = 2500, min_write = 800.0,
max_write = 10000.0,
lowess_frac = 0.1, rms_low_limit = 0.004, rms_upp_limit = 0.2,
pfit_upp_cutoff1 = -2, pfit_upp_cutoff2 = -3,
pfit_low_cutoff1 = 2, pfit_low_cutoff2 = 1,
make_plot=False, plot_outdir='figures', show_plot=False,
verbose=False, nonlin_params=None, apply_dq = True):
"""
Function that derives the non-linearity calibration table for a set of DN
and EM values.
Args:
dataset_nl (corgidrp.Dataset): The frames in the dataset are
bias-subtracted. The dataset contains frames belonging to two different
sets -- Mean frame, a large array of unity gain frames, and set with
non-unity gain frames.
Mean frame -- Unity gain frames with constant exposure time. These frames
are used to create a mean pupil image. The mean frame is used to select
pixels in each frame of the large array of unity gain frames (see next)
to calculate its mean signal. In general, it is expected that at least
30 frames or more will be taken for this set. In TVAC, 30 frames, each
with an exposure time of 5.0 sec were taken.
Large array of unity gain frames: Set of unity gain frames with subsets
of equal exposure times. Data for each subset should be taken sequentially:
Each subset must have at least 5 frames. All frames for a subset are taken
before moving to the next subset. Two of the subsets have the same (repeated)
exposure time. These two subsets are not contiguous: The first subset is
taken near the start of the data collection and the second one is taken
at the end of the data collection (see TVAC example below). The mean
signal of these two subsets is used to correct for illumination
brightness/sensor sensitivity drifts for all the frames in the whole set,
depending on when the frames were taken. There should be no other repeated
exposure time among the subsets. In TVAC, a total of 110 frames were taken
within this category. The 110 frames consisted of 22 subsets, each with
5 frames. All 5 frames had the same exposure time. The exposure times in
TVAC in seconds were, each repeated 5 times to collect 5 frames in each
subset -- 0.077, 0.770, 1.538, 2.308, 3.077, 3.846, 4.615, 5.385, 6.154,
6.923, 7.692, 8.462, 9.231, 10.000, 11.538, 10.769, 12.308, 13.077,
13.846, 14.615, 15.385, and 1.538 (again).
Set with non-unity gain frames: a set of subsets of frames. All frames
in each subset have a unique, non-unity EM gain. For instance, in TVAC,
11 subsets were considered with EM values (EMGAIN_C): 1.65, 5.24, 8.60,
16.70, 27.50, 45.26, 87.50, 144.10, 237.26, 458.70 and 584.40. These
correspond to a range of actual EM gains from about 2 to 7000. Each subset
collects the same number of frames, which is at least 20 frames. In TVAC,
each non-unity EM value had 22 frames. In each subset, there are two
repeated exposure times: one near the start of the data collection and
one at the very end. The exposure times of the frames in each EM subset
do not need to be the same. For EM=1.65, the values of the exposure times
in seconds were: 0.076, 0.758, 1.515, 2.273, 3.031, 3.789, 4.546, 5.304,
6.062, 6.820, 7.577, 8.335, 9.093, 9.851, 10.608, 11.366, 12.124, 12.881,
13.639, 14.397, 15.155, and 1.515 (repeated). And for EM=5.24, the 22
values of the exposure times in seconds were: 0.070, 0.704, 1.408, 2.112,
2.816, 3.520, 4.225, 4.929, 5.633, 6.337, 7.041, 7.745, 8.449, 9.153,
9.857, 10.561, 11.265, 11.969, 12.674, 13.378, 14.082, and 1.408 (repeated).
n_cal (int):
Minimum number of frames per sub-stack used to calibrate Non-Linearity. The default
value is 20.
n_mean (int):
Minimum number of frames used to generate the mean frame. The default value
is 30.
norm_val (int): (Optional) Value in DN to normalize the nonlinearity values to.
Must be greater than 0 and must be divisible by 20 without remainder.
(1500 to 3000 recommended).
min_write (float): (Optional) Minimum mean value in DN to output in
nonlin. (800.0 recommended)
max_write (float): (Optional) Maximum mean value in DN to output in
nonlin. (10000.0 recommended)
lowess_frac (float): (Optional) factor to use in lowess smoothing function,
larger is smoother
rms_low_limit (float): (Optional) rms relative error selection limits for
linear fit. Lower limit.
rms_upp_limit (float): (Optional) rms relative error selection limits for
linear fit. Upper limit. rms_upp_limit must be greater than rms_low_limit.
pfit_upp_cutoff1 (int): (Optional) polyfit upper cutoff. The following limits were
determined with simulated frames. If rms_low_limit < rms_y_rel_err < rms_upp_limit,
this is the upper value applied to select the data to be fitted.
pfit_upp_cutoff2 (int): (Optional) polyfit upper cutoff. The following limits were
determined with simulated frames. If rms_y_rel_err >= rms_upp_limit,
this is the upper value applied to select the data to be fitted.
pfit_low_cutoff1 (int): (Optional) polyfit upper cutoff. The following limits were
determined with simulated frames. If rms_low_limit < rms_y_rel_err < rms_upp_limit,
this is the lower value applied to select the data to be fitted.
pfit_low_cutoff2 (int): (Optional) polyfit upper cutoff. The following limits were
determined with simulated frames. If rms_y_rel_err >= rms_upp_limit,
this is the lower value applied to select the data to be fitted.
make_plot (bool): (Optional) generate and store plots. Default is True.
plot_outdir (str): (Optional) Output directory to store figues. Default is
'figures'. The default directory is not tracked by git.
show_plot (bool): (Optional) display the plots. Default is False.
verbose (bool): (Optional) display various diagnostic print messages.
Default is False.
nonlin_params (dict): (Optional) Dictionary of row and col specifications
for the region of interest (indicated by 'roi') where the frame is illuminated and for
two background regions (indicated by 'back1' and 'back2') where the frame is not illuminated.
Must contain 'rowroi1','rowroi2','colroi1','colroi2','rowback11','rowback12',
'rowback21','rowback22','colback11','colback12','colback21',and 'colback22'.
The 'roi' needs one square region specified, and 'back' needs two square regions,
where a '1' ending indicates the smaller of two values, and a '2' ending indicates the larger
of two values. The coordinates of each square are specified by matching
up as follows: (rowroi1, colroi1), (rowroi1, colroi2), (rowback11, colback11),
(rowback11, colback12), etc. Defaults to nonlin_params_default specified in this file.
apply_dq (bool): consider the dq mask (from cosmic ray detection) or not
Returns:
nonlin_arr (NonLinearityCalibration): 2-D array with nonlinearity values
for input signal level (DN) in rows and EM gain values in columns. The
input signal in DN is the first column. Signal values start with min_write
and run through max_write in steps of 20 DN.
"""
if nonlin_params is None:
nonlin_params = nonlin_params_default
check_nonlin_params(nonlin_params)
if dataset_nl[0].data is None:
ram_heavy = True
else:
ram_heavy = False
# cast dataset objects into np arrays and retrieve aux information
cal_list, mean_frame_list, exp_arr, datetime_arr, len_list, actual_gain_arr, datetimes_sort_inds, _ = \
nonlin_kgain_dataset_2_stack(dataset_nl, apply_dq = apply_dq, cal_type='nonlin')
if ram_heavy:
cal_arr = np.array(cal_list)[datetimes_sort_inds]
else:
cal_arr = np.vstack(cal_list)[datetimes_sort_inds]
mean_frame_arr = np.stack(mean_frame_list)
# Get relevant constants
rowroi1 = nonlin_params['rowroi1']
rowroi2 = nonlin_params['rowroi2']
colroi1 = nonlin_params['colroi1']
colroi2 = nonlin_params['colroi2']
rowback11 = nonlin_params['rowback11']
rowback12 = nonlin_params['rowback12']
rowback21 = nonlin_params['rowback21']
rowback22 = nonlin_params['rowback22']
colback11 = nonlin_params['colback11']
colback12 = nonlin_params['colback12']
colback21 = nonlin_params['colback21']
colback22 = nonlin_params['colback22']
min_exp_time = nonlin_params['min_exp_time']
num_bins = nonlin_params['num_bins']
min_bin = nonlin_params['min_bin']
min_mask_factor = nonlin_params['min_mask_factor']
if type(cal_arr) != np.ndarray:
raise TypeError('cal_arr must be an ndarray.')
if not ram_heavy: # if RAM-heavy, this is a set of filepaths, not a 3-D array
if np.ndim(cal_arr) != 3:
raise CalNonlinException('cal_arr must be 3-D')
if len(len_list) < 1:
raise CalNonlinException('Number of elements in len_list must '
'be greater than or equal to 1.')
if np.sum(len_list) != len(cal_arr):
raise CalNonlinException('Number of sub-stacks in cal_arr must '
'equal the sum of the elements in len_list')
# cal_arr must have at least 20 frames for each EM gain
if np.any(np.array(len_list) < n_cal):
raise Exception(f'cal_arr must have at least {n_cal} frames for each EM value')
if len(np.unique(datetime_arr)) != len(datetime_arr):
raise CalNonlinException('All elements of datetime_arr must be unique.')
for g_index in range(len(len_list)):
# Define the start and stop indices
start_index = int(np.sum(len_list[0:g_index]))
stop_index = start_index + len_list[g_index]
# Convert camera times to datetime objects
ctim_strings = datetime_arr[start_index:stop_index]
ctim_datetime = pd.to_datetime(ctim_strings, errors='coerce')
# Check if the array is time-ordered in increasing order
is_increasing = np.all(ctim_datetime[:-1] <= ctim_datetime[1:])
if not is_increasing:
raise CalNonlinException('Elements of datetime_arr must be '
'in increasing time order for each EM gain value.')
if type(mean_frame_arr) != np.ndarray:
raise TypeError('mean_frame_arr must be an ndarray.')
if not ram_heavy: # if RAM-heavy, this is a set of filepaths, not a 3-D array
if np.ndim(mean_frame_arr) != 3:
raise CalNonlinException('mean_frame_arr must be 3-D (i.e., a stack of '
'2-D sub-stacks')
# mean_frame_arr must have at least 30 frames
if len(mean_frame_arr) < n_mean:
raise CalNonlinException(f'Number of frames in mean_frame_arr must '
'be at least {n_mean}.')
check.real_array(exp_arr, 'exp_arr', TypeError)
check.oneD_array(exp_arr, 'exp_arr', TypeError)
if (exp_arr <= min_exp_time).any():
raise CalNonlinException('Each element of exp_arr must be '
' greater than min_exp_time.')
# check to see if there is at least one set of exposure times with length different from that of the others
index = 0
r_flag = True
for x in range(len(len_list)):
temp = np.copy(exp_arr[index:index+len_list[x]])
# Unique counts of exposure times
_, u_counts = np.unique(temp, return_counts=True)
# Check if all elements are the same
all_elements_same = np.all(u_counts == u_counts[0])
if all_elements_same == True:
r_flag = False
index = index + len_list[x]
# check to see that there is a repeated exposure time (e.g., at least one set in between 2 sets of the same exposure time)
index = 0
repeated_lens = [] # to be used later, to know how to split up the repeated sets
for x in range(len(len_list)):
temp = np.copy(exp_arr[index:index+len_list[x]])
# first condition below: frames are already time-ordered, so if there is non-monotonicity, there is repitition, which we want
# r_flag condition: merely a set with length longer than the others (which TVAC code has); this gives a "way out" if no repeated set after other sets
if np.all(np.diff(temp) >= 0) and not r_flag:
raise CalNonlinException('Each substack of cal_arr must have a '
'group of frames with a repeated exposure time.')
if np.all(np.diff(temp) < 0):
repeat_ind = np.where(np.diff(temp) < 0)[0][0]
ending = np.where(np.diff(temp)[repeat_ind+1:] != 0)[0]
if len(ending) == 0: # repeated set is last one in time
end_ind = None
else:
end_ind = ending[0]+1
repeated_lens.append(len(np.diff(temp)[repeat_ind:end_ind]))
index = index + len_list[x]
if len(len_list) != len(actual_gain_arr):
raise CalNonlinException('Length of actual_gain_arr be the same as the '
'length of len_list.')
if sum(1 for number in actual_gain_arr if number < 1) != 0:
raise CalNonlinException('Each element of actual_gain_arr must be greater '
'than or equal to 1.')
check.real_array(actual_gain_arr, 'actual_gain_arr', TypeError)
check.oneD_array(actual_gain_arr, 'actual_gain_arr', TypeError)
check.positive_scalar_integer(norm_val, 'norm_val', TypeError)
if np.mod(norm_val, 20) !=0:
raise CalNonlinException('norm_val must be divisible by 20.')
check.real_positive_scalar(min_write, 'min_write', TypeError)
check.real_positive_scalar(max_write, 'max_write', TypeError)
if min_write >= max_write:
raise CalNonlinException('max_write must be greater than min_write')
if (norm_val < min_write) or (norm_val > max_write):
raise CalNonlinException('norm_val must be between min_write and '
'max_write.')
check.real_nonnegative_scalar(rms_low_limit, 'rms_low_limit', TypeError)
check.real_nonnegative_scalar(rms_upp_limit, 'rms_upp_limit', TypeError)
if rms_low_limit >= rms_upp_limit:
raise CalNonlinException('rms_upp_limit must be greater than rms_low_limit')
if not isinstance(lowess_frac, (float, int)):
raise TypeError('lowess_frac is not a number')
if not isinstance(rms_low_limit, (float, int)):
raise TypeError('rms_low_limit is not a number')
if not isinstance(rms_upp_limit, (float, int)):
raise TypeError('rms_upp_limit is not a number')
if not isinstance(pfit_upp_cutoff1, (float, int)):
raise TypeError('pfit_upp_cutoff1 is not a number')
if not isinstance(pfit_upp_cutoff2, (float, int)):
raise TypeError('pfit_upp_cutoff2 is not a number')
if not isinstance(pfit_low_cutoff1, (float, int)):
raise TypeError('pfit_low_cutoff1 is not a number')
if not isinstance(pfit_low_cutoff2, (float, int)):
raise TypeError('pfit_low_cutoff2 is not a number')
if make_plot is True:
# Avoid issues with importing matplotlib on headless servers without GUI
# support without proper configuration
import matplotlib.pyplot as plt
# Output directory
if os.path.exists(plot_outdir) is False:
os.mkdir(plot_outdir)
if verbose:
print('Output directory for figures created in ', os.getcwd())
######################### start of main code #############################
# Define pixel ROIs
rowroi = list(range(rowroi1, rowroi2))
colroi = list(range(colroi1, colroi2))
# Background subtraction regions
rowback1 = list(range(rowback11, rowback12))
rowback2 = list(range(rowback21, rowback22))
colback1 = list(range(colback11, colback12))
colback2 = list(range(colback21, colback22))
####################### create good_mean_frame ###################
if ram_heavy:
temp_frame = data.Image(mean_frame_arr[0]) # just to get shape
frame_shape = temp_frame.data.shape
good_mean_frame = np.zeros(frame_shape)
for filepath in mean_frame_arr:
temp_frame = data.Image(filepath)
if apply_dq:
bad = np.where(temp_frame.dq > 0)
temp_frame.data[bad] = np.nan
good_mean_frame += temp_frame.data
good_mean_frame = good_mean_frame / len(mean_frame_arr)
else:
nrow = len(mean_frame_arr[0])
ncol = len(mean_frame_arr[0][0])
good_mean_frame = np.zeros((nrow, ncol))
nFrames = len(mean_frame_arr)
good_mean_frame = good_mean_frame / nFrames
mean_frame_index = 0
# Loop over the mean_frame_arr frames
for i in range(nFrames):
frame = mean_frame_arr[i]
# Add this frame to the cumulative good_mean_frame
good_mean_frame += frame
mean_frame_index += 1
# Calculate the average of the frames if required
if mean_frame_index > 0:
good_mean_frame /= mean_frame_index
# plot, if requested
if make_plot:
fname = 'non_lin_good_frame'
# Slice the good_mean_frame array
frame_slice = good_mean_frame[np.ix_(rowroi, colroi)]
# Create a figure and plot the sliced frame
plt.figure()
# 'viridis' is a common colormap
plt.imshow(frame_slice, aspect='equal', cmap='viridis')
plt.colorbar()
plt.title('Good quality mean frame')
plt.savefig(f'{plot_outdir}/{fname}')
if verbose:
print(f'Figure {fname} stored in {plot_outdir}')
if show_plot:
plt.show()
plt.close()
# Convert to numpy arrays if they are not already
rowroi = np.array(rowroi)
colroi = np.array(colroi)
if make_plot:
fname = 'non_lin_mean_frame_histogram'
# Plot a histogram of the values within the specified ROI
roi_values = good_mean_frame[rowroi[:, None], colroi]
plt.figure()
# 'auto' lets matplotlib decide the number of bins
plt.hist(roi_values.flatten(), bins='auto', log=True)
plt.gca().set_yscale('log')
plt.gca().set_xscale('log')
plt.title('Histogram of Mean Frame in ROI')
plt.savefig(f'{plot_outdir}/{fname}')
if verbose:
print(f'Figure {fname} stored in {plot_outdir}')
if show_plot:
plt.show()
plt.close()
# find minimum in histogram
# 1000-1500 DN recommended when the peak of histogram of
# "good_mean_frame" is between 2000 and 4000 DN)
roi_values = good_mean_frame[rowroi[:, None], colroi]
hst_counts, hist_edges = np.histogram(roi_values.flatten(),bins=num_bins)
# range above some value
above_range = (hist_edges[:-1] >= min_bin)
# Filter the counts and bin_edges arrays
filtered_counts_above = hst_counts[above_range]
filtered_bin_edges_above = hist_edges[:-1][above_range]
# Find the index of the maximum count within the filtered range
max_count_index_above_range = np.argmax(filtered_counts_above)
# Get the corresponding bin edge
max_edge_value = filtered_bin_edges_above[max_count_index_above_range]
# Find the indices of the bins that fall within the specified range
within_range = (hist_edges[:-1] >= min_bin) & (hist_edges[:-1] <= max_edge_value)
# Filter the counts and bin_edges arrays
filtered_counts = hst_counts[within_range]
filtered_bin_edges = hist_edges[:-1][within_range]
# Find the index of the minimum count within the filtered range
min_count_index_within_range = np.argmin(filtered_counts)
# Get the corresponding bin edge value and increase by min_mask_factor
min_mask = min_mask_factor*filtered_bin_edges[min_count_index_within_range]
# Create the mask
mask = np.where(good_mean_frame < min_mask, 0, 1)
# plot, if requested
if make_plot:
fname = 'non_lin_mask'
# Plot the mask
plt.figure()
plt.imshow(mask, cmap='gray')
plt.title('Mask')
plt.colorbar()
plt.savefig(f'{plot_outdir}/{fname}')
if verbose:
print(f'Figure {fname} stored in {plot_outdir}')
if show_plot:
plt.show()
plt.close()
fname = 'non_lin_mean_frame'
# Plot the mean frame
plt.figure()
# 'viridis' is a good default color map
plt.imshow(good_mean_frame, cmap='viridis')
plt.title('Mean Frame')
plt.colorbar()
plt.close()
# initialize arrays for nonlin results table
nonlin = []
######################## loop over em gain values #########################
for gain_index in range(len(len_list)):
start_index = int(np.sum(len_list[0:gain_index]))
stop_index = start_index + len_list[gain_index]
# Convert camera times to datetime objects
ctime_strings = datetime_arr[start_index:stop_index]
ctime_datetime = pd.to_datetime(ctime_strings, errors='coerce')
# Select exp times for this em gain
exp_em = exp_arr[start_index:stop_index]
# select frames for this em gain
full_flst = cal_arr[start_index:stop_index]
# Unique exposure times and their counts
exposure_strings_list, counts = np.unique(exp_em, return_counts=True)
# Grouping exposures and finding the max count
max_count_index = np.argmax(counts)
repeat_exp = exposure_strings_list[max_count_index] # Exposure time of repeated frames
# Calculate mean time differences as aid in illumination drift corrections
group_mean_time = []
first_flag = False
for t0 in exposure_strings_list:
idx = np.where(exp_em == t0)[0]
if t0 != repeat_exp:
del_s = (ctime_datetime[idx] - ctime_datetime[0]).total_seconds()
group_mean_time.append(np.mean(del_s))
elif t0 == repeat_exp and not first_flag:
# NOTE works fine for same number of frames per exposure time for a given EM gain (which is the observation plan, and this
# is what the TVAC code has), but for more general case, use repeated_lens[gain_index] instead for the number of frames in the 2nd (repeated) set
idx_2 = len(idx) // 2
start_time_repeated = ctime_datetime[idx[0]]
end_time_repeated = ctime_datetime[idx[idx_2]]
del_s2 = (ctime_datetime[idx[:idx_2]] - ctime_datetime[0]).total_seconds()
group_mean_time.append(np.mean(del_s2))
first_flag = True
if verbose is True:
print(group_mean_time)
print('Time between repeated exposure frames for EM gain = ', actual_gain_arr[gain_index],': ',
(end_time_repeated - start_time_repeated).total_seconds(), 'seconds')
# Additional setup
mean_signal = []
repeat_flag = 0
filtered_exposure_times = []
for jj in range(len(exposure_strings_list)):
current_exposure_time = exposure_strings_list[jj]
if current_exposure_time >= min_exp_time:
if current_exposure_time == repeat_exp:
repeat_flag = 1
# Filtering frames based on the current exposure time
selected_files = [
full_flst[idx] for idx, exp_time in enumerate(exp_em) if exp_time == current_exposure_time
]
filtered_exposure_times.append(current_exposure_time)
# Initialize for processing of files
mean_frame_index = 0
frame_count = []
frame_mean = []
if not repeat_flag:
for iframe in range(len(selected_files)):
if ram_heavy:
image_1 = data.Image(selected_files[iframe])
frame_1 = image_1.data
if apply_dq: # in ram_heavy case, this hasn't been applied yet
bad = np.where(image_1.dq > 0)
frame_1[bad] = np.nan
else:
frame_1 = selected_files[iframe]
frame_1 = frame_1.astype(np.float64)
# Subtract background
frame_1_back1 = np.nanmean(frame_1[rowback1[0]:rowback1[-1]+1,
colback1[0]:colback1[-1]+1])
frame_1_back2 = np.nanmean(frame_1[rowback2[0]:rowback2[-1]+1,
colback2[0]:colback2[-1]+1])
frame_back = (frame_1_back1 + frame_1_back2) / 2
# Calculate counts and mean in the ROI after background subtraction
roi_frame = frame_1[rowroi[0]:rowroi[-1]+1, colroi[0]:colroi[-1]+1] - frame_back
frame_count0 = np.sum(roi_frame)
frame_mean0 = frame_1 - frame_back
# Apply mask and calculate the positive mean
frame_mean0 *= mask
positive_means = frame_mean0[frame_mean0 > 0]
frame_mean1 = np.nanmean(positive_means) if positive_means.size > 0 else np.nan
frame_count.append(frame_count0)
frame_mean.append(frame_mean1)
mean_frame_index += 1
mean_signal.append(np.nanmean(frame_mean))
elif repeat_flag:
# for repeated exposure frames, split into the first half/set
# and the second half/set
# NOTE works fine for same number of frames per exposure time for a given EM gain (which is the observation plan, and this
# is what the TVAC code has), but for more general case, use repeated_lens[gain_index] instead for the number of frames in the 2nd (repeated) set
first_half = len(selected_files) // 2
for i in range(first_half):
if ram_heavy:
image_1 = data.Image(selected_files[i])
frame_1 = image_1.data
if apply_dq: # in ram_heavy case, this hasn't been applied yet
bad = np.where(image_1.dq > 0)
frame_1[bad] = np.nan
else:
frame_1 = selected_files[i]
frame_1 = frame_1.astype(np.float64)
# Subtract background
frame_1_back1 = np.nanmean(frame_1[rowback1[0]:rowback1[-1]+1,
colback1[0]:colback1[-1]+1])
frame_1_back2 = np.nanmean(frame_1[rowback2[0]:rowback2[-1]+1,
colback2[0]:colback2[-1]+1])
frame_back = (frame_1_back1 + frame_1_back2) / 2
# Calculate counts and mean in the ROI after background subtraction
roi_frame = frame_1[rowroi[0]:rowroi[-1]+1,
colroi[0]:colroi[-1]+1] - frame_back
frame_count0 = np.sum(roi_frame)
frame_mean0 = frame_1 - frame_back
# Apply mask and calculate the positive mean
frame_mean0 *= mask
positive_means = frame_mean0[frame_mean0 > 0]
frame_mean1 = np.nanmean(positive_means) if positive_means.size > 0 else np.nan
frame_count.append(frame_count0)
frame_mean.append(frame_mean1)
mean_frame_index += 1
mean_signal.append(np.nanmean(frame_mean))
repeat1_mean_signal = np.nanmean(frame_mean)
second_half = len(selected_files)
for i in range(first_half + 1, second_half):
if ram_heavy:
image_1 = data.Image(selected_files[i])
frame_1 = image_1.data
if apply_dq: # in ram_heavy case, this hasn't been applied yet
bad = np.where(image_1.dq > 0)
frame_1[bad] = np.nan
else:
frame_1 = selected_files[i]
frame_1 = frame_1.astype(np.float64)
# Subtract background
frame_1_back1 = np.nanmean(frame_1[rowback1[0]:rowback1[-1]+1, colback1[0]:colback1[-1]+1])
frame_1_back2 = np.nanmean(frame_1[rowback2[0]:rowback2[-1]+1, colback2[0]:colback2[-1]+1])
frame_back = (frame_1_back1 + frame_1_back2) / 2
# Calculate counts and mean
roi_frame = frame_1[rowroi[0]:rowroi[-1]+1, colroi[0]:colroi[-1]+1] - frame_back
frame_count0 = np.sum(roi_frame)
frame_mean0 = frame_1 - frame_back
frame_mean0 *= mask
positive_means = frame_mean0[frame_mean0 > 0]
frame_mean1 = np.nanmean(positive_means) if positive_means.size > 0 else np.nan
frame_count.append(frame_count0)
frame_mean.append(frame_mean1)
mean_frame_index += 1
# Calculate the mean signal from the second half of the processing
repeat2_mean_signal = np.nanmean(frame_mean)
repeat_flag = 0 # Reset flag
# Calculate the time deltas in seconds from the first frame
delta_ctimes_s = (ctime_datetime - ctime_datetime[0]).total_seconds()
# Make sure delta_ctimes_s is a pandas Series with numeric values
delta_ctimes_s = pd.Series(delta_ctimes_s, index=ctime_datetime)
# Calculate the difference in signals
delta_signal = repeat2_mean_signal - repeat1_mean_signal
# Assuming all_exposure_strings and repeat_exp are already defined
# Find indices of the frames where the exposure time matches repeat_exp
repeat_times_idx = np.where(exp_em == repeat_exp)[0] # np.where returns a tuple, extract first element
# Calculate the mean times for the first and second halves of these indices
# NOTE works fine for same number of frames per exposure time for a given EM gain (which is the observation plan, and this
# is what the TVAC code has), but for more general case, use repeated_lens[gain_index] instead for the number of frames in the 2nd (repeated) set
first_half = len(repeat_times_idx) // 2
first_half_mean_time = delta_ctimes_s.iloc[repeat_times_idx[:first_half]].mean()
second_half = len(repeat_times_idx)
second_half_mean_time = delta_ctimes_s.iloc[repeat_times_idx[first_half:second_half]].mean()
if verbose is True:
print("First half mean time:", first_half_mean_time)
print("Second half mean time:", second_half_mean_time)
# Calculate DN/s
illum_slope = delta_signal / (second_half_mean_time - first_half_mean_time)
# Calculate DN
illum_inter = repeat1_mean_signal - illum_slope * first_half_mean_time
# Adjust observations based on calculated slope and intercept
illum_obs = (group_mean_time - group_mean_time[0]) * illum_slope + illum_inter
# Correct the illumination observations
illum_corr = illum_obs / illum_obs[0]
# Correct the mean signal
#illum_cor = np.ones(len(illum_corr))
corr_mean_signal = mean_signal / illum_corr
# Sort arrays by exposure time
filt_exp_times_sorted, I = np.sort(filtered_exposure_times), np.argsort(filtered_exposure_times)
corr_mean_signal_sorted = np.array(corr_mean_signal)[I]
if make_plot:
fname = 'non_lin_signal_vs_exp'
# Plotting the corrected mean signal against sorted exposure times
plt.figure()
plt.plot(filt_exp_times_sorted, corr_mean_signal_sorted, 'o', label='Data Points')
plt.title('Signal versus exposure time')
plt.xlabel('Exposure time (s)')
plt.ylabel('Signal (DN)')
# Fit a polynomial to selected points (excluding some points)
p0 = np.polyfit(filt_exp_times_sorted, corr_mean_signal_sorted, 1)
y0 = np.polyval(p0, filt_exp_times_sorted)
y_rel_err = np.abs((corr_mean_signal_sorted - y0)/corr_mean_signal_sorted)
rms_y_rel_err = np.sqrt(np.mean(y_rel_err**2))
# NOTE: the following limits were determined with simulated frames
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=RankWarning)
if rms_y_rel_err < rms_low_limit:
p1 = np.polyfit(filt_exp_times_sorted, corr_mean_signal_sorted, 1)
elif (rms_y_rel_err >= rms_low_limit) and (rms_y_rel_err < rms_upp_limit):
p1 = np.polyfit(filt_exp_times_sorted[pfit_low_cutoff1:pfit_upp_cutoff1],
corr_mean_signal_sorted[pfit_low_cutoff1:pfit_upp_cutoff1], 1)
else:
p1 = np.polyfit(filt_exp_times_sorted[pfit_low_cutoff2:pfit_upp_cutoff2],
corr_mean_signal_sorted[pfit_low_cutoff2:pfit_upp_cutoff2], 1)
y1 = np.polyval(p1, filt_exp_times_sorted)
if make_plot:
fname = 'non_lin_fit'
# Plot the fitted line
plt.plot(filt_exp_times_sorted, y1, label='Fitted Line')
# Show the plot with legend
plt.legend()
plt.savefig(f'{plot_outdir}/{fname}')
if verbose:
print(f'Figure {fname} stored in {plot_outdir}')
if show_plot:
plt.show()
plt.close()
# Calculating relative gain
rel_gain = corr_mean_signal_sorted / y1
# Smoothing the relative gain data; larger 'lowess_frac' gives smoother curve
rel_gain_smoothed = lowess(rel_gain,
corr_mean_signal_sorted, frac=lowess_frac)[:, 1]
# find the min/max values of corrected measured means and append array
temp_min = np.nanmin(corr_mean_signal_sorted)
temp_max = np.nanmax(corr_mean_signal_sorted)
if make_plot:
# Plotting Signal vs. Relative Gain
plt.figure()
plt.plot(corr_mean_signal_sorted, rel_gain, 'o', label='Original Data')
plt.ylim([0.95, 1.05])
plt.xlim([1, 14000])
plt.axhline(1.0, linestyle='--', color='k', linewidth=1) # horizontal line at 1.0
plt.title('Signal/fit versus Signal')
plt.xlabel('Signal (DN)')
plt.ylabel('Relative gain')
# Plot the smoothed data
plt.plot(corr_mean_signal_sorted, rel_gain_smoothed, 'r-', label='Smoothed Data')
# Show legend and plot
plt.legend()
plt.savefig(f'{plot_outdir}/{fname}')
if verbose:
print(f'Figure {fname} stored in {plot_outdir}')
if show_plot:
plt.show()
plt.close()
# Generate evenly spaced values between 20 and 14000
mean_linspace = np.linspace(20, 14000, 1+int((14000-20)/20))
# Interpolate/extrapolate the relative gain values
interp_func = interp1d(corr_mean_signal_sorted,
rel_gain_smoothed, kind='linear', fill_value='extrapolate')
rel_gain_interp = interp_func(mean_linspace)
# Normalize the relative gain to the value at norm_val DN
# First, find the index for norm_val DN in mean_linspace
idxnorm = np.where(mean_linspace == norm_val)[0][0]
normconst = rel_gain_interp[idxnorm]
rel_gain_interp /= normconst
if (norm_val < temp_min) or (norm_val > temp_max):
print('norm_val is not between the minimum and maximum values '
'of the means for the current EM gain. Extrapolation '
'will be used for norm_val.')
if make_plot:
fname = 'non_lin_fit_norm_dn'
# Plotting Signal vs. Relative Gain normalized at norm_val DN
plt.figure()
plt.plot(corr_mean_signal_sorted, rel_gain / normconst, 'o', label='Original Data')
plt.ylim([0.95, 1.05])
plt.xlim([1, 14000])
plt.axhline(1.0, linestyle='--', color='k', linewidth=1) # horizontal line at 1.0
plt.title(f'Signal/fit versus Signal (norm @ {norm_val} DN)')
plt.xlabel('Signal (DN)')
plt.ylabel('Relative gain')
# Plot the interpolated data
plt.plot(mean_linspace, rel_gain_interp, 'r-', label='Interpolated Data')
plt.legend()
plt.savefig(f'{plot_outdir}/{fname}')
if verbose:
print(f'Figure {fname} stored in {plot_outdir}')
if show_plot:
plt.show()
plt.close()
# NOTE: nonlinearity is equal to 1/rel_gain
# multiply raw data by 1/rel_gain to correct for nonlinearity
temp = 1/rel_gain_interp
nonlin.append(temp)
# prepare nonlin array
nonlin_arr0 = np.transpose(np.array(nonlin))
# insert new column at the start of nonlin_arr
nonlin_arr1 = np.insert(nonlin_arr0, 0, mean_linspace, axis=1)
# select rows that satisfy min/max limits
nonlin_arr2 = nonlin_arr1[nonlin_arr1[:, 0] >= min_write]
nonlin_arr3 = nonlin_arr2[nonlin_arr2[:, 0] <= max_write]
# See data.NonLinearityCalibration doc string for more details:
# [0, 1:]: Gain axis values
# [1:, 0]: "count" axis value
actual_gain_arr = np.insert(actual_gain_arr, 0, np.nan)
n_col = len(nonlin_arr3) + 1
n_row = len(actual_gain_arr)
nonlin_data=np.insert(nonlin_arr3, 0, actual_gain_arr).reshape(n_col,n_row)
# Return NonLinearity instance
invalid_nln_keywords = data.typical_cal_invalid_keywords + ['EXPTIME', 'EMGAIN_C', 'EMGAIN_A', 'KGAINPAR', 'HVCBIAS', 'KGAIN_ER']
# Remove specific keywords
for key in ['PROGNUM', 'EXECNUM', 'CAMPAIGN', 'SEGMENT', 'OBSNUM']:
if key in invalid_nln_keywords:
invalid_nln_keywords.remove(key)
prhdr, exthdr, errhdr, dqhdr = check.merge_headers(dataset_nl, invalid_keywords=invalid_nln_keywords)
exthdr['HISTORY'] = f"Non-linearity calibration derived from a set of frames on {exthdr['DATETIME']}"
# Just for the purpose of getting the instance created
nonlin = data.NonLinearityCalibration(nonlin_data,
prhdr, exthdr, dataset_nl)
return nonlin
[docs]
def nonlin_kgain_dataset_2_stack(dataset, apply_dq = True, cal_type='nonlin', dataset_copy=True):
"""
Casts the CORGIDRP Dataset object for non-linearity calibration into a stack
of numpy arrays sharing the same commanded gain value. It also returns the list of
unique EM values and set of exposure times used with each EM. Note: it also
performs a set of tests about the integrity of the data type and values in
the dataset.
Args:
dataset (corgidrp.Dataset): Dataset with a set of of EXCAM illuminated
pupil L1 SCI frames (counts in DN)
apply_dq (bool): consider the dq mask (from cosmic ray detection) or not
cal_type (str): If 'kgain', then sets of frames with the same exposure time for a given EM gain
are truncated so that each has the same number of frames. Otherwise (for the 'nonlin' case), there is no truncation.
dataset_copy (bool): flag indicating whether the input dataset will be preserved after this function is executed or not.
If False, the output dataset will be the input dataset modified, and
the input and output datasets will be identical. This is useful when
handling a large dataset and when the input dataset is not needed afterwards. Defaults to True.
Returns:
list of data arrays associated with each frame (just the filepaths if data is None for input dataset)
list of mean frames (just the filepaths if data is None for input dataset)
array of exposure times associated with each frame
array of datetimes associated with each frame
list with the number of frames with same EM gain
List of actual EM gains
array of indices for timestamp ordering
number of frames each set of frames with same exposure time truncated to for EM gain = 1 frames
"""
# Split Dataset
if dataset[0].data is None: #RAM-heavy mode
ram_heavy = True
dataset_cp = dataset
else:
if dataset_copy:
dataset_cp = dataset.copy()
else:
dataset_cp = dataset
ram_heavy = False
split = dataset_cp.split_dataset(exthdr_keywords=['EMGAIN_C'])
# Calibration data
stack = []
# Mean frame data
mean_frame_list = []
record_exp_time = True
# Exposure times
exp_times = []
# Datetimes
datetimes = []
# Size of each sub stack
len_sstack = []
# Record measured gain of each substack of calibration frames
gains = []
smallest_set_len = None
for idx_set, data_set in enumerate(split[0]):
obsname_dsets, obsname_vals = data_set.split_dataset(prihdr_keywords=['OBSNAME'])
cal_dsets = []
mnframe_ind = None
for i, v in enumerate(obsname_vals):
if v.upper()=='MNFRAME':
mnframe_ind = i
else:
cal_dsets.append(obsname_dsets[i])
# First layer (array of unique EM values)
stack_cp = []
len_cal_frames = 0
record_gain = True
if mnframe_ind is not None:
for frame in obsname_dsets[mnframe_ind]:
if apply_dq:
if ram_heavy:
pass # don't apply yet; apply later when loading in data
else:
bad = np.where(frame.dq > 0)
frame.data[bad] = np.nan
if record_exp_time:
exp_time_mean_frame = frame.ext_hdr['EXPTIME']
record_exp_time = False
if frame.ext_hdr['EXPTIME'] != exp_time_mean_frame:
raise Exception('Frames used to build the mean frame must have the same exposure time')
if frame.ext_hdr['EMGAIN_C'] != 1:
raise Exception('The commanded gain used to build the mean frame must be unity')
if ram_heavy:
mean_frame_list.append(frame.filepath)
else:
mean_frame_list.append(frame.data)
for cal_dset in cal_dsets:
# each of dsets has just one frame in it
dsets, vals = cal_dset.split_dataset(exthdr_keywords=['SCTSRT','EXPTIME'])
smallest_set_length = np.inf
sub = []
start_val = float(vals[0][1])
start_val_ind = 0
exptime_dsets = []
for val in vals:
if vals.index(val) == 0:
continue
if float(val[1]) == start_val:
pass
else:
exptime_dsets.append(dsets[start_val_ind:vals.index(val)])
start_val = float(val[1])
start_val_ind = vals.index(val)
# ending set not covered
exptime_dsets.append(dsets[start_val_ind:])
for i in exptime_dsets:
if len(i) < smallest_set_length:
smallest_set_length = len(i)
if cal_type == 'nonlin':
smallest_set_length = None # for nonlin, don't need to truncate exptime sets to be same length for a given EM gain
for i, exptime_dset_list in enumerate(exptime_dsets):
if ram_heavy:
sub = [dset.frames[0].filepath for dset in exptime_dset_list[:smallest_set_length]]
else:
sub = np.stack([dset.frames[0].data for dset in exptime_dset_list[:smallest_set_length]])
for exptime_dset in exptime_dset_list[:smallest_set_length]:
frame = exptime_dset.frames[0]
if not (frame.pri_hdr['OBSNAME'] == 'KGAIN' or
frame.pri_hdr['OBSNAME'] == 'NONLIN'):
raise Exception('OBSNAME can only be MNFRAME or NONLIN in non-linearity')
sctsrt = frame.ext_hdr['SCTSRT']
if isinstance(sctsrt, str) is False:
raise Exception('SCTSRT must be a string')
datetimes.append(sctsrt)
exp_time = frame.ext_hdr['EXPTIME']
if isinstance(exp_time, float) is False:
raise Exception('Exposure times must be float')
if exp_time <=0:
raise Exception('Exposure times must be positive')
exp_times.append(exp_time)
if record_gain:
try: # if EM gain measured directly from frame
gains.append(frame.ext_hdr['EMGAIN_M'])
except:
if frame.ext_hdr['EMGAIN_A'] > 0: # use applied EM gain if available
gains.append(frame.ext_hdr['EMGAIN_A'])
else: # use commanded gain otherwise
gains.append(frame.ext_hdr['EMGAIN_C'])
record_gain = False
if gains[-1] == 1:
smallest_set_len = smallest_set_length
if ram_heavy:
stack_cp += sub
else:
stack_cp.append(sub)
len_cal_frames += len(sub)
# Length of substack must be at least 1
if len(stack_cp) == 0:
raise Exception('Substacks must have at least one element')
else:
if ram_heavy:
stack += stack_cp
else:
stack.append(np.vstack(stack_cp))
len_sstack.append(len_cal_frames)
# All elements of datetimes must be unique
if len(datetimes) != len(set(datetimes)):
raise Exception('SCTSRTs cannot be duplicated')
# Every EM gain must be greater than or equal to 1
if np.any(np.array(split[1]) < 1):
raise Exception('Each set of frames categorized by commanded EM gains must be have 1 or more frames')
if np.any(np.array(gains) < 1):
raise Exception('Actual EM gains must be greater than or equal to 1')
# sort frames by time stamp for drift correction later
datetimes_sort_inds = np.argsort(datetimes)
return (stack, mean_frame_list, np.array(exp_times)[datetimes_sort_inds],
np.array(datetimes)[datetimes_sort_inds], len_sstack, np.array(gains), datetimes_sort_inds, smallest_set_len)