Skip to main content

java-wait-notify

· 9 min read

背景

Java 的线程间通信偶尔会用到 wait 和 notify。

实现

注册:

// Binds the native methods of java.lang.Object (hashCode, wait, notify,
// notifyAll, clone) to their JVM_* entry points via Method::register_native.
void java_lang_Object::register_natives(TRAPS) {
  InstanceKlass* const object_klass = vmClasses::Object_klass();

  // int hashCode()
  Method::register_native(object_klass,
                          vmSymbols::hashCode_name(),
                          vmSymbols::void_int_signature(),
                          (address) &JVM_IHashCode,
                          CHECK);

  // void wait(long)
  Method::register_native(object_klass,
                          vmSymbols::wait_name(),
                          vmSymbols::long_void_signature(),
                          (address) &JVM_MonitorWait,
                          CHECK);

  // void notify()
  Method::register_native(object_klass,
                          vmSymbols::notify_name(),
                          vmSymbols::void_method_signature(),
                          (address) &JVM_MonitorNotify,
                          CHECK);

  // void notifyAll()
  Method::register_native(object_klass,
                          vmSymbols::notifyAll_name(),
                          vmSymbols::void_method_signature(),
                          (address) &JVM_MonitorNotifyAll,
                          CHECK);

  // Object clone()
  // NOTE(review): the last registration passes THREAD rather than CHECK,
  // presumably because nothing follows it that would need an early return.
  Method::register_native(object_klass,
                          vmSymbols::clone_name(),
                          vmSymbols::void_object_signature(),
                          (address) &JVM_Clone,
                          THREAD);
}
// -----------------------------------------------------------------------------
// Wait/Notify/NotifyAll
//
// Note: a subset of changes to ObjectMonitor::wait()
// will need to be replicated in complete_exit
//
// ObjectMonitor::wait() makes the calling thread wait on this monitor:
//   1. verify ownership and short-circuit on an already-pending interrupt,
//   2. add an ObjectWaiter node for the thread to the wait set,
//   3. save the recursion count and release the monitor via exit(),
//   4. park on the thread's ParkEvent (unless already interrupted/notified),
//   5. unlink the node from the wait set if it is still there,
//   6. re-acquire the monitor (enter() or ReenterI()) and restore recursions,
//   7. throw InterruptedException if the wake-up was not a notification and
//      an interrupt is pending.
//
// millis        - timeout handed to park(); values <= 0 park without a timeout.
// interruptible - when true, a pending thread interrupt results in
//                 java.lang.InterruptedException being thrown.
// TRAPS         - the current JavaThread (THREAD); exceptions are posted on it.
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
JavaThread* current = THREAD;

assert(InitDone, "Unexpectedly not initialized");

CHECK_OWNER(); // Throws IMSE if not owner.

EventJavaMonitorWait event;

