Commit d5a38f6e authored by Linus Torvalds's avatar Linus Torvalds
Browse files

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph updates from Sage Weil:
 "There is quite a bit here, including some overdue refactoring and
  cleanup on the mon_client and osd_client code from Ilya, scattered
  writeback support for CephFS and a pile of bug fixes from Zheng, and a
  few random cleanups and fixes from others"

[ I already decided not to pull this because of it having been rebased
  recently, but ended up changing my mind after all.  Next time I'll
  really hold people to it.  Oh well.   - Linus ]

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (34 commits)
  libceph: use KMEM_CACHE macro
  ceph: use kmem_cache_zalloc
  rbd: use KMEM_CACHE macro
  ceph: use lookup request to revalidate dentry
  ceph: kill ceph_get_dentry_parent_inode()
  ceph: fix security xattr deadlock
  ceph: don't request vxattrs from MDS
  ceph: fix mounting same fs multiple times
  ceph: remove unnecessary NULL check
  ceph: avoid updating directory inode's i_size accidentally
  ceph: fix race during filling readdir cache
  libceph: use sizeof_footer() more
  ceph: kill ceph_empty_snapc
  ceph: fix a wrong comparison
  ceph: replace CURRENT_TIME by current_fs_time()
  ceph: scattered page writeback
  libceph: add helper that duplicates last extent operation
  libceph: enable large, variable-sized OSD requests
  libceph: osdc->req_mempool should be backed by a slab pool
  libceph: make r_request msg_size calculation clearer
  ...
