addr.c 49.4 KB
Newer Older
1
#include <linux/ceph/ceph_debug.h>
Sage Weil's avatar
Sage Weil committed
2 3 4 5 6 7

#include <linux/backing-dev.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/writeback.h>	/* generic_writepages */
8
#include <linux/slab.h>
Sage Weil's avatar
Sage Weil committed
9 10 11 12
#include <linux/pagevec.h>
#include <linux/task_io_accounting_ops.h>

#include "super.h"
13
#include "mds_client.h"
14
#include "cache.h"
15
#include <linux/ceph/osd_client.h>
Sage Weil's avatar
Sage Weil committed
16 17 18 19 20 21 22 23 24 25 26 27

/*
 * Ceph address space ops.
 *
 * There are a few funny things going on here.
 *
 * The page->private field is used to reference a struct
 * ceph_snap_context for _every_ dirty page.  This indicates which
 * snapshot the page was logically dirtied in, and thus which snap
 * context needs to be associated with the osd write during writeback.
 *
 * Similarly, struct ceph_inode_info maintains a set of counters to
Lucas De Marchi's avatar
Lucas De Marchi committed
28
 * count dirty pages on the inode.  In the absence of snapshots,
Sage Weil's avatar
Sage Weil committed
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
 *
 * When a snapshot is taken (that is, when the client receives
 * notification that a snapshot was taken), each inode with caps and
 * with dirty pages (dirty pages implies there is a cap) gets a new
 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
 * order, new snaps go to the tail).  The i_wrbuffer_ref_head count is
 * moved to capsnap->dirty. (Unless a sync write is currently in
 * progress.  In that case, the capsnap is said to be "pending", new
 * writes cannot start, and the capsnap isn't "finalized" until the
 * write completes (or fails) and a final size/mtime for the inode for
 * that snap can be settled upon.)  i_wrbuffer_ref_head is reset to 0.
 *
 * On writeback, we must submit writes to the osd IN SNAP ORDER.  So,
 * we look for the first capsnap in i_cap_snaps and write out pages in
 * that snap context _only_.  Then we move on to the next capsnap,
 * eventually reaching the "live" or "head" context (i.e., pages that
 * are not yet snapped) and are writing the most recently dirtied
 * pages.
 *
 * Invalidate and so forth must take care to ensure the dirty page
 * accounting is preserved.
 */

53 54 55 56 57
#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb)				\
	(CONGESTION_ON_THRESH(congestion_kb) -				\
	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))

58 59 60 61 62 63
/*
 * Return the snap context stashed in page->private, or NULL when the
 * page carries no private data.  Does not take a reference.
 */
static inline struct ceph_snap_context *page_snap_context(struct page *page)
{
	return PagePrivate(page) ? (void *)page->private : NULL;
}
Sage Weil's avatar
Sage Weil committed
64 65 66 67 68 69 70 71 72 73 74

/*
 * Dirty a page.  Optimistically adjust accounting, on the assumption
 * that we won't race with invalidate.  If we do, readjust.
 *
 * Returns the value of __set_page_dirty_nobuffers() (nonzero if the
 * page was newly dirtied).  Takes i_ceph_lock to account the dirty
 * page against either the pending cap snap or the live "head" snap
 * context, and stores a referenced snap context in page->private.
 */
static int ceph_set_page_dirty(struct page *page)
{
	struct address_space *mapping = page->mapping;
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc;
	int ret;

	/* no mapping: nothing to account, just set the bit */
	if (unlikely(!mapping))
		return !TestSetPageDirty(page);

	if (PageDirty(page)) {
		dout("%p set_page_dirty %p idx %lu -- already dirty\n",
		     mapping->host, page, page->index);
		/* a dirty page must already carry its snap context */
		BUG_ON(!PagePrivate(page));
		return 0;
	}

	inode = mapping->host;
	ci = ceph_inode(inode);

	/* dirty the head */
	spin_lock(&ci->i_ceph_lock);
	BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
	if (__ceph_have_pending_cap_snap(ci)) {
		/* account against the most recent (pending) cap snap */
		struct ceph_cap_snap *capsnap =
				list_last_entry(&ci->i_cap_snaps,
						struct ceph_cap_snap,
						ci_item);
		snapc = ceph_get_snap_context(capsnap->context);
		capsnap->dirty_pages++;
	} else {
		/* no pending snap: account against the live head context */
		BUG_ON(!ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		++ci->i_wrbuffer_ref_head;
	}
	/* first dirty page pins the inode until writeback completes */
	if (ci->i_wrbuffer_ref == 0)
		ihold(inode);
	++ci->i_wrbuffer_ref;
	dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
	     "snapc %p seq %lld (%d snaps)\n",
	     mapping->host, page, page->index,
	     ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
	     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
	     snapc, snapc->seq, snapc->num_snaps);
	spin_unlock(&ci->i_ceph_lock);

	/*
	 * Reference snap context in page->private.  Also set
	 * PagePrivate so that we get invalidatepage callback.
	 */
	BUG_ON(PagePrivate(page));
	page->private = (unsigned long)snapc;
	SetPagePrivate(page);

	ret = __set_page_dirty_nobuffers(page);
	WARN_ON(!PageLocked(page));
	WARN_ON(!page->mapping);

	return ret;
}

