use std::collections::HashSet;
use tokio::time::{interval, Duration};
use tokio_util::sync::CancellationToken;
use crate::database::Error as DbError;
use crate::Notification;
use crate::{
commons::channel::{ChannelData, MpscChannel, SenderEnd},
database::DB,
message::MessageTaskCommand,
protocol::protocol_message_manager::TapleMessages,
DatabaseCollection, DigestIdentifier, KeyIdentifier,
};
use super::{
authorized_subjects::AuthorizedSubjects, error::AuthorizedSubjectsError,
AuthorizedSubjectsCommand, AuthorizedSubjectsResponse,
};
/// Cloneable handle used to submit authorized-subject commands over a channel.
///
/// The handle only owns the sending end of the channel; the receiving side is
/// presumably an `AuthorizedSubjectsManager` wired up elsewhere (confirm at the
/// construction site).
#[derive(Clone, Debug)]
pub struct AuthorizedSubjectsAPI {
/// Sending end for `AuthorizedSubjectsCommand` messages; answers, when requested,
/// are typed as `AuthorizedSubjectsResponse`.
sender: SenderEnd<AuthorizedSubjectsCommand, AuthorizedSubjectsResponse>,
}
impl AuthorizedSubjectsAPI {
pub fn new(sender: SenderEnd<AuthorizedSubjectsCommand, AuthorizedSubjectsResponse>) -> Self {
Self { sender }
}
pub async fn new_authorized_subject(
&self,
subject_id: DigestIdentifier,
providers: HashSet<KeyIdentifier>,
) -> Result<(), AuthorizedSubjectsError> {
self.sender
.tell(AuthorizedSubjectsCommand::NewAuthorizedSubject {
subject_id,
providers,
})
.await?;
Ok(())
}
}
/// State of the task that receives `AuthorizedSubjectsCommand`s and applies them
/// to an inner `AuthorizedSubjects` instance backed by a database collection `C`.
pub struct AuthorizedSubjectsManager<C: DatabaseCollection> {
/// Channel on which incoming commands are received.
input_channel: MpscChannel<AuthorizedSubjectsCommand, AuthorizedSubjectsResponse>,
/// Component that performs the actual authorized-subject operations.
inner_authorized_subjects: AuthorizedSubjects<C>,
/// Cancellation token: observed to stop the run loop, and cancelled by the
/// manager itself when it terminates.
token: CancellationToken,
/// Sender for `Notification`s. Stored at construction; none of the methods
/// visible in this file send on it.
notification_tx: tokio::sync::mpsc::Sender<Notification>,
}
impl<C: DatabaseCollection> AuthorizedSubjectsManager<C> {
pub fn new(
input_channel: MpscChannel<AuthorizedSubjectsCommand, AuthorizedSubjectsResponse>,
database: DB<C>,
message_channel: SenderEnd<MessageTaskCommand<TapleMessages>, ()>,
our_id: KeyIdentifier,
token: CancellationToken,
notification_tx: tokio::sync::mpsc::Sender<Notification>,
) -> Self {
Self {
input_channel,
inner_authorized_subjects: AuthorizedSubjects::new(database, message_channel, our_id),
token,
notification_tx,
}
}
pub async fn run(mut self) {
match self.inner_authorized_subjects.ask_for_all().await {
Ok(_) => {}
Err(AuthorizedSubjectsError::DatabaseError(DbError::EntryNotFound)) => {}
Err(error) => {
log::error!("{}", error);
self.token.cancel();
return;
}
};
let mut timer = interval(Duration::from_secs(15));
loop {
tokio::select! {
command = self.input_channel.receive() => {
match command {
Some(command) => {
let result = self.process_command(command).await;
if result.is_err() {
log::error!("{}", result.unwrap_err());
break;
}
}
None => {
break;
},
}
},
_ = timer.tick() => {
match self.inner_authorized_subjects.ask_for_all().await {
Ok(_) => {}
Err(AuthorizedSubjectsError::DatabaseError(DbError::EntryNotFound)) => {}
Err(error) => {
log::error!("{}", error);
break;
}
};
},
_ = self.token.cancelled() => {
log::debug!("Shutdown received");
break;
}
}
}
self.token.cancel();
log::info!("Ended");
}
async fn process_command(
&mut self,
command: ChannelData<AuthorizedSubjectsCommand, AuthorizedSubjectsResponse>,
) -> Result<(), AuthorizedSubjectsError> {
let (sender, data) = match command {
ChannelData::AskData(data) => {
let (sender, data) = data.get();
(Some(sender), data)
}
ChannelData::TellData(data) => {
let data = data.get();
(None, data)
}
};
let response = {
match data {
AuthorizedSubjectsCommand::NewAuthorizedSubject {
subject_id,
providers,
} => {
let response = self
.inner_authorized_subjects
.new_authorized_subject(subject_id, providers)
.await;
match response {
Ok(_) => {}
Err(error) => match error {
AuthorizedSubjectsError::DatabaseError(db_error) => match db_error {
crate::DbError::EntryNotFound => todo!(),
_ => return Err(AuthorizedSubjectsError::DatabaseError(db_error)),
},
_ => return Err(error),
},
}
AuthorizedSubjectsResponse::NoResponse
}
}
};
if sender.is_some() {
sender.unwrap().send(response).expect("Sender Dropped");
}
Ok(())
}
}