parents 698f415c 5ee61e95
......@@ -1847,14 +1847,12 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
if (osd_req->r_result < 0)
obj_request->result = osd_req->r_result;
rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);
/*
* We support a 64-bit length, but ultimately it has to be
* passed to the block layer, which just supports a 32-bit
* length field.
*/
obj_request->xferred = osd_req->r_reply_op_len[0];
obj_request->xferred = osd_req->r_ops[0].outdata_len;
rbd_assert(obj_request->xferred < (u64)UINT_MAX);
opcode = osd_req->r_ops[0].op;
......@@ -5643,18 +5641,12 @@ static void rbd_sysfs_cleanup(void)
static int rbd_slab_init(void)
{
rbd_assert(!rbd_img_request_cache);
rbd_img_request_cache = kmem_cache_create("rbd_img_request",
sizeof (struct rbd_img_request),
__alignof__(struct rbd_img_request),
0, NULL);
rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
if (!rbd_img_request_cache)
return -ENOMEM;
rbd_assert(!rbd_obj_request_cache);
rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
sizeof (struct rbd_obj_request),
__alignof__(struct rbd_obj_request),
0, NULL);
rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
if (!rbd_obj_request_cache)
goto out_err;
......
......@@ -175,8 +175,8 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
static int ceph_releasepage(struct page *page, gfp_t g)
{
struct inode *inode = page->mapping ? page->mapping->host : NULL;
dout("%p releasepage %p idx %lu\n", inode, page, page->index);
dout("%p releasepage %p idx %lu\n", page->mapping->host,
page, page->index);
WARN_ON(PageDirty(page));
/* Can we release the page from the cache? */
......@@ -276,7 +276,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
for (i = 0; i < num_pages; i++) {
struct page *page = osd_data->pages[i];
if (rc < 0 && rc != ENOENT)
if (rc < 0 && rc != -ENOENT)
goto unlock;
if (bytes < (int)PAGE_CACHE_SIZE) {
/* zero (remainder of) page */
......@@ -606,71 +606,71 @@ static void writepages_finish(struct ceph_osd_request *req,
struct inode *inode = req->r_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_osd_data *osd_data;
unsigned wrote;
struct page *page;
int num_pages;
int i;
int num_pages, total_pages = 0;
int i, j;
int rc = req->r_result;
struct ceph_snap_context *snapc = req->r_snapc;
struct address_space *mapping = inode->i_mapping;
int rc = req->r_result;
u64 bytes = req->r_ops[0].extent.length;
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
long writeback_stat;
unsigned issued = ceph_caps_issued(ci);
bool remove_page;
osd_data = osd_req_op_extent_osd_data(req, 0);
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
num_pages = calc_pages_for((u64)osd_data->alignment,
(u64)osd_data->length);
if (rc >= 0) {
/*
* Assume we wrote the pages we originally sent. The
* osd might reply with fewer pages if our writeback
* raced with a truncation and was adjusted at the osd,
* so don't believe the reply.
*/
wrote = num_pages;
} else {
wrote = 0;
dout("writepages_finish %p rc %d\n", inode, rc);
if (rc < 0)
mapping_set_error(mapping, rc);
}
dout("writepages_finish %p rc %d bytes %llu wrote %d (pages)\n",
inode, rc, bytes, wrote);
/* clean all pages */
for (i = 0; i < num_pages; i++) {
page = osd_data->pages[i];
BUG_ON(!page);
WARN_ON(!PageUptodate(page));
/*
* We lost the cache cap, need to truncate the page before
* it is unlocked, otherwise we'd truncate it later in the
* page truncation thread, possibly losing some data that
* raced its way in
*/
remove_page = !(ceph_caps_issued(ci) &
(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
writeback_stat =
atomic_long_dec_return(&fsc->writeback_count);
if (writeback_stat <
CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
clear_bdi_congested(&fsc->backing_dev_info,
BLK_RW_ASYNC);
/* clean all pages */
for (i = 0; i < req->r_num_ops; i++) {
if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
break;
ceph_put_snap_context(page_snap_context(page));
page->private = 0;
ClearPagePrivate(page);
dout("unlocking %d %p\n", i, page);
end_page_writeback(page);
osd_data = osd_req_op_extent_osd_data(req, i);
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
num_pages = calc_pages_for((u64)osd_data->alignment,
(u64)osd_data->length);
total_pages += num_pages;
for (j = 0; j < num_pages; j++) {
page = osd_data->pages[j];
BUG_ON(!page);
WARN_ON(!PageUptodate(page));
if (atomic_long_dec_return(&fsc->writeback_count) <
CONGESTION_OFF_THRESH(
fsc->mount_options->congestion_kb))
clear_bdi_congested(&fsc->backing_dev_info,
BLK_RW_ASYNC);
ceph_put_snap_context(page_snap_context(page));
page->private = 0;
ClearPagePrivate(page);
dout("unlocking %p\n", page);
end_page_writeback(page);
if (remove_page)
generic_error_remove_page(inode->i_mapping,
page);
/*
* We lost the cache cap, need to truncate the page before
* it is unlocked, otherwise we'd truncate it later in the
* page truncation thread, possibly losing some data that
* raced its way in
*/
if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
generic_error_remove_page(inode->i_mapping, page);
unlock_page(page);
}
dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
inode, osd_data->length, rc >= 0 ? num_pages : 0);
unlock_page(page);
ceph_release_pages(osd_data->pages, num_pages);
}
dout("%p wrote+cleaned %d pages\n", inode, wrote);
ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc);
ceph_release_pages(osd_data->pages, num_pages);
ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
osd_data = osd_req_op_extent_osd_data(req, 0);
if (osd_data->pages_from_pool)
mempool_free(osd_data->pages,
ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
......@@ -778,17 +778,15 @@ static int ceph_writepages_start(struct address_space *mapping,
while (!done && index <= end) {
unsigned i;
int first;
pgoff_t next;
int pvec_pages, locked_pages;
struct page **pages = NULL;
pgoff_t strip_unit_end = 0;
int num_ops = 0, op_idx;
int pvec_pages, locked_pages = 0;
struct page **pages = NULL, **data_pages;
mempool_t *pool = NULL; /* Becomes non-null if mempool used */
struct page *page;
int want;
u64 offset, len;
long writeback_stat;
u64 offset = 0, len = 0;
next = 0;
locked_pages = 0;
max_pages = max_pages_ever;
get_more_pages:
......@@ -824,8 +822,8 @@ static int ceph_writepages_start(struct address_space *mapping,
unlock_page(page);
break;
}
if (next && (page->index != next)) {
dout("not consecutive %p\n", page);
if (strip_unit_end && (page->index > strip_unit_end)) {
dout("end of strip unit %p\n", page);
unlock_page(page);
break;
}
......@@ -867,36 +865,31 @@ static int ceph_writepages_start(struct address_space *mapping,
/*
* We have something to write. If this is
* the first locked page this time through,
* allocate an osd request and a page array
* that it will use.
* calculate max possinle write size and
* allocate a page array
*/
if (locked_pages == 0) {
BUG_ON(pages);
u64 objnum;
u64 objoff;
/* prepare async write request */
offset = (u64)page_offset(page);
len = wsize;
req = ceph_osdc_new_request(&fsc->client->osdc,
&ci->i_layout, vino,
offset, &len, 0,
do_sync ? 2 : 1,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_WRITE |
CEPH_OSD_FLAG_ONDISK,
snapc, truncate_seq,
truncate_size, true);
if (IS_ERR(req)) {
rc = PTR_ERR(req);
rc = ceph_calc_file_object_mapping(&ci->i_layout,
offset, len,
&objnum, &objoff,
&len);
if (rc < 0) {
unlock_page(page);
break;
}
if (do_sync)
osd_req_op_init(req, 1,
CEPH_OSD_OP_STARTSYNC, 0);
req->r_callback = writepages_finish;
req->r_inode = inode;
num_ops = 1 + do_sync;
strip_unit_end = page->index +
((len - 1) >> PAGE_CACHE_SHIFT);
BUG_ON(pages);
max_pages = calc_pages_for(0, (u64)len);
pages = kmalloc(max_pages * sizeof (*pages),
GFP_NOFS);
......@@ -905,6 +898,20 @@ static int ceph_writepages_start(struct address_space *mapping,
pages = mempool_alloc(pool, GFP_NOFS);
BUG_ON(!pages);
}
len = 0;
} else if (page->index !=
(offset + len) >> PAGE_CACHE_SHIFT) {
if (num_ops >= (pool ? CEPH_OSD_SLAB_OPS :
CEPH_OSD_MAX_OPS)) {
redirty_page_for_writepage(wbc, page);
unlock_page(page);
break;
}
num_ops++;
offset = (u64)page_offset(page);
len = 0;
}
/* note position of first page in pvec */
......@@ -913,18 +920,16 @@ static int ceph_writepages_start(struct address_space *mapping,
dout("%p will write page %p idx %lu\n",
inode, page, page->index);
writeback_stat =
atomic_long_inc_return(&fsc->writeback_count);
if (writeback_stat > CONGESTION_ON_THRESH(
if (atomic_long_inc_return(&fsc->writeback_count) >
CONGESTION_ON_THRESH(
fsc->mount_options->congestion_kb)) {
set_bdi_congested(&fsc->backing_dev_info,
BLK_RW_ASYNC);
}
set_page_writeback(page);
pages[locked_pages] = page;
locked_pages++;
next = page->index + 1;
len += PAGE_CACHE_SIZE;
}
/* did we get anything? */
......@@ -944,38 +949,119 @@ static int ceph_writepages_start(struct address_space *mapping,
/* shift unused pages over in the pvec... we
* will need to release them below. */
for (j = i; j < pvec_pages; j++) {
dout(" pvec leftover page %p\n",
pvec.pages[j]);
dout(" pvec leftover page %p\n", pvec.pages[j]);
pvec.pages[j-i+first] = pvec.pages[j];
}
pvec.nr -= i-first;
}
/* Format the osd request message and submit the write */
new_request:
offset = page_offset(pages[0]);
len = (u64)locked_pages << PAGE_CACHE_SHIFT;
if (snap_size == -1) {
len = min(len, (u64)i_size_read(inode) - offset);
/* writepages_finish() clears writeback pages
* according to the data length, so make sure
* data length covers all locked pages */
len = max(len, 1 +
((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT));
} else {
len = min(len, snap_size - offset);
len = wsize;
req = ceph_osdc_new_request(&fsc->client->osdc,
&ci->i_layout, vino,
offset, &len, 0, num_ops,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_WRITE |
CEPH_OSD_FLAG_ONDISK,
snapc, truncate_seq,
truncate_size, false);
if (IS_ERR(req)) {
req = ceph_osdc_new_request(&fsc->client->osdc,
&ci->i_layout, vino,
offset, &len, 0,
min(num_ops,
CEPH_OSD_SLAB_OPS),
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_WRITE |
CEPH_OSD_FLAG_ONDISK,
snapc, truncate_seq,
truncate_size, true);
BUG_ON(IS_ERR(req));
}
dout("writepages got %d pages at %llu~%llu\n",
locked_pages, offset, len);
BUG_ON(len < page_offset(pages[locked_pages - 1]) +
PAGE_CACHE_SIZE - offset);
req->r_callback = writepages_finish;
req->r_inode = inode;
osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
/* Format the osd request message and submit the write */
len = 0;
data_pages = pages;
op_idx = 0;
for (i = 0; i < locked_pages; i++) {
u64 cur_offset = page_offset(pages[i]);
if (offset + len != cur_offset) {
if (op_idx + do_sync + 1 == req->r_num_ops)
break;
osd_req_op_extent_dup_last(req, op_idx,
cur_offset - offset);
dout("writepages got pages at %llu~%llu\n",
offset, len);
osd_req_op_extent_osd_data_pages(req, op_idx,
data_pages, len, 0,
!!pool, false);
osd_req_op_extent_update(req, op_idx, len);
pages = NULL; /* request message now owns the pages array */
pool = NULL;
len = 0;
offset = cur_offset;
data_pages = pages + i;
op_idx++;
}
/* Update the write op length in case we changed it */
set_page_writeback(pages[i]);
len += PAGE_CACHE_SIZE;
}
if (snap_size != -1) {
len = min(len, snap_size - offset);
} else if (i == locked_pages) {
/* writepages_finish() clears writeback pages
* according to the data length, so make sure
* data length covers all locked pages */
u64 min_len = len + 1 - PAGE_CACHE_SIZE;
len = min(len, (u64)i_size_read(inode) - offset);
len = max(len, min_len);
}
dout("writepages got pages at %llu~%llu\n", offset, len);
osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
0, !!pool, false);
osd_req_op_extent_update(req, op_idx, len);
osd_req_op_extent_update(req, 0, len);
if (do_sync) {
op_idx++;
osd_req_op_init(req, op_idx, CEPH_OSD_OP_STARTSYNC, 0);
}
BUG_ON(op_idx + 1 != req->r_num_ops);
pool = NULL;
if (i < locked_pages) {
BUG_ON(num_ops <= req->r_num_ops);
num_ops -= req->r_num_ops;
num_ops += do_sync;
locked_pages -= i;
/* allocate new pages array for next request */
data_pages = pages;
pages = kmalloc(locked_pages * sizeof (*pages),
GFP_NOFS);
if (!pages) {
pool = fsc->wb_pagevec_pool;
pages = mempool_alloc(pool, GFP_NOFS);
BUG_ON(!pages);
}
memcpy(pages, data_pages + i,
locked_pages * sizeof(*pages));
memset(data_pages + i, 0,
locked_pages * sizeof(*pages));
} else {
BUG_ON(num_ops != req->r_num_ops);
index = pages[i - 1]->index + 1;
/* request message now owns the pages array */
pages = NULL;
}
vino = ceph_vino(inode);
ceph_osdc_build_request(req, offset, snapc, vino.snap,
......@@ -985,9 +1071,10 @@ static int ceph_writepages_start(struct address_space *mapping,
BUG_ON(rc);
req = NULL;
/* continue? */
index = next;
wbc->nr_to_write -= locked_pages;
wbc->nr_to_write -= i;
if (pages)
goto new_request;
if (wbc->nr_to_write <= 0)
done = 1;
......@@ -1522,7 +1609,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
ceph_vino(inode), 0, &len, 0, 1,
CEPH_OSD_OP_CREATE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
ceph_empty_snapc, 0, 0, false);
NULL, 0, 0, false);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
......@@ -1540,9 +1627,8 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
ceph_vino(inode), 0, &len, 1, 3,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
ceph_empty_snapc,
ci->i_truncate_seq, ci->i_truncate_size,
false);
NULL, ci->i_truncate_seq,
ci->i_truncate_size, false);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
......@@ -1663,8 +1749,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
goto out;
}
rd_req = ceph_osdc_alloc_request(&fsc->client->osdc,
ceph_empty_snapc,
rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
1, false, GFP_NOFS);
if (!rd_req) {
err = -ENOMEM;
......@@ -1678,8 +1763,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
"%llx.00000000", ci->i_vino.ino);
rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);
wr_req = ceph_osdc_alloc_request(&fsc->client->osdc,
ceph_empty_snapc,
wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
1, false, GFP_NOFS);
if (!wr_req) {
err = -ENOMEM;
......
......@@ -991,7 +991,7 @@ static int send_cap_msg(struct ceph_mds_session *session,
u32 seq, u64 flush_tid, u64 oldest_flush_tid,
u32 issue_seq, u32 mseq, u64 size, u64 max_size,
struct timespec *mtime, struct timespec *atime,
u64 time_warp_seq,
struct timespec *ctime, u64 time_warp_seq,
kuid_t uid, kgid_t gid, umode_t mode,
u64 xattr_version,
struct ceph_buffer *xattrs_buf,
......@@ -1042,6 +1042,8 @@ static int send_cap_msg(struct ceph_mds_session *session,
ceph_encode_timespec(&fc->mtime, mtime);
if (atime)
ceph_encode_timespec(&fc->atime, atime);
if (ctime)
ceph_encode_timespec(&fc->ctime, ctime);
fc->time_warp_seq = cpu_to_le32(time_warp_seq);
fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid));
......@@ -1116,7 +1118,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
int held, revoking, dropping, keep;
u64 seq, issue_seq, mseq, time_warp_seq, follows;
u64 size, max_size;
struct timespec mtime, atime;
struct timespec mtime, atime, ctime;
int wake = 0;
umode_t mode;
kuid_t uid;
......@@ -1180,6 +1182,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
ci->i_requested_max_size = max_size;
mtime = inode->i_mtime;
atime = inode->i_atime;
ctime = inode->i_ctime;
time_warp_seq = ci->i_time_warp_seq;
uid = inode->i_uid;
gid = inode->i_gid;
......@@ -1198,7 +1201,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
op, keep, want, flushing, seq,
flush_tid, oldest_flush_tid, issue_seq, mseq,
size, max_size, &mtime, &atime, time_warp_seq,
size, max_size, &mtime, &atime, &ctime, time_warp_seq,
uid, gid, mode, xattr_version, xattr_blob,
follows, inline_data);
if (ret < 0) {
......@@ -1320,7 +1323,7 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci,
capsnap->dirty, 0, capsnap->flush_tid, 0,
0, mseq, capsnap->size, 0,
&capsnap->mtime, &capsnap->atime,
capsnap->time_warp_seq,
&capsnap->ctime, capsnap->time_warp_seq,
capsnap->uid, capsnap->gid, capsnap->mode,
capsnap->xattr_version, capsnap->xattr_blob,
capsnap->follows, capsnap->inline_data);
......
......@@ -38,7 +38,7 @@ int ceph_init_dentry(struct dentry *dentry)
if (dentry->d_fsdata)
return 0;
di = kmem_cache_alloc(ceph_dentry_cachep, GFP_KERNEL | __GFP_ZERO);
di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
if (!di)
return -ENOMEM; /* oh well */
......@@ -68,23 +68,6 @@ int ceph_init_dentry(struct dentry *dentry)
return 0;
}
struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry)
{
struct inode *inode = NULL;
if (!dentry)
return NULL;
spin_lock(&dentry->d_lock);
if (!IS_ROOT(dentry)) {
inode = d_inode(dentry->d_parent);
ihold(inode);
}
spin_unlock(&dentry->d_lock);
return inode;
}
/*
* for readdir, we encode the directory frag and offset within that
* frag into f_pos.
......@@ -624,6 +607,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_mds_request *req;
int op;
int mask;
int err;
dout("lookup %p dentry %p '%pd'\n",
......@@ -666,8 +650,12 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
return ERR_CAST(req);
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
/* we only need inode linkage */
req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE);
mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
if (ceph_security_xattr_wanted(dir))
mask |= CEPH_CAP_XATTR_SHARED;
req->r_args.getattr.mask = cpu_to_le32(mask);
req->r_locked_dir = dir;
err = ceph_mdsc_do_request(mdsc, NULL, req);
err = ceph_handle_snapdir(req, dentry, err);
......@@ -1095,6 +1083,7 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
{
int valid = 0;
struct dentry *parent;
struct inode *dir;
if (flags & LOOKUP_RCU)
......@@ -1103,7 +1092,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
dentry, d_inode(dentry), ceph_dentry(dentry)->offset);