//! Management of pre-authorized subjects: storing them with their providers and
//! asking those providers for them through `LedgerCommand::GetLCE` messages.
use std::collections::HashSet;
use crate::{
commons::channel::SenderEnd,
database::DB,
ledger::LedgerCommand,
message::{MessageConfig, MessageTaskCommand},
protocol::protocol_message_manager::TapleMessages,
DatabaseCollection, DigestIdentifier, KeyIdentifier,
};
use super::error::AuthorizedSubjectsError;
/// Keeps track of pre-authorized subjects in the database and requests them
/// from their providers over a message channel.
pub struct AuthorizedSubjects<C>
where
    C: DatabaseCollection,
{
    /// Database handle used to read and store pre-authorized subjects together
    /// with their providers.
    database: DB<C>,
    /// Sending end of the channel on which ledger requests are handed to the
    /// message task.
    message_channel: SenderEnd<MessageTaskCommand<TapleMessages>, ()>,
    /// Identifier reported as `who_asked` in every request sent by this component.
    our_id: KeyIdentifier,
}
impl<C: DatabaseCollection> AuthorizedSubjects<C> {
    /// Creates a new instance of the `AuthorizedSubjects` structure.
    ///
    /// # Arguments
    ///
    /// * `database` - Database connection.
    /// * `message_channel` - Channel used to send requests to the message task.
    /// * `our_id` - Identifier reported as `who_asked` in outgoing requests.
    pub fn new(
        database: DB<C>,
        message_channel: SenderEnd<MessageTaskCommand<TapleMessages>, ()>,
        our_id: KeyIdentifier,
    ) -> Self {
        Self {
            database,
            message_channel,
            our_id,
        }
    }

    /// Reads the pre-authorized subjects from the database and, for each one
    /// that has at least one provider, sends a `LedgerCommand::GetLCE` request
    /// to its providers through the message channel.
    ///
    /// Subjects with an empty provider set are skipped.
    ///
    /// # Errors
    ///
    /// Returns `AuthorizedSubjectsError::DatabaseError` if the pre-authorized
    /// subjects cannot be read, or the converted channel error if a request
    /// cannot be sent. Sending stops at the first failure, so later subjects
    /// are not requested in that case.
    pub async fn ask_for_all(&self) -> Result<(), AuthorizedSubjectsError> {
        // NOTE(review): a single query with a hard-coded limit of 10000 is
        // issued; presumably subjects beyond that limit are never requested.
        // Confirm against the database API whether pagination is needed.
        let preauthorized_subjects = self
            .database
            .get_allowed_subjects_and_providers(None, 10000)
            .map_err(AuthorizedSubjectsError::DatabaseError)?;

        for (subject_id, providers) in preauthorized_subjects {
            if !providers.is_empty() {
                self.request_lce(subject_id, providers).await?;
            }
        }
        Ok(())
    }

    /// Stores a new pre-authorized subject with its providers and, if there is
    /// at least one provider, sends them a `LedgerCommand::GetLCE` request
    /// through the message channel.
    ///
    /// # Arguments
    ///
    /// * `subject_id` - Identifier of the new pre-authorized subject.
    /// * `providers` - Set of associated provider identifiers.
    ///
    /// # Errors
    ///
    /// Returns an error if the subject cannot be stored in the database or if
    /// the request cannot be sent through the message channel. The database
    /// write happens first, so a send failure leaves the subject stored.
    pub async fn new_authorized_subject(
        &self,
        subject_id: DigestIdentifier,
        providers: HashSet<KeyIdentifier>,
    ) -> Result<(), AuthorizedSubjectsError> {
        // The database call takes ownership of the set, while the providers
        // are still needed afterwards as message targets, hence the clone.
        self.database
            .set_preauthorized_subject_and_providers(&subject_id, providers.clone())?;

        if providers.is_empty() {
            return Ok(());
        }
        self.request_lce(subject_id, providers).await
    }

    /// Sends a `LedgerCommand::GetLCE` request for `subject_id` to `providers`.
    async fn request_lce(
        &self,
        subject_id: DigestIdentifier,
        providers: impl IntoIterator<Item = KeyIdentifier>,
    ) -> Result<(), AuthorizedSubjectsError> {
        self.message_channel
            .tell(MessageTaskCommand::Request(
                None,
                TapleMessages::LedgerMessages(LedgerCommand::GetLCE {
                    who_asked: self.our_id.clone(),
                    subject_id,
                }),
                providers.into_iter().collect(),
                MessageConfig::direct_response(),
            ))
            .await?;
        Ok(())
    }
}