//! The communication streams between [`Agent`](crate::agent::Agent)s.
//!
//! These are currently implemented by using an in-memory buffer.
//!
//! One might ask why we want two channels. There are two very practical reasons
//! for this. Note that these are advantages for the implementation; they are not
//! strictly required from a theoretical point of view.
//!
//! * Having two buffers resembles how networking works in reality: each computer has an input and
//!   an output buffer. In the case of TCP, the input buffer can become full, and the transmission
//!   is then throttled.
//! * Modeling each agent with two buffers follows the single-responsibility principle. When
//!   sending or receiving data, each agent only has to look at its own two buffers. If each agent
//!   had only one buffer, you would need to read from the buffer of the agent that holds the data
//!   you want. Or, designed the other way around, you would need to write to the buffer of the
//!   agent to which you want to send data.
//!
//! The [`Agent`](crate::agent::Agent) Alice can add data to the *inbound channel* of Bob.
//! Bob can then read the data from his *inbound channel* and put data in his *outbound channel*.
//! If Bob is an [`Agent`](crate::agent::Agent) that has an underlying *PUT state*, then OpenSSL
//! may write into Bob's *outbound channel*.
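//!
//! The exchange described above can be sketched with the standard library alone. This is a
//! minimal illustration, not the crate's API: `TwoChannelStream`, `take_from_outbound` and
//! `deliver` are hypothetical names, and raw bytes stand in for encoded protocol messages.
//!
//! ```rust
//! use std::io::{self, Cursor, Read, Write};
//!
//! /// Hypothetical stand-in for a stream with two in-memory channels.
//! #[derive(Default)]
//! struct TwoChannelStream {
//!     inbound: Cursor<Vec<u8>>,
//!     outbound: Cursor<Vec<u8>>,
//! }
//!
//! impl TwoChannelStream {
//!     /// Appends raw bytes to the inbound channel (data "received" from a peer).
//!     fn add_to_inbound(&mut self, data: &[u8]) {
//!         self.inbound.get_mut().extend_from_slice(data);
//!     }
//!
//!     /// Drains everything written so far from the outbound channel.
//!     fn take_from_outbound(&mut self) -> Vec<u8> {
//!         self.outbound.set_position(0);
//!         std::mem::take(self.outbound.get_mut())
//!     }
//! }
//!
//! impl Read for TwoChannelStream {
//!     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
//!         let n = self.inbound.read(buf)?;
//!         // Reset the channel once all buffered data has been consumed.
//!         if self.inbound.position() == self.inbound.get_ref().len() as u64 {
//!             self.inbound.set_position(0);
//!             self.inbound.get_mut().clear();
//!         }
//!         // Behave like a non-blocking socket when nothing is available.
//!         if n == 0 {
//!             return Err(io::Error::new(io::ErrorKind::WouldBlock, "no data available"));
//!         }
//!         Ok(n)
//!     }
//! }
//!
//! impl Write for TwoChannelStream {
//!     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
//!         self.outbound.write(buf)
//!     }
//!
//!     fn flush(&mut self) -> io::Result<()> {
//!         Ok(())
//!     }
//! }
//!
//! /// Moves whatever `from` has sent into the inbound channel of `to`.
//! fn deliver(from: &mut TwoChannelStream, to: &mut TwoChannelStream) {
//!     let bytes = from.take_from_outbound();
//!     to.add_to_inbound(&bytes);
//! }
//!
//! fn main() -> io::Result<()> {
//!     let mut alice = TwoChannelStream::default();
//!     let mut bob = TwoChannelStream::default();
//!
//!     // Alice "sends" by writing; the bytes land in her outbound channel.
//!     alice.write_all(b"hello bob")?;
//!     deliver(&mut alice, &mut bob);
//!
//!     // Bob "receives" by reading from his inbound channel.
//!     let mut buf = [0u8; 32];
//!     let n = bob.read(&mut buf)?;
//!     assert_eq!(&buf[..n], b"hello bob");
//!
//!     // With the inbound channel drained, a further read reports `WouldBlock`.
//!     let err = bob.read(&mut buf).unwrap_err();
//!     assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
//!     Ok(())
//! }
//! ```
//!
//! In this sketch neither side ever touches the other's buffers directly; only the `deliver`
//! helper, playing the role of the network, moves bytes between them.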
use std::io::{self, Read, Write};
use crate::algebra::ConcreteMessage;
use crate::codec::Codec;
use crate::error::Error;
use crate::protocol::ProtocolBehavior;
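
pub trait Stream<PB: ProtocolBehavior> {
    /// Encodes `message` and appends it to the inbound channel.
    fn add_to_inbound(&mut self, message: &ConcreteMessage);

    /// Takes the pending message flight from the outbound channel.
    ///
    /// Returns `Ok(None)` if no flight could be decoded from the buffered bytes.
    fn take_message_from_outbound(
        &mut self,
    ) -> Result<Option<PB::OpaqueProtocolMessageFlight>, Error>;
}
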
/// Describes in- or outbound channels of an [`crate::agent::Agent`].
///
/// Each [`crate::agent::Agent`] can send and receive data. This is modeled by two separate Channels
/// in [`MemoryStream`]. Internally a Channel is just an in-memory seekable buffer.
pub type Channel = io::Cursor<Vec<u8>>;

/// A `MemoryStream` has two [`Channel`]s. The stream also implements the [`Write`] and [`Read`]
/// traits.
///
/// * When writing to a `MemoryStream`, its outbound channel gets filled.
/// * When reading from a `MemoryStream`, data is taken from the inbound channel.
///
/// This makes it possible for an [`crate::agent::Agent`] to treat a [`MemoryStream`] like a TLS
/// socket: by writing to this socket you send data out, and by reading from it you receive data.
///
/// **Note: There need to be two separate buffers! Otherwise, for example, a TLS socket would read
/// and write into the same buffer.**
#[derive(Default, Debug)]
pub struct MemoryStream {
    inbound: Channel,
    outbound: Channel,
}

impl MemoryStream {
    #[must_use]
    pub fn new() -> Self {
        Self {
            inbound: io::Cursor::new(Vec::new()),
            outbound: io::Cursor::new(Vec::new()),
        }
    }
}

impl<PB: ProtocolBehavior> Stream<PB> for MemoryStream {
    fn add_to_inbound(&mut self, message: &ConcreteMessage) {
        // Append the encoded message to the inbound buffer; the read position is left untouched.
        message.encode(self.inbound.get_mut());
    }

    fn take_message_from_outbound(
        &mut self,
    ) -> Result<Option<PB::OpaqueProtocolMessageFlight>, Error> {
        // Decode whatever has been written so far ...
        let flight =
            PB::OpaqueProtocolMessageFlight::read_bytes(self.outbound.get_ref().as_slice());

        // ... then reset the outbound channel, whether or not decoding succeeded.
        self.outbound.set_position(0);
        self.outbound.get_mut().clear();
        Ok(flight)
    }
}

impl Read for MemoryStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inbound.read(buf)?;

        // Clear the inbound channel as soon as all of its data has been read.
        if self.inbound.position() == self.inbound.get_ref().len() as u64 {
            self.inbound.set_position(0);
            self.inbound.get_mut().clear();
        }

        // Report an empty channel as `WouldBlock` rather than as end-of-stream (`Ok(0)`).
        if n == 0 {
            return Err(io::Error::new(
                io::ErrorKind::WouldBlock,
                "no data available",
            ));
        }

        Ok(n)
    }
}

impl Write for MemoryStream {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.outbound.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}