Commit 39936837 authored by Jaegeuk Kim's avatar Jaegeuk Kim
Browse files

f2fs: introduce a new global lock scheme



In the previous version, f2fs uses global locks according to the usage types,
such as directory operations, block allocation, block write, and so on.

Reference the following lock types in f2fs.h.
enum lock_type {
	RENAME,		/* for renaming operations */
	DENTRY_OPS,	/* for directory operations */
	DATA_WRITE,	/* for data write */
	DATA_NEW,	/* for data allocation */
	DATA_TRUNC,	/* for data truncate */
	NODE_NEW,	/* for node allocation */
	NODE_TRUNC,	/* for node truncate */
	NODE_WRITE,	/* for node write */
	NR_LOCK_TYPE,
};

In that case, we lose the performance under the multi-threading environment,
since every types of operations must be conducted one at a time.

In order to address the problem, let's share the locks globally with a mutex
array regardless of any types.
So, let users grab a mutex and perform their jobs in parallel as much as
possbile.

For this, I propose a new global lock scheme as follows.

0. Data structure
 - f2fs_sb_info -> mutex_lock[NR_GLOBAL_LOCKS]
 - f2fs_sb_info -> node_write

1. mutex_lock_op(sbi)
 - try to get an avaiable lock from the array.
 - returns the index of the gottern lock variable.

2. mutex_unlock_op(sbi, index of the lock)
 - unlock the given index of the lock.

3. mutex_lock_all(sbi)
 - grab all the locks in the array before the checkpoint.

4. mutex_unlock_all(sbi)
 - release all the locks in the array after checkpoint.

5. block_operations()
 - call mutex_lock_all()
 - sync_dirty_dir_inodes()
 - grab node_write
 - sync_node_pages()

Note that,
 the pairs of mutex_lock_op()/mutex_unlock_op() and
 mutex_lock_all()/mutex_unlock_all() should be used together.
