@ -139,12 +139,8 @@
#![ allow(clippy::expl_impl_clone_on_copy) ]
#![ allow(clippy::expl_impl_clone_on_copy) ]
#![ allow(unstable_name_collisions) ]
#![ allow(unstable_name_collisions) ]
use std ::{
use std ::{ cell ::UnsafeCell , sync ::atomic ::Ordering } ;
cell ::UnsafeCell ,
sync ::{ atomic ::Ordering , Condvar , Mutex } ,
} ;
use intrusive_collections ::{ intrusive_adapter , LinkedList , LinkedListLink , UnsafeRef } ;
use sptr ::Strict ;
use sptr ::Strict ;
use crate ::{
use crate ::{
@ -152,107 +148,130 @@ use crate::{
array_buffer ::{ utils ::SliceRef , SharedArrayBuffer } ,
array_buffer ::{ utils ::SliceRef , SharedArrayBuffer } ,
typed_array ::Element ,
typed_array ::Element ,
} ,
} ,
small_map ::{ Entry , SmallMap } ,
sys ::time ::{ Duration , Instant } ,
sys ::time ::{ Duration , Instant } ,
JsNativeError , JsResult ,
JsNativeError , JsResult ,
} ;
} ;
/// Map of shared data addresses and its corresponding list of agents waiting on that location.
mod sync {
pub ( crate ) static CRITICAL_SECTION : Mutex < FutexWaiters > = Mutex ::new ( FutexWaiters {
use std ::sync ::{ Condvar , Mutex , MutexGuard } ;
waiters : SmallMap ::new ( ) ,
} ) ;
/// A waiter of a memory address.
#[ derive(Debug, Default) ]
pub ( crate ) struct FutexWaiter {
pub ( super ) link : LinkedListLink ,
pub ( super ) cond_var : Condvar ,
pub ( super ) waiting : bool ,
addr : usize ,
}
intrusive_adapter ! ( FutexWaiterAdapter = UnsafeRef < FutexWaiter > : FutexWaiter { link : LinkedListLink } ) ;
use intrusive_collections ::{ intrusive_adapter , LinkedList , LinkedListLink , UnsafeRef } ;
/// List of memory addresses and its corresponding list of waiters for that address.
use crate ::{
#[ derive(Debug) ]
small_map ::{ Entry , SmallMap } ,
pub ( crate ) struct FutexWaiters {
JsNativeError , JsResult ,
waiters : SmallMap < usize , LinkedList < FutexWaiterAdapter > , 16 > ,
} ;
}
impl FutexWaiters {
/// A waiter of a memory address.
/// Notifies at most `max_count` waiters that are waiting on the address `addr`, and
#[ derive(Debug, Default) ]
/// returns the number of waiters that were notified.
pub ( crate ) struct FutexWaiter {
///
pub ( super ) link : LinkedListLink ,
/// Equivalent to [`RemoveWaiters`][remove] and [`NotifyWaiter`][notify], but in a single operation.
pub ( super ) cond_var : Condvar ,
///
pub ( super ) waiting : bool ,
/// [remove]: https://tc39.es/ecma262/#sec-removewaiters
addr : usize ,
/// [notify]: https://tc39.es/ecma262/#sec-notifywaiter
}
pub ( crate ) fn notify_many ( & mut self , addr : usize , max_count : u64 ) -> u64 {
let Entry ::Occupied ( mut wl ) = self . waiters . entry ( addr ) else {
intrusive_adapter ! ( FutexWaiterAdapter = UnsafeRef < FutexWaiter > : FutexWaiter { link : LinkedListLink } ) ;
return 0 ;
} ;
/// List of memory addresses and its corresponding list of waiters for that address.
#[ derive(Debug) ]
for i in 0 .. max_count {
pub ( super ) struct FutexWaiters {
let Some ( elem ) = wl . get_mut ( ) . pop_front ( ) else {
waiters : SmallMap < usize , LinkedList < FutexWaiterAdapter > , 16 > ,
wl . remove ( ) ;
}
return i ;
// SAFETY: `FutexWaiters` is not constructable outside its `get` method, and it's only exposed by
// a global lock, meaning the inner data of `FutexWaiters` (which includes non-Send pointers)
// can only be accessed by a single thread at once.
unsafe impl Send for FutexWaiters { }
impl FutexWaiters {
/// Gets the map of all shared data addresses and its corresponding list of agents waiting on that location.
pub ( super ) fn get ( ) -> JsResult < MutexGuard < ' static , Self > > {
static CRITICAL_SECTION : Mutex < FutexWaiters > = Mutex ::new ( FutexWaiters {
waiters : SmallMap ::new ( ) ,
} ) ;
CRITICAL_SECTION . lock ( ) . map_err ( | _ | {
JsNativeError ::typ ( )
. with_message ( "failed to synchronize with the agent cluster" )
. into ( )
} )
}
/// Notifies at most `max_count` waiters that are waiting on the address `addr`, and
/// returns the number of waiters that were notified.
///
/// Equivalent to [`RemoveWaiters`][remove] and [`NotifyWaiter`][notify], but in a single operation.
///
/// [remove]: https://tc39.es/ecma262/#sec-removewaiters
/// [notify]: https://tc39.es/ecma262/#sec-notifywaiter
pub ( super ) fn notify_many ( & mut self , addr : usize , max_count : u64 ) -> u64 {
let Entry ::Occupied ( mut wl ) = self . waiters . entry ( addr ) else {
return 0 ;
} ;
} ;
elem . cond_var . notify_one ( ) ;
for i in 0 .. max_count {
let Some ( elem ) = wl . get_mut ( ) . pop_front ( ) else {
wl . remove ( ) ;
return i ;
} ;
// SAFETY: all elements of the waiters list are guaranteed to be valid.
elem . cond_var . notify_one ( ) ;
unsafe {
( * UnsafeRef ::into_raw ( elem ) ) . waiting = false ;
// SAFETY: all elements of the waiters list are guaranteed to be valid.
unsafe {
( * UnsafeRef ::into_raw ( elem ) ) . waiting = false ;
}
}
}
}
if wl . get ( ) . is_empty ( ) {
if wl . get ( ) . is_empty ( ) {
wl . remove ( ) ;
wl . remove ( ) ;
}
}
max_count
max_count
}
}
/// # Safety
/// # Safety
///
///
/// - `node` must NOT be linked to an existing waiter list.
/// - `node` must NOT be linked to an existing waiter list.
/// - `node` must always point to a valid instance of `FutexWaiter` until `node` is
/// - `node` must always point to a valid instance of `FutexWaiter` until `node` is
/// removed from its linked list. This can happen by either `remove_waiter` or `notify_many`.
/// removed from its linked list. This can happen by either `remove_waiter` or `notify_many`.
pub ( crate ) unsafe fn add_waiter ( & mut self , node : * mut FutexWaiter , addr : usize ) {
pub ( super ) unsafe fn add_waiter ( & mut self , node : * mut FutexWaiter , addr : usize ) {
// SAFETY: `node` must point to a valid instance.
// SAFETY: `node` must point to a valid instance.
let node = unsafe {
let node = unsafe {
debug_assert! ( ! ( * node ) . link . is_linked ( ) ) ;
debug_assert! ( ! ( * node ) . link . is_linked ( ) ) ;
( * node ) . waiting = true ;
( * node ) . waiting = true ;
( * node ) . addr = addr ;
( * node ) . addr = addr ;
UnsafeRef ::from_raw ( node )
UnsafeRef ::from_raw ( node )
} ;
} ;
self . waiters
. entry ( addr )
. or_insert_with ( | | LinkedList ::new ( FutexWaiterAdapter ::new ( ) ) )
. push_back ( node ) ;
}
/// # Safety
self . waiters
///
. entry ( addr )
/// - `node` must point to a valid instance of `FutexWaiter`.
. or_insert_with ( | | LinkedList ::new ( FutexWaiterAdapter ::new ( ) ) )
/// - `node` must be inside the wait list associated with `node.addr`.
. push_back ( node ) ;
pub ( crate ) unsafe fn remove_waiter ( & mut self , node : * mut FutexWaiter ) {
// SAFETY: `node` must point to a valid instance.
let addr = unsafe { ( * node ) . addr } ;
let mut wl = match self . waiters . entry ( addr ) {
Entry ::Occupied ( wl ) = > wl ,
Entry ::Vacant ( _ ) = > return ,
} ;
// SAFETY: `node` must be inside the wait list associated with `node.addr`.
unsafe {
wl . get_mut ( ) . cursor_mut_from_ptr ( node ) . remove ( ) ;
}
}
if wl . get ( ) . is_empty ( ) {
/// # Safety
wl . remove ( ) ;
///
/// - `node` must point to a valid instance of `FutexWaiter`.
/// - `node` must be inside the wait list associated with `node.addr`.
pub ( super ) unsafe fn remove_waiter ( & mut self , node : * mut FutexWaiter ) {
// SAFETY: `node` must point to a valid instance.
let addr = unsafe { ( * node ) . addr } ;
let mut wl = match self . waiters . entry ( addr ) {
Entry ::Occupied ( wl ) = > wl ,
Entry ::Vacant ( _ ) = > return ,
} ;
// SAFETY: `node` must be inside the wait list associated with `node.addr`.
unsafe {
wl . get_mut ( ) . cursor_mut_from_ptr ( node ) . remove ( ) ;
}
if wl . get ( ) . is_empty ( ) {
wl . remove ( ) ;
}
}
}
}
}
}
}
@ -281,10 +300,7 @@ pub(super) unsafe fn wait<E: Element + PartialEq>(
// 10. Let block be buffer.[[ArrayBufferData]].
// 10. Let block be buffer.[[ArrayBufferData]].
// 11. Let WL be GetWaiterList(block, indexedPosition).
// 11. Let WL be GetWaiterList(block, indexedPosition).
// 12. Perform EnterCriticalSection(WL).
// 12. Perform EnterCriticalSection(WL).
let mut waiters = CRITICAL_SECTION . lock ( ) . map_err ( | _ | {
let mut waiters = sync ::FutexWaiters ::get ( ) ? ;
// avoids exposing internals of our implementation.
JsNativeError ::typ ( ) . with_message ( "failed to synchronize with the agent cluster" )
} ) ? ;
let time_info = timeout . map ( | timeout | ( Instant ::now ( ) , timeout ) ) ;
let time_info = timeout . map ( | timeout | ( Instant ::now ( ) , timeout ) ) ;
@ -307,7 +323,7 @@ pub(super) unsafe fn wait<E: Element + PartialEq>(
// 17. Perform AddWaiter(WL, W).
// 17. Perform AddWaiter(WL, W).
// ensure we can have aliased pointers to the waiter in a sound way.
// ensure we can have aliased pointers to the waiter in a sound way.
let waiter = UnsafeCell ::new ( FutexWaiter ::default ( ) ) ;
let waiter = UnsafeCell ::new ( sync ::FutexWaiter ::default ( ) ) ;
let waiter_ptr = waiter . get ( ) ;
let waiter_ptr = waiter . get ( ) ;
// SAFETY: waiter is valid and we call `remove_node` below.
// SAFETY: waiter is valid and we call `remove_node` below.
@ -385,10 +401,7 @@ pub(super) fn notify(buffer: &SharedArrayBuffer, offset: usize, count: u64) -> J
// 7. Let WL be GetWaiterList(block, indexedPosition).
// 7. Let WL be GetWaiterList(block, indexedPosition).
// 8. Perform EnterCriticalSection(WL).
// 8. Perform EnterCriticalSection(WL).
let mut waiters = CRITICAL_SECTION . lock ( ) . map_err ( | _ | {
let mut waiters = sync ::FutexWaiters ::get ( ) ? ;
// avoids exposing internals of our implementation.
JsNativeError ::typ ( ) . with_message ( "failed to synchronize with the agent cluster" )
} ) ? ;
// 9. Let S be RemoveWaiters(WL, c).
// 9. Let S be RemoveWaiters(WL, c).
// 10. For each element W of S, do
// 10. For each element W of S, do