Skip to content

Commit 8bd2e2e

Browse files
committed
Do not broadcast offers if the agreements is open (approved)
1 parent 369703e commit 8bd2e2e

File tree

8 files changed

+75
-95
lines changed

8 files changed

+75
-95
lines changed

core/market/src/db/model.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
mod agreement;
1+
pub(crate) mod agreement;
22
mod agreement_events;
33
mod demand;
44
mod negotiation_events;

core/market/src/identity.rs

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
use anyhow::anyhow;
2-
use async_trait::async_trait;
31
use serde::{Deserialize, Serialize};
42
use std::sync::Arc;
53

6-
use arkiv_sdk::{keccak256, signers::TransactionSigner, Address, Signature};
74
use ya_client::model::NodeId;
85
use ya_core_model::{
96
bus::GsbBindPoints,
@@ -133,43 +130,3 @@ impl IdentityGSB {
133130
})
134131
}
135132
}
136-
137-
pub struct YagnaIdSigner {
138-
pub identity_api: Arc<dyn IdentityApi>,
139-
pub node_id: NodeId,
140-
}
141-
142-
#[async_trait]
143-
impl TransactionSigner for YagnaIdSigner {
144-
fn address(&self) -> Address {
145-
Address::from(&self.node_id.into_array())
146-
}
147-
148-
async fn sign(&self, data: &[u8]) -> anyhow::Result<Signature> {
149-
let hash = keccak256(data);
150-
let identity_api = self.identity_api.clone();
151-
let node_id = self.node_id;
152-
153-
Ok(tokio::task::spawn_local(async move {
154-
let mut sig_bytes = identity_api.sign(&node_id, hash.as_ref()).await?;
155-
let v = sig_bytes[0];
156-
sig_bytes.append(&mut vec![v]);
157-
158-
anyhow::Ok(
159-
Signature::from_raw(&sig_bytes[1..])
160-
.map_err(|e| anyhow!("Failed to parse signature: {e}"))?,
161-
)
162-
})
163-
.await
164-
.map_err(|e| anyhow!("Failed to sign data: {e}"))??)
165-
}
166-
}
167-
168-
impl YagnaIdSigner {
169-
pub fn new(identity_api: Arc<dyn IdentityApi>, node_id: NodeId) -> Self {
170-
Self {
171-
identity_api,
172-
node_id,
173-
}
174-
}
175-
}

