summaryrefslogtreecommitdiff
path: root/subprojects/pixpat/pixpat-native/src/threading.h
blob: 5e7fc01cd08a5d456dce850a27b229aebb5649a4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#pragma once

#include <unistd.h>

#include <cassert>
#include <cstddef>
#include <exception>
#include <thread>
#include <vector>

namespace pixpat
{

/*
 * Default number of worker threads: the count of online processors as
 * reported by `sysconf(_SC_NPROCESSORS_ONLN)`, clamped to `[1, 16]`.
 *
 * A result below 1 (which includes the -1 that sysconf uses to signal
 * failure) falls back to a single thread.
 */
inline unsigned default_thread_count()
{
	// Upper bound: keeps per-stripe work meaningful and avoids heavy
	// oversubscription on large NUMA hosts.
	constexpr long max_threads = 16;

	const long online = sysconf(_SC_NPROCESSORS_ONLN);
	if (online < 1)
		return 1u;

	return static_cast<unsigned>(online < max_threads ? online : max_threads);
}

/*
 * Run `fn(start_y, end_y)` over `[0, height)` partitioned into stripes
 * aligned to `v_sub`. Half-open ranges, matching the `for (by = 0;
 * by < H; by += bh)` block-loop style.
 *
 * Parameters:
 *   height    - number of rows to cover. 0 makes the call a no-op.
 *   v_sub     - stripe alignment in rows; every stripe starts on a
 *               multiple of it. 0 makes the call a no-op. `height` is
 *               expected to be a multiple of `v_sub` (asserted).
 *   n_threads - upper bound on the number of worker threads. 0 is
 *               treated as 1, and the value is clamped to
 *               `height / v_sub` so no stripe is smaller than `v_sub`.
 *               Fewer threads than requested may be started when the
 *               rounded-up stripe height already covers `height`.
 *   fn        - callable as `void(size_t start_y, size_t end_y)`. It is
 *               invoked concurrently from multiple threads, so it must
 *               be safe to call with disjoint Y-ranges in parallel.
 *
 * When `n_threads <= 1` (after clamping), `fn` is called inline on the
 * calling thread — no `std::thread` is spawned, no allocation occurs,
 * and any exception from `fn` propagates directly.
 *
 * Errors:
 *   - Exceptions thrown from a worker are captured and the first (by
 *     stripe index) is rethrown after all workers join.
 *   - If starting a worker thread fails, no further stripes are
 *     started, every already-running worker is joined, and the spawn
 *     failure is rethrown (taking precedence over worker exceptions).
 *     In that case only the stripes started so far have been processed.
 */
template<typename F>
void run_stripes(size_t height, unsigned v_sub, unsigned n_threads, F&& fn)
{
	if (height == 0 || v_sub == 0)
		return;

	// Callers (pixpat_convert / pixpat_draw_pattern) validate divisibility
	// at the entry point.
	assert(height % v_sub == 0);

	// More threads than v_sub-sized blocks would leave some with no work.
	const size_t max_useful = height / v_sub;
	if (n_threads == 0)
		n_threads = 1;
	if (static_cast<size_t>(n_threads) > max_useful)
		n_threads = static_cast<unsigned>(max_useful);

	if (n_threads <= 1) {
		fn(size_t{ 0 }, height);
		return;
	}

	// Stripe height rounded up to v_sub; last stripe absorbs the
	// remainder.
	size_t part_height = (height + n_threads - 1) / n_threads;
	part_height = (part_height + v_sub - 1) / v_sub * v_sub;

	// One slot per potential stripe; each worker writes only its own
	// slot, so no locking is needed.
	std::vector<std::exception_ptr> errors(n_threads);
	std::vector<std::thread> workers;
	workers.reserve(n_threads);

	// Set when constructing a std::thread throws. It must not unwind
	// past `workers` while joinable threads exist: ~thread would call
	// std::terminate, and the running workers reference `fn` and
	// `errors` in this frame.
	std::exception_ptr spawn_error;

	try {
		for (unsigned i = 0; i < n_threads; i++) {
			const size_t start = i * part_height;
			// Rounding up can cover `height` with fewer stripes.
			if (start >= height)
				break;

			// n_threads * part_height >= height, so clipping is
			// enough to make the final stripe end exactly at height.
			size_t end = start + part_height;
			if (end > height)
				end = height;

			workers.emplace_back([&, i, start, end] {
					try {
						fn(start, end);
					} catch (...) {
						errors[i] = std::current_exception();
					}
				});
		}
	} catch (...) {
		spawn_error = std::current_exception();
	}

	// Always join whatever was started, on success and failure alike.
	for (auto& t : workers)
		t.join();

	if (spawn_error)
		std::rethrow_exception(spawn_error);

	for (const auto& e : errors)
		if (e)
			std::rethrow_exception(e);
}

} // namespace pixpat