use std::collections::HashSet;
use super::{
error::{APIInternalError, ApiError},
inner_api::InnerAPI,
APICommands, ApiResponses, GetAllowedSubjects,
};
use super::{GetEvents, GetGovernanceSubjects};
#[cfg(feature = "aproval")]
use crate::approval::manager::ApprovalAPI;
use crate::commons::models::request::TapleRequest;
use crate::commons::models::state::SubjectData;
use crate::commons::{
channel::{ChannelData, MpscChannel, SenderEnd},
config::TapleSettings,
};
use crate::event::manager::EventAPI;
use crate::ledger::manager::EventManagerAPI;
use crate::signature::Signature;
#[cfg(feature = "aproval")]
use crate::ApprovalEntity;
use crate::ValidationProof;
use crate::{
authorized_subjecs::manager::AuthorizedSubjectsAPI, signature::Signed, Event, EventRequest,
};
use crate::{identifier::DigestIdentifier, DatabaseCollection, DB};
use crate::{KeyDerivator, KeyIdentifier};
use async_trait::async_trait;
use tokio::sync::watch::Sender;
/// Asynchronous interface for issuing requests to the node API.
///
/// Every method returns a `Result` whose error type is [`ApiError`]. In this
/// file the trait is implemented by `NodeAPI`, which forwards each call as an
/// `APICommands` message and returns the payload of the matching
/// `ApiResponses` variant.
///
/// Methods gated by `#[cfg(feature = "aproval")]` only exist when that Cargo
/// feature is enabled.
///
/// NOTE(review): wherever `from` / `quantity` appear they look like pagination
/// controls (starting point and maximum number of items); their exact
/// semantics are defined outside this file — confirm against `InnerAPI`.
#[async_trait]
pub trait ApiModuleInterface {
/// Submits a signed event request (forwarded by `NodeAPI` as
/// `APICommands::ExternalRequest`).
///
/// NOTE(review): the returned [`DigestIdentifier`] is presumably the
/// identifier assigned to the request — confirm.
async fn external_request(
&self,
event_request: Signed<EventRequest>,
) -> Result<DigestIdentifier, ApiError>;
/// Lists subject data for `namespace` (forwarded as `APICommands::GetSubjects`).
async fn get_subjects(
&self,
namespace: String,
from: Option<String>,
quantity: Option<i64>,
) -> Result<Vec<SubjectData>, ApiError>;
/// Lists governance data for `namespace` (forwarded as
/// `APICommands::GetGovernances`); governances are returned as [`SubjectData`].
async fn get_governances(
&self,
namespace: String,
from: Option<String>,
quantity: Option<i64>,
) -> Result<Vec<SubjectData>, ApiError>;
/// Lists subject data for `governance_id` (forwarded as
/// `APICommands::GetSubjectByGovernance`).
///
/// The `NodeAPI` implementation sends an empty namespace together with
/// `from` and `quantity`.
async fn get_subjects_by_governance(
&self,
governance_id: DigestIdentifier,
from: Option<String>,
quantity: Option<i64>,
) -> Result<Vec<SubjectData>, ApiError>;
/// Retrieves signed events of `subject_id` (forwarded as `APICommands::GetEvents`).
///
/// NOTE(review): here `from` is numeric, presumably an event sequence
/// number — confirm.
async fn get_events(
&self,
subject_id: DigestIdentifier,
from: Option<i64>,
quantity: Option<i64>,
) -> Result<Vec<Signed<Event>>, ApiError>;
/// Retrieves a single signed event of `subject_id` selected by `sn`
/// (forwarded as `APICommands::GetEvent`).
///
/// NOTE(review): `sn` presumably stands for the event sequence number — confirm.
async fn get_event(
&self,
subject_id: DigestIdentifier,
sn: u64,
) -> Result<Signed<Event>, ApiError>;
/// Retrieves the data of the subject `subject_id` (forwarded as
/// `APICommands::GetSubject`).
async fn get_subject(&self, subject_id: DigestIdentifier) -> Result<SubjectData, ApiError>;
/// Requests shutdown, consuming the handle.
///
/// The `NodeAPI` implementation sends `APICommands::Shutdown` and returns
/// `Ok(())` once `ApiResponses::ShutdownCompleted` is received.
async fn shutdown(self) -> Result<(), ApiError>;
/// Resolves the vote on `request_id` with the given `acceptance` value
/// (forwarded as `APICommands::VoteResolve`).
#[cfg(feature = "aproval")]
async fn approval_request(
&self,
request_id: DigestIdentifier,
acceptance: bool,
) -> Result<ApprovalEntity, ApiError>;
/// Lists pending requests as [`ApprovalEntity`] values (forwarded as
/// `APICommands::GetPendingRequests`).
#[cfg(feature = "aproval")]
async fn get_pending_requests(&self) -> Result<Vec<ApprovalEntity>, ApiError>;
/// Retrieves the [`ApprovalEntity`] identified by `id` (forwarded as
/// `APICommands::GetSingleRequest`).
#[cfg(feature = "aproval")]
async fn get_single_request(&self, id: DigestIdentifier) -> Result<ApprovalEntity, ApiError>;
/// Registers `subject_id` together with its `providers` set (forwarded as
/// `APICommands::SetPreauthorizedSubject`; the `NodeAPI` implementation
/// clones both arguments).
async fn add_preauthorize_subject(
&self,
subject_id: &DigestIdentifier,
providers: &HashSet<KeyIdentifier>,
) -> Result<(), ApiError>;
/// Lists every preauthorized subject paired with its provider set
/// (forwarded as `APICommands::GetAllPreauthorizedSubjects`).
async fn get_all_allowed_subjects_and_providers(
&self,
from: Option<String>,
quantity: Option<i64>,
) -> Result<Vec<(DigestIdentifier, HashSet<KeyIdentifier>)>, ApiError>;
/// Asks the node for keys using `derivator` (forwarded as
/// `APICommands::AddKeys`, which `API::process_input` handles through
/// `InnerAPI::generate_keys`) and returns the resulting [`KeyIdentifier`].
async fn add_keys(&self, derivator: KeyDerivator) -> Result<KeyIdentifier, ApiError>;
/// Retrieves the validation proof of `subject_id` along with a set of
/// signatures (forwarded as `APICommands::GetValidationProof`).
async fn get_validation_proof(
&self,
subject_id: DigestIdentifier,
) -> Result<(HashSet<Signature>, ValidationProof), ApiError>;
/// Retrieves the [`TapleRequest`] identified by `request_id` (forwarded as
/// `APICommands::GetRequest`).
async fn get_request(&self, request_id: DigestIdentifier) -> Result<TapleRequest, ApiError>;
/// Lists subject data for `governance_id` (forwarded as
/// `APICommands::GetGovernanceSubjects`).
///
/// NOTE(review): overlaps in signature with `get_subjects_by_governance`;
/// how the two differ is decided by `InnerAPI`, not visible here.
async fn get_governance_subjects(
&self,
governance_id: DigestIdentifier,
from: Option<String>,
quantity: Option<i64>,
) -> Result<Vec<SubjectData>, ApiError>;
/// Retrieves the [`ApprovalEntity`] for `request_id` (forwarded as
/// `APICommands::GetApproval`).
#[cfg(feature = "aproval")]
async fn get_approval(&self, request_id: DigestIdentifier) -> Result<ApprovalEntity, ApiError>;
/// Lists approvals, optionally restricted by `state` (forwarded as
/// `APICommands::GetApprovals`).
#[cfg(feature = "aproval")]
async fn get_approvals(
&self,
state: Option<crate::ApprovalState>,
from: Option<String>,
quantity: Option<i64>,
) -> Result<Vec<ApprovalEntity>, ApiError>;
}
/// Cloneable handle through which API calls are sent to the node.
///
/// It only holds the sending end of an `APICommands` / `ApiResponses`
/// channel; the `ApiModuleInterface` implementation for this type performs one
/// `ask` on that channel per call.
#[derive(Clone, Debug)]
pub struct NodeAPI {
/// Sending end used to ask a command and await its response.
pub(crate) sender: SenderEnd<APICommands, ApiResponses>,
}
#[async_trait]
impl ApiModuleInterface for NodeAPI {
async fn get_request(&self, request_id: DigestIdentifier) -> Result<TapleRequest, ApiError> {
let response = self
.sender
.ask(APICommands::GetRequest(request_id))
.await
.unwrap();
if let ApiResponses::GetRequest(data) = response {
data
} else {
unreachable!()
}
}
async fn external_request(
&self,
event_request: Signed<EventRequest>,
) -> Result<DigestIdentifier, ApiError> {
let response = self
.sender
.ask(APICommands::ExternalRequest(event_request))
.await
.unwrap();
if let ApiResponses::HandleExternalRequest(data) = response {
data
} else {
unreachable!()
}
}
#[cfg(feature = "aproval")]
async fn get_pending_requests(&self) -> Result<Vec<ApprovalEntity>, ApiError> {
let response = self
.sender
.ask(APICommands::GetPendingRequests)
.await
.unwrap();
if let ApiResponses::GetPendingRequests(data) = response {
data
} else {
unreachable!()
}
}
#[cfg(feature = "aproval")]
async fn get_single_request(&self, id: DigestIdentifier) -> Result<ApprovalEntity, ApiError> {
let response = self
.sender
.ask(APICommands::GetSingleRequest(id))
.await
.unwrap();
if let ApiResponses::GetSingleRequest(data) = response {
data
} else {
unreachable!()
}
}
async fn get_subjects(
&self,
namespace: String,
from: Option<String>,
quantity: Option<i64>,
) -> Result<Vec<SubjectData>, ApiError> {
let response = self
.sender
.ask(APICommands::GetSubjects(super::GetSubjects {
namespace,
from,
quantity,
}))
.await
.unwrap();
if let ApiResponses::GetSubjects(data) = response {
data
} else {
unreachable!()
}
}
async fn get_subjects_by_governance(
&self,
governance_id: DigestIdentifier,
from: Option<String>,
quantity: Option<i64>,
) -> Result<Vec<SubjectData>, ApiError> {
let response = self
.sender
.ask(APICommands::GetSubjectByGovernance(
super::GetSubjects {
namespace: "".into(),
from,
quantity,
},
governance_id,
))
.await
.unwrap();
if let ApiResponses::GetSubjectByGovernance(data) = response {
data
} else {
unreachable!()
}
}
async fn get_governances(
&self,
namespace: String,
from: Option<String>,
quantity: Option<i64>,
) -> Result<Vec<SubjectData>, ApiError> {
let response = self
.sender
.ask(APICommands::GetGovernances(super::GetSubjects {
namespace,
from,
quantity,
}))
.await
.unwrap();
if let ApiResponses::GetGovernances(data) = response {
data
} else {
unreachable!()
}
}
async fn get_event(
&self,
subject_id: DigestIdentifier,
sn: u64,
) -> Result<Signed<Event>, ApiError> {
let response = self
.sender
.ask(APICommands::GetEvent(subject_id, sn))
.await
.unwrap();
if let ApiResponses::GetEvent(data) = response {
data
} else {
unreachable!()
}
}
async fn get_events(
&self,
subject_id: DigestIdentifier,
from: Option<i64>,
quantity: Option<i64>,
) -> Result<Vec<Signed<Event>>, ApiError> {
let response = self
.sender
.ask(APICommands::GetEvents(GetEvents {
subject_id,
from,
quantity,
}))
.await
.unwrap();
if let ApiResponses::GetEvents(data) = response {
data
} else {
unreachable!()
}
}
async fn get_subject(&self, subject_id: DigestIdentifier) -> Result<SubjectData, ApiError> {
let response = self
.sender
.ask(APICommands::GetSubject(super::GetSubject { subject_id }))
.await
.unwrap();
if let ApiResponses::GetSubject(data) = response {
data
} else {
unreachable!()
}
}
#[cfg(feature = "aproval")]
async fn approval_request(
&self,
request_id: DigestIdentifier,
acceptance: bool,
) -> Result<ApprovalEntity, ApiError> {
let response = self
.sender
.ask(APICommands::VoteResolve(acceptance, request_id))
.await
.unwrap();
if let ApiResponses::VoteResolve(data) = response {
data
} else {
unreachable!()
}
}
async fn shutdown(self) -> Result<(), ApiError> {
let response = self.sender.ask(APICommands::Shutdown).await.unwrap();
if let ApiResponses::ShutdownCompleted = response {
Ok(())
} else {
unreachable!()
}
}
async fn add_preauthorize_subject(
&self,
subject_id: &DigestIdentifier,
providers: &HashSet<KeyIdentifier>,
) -> Result<(), ApiError> {
let response = self
.sender
.ask(APICommands::SetPreauthorizedSubject(
subject_id.clone(),
providers.clone(),
))
.await
.unwrap();
if let ApiResponses::SetPreauthorizedSubjectCompleted = response {
Ok(())
} else {
unreachable!()
}
}
async fn get_all_allowed_subjects_and_providers(
&self,
from: Option<String>,
quantity: Option<i64>,
) -> Result<Vec<(DigestIdentifier, HashSet<KeyIdentifier>)>, ApiError> {
let response = self
.sender
.ask(APICommands::GetAllPreauthorizedSubjects(
GetAllowedSubjects { from, quantity },
))
.await
.unwrap();
if let ApiResponses::GetAllPreauthorizedSubjects(data) = response {
data
} else {
unreachable!()
}
}
async fn add_keys(&self, derivator: KeyDerivator) -> Result<KeyIdentifier, ApiError> {
let response = self
.sender
.ask(APICommands::AddKeys(derivator))
.await
.unwrap();
if let ApiResponses::AddKeys(data) = response {
data
} else {
unreachable!()
}
}
async fn get_validation_proof(
&self,
subject_id: DigestIdentifier,
) -> Result<(HashSet<Signature>, ValidationProof), ApiError> {
let response = self
.sender
.ask(APICommands::GetValidationProof(subject_id))
.await
.unwrap();
if let ApiResponses::GetValidationProof(data) = response {
data
} else {
unreachable!()
}
}
async fn get_governance_subjects(
&self,
governance_id: DigestIdentifier,
from: Option<String>,
quantity: Option<i64>,
) -> Result<Vec<SubjectData>, ApiError> {
let response = self
.sender
.ask(APICommands::GetGovernanceSubjects(GetGovernanceSubjects {
governance_id,
from,
quantity,
}))
.await
.unwrap();
if let ApiResponses::GetGovernanceSubjects(data) = response {
data
} else {
unreachable!()
}
}
#[cfg(feature = "aproval")]
async fn get_approval(&self, request_id: DigestIdentifier) -> Result<ApprovalEntity, ApiError> {
let response = self
.sender
.ask(APICommands::GetApproval(request_id))
.await
.unwrap();
if let ApiResponses::GetApproval(data) = response {
data
} else {
unreachable!()
}
}
#[cfg(feature = "aproval")]
async fn get_approvals(
&self,
state: Option<crate::ApprovalState>,
from: Option<String>,
quantity: Option<i64>,
) -> Result<Vec<ApprovalEntity>, ApiError> {
let response = self
.sender
.ask(APICommands::GetApprovals(super::GetApprovals {
state,
from,
quantity,
}))
.await
.unwrap();
if let ApiResponses::GetApprovals(data) = response {
data
} else {
unreachable!()
}
}
}
/// Command-processing side of the API.
///
/// Owns the channel on which `APICommands` arrive and the `InnerAPI` that
/// executes them, plus the broadcast endpoints used by `start` to coordinate
/// shutdown.
pub struct API<C: DatabaseCollection> {
/// Channel from which incoming commands are received.
input: MpscChannel<APICommands, ApiResponses>,
/// Settings sender stored at construction; not read anywhere in the visible code.
_settings_sender: Sender<TapleSettings>,
/// Executes the individual commands dispatched by `process_input`.
inner_api: InnerAPI<C>,
/// Broadcast sender used to signal shutdown; `start` takes it out (leaving
/// `None`) when it triggers the shutdown itself.
shutdown_sender: Option<tokio::sync::broadcast::Sender<()>>,
/// Broadcast receiver awaited by `start` to observe shutdown signals.
shutdown_receiver: tokio::sync::broadcast::Receiver<()>,
}
impl<C: DatabaseCollection> API<C> {
    /// Assembles the API from its collaborators.
    ///
    /// The event, authorized-subjects, database, (feature-gated) approval and
    /// ledger handles are passed on to `InnerAPI::new`; the remaining
    /// arguments are stored on the returned value. `shutdown_sender` is
    /// wrapped in `Some` so `start` can later take it.
    pub fn new(
        input: MpscChannel<APICommands, ApiResponses>,
        event_api: EventAPI,
        #[cfg(feature = "aproval")] approval_api: ApprovalAPI,
        authorized_subjects_api: AuthorizedSubjectsAPI,
        ledger_api: EventManagerAPI,
        settings_sender: Sender<TapleSettings>,
        shutdown_sender: tokio::sync::broadcast::Sender<()>,
        shutdown_receiver: tokio::sync::broadcast::Receiver<()>,
        db: DB<C>,
    ) -> Self {
        let inner_api = InnerAPI::new(
            event_api,
            authorized_subjects_api,
            db,
            #[cfg(feature = "aproval")]
            approval_api,
            ledger_api,
        );
        Self {
            input,
            _settings_sender: settings_sender,
            inner_api,
            shutdown_sender: Some(shutdown_sender),
            shutdown_receiver,
        }
    }

