1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use futures::StreamExt;
use rmp_serde::Deserializer;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::io::Cursor;
use tokio::sync::mpsc::{self};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;

use crate::{commons::channel::SenderEnd, signature::Signed, KeyIdentifier, Notification};

use super::{MessageContent, TaskCommandContent};

#[derive(Debug)]
pub enum NetworkEvent {
    MessageReceived { message: Vec<u8> },
}

pub struct MessageReceiver<T>
where
    T: TaskCommandContent + Serialize + DeserializeOwned,
{
    receiver: ReceiverStream<NetworkEvent>,
    sender: SenderEnd<Signed<MessageContent<T>>, ()>,
    token: CancellationToken,
    notification_tx: tokio::sync::mpsc::Sender<Notification>,
    own_id: KeyIdentifier,
}

impl<T: TaskCommandContent + Serialize + DeserializeOwned + 'static> MessageReceiver<T> {
    pub fn new(
        receiver: mpsc::Receiver<NetworkEvent>,
        sender: SenderEnd<Signed<MessageContent<T>>, ()>,
        token: CancellationToken,
        notification_tx: tokio::sync::mpsc::Sender<Notification>,
        own_id: KeyIdentifier,
    ) -> Self {
        let receiver = ReceiverStream::new(receiver);
        Self {
            receiver,
            sender,
            token,
            notification_tx,
            own_id,
        }
    }

    pub async fn run(mut self) {
        loop {
            tokio::select! {
                event = self.receiver.next() => match event {
                    Some(NetworkEvent::MessageReceived { message }) => {
                        // The message will be a string for now
                        // Deserialize the message
                        let cur = Cursor::new(message);
                        let mut de = Deserializer::new(cur);
                        let message: Signed<MessageContent<T>> = Deserialize::deserialize(&mut de).expect("Fallo de deserialización");
                        // Check message signature
                        if message.verify().is_err() || message.content.sender_id != message.signature.signer {
                            log::error!("Invalid signature in message");
                        } else if message.content.receiver != self.own_id {
                            log::error!("Message not for me");
                        } else {
                            self.sender.tell(message).await.expect("Channel Error");
                        }
                    },
                    None => {}
                },
                _ = self.token.cancelled() => {
                    log::debug!("Shutdown received");
                    break;
                }
            }
        }
        self.token.cancel();
        log::info!("Ended");
    }
}