dm-buffered: rework and fix async_memcpy

The _complete_memcpy function is used as an async memcpy callback. This
function may be called from interrupt context, so it must not block.
Unfortunately, it calls functions that can block: dm_bufio_mark_buffer_dirty
and dm_bufio_release.

We fix this bug by reworking _complete_memcpy so that it submits a work item
to a workqueue, and we complete the processing from the workqueue.

Signed-off-by: Mikulas Patocka

---
 drivers/md/dm-buffered-target.c | 121 ++++++++++++----------------------------
 1 file changed, 39 insertions(+), 82 deletions(-)

Index: linux-2.6/drivers/md/dm-buffered-target.c
===================================================================
--- linux-2.6.orig/drivers/md/dm-buffered-target.c
+++ linux-2.6/drivers/md/dm-buffered-target.c
@@ -62,11 +62,7 @@ static struct kmem_cache *async_request_
 
 /* buffer async_memcpy context */
 struct bio_c {
-	struct async_submit_ctl submit;
-	struct buffered_c *bc;
-	struct dm_buffer *bp;
-	unsigned int buffer_offset;
-	unsigned int len;
+	atomic_t memcpy_in_progress;
 };
 
 /* Convert sector to bufio block number */
@@ -87,70 +83,42 @@ static sector_t _sector_mod(struct buffe
 	return sector & bc->block_mask;
 }
 
-/* Use unutilized @bio->bi_next as endio reference counter and take out @ref_count number. */
-static void _bio_set_references(struct bio *bio, unsigned int ref_count)
-{
-	atomic_set((atomic_t *)&bio->bi_next, ref_count);
-}
-
-/* Use unutilized @bio->bi_next as endio reference counter */
-static void _bio_get(struct bio *bio)
-{
-	atomic_inc((atomic_t *)&bio->bi_next);
-}
-
-/* Perform bio endio completion when unreferenced using bio clone's unutilized bi_next member */
-static void _bio_put(struct bio *bio)
-{
-	if (atomic_dec_and_test((atomic_t *)&bio->bi_next))
-		bio_endio(bio);
-}
-
 /* Flush any dirty buffers of @bc out */
 static blk_status_t _buffered_flush(struct buffered_c *bc)
 {
 	return errno_to_blk_status(dm_bufio_write_dirty_buffers(bc->bufio));
 }
 
-/* async_memcpy() completion callback */
-static void _complete_memcpy(void *context)
+static void _complete_buffer(struct buffered_c *bc, struct bio *bio, struct dm_buffer *bp)
 {
-	struct bio *bio = context;
-	struct bio_c *bio_c = dm_per_bio_data(bio, sizeof(*bio_c));
-	struct buffered_c *bc = bio_c->bc;
-	struct dm_buffer *bp = bio_c->bp;
-	unsigned int len = bio_c->len;
-
-	/* Avoid race with bufio flushing code on async memory copy by dirtying here. */
-	if (len) {
-		unsigned int buffer_offset = bio_c->buffer_offset;
-
-		/* Superfluous conditional to show both functions dirtying buffers. */
-		if (!buffer_offset && len == dm_bufio_get_block_size(bc->bufio))
-			dm_bufio_mark_buffer_dirty(bp);
-		else
-			dm_bufio_mark_partial_buffer_dirty(bp, buffer_offset, buffer_offset + len);
-
+	if (bio_op(bio) == REQ_OP_WRITE) {
+		dm_bufio_mark_buffer_dirty(bp);
 		atomic_inc(&bc->stats[S_BUFFERS_DIRTIED]);
 	}
-
-	_bio_put(bio);
 	dm_bufio_release(bp);
 }
 
-/* Initialize per bufio buffer async_memcpy() context */
-static struct async_submit_ctl *_init_async_memcpy(struct bio *bio,
-		struct buffered_c *bc, struct dm_buffer *bp,
-		unsigned int buffer_offset, unsigned int len)
+static void _complete_memcpy_work(struct work_struct *ws)
 {
+	struct async_request *as = container_of(ws, struct async_request, work);
+	struct bio *bio = as->bio;
 	struct bio_c *bio_c = dm_per_bio_data(bio, sizeof(*bio_c));
+	struct buffered_c *bc = as->bc;
 
-	bio_c->bc = bc;
-	bio_c->bp = bp;
-	bio_c->buffer_offset = buffer_offset;
-	bio_c->len = len;
-	init_async_submit(&bio_c->submit, 0, NULL, _complete_memcpy, bio, NULL);
-	return &bio_c->submit;
+	_complete_buffer(bc, bio, as->bp);
+
+	mempool_free(as, &bc->async_request_pool);
+
+	if (atomic_dec_and_test(&bio_c->memcpy_in_progress))
+		bio_endio(bio);
+}
+
+static void _complete_memcpy(void *context)
+{
+	struct async_request *as = context;
+	struct buffered_c *bc = as->bc;
+	INIT_WORK(&as->work, _complete_memcpy_work);
+	queue_work(bc->buffered_async_wq, &as->work);
 }
 
 /* Return total number of blocks for @ti */
@@ -163,19 +131,24 @@ static sector_t _buffered_size(struct dm
 
 static void _memcpy(struct bio *bio, struct buffered_c *bc, struct dm_buffer *bp,
 		    struct page *dst, struct page *src,
-		    loff_t dst_offset, loff_t src_offset, size_t len,
-		    struct async_submit_ctl *ctl)
+		    loff_t dst_offset, loff_t src_offset, unsigned len)
 {
 	if (bc->async_memcpy) {
-		async_memcpy(dst, src, dst_offset, src_offset, len, ctl);
+		struct bio_c *bio_c = dm_per_bio_data(bio, sizeof(*bio_c));
+		struct async_request *as = mempool_alloc(&bc->async_request_pool, GFP_NOIO);
+		as->bc = bc;
+		as->bp = bp;
+		as->bio = bio;
+		init_async_submit(&as->submit, 0, NULL, _complete_memcpy, as, NULL);
+		atomic_inc(&bio_c->memcpy_in_progress);
+		async_memcpy(dst, src, dst_offset, src_offset, len, &as->submit);
 	} else {
 		void *d = kmap_local_page(dst);
 		void *s = kmap_local_page(src);
-
 		memcpy(d + dst_offset, s + src_offset, len);
 		kunmap_local(d);
 		kunmap_local(s);
-		_complete_memcpy(bio);
+		_complete_buffer(bc, bio, bp);
	}
 }
 
@@ -193,7 +166,6 @@ static void _io(struct buffered_c *bc, s
 	void *buffer;
 	struct dm_buffer *bp;
 	struct page *buffer_page;
-	struct bio_c *bio_c = dm_per_bio_data(bio, sizeof(*bio_c));
 
 	while (total_len) {
 		block = _to_block(bc, sector);
@@ -205,7 +177,6 @@ static void _io(struct buffered_c *bc, s
 		 * in case the segment is split across 2 buffers.
 		 */
 		if (len < total_len) {
-			_bio_get(bio);
 			atomic_inc(&bc->stats[S_BUFFER_SPLITS]);
 		}
 
@@ -225,10 +196,7 @@ static void _io(struct buffered_c *bc, s
 			/* Memorize first error in bio status. */
 			if (!bio->bi_status)
 				bio->bi_status = errno_to_blk_status(PTR_ERR(buffer));
-
-			_bio_put(bio);
 			/* Continue with any split bio payload */
-
 		} else if (write) {
 			/*
 			 * Take out 2 references to be savely handling any single aysnchronous
@@ -244,8 +212,7 @@ static void _io(struct buffered_c *bc, s
 			buffer_page = unlikely(is_vmalloc_addr(buffer)) ?
 				vmalloc_to_page(buffer) : virt_to_page(buffer);
 			_memcpy(bio, bc, bp, buffer_page, bvec->bv_page,
-				offset_in_page(buffer), bvec_offset, len,
-				_init_async_memcpy(bio, bc, bp, buffer_offset, len));
+				offset_in_page(buffer), bvec_offset, len);
 		} else {
 			/* (Superfluous) function consistency check example */
 			WARN_ON(block != dm_bufio_get_block_number(bp));
@@ -253,8 +220,7 @@ static void _io(struct buffered_c *bc, s
 			buffer_page = unlikely(is_vmalloc_addr(buffer)) ?
 				vmalloc_to_page(buffer) : virt_to_page(buffer);
 			_memcpy(bio, bc, bp, bvec->bv_page, buffer_page,
-				bvec_offset, offset_in_page(buffer), len,
-				_init_async_memcpy(bio, bc, bp, 0, 0));
+				bvec_offset, offset_in_page(buffer), len);
 		}
 
 		/* Process any additional buffer even in case of I/O error */
@@ -371,16 +337,13 @@ static void __process_bio(struct buffere
 {
 	struct bio_vec bvec;
 	blk_status_t r, rio;
+	struct bio_c *bio_c = dm_per_bio_data(bio, sizeof(*bio_c));
+
+	atomic_set(&bio_c->memcpy_in_progress, 1);
 
 	switch (bio_op(bio)) {
 	case REQ_OP_READ:
 	case REQ_OP_WRITE:
-		/*
-		 * Take references per segment out and add an
-		 * additional one to prevent an endio race.
-		 */
-		_bio_set_references(bio, bio_segments(bio) + 1);
-
 		if (bio->bi_opf & REQ_PREFLUSH) {
 			atomic_inc(&bc->stats[S_PREFLUSHS]);
 			r = _buffered_flush(bc);
@@ -406,26 +369,20 @@ static void __process_bio(struct buffere
 		 * Try forgetting buffers on discard (least we can do
 		 * with lag of discard passdown in dm-bufio).
 		 */
-		_bio_set_references(bio, 1);
 		if (bc->discard)
 			_discard_blocks(bc, bio);
 		break;
 	case REQ_OP_WRITE_ZEROES:
-		_bio_set_references(bio, 1);
 		_write_zeroes(bc, bio);
 		break;
 	default:
-		_bio_set_references(bio, 1);
 		/* Return error for unsupported operation */
 		bio->bi_status = errno_to_blk_status(-EOPNOTSUPP);
 	}
 
-	goto out;
 err:
-	/* On error, reset refecount to 1 for _bio_put() to endio */
-	_bio_set_references(bio, 1);
-out:
-	_bio_put(bio); /* Release (additional) reference */
+	if (atomic_dec_and_test(&bio_c->memcpy_in_progress))
+		bio_endio(bio);
 }
 
 /* Process I/O on a bio prefetching buffers on a single @bio in a worker. */
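
For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of
the deferral scheme the fix relies on: a completion callback that may fire in interrupt
context only queues a work item, and the blocking part runs later in process context.
The struct example_request, example_complete_callback() and the use of system_wq are
illustrative assumptions only; the patch itself uses its own struct async_request, a
mempool and the dedicated bc->buffered_async_wq workqueue.

/*
 * Hypothetical illustration of deferring blocking completion work from an
 * atomic callback to a workqueue; not dm-buffered code.
 */
#include <linux/kernel.h>
#include <linux/printk.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/atomic.h>

struct example_request {
	struct work_struct work;	/* deferred, sleepable completion */
	atomic_t *pending;		/* shared in-flight counter */
};

/* Runs in process context via the workqueue: blocking calls are allowed here. */
static void example_complete_work(struct work_struct *ws)
{
	struct example_request *req = container_of(ws, struct example_request, work);

	/* ... calls that may sleep, e.g. dm_bufio_release(), would go here ... */

	if (atomic_dec_and_test(req->pending))
		pr_info("last outstanding request finished\n");

	kfree(req);
}

/*
 * Completion callback: may run in interrupt context, so it must not block.
 * It only hands the request off to a workqueue.
 */
static void example_complete_callback(void *context)
{
	struct example_request *req = context;

	INIT_WORK(&req->work, example_complete_work);
	queue_work(system_wq, &req->work);
}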