1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
//! [`Sender`] part of a stream.
use std::{marker::PhantomData, mem::size_of};
use bytes::{BufMut, Bytes, BytesMut};
use futures_util::StreamExt;
use quinn::{SendStream, VarInt};
use serde::Serialize;
use super::Task;
use crate::error;
/// Used to send data to a stream.
///
/// A [`Sender`] is a handle: it holds the channel used to queue serialized
/// messages and the handle of the [`Task`] that writes them into the stream.
#[derive(Debug)]
pub struct Sender<T: Serialize> {
    /// Send [`Serialize`]d data to the sending task.
    sender: flume::Sender<Bytes>,
    /// Holds the type to [`Serialize`] to.
    _type: PhantomData<T>,
    /// [`Task`] handle that does the sending into the stream.
    task: Task<Result<(), error::Sender>, Message>,
}

// Implemented by hand instead of derived: `#[derive(Clone)]` would demand
// `T: Clone`, although no value of type `T` is ever stored in a `Sender`.
impl<T: Serialize> Clone for Sender<T> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
            _type: PhantomData,
            task: self.task.clone(),
        }
    }
}
/// Messages sent to the [`Sender`] task.
///
/// The task loop handles exactly one of these per iteration; `Finish` and
/// `Close` both end the loop after they have been handled.
#[derive(Clone, Debug)]
enum Message {
/// Data to be sent. The task writes it into the stream as a single chunk.
Data(Bytes),
/// Tell [`Task`] to finish the [`Sender`] part of the stream and close it.
Finish,
/// Tell [`Task`] to close the [`Sender`]. The task resets the stream with
/// error code `0` instead of finishing it.
Close,
}
impl<T: Serialize> Sender<T> {
/// Builds a new [`Sender`] from a raw [`quinn`] type. Spawns a task that
/// sends data into the stream.
pub(super) fn new(mut stream_sender: SendStream) -> Self {
// sender channels
let (sender, receiver) = flume::unbounded();
// `Task` handling `Sender`
let task = Task::new(|mut shutdown| async move {
let mut receiver = receiver.into_stream().fuse();
while let Some(message) = futures_util::select_biased! {
message = receiver.next() => message.map(Message::Data),
shutdown = shutdown => shutdown.ok(),
complete => None,
} {
match message {
Message::Data(bytes) => stream_sender.write_chunk(bytes).await?,
Message::Finish => {
stream_sender.finish().await?;
break;
}
Message::Close => {
stream_sender
.reset(VarInt::from_u32(0))
.map_err(|_error| error::AlreadyClosed)?;
break;
}
}
}
Ok(())
});
Self {
sender,
_type: PhantomData,
task,
}
}
/// Send `data` into the stream.
///
/// # Errors
/// - [`error::Sender::Serialize`] if `data` failed to be serialized
/// - [`error::Sender::Write`] if the [`Sender`] failed to to write to the
/// stream
/// - [`error::Sender::Closed`] if the [`Sender`] is closed
pub fn send(&self, data: &T) -> Result<(), error::Sender> {
self.send_any(data)
}
/// Send any `data` into the stream. This will fail on the receiving end if
/// not decoded into the proper type.
///
/// # Errors
/// - [`error::Sender::Serialize`] if `data` failed to be serialized
/// - [`error::Sender::Write`] if the [`Sender`] failed to to write to the
/// stream
/// - [`error::Sender::Closed`] if the [`Sender`] is closed
pub(super) fn send_any<A: Serialize>(&self, data: &A) -> Result<(), error::Sender> {
let mut bytes = BytesMut::new();
// get size
let len = bincode::serialized_size(&data)?;
// reserve an appropriate amount of space
bytes.reserve(
usize::try_from(len)
.expect("not a 64-bit system")
.checked_add(size_of::<u64>())
.expect("data trying to be sent is too big"),
);
// insert length first, this enables framing
bytes.put_u64_le(len);
let mut bytes = bytes.writer();
// serialize `data` into `bytes`
bincode::serialize_into(&mut bytes, &data)?;
// send data to task
let bytes = bytes.into_inner().freeze();
// make sure that our length is correct
debug_assert_eq!(
u64::try_from(bytes.len()).expect("not a 64-bit system"),
u64::try_from(size_of::<u64>())
.expect("not a 64-bit system")
.checked_add(len)
.expect("message to long")
);
// if the sender task has been dropped, return it's error
if self.sender.send(bytes).is_err() {
// TODO: configurable executor
futures_executor::block_on(async { (&self.task).await })?
} else {
Ok(())
}
}
/// Shut down the [`Send`] part of the stream gracefully.
///
/// No new data may be written after calling this method. Completes when the
/// peer has acknowledged all sent data, retransmitting data as needed.
///
/// # Errors
/// This can only return [`error::Sender::Closed`] as an [`Err`], if it was
/// already closed, but if the [`Sender`] failed to write to the stream it
/// will return a queued [`error::Sender::Write`].
pub async fn finish(&self) -> Result<(), error::Sender> {
self.task.close(Message::Finish).await?
}
/// Close the [`Sender`] immediately.
///
/// To close a [`Sender`] gracefully use [`Sender::finish`].
///
/// # Errors
/// This can only return [`error::Sender::Closed`] as an [`Err`], if it was
/// already closed, but if the [`Sender`] failed to write to the stream it
/// will return a queued [`error::Sender::Write`].
pub async fn close(&self) -> Result<(), error::Sender> {
self.task.close(Message::Close).await?
}
}