#![doc = include_str!(".crate-docs.md")]
#![forbid(unsafe_code)]
#![warn(
    clippy::cargo,
    missing_docs,
    // clippy::missing_docs_in_private_items,
    clippy::pedantic,
    future_incompatible,
    rust_2018_idioms,
)]
#![allow(clippy::option_if_let_else, clippy::module_name_repetitions)]
use std::{
    ops::{Deref, DerefMut},
    pin::Pin,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    task::Poll,
    time::{Duration, Instant},
};
use event_listener::{Event, EventListener};
use futures_util::{FutureExt, Stream};
use parking_lot::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
/// A watchable wrapper for a value.
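///
/// A minimal usage sketch (the doctest assumes this crate is published under
/// the name `watchable`):
///
/// ```rust
/// use watchable::Watchable;
///
/// let watchable = Watchable::new(1_u32);
/// let watcher = watchable.watch();
/// watchable.replace(2);
/// assert_eq!(watcher.next_value().unwrap(), 2);
/// ```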
#[derive(Default, Debug)]
pub struct Watchable<T> {
data: Arc<Data<T>>,
}
impl<T> Clone for Watchable<T> {
    fn clone(&self) -> Self {
        self.data.watchables.fetch_add(1, Ordering::AcqRel);
        Self {
            data: self.data.clone(),
        }
    }
}

impl<T> Drop for Watchable<T> {
    fn drop(&mut self) {
        if self.data.watchables.fetch_sub(1, Ordering::AcqRel) == 1 {
            // Last watchable
            self.shutdown();
        }
    }
}
impl<T> Watchable<T> {
    /// Returns a new instance with the initial value provided.
    pub fn new(initial_value: T) -> Self {
        Self {
            data: Arc::new(Data {
                value: RwLock::new(initial_value),
                changed: RwLock::new(Some(Event::new())),
                version: AtomicUsize::new(0),
                watchers: AtomicUsize::new(0),
                watchables: AtomicUsize::new(1),
            }),
        }
    }
    /// Returns a new watcher that can monitor for changes to the contained
    /// value.
    pub fn watch(&self) -> Watcher<T> {
        self.data.watchers.fetch_add(1, Ordering::AcqRel);
        Watcher {
            version: AtomicUsize::new(self.data.current_version()),
            watched: self.data.clone(),
        }
    }
    /// Replaces the currently contained value and notifies all watching
    /// [`Watcher`]s. Returns the previously stored value.
    pub fn replace(&self, new_value: T) -> T {
        let mut stored = self.data.value.write();
        let mut old_value = new_value;
        std::mem::swap(&mut *stored, &mut old_value);
        self.data.increment_version();
        old_value
    }
    /// Updates the contained value if `new_value` differs from it, notifying
    /// all watching [`Watcher`]s. Returns `Ok(previous_value)` if the value was
    /// replaced.
    ///
    /// # Errors
    ///
    /// Returns `Err(new_value)` if the currently stored value is equal to
    /// `new_value`.
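    ///
    /// A small sketch of the expected behavior (crate name assumed to be
    /// `watchable`):
    ///
    /// ```rust
    /// use watchable::Watchable;
    ///
    /// let watchable = Watchable::new(1_u32);
    /// // The value differs, so it is stored and the old value is returned.
    /// assert_eq!(watchable.update(2), Ok(1));
    /// // Storing an equal value is rejected without notifying watchers.
    /// assert_eq!(watchable.update(2), Err(2));
    /// ```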
    pub fn update(&self, new_value: T) -> Result<T, T>
    where
        T: PartialEq,
    {
        let stored = self.data.value.upgradable_read();
        if *stored == new_value {
            Err(new_value)
        } else {
            let mut stored = RwLockUpgradableReadGuard::upgrade(stored);
            let mut old_value = new_value;
            std::mem::swap(&mut *stored, &mut old_value);
            self.data.increment_version();
            Ok(old_value)
        }
    }
    /// Returns a write guard that allows updating the value. If the inner value
    /// is accessed through [`DerefMut::deref_mut()`], all [`Watcher`]s will be
    /// notified when the returned guard is dropped.
    ///
    /// [`WatchableWriteGuard`] holds an exclusive lock. No other threads will
    /// be able to read or write the contained value until the guard is dropped.
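    ///
    /// A brief sketch of the notification behavior (crate name assumed to be
    /// `watchable`):
    ///
    /// ```rust
    /// use watchable::Watchable;
    ///
    /// let watchable = Watchable::new(1_u32);
    /// let watcher = watchable.watch();
    /// // Reading through the guard does not notify watchers...
    /// assert_eq!(*watchable.write(), 1);
    /// assert!(watcher.is_current());
    /// // ...but writing through it does, once the guard is dropped.
    /// *watchable.write() += 1;
    /// assert!(!watcher.is_current());
    /// ```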
    pub fn write(&self) -> WatchableWriteGuard<'_, T> {
        WatchableWriteGuard {
            watchable: self,
            guard: self.data.value.write(),
            accessed_mut: false,
        }
    }
    /// Returns a guard which can be used to access the value held within the
    /// variable. This guard does not block other threads from reading the
    /// contained value, but will block attempts to store a new value until it
    /// is dropped.
    pub fn read(&self) -> WatchableReadGuard<'_, T> {
        WatchableReadGuard(self.data.value.read())
    }
    /// Returns the currently contained value.
    #[must_use]
    pub fn get(&self) -> T
    where
        T: Clone,
    {
        self.data.value.read().clone()
    }

    /// Returns the number of [`Watcher`]s for this value.
    pub fn watchers(&self) -> usize {
        self.data.watchers.load(Ordering::Acquire)
    }

    /// Returns true if there are any [`Watcher`]s for this value.
    pub fn has_watchers(&self) -> bool {
        self.watchers() > 0
    }
    /// Disconnects all [`Watcher`]s.
    ///
    /// All future value updates will not be observed by the watchers, but the
    /// last value will still be readable before the watcher signals that it is
    /// disconnected.
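    ///
    /// A sketch of the disconnect behavior (crate name assumed to be
    /// `watchable`):
    ///
    /// ```rust
    /// use watchable::{Disconnected, Watchable};
    ///
    /// let watchable = Watchable::new(1_u32);
    /// let watcher = watchable.watch();
    /// watchable.replace(2);
    /// watchable.shutdown();
    /// // The last value remains readable once...
    /// assert_eq!(watcher.next_value(), Ok(2));
    /// // ...after which the watcher reports the disconnection.
    /// assert_eq!(watcher.next_value(), Err(Disconnected));
    /// ```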
    pub fn shutdown(&self) {
        let mut changed = self.data.changed.write();
        if let Some(changed) = changed.take() {
            changed.notify(usize::MAX);
        }
    }
}
impl<T> Data<T> {
    fn current_version(&self) -> usize {
        self.version.load(Ordering::Acquire)
    }

    fn increment_version(&self) {
        self.version.fetch_add(1, Ordering::AcqRel);
        let changed = self.changed.read();
        if let Some(changed) = changed.as_ref() {
            changed.notify(usize::MAX);
        }
    }
}
/// A read guard that allows reading the currently stored value in a
/// [`Watchable`]. No values can be stored within the source [`Watchable`] while
/// this guard exists.
///
/// The inner value is accessible through [`Deref`].
pub struct WatchableReadGuard<'a, T>(RwLockReadGuard<'a, T>);

impl<'a, T> Deref for WatchableReadGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
/// A write guard that allows updating the currently stored value in a
/// [`Watchable`].
///
/// The inner value is readable through [`Deref`], and modifiable through
/// [`DerefMut`]. Any usage of [`DerefMut`] will cause all [`Watcher`]s to be
/// notified of an updated value when the guard is dropped.
///
/// [`WatchableWriteGuard`] is an exclusive guard. No other threads will be
/// able to read or write the contained value until the guard is dropped.
pub struct WatchableWriteGuard<'a, T> {
    watchable: &'a Watchable<T>,
    accessed_mut: bool,
    guard: RwLockWriteGuard<'a, T>,
}
impl<'a, T> Deref for WatchableWriteGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}

impl<'a, T> DerefMut for WatchableWriteGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.accessed_mut = true;
        &mut self.guard
    }
}

impl<'a, T> Drop for WatchableWriteGuard<'a, T> {
    fn drop(&mut self) {
        if self.accessed_mut {
            self.watchable.data.increment_version();
        }
    }
}
#[derive(Debug)]
struct Data<T> {
    changed: RwLock<Option<Event>>,
    version: AtomicUsize,
    watchers: AtomicUsize,
    watchables: AtomicUsize,
    value: RwLock<T>,
}

impl<T> Default for Data<T>
where
    T: Default,
{
    fn default() -> Self {
        Self {
            value: RwLock::default(),
            changed: RwLock::new(Some(Event::new())),
            version: AtomicUsize::new(0),
            watchers: AtomicUsize::new(0),
            // Matches `Watchable::new()`: the owning `Watchable` counts as one
            // instance.
            watchables: AtomicUsize::new(1),
        }
    }
}
/// An observer of a [`Watchable`] value.
///
/// ## Cloning behavior
///
/// Cloning a watcher also clones the current watching state. If the watcher
/// hasn't read the value currently stored, the cloned instance will also
/// consider the current value unread.
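///
/// A small sketch of that behavior (crate name assumed to be `watchable`):
///
/// ```rust
/// use watchable::Watchable;
///
/// let watchable = Watchable::new(1_u32);
/// let watcher = watchable.watch();
/// watchable.replace(2);
/// // `watcher` has not read the new value, so its clone starts unread too.
/// let clone = watcher.clone();
/// assert!(!clone.is_current());
/// assert_eq!(clone.get(), 2);
/// ```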
pub struct Watcher<T> {
    version: AtomicUsize,
    watched: Arc<Data<T>>,
}

impl<T> Drop for Watcher<T> {
    fn drop(&mut self) {
        self.watched.watchers.fetch_sub(1, Ordering::AcqRel);
    }
}

impl<T> Clone for Watcher<T> {
    fn clone(&self) -> Self {
        // A clone is an additional observer, so keep the watcher count accurate.
        self.watched.watchers.fetch_add(1, Ordering::AcqRel);
        Self {
            version: AtomicUsize::new(self.version.load(Ordering::Relaxed)),
            watched: self.watched.clone(),
        }
    }
}
enum CreateListenerError {
    NewValueAvailable,
    Disconnected,
}

/// A watch operation failed because all [`Watchable`] instances have been
/// dropped.
#[derive(Debug, thiserror::Error, Eq, PartialEq)]
#[error("all watchable instances have been dropped")]
pub struct Disconnected;
/// A watch operation with a timeout failed.
#[derive(Debug, thiserror::Error, Eq, PartialEq)]
pub enum TimeoutError {
    /// No new values were written before the timeout elapsed.
    #[error("no new values were written before the timeout elapsed")]
    Timeout,
    /// All [`Watchable`] instances have been dropped.
    #[error("all watchable instances have been dropped")]
    Disconnected,
}
impl<T> Watcher<T> {
    fn create_listener_if_needed(&self) -> Result<Pin<Box<EventListener>>, CreateListenerError> {
        let changed = self.watched.changed.read();
        match (changed.as_ref(), self.is_current()) {
            (_, false) => Err(CreateListenerError::NewValueAvailable),
            (None, _) => Err(CreateListenerError::Disconnected),
            (Some(changed), true) => {
                let listener = changed.listen();
                // Between now and creating the listener, an update may have
                // come in, so we need to check again before returning the
                // listener.
                if self.is_current() {
                    Ok(listener)
                } else {
                    Err(CreateListenerError::NewValueAvailable)
                }
            }
        }
    }
    /// Returns true if the latest value has been read from this instance.
    pub fn is_current(&self) -> bool {
        self.version.load(Ordering::Relaxed) == self.watched.current_version()
    }
    /// Updates this instance's state to reflect that it has read the currently
    /// stored value. The next watch call will block until the next value is
    /// stored.
    ///
    /// Returns true if the internal state was updated, and false if no changes
    /// were necessary.
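    ///
    /// For example (crate name assumed to be `watchable`):
    ///
    /// ```rust
    /// use watchable::Watchable;
    ///
    /// let watchable = Watchable::new(1_u32);
    /// let watcher = watchable.watch();
    /// watchable.replace(2);
    /// // The new value has not been read yet, so the state changes.
    /// assert!(watcher.mark_read());
    /// // A second call is a no-op.
    /// assert!(!watcher.mark_read());
    /// ```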
    pub fn mark_read(&self) -> bool {
        let current_version = self.watched.current_version();
        let mut stored_version = self.version.load(Ordering::Acquire);
        while stored_version < current_version {
            match self.version.compare_exchange(
                stored_version,
                current_version,
                Ordering::Release,
                Ordering::Acquire,
            ) {
                Ok(_) => return true,
                Err(new_stored) => stored_version = new_stored,
            }
        }

        false
    }
    /// Watches for a new value to be stored in the source [`Watchable`]. If the
    /// current value has already been accessed through [`Self::read()`] or marked
    /// read with [`Self::mark_read()`], this call will block the calling
    /// thread until a new value has been published.
    ///
    /// # Errors
    ///
    /// Returns [`Disconnected`] if all instances of [`Watchable`] have been
    /// dropped and the current value has been read.
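    ///
    /// A blocking usage sketch (crate name assumed to be `watchable`):
    ///
    /// ```rust
    /// use watchable::Watchable;
    ///
    /// let watchable = Watchable::new(0_u32);
    /// let watcher = watchable.watch();
    /// let handle = std::thread::spawn(move || {
    ///     // Blocks until a new value is published or all `Watchable`s drop.
    ///     watcher.watch().unwrap();
    ///     *watcher.read()
    /// });
    /// watchable.replace(1);
    /// assert_eq!(handle.join().unwrap(), 1);
    /// ```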
    pub fn watch(&self) -> Result<(), Disconnected> {
        loop {
            match self.create_listener_if_needed() {
                Ok(mut listener) => {
                    listener.as_mut().wait();
                    if !self.is_current() {
                        break;
                    }
                }
                Err(CreateListenerError::Disconnected) => return Err(Disconnected),
                Err(CreateListenerError::NewValueAvailable) => break,
            }
        }

        Ok(())
    }
    /// Watches for a new value to be stored in the source [`Watchable`]. If the
    /// current value has already been read, this call will block the calling
    /// thread until a new value has been published or until `duration` has
    /// elapsed.
    ///
    /// # Errors
    ///
    /// - [`TimeoutError::Disconnected`]: All instances of [`Watchable`] have
    ///   been dropped and the current value has been read.
    /// - [`TimeoutError::Timeout`]: A timeout occurred before a new value was
    ///   written.
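    ///
    /// A timeout sketch (crate name assumed to be `watchable`):
    ///
    /// ```rust
    /// use std::time::Duration;
    /// use watchable::{TimeoutError, Watchable};
    ///
    /// let watchable = Watchable::new(0_u32);
    /// let watcher = watchable.watch();
    /// // No new value is published, so the call times out.
    /// assert!(matches!(
    ///     watcher.watch_timeout(Duration::from_millis(10)),
    ///     Err(TimeoutError::Timeout)
    /// ));
    /// ```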
    pub fn watch_timeout(&self, duration: Duration) -> Result<(), TimeoutError> {
        self.watch_until(Instant::now() + duration)
    }
    /// Watches for a new value to be stored in the source [`Watchable`]. If the
    /// current value has already been read, this call will block the calling
    /// thread until a new value has been published or until `deadline`.
    ///
    /// # Errors
    ///
    /// - [`TimeoutError::Disconnected`]: All instances of [`Watchable`] have
    ///   been dropped and the current value has been read.
    /// - [`TimeoutError::Timeout`]: `deadline` was reached before a new value
    ///   was written.
    pub fn watch_until(&self, deadline: Instant) -> Result<(), TimeoutError> {
        loop {
            match self.create_listener_if_needed() {
                Ok(mut listener) => {
                    if listener.as_mut().wait_deadline(deadline).is_some() {
                        if !self.is_current() {
                            break;
                        }
                    } else if Instant::now() < deadline {
                        // Spurious wake-up; listen again.
                    } else {
                        return Err(TimeoutError::Timeout);
                    }
                }
                Err(CreateListenerError::Disconnected) => return Err(TimeoutError::Disconnected),
                Err(CreateListenerError::NewValueAvailable) => break,
            }
        }

        Ok(())
    }
    /// Watches for a new value to be stored in the source [`Watchable`]. If the
    /// current value has already been accessed through [`Self::read()`] or marked
    /// read with [`Self::mark_read()`], the async task will block until
    /// a new value has been published.
    pub async fn watch_async(&self) -> Result<(), Disconnected> {
        loop {
            match self.create_listener_if_needed() {
                Ok(listener) => {
                    listener.await;
                    if !self.is_current() {
                        break;
                    }
                }
                Err(CreateListenerError::Disconnected) => return Err(Disconnected),
                Err(CreateListenerError::NewValueAvailable) => break,
            }
        }

        Ok(())
    }
    /// Returns a read guard that allows reading the currently stored value.
    /// This function does not consider the value read, and the next call to a
    /// watch function will be unaffected.
    pub fn peek(&self) -> WatchableReadGuard<'_, T> {
        let guard = self.watched.value.read();
        WatchableReadGuard(guard)
    }
    /// Returns a read guard that allows reading the currently stored value.
    ///
    /// This function marks the stored value as read, ensuring that the next
    /// call to a watch function will block until a new value has been
    /// published.
    pub fn read(&self) -> WatchableReadGuard<'_, T> {
        let guard = self.watched.value.read();
        self.version
            .store(self.watched.current_version(), Ordering::Relaxed);
        WatchableReadGuard(guard)
    }
    /// Returns the currently contained value. This function marks the stored
    /// value as read, ensuring that the next call to a watch function will
    /// block until a new value has been published.
    pub fn get(&self) -> T
    where
        T: Clone,
    {
        self.read().clone()
    }
    /// Watches for a new value to be stored in the source [`Watchable`] and
    /// returns a clone of it. If the current value has already been accessed
    /// through [`Self::read()`] or marked read with [`Self::mark_read()`], this
    /// call will block the calling thread until a new value has been published.
    pub fn next_value(&self) -> Result<T, Disconnected>
    where
        T: Clone,
    {
        self.watch().map(|()| self.read().clone())
    }
    /// Watches for a new value to be stored in the source [`Watchable`] and
    /// returns a clone of it. If the current value has already been read, this call
    /// will asynchronously wait for a new value to be published.
    ///
    /// The async task is safe to be cancelled without losing track of the last
    /// read value.
    pub async fn next_value_async(&self) -> Result<T, Disconnected>
    where
        T: Clone,
    {
        self.watch_async().await.map(|()| self.read().clone())
    }
/// Returns this watcher in a type that implements [`Stream`].
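    ///
    /// A sketch of streaming values; it assumes the crate is named `watchable`
    /// and that an async runtime such as `tokio` (already used by this crate's
    /// tests) is available:
    ///
    /// ```rust
    /// use futures_util::StreamExt;
    /// use watchable::Watchable;
    ///
    /// tokio::runtime::Runtime::new().unwrap().block_on(async {
    ///     let watchable = Watchable::new(0_u32);
    ///     let mut stream = watchable.watch().into_stream();
    ///     watchable.replace(1);
    ///     assert_eq!(stream.next().await, Some(1));
    /// });
    /// ```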
    pub fn into_stream(self) -> WatcherStream<T> {
        WatcherStream {
            watcher: self,
            listener: None,
        }
    }
}
impl<T> Iterator for Watcher<T>
where
    T: Clone,
{
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_value().ok()
    }
}
/// Asynchronous iterator for a [`Watcher`]. Implements [`Stream`].
pub struct WatcherStream<T> {
    watcher: Watcher<T>,
    listener: Option<Pin<Box<EventListener>>>,
}

impl<T> WatcherStream<T> {
    /// Returns the wrapped [`Watcher`].
    pub fn into_inner(self) -> Watcher<T> {
        self.watcher
    }
}
impl<T> Stream for WatcherStream<T>
where
    T: Clone,
{
    type Item = T;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // If we have a listener or we have already read the current value, we
        // need to poll the listener as a future first.
        match self
            .listener
            .take()
            .ok_or(CreateListenerError::Disconnected)
            .or_else(|_| self.watcher.create_listener_if_needed())
        {
            Ok(mut listener) => match listener.poll_unpin(cx) {
                Poll::Ready(()) => {
                    if !self.watcher.is_current() {
                        // A new value is available. Fall through.
                    } else {
                        // The event fired without a newer value, which only
                        // happens when the source has shut down. End the stream.
                        return Poll::Ready(None);
                    }
                }
                Poll::Pending => {
                    // The listener wasn't ready, store it again.
                    self.listener = Some(listener);
                    return Poll::Pending;
                }
            },
            Err(CreateListenerError::NewValueAvailable) => {}
            Err(CreateListenerError::Disconnected) => return Poll::Ready(None),
        }

        Poll::Ready(Some(self.watcher.read().clone()))
    }
}
#[test]
fn basics() {
    let watchable = Watchable::new(1_u32);
    assert!(!watchable.has_watchers());
    let watcher1 = watchable.watch();
    let watcher2 = watchable.watch();
    assert!(!watcher1.mark_read());
    assert_eq!(watchable.watchers(), 2);
    assert_eq!(watchable.replace(2), 1);
    // A call to watch should not block since the value has already been sent
    watcher1.watch().unwrap();
    // Peek shouldn't cause the watcher to block.
    assert_eq!(*watcher1.peek(), 2);
    // Reading should switch the state back to needing to block, which we'll
    // test in another unit test
    assert_eq!(*watcher1.read(), 2);
    drop(watcher1);
    assert_eq!(watchable.watchers(), 1);
    // Now, despite watcher1 having updated, watcher2 should have independent state
    assert!(watcher2.mark_read());
    assert_eq!(*watcher2.read(), 2);
    drop(watcher2);
    assert_eq!(watchable.watchers(), 0);
}
#[test]
fn accessing_values() {
    let watchable = Watchable::new(String::from("hello"));
    assert_eq!(watchable.get(), "hello");
    assert_eq!(&*watchable.read(), "hello");
    assert_eq!(&*watchable.write(), "hello");

    let watcher = watchable.watch();
    assert_eq!(watcher.get(), "hello");
    assert_eq!(&*watcher.read(), "hello");
}
#[test]
#[allow(clippy::redundant_clone)]
fn clones() {
    let watchable = Watchable::default();
    let cloned_watchable = watchable.clone();
    let watcher1 = watchable.watch();
    let watcher2 = watcher1.clone();
    watchable.replace(1);
    assert_eq!(watcher1.next_value().unwrap(), 1);
    assert_eq!(watcher2.next_value().unwrap(), 1);
    cloned_watchable.replace(2);
    assert_eq!(watcher1.next_value().unwrap(), 2);
    assert_eq!(watcher2.next_value().unwrap(), 2);
}
#[test]
fn drop_watchable() {
    let watchable = Watchable::default();
    let watcher = watchable.watch();
    watchable.replace(1_u32);
    assert_eq!(watcher.next_value().unwrap(), 1);
    drop(watchable);
    assert!(matches!(watcher.next_value().unwrap_err(), Disconnected));
}
#[test]
fn drop_watchable_timeouts() {
    let watchable = Watchable::new(0_u8);
    let start = Instant::now();
    let wait_timeout_thread = std::thread::spawn({
        let watcher = watchable.watch();
        move || {
            assert!(matches!(
                watcher.watch_timeout(Duration::from_secs(15)).unwrap_err(),
                TimeoutError::Disconnected
            ));
        }
    });
    let wait_until_thread = std::thread::spawn({
        let watcher = watchable.watch();
        move || {
            assert!(matches!(
                watcher
                    .watch_until(Instant::now().checked_add(Duration::from_secs(15)).unwrap())
                    .unwrap_err(),
                TimeoutError::Disconnected
            ));
        }
    });
    // Give time for the threads to spawn.
    std::thread::sleep(Duration::from_millis(100));
    drop(watchable);
    wait_timeout_thread.join().unwrap();
    wait_until_thread.join().unwrap();
    let elapsed = Instant::now().checked_duration_since(start).unwrap();
    assert!(elapsed.as_secs() < 1);
}
#[test]
fn timeouts() {
    let watchable = Watchable::new(1_u32);
    let watcher = watchable.watch();
    let start = Instant::now();
    assert_eq!(
        watcher.watch_timeout(Duration::from_millis(100)),
        Err(TimeoutError::Timeout)
    );
    assert_eq!(
        watcher.watch_until(Instant::now() + Duration::from_millis(100)),
        Err(TimeoutError::Timeout)
    );
    let elapsed = Instant::now().checked_duration_since(start).unwrap();
    // We don't control the delay logic, so to ensure this test is stable, we're
    // comparing against a duration slightly less than 200 ms even though in
    // theory that shouldn't be possible.
    assert!(elapsed.as_millis() >= 180);

    // Test that watch_timeout/until return true when a new event is available
    watchable.replace(2);
    watcher.watch_timeout(Duration::from_secs(1)).unwrap();
    watchable.replace(3);
    watcher
        .watch_until(Instant::now() + Duration::from_secs(1))
        .unwrap();
}
#[test]
fn deref_publish() {
    let watchable = Watchable::new(1_u32);
    let watcher = watchable.watch();

    // Reading the value (Deref) shouldn't publish a new value
    let write_guard = watchable.write();
    assert_eq!(*write_guard, 1);
    drop(write_guard);
    assert!(!watcher.mark_read());

    // Writing a value (DerefMut) should publish a new value
    let mut write_guard = watchable.write();
    *write_guard = 2;
    drop(write_guard);
    assert!(watcher.mark_read());
}
#[test]
fn blocking_tests() {
    let watchable = Watchable::new(1_u32);
    let watcher = watchable.watch();
    let (sender, receiver) = std::sync::mpsc::sync_channel(1);
    let worker_thread = std::thread::spawn(move || {
        // Block until the first published value arrives.
        watcher.watch().unwrap();
        assert_eq!(*watcher.read(), 2);
        sender.send(()).unwrap();
        // Block until the final value arrives, then return it.
        watcher.watch().unwrap();
        *watcher.read()
    });

    watchable.replace(2);
    // Wait for the thread to perform its read.
    receiver.recv().unwrap();
    assert!(watchable.update(42).is_ok());
    assert!(watchable.update(42).is_err());
    assert_eq!(worker_thread.join().unwrap(), 42);
}
#[test]
fn iterator_test() {
    let watchable = Watchable::default();
    let watcher = watchable.watch();
    let worker_thread = std::thread::spawn(move || {
        let mut last_value = watcher.next_value().unwrap();
        for value in watcher {
            assert_ne!(last_value, value);
            println!("Received {value}");
            last_value = value;
        }
        assert_eq!(last_value, 1000);
    });

    for i in 1..=1000 {
        watchable.replace(i);
    }
    // Dropping the watchable lets the iterator end.
    drop(watchable);
    worker_thread.join().unwrap();
}
#[cfg(test)]
#[tokio::test(flavor = "multi_thread")]
async fn stream_test() {
    use futures_util::StreamExt;

    let watchable = Watchable::default();
    let watcher = watchable.watch();
    let worker_thread = tokio::task::spawn(async move {
        let mut last_value = watcher.next_value_async().await.unwrap();
        let mut stream = watcher.into_stream();
        while let Some(value) = stream.next().await {
            assert_ne!(last_value, value);
            println!("Received {value}");
            last_value = value;
        }
        assert_eq!(last_value, 1000);
        // Ensure it's safe to call next again with no blocking and no panics.
        assert!(stream.next().await.is_none());
        // Convert back to a normal watcher and check that the state still
        // matches.
        let watcher = stream.into_inner();
        assert_eq!(*watcher.read(), last_value);
    });

    for i in 1..=1000 {
        watchable.replace(i);
        if i % 100 == 0 {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }
    // Allow the stream to end
    drop(watchable);
    // Wait for the task to finish.
    worker_thread.await.unwrap();
}
#[test]
fn stress_test() {
    let watchable = Watchable::default();
    let mut workers = Vec::new();
    for _ in 1..=10 {
        let watcher = watchable.watch();
        workers.push(std::thread::spawn(move || {
            let mut last_value = *watcher.read();
            while watcher.watch().is_ok() {
                let current_value = *watcher.read();
                assert_ne!(last_value, current_value);
                last_value = current_value;
            }
            assert_eq!(last_value, 10000);
        }));
    }

    for i in 1..=10000 {
        let _ = watchable.update(i);
    }
    // Disconnect the watchers so the worker loops exit.
    drop(watchable);

    for worker in workers {
        worker.join().unwrap();
    }
}
#[cfg(test)]
#[tokio::test(flavor = "multi_thread")]
async fn stress_test_async() {
    let watchable = Watchable::default();
    let mut workers = Vec::new();
    for _ in 1..=64 {
        let watcher = watchable.watch();
        workers.push(tokio::task::spawn(async move {
            loop {
                watcher.watch_async().await.unwrap();
                let current_value = *watcher.read();
                if current_value == 10000 {
                    break;
                }
            }
        }));
    }

    // Publish the updates from a blocking thread while the workers observe them.
    tokio::task::spawn_blocking(move || {
        for i in 1..=10000 {
            let _ = watchable.update(i);
        }
    })
    .await
    .unwrap();

    for worker in workers {
        worker.await.unwrap();
    }
}
#[test]
fn shutdown() {
    let watchable = Watchable::new(0);
    let watcher = watchable.watch();
    // Set a new value, then shutdown
    watchable.replace(1);
    watchable.shutdown();
    // The value should still be accessible
    assert_eq!(watcher.next_value().expect("initial value missing"), 1);
    watcher
        .next_value()
        .expect_err("watcher should be disconnected");
}