/*
 * If we are truncating the full page (i.e. offset == 0), adjust the
 * dirty page counters appropriately.  Only called if there is private
 * data on the page.
 *
 * Partial invalidations (offset != 0 or length != PAGE_SIZE) are a
 * no-op beyond logging: the page stays dirty and keeps its snap
 * context.  A full invalidation drops the wrbuffer cap ref and the
 * snap context reference taken in ceph_set_page_dirty().
 */
static void ceph_invalidatepage(struct page *page, unsigned int offset,
				unsigned int length)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc = page_snap_context(page);

	inode = page->mapping->host;
	ci = ceph_inode(inode);

	if (offset != 0 || length != PAGE_SIZE) {
		dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
		     inode, page, page->index, offset, length);
		return;
	}

	ceph_invalidate_fscache_page(inode, page);

	/* no private data: nothing was accounted, nothing to undo */
	if (!PagePrivate(page))
		return;

	/*
	 * We can get non-dirty pages here due to races between
	 * set_page_dirty and truncate_complete_page; just spit out a
	 * warning, in case we end up with accounting problems later.
	 */
	if (!PageDirty(page))
		pr_err("%p invalidatepage %p page not dirty\n", inode, page);

	ClearPageChecked(page);

	dout("%p invalidatepage %p idx %lu full dirty page\n",
	     inode, page, page->index);

	/* release the accounting and snapc ref taken when dirtied */
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);
	page->private = 0;
	ClearPagePrivate(page);
}

/*
 * Decide whether a clean page may be released from the page cache.
 * Returns nonzero only when fscache agrees and the page holds no
 * private (snap context) data.
 */
static int ceph_releasepage(struct page *page, gfp_t g)
{
	struct address_space *mapping = page->mapping;

	dout("%p releasepage %p idx %lu\n", mapping->host,
	     page, page->index);
	WARN_ON(PageDirty(page));

	/* Can we release the page from the cache? */
	return ceph_release_fscache_page(page, g) ? !PagePrivate(page) : 0;
}

/*
 * read a single page, without unlocking it.
 *
 * Fast paths: a page entirely beyond EOF is zero-filled; for inlined
 * files, only the first page may legitimately be requested (the rest
 * are zero-filled), since uptodate inline data is populated while
 * acquiring Fcr caps.  Otherwise try fscache first and fall back to a
 * synchronous OSD read.  Returns 0 on success or a negative errno.
 */
static int readpage_nounlock(struct file *filp, struct page *page)
{
	struct inode *inode = file_inode(filp);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_client *osdc =
		&ceph_inode_to_client(inode)->client->osdc;
	int err = 0;
	u64 off = page_offset(page);
	u64 len = PAGE_SIZE;

	/* page past EOF: just zero it and mark uptodate */
	if (off >= i_size_read(inode)) {
		zero_user_segment(page, 0, PAGE_SIZE);
		SetPageUptodate(page);
		return 0;
	}

	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		/*
		 * Uptodate inline data should have been added
		 * into page cache while getting Fcr caps.
		 */
		if (off == 0)
			return -EINVAL;
		zero_user_segment(page, 0, PAGE_SIZE);
		SetPageUptodate(page);
		return 0;
	}

	/* 0 from fscache means the read was satisfied from cache */
	err = ceph_readpage_from_fscache(inode, page);
	if (err == 0)
		goto out;

	dout("readpage inode %p file %p page %p index %lu\n",
	     inode, filp, page, page->index);
	err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
				  off, &len,
				  ci->i_truncate_seq, ci->i_truncate_size,
				  &page, 1, 0);
	/* object doesn't exist yet: treat as a hole (read zeros) */
	if (err == -ENOENT)
		err = 0;
	if (err < 0) {
		SetPageError(page);
		ceph_fscache_readpage_cancel(inode, page);
		goto out;
	}
	if (err < PAGE_SIZE)
		/* zero fill remainder of page */
		zero_user_segment(page, err, PAGE_SIZE);
	else
		flush_dcache_page(page);

	SetPageUptodate(page);
	ceph_readpage_to_fscache(inode, page);

out:
	return err < 0 ? err : 0;
}

/* ->readpage entry point: read the page, then unlock it as required. */
static int ceph_readpage(struct file *filp, struct page *page)
{
	int ret;

	ret = readpage_nounlock(filp, page);
	unlock_page(page);
	return ret;
}

/*
 * Finish an async read(ahead) op.
 *
 * Completion callback for start_read(): walk the request's page
 * vector, zero-filling any tail the OSD did not return, marking
 * successfully read pages uptodate, and unlocking/putting every page.
 * On error (other than -ENOENT, which reads as a hole of zeros) the
 * pages are unlocked without being marked uptodate.
 */
static void finish_read(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_osd_data *osd_data;
	int rc = req->r_result <= 0 ? req->r_result : 0;
	int bytes = req->r_result >= 0 ? req->r_result : 0;
	int num_pages;
	int i;

	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);

	/* unlock all pages, zeroing any data we didn't read */
	osd_data = osd_req_op_extent_osd_data(req, 0);
	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
	num_pages = calc_pages_for((u64)osd_data->alignment,
					(u64)osd_data->length);
	for (i = 0; i < num_pages; i++) {
		struct page *page = osd_data->pages[i];

		if (rc < 0 && rc != -ENOENT)
			goto unlock;
		if (bytes < (int)PAGE_SIZE) {
			/* zero (remainder of) page */
			int s = bytes < 0 ? 0 : bytes;
			zero_user_segment(page, s, PAGE_SIZE);
		}
		dout("finish_read %p uptodate %p idx %lu\n", inode, page,
		     page->index);
		flush_dcache_page(page);
		SetPageUptodate(page);
		ceph_readpage_to_fscache(inode, page);
unlock:
		unlock_page(page);
		put_page(page);	/* drop the ref taken for the OSD request */
		bytes -= PAGE_SIZE;
	}
	kfree(osd_data->pages);
}