core/market/src/matcher.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,6 @@ impl Matcher {
318318
.await
319319
.ok();
320320

321-
322321
log::info!(
323322
"Unsubscribed Offer: [{}] using identity: {} [{}]",
324323
&offer_id,

core/market/src/matcher/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ pub enum DemandError {
1717
NotFound(SubscriptionId),
1818
#[error(transparent)]
1919
JsonObjectExpected(#[from] serde_json::error::Error),
20+
#[error("{0}")]
21+
Other(String),
2022
}
2123

2224
#[derive(thiserror::Error, Debug)]

core/market/src/matcher/resolver.rs

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
2-
2+
use tokio::time::Instant;
33
use ya_market_resolver::{match_demand_offer, Match};
44

55
use super::{error::ResolverError, RawProposal, SubscriptionStore};
@@ -84,12 +84,57 @@ impl Resolver {
8484
}
8585
Subscription::Demand(id) => {
8686
let demand = self.store.get_demand(id).await?;
87-
self.store
87+
88+
let perf_time = Instant::now();
89+
90+
let current_offers = self
91+
.store
8892
.get_offers_before(demand.insertion_ts.unwrap())
8993
.await?
9094
.into_iter()
91-
.filter(|offer| matches(offer, &demand))
92-
.for_each(|offer| self.emit_proposal(offer, demand.clone()));
95+
.filter(|offer| matches(offer, &demand));
96+
let agreements = match self.store.get_approved_agreements().await {
97+
Ok(agreements) => agreements,
98+
Err(e) => {
99+
return Err(ResolverError::Demand(e));
100+
}
101+
};
102+
103+
let mut proposals_to_emit = Vec::new();
104+
for offer in current_offers {
105+
//@todo check if offer is not used in open agreement
106+
if agreements.iter().any(|a| a.provider_id == offer.node_id) {
107+
log::info!(
108+
"Skipping offer from provider [{}] as it is used in approved agreement",
109+
offer.node_id
110+
);
111+
continue;
112+
}
113+
proposals_to_emit.push(offer);
114+
}
115+
let elapsed = perf_time.elapsed().as_secs_f64();
116+
117+
if elapsed > 0.1 {
118+
log::warn!(
119+
"Emitting {} proposals, preparation took {:.2} ms",
120+
proposals_to_emit.len(),
121+
elapsed * 1000.0
122+
);
123+
} else {
124+
log::info!(
125+
"Emitting {} proposals, preparation took {:.2} ms",
126+
proposals_to_emit.len(),
127+
elapsed * 1000.0
128+
);
129+
}
130+
for offer in proposals_to_emit {
131+
log::info!(
132+
"Emitting proposal for Demand [{}] and Offer [{}]",
133+
demand.id,
134+
offer.id
135+
);
136+
self.emit_proposal(offer, demand.clone());
137+
}
93138
}
94139
}
95140
Ok(())
@@ -98,6 +143,7 @@ impl Resolver {
98143
pub fn emit_proposal(&self, offer: Offer, demand: Demand) {
99144
let offer_id = offer.id.clone();
100145
let demand_id = demand.id.clone();
146+
log::info!("PROPOSAL: Offer [{}] <-> Demand [{}]", offer_id, demand_id);
101147
if let Err(e) = self.proposal_tx.send(RawProposal { offer, demand }) {
102148
log::warn!(
103149
"Emitting proposal for Offer [{}] and Demand [{}] error: {}",

core/market/src/matcher/store.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use chrono::{NaiveDateTime, Utc};
33
use std::collections::HashSet;
44
use std::sync::Arc;
55
use std::time::Duration;
6-
76
use ya_client::model::market::{Demand as ClientDemand, NewDemand, Offer as ClientOffer};
87
use ya_client::model::NodeId;
98
use ya_service_api_web::middleware::Identity;
@@ -19,6 +18,7 @@ use crate::matcher::error::{
1918
use crate::matcher::sync::{OfferPlaceholderGuard, OfferSync};
2019
use crate::negotiation::ScannerSet;
2120
use crate::protocol::discovery::message::{QueryOffers, QueryOffersResult};
21+
use crate::testing::AgreementState;
2222

2323
#[derive(Clone)]
2424
pub struct SubscriptionStore {
@@ -318,6 +318,22 @@ impl SubscriptionStore {
318318
}
319319
}
320320

321+
pub async fn get_approved_agreements(
322+
&self,
323+
) -> Result<Vec<crate::db::model::agreement::Agreement>, DemandError> {
324+
match self
325+
.db
326+
.as_dao::<AgreementDao>()
327+
.list(None, Some(AgreementState::Approved), None, None, None)
328+
.await
329+
{
330+
Ok(agreements) => Ok(agreements),
331+
Err(e) => Err(DemandError::Other(format!(
332+
"Failed to get Agreements from database {e}"
333+
))),
334+
}
335+
}
336+
321337
pub async fn get_client_demands(
322338
&self,
323339
node_id: Option<NodeId>,

core/market/src/protocol/discovery.rs

Lines changed: 4 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,23 @@
11
use anyhow::Result;
22
use chrono::{DateTime, Utc};
3-
use offer::GolemBaseOffer;
43
use std::collections::HashSet;
54
use std::env;
65
use std::sync::{Arc, Mutex};
76
use std::time::Duration;
87
use tokio::task::JoinHandle;
9-
use ya_service_bus::timeout::IntoTimeoutFuture;
108

119
use ya_client::model::NodeId;
1210
use ya_core_model::bus::GsbBindPoints;
13-
use ya_core_model::identity::event::IdentityEvent;
14-
use ya_core_model::identity::Error;
15-
use ya_core_model::market::{local, GetGolemBaseOffer, GetGolemBaseOfferResponse};
16-
use ya_core_model::market::{
17-
FundGolemBase, GetGolemBaseBalance, GolemBaseCommand, RpcMessageError,
18-
};
19-
use ya_service_bus::typed as bus;
2011

2112
use super::callback::HandlerSlot;
2213
use crate::config::DiscoveryConfig;
23-
use crate::db::model::{Offer as ModelOffer, SubscriptionId};
24-
use crate::identity::{IdentityApi, IdentityError, YagnaIdSigner};
14+
use crate::db::model::Offer as ModelOffer;
15+
use crate::identity::IdentityApi;
2516
use crate::protocol::discovery::error::*;
2617
use crate::protocol::discovery::message::*;
27-
use arkiv_sdk::client::ArkivClient;
28-
use arkiv_sdk::entity::Create;
29-
use arkiv_sdk::events::Event;
30-
use arkiv_sdk::rpc::{QueryOptions, SearchResult};
31-
use arkiv_sdk::signers::TransactionSigner;
32-
use arkiv_sdk::{Address, Hash};
33-
use rand::{thread_rng, Rng};
3418

3519
const ARKIV_CALLER: &str = "Arkiv";
3620

37-
// TODO: Get this value from node configuration
38-
const BLOCK_TIME_SECONDS: i64 = 2;
39-
4021
pub mod builder;
4122
pub mod error;
4223
pub mod faucet;
@@ -61,24 +42,11 @@ pub struct DiscoveryImpl {
6142
//arkiv: ArkivClient,
6243
offer_handlers: OfferHandlers,
6344
config: DiscoveryConfig,
64-
identities: Mutex<HashSet<NodeId>>,
45+
_identities: Mutex<HashSet<NodeId>>,
6546
websocket_task: Mutex<Option<JoinHandle<()>>>,
6647
}
6748

6849
impl Discovery {
69-
70-
/// Checks if an offer belongs to us based on metadata and entity_id
71-
fn _is_own_offer(&self, metadata: &SearchResult) -> bool {
72-
let Some(owner) = metadata.owner.as_ref() else {
73-
log::warn!("[Programming error] Entity metadata should contain owner!");
74-
return false;
75-
};
76-
77-
let identities = self.inner.identities.lock().unwrap();
78-
let owner_bytes = NodeId::from(owner.as_slice());
79-
identities.contains(&owner_bytes)
80-
}
81-
8250
async fn offers_events_loop(&self, _starting_block: u64) -> anyhow::Result<()> {
8351
let default_identity = self
8452
.inner
@@ -246,10 +214,9 @@ impl Discovery {
246214
Ok(())
247215
}
248216

249-
250217
/// Function doesn't bind any GSB handlers.
251218
/// It's only used to sync with GolemBase node and initialize Discovery struct state.
252-
pub async fn bind_gsb(&self, gsb: GsbBindPoints) -> Result<(), DiscoveryInitError> {
219+
pub async fn bind_gsb(&self, _gsb: GsbBindPoints) -> Result<(), DiscoveryInitError> {
253220
log::info!("Arkiv Configuration:");
254221
log::info!(" Network: {:?}", self.inner.config.get_network_type());
255222
log::info!(" RPC URL: {}", self.inner.config.get_rpc_url());
@@ -295,10 +262,6 @@ impl Discovery {
295262
Ok(())
296263
}
297264

298-
299-
300-
301-
302265
pub(crate) async fn get_last_bcast_ts(&self) -> DateTime<Utc> {
303266
Utc::now()
304267
}

core/market/src/protocol/discovery/builder.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
use std::any::{Any, TypeId};
22
use std::collections::HashMap;
33
use std::collections::HashSet;
4-
use std::ops::Div;
54
use std::sync::Arc;
65

76
use crate::protocol::callback::{CallbackFuture, OutputFuture};
87
use crate::protocol::callback::{CallbackHandler, CallbackMessage, HandlerSlot};
9-
use arkiv_sdk::client::{ArkivClient, TransactionConfig};
108

119
use super::error::DiscoveryInitError;
1210
use super::{Discovery, DiscoveryImpl};
@@ -76,14 +74,13 @@ impl DiscoveryBuilder {
7674
let config = self.config.clone().ok_or_else(|| {
7775
DiscoveryInitError::BuilderIncomplete("Configuration is required".to_string())
7876
})?;
79-
8077

8178
let discovery = Discovery {
8279
inner: Arc::new(DiscoveryImpl {
8380
identity: self.get_data(),
8481
offer_handlers,
8582
config: config.clone(),
86-
identities: std::sync::Mutex::new(HashSet::new()),
83+
_identities: std::sync::Mutex::new(HashSet::new()),
8784
websocket_task: std::sync::Mutex::new(None),
8885
}),
8986
};

0 commit comments

Comments
 (0)