1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
use tokio_util::sync::CancellationToken;

use crate::{
    commons::{
        channel::{ChannelData, MpscChannel, SenderEnd},
        self_signature_manager::SelfSignatureManager,
    },
    database::DB,
    governance::{GovernanceAPI, GovernanceUpdatedMessage},
    message::MessageTaskCommand,
    protocol::protocol_message_manager::TapleMessages,
    DatabaseCollection, Notification, Settings,
};

use super::{
    error::{DistributionErrorResponses, DistributionManagerError},
    inner_manager::InnerDistributionManager,
    DistributionMessagesNew,
};

/// Event-loop wrapper around an [`InnerDistributionManager`].
///
/// The manager owns the channels it listens on and forwards each incoming request or
/// governance update to `inner_manager`. `C` is the database collection type handed to the
/// inner manager.
pub struct DistributionManager<C: DatabaseCollection> {
    /// Broadcast receiver polled by `run` for [`GovernanceUpdatedMessage`] values.
    governance_update_input: tokio::sync::broadcast::Receiver<GovernanceUpdatedMessage>,
    /// Channel delivering [`DistributionMessagesNew`] requests; a request that expects an answer
    /// is replied to with `Result<(), DistributionErrorResponses>`.
    input_channel: MpscChannel<DistributionMessagesNew, Result<(), DistributionErrorResponses>>,
    /// Cancellation token: `run` stops when it is cancelled and cancels it itself on exit.
    token: CancellationToken,
    /// Sender of [`Notification`] values.
    // NOTE(review): stored by `new` but not read by any method visible in this file — confirm
    // whether it is still needed.
    notification_tx: tokio::sync::mpsc::Sender<Notification>,
    /// Component that every command and governance update is delegated to.
    inner_manager: InnerDistributionManager<GovernanceAPI, C>,
}

impl<C: DatabaseCollection> DistributionManager<C> {
    /// Builds a distribution manager from its channels and collaborators.
    ///
    /// `gov_api`, `db`, `messenger_channel`, `signature_manager` and `settings` are passed
    /// straight to [`InnerDistributionManager::new`]; the remaining arguments are stored on the
    /// manager itself.
    pub fn new(
        input_channel: MpscChannel<DistributionMessagesNew, Result<(), DistributionErrorResponses>>,
        governance_update_input: tokio::sync::broadcast::Receiver<GovernanceUpdatedMessage>,
        token: CancellationToken,
        notification_tx: tokio::sync::mpsc::Sender<Notification>,
        messenger_channel: SenderEnd<MessageTaskCommand<TapleMessages>, ()>,
        gov_api: GovernanceAPI,
        signature_manager: SelfSignatureManager,
        settings: Settings,
        db: DB<C>,
    ) -> Self {
        Self {
            input_channel,
            governance_update_input,
            token,
            notification_tx,
            inner_manager: InnerDistributionManager::new(
                gov_api,
                db,
                messenger_channel,
                signature_manager,
                settings,
            ),
        }
    }

    /// Runs the manager's event loop until shutdown, consuming the manager.
    ///
    /// The inner manager is initialised first; an initialisation error is logged and the loop
    /// is still entered. Each iteration then waits for whichever of these happens first:
    ///
    /// * a command arrives on the input channel and is handled by `process_command`;
    /// * the cancellation token is cancelled;
    /// * a governance update is received and forwarded to the inner manager.
    ///
    /// The loop ends when the input channel yields `None`, when the token is cancelled, when
    /// the governance receiver returns an error, or when handling a command or a governance
    /// update fails. On exit the token is cancelled, so a failure here is propagated to
    /// everything else observing the same token.
    pub async fn run(mut self) {
        if let Err(error) = self.inner_manager.init().await {
            log::error!("{}", error);
        }
        loop {
            tokio::select! {
                command = self.input_channel.receive() => {
                    match command {
                        Some(command) => {
                            if let Err(error) = self.process_command(command).await {
                                log::error!("{}", error);
                                break;
                            }
                        }
                        // Nothing more can be received from the input channel.
                        None => break,
                    }
                },
                _ = self.token.cancelled() => {
                    log::debug!("Shutdown received");
                    break;
                },
                msg = self.governance_update_input.recv() => {
                    match msg {
                        Ok(GovernanceUpdatedMessage::GovernanceUpdated { governance_id, .. }) => {
                            if let Err(error) = self.inner_manager.governance_updated(&governance_id).await {
                                log::error!("{}", error);
                                break;
                            }
                        }
                        Err(error) => {
                            // Record why the receiver failed (closed or lagged) before
                            // shutting down, instead of exiting silently.
                            log::error!("Governance update channel error: {}", error);
                            break;
                        }
                    }
                }
            }
        }
        self.token.cancel();
        log::info!("Ended");
    }

    /// Dispatches one incoming command to the inner manager and answers it if required.
    ///
    /// An `AskData` command carries a reply sender and receives the inner manager's response;
    /// a `TellData` command carries no sender, so its response is discarded.
    ///
    /// # Errors
    ///
    /// Propagates any [`DistributionManagerError`] returned by the inner manager, and returns
    /// [`DistributionManagerError::ResponseChannelNotAvailable`] when the reply cannot be sent.
    async fn process_command(
        &mut self,
        command: ChannelData<DistributionMessagesNew, Result<(), DistributionErrorResponses>>,
    ) -> Result<(), DistributionManagerError> {
        let (sender, data) = match command {
            ChannelData::AskData(data) => {
                let (sender, data) = data.get();
                (Some(sender), data)
            }
            ChannelData::TellData(data) => (None, data.get()),
        };

        let response = match data {
            DistributionMessagesNew::ProvideSignatures(data) => {
                self.inner_manager.provide_signatures(&data).await?
            }
            DistributionMessagesNew::SignaturesReceived(data) => {
                self.inner_manager.signatures_received(data).await?
            }
            DistributionMessagesNew::SignaturesNeeded { subject_id, sn } => {
                self.inner_manager
                    .start_distribution(super::StartDistribution { subject_id, sn })
                    .await?
            }
        };
        // Only ask-style commands have someone waiting for the response.
        if let Some(sender) = sender {
            sender
                .send(response)
                .map_err(|_| DistributionManagerError::ResponseChannelNotAvailable)?;
        }
        Ok(())
    }
}