Implement clustered locking. The cluster-wide lock is held if: the local lock is held OR there are pending exceptions. s->pending_exceptions_submitted can be manipulated only when the local lock is held. So we unlock the cluster if we are unlocking the local lock and s->pending_exceptions_submitted is zero. If we are locking the local lock and s->pending_exceptions_submitted is nonzero, we don't lock the cluster lock because we are already holding it. Signed-off-by: Mikulas Patocka --- drivers/md/dm-snap.c | 47 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 9 deletions(-) Index: linux-2.6.31-fast/drivers/md/dm-snap.c =================================================================== --- linux-2.6.31-fast.orig/drivers/md/dm-snap.c 2009-10-19 12:50:23.000000000 +0200 +++ linux-2.6.31-fast/drivers/md/dm-snap.c 2009-10-19 12:50:42.000000000 +0200 @@ -340,15 +340,32 @@ static void __insert_origin(struct origi list_add_tail(&o->hash_list, sl); } -#define LOCK_DEPTH 0xff +#define LOCK_DEPTH 0xff +#define LOCK_LOCAL_ONLY 0x100 static void lock_snapshot(struct dm_snapshot *s, int flags) { down_write_nested(&s->lock, flags & LOCK_DEPTH); +#ifdef CLUSTER_SNAPSHOTS + if (is_clustered(s) && !s->pending_exceptions_submitted && + !(flags & LOCK_LOCAL_ONLY)) { + int r = dm_cluster_lock_by_str(lockspace_handle, s->lockid, + DM_CLUSTER_LOCK_EXCLUSIVE, NULL, NULL); + BUG_ON(r < 0); + } +#endif } static void unlock_snapshot(struct dm_snapshot *s, int flags) { +#ifdef CLUSTER_SNAPSHOTS + if (is_clustered(s) && !s->pending_exceptions_submitted && + !(flags & LOCK_LOCAL_ONLY)) { + int r = dm_cluster_lock_by_str(lockspace_handle, s->lockid, + DM_CLUSTER_LOCK_MONITOR, NULL, NULL); + BUG_ON(r < 0); + } +#endif up_write(&s->lock); } @@ -1157,15 +1174,19 @@ static void snapshot_dtr(struct dm_targe #endif struct dm_snapshot *s = ti->private; + /* + * The exception store may be suspended at this place, so we can't + * re-read, so we must do local lock. 
+ */ + lock_snapshot(s, LOCK_LOCAL_ONLY); /* This snapshot may need to handover its exception store */ - lock_snapshot(s, 0); if (s->handover_snap) { struct dm_snapshot *new_snap = s->handover_snap; - lock_snapshot(new_snap, 1); + lock_snapshot(new_snap, 1 | LOCK_LOCAL_ONLY); handover_exceptions(s, new_snap); - unlock_snapshot(new_snap, 1); + unlock_snapshot(new_snap, 1 | LOCK_LOCAL_ONLY); } - unlock_snapshot(s, 0); + unlock_snapshot(s, 0 | LOCK_LOCAL_ONLY); if (is_merge(ti)) stop_merge(s); @@ -1188,6 +1209,14 @@ static void snapshot_dtr(struct dm_targe BUG_ON(!hlist_empty(&s->tracked_chunk_hash[i])); #endif +#ifdef CLUSTER_SNAPSHOTS + if (is_clustered(s)) { + int r = dm_cluster_lock_by_str(lockspace_handle, s->lockid, + DM_CLUSTER_LOCK_UNLOCK, NULL, NULL); + BUG_ON(r < 0); + } +#endif + mempool_destroy(s->tracked_chunk_pool); __free_exceptions(s); @@ -1632,20 +1661,20 @@ static void snapshot_resume(struct dm_ta { struct dm_snapshot *s = ti->private; - lock_snapshot(s, 0); + lock_snapshot(s, 0 | LOCK_LOCAL_ONLY); if (s->handover) { /* Get exception store from another snapshot */ struct dm_snapshot *old_snap = s->handover_snap; BUG_ON(!old_snap); - lock_snapshot(old_snap, 1); + lock_snapshot(old_snap, 1 | LOCK_LOCAL_ONLY); handover_exceptions(old_snap, s); - unlock_snapshot(old_snap, 1); + unlock_snapshot(old_snap, 1 | LOCK_LOCAL_ONLY); } /* An incomplete exception handover is not allowed */ BUG_ON(s->handover || s->handover_snap); s->active = 1; s->suspended = 0; - unlock_snapshot(s, 0); + unlock_snapshot(s, 0 | LOCK_LOCAL_ONLY); } static void snapshot_merge_resume(struct dm_target *ti)