299 300 301 302 303 304 305 306
/* Unlock every page in the vector, in order. */
static void ceph_unlock_page_vector(struct page **pages, int num_pages)
{
	struct page **p = pages;
	struct page **end = pages + num_pages;

	while (p < end)
		unlock_page(*p++);
}

Sage Weil's avatar
Sage Weil committed
307
/*
 * start an async read(ahead) operation.  return nr_pages we submitted
 * a read for on success, or negative error code.
 *
 * Takes up to @max contiguous pages off the tail of @page_list (the VM
 * supplies it in reverse index order), inserts them into the page
 * cache locked, and issues a single OSD READ covering them; pages are
 * unlocked by finish_read() on completion.  @max == 0 means no cap.
 */
static int start_read(struct inode *inode, struct list_head *page_list, int max)
{
	struct ceph_osd_client *osdc =
		&ceph_inode_to_client(inode)->client->osdc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct page *page = list_entry(page_list->prev, struct page, lru);
	struct ceph_vino vino;
	struct ceph_osd_request *req;
	u64 off;
	u64 len;
	int i;
	struct page **pages;
	pgoff_t next_index;
	int nr_pages = 0;
	int ret;

	off = (u64) page_offset(page);

	/* count pages */
	next_index = page->index;
	list_for_each_entry_reverse(page, page_list, lru) {
		/* stop at the first gap in the index sequence */
		if (page->index != next_index)
			break;
		nr_pages++;
		next_index++;
		if (max && nr_pages == max)
			break;
	}
	len = nr_pages << PAGE_SHIFT;
	dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
	     off, len);
	vino = ceph_vino(inode);
	/* note: new_request may shorten len to the object boundary */
	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
				    0, 1, CEPH_OSD_OP_READ,
				    CEPH_OSD_FLAG_READ, NULL,
				    ci->i_truncate_seq, ci->i_truncate_size,
				    false);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* build page vector */
	nr_pages = calc_pages_for(0, len);
	pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
	ret = -ENOMEM;
	if (!pages)
		goto out;
	for (i = 0; i < nr_pages; ++i) {
		page = list_entry(page_list->prev, struct page, lru);
		BUG_ON(PageLocked(page));
		list_del(&page->lru);

		dout("start_read %p adding %p idx %lu\n", inode, page,
		     page->index);
		/* insert locked; finish_read() will unlock */
		if (add_to_page_cache_lru(page, &inode->i_data, page->index,
					  GFP_KERNEL)) {
			ceph_fscache_uncache_page(inode, page);
			put_page(page);
			dout("start_read %p add_to_page_cache failed %p\n",
			     inode, page);
			/* only the first i pages made it into the cache */
			nr_pages = i;
			goto out_pages;
		}
		pages[i] = page;
	}
	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
	req->r_callback = finish_read;
	req->r_inode = inode;

	dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto out_pages;
	ceph_osdc_put_request(req);
	return nr_pages;

out_pages:
	ceph_unlock_page_vector(pages, nr_pages);
	ceph_release_page_vector(pages, nr_pages);
out:
	ceph_osdc_put_request(req);
	return ret;
}
Sage Weil's avatar
Sage Weil committed
393

Sage Weil's avatar
Sage Weil committed
394 395 396 397 398 399 400 401

/*
 * Read multiple pages.  Leave pages we don't read + unlock in page_list;
 * the caller (VM) cleans them up.
 *
 * Inlined files are rejected (-EINVAL) so the VM falls back to
 * ->readpage.  fscache gets first crack at the whole list; whatever it
 * cannot satisfy is submitted to the OSDs in batches of at most
 * rsize-worth of pages via start_read().
 */
static int ceph_readpages(struct file *file, struct address_space *mapping,
			  struct list_head *page_list, unsigned nr_pages)
{
	struct inode *inode = file_inode(file);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	int rc = 0;
	int max = 0;

	if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
		return -EINVAL;

	/* rc == 0: fscache satisfied (or is handling) all the pages */
	rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
					 &nr_pages);

	if (rc == 0)
		goto out;

	/* cap each OSD read at rsize, rounded up to whole pages */
	if (fsc->mount_options->rsize >= PAGE_SIZE)
		max = (fsc->mount_options->rsize + PAGE_SIZE - 1)
			>> PAGE_SHIFT;

	dout("readpages %p file %p nr_pages %d max %d\n", inode,
		file, nr_pages,
	     max);
	while (!list_empty(page_list)) {
		rc = start_read(inode, page_list, max);
		if (rc < 0)
			goto out;
		/* start_read must consume at least one page per call */
		BUG_ON(rc == 0);
	}
out:
	ceph_fscache_readpages_cancel(inode, page_list);

	dout("readpages %p file %p ret %d\n", inode, file, rc);
	return rc;
}

/*
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * only snap context we are allowed to write back.
 *
 * Scans i_cap_snaps (oldest first) for the first capsnap with dirty
 * pages; if none, falls back to the live head context when it has
 * dirty pages.  On a capsnap hit, *snap_size (if non-NULL) is set to
 * the size frozen in that snap.  Returns a referenced snap context,
 * or NULL when the inode has no dirty data at all.
 */
