From: David Howells <dhowells@redhat.com>
Subject: [PATCH] Fix things in the slow work facility

Fix a number of things in the slow work facility:

 (1) Better explain the way the PENDING and ENQ_DEFERRED flags interact.

 (2) Make slow_work_available() an inline function rather than a macro.

 (3) Use kthread_run() rather than kthread_create() and wake_up_process().

 (4) The return from kthread_create() and kthread_run() should be checked with
     IS_ERR() rather than the ! operator.

 (5) slow_work_max_max_threads can't be initialised if CONFIG_SYSCTL=n as it
     doesn't then exist.

Reported-by: Randy Dunlap <randy.dunlap@oracle.com>
Reported-by: Serge Hallyn <serue@us.ibm.com>
Reported-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: David Howells <dhowells@redhat.com>
---
diff --git a/kernel/slow-work.c b/kernel/slow-work.c
index b398352..5fb176e 100644
--- a/kernel/slow-work.c
+++ b/kernel/slow-work.c
@@ -271,12 +271,32 @@ int slow_work_enqueue(struct slow_work *work)
 	BUG_ON(!work->ops);
 	BUG_ON(!work->ops->get_ref);
 
+	/* when honouring an enqueue request, we only promise that we will run
+	 * the work function in the future; we do not promise to run it once
+	 * per enqueue request
+	 *
+	 * we use the PENDING bit to merge together repeat requests without
+	 * having to disable IRQs and take the spinlock, whilst still
+	 * maintaining our promise
+	 */
 	if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
 		spin_lock_irqsave(&slow_work_queue_lock, flags);
 
+		/* we promise that we will not attempt to execute the work
+		 * function in more than one thread simultaneously
+		 *
+		 * this, however, leaves us with a problem if we're asked to
+		 * enqueue the work whilst someone is executing the work
+		 * function as simply queueing the work immediately means that
+		 * another thread may try executing it whilst it is already
+		 * under execution
+		 *
+		 * to deal with this, we set the ENQ_DEFERRED bit instead of
+		 * enqueueing, and the thread currently executing the work
+		 * function will enqueue the work item when the work function
+		 * returns and it has cleared the EXECUTING bit
+		 */
 		if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
-			/* can't queue lest we cause multiple threads to try
-			 * executing this item, so defer for later */
 			set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
 		} else {
 			if (work->ops->get_ref(work) < 0)
@@ -326,6 +346,16 @@ static bool slow_work_cull_thread(void)
 }
 
 /*
+ *
+ */
+static inline bool slow_work_available(int vsmax)
+{
+	return !list_empty(&slow_work_queue) ||
+		(!list_empty(&vslow_work_queue) &&
+		 atomic_read(&vslow_work_executing_count) < (vsmax));
+}
+
+/*
  * Worker thread dispatcher
  */
 static int slow_work_thread(void *_data)
@@ -334,11 +364,6 @@ static int slow_work_thread(void *_data)
 
 	DEFINE_WAIT(wait);
 
-#define slow_work_available(vsmax) \
-	(!list_empty(&slow_work_queue) || \
-	 (!list_empty(&vslow_work_queue) && \
-	  atomic_read(&vslow_work_executing_count) < (vsmax)))
-
 	set_freezable();
 	set_user_nice(current, -5);
 
@@ -427,16 +452,14 @@ static void slow_work_new_thread_execute(struct slow_work *work)
 
 	slow_work_may_not_start_new_thread = true;
 	atomic_inc(&slow_work_thread_count);
-	p = kthread_create(slow_work_thread, NULL, "kslowd");
-	if (!p) {
+	p = kthread_run(slow_work_thread, NULL, "kslowd");
+	if (IS_ERR(p)) {
 		printk(KERN_DEBUG "Slow work thread pool: OOM\n");
 		if (atomic_dec_and_test(&slow_work_thread_count))
 			BUG(); /* we're running on a slow work thread... */
 		mod_timer(&slow_work_oom_timer,
 			  jiffies + SLOW_WORK_OOM_TIMEOUT);
 	} else {
-		wake_up_process(p);
-
 		/* ratelimit the starting of new threads */
 		mod_timer(&slow_work_oom_timer, jiffies + 1);
 	}
@@ -543,11 +566,10 @@ int slow_work_register_user(void)
 		/* start the minimum number of threads */
 		for (loop = 0; loop < slow_work_min_threads; loop++) {
 			atomic_inc(&slow_work_thread_count);
-			p = kthread_create(slow_work_thread, NULL,
-					   "kslow%Xd", loop);
-			if (!p)
+			p = kthread_run(slow_work_thread, NULL,
+					"kslow%Xd", loop);
+			if (IS_ERR(p))
 				goto error;
-			wake_up_process(p);
 		}
 		printk(KERN_NOTICE "Slow work thread pool: Ready\n");
 	}
@@ -609,8 +631,10 @@ static int __init init_slow_work(void)
 
 	if (slow_work_max_threads < nr_cpus)
 		slow_work_max_threads = nr_cpus;
+#ifdef CONFIG_SYSCTL
 	if (slow_work_max_max_threads < nr_cpus * 2)
 		slow_work_max_max_threads = nr_cpus * 2;
+#endif
 	return 0;
 }
 
