#[cfg(feature = "aproval")]
use crate::approval::manager::{ApprovalAPI, ApprovalManager};
#[cfg(feature = "aproval")]
use crate::approval::{ApprovalMessages, ApprovalResponses};
use crate::authorized_subjecs::manager::{AuthorizedSubjectsAPI, AuthorizedSubjectsManager};
use crate::authorized_subjecs::{AuthorizedSubjectsCommand, AuthorizedSubjectsResponse};
use crate::commons::channel::MpscChannel;
use crate::commons::config::NetworkSettings;
use crate::commons::config::{NodeSettings, TapleSettings};
use crate::commons::crypto::{
Ed25519KeyPair, KeyGenerator, KeyMaterial, KeyPair, Secp256k1KeyPair,
};
use crate::commons::identifier::derive::KeyDerivator;
use crate::commons::identifier::{Derivable, KeyIdentifier};
use crate::commons::models::notification::Notification;
use crate::commons::self_signature_manager::{SelfSignatureInterface, SelfSignatureManager};
use crate::database::{DatabaseCollection, DatabaseManager, DB};
use crate::distribution::error::DistributionErrorResponses;
use crate::distribution::manager::DistributionManager;
use crate::distribution::DistributionMessagesNew;
#[cfg(feature = "evaluation")]
use crate::evaluator::{EvaluatorManager, EvaluatorMessage, EvaluatorResponse};
use crate::event::manager::{EventAPI, EventManager};
use crate::event::{EventCommand, EventResponse};
use crate::governance::GovernanceAPI;
use crate::governance::{governance::Governance, GovernanceMessage, GovernanceResponse};
use crate::ledger::manager::EventManagerAPI;
use crate::ledger::{manager::LedgerManager, LedgerCommand, LedgerResponse};
use crate::message::{
MessageContent, MessageReceiver, MessageSender, MessageTaskCommand, MessageTaskManager,
NetworkEvent,
};
use crate::network::network::{NetworkProcessor, SendMode};
use crate::protocol::protocol_message_manager::{ProtocolManager, TapleMessages};
use crate::signature::Signed;
#[cfg(feature = "validation")]
use crate::validation::manager::ValidationManager;
#[cfg(feature = "validation")]
use crate::validation::{ValidationCommand, ValidationResponse};
use crate::ListenAddr;
use futures::future::BoxFuture;
use futures::FutureExt;
use libp2p::{Multiaddr, PeerId};
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::broadcast::error::{RecvError, TryRecvError};
use crate::api::{APICommands, ApiResponses, NodeAPI, API};
use crate::error::Error;
/// Capacity passed to most of the channels created in this file (the API channel, the
/// notification broadcast channel and the channels that connect the managers).
const BUFFER_SIZE: usize = 1000;
/// Builds the default [`TapleSettings`].
///
/// * Network: listens on `ListenAddr::default()`, with no known nodes and no external
///   addresses.
/// * Node: Ed25519 key derivator, Blake3-256 digest derivator, no secret key, replication
///   factor `0.25`, timeout `3000` and `passvotation` `0`. When the `evaluation` feature is
///   enabled the smart-contracts directory is `./contracts`.
pub fn get_default_settings() -> TapleSettings {
    let network = NetworkSettings {
        listen_addr: vec![ListenAddr::default()],
        known_nodes: Vec::new(),
        external_address: Vec::new(),
    };
    let node = NodeSettings {
        key_derivator: KeyDerivator::Ed25519,
        secret_key: None,
        digest_derivator: crate::commons::identifier::derive::digest::DigestDerivator::Blake3_256,
        replication_factor: 0.25,
        timeout: 3000,
        passvotation: 0,
        #[cfg(feature = "evaluation")]
        smartcontracts_directory: "./contracts".into(),
    };
    TapleSettings { network, node }
}
pub struct NotificationHandler {
notification_receiver: tokio::sync::broadcast::Receiver<Notification>,
}
impl NotificationHandler {
pub fn receive<'a>(&'a mut self) -> BoxFuture<'a, Result<Notification, Error>> {
async move {
loop {
match self.notification_receiver.recv().await {
Ok(value) => break Ok(value),
Err(RecvError::Lagged(_)) => continue,
Err(RecvError::Closed) => break Err(Error::CantReceiveNotification),
}
}
}
.boxed()
}
pub fn try_rec(&mut self) -> Result<Notification, Error> {
loop {
match self.notification_receiver.try_recv() {
Ok(value) => break Ok(value),
Err(TryRecvError::Lagged(_)) => continue,
Err(TryRecvError::Closed) => break Err(Error::CantReceiveNotification),
Err(TryRecvError::Empty) => break Err(Error::NoNewNotification),
}
}
}
}
/// Handle over the shutdown broadcast channel.
///
/// It owns one sender and one receiver of the channel, so it can both emit the shutdown
/// signal and wait for it.
pub struct TapleShutdownManager {
    shutdown_sender: tokio::sync::broadcast::Sender<()>,
    shutdown_receiver: tokio::sync::broadcast::Receiver<()>,
}

