
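Fix a livelock in the znode long-term locking code: a steady stream of
ZNODE_READ_LOCK requests could keep a node read-locked indefinitely, so a
high priority write request was never satisfied.

The zlock now counts pending high priority write requests
(nr_hipri_write_requests), and can_lock_object() makes new readers back off
while such a request is queued (check_livelock_condition()).  The convoy-based
wake_up_requestor() is replaced by dispatch_lock_requests(), which walks the
requestors list at unlock time, grants compatible locks directly and reports
the result through request.mode (ZNODE_NO_LOCK on success).  The new
ZNODE_INVALID_LOCK mode marks a requestor that invalidate_lock() wants woken
unconditionally.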

 fs/reiser4/forward.h |    2 
 fs/reiser4/lock.c    |  231 ++++++++++++++++++---------------------------------
 fs/reiser4/lock.h    |    1 
 3 files changed, 86 insertions(+), 148 deletions(-)

diff -puN fs/reiser4/forward.h~reiser4-fix-livelock fs/reiser4/forward.h
--- linux-2.6.14-rc4-mm1/fs/reiser4/forward.h~reiser4-fix-livelock	2005-10-19 18:41:42.000362250 +0400
+++ linux-2.6.14-rc4-mm1-vs/fs/reiser4/forward.h	2005-10-19 18:41:42.008362750 +0400
@@ -128,6 +128,8 @@ typedef enum {
 	ZNODE_NO_LOCK = 0,
 	ZNODE_READ_LOCK = 1,
 	ZNODE_WRITE_LOCK = 2,
+	/* invalidate_lock support, for internal use in lock.c */
+	ZNODE_INVALID_LOCK = 3
 } znode_lock_mode;
 
 /* type of lock request */
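
ZNODE_INVALID_LOCK is not a mode callers may request: invalidate_lock()
stores it in its own lock request so that dispatch_lock_requests() (see the
lock.c changes below) wakes that requestor unconditionally instead of running
the usual compatibility checks.  A minimal standalone sketch of this
sentinel-mode pattern; the names (req_mode, waiter, dispatch_all) are
hypothetical, not the kernel types:

#include <stdio.h>

enum req_mode { REQ_NONE, REQ_READ, REQ_WRITE, REQ_INVALID };

struct waiter {
	enum req_mode mode;
	int woken;
};

/* grant what we can, but always wake REQ_INVALID waiters */
static void dispatch_all(struct waiter *w, int n)
{
	int i;

	for (i = 0; i < n; i++) {
		if (w[i].mode == REQ_INVALID) {
			/* sentinel: wake unconditionally */
			w[i].woken = 1;
			continue;
		}
		/* normal compatibility checks would go here */
	}
}

int main(void)
{
	struct waiter w[2] = { { REQ_READ, 0 }, { REQ_INVALID, 0 } };

	dispatch_all(w, 2);
	printf("%d %d\n", w[0].woken, w[1].woken);	/* prints: 0 1 */
	return 0;
}
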
diff -puN fs/reiser4/lock.c~reiser4-fix-livelock fs/reiser4/lock.c
--- linux-2.6.14-rc4-mm1/fs/reiser4/lock.c~reiser4-fix-livelock	2005-10-19 18:41:42.004362500 +0400
+++ linux-2.6.14-rc4-mm1-vs/fs/reiser4/lock.c	2005-10-20 14:01:11.902437250 +0400
@@ -257,7 +257,6 @@ static inline void
 link_object(lock_handle * handle, lock_stack * owner, znode * node)
 {
 	assert("jmacd-810", handle->owner == NULL);
-	assert("nikita-1828", owner == get_current_lock_stack());
 	assert("nikita-1830", rw_zlock_is_locked(&node->lock));
 
 	handle->owner = owner;
@@ -303,7 +302,6 @@ static void lock_object(lock_stack * own
 {
 	lock_request *request;
 	znode *node;
-	assert("nikita-1839", owner == get_current_lock_stack());
 
 	request = &owner->request;
 	node = request->node;
@@ -425,30 +423,34 @@ static inline int check_deadlock_conditi
 	    && node->lock.nr_hipri_owners == 0;
 }
 
+static int check_livelock_condition(znode * node, znode_lock_mode mode)
+{
+	zlock * lock = &node->lock;
+
+	return mode == ZNODE_READ_LOCK &&
+		lock->nr_readers >= 0 && lock->nr_hipri_write_requests > 0;
+}
+
 /* checks lock/request compatibility */
 static int can_lock_object(lock_stack * owner)
 {
 	znode *node = owner->request.node;
 
-	assert("nikita-1842", owner == get_current_lock_stack());
 	assert("nikita-1843", rw_zlock_is_locked(&node->lock));
 
 	/* See if the node is disconnected. */
-	if (unlikely(ZF_ISSET(node, JNODE_IS_DYING))) {
+	if (unlikely(ZF_ISSET(node, JNODE_IS_DYING)))
 		return RETERR(-EINVAL);
-	}
 
 	/* Do not ever try to take a lock if we are going in low priority
 	   direction and a node have a high priority request without high
 	   priority owners. */
-	if (unlikely(!owner->curpri && check_deadlock_condition(node))) {
+	if (unlikely(!owner->curpri && check_deadlock_condition(node)))
 		return RETERR(-E_REPEAT);
-	}
-
-	if (unlikely(!is_lock_compatible(node, owner->request.mode))) {
+	if (unlikely(owner->curpri && check_livelock_condition(node, owner->request.mode)))
+		return RETERR(-E_REPEAT);
+	if (unlikely(!is_lock_compatible(node, owner->request.mode)))
 		return RETERR(-E_REPEAT);
-	}
-
 	return 0;
 }
 
@@ -524,108 +526,46 @@ static void set_low_priority(lock_stack 
 	}
 }
 
-#define MAX_CONVOY_SIZE ((NR_CPUS - 1))
-
-/* helper function used by longterm_unlock_znode() to wake up requestor(s). */
-/*
- * In certain multi threaded work loads jnode spin lock is the most
- * contented one. Wake up of threads waiting for znode is, thus,
- * important to do right. There are three well known strategies:
- *
- *  (1) direct hand-off. Hasn't been tried.
- *
- *  (2) wake all (thundering herd). This degrades performance in our
- *      case.
- *
- *  (3) wake one. Simplest solution where requestor in the front of
- *      requestors list is awaken under znode spin lock is not very
- *      good on the SMP, because first thing requestor will try to do
- *      after waking up on another CPU is to acquire znode spin lock
- *      that is still held by this thread. As an optimization we grab
- *      lock stack spin lock, release znode spin lock and wake
- *      requestor. done_context() synchronize against stack spin lock
- *      to avoid (impossible) case where requestor has been waked by
- *      some other thread (wake_up_all_lopri_owners(), or something
- *      similar) and managed to exit before we waked it up.
- *
- *      Effect of this optimization wasn't big, after all.
- *
- */
-static void wake_up_requestor(znode * node)
+static void remove_lock_request(lock_stack * requestor)
 {
-#if NR_CPUS > 2
-	struct list_head *creditors;
-	lock_stack *convoy[MAX_CONVOY_SIZE];
-	int convoyused;
-	int convoylimit;
-
-	assert("nikita-3180", node != NULL);
-	assert("nikita-3181", rw_zlock_is_locked(&node->lock));
-
-	convoyused = 0;
-	convoylimit = min(num_online_cpus() - 1, MAX_CONVOY_SIZE);
-	creditors = &node->lock.requestors;
-	if (!list_empty_careful(creditors)) {
-		convoy[0] = list_entry(creditors->next, lock_stack, requestors_link);
-		convoyused = 1;
-		/*
-		 * it has been verified experimentally, that there are no
-		 * convoys on the leaf level.
-		 */
-		if (znode_get_level(node) != LEAF_LEVEL &&
-		    convoy[0]->request.mode == ZNODE_READ_LOCK &&
-		    convoylimit > 1) {
-			lock_stack *item;
-
-			for (item = list_entry(convoy[0]->requestors_link.next, lock_stack, requestors_link);
-			     creditors != &item->requestors_link;
-			     item = list_entry(item->requestors_link.next, lock_stack, requestors_link)) {
-				if (item->request.mode == ZNODE_READ_LOCK) {
-					convoy[convoyused] = item;
-					++convoyused;
-					/*
-					 * it is safe to spin lock multiple
-					 * lock stacks here, because lock
-					 * stack cannot sleep on more than one
-					 * requestors queue.
-					 */
-					/*
-					 * use raw spin_lock in stead of macro
-					 * wrappers, because spin lock
-					 * profiling code cannot cope with so
-					 * many locks held at the same time.
-					 */
-					spin_lock(&item->sguard.lock);
-					if (convoyused == convoylimit)
-						break;
-				}
-			}
-		}
-		spin_lock(&convoy[0]->sguard.lock);
+	zlock * lock = &requestor->request.node->lock;
+
+	if (requestor->curpri) {
+		assert("nikita-1838", lock->nr_hipri_requests > 0);
+		lock->nr_hipri_requests--;
+		if (requestor->request.mode == ZNODE_WRITE_LOCK)
+			lock->nr_hipri_write_requests--;
 	}
+	list_del_init(&requestor->requestors_link);
+}
 
-	WUNLOCK_ZLOCK(&node->lock);
+static void dispatch_lock_requests(znode * node)
+{
+	lock_stack *requestor, *tmp;
 
-	while (convoyused > 0) {
-		--convoyused;
-		__reiser4_wake_up(convoy[convoyused]);
-		spin_unlock(&convoy[convoyused]->sguard.lock);
-	}
-#else
-	/* uniprocessor case: keep it simple */
-	if (!list_empty_careful(&node->lock.requestors)) {
-		lock_stack *requestor;
-
-		requestor = list_entry(node->lock.requestors.next, lock_stack,
-				       requestors_link);
-		reiser4_wake_up(requestor);
-	}
+	assert("zam-1056", rw_zlock_is_locked(&node->lock));
 
-	WUNLOCK_ZLOCK(&node->lock);
-#endif
-}
+	list_for_each_entry_safe(requestor, tmp, &node->lock.requestors, requestors_link) {
+		int can_lock;
 
-#undef MAX_CONVOY_SIZE
+		/* invalidate_lock() algorithm support */
+		if (requestor->request.mode == ZNODE_INVALID_LOCK) {
+			reiser4_wake_up(requestor);
+			continue;
+		}
+		can_lock = can_lock_object(requestor);
+		if (can_lock == -EINVAL || atomic_read(&requestor->nr_signaled)) {
+			reiser4_wake_up(requestor);
+			continue;
+		}
+		if (can_lock == 0) {
+			lock_object(requestor);
+			remove_lock_request(requestor);
+			requestor->request.mode = ZNODE_NO_LOCK;
+			reiser4_wake_up(requestor);
+		}
+	}
+}
 
 /* release long-term lock, acquired by longterm_lock_znode() */
 void longterm_unlock_znode(lock_handle * handle)
@@ -704,9 +644,10 @@ void longterm_unlock_znode(lock_handle *
 
 	/* If there are pending lock requests we wake up a requestor */
 	if (!znode_is_wlocked(node))
-		wake_up_requestor(node);
-	else
-		WUNLOCK_ZLOCK(&node->lock);
+		dispatch_lock_requests(node);
+	if (check_deadlock_condition(node))
+		wake_up_all_lopri_owners(node);
+	WUNLOCK_ZLOCK(&node->lock);
 
 	assert("nikita-3182", rw_zlock_is_not_locked(&node->lock));
 	/* minus one reference from handle->node */
@@ -719,7 +660,7 @@ void longterm_unlock_znode(lock_handle *
 
 /* final portion of longterm-lock */
 static int
-lock_tail(lock_stack * owner, int wake_up_next, int ok, znode_lock_mode mode)
+lock_tail(lock_stack * owner, int ok, znode_lock_mode mode)
 {
 	znode *node = owner->request.node;
 
@@ -729,15 +670,8 @@ lock_tail(lock_stack * owner, int wake_u
 	if (ok == 0) {
 		lock_object(owner);
 		owner->request.mode = 0;
-		if (mode == ZNODE_READ_LOCK)
-			wake_up_next = 1;
 	}
 
-	if (wake_up_next)
-		wake_up_requestor(node);
-	else
-		WUNLOCK_ZLOCK(&node->lock);
-
 	if (ok == 0) {
 		/* count a reference from lockhandle->node
 
@@ -748,8 +682,10 @@ lock_tail(lock_stack * owner, int wake_u
 		zref(node);
 
 		LOCK_CNT_INC(long_term_locked_znode);
-	}
-
+	} else if (ok == -EINVAL)
+		/* wake the invalidate_lock() thread up. */
+		dispatch_lock_requests(node);
+	WUNLOCK_ZLOCK(&node->lock);
 	ON_DEBUG(check_lock_data());
 	ON_DEBUG(check_lock_node_data(node));
 	return ok;
@@ -763,7 +699,6 @@ lock_tail(lock_stack * owner, int wake_u
 static int longterm_lock_tryfast(lock_stack * owner)
 {
 	int result;
-	int wake_up_next = 0;
 	znode *node;
 	zlock *lock;
 
@@ -786,7 +721,6 @@ static int longterm_lock_tryfast(lock_st
 		WLOCK_ZLOCK(lock);
 		if (unlikely(result != 0)) {
 			owner->request.mode = 0;
-			wake_up_next = 1;
 		} else {
 			result = can_lock_object(owner);
 			if (unlikely(result == -E_REPEAT)) {
@@ -795,7 +729,7 @@ static int longterm_lock_tryfast(lock_st
 				return 1;
 			}
 		}
-		return lock_tail(owner, wake_up_next, result, ZNODE_READ_LOCK);
+		return lock_tail(owner, result, ZNODE_READ_LOCK);
 	} else
 		return 1;
 }
@@ -813,7 +747,6 @@ int longterm_lock_znode(
 			       znode_lock_request request) {
 	int ret;
 	int hipri = (request & ZNODE_LOCK_HIPRI) != 0;
-	int wake_up_next = 0;
 	int non_blocking = 0;
 	int has_atom;
 	txn_capture cap_flags;
@@ -877,7 +810,7 @@ int longterm_lock_znode(
 
 	if (znode_is_locked(node) &&
 	    mode == ZNODE_WRITE_LOCK && recursive(owner))
-		return lock_tail(owner, 0, 0, mode);
+		return lock_tail(owner, 0, mode);
 
 	for (;;) {
 		/* Check the lock's availability: if it is unavaiable we get
@@ -887,8 +820,6 @@ int longterm_lock_znode(
 
 		if (unlikely(ret == -EINVAL)) {
 			/* @node is dying. Leave it alone. */
-			/* wakeup next requestor to support lock invalidating */
-			wake_up_next = 1;
 			break;
 		}
 
@@ -981,8 +912,6 @@ int longterm_lock_znode(
 				   reacquire it so we should return here,
 				   avoid releasing the lock. */
 				owner->request.mode = 0;
-				/* next requestor may not fail */
-				wake_up_next = 1;
 				break;
 			}
 
@@ -1014,39 +943,44 @@ int longterm_lock_znode(
 			   increase high priority requests counter for the
 			   node */
 			lock->nr_hipri_requests++;
+			if (mode == ZNODE_WRITE_LOCK)
+				lock->nr_hipri_write_requests++;
 			/* If there are no high priority owners for a node,
 			   then immediately wake up low priority owners, so
 			   they can detect possible deadlock */
 			if (lock->nr_hipri_owners == 0)
 				wake_up_all_lopri_owners(node);
-			/* And prepare a lock request */
-			list_add(&owner->requestors_link, &lock->requestors);
-		} else {
-			/* If we are going in low priority direction then we
-			   set low priority to our process. This is the only
-			   case  when a process may become low priority */
-			/* And finally prepare a lock request */
-			list_add_tail(&owner->requestors_link, &lock->requestors);
 		}
+		list_add_tail(&owner->requestors_link, &lock->requestors);
 
 		/* Ok, here we have prepared a lock request, so unlock
 		   a znode ... */
 		WUNLOCK_ZLOCK(lock);
 		/* ... and sleep */
 		go_to_sleep(owner);
-
+		/* Fast check whether the lock was handed to us
+		 * by another thread via dispatch_lock_requests(). */
+		/* FIXME(Zam): returning more error codes from
+		 * dispatch_lock_requests() would help to avoid this spin-lock. */
+		if (owner->request.mode == ZNODE_NO_LOCK) {
+		lock_is_done:
+			LOCK_CNT_INC(long_term_locked_znode);
+			/* the request was processed successfully by
+			 * dispatch_lock_requests() */
+			zref(node);
+			return 0;
+		}
 		WLOCK_ZLOCK(lock);
-
-		if (hipri) {
-			assert("nikita-1838", lock->nr_hipri_requests > 0);
-			lock->nr_hipri_requests--;
+		/* repeat the check under the spin-lock to close the race. */
+		if (unlikely(owner->request.mode == ZNODE_NO_LOCK)) {
+			WUNLOCK_ZLOCK(lock);
+			goto lock_is_done;
 		}
-
-		list_del_init(&owner->requestors_link);
+		remove_lock_request(owner);
 	}
 
 	assert("jmacd-807/a", rw_zlock_is_locked(&node->lock));
-	return lock_tail(owner, wake_up_next, ret, mode);
+	return lock_tail(owner, ret, mode);
 }
 
 /* lock object invalidation means changing of lock object state to `INVALID'
@@ -1079,9 +1013,10 @@ void invalidate_lock(lock_handle * handl
 	list_for_each_entry(rq, &node->lock.requestors, requestors_link)
 		reiser4_wake_up(rq);
 
-	/* We use that each unlock() will wakeup first item from requestors
-	   list; our lock stack is the last one. */
 	while (!list_empty_careful(&node->lock.requestors)) {
+		/* inform dispatch_lock_requests that this thread should be
+		 * woken up unconditionally */
+		owner->request.mode = ZNODE_INVALID_LOCK;
 		list_add_tail(&owner->requestors_link, &node->lock.requestors);
 
 		prepare_to_sleep(owner);
@@ -1381,6 +1316,6 @@ const char *lock_mode_name(znode_lock_mo
    mode-name: "LC"
    c-basic-offset: 8
    tab-width: 8
-   fill-column: 120
+   fill-column: 79
    End:
 */
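
The livelock fix amounts to writer preference: a read request is no longer
compatible with a node, even a free or read-locked one, while a high priority
write request is queued.  Below is a userspace analogue of that rule, a
sketch only, with pthreads standing in for the zlock machinery and made-up
demo_* names; it is not the kernel code:

#include <pthread.h>

struct demo_zlock {
	pthread_mutex_t guard;
	pthread_cond_t wait;
	int nr_readers;		/* > 0: readers, -1: one writer, 0: free */
	int nr_write_requests;	/* analogue of nr_hipri_write_requests */
};

#define DEMO_ZLOCK_INIT \
	{ PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0 }

static void demo_read_lock(struct demo_zlock *l)
{
	pthread_mutex_lock(&l->guard);
	/* the fix: back off while a writer is queued, even though
	 * nr_readers >= 0 would otherwise let this reader in */
	while (l->nr_readers < 0 || l->nr_write_requests > 0)
		pthread_cond_wait(&l->wait, &l->guard);
	l->nr_readers++;
	pthread_mutex_unlock(&l->guard);
}

static void demo_write_lock(struct demo_zlock *l)
{
	pthread_mutex_lock(&l->guard);
	l->nr_write_requests++;	/* seen by demo_read_lock() above */
	while (l->nr_readers != 0)
		pthread_cond_wait(&l->wait, &l->guard);
	l->nr_write_requests--;
	l->nr_readers = -1;
	pthread_mutex_unlock(&l->guard);
}

static void demo_unlock(struct demo_zlock *l)
{
	pthread_mutex_lock(&l->guard);
	if (l->nr_readers < 0)
		l->nr_readers = 0;
	else
		l->nr_readers--;
	pthread_cond_broadcast(&l->wait);
	pthread_mutex_unlock(&l->guard);
}

Once a writer queues, new readers wait, the existing readers drain, and the
writer gets the lock; the kernel version enforces the same ordering one level
up, by failing can_lock_object() with -E_REPEAT and handing the lock over in
dispatch_lock_requests() at unlock time.
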
diff -puN fs/reiser4/lock.h~reiser4-fix-livelock fs/reiser4/lock.h
--- linux-2.6.14-rc4-mm1/fs/reiser4/lock.h~reiser4-fix-livelock	2005-10-19 18:41:42.008362750 +0400
+++ linux-2.6.14-rc4-mm1-vs/fs/reiser4/lock.h	2005-10-20 13:55:39.497663250 +0400
@@ -34,6 +34,7 @@ struct zlock {
 	unsigned nr_hipri_requests;
+	unsigned nr_hipri_write_requests;
 	/* A linked list of lock_handle objects that contains pointers
 	   for all lock_stacks which have this lock object locked */
 	struct list_head owners;
 	/* A linked list of lock_stacks that wait for this lock */
 	struct list_head requestors;

_