static struct ceph_snap_context *get_oldest_context(struct inode *inode,
						    loff_t *snap_size)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = NULL;
	struct ceph_cap_snap *capsnap = NULL;

	spin_lock(&ci->i_ceph_lock);
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
		     capsnap->context, capsnap->dirty_pages);
		if (capsnap->dirty_pages) {
			snapc = ceph_get_snap_context(capsnap->context);
			if (snap_size)
				*snap_size = capsnap->size;
			break;
		}
	}
	if (!snapc && ci->i_wrbuffer_ref_head) {
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		dout(" head snapc %p has %d dirty pages\n",
		     snapc, ci->i_wrbuffer_ref_head);
	}
	spin_unlock(&ci->i_ceph_lock);
	return snapc;
}

/*
 * Write a single page, but leave the page locked.
 *
 * If we get a write error, set the page error bit, but still adjust the
 * dirty page accounting (i.e., page is no longer dirty).
 *
 * Only the oldest writable snap context may be written back; a page
 * dirtied under a newer context is a no-op here (expected only from
 * kswapd/direct reclaim).  On -ERESTARTSYS (SIGKILL during the OSD
 * write) the page is redirtied and the error propagated to the caller.
 */
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_fs_client *fsc;
	struct ceph_osd_client *osdc;
	struct ceph_snap_context *snapc, *oldest;
	loff_t page_off = page_offset(page);
	loff_t snap_size = -1;	/* -1: no capsnap size, use i_size */
	long writeback_stat;
	u64 truncate_size;
	u32 truncate_seq;
	int err = 0, len = PAGE_SIZE;

	dout("writepage %p idx %lu\n", page, page->index);

	if (!page->mapping || !page->mapping->host) {
		dout("writepage %p - no mapping\n", page);
		return -EFAULT;
	}
	inode = page->mapping->host;
	ci = ceph_inode(inode);
	fsc = ceph_inode_to_client(inode);
	osdc = &fsc->client->osdc;

	/* verify this is a writeable snap context */
	snapc = page_snap_context(page);
	if (snapc == NULL) {
		dout("writepage %p page %p not dirty?\n", inode, page);
		goto out;
	}
	oldest = get_oldest_context(inode, &snap_size);
	if (snapc->seq > oldest->seq) {
		dout("writepage %p page %p snapc %p not writeable - noop\n",
		     inode, page, snapc);
		/* we should only noop if called by kswapd */
		WARN_ON((current->flags & PF_MEMALLOC) == 0);
		ceph_put_snap_context(oldest);
		goto out;
	}
	ceph_put_snap_context(oldest);

	/* snapshot truncate state under the lock for a consistent view */
	spin_lock(&ci->i_ceph_lock);
	truncate_seq = ci->i_truncate_seq;
	truncate_size = ci->i_truncate_size;
	if (snap_size == -1)
		snap_size = i_size_read(inode);
	spin_unlock(&ci->i_ceph_lock);

	/* is this a partial page at end of file? */
	if (page_off >= snap_size) {
		dout("%p page eof %llu\n", page, snap_size);
		goto out;
	}
	if (snap_size < page_off + len)
		len = snap_size - page_off;

	dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
	     inode, page, page->index, page_off, len, snapc);

	/* throttle: mark the bdi congested past the on-threshold */
	writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
	if (writeback_stat >
	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
		set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);

	ceph_readpage_to_fscache(inode, page);

	set_page_writeback(page);
	err = ceph_osdc_writepages(osdc, ceph_vino(inode),
				   &ci->i_layout, snapc,
				   page_off, len,
				   truncate_seq, truncate_size,
				   &inode->i_mtime, &page, 1);
	if (err < 0) {
		struct writeback_control tmp_wbc;
		if (!wbc)
			wbc = &tmp_wbc;
		if (err == -ERESTARTSYS) {
			/* killed by SIGKILL */
			dout("writepage interrupted page %p\n", page);
			redirty_page_for_writepage(wbc, page);
			end_page_writeback(page);
			goto out;
		}
		dout("writepage setting page/mapping error %d %p\n",
		     err, page);
		SetPageError(page);
		mapping_set_error(&inode->i_data, err);
		wbc->pages_skipped++;
	} else {
		dout("writepage cleaned page %p\n", page);
		err = 0;  /* vfs expects us to return 0 */
	}
	/* drop the snapc ref and wrbuffer accounting in either case */
	page->private = 0;
	ClearPagePrivate(page);
	end_page_writeback(page);
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);  /* page's reference */
out:
	return err;
}

static int ceph_writepage(struct page *page, struct writeback_control *wbc)
{
577 578 579
	int err;
	struct inode *inode = page->mapping->host;
	BUG_ON(!inode);
580
	ihold(inode);
581
	err = writepage_nounlock(page, wbc);
582 583 584 585 586
	if (err == -ERESTARTSYS) {
		/* direct memory reclaimer was killed by SIGKILL. return 0
		 * to prevent caller from setting mapping/page error */
		err = 0;
	}
Sage Weil's avatar
Sage Weil committed
587
	unlock_page(page);
588
	iput(inode);
Sage Weil's avatar
Sage Weil committed
589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614
	return err;
}

/*
 * lame release_pages helper.  release_pages() isn't exported to
 * modules.
 */