    /// Runs the command loop until shutdown, consuming `self`.
    ///
    /// Each iteration waits for whichever completes first: an incoming
    /// message on `input`, or `shutdown_receiver.recv()`. In the latter case
    /// the loop simply exits.
    ///
    /// For an incoming message, shutdown is initiated when the channel yields
    /// `None`, when `process_input` fails, or when `process_input` hands back
    /// a reply sender (the `Shutdown` command). Initiating shutdown logs an
    /// error-level message, broadcasts `()` on `shutdown_sender`, drops that
    /// sender, awaits one `recv` on `shutdown_receiver`, and finally answers
    /// `ApiResponses::ShutdownCompleted` on the stored reply sender, if any.
    ///
    /// # Panics
    ///
    /// Panics if `shutdown_sender` was already taken or if the broadcast
    /// `send` fails.
    pub async fn start(mut self) {
        // Reply sender of a pending `Shutdown` command, answered once shutdown is done.
        let mut response_channel = None;
        loop {
            tokio::select! {
                incoming = self.input.receive() => {
                    let must_shutdown = match incoming {
                        None => true,
                        Some(message) => {
                            let outcome = self.process_input(message).await;
                            match outcome {
                                Err(_) => true,
                                Ok(Some(reply_tx)) => {
                                    response_channel = Some(reply_tx);
                                    true
                                }
                                Ok(None) => false,
                            }
                        }
                    };
                    if must_shutdown {
                        log::error!("must shutdown before unwrap");
                        let sender = self.shutdown_sender.take().unwrap();
                        sender.send(()).expect("Shutdown Channel Closed");
                        drop(sender);
                        let _ = self.shutdown_receiver.recv().await;
                        if let Some(reply_tx) = response_channel.take() {
                            // The requester may already be gone; a failed reply is ignored.
                            let _ = reply_tx.send(ApiResponses::ShutdownCompleted);
                        }
                        break;
                    }
                },
                _ = self.shutdown_receiver.recv() => {
                    break;
                }
            }
        }
    }

