"""Masking utilities for flat-sky patches and full-sky HEALPix maps.
Flat-sky helpers (formerly in ``preprocessing``):
get_peak_masks, inpaint_masked_regions, boundary_apod_mask,
get_mask_using_gaussian_fitting
AGORA MDPL2 cluster/point-source masks:
get_mdpl2_halo_cat, get_cluster_mask_radius,
get_point_source_mask_in_healpix, get_mdpl2_conversion_factors_K_to_MjyperSr,
apodize_binary_mask_prof, get_apodised_mdpl2_cluster_mask
"""
import healpy as hp
import numpy as np
from scipy import ndimage, optimize
# ---------------------------------------------------------------------------
# Flat-sky peak and boundary masking
# ---------------------------------------------------------------------------
[docs]
def get_peak_masks(
tmap,
mask_threshold_sigma_units=10,
mask_radius_pixel_units=0,
perform_apod=1,
mask_shape="circle",
taper_radius_fac=2.0,
):
"""Generate a sigma-clipping peak mask for a flat-sky map.
Parameters
----------
tmap : ndarray
Input flat-sky map.
mask_threshold_sigma_units : float
Sigma threshold above which pixels are masked.
mask_radius_pixel_units : float
Radius (in pixels) of the mask hole punched around each peak.
When 0, only the peak pixel itself is masked.
perform_apod : bool
Apodise the mask boundary.
mask_shape : {'circle', 'square'}
Shape of the mask hole.
taper_radius_fac : float
Apodisation taper radius as a multiple of *mask_radius_pixel_units*.
Returns
-------
peak_mask, mask : ndarray
Binary peak mask and (apodised) extended mask.
"""
assert mask_radius_pixel_units >= 0
assert mask_shape in ["circle", "square"]
peak_mask = np.ones(tmap.shape)
inds = np.where(abs(tmap) > abs(np.mean(tmap)) + mask_threshold_sigma_units * np.std(tmap))
peak_mask[inds] = 0.0
if mask_radius_pixel_units > 0:
x_grid, y_grid = np.indices(tmap.shape)
mask = np.ones(y_grid.shape)
for i, j in zip(inds[0], inds[1]):
y, x = x_grid[j, i], y_grid[j, i]
if mask_shape == "circle":
radius = np.sqrt((x - x_grid) ** 2.0 + (y - y_grid) ** 2.0)
inds_to_mask = np.where(radius <= mask_radius_pixel_units)
else:
inds_to_mask = np.where(
(abs(x - x_grid) < mask_radius_pixel_units)
& (abs(y - y_grid) < mask_radius_pixel_units)
)
mask[inds_to_mask[0], inds_to_mask[1]] = 0.0
taper_radius = mask_radius_pixel_units * taper_radius_fac
if perform_apod:
ker = np.hanning(taper_radius)
ker2d = np.asarray(np.sqrt(np.outer(ker, ker)))
mask = ndimage.convolve(mask, ker2d)
mask /= mask.max()
else:
mask = peak_mask
return peak_mask, mask
[docs]
def inpaint_masked_regions(hmap, mask, rng=None):
"""Replace masked pixels with Gaussian noise matching unmasked-region statistics.
Parameters
----------
hmap : ndarray, shape (npix,)
HEALPix (or flat-sky) map.
mask : ndarray, shape (npix,)
Mask array (1 = keep, 0 = replace).
rng : numpy.random.Generator, optional
Random number generator. A fresh ``default_rng()`` is used when *None*.
Returns
-------
ndarray
Copy of *hmap* with masked pixels replaced by Gaussian noise.
"""
if rng is None:
rng = np.random.default_rng()
hmap = hmap.copy()
unmasked = hmap[mask > 0.5]
mean = unmasked.mean()
std = unmasked.std()
masked_idx = np.where(mask < 0.5)[0]
hmap[masked_idx] = rng.normal(loc=mean, scale=std, size=len(masked_idx))
return hmap
[docs]
def boundary_apod_mask(
x_grid, y_grid, mask_radius, perform_apod=True, mask_shape="circle", taper_radius_fac=6.0
):
"""Create an apodised boundary mask on a 2D grid.
Parameters
----------
x_grid, y_grid : ndarray
Coordinate grids (e.g. RA, Dec) of the map.
mask_radius : float
Mask radius in the same units as *x_grid* and *y_grid*.
perform_apod : bool
Apodise the mask boundary.
mask_shape : {'circle', 'square'}
Shape of the masked region.
taper_radius_fac : float
Apodisation taper radius as a multiple of *mask_radius*.
Returns
-------
ndarray
Binary or apodised mask with the same shape as *x_grid*.
"""
assert mask_shape in ["circle", "square"]
mask = np.ones(y_grid.shape)
if mask_shape == "circle":
radius = np.sqrt(x_grid**2.0 + y_grid**2.0)
inds_to_mask = np.where(radius <= mask_radius)
else:
inds_to_mask = np.where((abs(x_grid) < mask_radius) & (abs(y_grid) < mask_radius))
mask[inds_to_mask[0], inds_to_mask[1]] = 0.0
if perform_apod:
taper_radius = mask_radius * taper_radius_fac
ker = np.hanning(taper_radius)
ker2d = np.asarray(np.sqrt(np.outer(ker, ker)))
mask = ndimage.convolve(mask, ker2d)
mask /= mask.max()
return mask
[docs]
def get_mask_using_gaussian_fitting(
nonpeak_mask,
mul_width_by_factor=2,
ini_height=0.0,
ini_amp=1.0,
ini_rot=0.0,
ini_blob_size_in_pixels=10.0,
use_elliptical_gaussian=False,
perform_apod=True,
):
"""Fit Gaussians to blobs in a binary mask and create a smooth mask.
Parameters
----------
nonpeak_mask : ndarray
Binary mask where 1 marks regions to be masked.
mul_width_by_factor : float
Multiply the fitted Gaussian width by this factor for the mask radius.
ini_height, ini_amp, ini_rot : float
Initial Gaussian parameters (baseline, amplitude, rotation).
ini_blob_size_in_pixels : float
Initial guess for the Gaussian width in pixels.
use_elliptical_gaussian : bool
Fit an elliptical (asymmetric) Gaussian if *True*.
perform_apod : bool
Apodise the final mask.
Returns
-------
ndarray
Final (possibly apodised) mask.
"""
from foregrounds_diffusion.statistics import fitting_func
ny, nx = nonpeak_mask.shape
x = y = np.arange(nx)
xgrid, ygrid = np.meshgrid(x, y)
wx = wy = ini_blob_size_in_pixels
non_zero_yinds, non_zero_xinds = np.where(nonpeak_mask == 1)
howmany = len(non_zero_yinds)
total_mask = np.zeros(nonpeak_mask.shape)
for cntr, (bx, by) in enumerate(zip(non_zero_xinds, non_zero_yinds)):
if not (cntr < 10 or cntr > howmany - 10):
continue
if use_elliptical_gaussian:
p0 = [ini_height, ini_amp, bx, by, wx, wy, ini_rot]
else:
p0 = [ini_height, ini_amp, bx, by, wx]
p1, _ = optimize.leastsq(fitting_func, p0[:], args=(p0, xgrid, ygrid, nonpeak_mask))
x_fit, y_fit, x_width = p1[2], p1[3], p1[4]
width_for_mask = x_width * mul_width_by_factor
rad_grid = np.hypot(xgrid - x_fit, ygrid - y_fit)
curr_mask = np.zeros(nonpeak_mask.shape)
curr_mask[rad_grid <= width_for_mask] = 1.0
total_mask += curr_mask
final_mask = np.ones(nonpeak_mask.shape)
final_mask[total_mask != 0] = 0.0
if perform_apod:
npix_cos = int((nx / 10.0))
ker = np.hanning(npix_cos)
ker2d = np.asarray(np.sqrt(np.outer(ker, ker)))
final_mask = ndimage.convolve(final_mask, ker2d)
final_mask /= final_mask.max()
return final_mask
# ---------------------------------------------------------------------------
# AGORA MDPL2 halo catalogue
# ---------------------------------------------------------------------------
[docs]
def get_mdpl2_halo_cat(halo_cat_fname, get_velocities=True):
"""Load the MDPL2 halo catalogue.
Parameters
----------
halo_cat_fname : str
Path to the halo catalogue file (.npy or .npz).
get_velocities : bool
If *True*, also return the three velocity components.
Returns
-------
tuple of ndarray
``(ra, dec, z, m200c, m500c)`` or
``(ra, dec, z, m200c, m500c, vlos, vtht, vphi)`` when
``get_velocities=True``.
"""
loaded = np.load(halo_cat_fname, allow_pickle=True)
if isinstance(loaded, np.lib.npyio.NpzFile):
mdpl2_ra = loaded["totra"]
mdpl2_dec = loaded["totdec"]
mdpl2_z = loaded["totz"]
mdpl2_m200c = loaded["totm200"]
mdpl2_m500c = loaded["totm500"]
mdpl2_vlos = loaded["totvlos"]
mdpl2_vtht = loaded["totvtht"]
mdpl2_vphi = loaded["totvphi"]
else:
# Legacy .npy format: columns packed as rows
cols = loaded.T
(
mdpl2_ra,
mdpl2_dec,
mdpl2_z,
mdpl2_m200c,
mdpl2_m500c,
mdpl2_vlos,
mdpl2_vtht,
mdpl2_vphi,
) = cols
if get_velocities:
return (
mdpl2_ra,
mdpl2_dec,
mdpl2_z,
mdpl2_m200c,
mdpl2_m500c,
mdpl2_vlos,
mdpl2_vtht,
mdpl2_vphi,
)
return mdpl2_ra, mdpl2_dec, mdpl2_z, mdpl2_m200c, mdpl2_m500c
# ---------------------------------------------------------------------------
# Cluster mask radius
# ---------------------------------------------------------------------------
[docs]
def get_cluster_mask_radius(m500c):
"""Return a mask radius in arcminutes based on cluster mass.
Parameters
----------
m500c : float
Cluster mass M_500c in solar masses.
Returns
-------
float
Mask radius in arcminutes.
"""
if m500c < 1e14:
return 3.0
elif m500c < 3e14:
return 5.0
elif m500c < 5e14:
return 8.0
else:
return 10.0
# ---------------------------------------------------------------------------
# Point-source mask
# ---------------------------------------------------------------------------
[docs]
def get_point_source_mask_in_healpix(
freq,
hmap_Mjy_per_sr,
threshold_mjy_freq0,
threshold2_mjy_freq0=None,
freq0=150.0,
spec_index=3.4,
full_sky=True,
ang_res_am=None,
return_flux_map_in_mjy=False,
):
"""Identify pixels containing point sources above a flux threshold.
Parameters
----------
freq : float
Observation frequency in GHz.
hmap_Mjy_per_sr : ndarray
HEALPix map in MJy/sr.
threshold_mjy_freq0 : float
Flux threshold in mJy at *freq0*.
threshold2_mjy_freq0 : float, optional
Upper flux threshold for band-pass masking.
freq0 : float
Reference frequency in GHz.
spec_index : float
Spectral index for frequency scaling.
full_sky : bool
If *True*, treat *hmap_Mjy_per_sr* as a full-sky HEALPix map and
compute pixel area from NSIDE. Otherwise supply *ang_res_am*.
ang_res_am : float, optional
Pixel angular resolution in arcminutes (required when ``full_sky=False``).
return_flux_map_in_mjy : bool
If *True*, also return the flux map in mJy.
Returns
-------
mask_pixels : ndarray of int
Pixel indices (or (row, col) index tuple for flat maps) to mask.
hmap_mjy : ndarray
Flux map in mJy (only returned when ``return_flux_map_in_mjy=True``).
"""
if full_sky:
nside = hp.get_nside(hmap_Mjy_per_sr)
pix_area = hp.nside2resol(nside) ** 2.0
else:
assert ang_res_am is not None
pix_area = np.radians(ang_res_am / 60.0) ** 2.0
hmap_mjy = np.copy(hmap_Mjy_per_sr) * pix_area * 1e9 # MJy/sr → mJy
scaling = (freq / freq0) ** spec_index
threshold_mjy = threshold_mjy_freq0 * scaling
if threshold2_mjy_freq0 is None:
mask_pixels = np.where(hmap_mjy >= threshold_mjy)
else:
threshold2 = threshold2_mjy_freq0 * scaling
mask_pixels = np.where((hmap_mjy >= threshold_mjy) & (hmap_mjy < threshold2))
if full_sky:
mask_pixels = mask_pixels[0]
if return_flux_map_in_mjy:
return mask_pixels, hmap_mjy
return mask_pixels
# ---------------------------------------------------------------------------
# Unit-conversion factors
# ---------------------------------------------------------------------------
[docs]
def get_mdpl2_conversion_factors_K_to_MjyperSr(expname, band):
"""Look up the K → MJy/sr conversion factor for a given experiment and band.
Parameters
----------
expname : str or None
Experiment name: ``'planck'``, ``'spt3g'``/``'spt'``/``'spt4'``,
``'cmbs4'``/``'s4wide'``/``'s4deep'``, or *None* for a generic set.
band : int
Band centre frequency in GHz.
Returns
-------
float
Conversion factor.
"""
planck = {100: 243.623, 143: 371.036, 217: 481.882, 353: 287.281, 545: 57.6963, 857: 2.26476}
spt = {
95: 208.973,
150: 375.876,
220: 472.522,
221: 473.332,
285: 414.977,
286: 414.977,
345: 310.827,
}
s4 = {145: 379.391 * 0.976, 155: 403.379 * 0.975}
generic = {95: 208.973, 150: 375.876, 220: 472.522, 285: 414.977, 345: 310.827}
if expname == "planck":
return planck[band]
elif expname in ("spt3g", "spt", "spt4"):
return spt[band]
elif expname in ("cmbs4", "s4wide", "s4deep"):
return s4[band]
else:
return generic[band]
# ---------------------------------------------------------------------------
# Apodisation
# ---------------------------------------------------------------------------
[docs]
def apodize_binary_mask_prof(binary_mask, dist_smooth_angle, apod_start_dist, apod_end_dist):
"""Apodise a binary HEALPix mask using a distance-based smooth profile.
The apodisation profile is ``(x − sin x) / 2π`` on ``[0, 2π]``, the
integral of a cosine-kernel cross-section.
Parameters
----------
binary_mask : ndarray
Full-sky HEALPix binary mask (1 = unmasked, 0 = masked).
dist_smooth_angle : float
FWHM of the Gaussian kernel applied to the distance map, in radians.
apod_start_dist : float
Distance below which pixels are set to 0, in radians.
apod_end_dist : float
Distance above which the profile is not applied, in radians.
Returns
-------
ndarray
Apodised mask.
"""
net_dist = apod_end_dist - apod_start_dist
dist_map = hp.dist2holes(binary_mask)
binary_mask[dist_map <= apod_start_dist] = 0.0
smooth_region = (dist_map > apod_start_dist) & (dist_map < apod_end_dist)
nside = hp.get_nside(binary_mask)
dist_map = hp.smoothing(dist_map, fwhm=dist_smooth_angle, lmax=nside)
smooth_mask = np.array(binary_mask)
x = (dist_map[smooth_region] - apod_start_dist) / net_dist * 2.0 * np.pi
del dist_map
smooth_mask[smooth_region] = (x - np.sin(x)) / (2.0 * np.pi)
smooth_mask *= binary_mask
smooth_mask = np.clip(smooth_mask, 0.0, 1.0)
del binary_mask
return smooth_mask
# ---------------------------------------------------------------------------
# Cluster mask
# ---------------------------------------------------------------------------
[docs]
def get_apodised_mdpl2_cluster_mask(
nside,
halo_cat_fname,
m500c_threshold=5e13,
cluster_lmz_dic=None,
howmanythetaforclusters=-1,
apodise=True,
expname=None,
):
"""Build an apodised HEALPix cluster mask from the MDPL2 halo catalogue.
Parameters
----------
nside : int
HEALPix NSIDE resolution.
halo_cat_fname : str
Path to the halo catalogue ``.npy`` file.
m500c_threshold : float
Minimum M_500c (in M_☉) for a cluster to be masked.
Pass ``-1`` to use the redshift-dependent mass limit from
*cluster_lmz_dic* instead.
cluster_lmz_dic : dict, optional
Redshift-dependent mass-limit dictionary (required when
``m500c_threshold=-1``). Must contain keys ``'redshift'`` and
``'M500c'``.
howmanythetaforclusters : float
If ``> 0``, compute the mask radius as this multiple of θ_500c.
Otherwise use :func:`get_cluster_mask_radius`.
apodise : bool
Apodise the binary mask.
expname : str, optional
Experiment name (used only when ``m500c_threshold=-1``).
Returns
-------
ndarray
Final apodised (or binary) full-sky cluster mask.
"""
import gc
from astropy.cosmology import FlatLambdaCDM
(mdpl2_ra, mdpl2_dec, mdpl2_z, mdpl2_m200c, mdpl2_m500c, mdpl2_vlos, mdpl2_vtht, mdpl2_vphi) = (
get_mdpl2_halo_cat(halo_cat_fname)
)
# --- select clusters to mask ---
if m500c_threshold != -1:
clus_inds = np.where(mdpl2_m500c >= m500c_threshold)[0]
else:
assert cluster_lmz_dic is not None and expname is not None
redshifts = cluster_lmz_dic["redshift"]
dz = np.diff(redshifts)[0]
lim_M500c = cluster_lmz_dic["M500c"] * 1e14
clus_inds = []
inds = np.where(mdpl2_z < redshifts[0])[0]
clus_inds.extend(inds[mdpl2_m500c[inds] > lim_M500c[0]])
for zcntr, zzz in enumerate(redshifts):
inds = np.where((mdpl2_z >= zzz) & (mdpl2_z < zzz + dz))[0]
passed = np.where(mdpl2_m500c[inds] > lim_M500c[zcntr])[0]
clus_inds.extend(inds[passed])
clus_inds = np.asarray(clus_inds)
print(f"\tTotal clusters to mask: {len(clus_inds)}")
# --- optionally compute θ_500c-based radii ---
if howmanythetaforclusters > 0:
try:
from colossus.cosmology import cosmology as colossus_cosmology
from colossus.halo import concentration, mass_defs
colossus_cosmology.setCosmology("planck15")
except ImportError:
raise ImportError("colossus is required for θ_500c-based masking.")
cosmo = FlatLambdaCDM(H0=67.74, Om0=0.3089)
cluster_mask_radius_am_arr = []
for cntr, iii in enumerate(clus_inds):
if cntr % 1000 == 0:
print(cntr)
c500c = concentration.concentration(mdpl2_m500c[iii], "500c", mdpl2_z[iii])
_, r500c, _ = mass_defs.changeMassDefinition(
mdpl2_m500c[iii], c500c, mdpl2_z[iii], "500c", "500c", profile="nfw"
)
r500c_mpc = r500c / 1e3
ang_dia_dist = cosmo.comoving_distance(mdpl2_z[iii]) / (1.0 + mdpl2_z[iii])
theta500c_am = np.degrees(r500c_mpc / ang_dia_dist.value) * 60.0
cluster_mask_radius_am_arr.append(int(theta500c_am * howmanythetaforclusters) + 1)
arr = np.asarray(cluster_mask_radius_am_arr, dtype=float)
arr_mod = np.zeros_like(arr)
arr_mod[arr <= 5.0] = 5.0
arr_mod[(arr > 5.0) & (arr <= 10.0)] = 8.0
arr_mod[(arr > 10.0) & (arr <= 20.0)] = 15.0
arr_mod[(arr > 20.0) & (arr <= 50.0)] = 35.0
arr_mod[(arr > 50.0) & (arr <= 100.0)] = 75.0
arr_mod[arr > 100.0] = 100.0
cluster_mask_radius_am_arr = arr_mod
# --- build combined binary mask ---
npix = hp.nside2npix(nside)
combined_mask = np.ones(npix, dtype=np.float32)
for cntr, iii in enumerate(clus_inds):
if cntr % 5000 == 0:
print(cntr)
ppp = hp.ang2pix(nside, np.radians(90.0 - mdpl2_dec[iii]), np.radians(mdpl2_ra[iii]))
if howmanythetaforclusters > 0:
r_am = cluster_mask_radius_am_arr[cntr]
else:
r_am = get_cluster_mask_radius(mdpl2_m500c[iii])
ivec = hp.pix2vec(nside, ppp)
disc = hp.query_disc(nside, ivec, np.deg2rad(r_am / 60.0))
combined_mask[disc] = 0.0
del mdpl2_ra, mdpl2_dec, mdpl2_z, mdpl2_m200c, mdpl2_m500c
del mdpl2_vlos, mdpl2_vtht, mdpl2_vphi, clus_inds
gc.collect()
print("Starting apodisation")
if apodise:
apod_fwhm_rad = np.radians(15.0 / 60.0) # 15 arcmin apodisation
final_hmask = hp.smoothing(combined_mask, fwhm=apod_fwhm_rad, lmax=2 * nside).astype(
np.float32
)
del combined_mask
gc.collect()
final_hmask = np.clip(final_hmask, 0.0, 1.0)
final_hmask /= final_hmask.max()
else:
final_hmask = combined_mask
return final_hmask