static void ceph_release_pages(struct page **pages, int num)
{
	struct pagevec pvec;
	int idx = 0;

	pagevec_init(&pvec, 0);
	while (idx < num) {
		/* flush the pagevec whenever it fills up */
		if (!pagevec_add(&pvec, pages[idx++]))
			pagevec_release(&pvec);
	}
	pagevec_release(&pvec);
}

/*
 * async writeback completion handler.
 *
 * If we get an error, set the mapping error bit, but not the individual
 * page error bits.
 *
 * Walks every WRITE op in the request, ending writeback on and
 * unlocking each page, dropping its snap context reference, and
 * clearing congestion as the writeback count falls.  Finally releases
 * the accumulated wrbuffer cap refs and frees the page vector (back
 * to the mempool if it came from there).
 */
static void writepages_finish(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_data *osd_data;
	struct page *page;
	int num_pages, total_pages = 0;
	int i, j;
	int rc = req->r_result;
	struct ceph_snap_context *snapc = req->r_snapc;
	struct address_space *mapping = inode->i_mapping;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	bool remove_page;

	dout("writepages_finish %p rc %d\n", inode, rc);
	if (rc < 0)
		mapping_set_error(mapping, rc);

	/*
	 * We lost the cache cap, need to truncate the page before
	 * it is unlocked, otherwise we'd truncate it later in the
	 * page truncation thread, possibly losing some data that
	 * raced its way in
	 */
	remove_page = !(ceph_caps_issued(ci) &
			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));

	/* clean all pages */
	for (i = 0; i < req->r_num_ops; i++) {
		/* ops after the WRITE ops are not page-backed; stop */
		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
			break;

		osd_data = osd_req_op_extent_osd_data(req, i);
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
		num_pages = calc_pages_for((u64)osd_data->alignment,
					   (u64)osd_data->length);
		total_pages += num_pages;
		for (j = 0; j < num_pages; j++) {
			page = osd_data->pages[j];
			BUG_ON(!page);
			WARN_ON(!PageUptodate(page));

			/* uncongest the bdi once below the off-threshold */
			if (atomic_long_dec_return(&fsc->writeback_count) <
			     CONGESTION_OFF_THRESH(
					fsc->mount_options->congestion_kb))
				clear_bdi_congested(&fsc->backing_dev_info,
						    BLK_RW_ASYNC);

			if (rc < 0)
				SetPageError(page);

			/* drop the snapc ref taken in set_page_dirty */
			ceph_put_snap_context(page_snap_context(page));
			page->private = 0;
			ClearPagePrivate(page);
			dout("unlocking %p\n", page);
			end_page_writeback(page);

			if (remove_page)
				generic_error_remove_page(inode->i_mapping,
							  page);

			unlock_page(page);
		}
		dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
		     inode, osd_data->length, rc >= 0 ? num_pages : 0);

		ceph_release_pages(osd_data->pages, num_pages);
	}

	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);

	/* the page vector of op 0 owns the allocation for all ops */
	osd_data = osd_req_op_extent_osd_data(req, 0);
	if (osd_data->pages_from_pool)
		mempool_free(osd_data->pages,
			     ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
	else
		kfree(osd_data->pages);
	ceph_osdc_put_request(req);
}

/*
 * initiate async writeback
 *
 * Walk the dirty pages of @mapping, batch contiguous runs of pages
 * tagged dirty (that belong to the oldest dirty snap context) into one
 * or more OSD write requests, and submit them asynchronously.
 * Completion is handled by writepages_finish() (set as r_callback).
 *
 * Returns 0 or a negative error (e.g. -EIO on a forced umount).
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_vino vino = ceph_vino(inode);
	pgoff_t index, start, end;
	int range_whole = 0;
	int should_loop = 1;
	pgoff_t max_pages = 0, max_pages_ever = 0;
	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
	struct pagevec pvec;
	int done = 0;
	int rc = 0;
	unsigned wsize = 1 << inode->i_blkbits;
	struct ceph_osd_request *req = NULL;
	int do_sync = 0;
	loff_t snap_size, i_size;
	u64 truncate_size;
	u32 truncate_seq;

	/*
	 * Include a 'sync' in the OSD request if this is a data
	 * integrity write (e.g., O_SYNC write or fsync()), or if our
	 * cap is being revoked.
	 */
	if ((wbc->sync_mode == WB_SYNC_ALL) ||
		ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
		do_sync = 1;
	dout("writepages_start %p dosync=%d (mode=%s)\n",
	     inode, do_sync,
	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

	if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		if (ci->i_wrbuffer_ref > 0) {
			pr_warn_ratelimited(
				"writepage_start %p %lld forced umount\n",
				inode, ceph_ino(inode));
		}
		mapping_set_error(mapping, -EIO);
		return -EIO; /* we're in a forced umount, don't write! */
	}
	/* clamp the request size: mount option may shrink it, but never
	 * below one page */
	if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize)
		wsize = fsc->mount_options->wsize;
	if (wsize < PAGE_SIZE)
		wsize = PAGE_SIZE;
	max_pages_ever = wsize >> PAGE_SHIFT;

	pagevec_init(&pvec, 0);

	/* where to start/end? */
	if (wbc->range_cyclic) {
		start = mapping->writeback_index; /* Start from prev offset */
		end = -1;
		dout(" cyclic, start at %lu\n", start);
	} else {
		start = wbc->range_start >> PAGE_SHIFT;
		end = wbc->range_end >> PAGE_SHIFT;
		if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
			range_whole = 1;
		should_loop = 0;
		dout(" not cyclic, %lu to %lu\n", start, end);
	}
	index = start;