    /// Handles one message taken from the input channel.
    ///
    /// Returns `Ok(Some(sender))` for `APICommands::Shutdown`, handing the
    /// untouched reply sender back to the caller. For every other command the
    /// matching `InnerAPI` method is invoked, its response is sent on the
    /// reply sender, and `Ok(None)` is returned.
    ///
    /// # Errors
    ///
    /// Propagates the error of the `InnerAPI` calls that are followed by `?`,
    /// and returns `APIInternalError::OneshotUnavailable` when the response
    /// cannot be sent.
    ///
    /// # Panics
    ///
    /// Panics with `"Tell in API"` if the message is `ChannelData::TellData`.
    async fn process_input(
        &mut self,
        input: ChannelData<APICommands, ApiResponses>,
    ) -> Result<Option<tokio::sync::oneshot::Sender<ApiResponses>>, APIInternalError> {
        let (reply_tx, command) = match input {
            ChannelData::AskData(ask) => ask.get(),
            ChannelData::TellData(_) => panic!("Tell in API"),
        };

        let answer = match command {
            APICommands::Shutdown => return Ok(Some(reply_tx)),
            APICommands::GetSubjects(params) => self.inner_api.get_all_subjects(params),
            APICommands::GetGovernances(params) => {
                self.inner_api.get_all_governances(params).await
            }
            APICommands::GetEvents(params) => {
                self.inner_api.get_events_of_subject(params).await
            }
            APICommands::GetSubject(params) => self.inner_api.get_single_subject(params).await,
            APICommands::GetRequest(request_id) => self.inner_api.get_request(request_id).await,
            APICommands::GetEvent(subject_id, sn) => self.inner_api.get_event(subject_id, sn),
            #[cfg(feature = "aproval")]
            APICommands::VoteResolve(acceptance, id) => {
                self.inner_api.emit_vote(id, acceptance).await?
            }
            #[cfg(feature = "aproval")]
            APICommands::GetPendingRequests => self.inner_api.get_pending_request().await,
            #[cfg(feature = "aproval")]
            APICommands::GetSingleRequest(id) => self.inner_api.get_single_request(id).await,
            APICommands::ExternalRequest(event_request) => {
                self.inner_api.handle_external_request(event_request).await?
            }
            APICommands::SetPreauthorizedSubject(subject_id, providers) => {
                self.inner_api
                    .set_preauthorized_subject(subject_id, providers)
                    .await?
            }
            APICommands::AddKeys(derivator) => self.inner_api.generate_keys(derivator).await?,
            APICommands::GetValidationProof(subject_id) => {
                self.inner_api.get_validation_proof(subject_id).await
            }
            APICommands::GetGovernanceSubjects(params) => {
                self.inner_api.get_governance_subjects(params).await
            }
            #[cfg(feature = "aproval")]
            APICommands::GetApproval(request_id) => {
                self.inner_api.get_approval(request_id).await
            }
            #[cfg(feature = "aproval")]
            APICommands::GetApprovals(filter) => {
                self.inner_api
                    .get_approvals(filter.state, filter.from, filter.quantity)
                    .await
            }
            APICommands::GetAllPreauthorizedSubjects(params) => {
                self.inner_api
                    .get_all_preauthorized_subjects_and_providers(params)
                    .await?
            }
            APICommands::GetSubjectByGovernance(params, gov_id) => {
                self.inner_api.get_subjects_by_governance(params, gov_id)
            }
        };

        match reply_tx.send(answer) {
            Ok(()) => Ok(None),
            Err(_) => Err(APIInternalError::OneshotUnavailable),
        }
    }
}