Source code for corgidrp.l2b_to_l3

import numpy as np
import astropy.wcs as wcs
from corgidrp.spec import read_cent_wave
from corgidrp import data
from corgidrp import check

# A file that holds the functions that transmogrify l2b data to l3 data 
import numpy as np

[docs] def create_wcs(input_dataset, astrom_calibration, offset=None): """ Create the WCS headers for the dataset. Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (L2b-level) astrom_calibration (corgidrp.data.AstrometricCalibration): an astrometric calibration file for the input dataset offset (optional, tuple(float, float)): x and y offset in units of pixel between the dataset and WCS center (for spectroscopy or other optics offset from imaging mode) Returns: corgidrp.data.Dataset: a version of the input dataset with the WCS headers added """ updated_dataset = input_dataset.copy() northangle = astrom_calibration.northangle platescale = astrom_calibration.platescale pa_aper_deg_astrom = astrom_calibration.pri_hdr['PA_APER'] ra_offset, dec_offset = astrom_calibration.avg_offset # create wcs for each image in the dataset for image in updated_dataset: im_data = image.data image_y, image_x = im_data.shape center_pixel = [(image_x-1) // 2, (image_y-1) // 2] if offset is not None: center_pixel[0] += offset[0] center_pixel[1] += offset[1] target_ra, target_dec = image.pri_hdr['RA'], image.pri_hdr['DEC'] corrected_ra, corrected_dec = target_ra - ra_offset, target_dec - dec_offset pa_aper_deg = image.pri_hdr['PA_APER'] roll_offset_deg = pa_aper_deg - pa_aper_deg_astrom #Define a roll offset using the pa_aper for the image frame and astrom cal frame #TO DO: double check this. northangle may be defined as the full rotation angle, # not north offset, in which case, adding the two below would be adding two absolute rotation angles from north vert_ang = np.radians(northangle - roll_offset_deg) pc = np.array([[-np.cos(vert_ang), np.sin(vert_ang)], [np.sin(vert_ang), np.cos(vert_ang)]]) cdmatrix = pc * (platescale * 0.001) / 3600. # create dictionary with wcs information wcs_info = {} wcs_info['CD1_1'] = float(cdmatrix[0,0]) wcs_info['CD1_2'] = float(cdmatrix[0,1]) wcs_info['CD2_1'] = float(cdmatrix[1,0]) wcs_info['CD2_2'] = float(cdmatrix[1,1]) wcs_info['CRPIX1'] = float(center_pixel[0]) wcs_info['CRPIX2'] = float(center_pixel[1]) wcs_info['CTYPE1'] = 'RA---TAN' wcs_info['CTYPE2'] = 'DEC--TAN' #wcs_info['CDELT1'] = (platescale * 0.001) / 3600 ## converting to degrees #wcs_info['CDELT2'] = (platescale * 0.001) / 3600 wcs_info['CRVAL1'] = float(corrected_ra) wcs_info['CRVAL2'] = float(corrected_dec) wcs_info['PLTSCALE'] = platescale ## [mas] / pixel wcs_info['NORTHANG'] = northangle - roll_offset_deg ## deg # update the image header with wcs information for key, value in wcs_info.items(): image.ext_hdr[key] = value # update the dataset with new header entries an dhistory message history_msg = 'WCS created' updated_dataset.update_after_processing_step(history_msg) return updated_dataset
[docs] def divide_by_exptime(input_dataset): """ Divide the data by the exposure time to get the units in electrons/s Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (L2b-level) Returns: corgidrp.data.Dataset: a version of the input dataset with the data in units of electrons/s """ if input_dataset[0].ext_hdr['BUNIT'] != "photoelectron": raise ValueError("input dataset must have unit photoelectron for the conversion, not {0}".format(input_dataset[0].ext_hdr['BUNIT'])) data = input_dataset.copy() all_data_new = np.zeros(data.all_data.shape) all_err_new = np.zeros(data.all_err.shape) for i in range(len(data.frames)): exposure_time = float(data.frames[i].ext_hdr['EXPTIME']) data.frames[i].data = data.frames[i].data / exposure_time # data.frames[i].err = data.frames[i].err / exposure_time scale_factor = (1/exposure_time) * np.ones([data.frames[i].err.shape[1], data.frames[i].err.shape[2]]) # print(data.frames[i].err.shape) # print(data.frames[i].err[0]) # print(scale_factor.shape) #data.frames[i].rescale_error(data.frames[i].err, 'normalized by the exposure time') data.frames[i].rescale_error(scale_factor, 'normalized by the exposure time') all_data_new[i] = data.frames[i].data all_err_new[i] = data.frames[i].err data.frames[i].ext_hdr.set('BUNIT', 'photoelectron/s') history_msg = 'divided by the exposure time' data.update_after_processing_step(history_msg, new_all_data = all_data_new, new_all_err = all_err_new) return data
[docs] def split_image_by_polarization_state(input_dataset, image_center_x=512, image_center_y=512, separation_diameter_arcsec=7.5, alignment_angle_WP1=0.0, alignment_angle_WP2=45.0, image_size=None, padding=5): """ Split each polarimetric input image into two images by its polarization state, recompose the two images into a 2 x image_size x image_size datacube Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (L2b-level), should all be taken with the same color filter (same CFAMNAME header) image_center_x (optional, float): x pixel coordinate location of the center location between the two polarized images on the detector, default is the detector center at x=512.0 image_center_y (optional, float): y pixel coordinate location of the center location between the two polarized images on the detector, default is the detector center at y=512.0 separation_diameter_arcsec (optional, float): Distance between the centers of the two polarized images on the detector in arcsec, default for Roman CGI is 7.5" alignment_angle_WP1 (optional, float): the angle in degrees of how the two polarized images are aligned with respect to the horizontal for WP1, defaults to 0 alignment_angle_WP2 (optional, float): the angle in degrees of how the two polarized images are aligned with respect to the horizontal for WP1, defaults to 45 image_size (optional, int): length/width of the cropped polarized images, if none is provided, the size is automatically determined based on the coronagraph mask used padding (optional, int): number of pixels to leave as blank space between the outer radius of each PSF and the edge of the image, default is 5, is overriden if a custom image size is provided Returns: corgidrp.data.Dataset: The input dataset with each image now being a 2 x image_size x image_size datacube """ passband = input_dataset[0].ext_hdr['CFAMNAME'] #remove the letter F to make it compatible with table if passband[1] == 'F': passband = passband[0] updated_dataset = input_dataset.copy() # determine coronagraph FOV diam = 2.363114 fov = input_dataset[0].ext_hdr['LSAMNAME'] if fov == 'NFOV': # NFOV outer radius is 9.7 λ/D # convert to arcsec: λ/D * 206265 radius_arcsec = 9.7 * ((read_cent_wave(passband)[0] * 1e-9) / diam) * 206265 elif fov == 'WFOV': # WFOV outer radius is 20.1 λ/D # convert to arcsec: λ/D * 206265 radius_arcsec = 20.1 * ((read_cent_wave(passband)[0] * 1e-9) / diam) * 206265 else: # default to unvignetted polarimetry FOV diameter of 3.8" radius_arcsec = 3.8 / 2 # convert to pixel: 0.0218" = 1 pixel radius_pix = int(round(radius_arcsec / 0.0218)) # raise error if the polarized images are close enough to overlap if separation_diameter_arcsec < 2 * radius_arcsec: raise ValueError(f'The inputted separation diameter of {separation_diameter_arcsec}" must be greater than {2 * radius_arcsec}"') # auto determine image size based on FOV if none is provided if image_size == None: # number of pixels between the coronagraph focal plane's outer radius and the image edge image_size = 2 * (radius_pix + padding) for image in updated_dataset: im_data = image.data im_err = image.err im_dq = image.dq image_y, image_x = im_data.shape prism = image.ext_hdr['DPAMNAME'] # make sure input image is polarized if prism != 'POL0' and prism != 'POL45': raise ValueError('Input image must be a polarimetric observation') # make sure every image in the input dataset uses the same filter if image.ext_hdr['CFAMNAME'] != input_dataset[0].ext_hdr['CFAMNAME']: raise ValueError('The color filter must be the same for all images in the dataset') # find polarized image centers if prism == 'POL0': #place image according to specified angle angle_rad = (alignment_angle_WP1 * np.pi) / 180 else: angle_rad = (alignment_angle_WP2 * np.pi) / 180 displacement_x = int(round((separation_diameter_arcsec * np.cos(angle_rad)) / (2 * 0.0218))) displacement_y = int(round((separation_diameter_arcsec * np.sin(angle_rad)) / (2 * 0.0218))) center_left = (image_center_x - displacement_x, image_center_y + displacement_y) center_right = (image_center_x + displacement_x, image_center_y - displacement_y) # find starting point for cropping image_radius = image_size // 2 start_left = (center_left[0] - image_radius, center_left[1] - image_radius) start_right = (center_right[0] - image_radius, center_right[1] - image_radius) # check that cropped image doesn't exceed full image bounds if start_left[0] < 0 or start_left[1] < 0 or start_right[0] < 0 or start_right[1] < 0\ or start_left[0] + image_size >= image_x or start_left[1] + image_size >= image_y\ or start_right[1] + image_size >= image_x or start_right[1] + image_size >= image_y: raise ValueError('Image bounds exceed that of the input data, please decrease the image size') # construct new datacube for data, err, and dq im_data_new = np.zeros(shape=(2, image_size, image_size)) # make sure to keep the additional dimension for err array im_err_new = np.zeros(shape=(im_err.shape[0], 2, image_size, image_size)) im_dq_new = np.zeros(shape=(2, image_size, image_size)) # define coordinates y, x = np.indices([image_size, image_size]) x_left = x + start_left[0] y_left = y + start_left[1] x_right = x + start_right[0] y_right = y + start_right[1] # fill in the first dimension, corresponding to 0 or 45 degree polarization im_data_new[0,:,:] = im_data[start_left[1]:start_left[1] + image_size, start_left[0]:start_left[0] + image_size] im_err_new[:,0,:,:] = im_err[:, start_left[1]:start_left[1] + image_size, start_left[0]:start_left[0] + image_size] im_dq_new[0,:,:] = im_dq[start_left[1]:start_left[1] + image_size, start_left[0]:start_left[0] + image_size] # fill in the second dimension, corresponding to the 90 or 135 degree polarization im_data_new[1,:,:] = im_data[start_right[1]:start_right[1] + image_size, start_right[0]:start_right[0] + image_size] im_err_new[:,1,:,:] = im_err[:, start_right[1]:start_right[1] + image_size, start_right[0]:start_right[0] + image_size] im_dq_new[1,:,:] = im_dq[start_right[1]:start_right[1] + image_size, start_right[0]:start_right[0] + image_size] # mark anything on the other side of the center line dividing the two images as NaN to avoid including the other image if prism == 'POL0': im_data_new[0, x_left >= image_center_x] = np.nan im_data_new[1, x_right <= image_center_x] = np.nan im_err_new[:, 0, x_left >= image_center_x] = np.nan im_err_new[:, 1, x_right <= image_center_x] = np.nan # update dq with corresponding flag for bad pixel im_dq_new[0, x_left >= image_center_x] = 1 im_dq_new[1, x_right <= image_center_x] = 1 else: im_data_new[0, y_left - image_center_y <= x_left - image_center_x] = np.nan im_data_new[1, y_right - image_center_y >= x_right - image_center_x] = np.nan im_err_new[:, 0, y_left - image_center_y <= x_left - image_center_x] = np.nan im_err_new[:, 1, y_right - image_center_y >= x_right - image_center_x] = np.nan im_dq_new[0, y_left - image_center_y <= x_left - image_center_x] = 1 im_dq_new[1, y_right - image_center_y >= x_right - image_center_x] = 1 #update data, err, and dq image.data = im_data_new image.err = im_err_new image.dq = im_dq_new history_msg = 'images split by polarization state' updated_dataset.update_after_processing_step(history_msg) return updated_dataset
[docs] def crop(input_dataset, sizexy=None, centerxy=None): """ Crop the Images in a Dataset to a desired field of view. Default behavior is to crop the image to the dark hole region, centered on the pixel intersection nearest to the star location. Assumes 3D Image data is a stack of 2D data arrays, so only crops the last two indices. Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (any level) sizexy (int or array of int): desired frame size, if only one number is provided the desired shape is assumed to be square, otherwise xy order. If not provided, defaults to 61 for NFOV (narrow field-of-view) observations. Defaults to None. centerxy (float or array of float): desired center (xy order), should be a pixel center (aka integer) or intersection (aka half-integer) otherwise the function rounds to the nearest pixel or pixel intersection. Defaults to the "EACQ_ROW/COL" header values. Returns: corgidrp.data.Dataset: a version of the input dataset cropped to the desired FOV. """ # Copy input dataset dataset = input_dataset.copy() # Need to loop over frames and reinit dataset because array sizes change frames_out = [] for frame in dataset: prihdr = frame.pri_hdr exthdr = frame.ext_hdr dqhdr = frame.dq_hdr errhdr = frame.err_hdr # Pick default crop size based on the size of the effective field of view if sizexy is None: filter_band = exthdr['CFAMNAME'] # change filter names ending in F to just the number if filter_band[1] == 'F': filter_band = filter_band[0] prism = exthdr['DPAMNAME'] slit = exthdr['FSAMNAME'] cor_mode = exthdr['LSAMNAME'] spec_slits = ['R1C2', 'R2C3', 'R2C4', 'R2C5', 'R3C1', 'R3C2', 'R4C6', 'R5C1', 'R5C2', 'R6C3', 'R6C4', 'R6C5'] spec_prisms = ['PRISM2', 'PRISM3'] color_filters = ['1', '1A', '1B', '1C', '2', '2A', '2B', '2C', '3', '3A', '3B', '3C', '3D', '3E', '3F', '3G', '4', '4A', '4B', '4C'] # outer working angle in lambda/D cor_outer_working_angle = { 'WFOV': 20.1, 'SPEC': 9.1 } # Skip cropping by default if observation is non-coronagraphic if cor_mode == 'OPEN': return dataset if cor_mode == 'NFOV': # set size to 61 if coronagraph is HLC NFOV sizexy = 61 elif cor_mode not in cor_outer_working_angle or filter_band not in color_filters: # raise warning if unable to calculate image size raise UserWarning('Unable to determine image size, please change instrument configuration or provide a sizexy value') else: ## calculate image size using coronagraph information padding = 5 diam = 2.363114 # convert lambda/D to arcsec radius_arcsec = cor_outer_working_angle[cor_mode] * ((read_cent_wave(filter_band)[0] * 1e-9) / diam) * 206265 # convert arcsec to pix, 1 pix = 0.0218", round to nearest integer radius_pix = int(round(radius_arcsec / 0.0218)) # update sizexy sizexy = 2 * (padding + radius_pix) + 1 # add additional 60 pixels to account for increase in size with spec slit or prism if slit in spec_slits or prism in spec_prisms: sizexy += 60 # Assign new array sizes and center location frame_shape = frame.data.shape if isinstance(sizexy,int): sizexy = [sizexy]*2 if isinstance(centerxy,float): centerxy = [centerxy] * 2 elif centerxy is None: if ("EACQ_COL" in exthdr.keys()) and ("EACQ_ROW" in exthdr.keys()): centerxy = np.array([exthdr["EACQ_COL"],exthdr["EACQ_ROW"]]) if float(exthdr["EACQ_COL"]) == -999.0 or float(exthdr["EACQ_ROW"]) == -999.0: raise ValueError('EACQ_ROW/COL header values are invalid (-999.0)') else: raise ValueError('centerxy not provided but EACQ_ROW/COL are missing from image extension header.') # Round to center to nearest half-pixel if size is even, nearest pixel if odd size_evenness = (np.array(sizexy) % 2) == 0 centerxy_input = np.array(centerxy) centerxy = np.where(size_evenness,np.round(centerxy_input-0.5)+0.5,np.round(centerxy_input)) if not np.all(centerxy == centerxy_input): print(f'Desired center was {centerxy_input}. Centering crop on {centerxy}.') # Crop the data start_ind = (centerxy + 0.5 - np.array(sizexy)/2).astype(int) end_ind = (centerxy + 0.5 + np.array(sizexy)/2).astype(int) x1,y1 = start_ind x2,y2 = end_ind # Check if cropping outside the FOV left_pad = -x1 if (x1<0) else 0 right_pad = x2-frame_shape[-1] if (x2 > frame_shape[-1]) else 0 below_pad = -y1 if (y1<0) else 0 above_pad = y2-frame_shape[-2] if (y2 > frame_shape[-2]) else 0 if frame.data.ndim == 2: cropped_frame_data = np.full(sizexy[::-1],np.nan) cropped_frame_data[below_pad:sizexy[1]-above_pad, left_pad:sizexy[0]-right_pad] = frame.data[y1+below_pad:y2-above_pad, x1+left_pad:x2-right_pad] cropped_frame_err = np.full((frame.err.shape[0],*sizexy[::-1]),np.nan) cropped_frame_err[:,below_pad:sizexy[1]-above_pad, left_pad:sizexy[0]-right_pad] = frame.err[:,y1+below_pad:y2-above_pad, x1+left_pad:x2-right_pad] cropped_frame_dq = np.full(sizexy[::-1],np.nan) cropped_frame_dq[below_pad:sizexy[1]-above_pad, left_pad:sizexy[0]-right_pad] = frame.dq[y1+below_pad:y2-above_pad, x1+left_pad:x2-right_pad] # cropped_frame_data = frame.data[y1:y2,x1:x2] # cropped_frame_err = frame.err[:,y1:y2,x1:x2] # cropped_frame_dq = frame.dq[y1:y2,x1:x2] elif frame.data.ndim == 3: cropped_frame_data = np.full((frame.data.shape[0],*sizexy[::-1]),np.nan) cropped_frame_data[:,below_pad:sizexy[1]-above_pad, left_pad:sizexy[0]-right_pad] = frame.data[:,y1+below_pad:y2-above_pad, x1+left_pad:x2-right_pad] cropped_frame_err = np.full((*frame.err.shape[:2],*sizexy[::-1]),np.nan) cropped_frame_err[:,:,below_pad:sizexy[1]-above_pad, left_pad:sizexy[0]-right_pad] = frame.err[:,:,y1+below_pad:y2-above_pad, x1+left_pad:x2-right_pad] cropped_frame_dq = np.full((frame.dq.shape[0],*sizexy[::-1]),0).astype(int) cropped_frame_dq[:,below_pad:sizexy[1]-above_pad, left_pad:sizexy[0]-right_pad] = frame.dq[:,y1+below_pad:y2-above_pad, x1+left_pad:x2-right_pad] # cropped_frame_data = frame.data[:,y1:y2,x1:x2] # cropped_frame_err = frame.err[:,:,y1:y2,x1:x2] # cropped_frame_dq = frame.dq[:,y1:y2,x1:x2] else: raise ValueError('Crop function only supports 2D or 3D frame data.') # Update headers exthdr["NAXIS1"] = sizexy[0] exthdr["NAXIS2"] = sizexy[1] dqhdr["NAXIS1"] = sizexy[0] dqhdr["NAXIS2"] = sizexy[1] errhdr["NAXIS1"] = sizexy[0] errhdr["NAXIS2"] = sizexy[1] errhdr["NAXIS3"] = cropped_frame_err.shape[-3] if frame.data.ndim == 3: exthdr["NAXIS3"] = frame.data.shape[0] dqhdr["NAXIS3"] = frame.dq.shape[0] errhdr["NAXIS4"] = frame.err.shape[0] updated_hdrs = [] if ("EACQ_COL" in exthdr.keys()): exthdr["EACQ_COL"] -= x1 exthdr["EACQ_ROW"] -= y1 updated_hdrs.append('EACQ_ROW/COL') if ("CRPIX1" in exthdr.keys()): exthdr["CRPIX1"] -= x1 exthdr["CRPIX2"] -= y1 updated_hdrs.append('CRPIX1/2') if ("STARLOCX" in exthdr.keys()): exthdr["STARLOCX"] -= x1 exthdr["STARLOCY"] -= y1 updated_hdrs.append('STARLOCX/Y') if not ("DETPIX0X" in exthdr.keys()): exthdr.set('DETPIX0X',0) exthdr.set('DETPIX0Y',0) exthdr.set('DETPIX0X',exthdr["DETPIX0X"]+x1) exthdr.set('DETPIX0Y',exthdr["DETPIX0Y"]+y1) new_frame = data.Image(cropped_frame_data,prihdr,exthdr,cropped_frame_err,cropped_frame_dq,frame.err_hdr,frame.dq_hdr) new_frame.filename = frame.filename frames_out.append(new_frame) output_dataset = data.Dataset(frames_out) history_msg1 = f"""Frames cropped to new shape {list(output_dataset[0].data.shape)} on center {list(centerxy)}. Updated header kws: {", ".join(updated_hdrs)}.""" output_dataset.update_after_processing_step(history_msg1) return output_dataset
[docs] def update_to_l3(input_dataset): """ Updates the data level to L3. Only works on L2b data. Currently only checks that data is at the L2b level Args: input_dataset (corgidrp.data.Dataset): a dataset of Images (L2b-level) Returns: corgidrp.data.Dataset: same dataset now at L3-level """ # check that we are running this on L2b data for orig_frame in input_dataset: if orig_frame.ext_hdr['DATALVL'] != "L2b": err_msg = "{0} needs to be L2b data, but it is {1} data instead".format(orig_frame.filename, orig_frame.ext_hdr['DATALVL']) raise ValueError(err_msg) # we aren't altering the data updated_dataset = input_dataset.copy(copy_data=False) for frame in updated_dataset: # Apply header rules to each frame pri_hdr, ext_hdr, err_hdr, dq_hdr = check.merge_headers(data.Dataset([frame]), invalid_keywords=[]) frame.pri_hdr = pri_hdr frame.ext_hdr = ext_hdr frame.err_hdr = err_hdr frame.dq_hdr = dq_hdr frame.ext_hdr['DATALVL'] = "L3" # update filename convention. The file convention should be # "CGI_[dataleel_*]" so we should be same just replacing the just instance of L1 frame.filename = frame.filename.replace("_l2b", "_l3_", 1) #updating filename in the primary header frame.pri_hdr['FILENAME'] = frame.filename history_msg = "Updated Data Level to L3" updated_dataset.update_after_processing_step(history_msg) return updated_dataset