block copy: use asynchronous notification In the dm-snapshot target there may be a large number of copy requests in progress. If every pending copy request consumed a process context, it would put too much load on the system. To avoid this load, we need asynchronous notification when a copy finishes: we can pass a callback to the function blkdev_issue_copy. If the callback is non-NULL, blkdev_issue_copy returns once it has submitted all the copy bios, and the callback is called when the copy operation finishes. With the callback mechanism, there can be a large number of in-progress copy requests and we do not need a process context for each of them. Signed-off-by: Mikulas Patocka --- block/blk-lib.c | 155 +++++++++++++++++++++++++++++++--------------- block/ioctl.c | 2 include/linux/blk_types.h | 5 - include/linux/blkdev.h | 3 4 files changed, 113 insertions(+), 52 deletions(-) Index: linux-4.11-rc2/block/blk-lib.c =================================================================== --- linux-4.11-rc2.orig/block/blk-lib.c +++ linux-4.11-rc2/block/blk-lib.c @@ -9,12 +9,6 @@ #include "blk.h" -struct bio_batch { - atomic_t done; - int error; - struct completion *wait; -}; - static struct bio *next_bio(struct bio *bio, unsigned int nr_pages, gfp_t gfp) { @@ -387,6 +381,17 @@ int blkdev_issue_zeroout(struct block_de } EXPORT_SYMBOL(blkdev_issue_zeroout); +struct bio_copy_batch { + atomic_long_t done; + int async_error; + int sync_error; + sector_t sync_copied; + atomic64_t first_error; + void (*callback)(void *data, int error); + void *data; + sector_t *copied; +}; + #define BLK_COPY_TIMEOUT (10 * HZ) static void blk_copy_timeout(unsigned long bc_) @@ -415,6 +420,18 @@ static void blk_copy_timeout(unsigned lo } } +static void blk_copy_batch_finish(struct bio_copy_batch *batch) +{ + void (*fn)(void *, int) = batch->callback; + void *data = batch->data; + int error = unlikely(batch->sync_error) ?
batch->sync_error : batch->async_error; + if (batch->copied) + *batch->copied = min(batch->sync_copied, (sector_t)atomic64_read(&batch->first_error)); + kfree(batch); + if (fn) + fn(data, error); +} + static void bio_copy_end_io(struct bio *bio) { struct bio_copy *bc = bio->bi_copy; @@ -438,25 +455,37 @@ static void bio_copy_end_io(struct bio * } bio_put(bio); if (atomic_dec_and_test(&bc->in_flight)) { - struct bio_batch *bb = bc->private; + struct bio_copy_batch *batch = bc->batch; if (unlikely(bc->error < 0)) { u64 first_error; - if (!ACCESS_ONCE(bb->error)) - ACCESS_ONCE(bb->error) = bc->error; + if (!ACCESS_ONCE(batch->async_error)) + ACCESS_ONCE(batch->async_error) = bc->error; do { - first_error = atomic64_read(bc->first_error); + first_error = atomic64_read(&batch->first_error); if (bc->offset >= first_error) break; - } while (unlikely(atomic64_cmpxchg(bc->first_error, + } while (unlikely(atomic64_cmpxchg(&batch->first_error, first_error, bc->offset) != first_error)); } del_timer_sync(&bc->timer); kfree(bc); - if (atomic_dec_and_test(&bb->done)) - complete(bb->wait); + if (atomic_long_dec_and_test(&batch->done)) + blk_copy_batch_finish(batch); } } +struct bio_copy_completion { + struct completion wait; + int error; +}; + +static void bio_copy_sync_callback(void *ptr, int error) +{ + struct bio_copy_completion *comp = ptr; + comp->error = error; + complete(&comp->wait); +} + /** * blkdev_issue_copy - queue a copy same operation * @src_bdev: source blockdev @@ -471,57 +500,82 @@ static void bio_copy_end_io(struct bio * */ int blkdev_issue_copy(struct block_device *src_bdev, sector_t src_sector, struct block_device *dst_bdev, sector_t dst_sector, - sector_t nr_sects, sector_t *copied, gfp_t gfp_mask) + sector_t nr_sects, sector_t *copied, + void (*callback)(void *, int), void *data, gfp_t gfp_mask) { DECLARE_COMPLETION_ONSTACK(wait); struct request_queue *sq = bdev_get_queue(src_bdev); struct request_queue *dq = bdev_get_queue(dst_bdev); sector_t 
max_copy_sectors; - struct bio_batch bb; - int ret = 0; - atomic64_t first_error = ATOMIC64_INIT(nr_sects); - sector_t offset = 0; + int ret; + struct bio_copy_batch *batch; + struct bio_copy_completion comp; if (copied) *copied = 0; - if (!sq || !dq) - return -ENXIO; + if (!sq || !dq) { + ret = -ENXIO; + goto end_callback; + } max_copy_sectors = min(sq->limits.max_copy_sectors, dq->limits.max_copy_sectors); - if (unlikely(!max_copy_sectors)) - return -EOPNOTSUPP; + if (unlikely(!max_copy_sectors)) { + ret = -EOPNOTSUPP; + goto end_callback; + } if (unlikely(src_sector + nr_sects < src_sector) || - unlikely(dst_sector + nr_sects < dst_sector)) - return -EINVAL; + unlikely(dst_sector + nr_sects < dst_sector)) { + ret = -EINVAL; + goto end_callback; + } /* Do not support overlapping copies */ if (src_bdev == dst_bdev && - unlikely(abs((u64)dst_sector - (u64)src_sector) < nr_sects)) - return -EOPNOTSUPP; - - atomic_set(&bb.done, 1); - bb.error = 0; - bb.wait = &wait; + unlikely(abs((u64)dst_sector - (u64)src_sector) < nr_sects)) { + ret = -EOPNOTSUPP; + goto end_callback; + } + + batch = kmalloc(sizeof(struct bio_copy_batch), gfp_mask); + if (!batch) { + ret = -ENOMEM; + goto end_callback; + } + + batch->done = (atomic_long_t)ATOMIC_LONG_INIT(1); + batch->async_error = 0; + batch->sync_error = 0; + batch->sync_copied = 0; + batch->first_error = (atomic64_t)ATOMIC64_INIT(nr_sects); + batch->copied = copied; + if (callback) { + batch->callback = callback; + batch->data = data; + } else { + comp.wait = COMPLETION_INITIALIZER_ONSTACK(comp.wait); + batch->callback = bio_copy_sync_callback; + batch->data = &comp; + } - while (nr_sects && !ACCESS_ONCE(bb.error)) { + while (nr_sects && !ACCESS_ONCE(batch->async_error)) { struct bio *read_bio, *write_bio; struct bio_copy *bc; unsigned chunk = (unsigned)min(nr_sects, (sector_t)max_copy_sectors); bc = kmalloc(sizeof(struct bio_copy), gfp_mask); if (!bc) { - ret = -ENOMEM; + batch->sync_error = -ENOMEM; break; } read_bio =
bio_alloc(gfp_mask, 1); if (!read_bio) { kfree(bc); - ret = -ENOMEM; + batch->sync_error = -ENOMEM; break; } @@ -529,7 +583,7 @@ int blkdev_issue_copy(struct block_devic if (!write_bio) { bio_put(read_bio); kfree(bc); - ret = -ENOMEM; + batch->sync_error = -ENOMEM; break; } @@ -537,9 +591,8 @@ int blkdev_issue_copy(struct block_devic bc->error = 1; bc->pair[0] = NULL; bc->pair[1] = NULL; - bc->private = &bb; - bc->first_error = &first_error; - bc->offset = offset; + bc->batch = batch; + bc->offset = batch->sync_copied; spin_lock_init(&bc->spinlock); __setup_timer(&bc->timer, blk_copy_timeout, (unsigned long)bc, TIMER_IRQSAFE); mod_timer(&bc->timer, jiffies + BLK_COPY_TIMEOUT); @@ -558,27 +611,33 @@ int blkdev_issue_copy(struct block_devic write_bio->bi_bdev = dst_bdev; write_bio->bi_copy = bc; - atomic_inc(&bb.done); + atomic_long_inc(&batch->done); submit_bio(read_bio); submit_bio(write_bio); src_sector += chunk; dst_sector += chunk; nr_sects -= chunk; - offset += chunk; + batch->sync_copied += chunk; } - /* Wait for bios in-flight */ - if (!atomic_dec_and_test(&bb.done)) - wait_for_completion_io(&wait); - - if (copied) - *copied = min((sector_t)atomic64_read(&first_error), offset); + if (atomic_long_dec_and_test(&batch->done)) + blk_copy_batch_finish(batch); - if (likely(!ret)) - ret = bb.error; + if (callback) { + return 0; + } else { + wait_for_completion_io(&comp.wait); + return comp.error; + } - return ret; +end_callback: + if (callback) { + callback(data, ret); + return 0; + } else { + return ret; + } } EXPORT_SYMBOL(blkdev_issue_copy); Index: linux-4.11-rc2/include/linux/blk_types.h =================================================================== --- linux-4.11-rc2.orig/include/linux/blk_types.h +++ linux-4.11-rc2/include/linux/blk_types.h @@ -92,6 +92,8 @@ struct bio { #define BIO_RESET_BYTES offsetof(struct bio, bi_max_vecs) +struct bio_copy_batch; + struct bio_copy { /* * error == 1 - bios are waiting to be paired @@ -101,8 +103,7 @@ struct bio_copy { 
int error; atomic_t in_flight; struct bio *pair[2]; - void *private; - atomic64_t *first_error; + struct bio_copy_batch *batch; sector_t offset; spinlock_t spinlock; struct timer_list timer; Index: linux-4.11-rc2/include/linux/blkdev.h =================================================================== --- linux-4.11-rc2.orig/include/linux/blkdev.h +++ linux-4.11-rc2/include/linux/blkdev.h @@ -1352,7 +1352,8 @@ extern int blkdev_issue_zeroout(struct b sector_t nr_sects, gfp_t gfp_mask, bool discard); extern int blkdev_issue_copy(struct block_device *src_bdev, sector_t src_sector, struct block_device *dst_bdev, sector_t dst_sector, - sector_t nr_sects, sector_t *copied, gfp_t gfp_gfp_mask); + sector_t nr_sects, sector_t *copied, + void (*callback)(void *, int), void *data, gfp_t gfp_gfp_mask); static inline int sb_issue_discard(struct super_block *sb, sector_t block, sector_t nr_blocks, gfp_t gfp_mask, unsigned long flags) { Index: linux-4.11-rc2/block/ioctl.c =================================================================== --- linux-4.11-rc2.orig/block/ioctl.c +++ linux-4.11-rc2/block/ioctl.c @@ -306,7 +306,7 @@ static int blk_ioctl_copy(struct block_d return -EINVAL; ret = blkdev_issue_copy(bdev, src_offset, bdev, dst_offset, len, - &copied_sec, GFP_KERNEL); + &copied_sec, NULL, NULL, GFP_KERNEL); range[3] = (uint64_t)copied_sec << 9; if (unlikely(copy_to_user((void __user *)(arg + 24), &range[3], 8)))