"""
A description of the Chip noise properties and their positions on the Detector
Module Summary
--------------
This module holds three classes: ``Detector``, ``Chip`` and ``HXRGNoise``.
``Chip``
Everything to do with photons and electrons happens in the ``Chip`` class. Each
``Chip`` is initialised with a position relative to the centre of the detector
array, a size [in pixels] and a resolution [in arcsec]. Photons fall onto the
``Chip`` s and are read out together with the read noise characteristics of the
``Chip``.
``Detector``
The ``Detector`` holds the information on where the ``Chip`` s are placed on the
focal plane. Focal plane coordinates are in [arcsec]. These coordinates are
either read in from a default file or determined by the user. The ``Detector``
object is an intermediary - it only passes information on the photons to the
``Chip`` s. It is mainly a convenience class so that the user can read out all
``Chip`` s at the same time.
Classes
-------
Detector
builds an array of ``Chip`` objects based on a ``UserCommands`` object
Chip
converts incoming photons into ADUs and adds in read-out noise
See Also
--------
:class:`simcado.optics.OpticalTrain`,
:class:`simcado.source.Source`
Notes
-----
References
----------
[1] Bernhard Rauscher's HxRG Noise Generator script
Examples
--------
The :class:`.Detector` can be used in stand-alone mode. In this case it outputs
only the noise that a sealed-off detector would generate:
>>> import simcado
>>> fpa = simcado.Detector(simcado.UserCommands())
>>> fpa.read_out(output=True, chips=[0])
The :class:`.Detector` is more useful if we combine it with a :class:`.Source`
object and an :class:`.OpticalTrain`. Here we create a :class:`.Source` object
for an open cluster in the LMC and pass the photons arriving from it through the
E-ELT and MICADO. The photons are then cast onto the detector array. Each
:class:`.Chip` converts the photons to ADUs and adds the resulting image to an
Astropy ``HDUList``. The ``HDUList`` is then written to disk.
>>> # Create a set of commands, optical train and detector
>>>
>>> import simcado
>>> cmds = simcado.UserCommands()
>>> opt_train = simcado.OpticalTrain(cmds)
>>> fpa = simcado.Detector(cmds)
>>>
>>> # Pass photons from a 10^4 Msun open cluster in the LMC through to the detector
>>>
>>> src = sim.source.source_1E4_Msun_cluster()
>>> src.apply_optical_train(opt_train, fpa)
>>>
>>># Read out the detector array to a FITS file
>>>
>>> fpa.read_out(filename="my_raw_image.fits")
"""
############################
# TODO
# - update open, write - remove references to self.params
import os
import sys
from datetime import datetime
import warnings
#import logging # unused
from copy import deepcopy
import multiprocessing as mp
import numpy as np
import numpy.ma as ma
import scipy.ndimage.interpolation as spi
#from scipy.ndimage.interpolation import zoom
from astropy.io import fits
from astropy.io import ascii as ioascii # ascii redefines builtin
from astropy.wcs import WCS
#from astropy.stats.funcs import median_absolute_deviation as mad
from .utils import find_file, airmass2zendist
from . import spectral as sc
from . import commands
from .nghxrg import HXRGNoise
__all__ = ["Detector", "Chip", "open", "plot_detector", "plot_detector_layout",
"make_noise_cube", "install_noise_cube"]
################################################################################
# Detector Objects #
################################################################################
[docs]class Detector(object):
"""
Generate a series of :class:`.Chip` objects for a focal plane array
Extended Summary
----------------
The :class:`.Detector` is a holder for the series of :class:`.Chip`
objects which make up the detector array. The main advantage of the
:class:`.Detector` object is that the user can read out all chips in the
whole detector array at once. A :class:`.Detector` is a parameter in the
:meth:`.Source.apply_optical_train()` method.
Parameters
----------
cmds : UserCommands
Commands for how to model the Detector
small_fov : bool, optional
Default is True. Uses a single 1024x1024 window at the centre of the FoV
Attributes
----------
cmds : UserCommands
commands for modelling the detector layout and exposures
layout : astropy.table.Table
table of positions and sizes of the chips on the focal plane
chips : list
a list of the ``Chips`` which make up the detector array
oversample : int
factor between the internal angular resolution and the pixel FOV
fpa_res : float
[mas] field of view of a single pixel
dit : float
[s] exposure time of a single DIT
tro : float
[s] time between consecutive non-destructive readouts in up-the-ramp mode
ndit : int
number of exposures (DITs)
Methods
-------
read_out()
for reading out the detector array into a FITS file
open()
not yet implemented
write()
not yet implemented
Save the Detector object into a FITS file
.. todo::
Open should be moved into a general function for detector.py which
returns a :class:`.Detector` object after reading in a saved detector
file
See Also
--------
:class:`.Chip`, :class:`.Source`,
:class:`.OpticalTrain`, :class:`.UserCommands`
Examples
--------
Create a :class:`Detector` object
::
>>> import simcado as sim
>>> my_cmds = sim.UserCommands()
>>> my_detector = sim.Detector(my_cmds)
Read out only the first :class:`.Chip`
::
>>> my_detector.readout(filename=image.fits, chips=[0])
"""
def __init__(self, cmds, small_fov=True):
# 1. Read in the chip layout
# 2. Read in the flat field file
# 3. Generate chip objects
# 4. Check if a noise file has been given
# if not, generate new noise files
# else: read in the noise file
# if the noise file has many extensions, choose several random
# extensions
self.cmds = cmds
if small_fov:
print("Safety switch is on - Detector(..., small_fov='True')")
self.layout = ioascii.read(
"""# id x_cen y_cen x_len y_len pixsize angle gain
# mm mm pixel pixel mm deg e-/ADU
0 0 0 1024 1024 0.015 0. 1.""")
else:
try:
self.layout = ioascii.read(self.cmds["FPA_CHIP_LAYOUT"])
except:
raise FileNotFoundError(self.cmds["FPA_CHIP_LAYOUT"] +
" (FPA_CHIP_LAYOUT) cannot be read")
if self.cmds["INST_FLAT_FIELD"] is not None and \
os.path.exists(self.cmds["INST_FLAT_FIELD"]):
hdu_flat_field = fits.open(self.cmds["INST_FLAT_FIELD"])
n_exts = len(hdu_flat_field.info(output=False))
if n_exts == len(self.layout["id"]):
j = 0
elif n_exts > len(self.layout["id"]):
j = 1
elif n_exts == 1:
hdu_flat_field = [hdu_flat_field[0]] * len(self.layout["id"])
else:
raise ValueError("What's going on with INST_FLAT_FIELD?")
else:
j = 0
hdu_flat_field = [None] * len(self.layout["id"])
self.chips = [Chip(self.layout["x_cen"][i], self.layout["y_cen"][i],
self.layout["x_len"][i], self.layout["y_len"][i],
self.cmds["SIM_DETECTOR_PIX_SCALE"],
self.layout["pixsize"][i],
self.layout["angle"][i],
self.layout["gain"][i],
(self.cmds["OBS_RA"], self.cmds["OBS_DEC"]),
0.,
self.layout["id"][i],
hdu_flat_field[i+j])
for i in range(len(self.layout["x_cen"]))]
self.oversample = self.cmds["SIM_OVERSAMPLING"]
self.fpa_res = self.cmds["SIM_DETECTOR_PIX_SCALE"]
self.dit = self.cmds["OBS_DIT"]
self.ndit = self.cmds["OBS_NDIT"]
self.exptime = self.dit * self.ndit
self._n_ph_atmo = 0
self._n_ph_mirror = 0
self._n_ph_ao = 0
self.array = None # defined in method
[docs] def read_out(self, filename=None, to_disk=False, chips=None,
read_out_type="superfast", **kwargs):
"""
Simulate the read-out process of the detector array
Based on the parameters set in the ``UserCommands`` object, the detector
will read out the images stored on the ``Chips`` according to the
specified read-out scheme, i.e. Fowler, up-the-ramp, single read, etc.
Parameters
----------
filename : str
where the file is to be saved. If ``None`` and ``to_disk`` is true,
the output file is called "output.fits". Default is ``None``
to_disk : bool
a flag for where the output should go.
If ``filename`` is given or if ``to_disk=True``,
the ``Chip`` images will be written to a `.fits`` file on disk.
If no `filename`` is specified, the output will be called "output.fits".
chips : int, array-like, optional
The chip or chips to be read out, based on the detector_layout.dat
file. Default is the first ``Chip`` specified in the list, i.e. [0].
read_out_type : str, optional
The name of the algorithm used to read out the chips:
- "superfast"
- "non_destructive"
- "up_the_ramp"
Returns
-------
astropy.io.fits.HDUList
Keyword Arguments (**kwargs)
----------------------------
**kwargs are used to update the ``UserCommands`` object that controls
the ``Detector``. Therefore any dictionary keywords can be passed in the
form of a dictionary, i.e. {"OBS_DIT" : 60, "OBS_OUTPUT_DIR" : "./"}
"""
#removed kwargs
self.cmds.update(kwargs)
if filename is not None:
to_disk = True
if filename is None and to_disk is True:
if self.cmds["OBS_OUTPUT_DIR"] is None:
self.cmds["OBS_OUTPUT_DIR"] = "./output.fits"
filename = self.cmds["OBS_OUTPUT_DIR"]
if chips is not None:
if np.isscalar(chips):
ro_chips = [chips]
else:
ro_chips = chips
elif chips is None:
ro_chips = np.arange(len(self.chips))
else:
raise ValueError("Something wrong with ``chips``")
# Time stamp for FITS header
#creation_date = datetime.now().isoformat(timespec='seconds')
# timespec="seconds" throws an error on some python versions
creation_date = datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
hdulist = fits.HDUList()
# Create primary header unit for multi-extension files
if len(ro_chips) > 1:
primary_hdu = fits.PrimaryHDU()
primary_hdu.header['DATE'] = creation_date
for key in self.cmds.cmds:
val = self.cmds.cmds[key]
if isinstance(val, (sc.TransmissionCurve, sc.EmissionCurve,
sc.UnityCurve, sc.BlackbodyCurve)):
val = val.params["filename"]
if isinstance(val, str) and len(val) > 35:
val = "... " + val[-35:]
try:
primary_hdu.header["HIERARCH "+key] = val
except NameError: # any other exceptions possible?
pass
hdulist.append(primary_hdu)
# Save the detector image(s)
for i in ro_chips:
######
# Put in a catch here so that only the chips specified in "chips"
# are read out
######
print("Reading out chip", self.chips[i].id, "using",
read_out_type)
print("DIT =", self.dit, " NDIT =", self.ndit)
array = self.chips[i].read_out(self.cmds,
read_out_type=read_out_type)
## TODO: transpose is just a hack - need to make sure
## x and y are handled correctly throughout SimCADO
thishdu = fits.ImageHDU(array.T)
thishdu.header["EXTNAME"] = ("CHIP_{:02d}".format(self.chips[i].id),
"Chip ID")
thishdu.header["CHIP_ID"] = (self.chips[i].id, "Chip ID")
thishdu.header['DATE'] = creation_date
# Primary WCS for sky coordinates
thishdu.header.extend(self.chips[i].wcs.to_header())
# Secondary WCS for focal plane coordinates
try:
thishdu.header.extend(self.chips[i].wcs_fp.to_header(key='A'))
except AttributeError:
print("No WCS_FP!")
pass
thishdu.header["BUNIT"] = ("ADU", "")
thishdu.header["EXPTIME"] = (self.dit, "[s] Exposure time")
thishdu.header["NDIT"] = (self.ndit, "Number of exposures")
thishdu.header["INTTIME"] = (self.exptime,
"[s] Total integration time")
#thishdu.header["TRO"] = (self.tro,
# "[s] Time between non-destructive readouts")
thishdu.header["GAIN"] = (self.chips[i].gain, "[e-/ADU]")
thishdu.header["AIRMASS"] = (self.cmds["ATMO_AIRMASS"], "")
thishdu.header["ZD"] = \
(airmass2zendist(self.cmds["ATMO_AIRMASS"]), "[deg]")
for key in self.cmds.cmds:
val = self.cmds.cmds[key]
if isinstance(val, str):
if len(val) > 35:
val = "... " + val[-35:]
try:
thishdu.header["HIERARCH "+key] = val
except NameError: # any other exceptions possible?
pass
except ValueError:
warnings.warn("ValueError - Couldn't add keyword: "+key)
hdulist.append(thishdu)
if to_disk:
hdulist.writeto(filename, clobber=True, checksum=True)
return hdulist
[docs] def write(self, filename=None, **kwargs):
"""
Write a ``Detector`` object out to a FITS file
Extended Summary
----------------
Writes the important information contained in a ``Detector`` object into
FITS file for later use. The main information written out includes: the
layout of the detector chips, any pixel maps associated with the
detector chips, a linearity curve and a QE curve for the chips.
Parameters
----------
filename : str, optional
path to the FITS file where the ``Detector`` object is stored. If
``filename=None`` (by default), the file written is ``./detector.fits``
Keyword Arguments (**kwargs)
----------------------------
Examples
--------
"""
self.params.update(kwargs)
if filename is None:
filename = self.params["OBS_OUTPUT_DIR"]
if filename is None:
raise ValueError("No output path was specified. " + \
"Use either filename= or HXRG_OUTPUT_PATH=")
hdu = fits.PrimaryHDU(self.array)
hdu.header["PIX_RES"] = self.pix_res
hdu.header["EXPTIME"] = self.dit
hdu.header["GAIN"] = self.params["FPA_GAIN"]
hdu.header["SIMCADO"] = "FPA_NOISE"
try:
hdu.writeto(filename, clobber=True, checksum=True)
except OSError:
warnings.warn(filename+" exists and is busy. OS won't let me write")
################################################################################
# Chip Objects #
################################################################################
[docs]class Chip(object):
"""
Holds the "image" as seen by a single chip in the focal plane
Extended Summary
----------------
The ``Chip`` object contains information on where it is located in the focal
plane array. The method ``<Source>.apply_optical_train()`` passes an image of
the on-sky object to each ``Chip``. This image is resampled to the ``Chip``
pixel scale. Each ``Chip`` holds the "ideal" image as an array of expectation
values for number of photons arriving per second. The ``Chip`` then
adds detector noise and other characteristics to the image when
<Detector>.readout() is called.
Parameters
----------
x_cen, y_cen : float
[micron] the coordinates of the centre of the chip relative to the
centre of the focal plane
x_len, y_len : int
the number of pixels per dimension
pix_res : float
[arcsec] the field of view per pixel
id : int
an identification number for the chip (assuming they are not correctly
ordered)
flat_field : np.ndarray
a 2D array holding the flat fielding effects for the chip
Attributes
----------
x_cen, y_cen : float
[arcsec] the coordinates of the centre of the chip relative to the
centre of the focal plane
naxis1, naxis2 : int
the number of pixels per dimension
pix_res : float
[arcsec] the field of view per pixel
chipid : int, optional
the id of the chip relative to the others on the detector array. Default is ``None``
dx, dy : float
[arcsec] half of the field of view of each chip
x_min, x_max, y_min, y_max : float
[arcsec] the borders of the chip relative to the centre of the focal plane
array : np.ndarray
an array for holding the signal registered by the ``Chip``
Methods
-------
add_signal(signal)
adds signal to ``.array``. The signal should be the same dimensions as
``Chip.array``
add_uniform_background(emission, lam_min, lam_max, output=False)
adds a constant to the signal in ``.array``. The background level is found
by integrating the ``emission`` curve between ``lam_min`` and ``lam_max``.
If output is set to ``True``, an image with the same dimensions as
``.array`` scaled to the background flux is returned.
apply_pixel_map(pixel_map_path=None, dead_pix=None, max_well_depth=1E5)
applies a mask to ``.array`` representing the position of the current
"hot" and "dead" pixels / lines
reset()
resets the signal on the ``Chip`` to zero. In future releases, an
implementation of the persistence characteristics of the detector will
go here.
Raises
------
See Also
--------
Detector,
:class:`simcado.source.Source`,
:class:`simcado.commands.UserCommands`,
:class:`simcado.optics.OpticalTrain`
Examples
--------
"""
def __init__(self, x_cen, y_cen, x_len, y_len, pix_res, pixsize=15,
angle=0, gain=1, obs_coords=[0,0], fieldangle=0,
chipid=None, flat_field=None):
# permanent rotation of chip
cangle = np.cos(np.deg2rad(angle))
sangle = np.sin(np.deg2rad(angle))
# offset of chip centre from field centre in um
xoff = x_cen * cangle - y_cen * sangle
yoff = x_cen * sangle + y_cen * cangle
# Primary WCS to transform pixel coordinates to sky coordinates
thewcs = WCS(naxis=2)
thewcs.wcs.ctype = ['RA---TAN', 'DEC--TAN']
thewcs.wcs.name = "PIX2SKY"
thewcs.wcs.crval = obs_coords
thewcs.wcs.crpix = [(x_len + 1) / 2 - xoff / pixsize,
(y_len + 1) / 2 - yoff / pixsize]
thewcs.wcs.cdelt = [pix_res / 3600, pix_res / 3600] # arcsec to deg
thewcs.wcs.cunit = ['deg', 'deg']
thewcs.wcs.pc = np.array([[cangle, sangle], [-sangle, cangle]])
fieldangle = np.deg2rad(fieldangle)
fieldrot = np.matrix([[np.cos(fieldangle), np.sin(fieldangle)],
[-np.sin(fieldangle), np.cos(fieldangle)]])
thewcs.wcs.pc = fieldrot * np.asmatrix(thewcs.wcs.pc)
self.wcs = thewcs
# Second WCS to transform pixel coordinates to focal-plane coordinates
wcs_fp = WCS(naxis=2)
wcs_fp.wcs.ctype = ['LINEAR', 'LINEAR']
wcs_fp.wcs.name = "PIX2FP"
wcs_fp.wcs.crval = [0., 0.]
wcs_fp.wcs.crpix = [(x_len + 1) / 2 - xoff / pixsize,
(y_len + 1) / 2 - yoff / pixsize]
wcs_fp.wcs.cdelt = [pixsize, pixsize]
wcs_fp.wcs.cunit = ['mm', 'mm']
wcs_fp.wcs.pc = np.array([[cangle, sangle], [-sangle, cangle]])
self.wcs_fp = wcs_fp
# attributes in um
self.xcen_um = x_cen
self.ycen_um = y_cen
dx_um = (x_len // 2) * pixsize
dy_um = (y_len // 2) * pixsize
self.xmin_um = self.xcen_um - dx_um
self.xmax_um = self.xcen_um + dx_um
self.ymin_um = self.ycen_um - dy_um
self.ymax_um = self.ycen_um + dy_um
# for backwards compatibility: attributes in arcsec
self.x_cen = x_cen / pixsize * pix_res
self.y_cen = y_cen / pixsize * pix_res
self.naxis1 = x_len
self.naxis2 = y_len
self.pix_res = pix_res
self.gain = gain
self.id = chipid
dx = (x_len // 2) * pix_res
dy = (y_len // 2) * pix_res
self.x_min = self.x_cen - dx
self.x_max = self.x_cen + dx
self.y_min = self.y_cen - dy
self.y_max = self.y_cen + dy
self.array = None
self.ndit = 0
self.dit = 0
self.dark = 0
self.min_dit = 0
if flat_field is not None:
if isinstance(flat_field, (fits.ImageHDU, fits.PrimaryHDU)):
flat_field = flat_field.data
wf, hf = flat_field.shape
zx, zy = self.naxis1 / wf, self.naxis2 / hf
if wf == 1. and hf == 1.:
self.flat_field = flat_field
else:
self.flat_field = spi.zoom(flat_field, (zx, zy), order=1)
self.flat_field = self.flat_field.T
else:
self.flat_field = None
[docs] def add_signal(self, signal):
"""
Add a 2D array of photon signal to the Chip
Add some signal photons to the detector array. Input units are expected
to be [ph/s/pixel]
Parameters
----------
signal : np.ndarray
[ph/pixel/s] photon signal. ``signal`` should have the same dimensions
as the ``array``
Returns
-------
None
"""
if isinstance(signal, np.ndarray):
if signal.shape[0] == self.naxis1 and \
signal.shape[1] == self.naxis2:
if self.array is None:
self.array = signal
else:
self.array += signal
else:
raise ValueError(str(signal.shape) + " != " + \
str((self.naxis1, self.naxis2)))
elif not hasattr(signal, "__len__"):
self.array += signal
[docs] def apply_pixel_map(self, pixel_map_path=None, dead_pix=None,
max_well_depth=1E5):
"""
Adds "hot" and "dead" pixels to the array
Extended Summary
----------------
applies a mask to ``.array`` representing the positions of the current
"hot" and "dead" pixels / lines. The method either reads in a FITS file
with locations of these pixels, or generates a series of random
coordinates and random weights for the pixels.
Parameters
----------
pixel_map_path : str
path to the FITS file. Default is None
dead_pix : int
[%] the percentage of dead or hot pixels on the chip - only used if
``pixel_map_path = None``. Default is ``None``.
max_well_depth : 1E5
Returns
-------
None
"""
try:
pixel_map = fits.getdata(pixel_map_path)
if self.array.shape != pixel_map.shape:
raise ValueError("pixel_map.shape != detector_array.shape")
self.array += pixel_map * max_well_depth
except ValueError:
if dead_pix is not None:
n = int(self.naxis1 * self.naxis2 * dead_pix / 100)
x = np.random.randint(self.naxis1, size=n)
y = np.random.randint(self.naxis2, size=n)
z = np.random.random(n)
self.array[x, y] += z * max_well_depth
else:
raise ValueError("Couldn't apply pixel_map")
[docs] def reset(self):
self.array = None
[docs] def read_out(self, cmds, read_out_type="superfast"):
"""
Read out the detector array
Parameters
----------
cmds : simcado.UserCommands
Commands for how to read out the chip
Returns
-------
out_array : np.ndarray
image of the chip read out
"""
# set up the read out
self.dit = cmds["OBS_DIT"]
self.ndit = int(cmds["OBS_NDIT"])
self.exptime = self.dit * self.ndit
self.dark = cmds["FPA_DARK_MEDIAN"]
self.min_dit = cmds["FPA_PIXEL_READ_TIME"] * \
(self.naxis1 * self.naxis1 / cmds["HXRG_NUM_OUTPUTS"])
if self.array is None:
self.array = np.zeros((self.naxis1, self.naxis2), dtype=np.float32)
# At this point, the only negatives come from the convolution.
# Remove them for the Poisson process
self.array[self.array < 0] = 0
out_array = np.zeros(self.array.shape, dtype=np.float32)
######## Multiply by Exptime
# the different read out modes
if read_out_type.lower() == "superfast":
out_array = self._read_out_superfast(cmds, self.dit, self.ndit)
elif read_out_type.lower() == "non_destructive":
out_array = self._read_out_non_destructive(cmds, self.dit, self.ndit)
elif read_out_type.lower() == "up_the_ramp":
out_array = self._read_out_up_the_ramp(cmds, self.dit, max_byte=2**30)
else:
raise ValueError("``read_out_type`` not readable")
######## Flat fielding
if self.flat_field is not None:
out_array *= self.flat_field
######## Remove a constant BG level
if cmds["OBS_REMOVE_CONST_BG"].lower() == "yes":
bg_val = np.median(out_array)
out_array -= bg_val
return out_array
def _read_out_non_destructive(self, cmds, dit, ndit):
"""
Read out NDIT times non-destructively according to FPA_READ_OUT_SCHEME
Parameters
----------
cmds : UserCommands
Returns
-------
out_array : np.ndarray
"""
lin_curve = cmds["FPA_LINEARITY_CURVE"]
ro_times = self._get_readout_times(scheme=cmds["FPA_READ_OUT_SCHEME"])
out_array = np.zeros(self.array.shape, dtype=np.float32)
for n in range(self.ndit):
ro_cube = []
for t in ro_times:
signal = self._read_out_poisson((self.array + self.dark), dit=t, ndit=1)
if lin_curve is not None:
signal, lin_curve = self._apply_linearity(signal, lin_curve,
return_curve=True)
read_noise = self._read_noise_frame(cmds)[0]
ro_cube += [signal + read_noise]
ro_cube = np.array(ro_cube)
mask = ro_cube > cmds["FPA_FULL_WELL_DEPTH"]
masked_ro_cube = ma.array(ro_cube, mask=mask)
masked_ro_cube[1:,:,:] -= masked_ro_cube[0,:,:]
ro_times.resize((len(ro_times),1,1))
masked_ro_cube[1:,:,:] /= ro_times[1:,:,:]
av_ro_cube = np.average(masked_ro_cube[1:,:,:], axis=0)
out_array += av_ro_cube * dit
out_array /= self.gain
return out_array.astype(np.float32)
## TODO: What to do if dit = min_dit (single read)?
## TODO: Make breaking up into memory chunks more flexible?
def _read_out_up_the_ramp(self, cmds, dit, max_byte=2**30):
"""
Test readout onto a detector using cube model
Parameters
----------
image :
a 2D image to be mapped onto the detector. Units are [ph/s/pixel]
dit :
integration time [s]
Optional Parameters
-------------------
max_byte :
the largest possible chunk of memory that can be used for computing
the sampling slope
This function builds an intermediate cube of dimensions (nx, ny, nro) with a
layer for each non-destructive read.
Output is given in [ph/pixel].
"""
print("NOTE - 'up the ramp' readout only reads a single DIT")
image = self.array + self.dark
tro = self.min_dit
nx, ny = image.shape
nro = np.int(dit / tro)
tpts = (1 + np.arange(nro)) * tro
img_byte = image.nbytes
pix_byte = img_byte / (nx * ny)
max_pix = max_byte / pix_byte
#cube_megabyte = img_byte * nro / 2**20
#print("Full cube has {0:.1f} Megabytes".format(cube_megabyte))
ny_cut = np.int(max_pix / (nx * nro))
if ny_cut >= ny:
ny_cut = ny
#print("Cut image to ny={0:d} rows".format(ny_cut))
slope = np.zeros(image.shape)
cube = np.zeros((nx, ny_cut, nro))
y1 = 0
while y1 < ny:
y2 = y1 + ny_cut
if y2 > ny:
y2 = ny
ny_cut = ny - y1
try:
del cube
cube = np.zeros((nx, ny_cut, nro))
except NameError:
pass
## Fill the cube with Poisson realization, individual reads
for i in range(nro):
cube[:, :, i] = np.random.poisson(image[:, y1:y2] * tro)
## Build the ramp
sumcube = cube.cumsum(axis=2)
## determine the slope using explicit formula calculated over cube
Sx = tpts.sum()
Sxx = (tpts * tpts).sum()
Sy = np.sum(sumcube, axis=2)
Sxy = np.sum(sumcube * tpts, axis=2)
slope[:, y1:y2] = (nro * Sxy - Sx * Sy) / (nro * Sxx - Sx * Sx)
## Move to next slice
y1 = y2
# return values are [ph/pixel]
return slope * dit
def _read_out_superfast(self, cmds, dit, ndit):
"""
Superfast read-out
"""
signal = self._read_out_poisson(self.array + self.dark, dit, ndit)
# apply the linearity curve
lin_curve = cmds["FPA_LINEARITY_CURVE"]
if lin_curve is not None:
signal, lin_curve = self._apply_linearity(signal,
lin_curve,
return_curve=True)
# superfast hack to get an approximation of the readout noise
# in the image
ro = self._read_noise_frame(cmds, n_frames=1) / np.sqrt(ndit)
########## Could work, but it's too slow for ndit > 10 ##############
# add 1 to the ndits, because there will always be a readout at
# the start
#ro_frames = self._read_noise_frame(cmds, n_frames=max(ndit,2))
#ro = np.sum(ro_frames, axis=0)
out_array = signal + ro
out_array /= self.gain
return out_array
def _read_out_poisson(self, image, dit, ndit):
"""
Apply a poisson distribution to the image
Parameters
----------
image : np.ndarray
image to be poissonified
dit : float
[s] length of exposure
ndit : int
[#] number of exposures
Returns
-------
im_st : np.ndarray
average of ndit exposures of length dit
"""
# image2 holds the number of photons expected over a single
# exposure
image2 = image * dit
# Check for windows systems. np.poisson is limited to 32-bit
if os.name == "nt":
image2[image2 > 2.147E9] = 2.147E9 # 2**31 = 2147483648
# Each DIT is read out individually. This helps prevent overflows
# due to too large expected photon numbers, at the expense of increased
# execution time.
im_st = np.zeros(np.shape(image))
for _ in range(ndit):
im_st += np.random.poisson(image2)
# Return the average image, corresponding to one DIT
im_st /= ndit
return im_st.astype(np.float32)
def _read_noise_frame(self, cmds, n_frames=1):
"""
Read in read-out-noise from the FITS file specified by FPA_NOISE_PATH
If cmds["FPA_NOISE_PATH"] == "gen", all info about the size and number
of layers must be in the :class:`.UserCommands` object.
I.e. HXRG_NUM_NDRO
Parameters
----------
cmds : UserCommands
n_frames : int
The number of frames needed
Returns
-------
noise_cube : np.ndarray
if n_frames == 1: shape = (naxis1, naxis2)
else: shape = = (naxis1, naxis2, n_frames)
"""
if cmds["FPA_USE_NOISE"].lower() == "no":
return np.zeros((self.naxis1, self.naxis2))
if "gen" in cmds["FPA_NOISE_PATH"].lower():
if cmds["HXRG_OUTPUT_PATH"] is not None:
generate_hxrg_noise(cmds)
tmp = fits.getdata(cmds["HXRG_OUTPUT_PATH"])
return tmp[:self.naxis1, :self.naxis2]
else:
noise_cube = generate_hxrg_noise(cmds)
elif cmds["FPA_NOISE_PATH"] is not None:
n = len(fits.info(cmds["FPA_NOISE_PATH"], False))
layer = np.random.randint(low=0, high=n, size=n_frames)
tmp = [fits.getdata(cmds["FPA_NOISE_PATH"], i) for i in layer]
noise_cube = np.array([im[:self.naxis1, :self.naxis2] for im in tmp])
else:
noise_cube = np.zeros((self.naxis1, self.naxis2, n_frames))
if n_frames == 1:
return noise_cube[0, :, :]
else:
return noise_cube
def _get_readout_times(self, scheme="double_corr"):
"""
Expect that scheme = cmds["FPA_READ_OUT_SCHEME"]
"""
if "double_corr" in scheme:
scheme = [0, self.dit]
elif "up" in scheme:
scheme = np.arange(0, self.dit, self.min_dit + 1E-3)
elif "fowl" in scheme:
fowl_pair = np.min((4, int(self.dit / self.min_dit) // 2))
scheme = np.arange(0, self.dit, self.min_dit + 1E-3).tolist()
scheme = scheme[:fowl_pair] + scheme[-fowl_pair:]
if isinstance(scheme, str):
if os.path.exists(scheme):
data = ioascii.read(scheme)
times = data[data.colnames[0]]
elif isinstance(scheme, (list, tuple, np.ndarray)):
times = np.array(scheme)
return times.astype(np.float32)
def _apply_linearity(self, in_array, curve, return_curve=False):
"""
Apply a linearity curve to a 2D array
Parameters
----------
in_array : array
the array to be linearized
curve : string, astropy.Table
if string, it is assumed to be a filename
if Table, it is assumed to be from a previous run
return_curve : bool
whether to return the linearity curve
Returns
-------
out_array, data : array, Table
if return_curve == True
out_array : array
if return_curve == False
"""
from astropy.table import Table
if isinstance(curve, str):
if os.path.exists(curve):
data = ioascii.read(curve)
else:
raise ValueError("file doesn't exist: "+curve)
elif isinstance(curve, Table):
data = curve
real_cts = data[data.colnames[0]].data.astype(np.float32)
measured_cts = data[data.colnames[1]].data.astype(np.float32)
out_array = np.interp(in_array.flatten(),
real_cts, measured_cts).reshape(in_array.shape)
out_array = out_array.astype(np.float32)
if return_curve:
return out_array, data
else:
return out_array
def __array__(self):
return self.array
def __mul__(self, x):
psf_new = deepcopy(self)
return psf_new.array * x
def __add__(self, x):
psf_new = deepcopy(self)
return psf_new.array + x
def __sub__(self, x):
psf_new = deepcopy(self)
return psf_new.array - x
def __rmul__(self, x):
return self.__mul__(x)
def __radd__(self, x):
return self.__add__(x)
def __rsub__(self, x):
psf_new = deepcopy(self)
return x - psf_new.array
def __imul__(self, x):
return self.__mul__(x)
def __iadd__(self, x):
return self.__add__(x)
def __isub__(self, x):
return self.__sub__(x)
# TODO this ought to be renamed (redefined-builtin)
[docs]def open(self, filename):
"""
Opens a saved ``Detector`` file.
** Not yet implemented **
** Should be moved outside of ``Detector`` and called with
``detector.open()`` **
Detector objects can be saved to FITS file and read back in for later
simulations.
Parameters
----------
filename : str
path to the FITS file where the ``Detector`` object is stored
Returns
-------
``simcado.Detector`` object
Examples
--------
"""
if not os.path.exists(filename):
raise FileNotFoundError(filename + " doesn't exist")
with fits.open(filename) as fp1:
self.params.update(fp1[0].header)
self.array = fp1[0].data
raise ValueError("Function not finished")
#def plot_detector_layout(detector, clr="g"):
# """Plot the detector layout. NOT FINISHED """
# try:
# import matplotlib.pyplot as plt
# except ImportError:
# raise ImportError("matplotlib can't be found")
#
# for i, chip in enumerate(detector.chips):
# plt.plot((chip.x_min, chip.x_max), (chip.y_min, chip.y_min), clr)
# plt.plot((chip.x_min, chip.x_max), (chip.y_max, chip.y_max), clr)
# plt.plot((chip.x_min, chip.x_min), (chip.y_min, chip.y_max), clr)
# plt.plot((chip.x_max, chip.x_max), (chip.y_min, chip.y_max), clr)
# plt.text(chip.x_cen, chip.y_cen, chip.id, fontsize=14)
# plt.xlabel("Distance [arcsec]", fontsize=14)
# plt.ylabel("Distance [arcsec]", fontsize=14)
[docs]def plot_detector_layout(detector, plane="sky", fmt='g-', plot_origin=False,
label=True, **kwargs):
"""Plot the detector layout
Parameters
==========
detector : simcado.Detector
The Detector to be shown
plane : 'sky' or 'fpa'
Plot detector layout on the sky (in arcsec) or in the focal plane (in mm)
fmt : matplotlib format string
plot_origin : bool
Mark position pixel (1,1) for each chip.
label : bool
Label the chips with their numberq
Keyword arguments
=================
Further keywords arguments are passed on to the matplotlib plot function
and can be used to set line style or colour of the detector outline.
"""
from matplotlib import pyplot as plt
npts = 101
for chip in detector.chips:
if plane == 'sky':
thewcs = chip.wcs
scale = 3600.
xlabel = 'RA offset (arcsec)'
ylabel = 'DE offset (arcsec)'
elif plane == 'fpa':
thewcs = chip.wcs_fp
scale = 1.
xlabel = 'x (mm)'
ylabel = 'y (mm)'
xrange = np.linspace(1, chip.naxis1, npts)
yrange = np.linspace(1, chip.naxis2, npts)
xpix = np.concatenate((xrange,
np.zeros(npts) + chip.naxis1,
xrange[::-1],
np.zeros(npts) + 1))
ypix = np.concatenate((np.zeros(npts) + 1,
yrange,
np.zeros(npts) + chip.naxis2,
yrange[::-1]))
xworld, yworld = thewcs.all_pix2world(xpix, ypix, 1)
xworld -= thewcs.wcs.crval[0]
yworld -= thewcs.wcs.crval[1]
plt.plot(xworld * scale, yworld * scale, fmt, **kwargs)
if plot_origin:
x0, y0 = thewcs.all_pix2world(1, 1, 1)
x0 -= thewcs.wcs.crval[0]
y0 -= thewcs.wcs.crval[1]
plt.plot(x0 * scale, y0 * scale, 'r.')
xcen, ycen = thewcs.all_pix2world(chip.naxis1 / 2, chip.naxis2 / 2, 1)
xcen -= thewcs.wcs.crval[0]
ycen -= thewcs.wcs.crval[1]
if label:
plt.text(xcen * scale, ycen * scale, chip.id)
plt.axes().set_aspect('equal')
if plane == 'sky':
plt.gca().invert_xaxis()
plt.xlabel(xlabel)
plt.ylabel(ylabel)
[docs]def plot_detector(detector):
"""
Plot the contents of a detector array
Parameters
----------
detector : simcado.Detector
The detector object to be shown
"""
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
plt.figure(figsize=(15*2, 13.75*2))
x0 = np.min([i.x_min for i in detector.chips])
x1 = np.max([i.x_max for i in detector.chips])
y0 = np.min([i.y_min for i in detector.chips])
y1 = np.max([i.y_max for i in detector.chips])
w = (detector.chips[0].x_max - detector.chips[0].x_min)/(x1 - x0)
h = (detector.chips[0].y_max - detector.chips[0].y_min)/(y1 - y0)
for chip in detector.chips:
ax1 = plt.axes([(chip.x_min - x0)/(x1 - x0), (chip.y_min - y0)/(y1 - y0),
w, h])
ax1.set_xticks([])
ax1.set_yticks([])
if chip.array is not None:
ax1.imshow(np.rot90(chip.array - np.min(chip.array)), norm=LogNorm(),
cmap="Greys", vmin=1)
plt.show()
def generate_hxrg_noise(cmds):
"""
Generate a read noise frame using a UserCommands object
Create a detector noise array using Bernhard Rauscher's NGHxRG tool
Parameters
----------
cmds : simcado.UserCommands
"""
#if len(kwargs) > 0 and self.verbose: print("updating ",kwargs)
#self.params.update(kwargs)
print("Generating a new chip noise array")
print(mp.current_process())
# HXRG needs a pca file to run. Work out what a PCA file means!!
ng_h4rg = HXRGNoise(naxis1=cmds["HXRG_NAXIS1"],
naxis2=cmds["HXRG_NAXIS2"],
naxis3=cmds["HXRG_NUM_NDRO"],
n_out=cmds["HXRG_NUM_OUTPUTS"],
nroh=cmds["HXRG_NUM_ROW_OH"],
pca0_file=cmds["HXRG_PCA0_FILENAME"],
verbose=cmds["SIM_VERBOSE"])
# Make a noise file
noise = ng_h4rg.mknoise(o_file=cmds["HXRG_OUTPUT_PATH"],
rd_noise=cmds["FPA_READOUT_MEDIAN"],
pedestal=cmds["HXRG_PEDESTAL"],
c_pink=cmds["HXRG_CORR_PINK"],
u_pink=cmds["HXRG_UNCORR_PINK"],
acn=cmds["HXRG_ALT_COL_NOISE"])
return noise
[docs]def make_noise_cube(num_layers=25, filename="FPA_noise.fits", multicore=True):
"""
Create a large noise cube with many separate readout frames.
Note:
Each frame takes about 15 seconds to be generated. The default value of
25 frames will take around six minutes depending on your computer's
architecture.
Parameters
----------
num_layers : int, optional
the number of separate readout frames to be generated. Default is 25.
filename : str, optional
The filename for the FITS cube. Default is "FPA_noise.fits"
multicore : bool, optional
If you're not using windows, this allows the process to use all
available cores on your machine to speed up the process. Default is True
Notes
-----
multicore doesn't work - fix it
"""
if sys.version_info.major < 3:
print("Sorry, but this only works in Python 3 and above. \
See the SimCADO FAQs for work-around options")
return None
cmds = commands.UserCommands()
cmds["FPA_NOISE_PATH"] = "generate"
cmds["FPA_CHIP_LAYOUT"] = "default"
if "Windows" in os.environ.get('OS', ''):
multicore = False
if __name__ == "__main__" and multicore:
pool = mp.Pool(processes=mp.cpu_count()-1)
frames = pool.map(generate_hxrg_noise, (cmds)*num_layers)
else:
frames = [generate_hxrg_noise(cmds) \
for i in range(num_layers)]
hdu = fits.HDUList([fits.PrimaryHDU(frames[0])] + \
[fits.ImageHDU(frames[i]) \
for i in range(1, num_layers)])
if filename is None:
return hdu
else:
hdu.writeto(filename, overwrite=True, checksum=True)
[docs]def install_noise_cube(n=9):
"""
Install a noise cube in the package directory
Parameters
----------
n : int, optional
number of layers.
Warning
-------
Each layer is ~64MB, default is 9 layers (~600MB). If you have less than
1 GB on the drive where your Python installation is. Be careful!
"""
if sys.version_info.major >= 3:
print("WARNING - this process can take up to 10 minutes. Fear not!")
hdu = make_noise_cube(n, filename=None)
filename = find_file("FPA_noise.fits")
hdu.writeto(filename, overwrite=True, checksum=True)
print("Saved noise cube with", n, "layers to the package directory:")
print(filename)
else:
print("Sorry, but this only works in Python 3 and above. \
See the SimCADO FAQs for work-around options")