r"""
Luminosity function likelihood (:mod:`~horizonground.lumfunc_likelihood`)
===========================================================================
Fit tracer luminosity functions and produce model constraints.
Data processing
---------------
.. autosummary::
DataSourceError
DataSourceWarning
LumFuncMeasurements
Likelihood evaluation
---------------------
.. autosummary::
LumFuncLikelihood
|
"""
import warnings
from collections import OrderedDict
from inspect import signature
from itertools import compress as iterfilter
from itertools import product as iterproduct
import numpy as np
from .utils import process_header
[docs]class DataSourceError(IOError):
"""Raise an exception when the data source is not available or not in
the correct format.
"""
[docs]class DataSourceWarning(UserWarning):
"""Raise an exception when the data source does not appear to be
self-consistent.
"""
[docs]class LumFuncMeasurements:
"""Luminosity function measurements for a tracer sample.
Parameters
----------
measurement_file : str or :class:`pathlib.Path`
File path to luminosity function measurements.
uncertainty_file : str or :class:`pathlib.Path` or None, optional
File path to luminosity function uncertainties.
base10_log : bool, optional
If `True` (default), measurement values are converted to base-10
logarithms. String ``'lg_'`` is detected in file headers to
determine whether loaded measurements are already in base-10
logarithms.
Attributes
----------
luminosity_bins : list of float
Luminosity bin centres.
redshift_bins : list of float
Redshift bin centres.
redshift_labels : list of str
Redshift bin labels.
Notes
-----
Data files are assumed to be an array where rows correspond to
increasing luminosity bins and columns correspond to increasing
redshift bins. The first line should be a comment line for column
headings and the first column should be the luminosity bins. Redshift
bin labels are read from column headings as suffices in the
format e.g. "z_0.5_1.5". If symmetric uncertainties are given for
base-10 logarithmic luminosity function and conversion to linear values
is needed, the larger asymmetric uncertainty on the linear luminosity
function value is taken.
"""
def __init__(self, measurement_file, uncertainty_file=None,
base10_log=True):
self._lg_conversion = base10_log
self._measurement_source_path = measurement_file
self._uncertainty_source_path = uncertainty_file
self._load_source_files()
self._valid_data_points = ~np.isnan(self._measurements.flatten())
if self._uncertainties is not None:
self._valid_data_points &= ~np.isnan(self._uncertainties.flatten())
if not self._lg_conversion:
self._valid_data_points &= np.greater_equal(
np.abs(self._measurements.flatten()),
np.abs(self._uncertainties.flatten())
)
def __str__(self):
return "LuminosityFunctionMeasurements({})".format(",".join(
[
"measurement_source={}".format(self._measurement_source_path),
"uncertainty_source={}".format(self._uncertainty_source_path),
]
))
[docs] def __getitem__(self, z_key):
"""Get luminosity function measurements and uncertainties (if
available) for a specific redshift bin.
Parameters
----------
z_key: int, slice or str
Slice or integer index or string representing redshift
bins. If a string, the accepted format is e.g. ``z=1.0``.
Returns
-------
:class:`numpy.ndarray`, :class:`numpy.ndarray` or None
Measurements and uncertainties for the redshift bin(s).
"""
if isinstance(z_key, (int, slice)):
z_idx = z_key
else:
try:
z = float(str(z_key).replace(" ", "").lstrip("z="))
z_idx = np.where(np.isclose(self.redshift_bins, z))[0][0]
except (TypeError, ValueError):
raise KeyError(
"Non-existent redshift bin for '{}'.".format(z_key)
)
if self._uncertainties is not None:
return self._measurements[z_idx], self._uncertainties[z_idx]
return self._measurements[z_idx], None
[docs] def get_statistics(self):
"""Return the empirical luminosity function data measurements
and uncertainties as mean and variance statistics.
Returns
-------
data_mean : :class:`numpy.ndarray`
Luminosity function measurements as empirical mean.
data_variance : :class:`numpy.ndarray` or None
Luminosity function uncertainties as empirical variance,
if available.
Notes
-----
Data vectors are ordered by increasing redshift bins and
within the same redshift bin ordered by increasing luminosity.
"""
data_mean = self._measurements.flatten()[self._valid_data_points]
if self._uncertainties is not None:
data_variance = np.square(
self._uncertainties.flatten()[self._valid_data_points]
)
else:
data_variance = None
return data_mean, data_variance
def _load_source_files(self):
# Process measurements.
with open(self._measurement_source_path, 'r') as mfile:
mheadings = process_header(mfile.readline(), skipcols=1)
self.redshift_labels, self.redshift_bins = \
self._extract_redshift_bins(mheadings)
measurement_source = np.genfromtxt(
self._measurement_source_path, unpack=True
)
self.luminosity_bins = measurement_source[0]
measurement_array = measurement_source[1:]
if len(measurement_array) != len(mheadings):
raise DataSourceError(
"Measurements do not match the number of headings ."
)
if not self._lg_conversion:
for z_idx, col_name in enumerate(mheadings):
if 'lg_' in col_name:
measurement_array[z_idx] = 10 ** measurement_array[z_idx]
mheadings[z_idx] = col_name.replace("lg_", "")
self._measurements = measurement_array
# Process uncertainties.
if not self._uncertainty_source_path:
self._uncertainties = None
else:
with open(self._uncertainty_source_path, 'r') as ufile:
uheadings = process_header(ufile.readline(), skipcols=1)
uncertainty_source = np.genfromtxt(
self._uncertainty_source_path, unpack=True
)
luminosity_bin_matching_msg = (
"Luminosity bins in measurement and uncertainty files "
"do not match."
)
if len(self.luminosity_bins) != len(uncertainty_source[0]):
raise DataSourceError(luminosity_bin_matching_msg)
if not np.allclose(self.luminosity_bins, uncertainty_source[0]):
warnings.warn(luminosity_bin_matching_msg, DataSourceWarning)
uncertainty_array = uncertainty_source[1:].copy()
if np.shape(uncertainty_array) != np.shape(measurement_array):
raise DataSourceError(
"Uncertainty file data do not match measurement file data."
)
if not self._lg_conversion:
# Optimistic wing of assymetric uncertainties.
error_array = uncertainty_source[1:].copy()
for z_idx, col_name in enumerate(uheadings):
if 'lg_' in col_name:
error_array[z_idx] = measurement_array[z_idx] \
* (1 - 10 ** (- error_array[z_idx]))
# Larger asymetric uncertainty is taken.
uncertainty_array[z_idx] = measurement_array[z_idx] \
* (10 ** uncertainty_array[z_idx] - 1)
uheadings[z_idx] = col_name.replace("lg_", "")
self._errors = error_array
self._uncertainties = uncertainty_array
@staticmethod
def _extract_redshift_bins(headings):
bin_labels = sorted(set(map(
r"${}$".format,
[
column.split("z_")[-1].replace("_", "<z<")
for column in headings if 'z_' in column
]
)))
bin_centres = np.mean(
[
tuple(map(float, column.split("z_")[-1].split("_")))
for column in headings if 'z_' in column
],
axis=-1
)
return bin_labels, bin_centres
def _uniform_log_pdf(param_vals, param_ranges):
if len(param_ranges) != len(param_vals):
raise ValueError(
"Number of parameter ranges does not match "
"the number of parameters."
)
param_ranges = np.sort(np.atleast_2d(param_ranges), axis=-1)
if any(np.less(param_vals, param_ranges[:, 0])) \
or any(np.greater(param_vals, param_ranges[:, 1])):
return - np.inf
return 0.
def _normal_log_pdf(deviation_vector, covariance_matrix):
deviation_vector = np.asarray(deviation_vector)
if not all(np.isfinite(deviation_vector)):
return - np.inf
return - 1/2 * np.dot(
deviation_vector, np.linalg.solve(covariance_matrix, deviation_vector)
)
[docs]class LumFuncLikelihood(LumFuncMeasurements):
"""Luminosity function likelihood.
Notes
-----
The built-in likelihood distribution is a multivariate normal
approximation near the maximum point of the Poisson distribution that
describes the tracer number count in redshift and luminosity bins,
with different prescriptions of the luminosity function uncertainties
to the diagonal covariance matrix of the multivariate normal
distribution (see appendix B of [1]_). The built-in prior
distribution is multivariate uniform.
.. [1] Pozzetti L. et al., 2016. A&A 590, A3.
[arXiv: `1603.01453 <https://arxiv.org/abs/1603.01453>`_]
Parameters
----------
model_lumfunc : callable
Luminosity function model. Must return base-10 logarithmic values
or accept `base10_log` boolean keyword argument.
measurement_file : str or :class:`pathlib.Path`
Luminosity function measurement file path.
prior_file : str or :class:`pathlib.Path`
Luminosity function model prior file path. The prior parameter
values (parameter ranges) may not matter, but the parameter names
provided in the file do.
model_constraint : callable or None, optional
Additional model constraint(s) to be imposed on model parameters
as a prior (default is `None`).
uncertainty_file : str or :class:`pathlib.Path` or None, optional
Luminosity function uncertainty file path (default is `None`).
Ignored if `data_covariance` is provided.
fixed_file : str or :class:`pathlib.Path` or None, optional
Luminosity function model fixed parameter file path (default is
`None`). This covers any model parameter(s) not included in
the prior.
prescription : {'native', 'poisson', 'symlg', 'symlin'}, str, optional
Gaussian likelihood approximation prescription (default is
'poisson').
model_options : dict or None, optional
Additional parameters passed to the `model_lumfunc` for model
evaluation (default is `None`). This should not contain
parametric luminosity function model parameters but only Python
function implementation optional parameters (e.g.
``redshift_pivot=2.2`` for
:func:`~horizonground.lumfunc_modeller.quasar_PLE_lumfunc`).
covariance_matrix : float array_like or None, optional
Covariance matrix for the multivariate normal likelihood
approximation (default is `None`). Its dimensions must match the
length of the data vector flattened by redshift and
luminosity bins.
Attributes
----------
data_points : list of float
A vector of (luminosity, redshift) points for each valid
luminosity function measurement.
data_vector : float :class:`numpy.ndarray`
Flattened luminosity function measurements for valid data points.
prior, fixed : :class:`collections.OrderedDict` or None
Ordered dictionary of varied prior parameter names and values or
fixed parameter names and values. The parameters are ordered
as in the input files, so that the ordering of luminosity function
arguments is consistent.
"""
def __init__(self, model_lumfunc, measurement_file, prior_file,
uncertainty_file=None, fixed_file=None,
prescription='poisson', model_constraint=None,
model_options=None, covariance_matrix=None):
# Set up likelihood treatment.
self._prior_source_path = prior_file
self._fixed_source_path = fixed_file
self.prior, self.fixed = self._setup_prior()
self._presciption = prescription
if self._presciption in ['native']:
self._lg_conversion = False
elif self._presciption in ['poisson', 'symlg', 'symlin']:
self._lg_conversion = True
else:
raise ValueError(
f"Invalid Gaussian likelihood prescription: {prescription}."
)
# Set up model evaluation.
self._model_lumfunc = model_lumfunc
self._model_constraint = model_constraint
self._model_options = model_options or {}
if 'base10_log' in signature(self._model_lumfunc).parameters:
self._model_options.update({'base10_log': self._lg_conversion})
# Set up data processing.
super().__init__(
measurement_file, uncertainty_file=uncertainty_file,
base10_log=self._lg_conversion
)
self.data_points = self._setup_data_points()
self.data_vector, self._covariance = \
self._get_moments(external_covariance=covariance_matrix)
def __str__(self):
return "LuminosityFunctionLikelihood({})".format(",".join(
[
"LF_model={}".format(self._model_lumfunc.__name__),
"measurement_source={}".format(self._measurement_source_path),
"uncertainty_source={}".format(self._uncertainty_source_path),
"prior_source={}".format(self._prior_source_path),
"fixed_source={}".format(self._fixed_source_path),
"likelihood_approximant={}".format(self._presciption),
]
))
[docs] def __call__(self, param_point, use_prior=False):
"""Evaluate the logarithmic likelihood at the model parameter
point.
Parameters
----------
param_point : float array_like
:attr:`model_lumfunc` parametric model parameters ordered in
the same way as the prior parameters.
use_prior : bool, optional
If `True` (default is `False`), use the user-input prior range.
Returns
-------
float
Logarithmic likelihood value.
"""
if isinstance(param_point, np.ndarray):
param_point = param_point.tolist()
else:
param_point = list(param_point)
# Check for prior.
if use_prior:
log_prior = _uniform_log_pdf(
np.reshape(param_point, -1), list(self.prior.values())
)
else:
log_prior = 0.
if not np.isfinite(log_prior):
return - np.inf
# Check for model constraint.
model_params = OrderedDict(zip(list(self.prior.keys()), param_point))
if self.fixed is not None:
model_params.update(self.fixed)
try:
within_constraint = self._model_constraint(model_params)
except TypeError:
within_constraint = True
if not within_constraint:
return - np.inf
# Pass full parameter scope to the luminosity function model.
model_params.update(self._model_options)
model_vector = [
self._model_lumfunc(*data_point, **model_params)
for data_point in self.data_points
]
# Form Gaussian approximant likelihood deviation vector.
deviation_vector = np.subtract(model_vector, self.data_vector)
covariance_matrix = self._covariance
if self._presciption == 'poisson':
deviation_vector = np.sqrt(
2 * np.abs(np.exp(deviation_vector) - 1 - deviation_vector)
)
elif self._presciption == 'symlin':
deviation_vector = np.exp(deviation_vector) - 1
log_likelihood = _normal_log_pdf(deviation_vector, covariance_matrix)
return log_prior + log_likelihood
def _setup_data_points(self):
# The primary axis is redshift, secondary luminosity.
_data_points = list(map(
lambda tup: tuple(reversed(tup)),
iterproduct(self.redshift_bins, self.luminosity_bins)
))
return list(iterfilter(_data_points, self._valid_data_points))
def _setup_prior(self):
with open(self._prior_source_path, 'r') as pfile:
parameter_names = process_header(pfile.readline())
prior_data = np.genfromtxt(self._prior_source_path, unpack=True)
prior_ranges = list(map(tuple, prior_data))
prior = OrderedDict(zip(parameter_names, prior_ranges))
if self._fixed_source_path is None:
fixed = None
else:
with open(self._fixed_source_path, 'r') as ffile:
fixed_names = process_header(ffile.readline())
fixed_values = np.genfromtxt(self._fixed_source_path, unpack=True)
fixed = OrderedDict(zip(fixed_names, fixed_values))
return prior, fixed
def _get_moments(self, external_covariance=None):
data_mean, data_var = self.get_statistics()
if external_covariance is not None:
data_covar = np.squeeze(external_covariance)
if len(set(np.shape(data_covar))) > 1 \
or len(data_covar) != len(self.data_points):
raise ValueError(
"`covariance_matrix` dimensions do not match data points: "
"({:d}, {:d}) versus {:d}."
.format(*np.shape(data_covar), len(self.data_points))
)
return data_mean, data_covar
if data_var is None:
raise ValueError(
"Either `uncertainty_file` or `covariance_matrix` must be "
"provided for setting the likelihood covariance matrix."
)
return data_mean, np.diag(data_var)