13 — Profiling and Benchmarking Baseline

Phase 2 baseline measurements for the evaluation pipeline functions. Sweeps: N (maps), H (pixel side), T (thresholds), B (ℓ-bands). Pre-optimisation timings on the local machine (CPU, single core).

1 Setup

[ ]:
import cProfile, pstats, tracemalloc, timeit, os, sys
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import linregress

sys.path.insert(0, os.path.join(os.getcwd(), '..', '..'))

from foregrounds_diffusion.flatmaps import (
    get_lpf_hpf, make_gaussian_realisation, map2cl,
)
from foregrounds_diffusion.moments import compute_cross_moments, compute_summed_moments
from foregrounds_diffusion.morphology import compute_minkowski_tensors
from foregrounds_diffusion.peak_counts import compute_peak_minima_counts

%matplotlib inline
plt.rcParams.update({'figure.dpi': 100, 'font.size': 10})

[ ]:
def profile_fn(fn, *args, n_repeat=3, **kwargs):
    """Return median wall-clock time and peak memory for one call of fn.""""
    times = timeit.repeat(lambda: fn(*args, **kwargs), number=1, repeat=n_repeat)
    tracemalloc.start()
    fn(*args, **kwargs)
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return {
        'time_median_s': float(sorted(times)[n_repeat // 2]),
        'time_min_s':    float(min(times)),
        'peak_mem_mb':   peak / 1024 ** 2,
    }

def sweep(fn, dim_values, make_args, n_repeat=3, label='N', verbose=True):
    """Sweep a single input dimension; return (times, peak_mems) arrays.""""
    times, mems = [], []
    for val in dim_values:
        args, kwargs = make_args(val)
        r = profile_fn(fn, *args, n_repeat=n_repeat, **kwargs)
        times.append(r['time_median_s'])
        mems.append(r['peak_mem_mb'])
        if verbose:
            print(f'  {label}={val:>5}  {r["time_median_s"]:.4f}s  {r["peak_mem_mb"]:.1f} MB')
    return np.array(times), np.array(mems)

def fit_slope(xs, ys):
    """Fit log-log power-law slope.""""
    xs, ys = np.array(xs, float), np.array(ys, float)
    mask = ys > 0
    if mask.sum() < 2:
        return float('nan')
    slope, *_ = linregress(np.log(xs[mask]), np.log(ys[mask]))
    return slope

[ ]:
_EL = np.arange(1, 5000)
_CL = 1e-5 * _EL.astype(float) ** (-2)

def make_maps(N, H):
    np.random.seed(42)
    params = [H, H, 1.40625, 1.40625]
    return (
        np.array([make_gaussian_realisation(params, _EL, _CL) for _ in range(N)]),
        params,
    )

def make_bp_filters(params, B):
    edges = np.linspace(200, 7000, B + 1)
    return [get_lpf_hpf(params, (edges[i], edges[i+1]), filter_type=2) for i in range(B)]

N_VALUES = [1, 5, 10, 50, 100]
H_VALUES = [32, 64, 128, 256]
T_VALUES = [5, 10, 25, 50, 100]
B_VALUES = [2, 4, 8, 16]

H_FIXED, T_FIXED, B_FIXED, N_FIXED = 64, 25, 4, 10
flatskymapparams = [H_FIXED, H_FIXED, 1.40625, 1.40625]
thresholds = np.linspace(-2, 2, T_FIXED)
bp_fixed = make_bp_filters(flatskymapparams, B_FIXED)
thresh_peaks  = np.linspace(-1, 5, 20)
thresh_minima = np.linspace(-5, 1, 20)

maps_cache = {N: make_maps(N, H_FIXED)[0] for N in N_VALUES}
maps10, _ = make_maps(N_FIXED, H_FIXED)
print('Fixtures ready.')

2 Baseline measurements (pre-optimisation)

2a compute_minkowski_tensors

[ ]:
print('N-scaling (H=64, T=25)')
mink_N_times, mink_N_mems = sweep(
    compute_minkowski_tensors, N_VALUES,
    lambda N: ((maps_cache[N], lambda x: x, thresholds), {}), label='N')

print()
print('H-scaling (N=10, T=25)')
mink_H_times, mink_H_mems = [], []
for H in H_VALUES:
    maps, params = make_maps(N_FIXED, H)
    r = profile_fn(compute_minkowski_tensors, maps, lambda x: x, np.linspace(-2,2,T_FIXED))
    mink_H_times.append(r['time_median_s']); mink_H_mems.append(r['peak_mem_mb'])
    print(f'  H={H:>4}  {r["time_median_s"]:.4f}s  {r["peak_mem_mb"]:.1f} MB')
mink_H_times, mink_H_mems = np.array(mink_H_times), np.array(mink_H_mems)

print()
print('T-scaling (N=10, H=64)')
mink_T_times, mink_T_mems = sweep(
    compute_minkowski_tensors, T_VALUES,
    lambda T: ((maps10, lambda x: x, np.linspace(-2,2,T)), {}), label='T')

print(f'\nN-slope={fit_slope(N_VALUES,mink_N_times):.2f}  '
      f'H-slope={fit_slope(H_VALUES,mink_H_times):.2f}  '
      f'T-slope={fit_slope(T_VALUES,mink_T_times):.2f}')

2b compute_cross_moments

[ ]:
print('N-scaling (H=64, B=4)')
cross_N_times, cross_N_mems = sweep(
    compute_cross_moments, N_VALUES,
    lambda N: ((maps_cache[N], maps_cache[N], bp_fixed), {}), label='N')

print()
print('B-scaling (N=10, H=64)')
cross_B_times, cross_B_mems = sweep(
    compute_cross_moments, B_VALUES,
    lambda B: ((maps10, maps10, make_bp_filters(flatskymapparams, B)), {}), label='B')

print(f'\nN-slope={fit_slope(N_VALUES,cross_N_times):.2f}  '
      f'B-slope={fit_slope(B_VALUES,cross_B_times):.2f}')

2c map2cl (per-map loop)

[ ]:
print('N-scaling (H=64)')
map2cl_N_times, map2cl_N_mems = [], []
for N in N_VALUES:
    maps = maps_cache[N]
    r = profile_fn(lambda maps=maps: [map2cl(flatskymapparams, m) for m in maps])
    map2cl_N_times.append(r['time_median_s']); map2cl_N_mems.append(r['peak_mem_mb'])
    print(f'  N={N:>5}  {r["time_median_s"]:.4f}s  {r["peak_mem_mb"]:.1f} MB')
map2cl_N_times, map2cl_N_mems = np.array(map2cl_N_times), np.array(map2cl_N_mems)

print()
print('H-scaling (N=10)')
map2cl_H_times, map2cl_H_mems = [], []
for H in H_VALUES:
    maps, params = make_maps(N_FIXED, H)
    r = profile_fn(lambda maps=maps, p=params: [map2cl(p, m) for m in maps])
    map2cl_H_times.append(r['time_median_s']); map2cl_H_mems.append(r['peak_mem_mb'])
    print(f'  H={H:>4}  {r["time_median_s"]:.4f}s  {r["peak_mem_mb"]:.1f} MB')
map2cl_H_times, map2cl_H_mems = np.array(map2cl_H_times), np.array(map2cl_H_mems)

print(f'\nN-slope={fit_slope(N_VALUES,map2cl_N_times):.2f}  '
      f'H-slope={fit_slope(H_VALUES,map2cl_H_times):.2f}')

2d compute_peak_minima_counts

[ ]:
print('N-scaling (H=64)')
peak_N_times, peak_N_mems = sweep(
    compute_peak_minima_counts, N_VALUES,
    lambda N: ((maps_cache[N], thresh_peaks, thresh_minima), {}), label='N')

print(f'\nN-slope={fit_slope(N_VALUES,peak_N_times):.2f}')

3 Figures (pre-optimisation)

[ ]:
# Figure 1 — Wall-clock time vs N (log-log), one panel per function
fig, ax = plt.subplots(figsize=(7, 4))
for label, times in [
    ('compute_minkowski_tensors', mink_N_times),
    ('compute_cross_moments',     cross_N_times),
    ('map2cl (loop)',             map2cl_N_times),
    ('compute_peak_minima_counts', peak_N_times),
]:
    ax.loglog(N_VALUES, times, marker='o', label=label)
# Reference: ideal linear
ax.loglog(N_VALUES, np.array(N_VALUES, float) / N_VALUES[0] * mink_N_times[0],
          'k--', lw=0.8, label='O(N) reference')
ax.set_xlabel('N (number of maps)')
ax.set_ylabel('Wall-clock time (s)')
ax.set_title('Figure 1 — Time vs N (H=64, T=25, B=4)')
ax.legend(fontsize=8)
fig.tight_layout()
os.makedirs('../../plots/benchmarks', exist_ok=True)
fig.savefig('../../plots/benchmarks/fig1_time_vs_N.png', dpi=120)
plt.show()

[ ]:
# Figure 2 — Wall-clock time vs map size H×W (log-log)
fig, ax = plt.subplots(figsize=(6, 4))
ax.loglog(H_VALUES, mink_H_times,  marker='s', label='compute_minkowski_tensors')
ax.loglog(H_VALUES, map2cl_H_times, marker='^', label='map2cl (N=10 loop)')
# O(H²) and O(H² log H) references
h = np.array(H_VALUES, float)
ax.loglog(h, h**2 / h[0]**2 * mink_H_times[0], 'k--', lw=0.8, label='O(H²) ref')
ax.set_xlabel('Map side length H (pixels)')
ax.set_ylabel('Wall-clock time (s)')
ax.set_title('Figure 2 — Time vs H (N=10)')
ax.legend(fontsize=8)
fig.tight_layout()
fig.savefig('../../plots/benchmarks/fig2_time_vs_H.png', dpi=120)
plt.show()

[ ]:
# Figure 3 — Peak memory vs N
fig, ax = plt.subplots(figsize=(7, 4))
for label, mems in [
    ('compute_minkowski_tensors', mink_N_mems),
    ('compute_cross_moments',     cross_N_mems),
    ('map2cl',                    map2cl_N_mems),
]:
    ax.plot(N_VALUES, mems, marker='o', label=label)
ax.set_xlabel('N (number of maps)')
ax.set_ylabel('Peak memory (MB)')
ax.set_title('Figure 3 — Peak memory vs N (H=64)')
ax.legend(fontsize=8)
fig.tight_layout()
fig.savefig('../../plots/benchmarks/fig3_mem_vs_N.png', dpi=120)
plt.show()

7 Scaling law summary table (pre-optimisation)

[ ]:
import pandas as pd

rows = [
    ('compute_minkowski_tensors',
     fit_slope(N_VALUES, mink_N_times), fit_slope(H_VALUES, mink_H_times),
     fit_slope(T_VALUES, mink_T_times), '—',
     mink_N_times[-1], '—'),
    ('compute_cross_moments',
     fit_slope(N_VALUES, cross_N_times), '—', '—',
     fit_slope(B_VALUES, cross_B_times),
     cross_N_times[-1], '—'),
    ('map2cl (loop)',
     fit_slope(N_VALUES, map2cl_N_times), fit_slope(H_VALUES, map2cl_H_times),
     '—', '—',
     map2cl_N_times[-1], '—'),
    ('compute_peak_minima_counts',
     fit_slope(N_VALUES, peak_N_times), '—', '—', '—',
     peak_N_times[-1], '—'),
]

df = pd.DataFrame(rows, columns=[
    'Function', 'N-slope', 'H-slope', 'T-slope', 'B-slope',
    'Time @N=100,H=64 (s)', 'Post-opt time (s)'
])
df = df.set_index('Function')
print(df.to_string())

4 Optimisations applied

To be filled in after §2.6 optimisations are implemented.

5 Post-optimisation measurements

To be filled in after §2.6 optimisations are implemented.

6 Before/after comparison figures

To be filled in after §2.6 optimisations are implemented.

8 Parallel scaling (§3.2)

Strong-scaling benchmark: fixed total work (N=50 maps, H=64²), varying worker count. All functions now accept n_jobs — see §3.2 implementation notes.

[ ]:
from multiprocessing import cpu_count
from foregrounds_diffusion.moments import compute_summed_moments, compute_cross_moments
from foregrounds_diffusion.peak_counts import compute_peak_minima_counts
from foregrounds_diffusion.morphology import compute_minkowski_tensors

N_PARALLEL = 50
MAX_WORKERS = min(cpu_count(), 8)
N_WORKERS = [w for w in [1, 2, 4, 8] if w <= MAX_WORKERS]
print(f'Logical CPUs available: {cpu_count()}, sweeping: {N_WORKERS}')

maps50, _ = make_maps(N_PARALLEL, H_FIXED)
thresholds_p = np.linspace(-2, 4, 15)
thresholds_m = np.linspace(-4, 2, 15)
bp4 = make_bp_filters(flatskymapparams, B_FIXED)

[ ]:
# Strong scaling — time vs n_workers for three functions
scale_fns = {
    'compute_minkowski_tensors': lambda nj: compute_minkowski_tensors(
        maps50, lambda x: x, thresholds, n_jobs=nj),
    'compute_cross_moments': lambda nj: compute_cross_moments(
        maps50, maps50, bp4, n_jobs=nj),
    'compute_peak_minima_counts': lambda nj: compute_peak_minima_counts(
        maps50, thresholds_p, thresholds_m,
        smoothing_scales_arcmin=(2.5,), n_jobs=nj),
}

scale_times = {fn: [] for fn in scale_fns}
for fn_name, fn in scale_fns.items():
    print(f'\n{fn_name}')
    for nj in N_WORKERS:
        r = profile_fn(fn, nj)
        scale_times[fn_name].append(r['time_median_s'])
        eff = scale_times[fn_name][0] / (nj * r['time_median_s']) * 100
        print(f'  n_jobs={nj:2d}  {r["time_median_s"]:.3f}s  eff={eff:.0f}%')

[ ]:
# Figure 11 — Strong scaling: wall-clock time vs n_workers
fig, ax = plt.subplots(figsize=(6, 4))

colors_scale = [plt.rcParams['axes.prop_cycle'].by_key()['color'][i] for i in range(3)]
for (fn_name, times), color in zip(scale_times.items(), colors_scale):
    t1 = times[0]
    ax.plot(N_WORKERS, times, 'o-', color=color,
            label=fn_name.replace('_', '\\_') if False else fn_name)

# Ideal linear speedup reference (dashed)
t1_ref = max(t[0] for t in scale_times.values())
ideal = [t1_ref / nj for nj in N_WORKERS]
ax.plot(N_WORKERS, ideal, 'k--', lw=0.8, label='Ideal (linear)')

ax.set_xlabel('Number of workers (n_jobs)')
ax.set_ylabel('Wall-clock time (s)')
ax.set_title(f'Strong scaling — N={N_PARALLEL} maps, H={H_FIXED}²')
ax.legend(fontsize=7)
ax.set_xticks(N_WORKERS)
plt.tight_layout()
plt.savefig('plots/benchmarks/fig11_strong_scaling.pdf')
plt.savefig('plots/benchmarks/fig11_strong_scaling.png', dpi=150)
plt.show()

[ ]:
# Strong-scaling summary table
rows = []
for fn_name, times in scale_times.items():
    t1 = times[0]
    row = {'Function': fn_name, 'Serial (s)': f'{t1:.3f}'}
    for nj, t in zip(N_WORKERS[1:], times[1:]):
        eff = t1 / (nj * t) * 100
        row[f'{nj} workers'] = f'{t:.3f}s ({eff:.0f}%)'
    rows.append(row)

import pandas as pd
pd.set_option('display.max_colwidth', 40)
pd.DataFrame(rows).set_index('Function')