// check for a pending interrupt
// (is_interrupted(true) is used here, is_interrupted(false) further below;
// presumably 'true' also clears the interrupt status - confirm.)
if (interruptible && current->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {
// post monitor waited event. Note that this is past-tense, we are done waiting.
if (JvmtiExport::should_post_monitor_waited()) {
// Note: 'false' parameter is passed here because the
// wait was not timed out due to thread interrupt.
JvmtiExport::post_monitor_waited(current, this, false);

// In this short circuit of the monitor wait protocol, the
// current thread never drops ownership of the monitor and
// never gets added to the wait queue so the current thread
// cannot be made the successor. This means that the
// JVMTI_EVENT_MONITOR_WAITED event handler cannot accidentally
// consume an unpark() meant for the ParkEvent associated with
// this ObjectMonitor.
}
if (event.should_commit()) {
post_monitor_wait_event(&event, this, 0, millis, false);
}
THROW(vmSymbols::java_lang_InterruptedException());
return;
}

// Record on the thread which monitor it is about to wait on.
assert(current->_Stalled == 0, "invariant");
current->_Stalled = intptr_t(this);
current->set_current_waiting_monitor(this);

// create a node to be put into the queue
// Critically, after we reset() the event but prior to park(), we must check
// for a pending interrupt.
ObjectWaiter node(current);
node.TState = ObjectWaiter::TS_WAIT;
current->_ParkEvent->reset();
OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag

// Enter the waiting queue, which is a circular doubly linked list in this case
// but it could be a priority queue or any data structure.
// _WaitSetLock protects the wait queue. Normally the wait queue is accessed only
// by the owner of the monitor *except* in the case where park()
// returns because of a timeout or interrupt. Contention is exceptionally rare
// so we use a simple spin-lock instead of a heavier-weight blocking lock.

Thread::SpinAcquire(&_WaitSetLock, "WaitSet - add");
AddWaiter(&node);
Thread::SpinRelease(&_WaitSetLock);

_Responsible = NULL;

intx save = _recursions; // record the old recursion count
_waiters++; // increment the number of waiters
_recursions = 0; // clear the recursion count; 'save' is restored after the monitor is re-acquired
exit(current); // exit the monitor
guarantee(owner_raw() != current, "invariant");

// The thread is on the WaitSet list - now park() it.
// On MP systems it's conceivable that a brief spin before we park
// could be profitable.
//
// TODO-FIXME: change the following logic to a loop of the form
// while (!timeout && !interrupted && _notified == 0) park()

// ret receives the result of the timed park(); WasNotified later receives a
// snapshot of node._notified.
int ret = OS_OK;
int WasNotified = 0;

// Need to check interrupt state whilst still _thread_in_vm
bool interrupted = interruptible && current->is_interrupted(false);

{ // State transition wrappers
OSThread* osthread = current->osthread();
OSThreadWaitState osts(osthread, true);

assert(current->thread_state() == _thread_in_vm, "invariant");

{
ClearSuccOnSuspend csos(this);
ThreadBlockInVMPreprocess<ClearSuccOnSuspend> tbivs(current, csos, true /* allow_suspend */);
// Park only when there is no pending interrupt/exception and the node has
// not already been notified; millis <= 0 selects the untimed park().
if (interrupted || HAS_PENDING_EXCEPTION) {
// Intentionally empty
} else if (node._notified == 0) {
if (millis <= 0) {
current->_ParkEvent->park();
} else {
ret = current->_ParkEvent->park(millis);
}
}
}

// Node may be on the WaitSet, the EntryList (or cxq), or in transition
// from the WaitSet to the EntryList.
// See if we need to remove Node from the WaitSet.
// We use double-checked locking to avoid grabbing _WaitSetLock
// if the thread is not on the wait queue.
//
// Note that we don't need a fence before the fetch of TState.
// In the worst case we'll fetch an old-stale value of TS_WAIT previously
// written by this thread. (perhaps the fetch might even be satisfied
// by a look-aside into the processor's own store buffer, although given
// the length of the code path between the prior ST and this load that's
// highly unlikely). If the following LD fetches a stale TS_WAIT value
// then we'll acquire the lock and then re-fetch a fresh TState value.
// That is, we fail toward safety.

if (node.TState == ObjectWaiter::TS_WAIT) {
Thread::SpinAcquire(&_WaitSetLock, "WaitSet - unlink");
if (node.TState == ObjectWaiter::TS_WAIT) {
DequeueSpecificWaiter(&node); // unlink from WaitSet
assert(node._notified == 0, "invariant");
node.TState = ObjectWaiter::TS_RUN;
}
Thread::SpinRelease(&_WaitSetLock);
}

// The thread is now either off-list (TS_RUN),
// on the EntryList (TS_ENTER), or on the cxq (TS_CXQ).
// The Node's TState variable is stable from the perspective of this thread.
// No other threads will asynchronously modify TState.
guarantee(node.TState != ObjectWaiter::TS_WAIT, "invariant");
OrderAccess::loadload();
if (_succ == current) _succ = NULL;
WasNotified = node._notified;

// Reentry phase -- reacquire the monitor.
// re-enter contended monitor after object.wait().
// retain OBJECT_WAIT state until re-enter successfully completes
// Thread state is thread_in_vm and oop access is again safe,
// although the raw address of the object may have changed.
// (Don't cache naked oops over safepoints, of course).

// post monitor waited event. Note that this is past-tense, we are done waiting.
if (JvmtiExport::should_post_monitor_waited()) {
JvmtiExport::post_monitor_waited(current, this, ret == OS_TIMEOUT);

if (node._notified != 0 && _succ == current) {
// In this part of the monitor wait-notify-reenter protocol it
// is possible (and normal) for another thread to do a fastpath
// monitor enter-exit while this thread is still trying to get
// to the reenter portion of the protocol.
//
// The ObjectMonitor was notified and the current thread is
// the successor which also means that an unpark() has already
// been done. The JVMTI_EVENT_MONITOR_WAITED event handler can
// consume the unpark() that was done when the successor was
// set because the same ParkEvent is shared between Java
// monitors and JVM/TI RawMonitors (for now).
//
// We redo the unpark() to ensure forward progress, i.e., we
// don't want all pending threads hanging (parked) with none
// entering the unlocked monitor.
node._event->unpark();
}
}

if (event.should_commit()) {
post_monitor_wait_event(&event, this, node._notifier_tid, millis, ret == OS_TIMEOUT);
}

OrderAccess::fence();

assert(current->_Stalled != 0, "invariant");
current->_Stalled = 0;

// Re-acquire the monitor. A node that is off every list (TS_RUN) goes through
// the ordinary enter() path; a node that is on the EntryList or cxq
// re-enters via ReenterI().
assert(owner_raw() != current, "invariant");
ObjectWaiter::TStates v = node.TState;
if (v == ObjectWaiter::TS_RUN) {
enter(current);
} else {
guarantee(v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant");
ReenterI(current, &node);
node.wait_reenter_end(this);
}

// current has reacquired the lock.
// Lifecycle - the node representing current must not appear on any queues.
// Node is about to go out-of-scope, but even if it were immortal we wouldn't
// want residual elements associated with this thread left on any lists.
guarantee(node.TState == ObjectWaiter::TS_RUN, "invariant");
assert(owner_raw() == current, "invariant");
assert(_succ != current, "invariant");
} // OSThreadWaitState()

current->set_current_waiting_monitor(NULL);

guarantee(_recursions == 0, "invariant");
_recursions = save // restore the old recursion count
+ JvmtiDeferredUpdates::get_and_reset_relock_count_after_wait(current); // increased by the deferred relock count
_waiters--; // decrement the number of waiters

// Verify a few postconditions
assert(owner_raw() == current, "invariant");
assert(_succ != current, "invariant");
assert(object()->mark() == markWord::encode(this), "invariant");

// check if the notification happened
if (!WasNotified) {
// no, it could be timeout or Thread.interrupt() or both
// check for interrupt event, otherwise it is timeout
if (interruptible && current->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {
THROW(vmSymbols::java_lang_InterruptedException());
}
}

// NOTE: A spurious wake-up will be considered a timeout.
// Monitor notify has precedence over thread interrupt.
}

wait:

Thread 20 "Thread-0" hit Breakpoint 2, __pthread_cond_wait (cond=0x7ffff0510058, mutex=0x7ffff0510030) at forward.c:121
121 forward.c: No such file or directory.
(gdb) bt
#0 __pthread_cond_wait (cond=0x7ffff0510058, mutex=0x7ffff0510030) at forward.c:121
#1 0x00007ffff6c21713 in os::PlatformEvent::park (this=0x7ffff0510000) at /home/ubuntu/daixiao/jdk/src/hotspot/os/posix/os_posix.cpp:1484
#2 0x00007ffff6bd003c in ObjectMonitor::wait (this=0x7fffac0013b0, millis=0, interruptible=true, __the_thread__=0x7ffff050f5b0) at /home/ubuntu/daixiao/jdk/src/hotspot/share/runtime/objectMonitor.cpp:1544
#3 0x00007ffff6e90188 in ObjectSynchronizer::wait (obj=..., millis=0, __the_thread__=0x7ffff050f5b0) at /home/ubuntu/daixiao/jdk/src/hotspot/share/runtime/synchronizer.cpp:654
#4 0x00007ffff68298ae in JVM_MonitorWait (env=0x7ffff050f8a8, handle=0x7fffd0df77c0, ms=0) at /home/ubuntu/daixiao/jdk/src/hotspot/share/prims/jvm.cpp:617
#5 0x00007fffe100f68b in ?? ()
#6 0x00000008f7c32db8 in ?? ()
#7 0x00007ffff050f5b0 in ?? ()
#8 0x00007fffd0df7760 in ?? ()
#9 0x00007fffd0df7748 in ?? ()
#10 0x0000000000000000 in ?? ()

notify:

(gdb) bt
#0 __pthread_cond_signal (cond=0x7ffff04f0958) at forward.c:110
#1 0x00007ffff6c21c13 in os::PlatformEvent::unpark (this=0x7ffff04f0900) at /home/ubuntu/daixiao/jdk/src/hotspot/os/posix/os_posix.cpp:1590
#2 0x00007ffff6bcf654 in ObjectMonitor::ExitEpilog (this=0x7fffac0010b0, current=0x7ffff04ef410, Wakee=0x0) at /home/ubuntu/daixiao/jdk/src/hotspot/share/runtime/objectMonitor.cpp:1350
#3 0x00007ffff6bcf57b in ObjectMonitor::exit (this=0x7fffac0010b0, current=0x7ffff04ef410, not_suspended=true) at /home/ubuntu/daixiao/jdk/src/hotspot/share/runtime/objectMonitor.cpp:1321
#4 0x00007ffff6bcfe8e in ObjectMonitor::wait (this=0x7fffac0010b0, millis=0, interruptible=true, __the_thread__=0x7ffff04ef410) at /home/ubuntu/daixiao/jdk/src/hotspot/share/runtime/objectMonitor.cpp:1515
#5 0x00007ffff6e90188 in ObjectSynchronizer::wait (obj=..., millis=0, __the_thread__=0x7ffff04ef410) at /home/ubuntu/daixiao/jdk/src/hotspot/share/runtime/synchronizer.cpp:654
#6 0x00007ffff68298ae in JVM_MonitorWait (env=0x7ffff04ef708, handle=0x7fffd0df77c0, ms=0) at /home/ubuntu/daixiao/jdk/src/hotspot/share/prims/jvm.cpp:617
#7 0x00007fffe100f68b in ?? ()
#8 0x00000008f7c32db8 in ?? ()
#9 0x00007ffff04ef410 in ?? ()
#10 0x00007fffd0df7760 in ?? ()
#11 0x00007fffd0df7748 in ?? ()
#12 0x0000000000000000 in ?? ()
// Blocks the calling thread until this event is signalled (untimed variant).
// _event is first decremented atomically. If the previous value was positive
// the call returns immediately; if it was 0 the thread blocks on _cond/_mutex
// until _event is no longer negative, then resets _event to 0.
// NOTE(review): presumably unpark() is what raises _event and signals _cond -
// it is not part of this excerpt.
void PlatformEvent::park() {       // AKA "down()"
// Transitions for _event:
// -1 => -1 : illegal
// 1 => 0 : pass - return immediately
// 0 => -1 : block; then set _event to 0 before returning

// Invariant: Only the thread associated with the PlatformEvent
// may call park().
assert(_nParked == 0, "invariant");

// v holds the value of _event observed immediately before the decrement.
int v;

// atomically decrement _event
// (retry the compare-and-exchange until it succeeds against the value just read)
for (;;) {
v = _event;
if (Atomic::cmpxchg(&_event, v, v - 1) == v) break;
}
// A negative previous value is the illegal "-1 => -1" transition listed above.
guarantee(v >= 0, "invariant");

if (v == 0) { // Do this the hard way by blocking ...
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
guarantee(_nParked == 0, "invariant");
++_nParked;
// Re-test the predicate after every wake-up so that a wake-up without a
// change to _event simply waits again.
while (_event < 0) {
// OS-level "spurious wakeups" are ignored
status = pthread_cond_wait(_cond, _mutex);
assert_status(status == 0 MACOS_ONLY(|| status == ETIMEDOUT),
status, "cond_wait");
}
--_nParked;

// Reset _event to 0 while still holding _mutex.
_event = 0;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "mutex_unlock");
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other.
OrderAccess::fence();
}
guarantee(_event >= 0, "invariant");
}

demo

#include <stdio.h>
#include <pthread.h>
#include<unistd.h>

/* Shared state for the producer/consumer demo. */
/* mutex guards condition and count; cond is the single condition variable
 * both sides wait on and signal. */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
/* Hand-off flag: produce() sets it to 1, consume() resets it to 0. */
int condition = 0;
/* Counter incremented by produce() and printed by both sides. */
int count = 0;
/* Handle of the producer thread created in main(). */
pthread_t thread_id;
/*
 * Consumer side of the demo. Repeatedly waits (under mutex) until the
 * producer has raised the condition flag, prints the current count, lowers
 * the flag again and signals cond so the producer can continue.
 * The loop never terminates; the trailing return is not reached.
 */
int consume( void )
{
    for( ;; )
    {
        pthread_mutex_lock( &mutex );

        /* Re-check the flag after every wake-up. */
        while( !condition )
        {
            pthread_cond_wait( &cond, &mutex );
        }

        printf( "Consumed %d\n", count );
        condition = 0;

        pthread_cond_signal( &cond );
        pthread_mutex_unlock( &mutex );
    }

    return 0;
}

/*
 * Producer thread body. Repeatedly waits (under mutex) until the consumer
 * has lowered the condition flag, prints and increments count, raises the
 * flag and signals cond so the consumer can continue.
 * arg is unused. The loop never terminates; the trailing return is not reached.
 */
void* produce( void * arg )
{
    for( ;; )
    {
        pthread_mutex_lock( &mutex );

        /* Re-check the flag after every wake-up. */
        while( condition == 1 )
        {
            pthread_cond_wait( &cond, &mutex );
        }

        printf( "Produced %d\n", count++ );
        condition = 1;

        pthread_cond_signal( &cond );
        pthread_mutex_unlock( &mutex );
    }

    return NULL;
}

/*
 * Demo entry point: starts produce() on a new thread, then runs consume()
 * on the main thread.
 * Returns 1 if the producer thread cannot be created; otherwise returns the
 * result of consume().
 */
int main( void )
{
    /* pthread_create() needs the address of the pthread_t it fills in;
     * passing thread_id by value is a type error. */
    int rc = pthread_create( &thread_id, NULL, &produce, NULL );
    if( rc != 0 )
    {
        /* pthread functions return the error number instead of setting errno. */
        fprintf( stderr, "pthread_create failed: %d\n", rc );
        return 1;
    }

    return consume();
}

例子

#include<pthread.h>
#include<stdlib.h>
#include<stdio.h>
#include<string.h>
/* mutex/cond pair used by main() for the condition wait. */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
/* Wait predicate: main() keeps waiting while num <= 0. */
int num = 0;
int main(){
int error = pthread_mutex_lock( &mutex);
if(error){
perror(strerror(error));
exit(0);
}
while(num <= 0){
error = pthread_cond_wait( &cond, &mutex);
if(error){
perror(strerror(error));
exit(0);
}
}
pthread_mutex_unlock(&mutex);
return 0;

}

相关阅读