retry:
	/* find oldest snap context with dirty data */
	ceph_put_snap_context(snapc);
	/* snap_size == -1 means "no snap size"; the EOF checks below then
	 * use i_size instead (get_oldest_context() may fill it in) */
	snap_size = -1;
	snapc = get_oldest_context(inode, &snap_size);
	if (!snapc) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		dout(" no snap context with dirty data?\n");
		goto out;
	}
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	     snapc, snapc->seq, snapc->num_snaps);

	/* sample truncate state and size under i_ceph_lock so they are
	 * consistent with each other for this pass */
	spin_lock(&ci->i_ceph_lock);
	truncate_seq = ci->i_truncate_seq;
	truncate_size = ci->i_truncate_size;
	i_size = i_size_read(inode);
	spin_unlock(&ci->i_ceph_lock);

	if (last_snapc && snapc != last_snapc) {
		/* if we switched to a newer snapc, restart our scan at the
		 * start of the original file range. */
		dout("  snapc differs from last pass, restarting at %lu\n",
		     index);
		index = start;
	}
	last_snapc = snapc;

	while (!done && index <= end) {
		unsigned i;
		int first;
		pgoff_t strip_unit_end = 0;
		int num_ops = 0, op_idx;
		int pvec_pages, locked_pages = 0;
		struct page **pages = NULL, **data_pages;
		mempool_t *pool = NULL;	/* Becomes non-null if mempool used */
		struct page *page;
		int want;
		u64 offset = 0, len = 0;

		max_pages = max_pages_ever;

get_more_pages:
		first = -1;
		want = min(end - index,
			   min((pgoff_t)PAGEVEC_SIZE,
			       max_pages - (pgoff_t)locked_pages) - 1)
			+ 1;
		pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
						PAGECACHE_TAG_DIRTY,
						want);
		dout("pagevec_lookup_tag got %d\n", pvec_pages);
		if (!pvec_pages && !locked_pages)
			break;
		/* try to lock and collect dirty pages for this batch; any
		 * page that can't be used ends the batch (break) */
		for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
			page = pvec.pages[i];
			dout("? %p idx %lu\n", page, page->index);
			if (locked_pages == 0)
				lock_page(page);  /* first page */
			else if (!trylock_page(page))
				break;

			/* only dirty pages, or our accounting breaks */
			if (unlikely(!PageDirty(page)) ||
			    unlikely(page->mapping != mapping)) {
				dout("!dirty or !mapping %p\n", page);
				unlock_page(page);
				break;
			}
			if (!wbc->range_cyclic && page->index > end) {
				dout("end of range %p\n", page);
				done = 1;
				unlock_page(page);
				break;
			}
			if (strip_unit_end && (page->index > strip_unit_end)) {
				dout("end of strip unit %p\n", page);
				unlock_page(page);
				break;
			}
			if (wbc->sync_mode != WB_SYNC_NONE) {
				dout("waiting on writeback %p\n", page);
				wait_on_page_writeback(page);
			}
			/* page entirely past EOF (snap size if set)? */
			if (page_offset(page) >=
			    (snap_size == -1 ? i_size : snap_size)) {
				dout("%p page eof %llu\n", page,
				     (snap_size == -1 ? i_size : snap_size));
				done = 1;
				unlock_page(page);
				break;
			}
			if (PageWriteback(page)) {
				dout("%p under writeback\n", page);
				unlock_page(page);
				break;
			}

			/* only if matching snap context */
			pgsnapc = page_snap_context(page);
			if (pgsnapc->seq > snapc->seq) {
				dout("page snapc %p %lld > oldest %p %lld\n",
				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
				unlock_page(page);
				if (!locked_pages)
					continue; /* keep looking for snap */
				break;
			}

			if (!clear_page_dirty_for_io(page)) {
				dout("%p !clear_page_dirty_for_io\n", page);
				unlock_page(page);
				break;
			}

			/*
			 * We have something to write.  If this is
			 * the first locked page this time through,
			 * calculate max possible write size and
			 * allocate a page array
			 */
			if (locked_pages == 0) {
				u64 objnum;
				u64 objoff;

				/* prepare async write request */
				offset = (u64)page_offset(page);
				len = wsize;

				/* trim len to the end of the object/strip
				 * unit that contains offset */
				rc = ceph_calc_file_object_mapping(&ci->i_layout,
								offset, len,
								&objnum, &objoff,
								&len);
				if (rc < 0) {
					unlock_page(page);
					break;
				}

				/* one write op, plus an optional trailing
				 * STARTSYNC op (added after the loop) */
				num_ops = 1 + do_sync;
				strip_unit_end = page->index +
					((len - 1) >> PAGE_SHIFT);

				BUG_ON(pages);
				max_pages = calc_pages_for(0, (u64)len);
				pages = kmalloc(max_pages * sizeof (*pages),
						GFP_NOFS);
				if (!pages) {
					/* fall back to the writeback mempool
					 * so forward progress is guaranteed */
					pool = fsc->wb_pagevec_pool;
					pages = mempool_alloc(pool, GFP_NOFS);
					BUG_ON(!pages);
				}

				len = 0;
			} else if (page->index !=
				   (offset + len) >> PAGE_SHIFT) {
				/* non-contiguous page: needs a new extent op;
				 * redirty and stop if we're out of op slots */
				if (num_ops >= (pool ?  CEPH_OSD_SLAB_OPS :
							CEPH_OSD_MAX_OPS)) {
					redirty_page_for_writepage(wbc, page);
					unlock_page(page);
					break;
				}

				num_ops++;
				offset = (u64)page_offset(page);
				len = 0;
			}

			/* note position of first page in pvec */
			if (first < 0)
				first = i;
			dout("%p will write page %p idx %lu\n",
			     inode, page, page->index);

			if (atomic_long_inc_return(&fsc->writeback_count) >
			    CONGESTION_ON_THRESH(
				    fsc->mount_options->congestion_kb)) {
				set_bdi_congested(&fsc->backing_dev_info,
						  BLK_RW_ASYNC);
			}

			pages[locked_pages] = page;
			locked_pages++;
			len += PAGE_SIZE;
		}

		/* did we get anything? */
		if (!locked_pages)
			goto release_pvec_pages;
		if (i) {
			int j;
			BUG_ON(!locked_pages || first < 0);

			if (pvec_pages && i == pvec_pages &&
			    locked_pages < max_pages) {
				dout("reached end pvec, trying for more\n");
				pagevec_reinit(&pvec);
				goto get_more_pages;
			}

			/* shift unused pages over in the pvec...  we
			 * will need to release them below. */
			for (j = i; j < pvec_pages; j++) {
				dout(" pvec leftover page %p\n", pvec.pages[j]);
				pvec.pages[j-i+first] = pvec.pages[j];
			}
			pvec.nr -= i-first;
		}

