1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
//! The communication streams between [`Agent`](crate::agent::Agent)s.
//!
//! These are currently implemented by using an in-memory buffer.
//! One might ask why we want two channels. There are two very practical reasons
//! for this. Note that these are advantages for the implementation and are not
//! strictly required from a theoretical point of view.
//!
//! * Having two buffers resembles how networking works in reality: Each computer has an input and
//!   an output buffer. In case of TCP the input buffer can become full and therefore the
//!   transmission is throttled.
//! * It is beneficial to model each agent with two buffers according to the Single-responsibility
//!   principle. When sending or receiving data, each agent only has to look at its own two
//!   buffers. If each agent had only one buffer, then you would need to read from another agent
//!   which has the data you want. Or, if you design it the other way around, you would need to
//!   write to the buffer of the agent to which you want to send data.
//!
//! The [`Agent`](crate::agent::Agent) Alice can add data to the *inbound channel* of Bob.
//! Bob can then read the data from his *inbound channel* and put data in his *outbound channel*.
//! If Bob is an [`Agent`](crate::agent::Agent), which has an underlying *PUT state* then OpenSSL
//! may write into the *outbound channel* of Bob.

use std::io::{self, Read, Write};

use crate::codec::Codec;
use crate::error::Error;
use crate::protocol::ProtocolBehavior;

/// Message-flight level access to the inbound and outbound channels of a stream.
///
/// The type parameter `PB` selects the protocol and thereby the concrete flight type
/// (`PB::OpaqueProtocolMessageFlight`) that is passed in and out.
pub trait Stream<PB: ProtocolBehavior> {
    /// Adds `message_flight` to the inbound channel of this stream.
    fn add_to_inbound(&mut self, message_flight: &PB::OpaqueProtocolMessageFlight);

    /// Takes the pending data from the outbound channel and returns it as an opaque message
    /// flight.
    ///
    /// NOTE(review): what `Ok(None)` means and when `Err` is produced is decided by the
    /// implementor; the trait itself does not fix it — confirm against the implementation in use.
    fn take_message_from_outbound(
        &mut self,
    ) -> Result<Option<PB::OpaqueProtocolMessageFlight>, Error>;
}

/// Describes the in- or outbound channel of an [`crate::agent::Agent`].
///
/// Each [`crate::agent::Agent`] can send and receive data. This is modeled by two separate
/// Channels in [`MemoryStream`]. Internally a Channel is just an in-memory seekable buffer: a
/// growable byte vector wrapped in an [`io::Cursor`], which keeps track of the current position.
pub type Channel = io::Cursor<Vec<u8>>;

/// A MemoryStream has two [`Channel`]s. The Stream also implements the [`Write`] and [`Read`]
/// traits.
/// * When writing to a MemoryStream its outbound channel gets filled.
/// * When reading from a MemoryStream data is taken from the inbound channel.
///
/// This makes it possible for an [`crate::agent::Agent`] to treat a [`MemoryStream`] like a TLS
/// socket! By writing to this socket you are sending data out. By reading from it you receive data.
///
/// **Note: There need to be two separate buffers! Otherwise, for example, a TLS socket would read
/// and write into the same buffer.**
#[derive(Default, Debug)]
pub struct MemoryStream {
    // Data waiting to be consumed through the `Read` impl; filled via `Stream::add_to_inbound`.
    inbound: Channel,
    // Data produced through the `Write` impl; drained via `Stream::take_message_from_outbound`.
    outbound: Channel,
}

impl MemoryStream {
    pub fn new() -> Self {
        Self {
            inbound: io::Cursor::new(Vec::new()),
            outbound: io::Cursor::new(Vec::new()),
        }
    }
}

impl<PB: ProtocolBehavior> Stream<PB> for MemoryStream {
    /// Encodes `message_flight` into the byte buffer backing the inbound channel.
    ///
    /// The cursor position of the inbound channel is left untouched, so data that has not been
    /// read yet stays readable.
    fn add_to_inbound(&mut self, message_flight: &PB::OpaqueProtocolMessageFlight) {
        let inbound_bytes = self.inbound.get_mut();
        message_flight.encode(inbound_bytes);
    }

    /// Parses the complete content of the outbound channel as a message flight and then empties
    /// the channel.
    ///
    /// The outbound channel is cleared regardless of what the parser returned. This
    /// implementation never returns `Err`; the result of the parser is passed on as-is.
    /// NOTE(review): `None` presumably means the bytes could not be decoded as a flight —
    /// confirm against `Codec::read_bytes`.
    fn take_message_from_outbound(
        &mut self,
    ) -> Result<Option<PB::OpaqueProtocolMessageFlight>, Error> {
        // Parse from the start of the underlying vector, independent of the cursor position.
        let mut pending: &[u8] = self.outbound.get_ref().as_slice();
        let parsed = PB::OpaqueProtocolMessageFlight::read_bytes(&mut pending);

        // Reset the channel so the next write starts from an empty buffer.
        self.outbound.get_mut().clear();
        self.outbound.set_position(0);

        Ok(parsed)
    }
}

impl Read for MemoryStream {
    /// Reads pending bytes from the inbound channel into `buf` and returns how many bytes were
    /// copied.
    ///
    /// As soon as the inbound channel has been read completely, its backing buffer is cleared
    /// and the cursor is rewound, so the buffer does not keep already consumed data around.
    ///
    /// A zero-length `buf` always yields `Ok(0)` without touching the channel, as described by
    /// the [`Read::read`] contract.
    ///
    /// # Errors
    ///
    /// Returns an error of kind [`io::ErrorKind::WouldBlock`] if `buf` has room for data but the
    /// inbound channel holds no unread bytes.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Reading zero bytes is trivially satisfied. Without this guard a caller passing an empty
        // buffer would get "no data available" even though data may be pending.
        if buf.is_empty() {
            return Ok(0);
        }

        let n = self.inbound.read(buf)?;

        // Clear as soon as we read all data
        if self.inbound.position() == self.inbound.get_ref().len() as u64 {
            self.inbound.set_position(0);
            self.inbound.get_mut().clear();
        }

        // Nothing was pending: report it like a non-blocking source would instead of signalling
        // end-of-stream with `Ok(0)`.
        if n == 0 {
            return Err(io::Error::new(
                io::ErrorKind::WouldBlock,
                "no data available",
            ));
        }
        Ok(n)
    }
}

impl Write for MemoryStream {
    /// Writes `buf` into the outbound channel at its current cursor position and returns the
    /// number of bytes the channel accepted.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        Write::write(&mut self.outbound, buf)
    }

    /// Does nothing and always succeeds: written data is placed in the in-memory outbound
    /// channel directly by `write`.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}