// SPDX-License-Identifier: GPL-2.0
/*
 * padata.c - generic interface to process data streams in parallel
 *
 * See Documentation/padata.txt for API documentation.
 *
 * Copyright (C) 2008, 2009 secunet Security Networks AG
 * Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com>
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms and conditions of the GNU General Public License,
 * version 2, as published by the Free Software Foundation.
 *
 * This program is distributed in the hope it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include <linux/export.h>
#include <linux/cpumask.h>
#include <linux/err.h>
#include <linux/cpu.h>
#include <linux/padata.h>
#include <linux/mutex.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/sysfs.h>
#include <linux/rcupdate.h>
#include <linux/module.h>

#define MAX_OBJ_NUM 1000

static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
{
	int cpu, target_cpu;

	target_cpu = cpumask_first(pd->cpumask.pcpu);
	for (cpu = 0; cpu < cpu_index; cpu++)
		target_cpu = cpumask_next(target_cpu, pd->cpumask.pcpu);

	return target_cpu;
}

static int padata_cpu_hash(struct parallel_data *pd)
{
	unsigned int seq_nr;
	int cpu_index;

	/*
	 * Hash the sequence number to a cpu by taking
	 * seq_nr modulo the number of cpus in use.
	 */

	seq_nr = atomic_inc_return(&pd->seq_nr);
	cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu);

	return padata_index_to_cpu(pd, cpu_index);
}

static void padata_parallel_worker(struct work_struct *parallel_work)
{
	struct padata_parallel_queue *pqueue;
	LIST_HEAD(local_list);

	local_bh_disable();
	pqueue = container_of(parallel_work,
			      struct padata_parallel_queue, work);

	spin_lock(&pqueue->parallel.lock);
	list_replace_init(&pqueue->parallel.list, &local_list);
	spin_unlock(&pqueue->parallel.lock);

	while (!list_empty(&local_list)) {
		struct padata_priv *padata;

		padata = list_entry(local_list.next,
				    struct padata_priv, list);

		list_del_init(&padata->list);

		padata->parallel(padata);
	}

	local_bh_enable();
}

/**
 * padata_do_parallel - padata parallelization function
 *
 * @pinst: padata instance
 * @padata: object to be parallelized
 * @cb_cpu: cpu the serialization callback function will run on,
 *          must be in the serial cpumask of padata (i.e. cpumask.cbcpu).
 *
 * The parallelization callback function will run with BHs off.
 * Note: Every object which is parallelized by padata_do_parallel
 * must be seen by padata_do_serial.
 */
int padata_do_parallel(struct padata_instance *pinst,
		       struct padata_priv *padata, int cb_cpu)
{
	int target_cpu, err;
	struct padata_parallel_queue *queue;
	struct parallel_data *pd;

	rcu_read_lock_bh();

	pd = rcu_dereference_bh(pinst->pd);

	err = -EINVAL;
	if (!(pinst->flags & PADATA_INIT) || pinst->flags & PADATA_INVALID)
		goto out;

	if (!cpumask_test_cpu(cb_cpu, pd->cpumask.cbcpu))
		goto out;

	err =  -EBUSY;
	if ((pinst->flags & PADATA_RESET))
		goto out;

	if (atomic_read(&pd->refcnt) >= MAX_OBJ_NUM)
		goto out;

	err = 0;
	atomic_inc(&pd->refcnt);
	padata->pd = pd;
	padata->cb_cpu = cb_cpu;

	target_cpu = padata_cpu_hash(pd);
	padata->cpu = target_cpu;
	queue = per_cpu_ptr(pd->pqueue, target_cpu);

	spin_lock(&queue->parallel.lock);
	list_add_tail(&padata->list, &queue->parallel.list);
	spin_unlock(&queue->parallel.lock);

	queue_work_on(target_cpu, pinst->wq, &queue->work);

out:
	rcu_read_unlock_bh();

	return err;
}
EXPORT_SYMBOL(padata_do_parallel);
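
/*
 * A minimal usage sketch for padata_do_parallel()/padata_do_serial() (not
 * part of padata itself). All my_* names are hypothetical and only
 * illustrate the calling convention; see crypto/pcrypt.c for a real user.
 *
 *	struct my_request {
 *		struct padata_priv padata;
 *		int result;
 *	};
 *
 *	static void my_parallel(struct padata_priv *padata)
 *	{
 *		struct my_request *req =
 *			container_of(padata, struct my_request, padata);
 *
 *		req->result = my_expensive_computation(req);
 *		padata_do_serial(padata);
 *	}
 *
 *	static void my_serial(struct padata_priv *padata)
 *	{
 *		struct my_request *req =
 *			container_of(padata, struct my_request, padata);
 *
 *		my_complete_in_submission_order(req);
 *	}
 *
 *	static int my_submit(struct padata_instance *pinst,
 *			     struct my_request *req, int cb_cpu)
 *	{
 *		req->padata.parallel = my_parallel;
 *		req->padata.serial = my_serial;
 *		return padata_do_parallel(pinst, &req->padata, cb_cpu);
 *	}
 */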

/*
 * padata_get_next - Get the next object that needs serialization.
 *
 * Return values are:
 *
 * A pointer to the control struct of the next object that needs
 * serialization, if present in one of the percpu reorder queues.
 *
 * -EINPROGRESS, if the next object that needs serialization will
 *  be parallel processed by another cpu and is not yet present in
 *  the cpu's reorder queue.
 *
 * -ENODATA, if this cpu has to do the parallel processing for
 *  the next object.
 */
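/*
 * Worked example of the round-robin mapping used below (hypothetical
 * numbers): objects are spread over cpumask.pcpu in sequence-number order,
 * so with cpumask.pcpu = {0, 2, 5} and pd->processed == 7 the next object
 * to serialize is expected in the reorder queue of cpu_index 7 % 3 == 1,
 * i.e. cpu 2.
 */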
static struct padata_priv *padata_get_next(struct parallel_data *pd)
{
	int cpu, num_cpus;
	unsigned int next_nr, next_index;
	struct padata_parallel_queue *next_queue;
	struct padata_priv *padata;
	struct padata_list *reorder;

	num_cpus = cpumask_weight(pd->cpumask.pcpu);

	/*
	 * Calculate the percpu reorder queue and the sequence
	 * number of the next object.
	 */
	next_nr = pd->processed;
	next_index = next_nr % num_cpus;
	cpu = padata_index_to_cpu(pd, next_index);
	next_queue = per_cpu_ptr(pd->pqueue, cpu);

	reorder = &next_queue->reorder;

	spin_lock(&reorder->lock);
	if (!list_empty(&reorder->list)) {
		padata = list_entry(reorder->list.next,
				    struct padata_priv, list);

		list_del_init(&padata->list);
		atomic_dec(&pd->reorder_objects);

		pd->processed++;

		spin_unlock(&reorder->lock);
		goto out;
	}
	spin_unlock(&reorder->lock);

	if (__this_cpu_read(pd->pqueue->cpu_index) == next_queue->cpu_index) {
		padata = ERR_PTR(-ENODATA);
		goto out;
	}

	padata = ERR_PTR(-EINPROGRESS);
out:
	return padata;
}

static void padata_reorder(struct parallel_data *pd)
{
	int cb_cpu;
	struct padata_priv *padata;
	struct padata_serial_queue *squeue;
	struct padata_instance *pinst = pd->pinst;

	/*
	 * We need to ensure that only one cpu can work on dequeueing
	 * the reorder queue at a time. Calculating in which percpu reorder
	 * queue the next object will arrive takes some time. A spinlock
	 * would be highly contended. Also it is not clear in which order
	 * the objects arrive at the reorder queues. So a cpu could wait to
	 * get the lock just to notice that there is nothing to do at the
	 * moment. Therefore we use a trylock and let the holder of the lock
	 * care for all the objects enqueued during the hold time of the lock.
	 */
	if (!spin_trylock_bh(&pd->lock))
		return;

	while (1) {
		padata = padata_get_next(pd);

		/*
		 * If the next object that needs serialization is parallel
		 * processed by another cpu and is still on its way to the
		 * cpu's reorder queue, nothing to do for now.
		 */
		if (PTR_ERR(padata) == -EINPROGRESS)
			break;

		/*
		 * This cpu has to do the parallel processing of the next
		 * object. It's waiting in the cpu's parallelization queue,
		 * so exit immediately.
		 */
		if (PTR_ERR(padata) == -ENODATA) {
			del_timer(&pd->timer);
			spin_unlock_bh(&pd->lock);
			return;
		}

		cb_cpu = padata->cb_cpu;
		squeue = per_cpu_ptr(pd->squeue, cb_cpu);

		spin_lock(&squeue->serial.lock);
		list_add_tail(&padata->list, &squeue->serial.list);
		spin_unlock(&squeue->serial.lock);

		queue_work_on(cb_cpu, pinst->wq, &squeue->work);
	}

	spin_unlock_bh(&pd->lock);

	/*
	 * The next object that needs serialization might have arrived at
	 * the reorder queues in the meantime; we will be called again
	 * from the timer function if no one else cares for it.
	 */
	if (atomic_read(&pd->reorder_objects)
			&& !(pinst->flags & PADATA_RESET))
		mod_timer(&pd->timer, jiffies + HZ);
	else
		del_timer(&pd->timer);

	return;
}

static void invoke_padata_reorder(struct work_struct *work)
{
	struct padata_parallel_queue *pqueue;
	struct parallel_data *pd;

	local_bh_disable();
	pqueue = container_of(work, struct padata_parallel_queue, reorder_work);
	pd = pqueue->pd;
	padata_reorder(pd);
	local_bh_enable();
}

static void padata_reorder_timer(struct timer_list *t)
{
	struct parallel_data *pd = from_timer(pd, t, timer);
	unsigned int weight;
	int target_cpu, cpu;

	cpu = get_cpu();

	/* We don't lock pd here so as not to interfere with parallel
	 * padata_reorder() calls on other CPUs. We just need any CPU out of
	 * the cpumask.pcpu set. It would be nice if it's the right one, but
	 * it doesn't matter if we end up on another one because of an
	 * outdated pd->processed value.
	 */
	weight = cpumask_weight(pd->cpumask.pcpu);
	target_cpu = padata_index_to_cpu(pd, pd->processed % weight);

	/* ensure to call the reorder callback on the correct CPU */
	if (cpu != target_cpu) {
		struct padata_parallel_queue *pqueue;
		struct padata_instance *pinst;

		/* The timer function is serialized wrt itself -- no locking
		 * needed.
		 */
		pinst = pd->pinst;
		pqueue = per_cpu_ptr(pd->pqueue, target_cpu);
		queue_work_on(target_cpu, pinst->wq, &pqueue->reorder_work);
	} else {
		padata_reorder(pd);
	}

	put_cpu();
}

static void padata_serial_worker(struct work_struct *serial_work)
{
	struct padata_serial_queue *squeue;
	struct parallel_data *pd;
	LIST_HEAD(local_list);

	local_bh_disable();
	squeue = container_of(serial_work, struct padata_serial_queue, work);
	pd = squeue->pd;

	spin_lock(&squeue->serial.lock);
	list_replace_init(&squeue->serial.list, &local_list);
	spin_unlock(&squeue->serial.lock);

	while (!list_empty(&local_list)) {
		struct padata_priv *padata;

		padata = list_entry(local_list.next,
				    struct padata_priv, list);

		list_del_init(&padata->list);

		padata->serial(padata);
		atomic_dec(&pd->refcnt);
	}
	local_bh_enable();
}

/**
 * padata_do_serial - padata serialization function
 *
 * @padata: object to be serialized.
 *
 * padata_do_serial must be called for every parallelized object.
 * The serialization callback function will run with BHs off.
 */
void padata_do_serial(struct padata_priv *padata)
{
	int cpu;
	struct padata_parallel_queue *pqueue;
	struct parallel_data *pd;
	int reorder_via_wq = 0;

	pd = padata->pd;

	cpu = get_cpu();

	/* We need to run on the same CPU padata_do_parallel(.., padata, ..)
	 * was called on -- or, at least, enqueue the padata object into the
	 * correct per-cpu queue.
	 */
	if (cpu != padata->cpu) {
		reorder_via_wq = 1;
		cpu = padata->cpu;
	}

	pqueue = per_cpu_ptr(pd->pqueue, cpu);

	spin_lock(&pqueue->reorder.lock);
	atomic_inc(&pd->reorder_objects);
	list_add_tail(&padata->list, &pqueue->reorder.list);
	spin_unlock(&pqueue->reorder.lock);

	put_cpu();

	/* If we're running on the wrong CPU, call padata_reorder() via a
	 * kernel worker.
	 */
	if (reorder_via_wq)
		queue_work_on(cpu, pd->pinst->wq, &pqueue->reorder_work);
	else
		padata_reorder(pd);
}
EXPORT_SYMBOL(padata_do_serial);

static int padata_setup_cpumasks(struct parallel_data *pd,
				 const struct cpumask *pcpumask,
				 const struct cpumask *cbcpumask)
{
	if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL))
		return -ENOMEM;

	cpumask_and(pd->cpumask.pcpu, pcpumask, cpu_online_mask);
	if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) {
		free_cpumask_var(pd->cpumask.pcpu);
		return -ENOMEM;
	}

	cpumask_and(pd->cpumask.cbcpu, cbcpumask, cpu_online_mask);
	return 0;
}

static void __padata_list_init(struct padata_list *pd_list)
{
	INIT_LIST_HEAD(&pd_list->list);
	spin_lock_init(&pd_list->lock);
}

/* Initialize all percpu queues used by serial workers */
static void padata_init_squeues(struct parallel_data *pd)
{
	int cpu;
	struct padata_serial_queue *squeue;

	for_each_cpu(cpu, pd->cpumask.cbcpu) {
		squeue = per_cpu_ptr(pd->squeue, cpu);
		squeue->pd = pd;
		__padata_list_init(&squeue->serial);
		INIT_WORK(&squeue->work, padata_serial_worker);
	}
}

/* Initialize all percpu queues used by parallel workers */
static void padata_init_pqueues(struct parallel_data *pd)
{
	int cpu_index, cpu;
	struct padata_parallel_queue *pqueue;

	cpu_index = 0;
	for_each_possible_cpu(cpu) {
		pqueue = per_cpu_ptr(pd->pqueue, cpu);

		if (!cpumask_test_cpu(cpu, pd->cpumask.pcpu)) {
			pqueue->cpu_index = -1;
			continue;
		}

		pqueue->pd = pd;
		pqueue->cpu_index = cpu_index;
		cpu_index++;

		__padata_list_init(&pqueue->reorder);
		__padata_list_init(&pqueue->parallel);
		INIT_WORK(&pqueue->work, padata_parallel_worker);
		INIT_WORK(&pqueue->reorder_work, invoke_padata_reorder);
		atomic_set(&pqueue->num_obj, 0);
	}
}

/* Allocate and initialize the internal cpumask-dependent resources. */
static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
					     const struct cpumask *pcpumask,
					     const struct cpumask *cbcpumask)
{
	struct parallel_data *pd;

	pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
	if (!pd)
		goto err;

	pd->pqueue = alloc_percpu(struct padata_parallel_queue);
	if (!pd->pqueue)
		goto err_free_pd;

	pd->squeue = alloc_percpu(struct padata_serial_queue);
	if (!pd->squeue)
		goto err_free_pqueue;
	if (padata_setup_cpumasks(pd, pcpumask, cbcpumask) < 0)
		goto err_free_squeue;

	padata_init_pqueues(pd);
	padata_init_squeues(pd);
	timer_setup(&pd->timer, padata_reorder_timer, 0);
	atomic_set(&pd->seq_nr, -1);
	atomic_set(&pd->reorder_objects, 0);
	atomic_set(&pd->refcnt, 0);
	pd->pinst = pinst;
	spin_lock_init(&pd->lock);

	return pd;

err_free_squeue:
	free_percpu(pd->squeue);
err_free_pqueue:
	free_percpu(pd->pqueue);
err_free_pd:
	kfree(pd);
err:
	return NULL;
}

static void padata_free_pd(struct parallel_data *pd)
{
	free_cpumask_var(pd->cpumask.pcpu);
	free_cpumask_var(pd->cpumask.cbcpu);
	free_percpu(pd->pqueue);
	free_percpu(pd->squeue);
	kfree(pd);
}

/* Flush all objects out of the padata queues. */
static void padata_flush_queues(struct parallel_data *pd)
{
	int cpu;
	struct padata_parallel_queue *pqueue;
	struct padata_serial_queue *squeue;

	for_each_cpu(cpu, pd->cpumask.pcpu) {
		pqueue = per_cpu_ptr(pd->pqueue, cpu);
		flush_work(&pqueue->work);
	}

	del_timer_sync(&pd->timer);

	if (atomic_read(&pd->reorder_objects))
		padata_reorder(pd);

	for_each_cpu(cpu, pd->cpumask.cbcpu) {
		squeue = per_cpu_ptr(pd->squeue, cpu);
		flush_work(&squeue->work);
	}

	BUG_ON(atomic_read(&pd->refcnt) != 0);
}

static void __padata_start(struct padata_instance *pinst)
{
	pinst->flags |= PADATA_INIT;
}

static void __padata_stop(struct padata_instance *pinst)
{
	if (!(pinst->flags & PADATA_INIT))
		return;

	pinst->flags &= ~PADATA_INIT;

	synchronize_rcu();

	get_online_cpus();
	padata_flush_queues(pinst->pd);
	put_online_cpus();
}

/* Replace the internal control structure with a new one. */
static void padata_replace(struct padata_instance *pinst,
			   struct parallel_data *pd_new)
{
	struct parallel_data *pd_old = pinst->pd;
	int notification_mask = 0;

	pinst->flags |= PADATA_RESET;

	rcu_assign_pointer(pinst->pd, pd_new);

	synchronize_rcu();

	if (!cpumask_equal(pd_old->cpumask.pcpu, pd_new->cpumask.pcpu))
		notification_mask |= PADATA_CPU_PARALLEL;
	if (!cpumask_equal(pd_old->cpumask.cbcpu, pd_new->cpumask.cbcpu))
		notification_mask |= PADATA_CPU_SERIAL;

	padata_flush_queues(pd_old);
	padata_free_pd(pd_old);

	if (notification_mask)
		blocking_notifier_call_chain(&pinst->cpumask_change_notifier,
					     notification_mask,
					     &pd_new->cpumask);

	pinst->flags &= ~PADATA_RESET;
}

/**
 * padata_register_cpumask_notifier - Registers a notifier that will be called
 *                             if either the pcpu or cbcpu cpumask, or both,
 *                             change.
 *
 * @pinst: A pointer to the padata instance
 * @nblock: A pointer to the notifier block.
 */
int padata_register_cpumask_notifier(struct padata_instance *pinst,
				     struct notifier_block *nblock)
{
	return blocking_notifier_chain_register(&pinst->cpumask_change_notifier,
						nblock);
}
EXPORT_SYMBOL(padata_register_cpumask_notifier);
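
/*
 * A sketch of a notifier callback suitable for the registration above
 * (my_* names are hypothetical; crypto/pcrypt.c registers a similar one).
 * The val argument carries the PADATA_CPU_* flags of what changed and data
 * points to the new struct padata_cpumask:
 *
 *	static int my_cpumask_change_notify(struct notifier_block *self,
 *					    unsigned long val, void *data)
 *	{
 *		struct padata_cpumask *new_mask = data;
 *
 *		if (val & PADATA_CPU_SERIAL)
 *			my_rebuild_serial_state(new_mask->cbcpu);
 *		return 0;
 *	}
 *
 *	static struct notifier_block my_nblock = {
 *		.notifier_call = my_cpumask_change_notify,
 *	};
 *
 *	padata_register_cpumask_notifier(pinst, &my_nblock);
 */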

/**
 * padata_unregister_cpumask_notifier - Unregisters a cpumask notifier
 *        registered earlier using padata_register_cpumask_notifier
 *
 * @pinst: A pointer to the padata instance.
 * @nblock: A pointer to the notifier block.
 */
int padata_unregister_cpumask_notifier(struct padata_instance *pinst,
				       struct notifier_block *nblock)
{
	return blocking_notifier_chain_unregister(
		&pinst->cpumask_change_notifier,
		nblock);
}
EXPORT_SYMBOL(padata_unregister_cpumask_notifier);


/* If cpumask contains no active cpu, we mark the instance as invalid. */
static bool padata_validate_cpumask(struct padata_instance *pinst,
				    const struct cpumask *cpumask)
{
	if (!cpumask_intersects(cpumask, cpu_online_mask)) {
		pinst->flags |= PADATA_INVALID;
		return false;
	}

	pinst->flags &= ~PADATA_INVALID;
	return true;
}

static int __padata_set_cpumasks(struct padata_instance *pinst,
				 cpumask_var_t pcpumask,
				 cpumask_var_t cbcpumask)
{
	int valid;
	struct parallel_data *pd;

	valid = padata_validate_cpumask(pinst, pcpumask);
	if (!valid) {
		__padata_stop(pinst);
		goto out_replace;
	}

	valid = padata_validate_cpumask(pinst, cbcpumask);
	if (!valid)
		__padata_stop(pinst);

out_replace:
	pd = padata_alloc_pd(pinst, pcpumask, cbcpumask);
	if (!pd)
		return -ENOMEM;

	cpumask_copy(pinst->cpumask.pcpu, pcpumask);
	cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);

	padata_replace(pinst, pd);

	if (valid)
		__padata_start(pinst);

	return 0;
}

/**
 * padata_set_cpumask - Sets the cpumask corresponding to @cpumask_type to
 *                      the value of @cpumask.
 *
 * @pinst: padata instance
 * @cpumask_type: PADATA_CPU_SERIAL or PADATA_CPU_PARALLEL, corresponding
 *                to the serial and parallel cpumasks respectively.
 * @cpumask: the cpumask to use
 */
int padata_set_cpumask(struct padata_instance *pinst, int cpumask_type,
		       cpumask_var_t cpumask)
{
	struct cpumask *serial_mask, *parallel_mask;
	int err = -EINVAL;

	mutex_lock(&pinst->lock);
	get_online_cpus();

	switch (cpumask_type) {
	case PADATA_CPU_PARALLEL:
		serial_mask = pinst->cpumask.cbcpu;
		parallel_mask = cpumask;
		break;
	case PADATA_CPU_SERIAL:
		parallel_mask = pinst->cpumask.pcpu;
		serial_mask = cpumask;
		break;
	default:
		 goto out;
	}

	err =  __padata_set_cpumasks(pinst, parallel_mask, serial_mask);

out:
	put_online_cpus();
	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_set_cpumask);
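
/*
 * Example call sequence for padata_set_cpumask() (sketch only; the source
 * mask my_allowed_cpus is hypothetical): restrict the parallel workers to
 * a caller-provided set of cpus.
 *
 *	cpumask_var_t mask;
 *	int err;
 *
 *	if (!alloc_cpumask_var(&mask, GFP_KERNEL))
 *		return -ENOMEM;
 *	cpumask_copy(mask, my_allowed_cpus);
 *	err = padata_set_cpumask(pinst, PADATA_CPU_PARALLEL, mask);
 *	free_cpumask_var(mask);
 */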

/**
 * padata_start - start the parallel processing
 *
 * @pinst: padata instance to start
 */
int padata_start(struct padata_instance *pinst)
{
	int err = 0;

	mutex_lock(&pinst->lock);

	if (pinst->flags & PADATA_INVALID)
		err = -EINVAL;

	 __padata_start(pinst);

	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_start);

/**
 * padata_stop - stop the parallel processing
 *
 * @pinst: padata instance to stop
 */
void padata_stop(struct padata_instance *pinst)
{
	mutex_lock(&pinst->lock);
	__padata_stop(pinst);
	mutex_unlock(&pinst->lock);
}
EXPORT_SYMBOL(padata_stop);

#ifdef CONFIG_HOTPLUG_CPU

static int __padata_add_cpu(struct padata_instance *pinst, int cpu)
{
	struct parallel_data *pd;

	if (cpumask_test_cpu(cpu, cpu_online_mask)) {
		pd = padata_alloc_pd(pinst, pinst->cpumask.pcpu,
				     pinst->cpumask.cbcpu);
		if (!pd)
			return -ENOMEM;

		padata_replace(pinst, pd);

		if (padata_validate_cpumask(pinst, pinst->cpumask.pcpu) &&
		    padata_validate_cpumask(pinst, pinst->cpumask.cbcpu))
			__padata_start(pinst);
	}

	return 0;
}

static int __padata_remove_cpu(struct padata_instance *pinst, int cpu)
{
	struct parallel_data *pd = NULL;

	if (cpumask_test_cpu(cpu, cpu_online_mask)) {

		if (!padata_validate_cpumask(pinst, pinst->cpumask.pcpu) ||
		    !padata_validate_cpumask(pinst, pinst->cpumask.cbcpu))
			__padata_stop(pinst);

		pd = padata_alloc_pd(pinst, pinst->cpumask.pcpu,
				     pinst->cpumask.cbcpu);
		if (!pd)
			return -ENOMEM;

		padata_replace(pinst, pd);

		cpumask_clear_cpu(cpu, pd->cpumask.cbcpu);
		cpumask_clear_cpu(cpu, pd->cpumask.pcpu);
	}

	return 0;
}

/**
 * padata_remove_cpu - remove a cpu from one or both (serial and parallel)
 *                     padata cpumasks.
 *
 * @pinst: padata instance
 * @cpu: cpu to remove
 * @mask: bitmask specifying from which cpumask @cpu should be removed
 *        The @mask may be any combination of the following flags:
 *          PADATA_CPU_SERIAL   - serial cpumask
 *          PADATA_CPU_PARALLEL - parallel cpumask
 */
int padata_remove_cpu(struct padata_instance *pinst, int cpu, int mask)
{
	int err;

	if (!(mask & (PADATA_CPU_SERIAL | PADATA_CPU_PARALLEL)))
		return -EINVAL;

	mutex_lock(&pinst->lock);

	get_online_cpus();
	if (mask & PADATA_CPU_SERIAL)
		cpumask_clear_cpu(cpu, pinst->cpumask.cbcpu);
	if (mask & PADATA_CPU_PARALLEL)
		cpumask_clear_cpu(cpu, pinst->cpumask.pcpu);

	err = __padata_remove_cpu(pinst, cpu);
	put_online_cpus();

	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_remove_cpu);

static inline int pinst_has_cpu(struct padata_instance *pinst, int cpu)
{
	return cpumask_test_cpu(cpu, pinst->cpumask.pcpu) ||
		cpumask_test_cpu(cpu, pinst->cpumask.cbcpu);
}

static int padata_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct padata_instance *pinst;
	int ret;

	pinst = hlist_entry_safe(node, struct padata_instance, node);
	if (!pinst_has_cpu(pinst, cpu))
		return 0;

	mutex_lock(&pinst->lock);
	ret = __padata_add_cpu(pinst, cpu);
	mutex_unlock(&pinst->lock);
	return ret;
}

static int padata_cpu_prep_down(unsigned int cpu, struct hlist_node *node)
{
	struct padata_instance *pinst;
	int ret;

	pinst = hlist_entry_safe(node, struct padata_instance, node);
	if (!pinst_has_cpu(pinst, cpu))
		return 0;

	mutex_lock(&pinst->lock);
	ret = __padata_remove_cpu(pinst, cpu);
	mutex_unlock(&pinst->lock);
	return ret;
}

static enum cpuhp_state hp_online;
#endif

static void __padata_free(struct padata_instance *pinst)
{
#ifdef CONFIG_HOTPLUG_CPU
	cpuhp_state_remove_instance_nocalls(hp_online, &pinst->node);
#endif

	padata_stop(pinst);
	padata_free_pd(pinst->pd);
	free_cpumask_var(pinst->cpumask.pcpu);
	free_cpumask_var(pinst->cpumask.cbcpu);
	kfree(pinst);
}

#define kobj2pinst(_kobj)					\
	container_of(_kobj, struct padata_instance, kobj)
#define attr2pentry(_attr)					\
	container_of(_attr, struct padata_sysfs_entry, attr)

static void padata_sysfs_release(struct kobject *kobj)
{
	struct padata_instance *pinst = kobj2pinst(kobj);
	__padata_free(pinst);
}

struct padata_sysfs_entry {
	struct attribute attr;
	ssize_t (*show)(struct padata_instance *, struct attribute *, char *);
	ssize_t (*store)(struct padata_instance *, struct attribute *,
			 const char *, size_t);
};

static ssize_t show_cpumask(struct padata_instance *pinst,
			    struct attribute *attr,  char *buf)
{
	struct cpumask *cpumask;
	ssize_t len;

	mutex_lock(&pinst->lock);
	if (!strcmp(attr->name, "serial_cpumask"))
		cpumask = pinst->cpumask.cbcpu;
	else
		cpumask = pinst->cpumask.pcpu;

	len = snprintf(buf, PAGE_SIZE, "%*pb\n",
		       nr_cpu_ids, cpumask_bits(cpumask));
	mutex_unlock(&pinst->lock);
	return len < PAGE_SIZE ? len : -EINVAL;
}

static ssize_t store_cpumask(struct padata_instance *pinst,
			     struct attribute *attr,
			     const char *buf, size_t count)
{
	cpumask_var_t new_cpumask;
	ssize_t ret;
	int mask_type;

	if (!alloc_cpumask_var(&new_cpumask, GFP_KERNEL))
		return -ENOMEM;

	ret = bitmap_parse(buf, count, cpumask_bits(new_cpumask),
			   nr_cpumask_bits);
	if (ret < 0)
		goto out;

	mask_type = !strcmp(attr->name, "serial_cpumask") ?
		PADATA_CPU_SERIAL : PADATA_CPU_PARALLEL;
	ret = padata_set_cpumask(pinst, mask_type, new_cpumask);
	if (!ret)
		ret = count;

out:
	free_cpumask_var(new_cpumask);
	return ret;
}

#define PADATA_ATTR_RW(_name, _show_name, _store_name)		\
	static struct padata_sysfs_entry _name##_attr =		\
		__ATTR(_name, 0644, _show_name, _store_name)
#define PADATA_ATTR_RO(_name, _show_name)		\
	static struct padata_sysfs_entry _name##_attr = \
		__ATTR(_name, 0400, _show_name, NULL)

PADATA_ATTR_RW(serial_cpumask, show_cpumask, store_cpumask);
PADATA_ATTR_RW(parallel_cpumask, show_cpumask, store_cpumask);

/*
 * Padata sysfs provides the following objects:
 * serial_cpumask   [RW] - cpumask for serial workers
 * parallel_cpumask [RW] - cpumask for parallel workers
 */
static struct attribute *padata_default_attrs[] = {
	&serial_cpumask_attr.attr,
	&parallel_cpumask_attr.attr,
	NULL,
};

static ssize_t padata_sysfs_show(struct kobject *kobj,
				 struct attribute *attr, char *buf)
{
	struct padata_instance *pinst;
	struct padata_sysfs_entry *pentry;
	ssize_t ret = -EIO;

	pinst = kobj2pinst(kobj);
	pentry = attr2pentry(attr);
	if (pentry->show)
		ret = pentry->show(pinst, attr, buf);

	return ret;
}

static ssize_t padata_sysfs_store(struct kobject *kobj, struct attribute *attr,
				  const char *buf, size_t count)
{
	struct padata_instance *pinst;
	struct padata_sysfs_entry *pentry;
	ssize_t ret = -EIO;

	pinst = kobj2pinst(kobj);
	pentry = attr2pentry(attr);
	if (pentry->store)
		ret = pentry->store(pinst, attr, buf, count);

	return ret;
}

static const struct sysfs_ops padata_sysfs_ops = {
	.show = padata_sysfs_show,
	.store = padata_sysfs_store,
};

static struct kobj_type padata_attr_type = {
	.sysfs_ops = &padata_sysfs_ops,
	.default_attrs = padata_default_attrs,
	.release = padata_sysfs_release,
};

/**
 * padata_alloc - allocate and initialize a padata instance and specify
 *                cpumasks for serial and parallel workers.
 *
 * @wq: workqueue to use for the allocated padata instance
 * @pcpumask: cpumask that will be used for padata parallelization
 * @cbcpumask: cpumask that will be used for padata serialization
 *
 * Must be called from a cpus_read_lock() protected region
 */
static struct padata_instance *padata_alloc(struct workqueue_struct *wq,
					    const struct cpumask *pcpumask,
					    const struct cpumask *cbcpumask)
{
	struct padata_instance *pinst;
	struct parallel_data *pd = NULL;

	pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL);
	if (!pinst)
		goto err;

	if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL))
		goto err_free_inst;
	if (!alloc_cpumask_var(&pinst->cpumask.cbcpu, GFP_KERNEL)) {
		free_cpumask_var(pinst->cpumask.pcpu);
		goto err_free_inst;
	}
	if (!padata_validate_cpumask(pinst, pcpumask) ||
	    !padata_validate_cpumask(pinst, cbcpumask))
		goto err_free_masks;

	pd = padata_alloc_pd(pinst, pcpumask, cbcpumask);
	if (!pd)
		goto err_free_masks;

	rcu_assign_pointer(pinst->pd, pd);

	pinst->wq = wq;

	cpumask_copy(pinst->cpumask.pcpu, pcpumask);
	cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);

	pinst->flags = 0;

	BLOCKING_INIT_NOTIFIER_HEAD(&pinst->cpumask_change_notifier);
	kobject_init(&pinst->kobj, &padata_attr_type);
	mutex_init(&pinst->lock);

#ifdef CONFIG_HOTPLUG_CPU
	cpuhp_state_add_instance_nocalls_cpuslocked(hp_online, &pinst->node);
#endif
	return pinst;

err_free_masks:
	free_cpumask_var(pinst->cpumask.pcpu);
	free_cpumask_var(pinst->cpumask.cbcpu);
err_free_inst:
	kfree(pinst);
err:
	return NULL;
}

/**
 * padata_alloc_possible - Allocate and initialize padata instance.
 *                         Use the cpu_possible_mask for serial and
 *                         parallel workers.
 *
 * @wq: workqueue to use for the allocated padata instance
 *
 * Must be called from a cpus_read_lock() protected region
 */
struct padata_instance *padata_alloc_possible(struct workqueue_struct *wq)
{
	lockdep_assert_cpus_held();
	return padata_alloc(wq, cpu_possible_mask, cpu_possible_mask);
}
EXPORT_SYMBOL(padata_alloc_possible);
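
/*
 * Typical instance setup around padata_alloc_possible() (sketch only;
 * my_wq naming and error handling are the caller's concern). The allocation
 * is done under get_online_cpus() to satisfy the cpus_read_lock()
 * requirement documented above:
 *
 *	struct workqueue_struct *my_wq;
 *	struct padata_instance *pinst;
 *
 *	my_wq = alloc_workqueue("my_padata",
 *				WQ_MEM_RECLAIM | WQ_CPU_INTENSIVE, 1);
 *	if (!my_wq)
 *		return -ENOMEM;
 *
 *	get_online_cpus();
 *	pinst = padata_alloc_possible(my_wq);
 *	put_online_cpus();
 *	if (!pinst) {
 *		destroy_workqueue(my_wq);
 *		return -ENOMEM;
 *	}
 *
 *	padata_start(pinst);
 *
 * and on teardown:
 *
 *	padata_stop(pinst);
 *	padata_free(pinst);
 *	destroy_workqueue(my_wq);
 */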

/**
 * padata_free - free a padata instance
 *
 * @pinst: padata instance to free
 */
void padata_free(struct padata_instance *pinst)
{
	kobject_put(&pinst->kobj);
}
EXPORT_SYMBOL(padata_free);

#ifdef CONFIG_HOTPLUG_CPU

static __init int padata_driver_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "padata:online",
				      padata_cpu_online,
				      padata_cpu_prep_down);
	if (ret < 0)
		return ret;
	hp_online = ret;
	return 0;
}
module_init(padata_driver_init);

static __exit void padata_driver_exit(void)
{
	cpuhp_remove_multi_state(hp_online);
}
module_exit(padata_driver_exit);
#endif