new_request:
		offset = page_offset(pages[0]);
		len = wsize;

		req = ceph_osdc_new_request(&fsc->client->osdc,
					&ci->i_layout, vino,
					offset, &len, 0, num_ops,
					CEPH_OSD_OP_WRITE,
					CEPH_OSD_FLAG_WRITE |
					CEPH_OSD_FLAG_ONDISK,
					snapc, truncate_seq,
					truncate_size, false);
		if (IS_ERR(req)) {
			/* allocation failed: retry with the op count capped
			 * at CEPH_OSD_SLAB_OPS (note the final argument
			 * differs — presumably selects a slab/mempool-backed
			 * request; confirm against ceph_osdc_new_request()) */
			req = ceph_osdc_new_request(&fsc->client->osdc,
						&ci->i_layout, vino,
						offset, &len, 0,
						min(num_ops,
						    CEPH_OSD_SLAB_OPS),
						CEPH_OSD_OP_WRITE,
						CEPH_OSD_FLAG_WRITE |
						CEPH_OSD_FLAG_ONDISK,
						snapc, truncate_seq,
						truncate_size, true);
			BUG_ON(IS_ERR(req));
		}
		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
			     PAGE_SIZE - offset);

		req->r_callback = writepages_finish;
		req->r_inode = inode;

		/* Format the osd request message and submit the write */
		len = 0;
		data_pages = pages;
		op_idx = 0;
		for (i = 0; i < locked_pages; i++) {
			u64 cur_offset = page_offset(pages[i]);
			if (offset + len != cur_offset) {
				/* gap in the page run: close the current
				 * extent op and start a new one, unless only
				 * the (reserved) last op slot remains */
				if (op_idx + do_sync + 1 == req->r_num_ops)
					break;
				osd_req_op_extent_dup_last(req, op_idx,
							   cur_offset - offset);
				dout("writepages got pages at %llu~%llu\n",
				     offset, len);
				osd_req_op_extent_osd_data_pages(req, op_idx,
							data_pages, len, 0,
							!!pool, false);
				osd_req_op_extent_update(req, op_idx, len);

				len = 0;
				offset = cur_offset;
				data_pages = pages + i;
				op_idx++;
			}

			set_page_writeback(pages[i]);
			len += PAGE_SIZE;
		}

		if (snap_size != -1) {
			len = min(len, snap_size - offset);
		} else if (i == locked_pages) {
			/* writepages_finish() clears writeback pages
			 * according to the data length, so make sure
			 * data length covers all locked pages */
			u64 min_len = len + 1 - PAGE_SIZE;
			len = min(len, (u64)i_size_read(inode) - offset);
			len = max(len, min_len);
		}
		dout("writepages got pages at %llu~%llu\n", offset, len);

		/* attach the final (possibly only) extent op's data */
		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
						 0, !!pool, false);
		osd_req_op_extent_update(req, op_idx, len);

		if (do_sync) {
			op_idx++;
			osd_req_op_init(req, op_idx, CEPH_OSD_OP_STARTSYNC, 0);
		}
		BUG_ON(op_idx + 1 != req->r_num_ops);

		pool = NULL;
		if (i < locked_pages) {
			/* not all locked pages fit into this request: carry
			 * the remainder into a fresh pages array and loop
			 * back to new_request for another OSD request */
			BUG_ON(num_ops <= req->r_num_ops);
			num_ops -= req->r_num_ops;
			num_ops += do_sync;
			locked_pages -= i;

			/* allocate new pages array for next request */
			data_pages = pages;
			pages = kmalloc(locked_pages * sizeof (*pages),
					GFP_NOFS);
			if (!pages) {
				pool = fsc->wb_pagevec_pool;
				pages = mempool_alloc(pool, GFP_NOFS);
				BUG_ON(!pages);
			}
			memcpy(pages, data_pages + i,
			       locked_pages * sizeof(*pages));
			memset(data_pages + i, 0,
			       locked_pages * sizeof(*pages));
		} else {
			BUG_ON(num_ops != req->r_num_ops);
			index = pages[i - 1]->index + 1;
			/* request message now owns the pages array */
			pages = NULL;
		}

		req->r_mtime = inode->i_mtime;
		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
		BUG_ON(rc);
		req = NULL;

		wbc->nr_to_write -= i;
		if (pages)
			goto new_request;

		if (wbc->nr_to_write <= 0)
			done = 1;