Signed-off-by: default avatarJaegeuk Kim <jaegeuk.kim@samsung.com>
parent 1127a3d4
......@@ -543,54 +543,39 @@ void sync_dirty_dir_inodes(struct f2fs_sb_info *sbi)
*/
static void block_operations(struct f2fs_sb_info *sbi)
{
int t;
struct writeback_control wbc = {
.sync_mode = WB_SYNC_ALL,
.nr_to_write = LONG_MAX,
.for_reclaim = 0,
};
retry_flush_dents:
mutex_lock_all(sbi);
/* Stop renaming operation */
mutex_lock_op(sbi, RENAME);
mutex_lock_op(sbi, DENTRY_OPS);
retry_dents:
/* write all the dirty dentry pages */
sync_dirty_dir_inodes(sbi);
mutex_lock_op(sbi, DATA_WRITE);
if (get_pages(sbi, F2FS_DIRTY_DENTS)) {
mutex_unlock_op(sbi, DATA_WRITE);
goto retry_dents;
mutex_unlock_all(sbi);
sync_dirty_dir_inodes(sbi);
goto retry_flush_dents;
}
/* block all the operations */
for (t = DATA_NEW; t <= NODE_TRUNC; t++)
mutex_lock_op(sbi, t);
mutex_lock(&sbi->write_inode);
/*
* POR: we should ensure that there is no dirty node pages
* until finishing nat/sit flush.
*/
retry:
sync_node_pages(sbi, 0, &wbc);
mutex_lock_op(sbi, NODE_WRITE);
retry_flush_nodes:
mutex_lock(&sbi->node_write);
if (get_pages(sbi, F2FS_DIRTY_NODES)) {
mutex_unlock_op(sbi, NODE_WRITE);
goto retry;
mutex_unlock(&sbi->node_write);
sync_node_pages(sbi, 0, &wbc);
goto retry_flush_nodes;
}
mutex_unlock(&sbi->write_inode);
}
static void unblock_operations(struct f2fs_sb_info *sbi)
{
int t;
for (t = NODE_WRITE; t >= RENAME; t--)
mutex_unlock_op(sbi, t);
mutex_unlock(&sbi->node_write);
mutex_unlock_all(sbi);
}
static void do_checkpoint(struct f2fs_sb_info *sbi, bool is_umount)
......
......@@ -260,6 +260,9 @@ struct page *get_lock_data_page(struct inode *inode, pgoff_t index)
/*
* Caller ensures that this data page is never allocated.
* A new zero-filled data page is allocated in the page cache.
*
* Also, caller should grab and release a mutex by calling mutex_lock_op() and
* mutex_unlock_op().
*/
struct page *get_new_data_page(struct inode *inode, pgoff_t index,
bool new_i_size)
......@@ -479,10 +482,11 @@ static int f2fs_write_data_page(struct page *page,
const pgoff_t end_index = ((unsigned long long) i_size)
>> PAGE_CACHE_SHIFT;
unsigned offset;
bool need_balance_fs = false;
int err = 0;
if (page->index < end_index)
goto out;
goto write;
/*
* If the offset is out-of-range of file size,
......@@ -494,50 +498,46 @@ static int f2fs_write_data_page(struct page *page,
dec_page_count(sbi, F2FS_DIRTY_DENTS);
inode_dec_dirty_dents(inode);
}
goto unlock_out;
goto out;
}
zero_user_segment(page, offset, PAGE_CACHE_SIZE);
out:
if (sbi->por_doing)
goto redirty_out;
if (wbc->for_reclaim && !S_ISDIR(inode->i_mode) && !is_cold_data(page))
write:
if (sbi->por_doing) {
err = AOP_WRITEPAGE_ACTIVATE;
goto redirty_out;
}
mutex_lock_op(sbi, DATA_WRITE);
/* Dentry blocks are controlled by checkpoint */
if (S_ISDIR(inode->i_mode)) {
dec_page_count(sbi, F2FS_DIRTY_DENTS);
inode_dec_dirty_dents(inode);
err = do_write_data_page(page);
} else {
int ilock = mutex_lock_op(sbi);
err = do_write_data_page(page);
mutex_unlock_op(sbi, ilock);
need_balance_fs = true;
}
err = do_write_data_page(page);
if (err && err != -ENOENT) {
wbc->pages_skipped++;
set_page_dirty(page);
}
mutex_unlock_op(sbi, DATA_WRITE);
if (err == -ENOENT)
goto out;
else if (err)
goto redirty_out;
if (wbc->for_reclaim)
f2fs_submit_bio(sbi, DATA, true);
if (err == -ENOENT)
goto unlock_out;
clear_cold_data(page);
out:
unlock_page(page);
if (!wbc->for_reclaim && !S_ISDIR(inode->i_mode))
if (need_balance_fs)
f2fs_balance_fs(sbi);
return 0;
unlock_out:
unlock_page(page);
return (err == -ENOENT) ? 0 : err;
redirty_out:
wbc->pages_skipped++;
set_page_dirty(page);
return AOP_WRITEPAGE_ACTIVATE;
return err;
}
#define MAX_DESIRED_PAGES_WP 4096
......@@ -592,6 +592,7 @@ static int f2fs_write_begin(struct file *file, struct address_space *mapping,
pgoff_t index = ((unsigned long long) pos) >> PAGE_CACHE_SHIFT;
struct dnode_of_data dn;
int err = 0;
int ilock;
/* for nobh_write_end */
*fsdata = NULL;
......@@ -603,28 +604,21 @@ static int f2fs_write_begin(struct file *file, struct address_space *mapping,
return -ENOMEM;
*pagep = page;
mutex_lock_op(sbi, DATA_NEW);
ilock = mutex_lock_op(sbi);
set_new_dnode(&dn, inode, NULL, NULL, 0);
err = get_dnode_of_data(&dn, index, ALLOC_NODE);
if (err) {
mutex_unlock_op(sbi, DATA_NEW);
f2fs_put_page(page, 1);
return err;
}
if (err)
goto err;
if (dn.data_blkaddr == NULL_ADDR) {
if (dn.data_blkaddr == NULL_ADDR)
err = reserve_new_block(&dn);
if (err) {
f2fs_put_dnode(&dn);
mutex_unlock_op(sbi, DATA_NEW);
f2fs_put_page(page, 1);
return err;
}
}
f2fs_put_dnode(&dn);
if (err)
goto err;
mutex_unlock_op(sbi, DATA_NEW);
mutex_unlock_op(sbi, ilock);
if ((len == PAGE_CACHE_SIZE) || PageUptodate(page))
return 0;
......@@ -654,6 +648,11 @@ static int f2fs_write_begin(struct file *file, struct address_space *mapping,
SetPageUptodate(page);
clear_cold_data(page);
return 0;
err:
mutex_unlock_op(sbi, ilock);
f2fs_put_page(page, 1);
return err;
}
static ssize_t f2fs_direct_IO(int rw, struct kiocb *iocb,
......
......@@ -249,9 +249,6 @@ ino_t f2fs_inode_by_name(struct inode *dir, struct qstr *qstr)
void f2fs_set_link(struct inode *dir, struct f2fs_dir_entry *de,
struct page *page, struct inode *inode)
{
struct f2fs_sb_info *sbi = F2FS_SB(dir->i_sb);
mutex_lock_op(sbi, DENTRY_OPS);
lock_page(page);
wait_on_page_writeback(page);
de->ino = cpu_to_le32(inode->i_ino);
......@@ -265,7 +262,6 @@ void f2fs_set_link(struct inode *dir, struct f2fs_dir_entry *de,
F2FS_I(inode)->i_pino = dir->i_ino;
f2fs_put_page(page, 1);
mutex_unlock_op(sbi, DENTRY_OPS);
}
void init_dent_inode(const struct qstr *name, struct page *ipage)
......@@ -284,6 +280,43 @@ void init_dent_inode(const struct qstr *name, struct page *ipage)
set_page_dirty(ipage);
}
static int make_empty_dir(struct inode *inode, struct inode *parent)
{
struct page *dentry_page;
struct f2fs_dentry_block *dentry_blk;
struct f2fs_dir_entry *de;
void *kaddr;
dentry_page = get_new_data_page(inode, 0, true);
if (IS_ERR(dentry_page))
return PTR_ERR(dentry_page);
kaddr = kmap_atomic(dentry_page);
dentry_blk = (struct f2fs_dentry_block *)kaddr;
de = &dentry_blk->dentry[0];
de->name_len = cpu_to_le16(1);
de->hash_code = 0;
de->ino = cpu_to_le32(inode->i_ino);
memcpy(dentry_blk->filename[0], ".", 1);
set_de_type(de, inode);
de = &dentry_blk->dentry[1];
de->hash_code = 0;
de->name_len = cpu_to_le16(2);
de->ino = cpu_to_le32(parent->i_ino);
memcpy(dentry_blk->filename[1], "..", 2);
set_de_type(de, inode);
test_and_set_bit_le(0, &dentry_blk->dentry_bitmap);
test_and_set_bit_le(1, &dentry_blk->dentry_bitmap);
kunmap_atomic(kaddr);
set_page_dirty(dentry_page);
f2fs_put_page(dentry_page, 1);
return 0;
}
static int init_inode_metadata(struct inode *inode,
struct inode *dir, const struct qstr *name)
{
......@@ -294,7 +327,7 @@ static int init_inode_metadata(struct inode *inode,
return err;
if (S_ISDIR(inode->i_mode)) {
err = f2fs_make_empty(inode, dir);
err = make_empty_dir(inode, dir);
if (err) {
remove_inode_page(inode);
return err;
......@@ -317,7 +350,7 @@ static int init_inode_metadata(struct inode *inode,
}
if (is_inode_flag_set(F2FS_I(inode), FI_INC_LINK)) {
inc_nlink(inode);
f2fs_write_inode(inode, NULL);
update_inode_page(inode);
}
return 0;
}
......@@ -341,7 +374,7 @@ static void update_parent_metadata(struct inode *dir, struct inode *inode,
}
if (need_dir_update)
f2fs_write_inode(dir, NULL);
update_inode_page(dir);
else
mark_inode_dirty(dir);
......@@ -373,6 +406,10 @@ static int room_for_filename(struct f2fs_dentry_block *dentry_blk, int slots)
goto next;
}
/*
* Caller should grab and release a mutex by calling mutex_lock_op() and
* mutex_unlock_op().
*/
int __f2fs_add_link(struct inode *dir, const struct qstr *name, struct inode *inode)
{
unsigned int bit_pos;
......@@ -382,7 +419,6 @@ int __f2fs_add_link(struct inode *dir, const struct qstr *name, struct inode *in
f2fs_hash_t dentry_hash;
struct f2fs_dir_entry *de;
unsigned int nbucket, nblock;
struct f2fs_sb_info *sbi = F2FS_SB(dir->i_sb);
size_t namelen = name->len;
struct page *dentry_page = NULL;
struct f2fs_dentry_block *dentry_blk = NULL;
......@@ -412,12 +448,9 @@ int __f2fs_add_link(struct inode *dir, const struct qstr *name, struct inode *in
bidx = dir_block_index(level, (le32_to_cpu(dentry_hash) % nbucket));
for (block = bidx; block <= (bidx + nblock - 1); block++) {
mutex_lock_op(sbi, DENTRY_OPS);
dentry_page = get_new_data_page(dir, block, true);
if (IS_ERR(dentry_page)) {
mutex_unlock_op(sbi, DENTRY_OPS);
if (IS_ERR(dentry_page))
return PTR_ERR(dentry_page);
}
dentry_blk = kmap(dentry_page);
bit_pos = room_for_filename(dentry_blk, slots);
......@@ -426,7 +459,6 @@ int __f2fs_add_link(struct inode *dir, const struct qstr *name, struct inode *in
kunmap(dentry_page);
f2fs_put_page(dentry_page, 1);
mutex_unlock_op(sbi, DENTRY_OPS);
}
/* Move to next level to find the empty slot for new dentry */
......@@ -456,7 +488,6 @@ int __f2fs_add_link(struct inode *dir, const struct qstr *name, struct inode *in
fail:
kunmap(dentry_page);
f2fs_put_page(dentry_page, 1);
mutex_unlock_op(sbi, DENTRY_OPS);
return err;
}
......@@ -476,8 +507,6 @@ void f2fs_delete_entry(struct f2fs_dir_entry *dentry, struct page *page,
void *kaddr = page_address(page);
int i;
mutex_lock_op(sbi, DENTRY_OPS);
lock_page(page);
wait_on_page_writeback(page);
......@@ -497,7 +526,7 @@ void f2fs_delete_entry(struct f2fs_dir_entry *dentry, struct page *page,
if (inode && S_ISDIR(inode->i_mode)) {
drop_nlink(dir);
f2fs_write_inode(dir, NULL);
update_inode_page(dir);
} else {
mark_inode_dirty(dir);
}
......@@ -509,7 +538,8 @@ void f2fs_delete_entry(struct f2fs_dir_entry *dentry, struct page *page,
drop_nlink(inode);
i_size_write(inode, 0);
}
f2fs_write_inode(inode, NULL);
update_inode_page(inode);
if (inode->i_nlink == 0)
add_orphan_inode(sbi, inode->i_ino);
}
......@@ -522,45 +552,6 @@ void f2fs_delete_entry(struct f2fs_dir_entry *dentry, struct page *page,
inode_dec_dirty_dents(dir);
}
f2fs_put_page(page, 1);
mutex_unlock_op(sbi, DENTRY_OPS);
}
int f2fs_make_empty(struct inode *inode, struct inode *parent)
{
struct page *dentry_page;
struct f2fs_dentry_block *dentry_blk;
struct f2fs_dir_entry *de;
void *kaddr;
dentry_page = get_new_data_page(inode, 0, true);
if (IS_ERR(dentry_page))
return PTR_ERR(dentry_page);
kaddr = kmap_atomic(dentry_page);
dentry_blk = (struct f2fs_dentry_block *)kaddr;
de = &dentry_blk->dentry[0];
de->name_len = cpu_to_le16(1);
de->hash_code = f2fs_dentry_hash(".", 1);
de->ino = cpu_to_le32(inode->i_ino);
memcpy(dentry_blk->filename[0], ".", 1);
set_de_type(de, inode);
de = &dentry_blk->dentry[1];
de->hash_code = f2fs_dentry_hash("..", 2);
de->name_len = cpu_to_le16(2);
de->ino = cpu_to_le32(parent->i_ino);
memcpy(dentry_blk->filename[1], "..", 2);
set_de_type(de, inode);
test_and_set_bit_le(0, &dentry_blk->dentry_bitmap);
test_and_set_bit_le(1, &dentry_blk->dentry_bitmap);
kunmap_atomic(kaddr);
set_page_dirty(dentry_page);
f2fs_put_page(dentry_page, 1);
return 0;
}
bool f2fs_empty_dir(struct inode *dir)
......
......@@ -309,23 +309,12 @@ enum count_type {
};
/*
* FS_LOCK nesting subclasses for the lock validator:
*
* The locking order between these classes is
* RENAME -> DENTRY_OPS -> DATA_WRITE -> DATA_NEW
* -> DATA_TRUNC -> NODE_WRITE -> NODE_NEW -> NODE_TRUNC
* Uses as sbi->fs_lock[NR_GLOBAL_LOCKS].
* The checkpoint procedure blocks all the locks in this fs_lock array.
* Some FS operations grab free locks, and if there is no free lock,
* then wait to grab a lock in a round-robin manner.
*/
enum lock_type {
RENAME, /* for renaming operations */
DENTRY_OPS, /* for directory operations */
DATA_WRITE, /* for data write */
DATA_NEW, /* for data allocation */
DATA_TRUNC, /* for data truncate */
NODE_NEW, /* for node allocation */
NODE_TRUNC, /* for node truncate */
NODE_WRITE, /* for node write */
NR_LOCK_TYPE,
};
#define NR_GLOBAL_LOCKS 8
/*
* The below are the page types of bios used in submti_bio().
......@@ -365,10 +354,11 @@ struct f2fs_sb_info {
/* for checkpoint */
struct f2fs_checkpoint *ckpt; /* raw checkpoint pointer */
struct inode *meta_inode; /* cache meta blocks */
struct mutex cp_mutex; /* for checkpoint procedure */
struct mutex fs_lock[NR_LOCK_TYPE]; /* for blocking FS operations */
struct mutex write_inode; /* mutex for write inode */
struct mutex cp_mutex; /* checkpoint procedure lock */
struct mutex fs_lock[NR_GLOBAL_LOCKS]; /* blocking FS operations */
struct mutex node_write; /* locking node writes */
struct mutex writepages; /* mutex for writepages() */
unsigned char next_lock_num; /* round-robin global locks */
int por_doing; /* recovery is doing or not */
/* for orphan inode management */
......@@ -503,14 +493,40 @@ static inline void clear_ckpt_flags(struct f2fs_checkpoint *cp, unsigned int f)
cp->ckpt_flags = cpu_to_le32(ckpt_flags);
}
static inline void mutex_lock_op(struct f2fs_sb_info *sbi, enum lock_type t)
static inline void mutex_lock_all(struct f2fs_sb_info *sbi)
{
int i = 0;
for (; i < NR_GLOBAL_LOCKS; i++)
mutex_lock(&sbi->fs_lock[i]);
}
static inline void mutex_unlock_all(struct f2fs_sb_info *sbi)
{
mutex_lock_nested(&sbi->fs_lock[t], t);
int i = 0;
for (; i < NR_GLOBAL_LOCKS; i++)
mutex_unlock(&sbi->fs_lock[i]);
}
static inline void mutex_unlock_op(struct f2fs_sb_info *sbi, enum lock_type t)
static inline int mutex_lock_op(struct f2fs_sb_info *sbi)
{
mutex_unlock(&sbi->fs_lock[t]);
unsigned char next_lock = sbi->next_lock_num % NR_GLOBAL_LOCKS;
int i = 0;
for (; i < NR_GLOBAL_LOCKS; i++)
if (mutex_trylock(&sbi->fs_lock[i]))
return i;
mutex_lock(&sbi->fs_lock[next_lock]);
sbi->next_lock_num++;
return next_lock;
}
static inline void mutex_unlock_op(struct f2fs_sb_info *sbi, int ilock)
{
if (ilock < 0)
return;
BUG_ON(ilock >= NR_GLOBAL_LOCKS);
mutex_unlock(&sbi->fs_lock[ilock]);
}
/*
......@@ -879,6 +895,7 @@ long f2fs_compat_ioctl(struct file *, unsigned int, unsigned long);
void f2fs_set_inode_flags(struct inode *);
struct inode *f2fs_iget(struct super_block *, unsigned long);
void update_inode(struct inode *, struct page *);
int update_inode_page(struct inode *);
int f2fs_write_inode(struct inode *, struct writeback_control *);
void f2fs_evict_inode(struct inode *);
......
......@@ -34,19 +34,18 @@ static int f2fs_vm_page_mkwrite(struct vm_area_struct *vma,
struct f2fs_sb_info *sbi = F2FS_SB(inode->i_sb);
block_t old_blk_addr;
struct dnode_of_data dn;
int err;
int err, ilock;
f2fs_balance_fs(sbi);
sb_start_pagefault(inode->i_sb);
mutex_lock_op(sbi, DATA_NEW);
/* block allocation */
ilock = mutex_lock_op(sbi);
set_new_dnode(&dn, inode, NULL, NULL, 0);
err = get_dnode_of_data(&dn, page->index, ALLOC_NODE);
if (err) {
mutex_unlock_op(sbi, DATA_NEW);
mutex_unlock_op(sbi, ilock);
goto out;
}
......@@ -56,13 +55,12 @@ static int f2fs_vm_page_mkwrite(struct vm_area_struct *vma,
err = reserve_new_block(&dn);
if (err) {
f2fs_put_dnode(&dn);
mutex_unlock_op(sbi, DATA_NEW);
mutex_unlock_op(sbi, ilock);
goto out;
}
}
f2fs_put_dnode(&dn);
mutex_unlock_op(sbi, DATA_NEW);
mutex_unlock_op(sbi, ilock);
lock_page(page);
if (page->mapping != inode->i_mapping ||
......@@ -223,20 +221,19 @@ static int truncate_blocks(struct inode *inode, u64 from)
unsigned int blocksize = inode->i_sb->s_blocksize;
struct dnode_of_data dn;
pgoff_t free_from;
int count = 0;
int count = 0, ilock = -1;
int err;
free_from = (pgoff_t)
((from + blocksize - 1) >> (sbi->log_blocksize));
mutex_lock_op(sbi, DATA_TRUNC);
ilock = mutex_lock_op(sbi);
set_new_dnode(&dn, inode, NULL, NULL, 0);
err = get_dnode_of_data(&dn, free_from, LOOKUP_NODE);
if (err) {
if (err == -ENOENT)
goto free_next;
mutex_unlock_op(sbi, DATA_TRUNC);
mutex_unlock_op(sbi, ilock);
return err;
}
......@@ -247,6 +244,7 @@ static int truncate_blocks(struct inode *inode, u64 from)
count -= dn.ofs_in_node;
BUG_ON(count < 0);
if (dn.ofs_in_node || IS_INODE(dn.node_page)) {
truncate_data_blocks_range(&dn, count);
free_from += count;
......@@ -255,7 +253,7 @@ static int truncate_blocks(struct inode *inode, u64 from)
f2fs_put_dnode(&dn);
free_next:
err = truncate_inode_blocks(inode, free_from);
mutex_unlock_op(sbi, DATA_TRUNC);
mutex_unlock_op(sbi, ilock);
/* lastly zero out the first data page */
truncate_partial_data_page(inode, from);
......@@ -363,15 +361,16 @@ static void fill_zero(struct inode *inode, pgoff_t index,
{
struct f2fs_sb_info *sbi = F2FS_SB(inode->i_sb);
struct page *page;
int ilock;
if (!len)
return;
f2fs_balance_fs(sbi);
mutex_lock_op(sbi, DATA_NEW);
ilock = mutex_lock_op(sbi);
page = get_new_data_page(inode, index, false);
mutex_unlock_op(sbi, DATA_NEW);
mutex_unlock_op(sbi, ilock);
if (!IS_ERR(page)) {
wait_on_page_writeback(page);
......@@ -388,13 +387,10 @@ int truncate_hole(struct inode *inode, pgoff_t pg_start, pgoff_t pg_end)
for (index = pg_start; index < pg_end; index++) {
struct dnode_of_data dn;
struct f2fs_sb_info *sbi = F2FS_SB(inode->i_sb);
mutex_lock_op(sbi, DATA_TRUNC);
set_new_dnode(&dn, inode, NULL, NULL, 0);
err = get_dnode_of_data(&dn, index, LOOKUP_NODE);
if (err) {
mutex_unlock_op(sbi, DATA_TRUNC);
if (err == -ENOENT)
continue;
return err;
......@@ -403,7 +399,6 @@ int truncate_hole(struct inode *inode, pgoff_t pg_start, pgoff_t pg_end)
if (dn.data_blkaddr != NULL_ADDR)
truncate_data_blocks_range(&dn, 1);
f2fs_put_dnode(&dn);
mutex_unlock_op(sbi, DATA_TRUNC);