impl TapleShutdownManager {
    /// Wraps `sender`, subscribing a fresh receiver to it.
    pub(crate) fn new(sender: tokio::sync::broadcast::Sender<()>) -> Self {
        let shutdown_receiver = sender.subscribe();
        Self {
            shutdown_receiver,
            shutdown_sender: sender,
        }
    }

    /// Returns a new receiver subscribed to the shutdown channel.
    pub fn get_raw_receiver(&self) -> tokio::sync::broadcast::Receiver<()> {
        let sender = &self.shutdown_sender;
        sender.subscribe()
    }

    /// Returns a clone of the shutdown channel sender.
    pub fn get_raw_sender(&self) -> tokio::sync::broadcast::Sender<()> {
        let sender = &self.shutdown_sender;
        sender.clone()
    }

    /// Waits until the receiver yields anything other than a lag report, i.e. either a
    /// shutdown signal or the closing of the channel.
    pub async fn wait_for_shutdown(mut self) {
        while let Err(RecvError::Lagged(_)) = self.shutdown_receiver.recv().await {}
    }

    /// Emits the shutdown signal and waits until the channel reports it is closed.
    ///
    /// The sender owned by this handle is dropped right after the signal is sent; received
    /// signals and lag reports are skipped while waiting for the `Closed` result.
    ///
    /// # Panics
    ///
    /// Panics if sending the signal fails.
    pub async fn shutdown(mut self) {
        self.shutdown_sender.send(()).unwrap();
        drop(self.shutdown_sender);
        while !matches!(self.shutdown_receiver.recv().await, Err(RecvError::Closed)) {}
    }
}
/// A node instance: holds the settings, the database and the channel ends needed to start
/// the node and to hand out API, notification and shutdown handles.
///
/// Several fields are `Option`s because `start` moves their contents out of the node.
#[derive(Debug)]
pub struct Taple<M: DatabaseManager<C>, C: DatabaseCollection> {
/// API handle; clones of it are returned by `get_api`.
api: NodeAPI,
/// Network peer id; `None` until `start` has created the network layer.
peer_id: Option<PeerId>,
/// String form of the node key identifier; `None` until `generate_mc` has run.
controller_id: Option<String>,
/// Public key bytes of the node key pair; `None` until `generate_mc` has run.
public_key: Option<Vec<u8>>,
/// Receiving end of the API channel; taken by `start` and handed to the API component.
api_input: Option<MpscChannel<APICommands, ApiResponses>>,
/// Sender of the notification broadcast channel; handlers subscribe to it.
notification_sender: tokio::sync::broadcast::Sender<Notification>,
/// Settings the node was created with.
settings: TapleSettings,
/// Database manager; taken by `start`.
database: Option<M>,
/// Sender of the shutdown broadcast channel; taken by `start`.
shutdown_sender: Option<tokio::sync::broadcast::Sender<()>>,
/// Receiver created together with the shutdown sender. It is never read in this file;
/// presumably kept so the channel always has a live receiver — TODO confirm.
_shutdown_receiver: tokio::sync::broadcast::Receiver<()>,
/// Marker for the otherwise unused collection type parameter `C`.
_c: PhantomData<C>,
}
impl<M: DatabaseManager<C> + 'static, C: DatabaseCollection + 'static> Taple<M, C> {
/// Returns a copy of the network peer id.
///
/// The value is `None` until `start` has created the network layer.
pub fn peer_id(&self) -> Option<PeerId> {
    Option::clone(&self.peer_id)
}
/// Returns a copy of the public key bytes of the node key pair.
///
/// The value is `None` until the key pair has been generated by `generate_mc`.
pub fn public_key(&self) -> Option<Vec<u8>> {
    self.public_key.as_deref().map(|bytes| bytes.to_vec())
}
/// Returns a copy of the controller id (the string form of the node key identifier).
///
/// The value is `None` until the key pair has been generated by `generate_mc`.
pub fn controller_id(&self) -> Option<String> {
    self.controller_id.as_deref().map(str::to_owned)
}
pub fn get_api(&self) -> NodeAPI {
self.api.clone()
}
/// Creates a [`NotificationHandler`] backed by a new subscription to the notification
/// broadcast channel.
pub fn get_notification_handler(&self) -> NotificationHandler {
    let notification_receiver = self.notification_sender.subscribe();
    NotificationHandler {
        notification_receiver,
    }
}
/// Creates a [`TapleShutdownManager`] bound to the shutdown channel of this node.
///
/// # Panics
///
/// Panics when called after `start`: `start` moves the shutdown sender out of the node, so
/// there is nothing left to build the manager from. Obtain the manager before starting.
pub fn get_shutdown_manager(&self) -> TapleShutdownManager {
    let sender = self
        .shutdown_sender
        .as_ref()
        .expect("shutdown sender already taken: call get_shutdown_manager before start")
        .clone();
    TapleShutdownManager::new(sender)
}
fn generate_mc(&mut self, stored_public_key: Option<String>) -> Result<KeyPair, Error> {
let kp = Self::create_key_pair(
&self.settings.node.key_derivator,
None,
self.settings.node.secret_key.clone(),
)?;
let public_key = kp.public_key_bytes();
let key_identifier = KeyIdentifier::new(kp.get_key_derivator(), &public_key).to_str();
if let Some(key) = stored_public_key {
if key_identifier != key {
log::error!("Invalid MC specified. There is a previous defined MC in the system");
return Err(Error::InvalidKeyPairSpecified(key_identifier));
}
}
self.controller_id = Some(key_identifier);
self.public_key = Some(public_key);
Ok(kp)
}
/// Creates a node that has not been started yet.
///
/// Sets up the API channel and the notification broadcast channel (both with
/// `BUFFER_SIZE` capacity) and the shutdown broadcast channel (capacity 10). The peer id,
/// public key and controller id start as `None`.
pub fn new(settings: TapleSettings, database: M) -> Self {
    let (shutdown_sender, shutdown_receiver) = tokio::sync::broadcast::channel::<()>(10);
    let (notification_sender, _) = tokio::sync::broadcast::channel(BUFFER_SIZE);
    let (api_input, api_sender) = MpscChannel::new(BUFFER_SIZE);
    Self {
        api: NodeAPI { sender: api_sender },
        peer_id: None,
        controller_id: None,
        public_key: None,
        api_input: Some(api_input),
        notification_sender,
        settings,
        database: Some(database),
        shutdown_sender: Some(shutdown_sender),
        _shutdown_receiver: shutdown_receiver,
        _c: PhantomData,
    }
}
/// Builds every component of the node, wires them together and spawns them as Tokio tasks.
///
/// The steps are: create the internal channels, take the database out of `self`, generate
/// the node key pair (checked against the controller id read from the database, when one
/// can be read), store the controller id, create the network layer, construct the managers
/// and finally spawn each of them.
///
/// # Errors
///
/// * Any error returned by `generate_mc`.
/// * [`Error::DatabaseError`] when the controller id cannot be stored.
/// * Any error returned while parsing the configured known nodes or external addresses.
///
/// # Panics
///
/// * If the shutdown sender, the database or the API input have already been taken out of
///   `self`. All three are taken here, so calling `start` a second time panics.
/// * If `NetworkProcessor::new` returns an error.
pub async fn start(&mut self) -> Result<(), Error> {
// The shutdown sender is moved out of `self`; the components below receive clones of it
// or subscriptions to it.
let shutdown_sender = self.shutdown_sender.take().unwrap();
// Channels that connect the components with each other.
let (sender_network, receiver_network): (
tokio::sync::mpsc::Sender<NetworkEvent>,
tokio::sync::mpsc::Receiver<NetworkEvent>,
) = tokio::sync::mpsc::channel(BUFFER_SIZE);
let (event_receiver, event_sender) =
MpscChannel::<EventCommand, EventResponse>::new(BUFFER_SIZE);
let (ledger_receiver, ledger_sender) =
MpscChannel::<LedgerCommand, LedgerResponse>::new(BUFFER_SIZE);
let (as_receiver, as_sender) =
MpscChannel::<AuthorizedSubjectsCommand, AuthorizedSubjectsResponse>::new(BUFFER_SIZE);
let (governance_receiver, governance_sender) =
MpscChannel::<GovernanceMessage, GovernanceResponse>::new(BUFFER_SIZE);
let (governance_update_sx, governance_update_rx) =
tokio::sync::broadcast::channel(BUFFER_SIZE);
let (task_receiver, task_sender) =
MpscChannel::<MessageTaskCommand<TapleMessages>, ()>::new(BUFFER_SIZE);
let (protocol_receiver, protocol_sender) =
MpscChannel::<Signed<MessageContent<TapleMessages>>, ()>::new(BUFFER_SIZE);
let (distribution_receiver, distribution_sender) = MpscChannel::<
DistributionMessagesNew,
Result<(), DistributionErrorResponses>,
>::new(BUFFER_SIZE);
#[cfg(feature = "aproval")]
let (approval_receiver, approval_sender) =
MpscChannel::<ApprovalMessages, ApprovalResponses>::new(BUFFER_SIZE);
#[cfg(feature = "evaluation")]
let (evaluation_receiver, evaluation_sender) =
MpscChannel::<EvaluatorMessage, EvaluatorResponse>::new(BUFFER_SIZE);
#[cfg(feature = "validation")]
let (validation_receiver, validation_sender) =
MpscChannel::<ValidationCommand, ValidationResponse>::new(BUFFER_SIZE);
// Only the watch sender is passed on (to the API); the receiver is dropped when this
// function returns.
let (wath_sender, _watch_receiver): (
tokio::sync::watch::Sender<TapleSettings>,
tokio::sync::watch::Receiver<TapleSettings>,
) = tokio::sync::watch::channel(self.settings.clone());
// Take ownership of the database and share it behind an `Arc`.
let db = self.database.take().unwrap();
let db = Arc::new(db);
let db_access = DB::new(db.clone());
// A failed read of the stored controller id is treated the same as "nothing stored".
let stored_public_key = db_access.get_controller_id().ok();
let kp = self.generate_mc(stored_public_key)?;
// `generate_mc` succeeded, so `controller_id` is set and the `unwrap` cannot fail.
db_access
.set_controller_id(self.controller_id().unwrap())
.map_err(|e| Error::DatabaseError(e.to_string()))?;
let public_key = kp.public_key_bytes();
let key_identifier = KeyIdentifier::new(kp.get_key_derivator(), &public_key);
// Network layer. NOTE: a failure to create it panics instead of returning an error.
let network_manager = NetworkProcessor::new(
self.settings.network.listen_addr.clone(),
network_access_points(&self.settings.network.known_nodes)?, sender_network,
kp.clone(),
shutdown_sender.subscribe(),
external_addresses(&self.settings.network.external_address)?,
)
.await
.expect("Error en creación de la capa de red");
self.peer_id = Some(network_manager.local_peer_id().to_owned());
let signature_manager = SelfSignatureManager::new(kp.clone(), &self.settings);
// Message plumbing between the network layer and the protocol manager.
let network_receiver = MessageReceiver::new(
receiver_network,
protocol_sender,
shutdown_sender.subscribe(),
signature_manager.get_own_identifier(),
);
let network_sender = MessageSender::new(
network_manager.client(),
key_identifier.clone(),
signature_manager.clone(),
);
let mut task_manager = MessageTaskManager::new(
network_sender.clone(),
task_receiver,
shutdown_sender.clone(),
shutdown_sender.subscribe(),
);
let protocol_manager = ProtocolManager::new(
protocol_receiver,
distribution_sender.clone(),
#[cfg(feature = "evaluation")]
evaluation_sender.clone(),
#[cfg(feature = "validation")]
validation_sender.clone(),
event_sender.clone(),
#[cfg(feature = "aproval")]
approval_sender.clone(),
ledger_sender.clone(),
shutdown_sender.clone(),
);
// Managers. Each one gets its own `DB` wrapper over the shared database.
let mut governance_manager = Governance::<M, C>::new(
governance_receiver,
shutdown_sender.clone(),
shutdown_sender.subscribe(),
DB::new(db.clone()),
governance_update_sx.clone(),
);
let event_manager = EventManager::new(
event_receiver,
governance_update_rx,
GovernanceAPI::new(governance_sender.clone()),
DB::new(db.clone()),
shutdown_sender.clone(),
shutdown_sender.subscribe(),
task_sender.clone(),
self.notification_sender.clone(),
ledger_sender.clone(),
signature_manager.get_own_identifier(),
signature_manager.clone(),
);
let ledger_manager = LedgerManager::new(
ledger_receiver,
shutdown_sender.clone(),
shutdown_sender.subscribe(),
self.notification_sender.clone(),
GovernanceAPI::new(governance_sender.clone()),
DB::new(db.clone()),
task_sender.clone(),
distribution_sender,
key_identifier.clone(),
);
let as_manager = AuthorizedSubjectsManager::new(
as_receiver,
DB::new(db.clone()),
task_sender.clone(),
key_identifier.clone(),
shutdown_sender.clone(),
shutdown_sender.subscribe(),
);
// The API component consumes the API input that was created in `new`.
let api = API::new(
self.api_input.take().unwrap(),
EventAPI::new(event_sender),
#[cfg(feature = "aproval")]
ApprovalAPI::new(approval_sender),
AuthorizedSubjectsAPI::new(as_sender),
EventManagerAPI::new(ledger_sender),
wath_sender,
shutdown_sender.clone(),
shutdown_sender.subscribe(),
DB::new(db.clone()),
);
#[cfg(feature = "evaluation")]
let evaluator_manager = EvaluatorManager::new(
evaluation_receiver,
db.clone(),
signature_manager.clone(),
governance_update_sx.subscribe(),
shutdown_sender.clone(),
shutdown_sender.subscribe(),
GovernanceAPI::new(governance_sender.clone()),
self.settings.node.smartcontracts_directory.clone(),
task_sender.clone(),
);
#[cfg(feature = "aproval")]
let approval_manager = ApprovalManager::new(
GovernanceAPI::new(governance_sender.clone()),
approval_receiver,
shutdown_sender.clone(),
shutdown_sender.subscribe(),
task_sender.clone(),
governance_update_sx.subscribe(),
signature_manager.clone(),
self.notification_sender.clone(),
self.settings.clone(),
DB::new(db.clone()),
);
let distribution_manager = DistributionManager::new(
distribution_receiver,
governance_update_sx.subscribe(),
shutdown_sender.clone(),
shutdown_sender.subscribe(),
task_sender.clone(),
GovernanceAPI::new(governance_sender.clone()),
signature_manager.clone(),
self.settings.clone(),
DB::new(db.clone()),
);
#[cfg(feature = "validation")]
let validation_manager = ValidationManager::new(
validation_receiver,
GovernanceAPI::new(governance_sender),
DB::new(db.clone()),
signature_manager,
shutdown_sender.clone(),
shutdown_sender.subscribe(),
task_sender,
);
// Every component runs in its own spawned task; the join handles are not kept.
tokio::spawn(async move {
governance_manager.start().await;
});
tokio::spawn(async move {
ledger_manager.start().await;
});
tokio::spawn(async move {
event_manager.start().await;
});
tokio::spawn(async move {
task_manager.start().await;
});
tokio::spawn(async move {
protocol_manager.start().await;
});
tokio::spawn(async move {
network_receiver.run().await;
});
#[cfg(feature = "evaluation")]
tokio::spawn(async move {
evaluator_manager.start().await;
});
#[cfg(feature = "validation")]
tokio::spawn(async move {
validation_manager.start().await;
});
tokio::spawn(async move {
distribution_manager.start().await;
});
#[cfg(feature = "aproval")]
tokio::spawn(async move {
approval_manager.start().await;
});
tokio::spawn(async move {
as_manager.start().await;
});
tokio::spawn(network_manager.run());
tokio::spawn(async move {
api.start().await;
});
Ok(())
}
fn create_key_pair(
derivator: &KeyDerivator,
seed: Option<String>,
current_key: Option<String>,
) -> Result<KeyPair, Error> {
let mut counter: u32 = 0;
if seed.is_some() {
counter += 1
};
if current_key.is_some() {
counter += 2
};
if counter == 2 {
let str_key = current_key.unwrap();
match derivator {
KeyDerivator::Ed25519 => Ok(KeyPair::Ed25519(Ed25519KeyPair::from_secret_key(
&hex::decode(str_key).map_err(|_| Error::InvalidHexString)?,
))),
KeyDerivator::Secp256k1 => {
Ok(KeyPair::Secp256k1(Secp256k1KeyPair::from_secret_key(
&hex::decode(str_key).map_err(|_| Error::InvalidHexString)?,
)))
}
}
} else if counter == 1 {
match derivator {
KeyDerivator::Ed25519 => Ok(KeyPair::Ed25519(
crate::commons::crypto::Ed25519KeyPair::from_seed(seed.unwrap().as_bytes()),
)),
KeyDerivator::Secp256k1 => Ok(KeyPair::Secp256k1(
crate::commons::crypto::Secp256k1KeyPair::from_seed(seed.unwrap().as_bytes()),
)),
}
} else if counter == 3 {
Err(Error::PkConflict)
} else {
Err(Error::NoMCAvailable)
}
}
}
fn network_access_points(points: &[String]) -> Result<Vec<(PeerId, Multiaddr)>, Error> {
let mut access_points: Vec<(PeerId, Multiaddr)> = Vec::new();
for point in points {
let data: Vec<&str> = point.split("/p2p/").collect();
if data.len() != 2 {
return Err(Error::AcessPointError(point.to_string()));
}
if let Some(value) = multiaddr(point) {
if let Ok(id) = data[1].parse::<PeerId>() {
access_points.push((id, value));
} else {
return Err(Error::AcessPointError(format!(
"Invalid PeerId conversion: {}",
point
)));
}
} else {
return Err(Error::AcessPointError(format!(
"Invalid MultiAddress conversion: {}",
point
)));
}
}
Ok(access_points)
}
fn external_addresses(addresses: &[String]) -> Result<Vec<Multiaddr>, Error> {
let mut external_addresses: Vec<Multiaddr> = Vec::new();
for address in addresses {
if let Some(value) = multiaddr(address) {
external_addresses.push(value);
} else {
return Err(Error::AcessPointError(format!(
"Invalid MultiAddress conversion in External Address: {}",
address
)));
}
}
Ok(external_addresses)
}
/// Parses `addr` as a [`Multiaddr`], returning `None` when it is not a valid multiaddress.
///
/// The parse error is discarded; callers build their own error message.
fn multiaddr(addr: &str) -> Option<Multiaddr> {
    addr.parse::<Multiaddr>().ok()
}