release_pvec_pages:
		dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
		     pvec.nr ? pvec.pages[0] : NULL);
		pagevec_release(&pvec);

		if (locked_pages && !done)
			goto retry;
	}

	if (should_loop && !done) {
		/* more to do; loop back to beginning of file */
		dout("writepages looping back to beginning of file\n");
		should_loop = 0;
		index = 0;
		goto retry;
	}

	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
		mapping->writeback_index = index;

out:
	ceph_osdc_put_request(req);
	ceph_put_snap_context(snapc);
	dout("writepages done, rc = %d\n", rc);
	return rc;
}



/*
 * See if a given @snapc is either writeable, or already written.
 */
static int context_is_writeable_or_written(struct inode *inode,
					   struct ceph_snap_context *snapc)
{
	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL);
1130 1131 1132 1133
	int ret = !oldest || snapc->seq <= oldest->seq;

	ceph_put_snap_context(oldest);
	return ret;
Sage Weil's avatar
Sage Weil committed
1134 1135 1136 1137 1138
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 *
 * Prepare @page for a write of @len bytes at file position @pos:
 * flush it first if it is dirty in an older snap context, and bring
 * it uptodate (by reading or zeroing) unless the write covers the
 * whole page.
 *
 * called with page locked.
 * return success with page locked,
 * or any failure (incl -EAGAIN) with page unlocked.
 */
static int ceph_update_writeable_page(struct file *file,
			    loff_t pos, unsigned len,
			    struct page *page)
{
	struct inode *inode = file_inode(file);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	loff_t page_off = pos & PAGE_MASK;	/* page-aligned file offset */
	int pos_in_page = pos & ~PAGE_MASK;	/* write start within page */
	int end_in_page = pos_in_page + len;	/* write end within page */
	loff_t i_size;
	int r;
	struct ceph_snap_context *snapc, *oldest;

	if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		dout(" page %p forced umount\n", page);
		unlock_page(page);
		return -EIO;
	}

retry_locked:
	/* writepages currently holds page lock, but if we change that later, */
	wait_on_page_writeback(page);

	snapc = page_snap_context(page);
	if (snapc && snapc != ci->i_head_snapc) {
		/*
		 * this page is already dirty in another (older) snap
		 * context!  is it writeable now?
		 */
		oldest = get_oldest_context(inode, NULL);

		if (snapc->seq > oldest->seq) {
			ceph_put_snap_context(oldest);
			dout(" page %p snapc %p not current or oldest\n",
			     page, snapc);
			/*
			 * queue for writeback, and wait for snapc to
			 * be writeable or written
			 */
			snapc = ceph_get_snap_context(snapc);
			unlock_page(page);
			ceph_queue_writeback(inode);
			/* killable wait: caller sees -EAGAIN (retry) unless
			 * interrupted, in which case -ERESTARTSYS */
			r = wait_event_killable(ci->i_cap_wq,
			       context_is_writeable_or_written(inode, snapc));
			ceph_put_snap_context(snapc);
			if (r == -ERESTARTSYS)
				return r;
			return -EAGAIN;
		}
		ceph_put_snap_context(oldest);

		/* yay, writeable, do it now (without dropping page lock) */
		dout(" page %p snapc %p not current, but oldest\n",
		     page, snapc);
		if (!clear_page_dirty_for_io(page))
			goto retry_locked;
		r = writepage_nounlock(page, NULL);
		if (r < 0)
			goto fail_nosnap;
		goto retry_locked;
	}

	if (PageUptodate(page)) {
		dout(" page %p already uptodate\n", page);
		return 0;
	}

	/* full page? */
	if (pos_in_page == 0 && len == PAGE_SIZE)
		return 0;

	/* past end of file? */
	i_size = i_size_read(inode);

	/* page is entirely beyond EOF, or the write starts at the page
	 * head and reaches EOF without filling the page: zero the parts
	 * around the write instead of reading from the OSDs */
	if (page_off >= i_size ||
	    (pos_in_page == 0 && (pos+len) >= i_size &&
	     end_in_page - pos_in_page != PAGE_SIZE)) {
		dout(" zeroing %p 0 - %d and %d - %d\n",
		     page, pos_in_page, end_in_page, (int)PAGE_SIZE);
		zero_user_segments(page,
				   0, pos_in_page,
				   end_in_page, PAGE_SIZE);
		return 0;
	}

	/* we need to read it. */
	r = readpage_nounlock(file, page);
	if (r < 0)
		goto fail_nosnap;
	goto retry_locked;
fail_nosnap:
	unlock_page(page);
	return r;
}

1240 1241 1242 1243 1244 1245 1246 1247
/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 */
static int ceph_write_begin(struct file *file, struct address_space *mapping,
			    loff_t pos, unsigned len, unsigned flags,
			    struct page **pagep, void **fsdata)
{
Al Viro's avatar
Al Viro committed
1248
	struct inode *inode = file_inode(file);
1249
	struct page *page;
1250
	pgoff_t index = pos >> PAGE_SHIFT;
Sage Weil's avatar
Sage Weil committed
1251
	int r;
1252 1253

	do {