dm-buffered: improve concurrency

This patch improves dm-buffered concurrency. Previously, there was just one
work item and one workqueue, so if bio processing blocked, other bios could
not be processed. This patch moves the work_struct into struct bio_c (the
per-bio data), so that multiple bios can be processed concurrently.

Signed-off-by: Mikulas Patocka

---
 drivers/md/dm-buffered-target.c | 42 +++++++++-------------------------------
 1 file changed, 10 insertions(+), 32 deletions(-)

Index: linux-2.6/drivers/md/dm-buffered-target.c
===================================================================
--- linux-2.6.orig/drivers/md/dm-buffered-target.c
+++ linux-2.6/drivers/md/dm-buffered-target.c
@@ -27,11 +27,8 @@ enum { S_BUFFER_SPLITS, S_PREFLUSHS, S_F
 
 /* buffered target context */
 struct buffered_c {
-	spinlock_t lock; /* Protect following bio list */
-	struct bio_list bios;
 	struct dm_bufio_client *bufio;
 	struct workqueue_struct *buffered_wq;
-	struct work_struct buffered_ws;
 	struct workqueue_struct *buffered_flush_wq;
 	struct delayed_work buffered_flush_ws;
 	mempool_t async_request_pool;
@@ -65,6 +62,8 @@ static struct kmem_cache *async_request_
 
 /* buffer async_memcpy context */
 struct bio_c {
+	struct work_struct work;
+	struct buffered_c *bc;
 	atomic_t memcpy_in_progress;
 };
 
@@ -402,18 +401,12 @@ err:
 /* Process I/O on a bio prefetching buffers on a single @bio in a worker. */
 static void _process_bio(struct work_struct *work)
 {
-	struct buffered_c *bc = container_of(work, struct buffered_c, buffered_ws);
-	struct bio *bio;
-	bool queue, write;
+	struct bio_c *bio_c = container_of(work, struct bio_c, work);
+	struct buffered_c *bc = bio_c->bc;
+	struct bio *bio = dm_bio_from_per_bio_data(bio_c, sizeof(*bio_c));
+	bool write;
 	sector_t blocks, sectors, start, end;
 
-	spin_lock(&bc->lock);
-	bio = bio_list_pop(&bc->bios);
-	spin_unlock(&bc->lock);
-
-	if (!bio)
-		return;
-
 	write = false;
 
 	start = _to_block(bc, bio->bi_iter.bi_sector);
@@ -448,14 +441,6 @@ static void _process_bio(struct work_str
 
 	__process_bio(bc, bio);
 
-	/* Use spinlock here as in map function to avoid any memory access reordering issues. */
-	spin_lock(&bc->lock);
-	queue = !bio_list_empty(&bc->bios);
-	spin_unlock(&bc->lock);
-
-	if (queue)
-		queue_work(bc->buffered_wq, &bc->buffered_ws);
-
 	/* Reschedule the flush thread in case of new write(s). */
 	if (write)
 		schedule_delayed_work(&bc->buffered_flush_ws, 2 * HZ);
@@ -694,9 +679,6 @@ static int buffered_ctr(struct dm_target
 	}
 	dm_bufio_set_sector_offset(bc->bufio, bc->start);
 
-	spin_lock_init(&bc->lock);
-	bio_list_init(&bc->bios);
-	INIT_WORK(&bc->buffered_ws, _process_bio);
 	INIT_DELAYED_WORK(&bc->buffered_flush_ws, _process_flushs);
 	bc->block_shift = __ffs(bc->buffer_size) - SECTOR_SHIFT;
 	bc->block_mask = ~(~((sector_t)0) << bc->block_shift);
@@ -726,18 +708,14 @@ bad:
 
 static int buffered_map(struct dm_target *ti, struct bio *bio)
 {
+	struct bio_c *bio_c = dm_per_bio_data(bio, sizeof(*bio_c));
 	struct buffered_c *bc = ti->private;
-	bool queue;
 
 	bio->bi_iter.bi_sector = dm_target_offset(ti, bio->bi_iter.bi_sector);
 
-	spin_lock(&bc->lock);
-	queue = bio_list_empty(&bc->bios);
-	bio_list_add(&bc->bios, bio);
-	spin_unlock(&bc->lock);
-
-	if (queue)
-		queue_work(bc->buffered_wq, &bc->buffered_ws);
+	INIT_WORK(&bio_c->work, _process_bio);
+	bio_c->bc = bc;
+	queue_work(bc->buffered_wq, &bio_c->work);
 
 	return DM_MAPIO_SUBMITTED;
 }
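
Note for reviewers, not part of the patch: the per-bio context fetched with
dm_per_bio_data() in buffered_map() and recovered with
dm_bio_from_per_bio_data() in _process_bio() is only valid if the constructor
reserves space for it. Since struct bio_c already existed before this change,
buffered_ctr() presumably already contains something like the sketch below
(the field is per_io_data_size in current kernels; the exact placement is an
assumption, as that hunk is not part of this patch):

	/*
	 * Assumed to already be in buffered_ctr(): reserve per-bio space so
	 * that dm_per_bio_data(bio, sizeof(struct bio_c)) returns valid
	 * memory behind each bio.
	 */
	ti->per_io_data_size = sizeof(struct bio_c);

On the design: INIT_WORK() in the map path only initializes the embedded
work_struct, so there is still no allocation in the I/O path; the achievable
parallelism is then bounded by the max_active setting of buffered_wq (how
that workqueue is allocated is not visible in this patch).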