#define pr_fmt(fmt) "%s: " fmt "\n", __func__

#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include <linux/percpu-refcount.h>

/*
 * A percpu refcount starts out as just a set of percpu counters. Initially, we
 * don't try to detect the ref hitting 0 - which means that get/put can just
 * increment or decrement the local counter. Note that the counter on a
 * particular cpu can (and will) wrap - this is fine; when we go to shut down,
 * the percpu counters will all sum to the correct value.
 *
 * (More precisely: because modular arithmetic is commutative the sum of all the
 * percpu_count vars will be equal to what it would have been if all the gets
 * and puts were done to a single integer, even if some of the percpu integers
 * overflow or underflow).
 *
 * The real trick to implementing percpu refcounts is shutdown. We can't detect
 * the ref hitting 0 on every put - this would require global synchronization
 * and defeat the whole purpose of using percpu refs.
 *
 * What we do is require the user to keep track of the initial refcount; we know
 * the ref can't hit 0 before the user drops the initial ref, so as long as we
 * convert to non percpu mode before the initial ref is dropped everything
 * works.
 *
 * Converting to non percpu mode is done with some RCUish stuff in
 * percpu_ref_kill. Additionally, we need a bias value so that the
 * atomic_long_t can't hit 0 before we've added up all the percpu refs.
 */
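
/*
 * A rough usage sketch (struct foo, foo_release() and the call sites below
 * are hypothetical, for illustration only):
 *
 *	struct foo {
 *		struct percpu_ref	ref;
 *	};
 *
 *	static void foo_release(struct percpu_ref *ref)
 *	{
 *		kfree(container_of(ref, struct foo, ref));
 *	}
 *
 * Creation starts the ref in percpu mode, holding the implicit initial ref:
 *
 *	percpu_ref_init(&foo->ref, foo_release, 0, GFP_KERNEL);
 *
 * Hot paths then use cheap percpu increments and decrements:
 *
 *	percpu_ref_get(&foo->ref);
 *	percpu_ref_put(&foo->ref);
 *
 * Teardown drops the initial ref; foo_release() runs once every outstanding
 * ref has been put:
 *
 *	percpu_ref_kill(&foo->ref);
 */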

#define PERCPU_COUNT_BIAS	(1LU << (BITS_PER_LONG - 1))

static DEFINE_SPINLOCK(percpu_ref_switch_lock);
static DECLARE_WAIT_QUEUE_HEAD(percpu_ref_switch_waitq);

static unsigned long __percpu *percpu_count_ptr(struct percpu_ref *ref)
{
	return (unsigned long __percpu *)
		(ref->percpu_count_ptr & ~__PERCPU_REF_ATOMIC_DEAD);
}

/**
 * percpu_ref_init - initialize a percpu refcount
 * @ref: percpu_ref to initialize
 * @release: function which will be called when refcount hits 0
 * @flags: PERCPU_REF_INIT_* flags
 * @gfp: allocation mask to use
 *
 * Initializes @ref.  If @flags is zero, @ref starts in percpu mode with a
 * refcount of 1; analogous to atomic_long_set(ref, 1).  See the
 * definitions of PERCPU_REF_INIT_* flags for flag behaviors.
 *
 * Note that @release must not sleep - it may potentially be called from RCU
 * callback context by percpu_ref_kill().
 */
int percpu_ref_init(struct percpu_ref *ref, percpu_ref_func_t *release,
		    unsigned int flags, gfp_t gfp)
{
	size_t align = max_t(size_t, 1 << __PERCPU_REF_FLAG_BITS,
			     __alignof__(unsigned long));
	unsigned long start_count = 0;

	ref->percpu_count_ptr = (unsigned long)
		__alloc_percpu_gfp(sizeof(unsigned long), align, gfp);
	if (!ref->percpu_count_ptr)
		return -ENOMEM;

	ref->force_atomic = flags & PERCPU_REF_INIT_ATOMIC;

	if (flags & (PERCPU_REF_INIT_ATOMIC | PERCPU_REF_INIT_DEAD))
		ref->percpu_count_ptr |= __PERCPU_REF_ATOMIC;
	else
		start_count += PERCPU_COUNT_BIAS;

	if (flags & PERCPU_REF_INIT_DEAD)
		ref->percpu_count_ptr |= __PERCPU_REF_DEAD;
	else
		start_count++;

	atomic_long_set(&ref->count, start_count);

	ref->release = release;
	ref->confirm_switch = NULL;
	return 0;
}
EXPORT_SYMBOL_GPL(percpu_ref_init);
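
/*
 * A minimal sketch of an embedding object's constructor (foo_create() and
 * foo_setup() are hypothetical, as in the usage sketch above); note the
 * error check on percpu_ref_init() and the percpu_ref_exit() in the
 * failure path of a later initialization step:
 *
 *	static struct foo *foo_create(void)
 *	{
 *		struct foo *foo = kzalloc(sizeof(*foo), GFP_KERNEL);
 *
 *		if (!foo)
 *			return NULL;
 *		if (percpu_ref_init(&foo->ref, foo_release, 0, GFP_KERNEL))
 *			goto free_foo;
 *		if (foo_setup(foo))
 *			goto exit_ref;
 *		return foo;
 *
 *	exit_ref:
 *		percpu_ref_exit(&foo->ref);
 *	free_foo:
 *		kfree(foo);
 *		return NULL;
 *	}
 */
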
/**
 * percpu_ref_exit - undo percpu_ref_init()
 * @ref: percpu_ref to exit
 *
 * This function exits @ref.  The caller is responsible for ensuring that
 * @ref is no longer in active use.  The usual places to invoke this
 * function from are the @ref->release() callback or the init failure path
 * where percpu_ref_init() succeeded but other parts of the initialization
 * of the embedding object failed.
 */
void percpu_ref_exit(struct percpu_ref *ref)
{
	unsigned long __percpu *percpu_count = percpu_count_ptr(ref);

	if (percpu_count) {
		/* non-NULL confirm_switch indicates switching in progress */
		WARN_ON_ONCE(ref->confirm_switch);
		free_percpu(percpu_count);
		ref->percpu_count_ptr = __PERCPU_REF_ATOMIC_DEAD;
	}
}
EXPORT_SYMBOL_GPL(percpu_ref_exit);
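
/*
 * The other usual call site is the release callback itself, once the ref
 * has hit 0.  Extending the hypothetical sketch from above:
 *
 *	static void foo_release(struct percpu_ref *ref)
 *	{
 *		struct foo *foo = container_of(ref, struct foo, ref);
 *
 *		percpu_ref_exit(&foo->ref);
 *		kfree(foo);
 *	}
 */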

static void percpu_ref_call_confirm_rcu(struct rcu_head *rcu)
{
	struct percpu_ref *ref = container_of(rcu, struct percpu_ref, rcu);

	ref->confirm_switch(ref);
	ref->confirm_switch = NULL;
	wake_up_all(&percpu_ref_switch_waitq);

	/* drop ref from percpu_ref_switch_to_atomic() */
	percpu_ref_put(ref);
}

static void percpu_ref_switch_to_atomic_rcu(struct rcu_head *rcu)
{
	struct percpu_ref *ref = container_of(rcu, struct percpu_ref, rcu);
	unsigned long __percpu *percpu_count = percpu_count_ptr(ref);
	unsigned long count = 0;
	int cpu;

	for_each_possible_cpu(cpu)
		count += *per_cpu_ptr(percpu_count, cpu);

	pr_debug("global %ld percpu %ld",
		 atomic_long_read(&ref->count), (long)count);

	/*
	 * It's crucial that we sum the percpu counters _before_ adding the sum
	 * to &ref->count; since gets could be happening on one cpu while puts
	 * happen on another, adding a single cpu's count could cause
	 * @ref->count to hit 0 before we've got a consistent value - but the
	 * sum of all the counts will be consistent and correct.
	 *
	 * Subtracting the bias value then has to happen _after_ adding count to
	 * &ref->count; we need the bias value to prevent &ref->count from
	 * reaching 0 before we add the percpu counts. But doing it at the same
	 * time is equivalent and saves us atomic operations:
	 */
	atomic_long_add((long)count - PERCPU_COUNT_BIAS, &ref->count);

	WARN_ONCE(atomic_long_read(&ref->count) <= 0,
		  "percpu ref (%pf) <= 0 (%ld) after switching to atomic",
		  ref->release, atomic_long_read(&ref->count));

	/* @ref is viewed as dead on all CPUs, send out switch confirmation */
	percpu_ref_call_confirm_rcu(rcu);
}
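
/*
 * To make the above concrete with made-up numbers: suppose the ref was
 * created with one initial ref, cpu0 then did three gets and cpu1 did
 * three puts.  cpu0's counter holds 3, cpu1's holds -3 (as an unsigned
 * wrap), and ref->count holds PERCPU_COUNT_BIAS + 1.  Summing the percpu
 * counters first gives 0, and adding 0 - PERCPU_COUNT_BIAS above leaves
 * ref->count at 1, the true refcount.  Folding in cpu1's -3 on its own,
 * without the bias, could have driven the count to or below zero and made
 * the ref look released while users still held it.
 */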

static void percpu_ref_noop_confirm_switch(struct percpu_ref *ref)
{
}

static void __percpu_ref_switch_to_atomic(struct percpu_ref *ref,
					  percpu_ref_func_t *confirm_switch)
{
	if (ref->percpu_count_ptr & __PERCPU_REF_ATOMIC) {
		if (confirm_switch)
			confirm_switch(ref);
		return;
	}

	/* switching from percpu to atomic */
	ref->percpu_count_ptr |= __PERCPU_REF_ATOMIC;

	/*
	 * Non-NULL ->confirm_switch is used to indicate that switching is
	 * in progress.  Use noop one if unspecified.
	 */
	ref->confirm_switch = confirm_switch ?: percpu_ref_noop_confirm_switch;

	percpu_ref_get(ref);	/* put after confirmation */
	call_rcu_sched(&ref->rcu, percpu_ref_switch_to_atomic_rcu);
}

static void __percpu_ref_switch_to_percpu(struct percpu_ref *ref)
{
	unsigned long __percpu *percpu_count = percpu_count_ptr(ref);
	int cpu;

	BUG_ON(!percpu_count);

	if (!(ref->percpu_count_ptr & __PERCPU_REF_ATOMIC))
		return;

	atomic_long_add(PERCPU_COUNT_BIAS, &ref->count);

	/*
	 * Restore per-cpu operation.  smp_store_release() is paired
	 * with READ_ONCE() in __ref_is_percpu() and guarantees that the
	 * zeroing is visible to all percpu accesses which can see the
	 * following __PERCPU_REF_ATOMIC clearing.
	 */
	for_each_possible_cpu(cpu)
		*per_cpu_ptr(percpu_count, cpu) = 0;

	smp_store_release(&ref->percpu_count_ptr,
			  ref->percpu_count_ptr & ~__PERCPU_REF_ATOMIC);
}

static void __percpu_ref_switch_mode(struct percpu_ref *ref,
				     percpu_ref_func_t *confirm_switch)
{
	lockdep_assert_held(&percpu_ref_switch_lock);

	/*
	 * If the previous ATOMIC switching hasn't finished yet, wait for
	 * its completion.  If the caller ensures that ATOMIC switching
	 * isn't in progress, this function can be called from any context.
	 */
	wait_event_lock_irq(percpu_ref_switch_waitq, !ref->confirm_switch,
			    percpu_ref_switch_lock);

	if (ref->force_atomic || (ref->percpu_count_ptr & __PERCPU_REF_DEAD))
		__percpu_ref_switch_to_atomic(ref, confirm_switch);
	else
		__percpu_ref_switch_to_percpu(ref);
}

/**
 * percpu_ref_switch_to_atomic - switch a percpu_ref to atomic mode
 * @ref: percpu_ref to switch to atomic mode
 * @confirm_switch: optional confirmation callback
 *
 * There's no reason to use this function for the usual reference counting.
 * Use percpu_ref_kill[_and_confirm]().
 *
 * Schedule switching of @ref to atomic mode.  All its percpu counts will
 * be collected to the main atomic counter.  On completion, when all CPUs
 * are guaranteed to be in atomic mode, @confirm_switch, which may not
 * block, is invoked.  This function may be invoked concurrently with all
 * the get/put operations and can safely be mixed with kill and reinit
 * operations.  Note that @ref will stay in atomic mode across kill/reinit
 * cycles until percpu_ref_switch_to_percpu() is called.
 *
 * This function may block if @ref is in the process of switching to atomic
 * mode.  If the caller ensures that @ref is not in the process of
 * switching to atomic mode, this function can be called from any context.
 */
void percpu_ref_switch_to_atomic(struct percpu_ref *ref,
				 percpu_ref_func_t *confirm_switch)
{
	unsigned long flags;

	spin_lock_irqsave(&percpu_ref_switch_lock, flags);

	ref->force_atomic = true;
	__percpu_ref_switch_mode(ref, confirm_switch);

	spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
}
EXPORT_SYMBOL_GPL(percpu_ref_switch_to_atomic);

/**
 * percpu_ref_switch_to_atomic_sync - switch a percpu_ref to atomic mode
 * @ref: percpu_ref to switch to atomic mode
 *
 * Schedule switching the ref to atomic mode, and wait for the
 * switch to complete.  Caller must ensure that no other thread
 * will switch back to percpu mode.
 */
void percpu_ref_switch_to_atomic_sync(struct percpu_ref *ref)
{
	percpu_ref_switch_to_atomic(ref, NULL);
	wait_event(percpu_ref_switch_waitq, !ref->confirm_switch);
}
EXPORT_SYMBOL_GPL(percpu_ref_switch_to_atomic_sync);

/**
 * percpu_ref_switch_to_percpu - switch a percpu_ref to percpu mode
 * @ref: percpu_ref to switch to percpu mode
 *
 * There's no reason to use this function for the usual reference counting.
 * To re-use an expired ref, use percpu_ref_reinit().
 *
 * Switch @ref to percpu mode.  This function may be invoked concurrently
 * with all the get/put operations and can safely be mixed with kill and
 * reinit operations.  This function reverses the sticky atomic state set
 * by PERCPU_REF_INIT_ATOMIC or percpu_ref_switch_to_atomic().  If @ref is
 * dying or dead, the actual switching takes place on the following
 * percpu_ref_reinit().
 *
 * This function may block if @ref is in the process of switching to atomic
 * mode.  If the caller ensures that @ref is not in the process of
 * switching to atomic mode, this function can be called from any context.
 */
void percpu_ref_switch_to_percpu(struct percpu_ref *ref)
{
	unsigned long flags;

	spin_lock_irqsave(&percpu_ref_switch_lock, flags);

	ref->force_atomic = false;
	__percpu_ref_switch_mode(ref, NULL);

	spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
}
EXPORT_SYMBOL_GPL(percpu_ref_switch_to_percpu);
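
/*
 * Usage sketch (foo_freeze() and foo_unfreeze() are hypothetical helpers,
 * not part of this API): while "frozen", every get/put goes straight to
 * the shared atomic counter, which callers can rely on to drain or account
 * for in-flight users before reconfiguring the embedding object:
 *
 *	static void foo_freeze(struct foo *foo)
 *	{
 *		percpu_ref_switch_to_atomic_sync(&foo->ref);
 *	}
 *
 *	static void foo_unfreeze(struct foo *foo)
 *	{
 *		percpu_ref_switch_to_percpu(&foo->ref);
 *	}
 */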

/**
 * percpu_ref_kill_and_confirm - drop the initial ref and schedule confirmation
 * @ref: percpu_ref to kill
 * @confirm_kill: optional confirmation callback
 *
 * Equivalent to percpu_ref_kill() but also schedules kill confirmation if
 * @confirm_kill is not NULL.  @confirm_kill, which may not block, will be
 * called after @ref is seen as dead from all CPUs at which point all
 * further invocations of percpu_ref_tryget_live() will fail.  See
 * percpu_ref_tryget_live() for details.
 *
 * This function normally doesn't block and can be called from any context
 * but it may block if @confirm_kill is specified and @ref is in the
 * process of switching to atomic mode by percpu_ref_switch_to_atomic().
 */
void percpu_ref_kill_and_confirm(struct percpu_ref *ref,
				 percpu_ref_func_t *confirm_kill)
{
	unsigned long flags;

	spin_lock_irqsave(&percpu_ref_switch_lock, flags);

	WARN_ONCE(ref->percpu_count_ptr & __PERCPU_REF_DEAD,
		  "%s called more than once on %pf!", __func__, ref->release);

	ref->percpu_count_ptr |= __PERCPU_REF_DEAD;
	__percpu_ref_switch_mode(ref, confirm_kill);
	percpu_ref_put(ref);

	spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
}
EXPORT_SYMBOL_GPL(percpu_ref_kill_and_confirm);
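
/*
 * A sketch of the confirmation callback in use (foo_confirm_kill() and the
 * foo->kill_done completion are hypothetical): the callback runs once no
 * CPU can succeed in a new percpu_ref_tryget_live(), which is a convenient
 * point to start waiting for the remaining refs to drain:
 *
 *	static void foo_confirm_kill(struct percpu_ref *ref)
 *	{
 *		struct foo *foo = container_of(ref, struct foo, ref);
 *
 *		complete(&foo->kill_done);
 *	}
 *
 *	percpu_ref_kill_and_confirm(&foo->ref, foo_confirm_kill);
 *	wait_for_completion(&foo->kill_done);
 */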

/**
 * percpu_ref_reinit - re-initialize a percpu refcount
 * @ref: percpu_ref to re-initialize
 *
 * Re-initialize @ref so that it's in the same state as when it finished
 * percpu_ref_init() ignoring %PERCPU_REF_INIT_DEAD.  @ref must have been
 * initialized successfully and reached 0 but not exited.
 *
 * Note that percpu_ref_tryget[_live]() are safe to perform on @ref while
 * this function is in progress.
 */
void percpu_ref_reinit(struct percpu_ref *ref)
{
	unsigned long flags;

	spin_lock_irqsave(&percpu_ref_switch_lock, flags);

	WARN_ON_ONCE(!percpu_ref_is_zero(ref));

	ref->percpu_count_ptr &= ~__PERCPU_REF_DEAD;
	percpu_ref_get(ref);
	__percpu_ref_switch_mode(ref, NULL);

	spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
}
EXPORT_SYMBOL_GPL(percpu_ref_reinit);
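
/*
 * A sketch of a kill/reinit cycle, assuming a hypothetical foo whose
 * release callback completes foo->release_done instead of freeing the
 * object:
 *
 *	percpu_ref_kill(&foo->ref);
 *	wait_for_completion(&foo->release_done);
 *
 * At this point the refcount has reached zero and the object can be
 * serviced while idle.  Afterwards, the same ref is brought back to
 * percpu operation:
 *
 *	reinit_completion(&foo->release_done);
 *	percpu_ref_reinit(&foo->ref);
 */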