Skip to content

Commit 9dc2b7f

Browse files
committed
Integration with offer server
1 parent 2cf87c9 commit 9dc2b7f

File tree

7 files changed

+133
-74
lines changed

7 files changed

+133
-74
lines changed

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/market/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ actix-web = "4"
3434
alloy = { version = "1.0", features = ["full"] }
3535
anyhow = "1.0"
3636
async-trait = { version = "0.1.33" }
37-
backtrace = "0.3.50"
3837
bincode = "1.3.3"
3938
bigdecimal = "0.4"
4039
chrono = { version = "0.4", features = ["serde"] }
@@ -45,7 +44,6 @@ diesel_migrations = "1.4"
4544
digest = "0.8.1"
4645
env_logger = { version = "0.7" }
4746
futures = "0.3"
48-
glob = "0.3.3"
4947
hex = "0.4"
5048
humantime = "2"
5149
lazy_static = "1.4"

core/market/src/db/model/demand.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use chrono::{NaiveDateTime, TimeZone, Utc};
2-
2+
use serde::{Deserialize, Serialize};
33
use ya_client::model::{market::Demand as ClientDemand, ErrorMessage, NodeId};
44
use ya_service_api_web::middleware::Identity;
55

66
use super::SubscriptionId;
77
use crate::db::schema::market_demand;
88
use ya_client::model::market::NewDemand;
99

10-
#[derive(Clone, Debug, Identifiable, Insertable, Queryable)]
10+
#[derive(Clone, Debug, Identifiable, Insertable, Queryable, Serialize, Deserialize)]
11+
#[serde(rename_all = "camelCase")]
1112
#[table_name = "market_demand"]
1213
pub struct Demand {
1314
pub id: SubscriptionId,

core/market/src/market.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use ya_client::model::market::{
99
Agreement, AgreementListEntry, AgreementOperationEvent as ClientAgreementEvent, Demand,
1010
NewDemand, NewOffer, Offer, Reason, Role,
1111
};
12-
1312
use ya_core_model::bus::GsbBindPoints;
1413
use ya_core_model::{self as model};
1514
use ya_service_api_interfaces::{Provider, Service};

core/market/src/matcher.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,34 @@ impl Matcher {
347347
) -> Result<Demand, MatcherError> {
348348
let demand = self.store.create_demand(id, demand).await?;
349349

350+
let matcher = env::var("YAGNA_MARKET_MATCHER_URL");
351+
if let Ok(matcher) = matcher {
352+
let client = reqwest::Client::new();
353+
let res = client
354+
.post(&(matcher + "/requestor/demand/new"))
355+
.header("Content-Type", "application/json")
356+
.body(serde_json::to_string(&demand).unwrap())
357+
.send()
358+
.await;
359+
match res {
360+
Ok(response) => {
361+
if response.status().is_success() {
362+
log::info!("Successfully notified matcher about new demand");
363+
} else {
364+
log::error!(
365+
"Failed to notify matcher about new demand: HTTP {}",
366+
response.status()
367+
);
368+
}
369+
}
370+
Err(e) => {
371+
log::error!("Failed to notify matcher about new demand: {}", e);
372+
}
373+
}
374+
} else {
375+
log::warn!("YAGNA_MARKET_MATCHER_URL not set, skipping demand notification");
376+
}
377+
350378
// Track demand expiration
351379
self.expiration_tracker
352380
.send(TrackDeadline {
@@ -394,6 +422,34 @@ impl Matcher {
394422
.await
395423
.ok();
396424

425+
let matcher = env::var("YAGNA_MARKET_MATCHER_URL");
426+
if let Ok(matcher) = matcher {
427+
let client = reqwest::Client::new();
428+
let res = client
429+
.post(&(matcher + "/requestor/demand/cancel"))
430+
.header("Content-Type", "application/json")
431+
.body("{\"demandId\":\"".to_string() + &demand_id.to_string() + "\"}")
432+
.send()
433+
.await;
434+
match res {
435+
Ok(response) => {
436+
if response.status().is_success() {
437+
log::info!("Successfully notified matcher about demand cancellation");
438+
} else {
439+
log::error!(
440+
"Failed to notify matcher about demand cancellation: HTTP {}",
441+
response.status()
442+
);
443+
}
444+
}
445+
Err(e) => {
446+
log::error!("Failed to notify matcher about demand cancellation: {}", e);
447+
}
448+
}
449+
} else {
450+
log::warn!("YAGNA_MARKET_MATCHER_URL not set, skipping demand notification");
451+
}
452+
397453
// Check if this was the last demand - if so, stop listening for offers
398454
self.maybe_stop_listening().await;
399455

core/market/src/protocol/discovery.rs

Lines changed: 73 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ use anyhow::Result;
22
use chrono::{DateTime, Utc};
33
use offer::GolemBaseOffer;
44
use std::collections::HashSet;
5-
use std::fs;
6-
use std::path::Path;
7-
use std::str::FromStr;
5+
use std::env;
86
use std::sync::{Arc, Mutex};
97
use std::time::Duration;
108
use tokio::task::JoinHandle;
@@ -26,16 +24,13 @@ use crate::db::model::{Offer as ModelOffer, SubscriptionId};
2624
use crate::identity::{IdentityApi, IdentityError, YagnaIdSigner};
2725
use crate::protocol::discovery::error::*;
2826
use crate::protocol::discovery::message::*;
29-
use crate::testing::MatcherError;
3027
use arkiv_sdk::client::ArkivClient;
3128
use arkiv_sdk::entity::Create;
3229
use arkiv_sdk::events::Event;
3330
use arkiv_sdk::rpc::{QueryOptions, SearchResult};
3431
use arkiv_sdk::signers::TransactionSigner;
3532
use arkiv_sdk::{Address, Hash};
36-
use glob::glob;
3733
use rand::{thread_rng, Rng};
38-
use serde_json::Value;
3934

4035
const ARKIV_CALLER: &str = "Arkiv";
4136

@@ -311,74 +306,87 @@ impl Discovery {
311306
Ok(())
312307
}
313308

314-
async fn read_offer_from_file(path: &Path) -> anyhow::Result<ModelOffer> {
315-
// Read the file
316-
let data = fs::read_to_string(path)?;
309+
async fn offers_events_loop(&self, _starting_block: u64) -> anyhow::Result<()> {
310+
let default_identity = self
311+
.inner
312+
.identity
313+
.default_identity()
314+
.await
315+
.expect("Failed to get default identity");
317316

318-
let val: Value = serde_json::from_str(&data)?;
317+
loop {
318+
// Wait for either Ctrl+C or timeout to continue loop
319+
tokio::select! {
320+
_ = tokio::signal::ctrl_c() => {
321+
log::info!("Received Ctrl+C, shutting down offers events loop");
322+
break;
323+
}
324+
_ = tokio::time::sleep(Duration::from_secs(10)) => {}
325+
}
319326

320-
let hash = Hash::from_str(
321-
val.as_object()
322-
.ok_or_else(|| {
323-
MatcherError::GolemBaseOfferError(
324-
"Offer serialized to non-object JSON".to_string(),
327+
let matcher = env::var("YAGNA_MARKET_MATCHER_URL");
328+
if let Ok(matcher) = matcher {
329+
let client = reqwest::Client::new();
330+
let res = client
331+
.post(&(matcher + "/requestor/demand/take-from-queue"))
332+
.header("Content-Type", "application/json")
333+
.body(
334+
"{
335+
\"demandId\""
336+
.to_string()
337+
+ ": \""
338+
+ &default_identity.to_string()
339+
+ "\"}",
325340
)
326-
})?
327-
.get("id")
328-
.ok_or_else(|| {
329-
MatcherError::GolemBaseOfferError("Offer JSON missing 'id' field".to_string())
330-
})?
331-
.as_str()
332-
.ok_or_else(|| {
333-
MatcherError::GolemBaseOfferError(
334-
"Offer 'id' field is not a string".to_string(),
335-
)
336-
})?,
337-
)
338-
.map_err(|e| {
339-
MatcherError::GolemBaseOfferError(format!("Failed to hash from id field: {}", e))
340-
})?;
341-
342-
let offer: GolemBaseOffer = serde_json::from_str(&data)?;
343-
344-
let offer = offer.into_model_offer(hash)?;
345-
346-
Ok(offer)
347-
}
348-
349-
async fn offers_events_loop(&self, _starting_block: u64) -> anyhow::Result<()> {
350-
loop {
351-
//read file from offer.json
352-
if let Ok(paths) = glob("incoming-offer-*.json") {
353-
#[allow(clippy::manual_flatten)]
354-
for entry in paths {
355-
if let Ok(path) = entry {
356-
match Self::read_offer_from_file(&path).await {
357-
Ok(offer) => {
358-
log::info!("Registering incoming offer from file: {:?}", path);
359-
self.register_incoming_offers(vec![offer]).await?;
341+
.send()
342+
.await;
343+
match res {
344+
Ok(response) => {
345+
if response.status().is_success() {
346+
let data = response.text().await.unwrap_or_default();
347+
348+
let offer = serde_json::from_str::<ModelOffer>(&data);
349+
match offer {
350+
Ok(offer) => {
351+
log::info!(
352+
"Registering incoming offer from matcher: {:?}",
353+
offer.id
354+
);
355+
self.register_incoming_offers(vec![offer]).await?;
356+
}
357+
Err(e) => {
358+
log::error!(
359+
"Failed to parse offer from matcher response: {}",
360+
e
361+
);
362+
}
360363
}
361-
Err(e) => {
364+
} else if response.status() == reqwest::StatusCode::NOT_FOUND {
365+
let response_text = response.text().await.unwrap_or_default();
366+
if response_text.is_empty() {
362367
log::error!(
363-
"Failed to read and register offer from file {:?}: {}",
364-
path,
365-
e
368+
"Failed to take offer due to other error (no body): {}",
369+
default_identity
370+
);
371+
} else {
372+
log::info!(
373+
"No offers available in matcher queue for demand {}",
374+
response_text
366375
);
367376
}
377+
} else {
378+
log::error!(
379+
"Failed to take offer, other status: {}",
380+
response.status()
381+
);
368382
}
369-
fs::remove_file(path).map_err(|e| {
370-
anyhow::anyhow!("Failed to remove invalid offer file: {}", e)
371-
})?;
383+
}
384+
Err(e) => {
385+
log::error!("Failed to take offer due to other error: {}", e);
372386
}
373387
}
374-
}
375-
// Wait for either Ctrl+C or timeout to continue loop
376-
tokio::select! {
377-
_ = tokio::signal::ctrl_c() => {
378-
log::info!("Received Ctrl+C, shutting down offers events loop");
379-
break;
380-
}
381-
_ = tokio::time::sleep(Duration::from_secs(1)) => {}
388+
} else {
389+
log::warn!("YAGNA_MARKET_MATCHER_URL not set, skipping demand notification");
382390
}
383391
}
384392
Ok(())
@@ -402,7 +410,7 @@ impl Discovery {
402410
DiscoveryInitError::GolemBaseInitFailed(format!("Failed to register offers: {}", e))
403411
})?;
404412

405-
let handle = tokio::spawn(async move {
413+
let handle = tokio::task::spawn_local(async move {
406414
discovery
407415
.offers_events_loop(starting_block)
408416
.await

core/market/src/protocol/discovery/builder.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@ use std::collections::HashSet;
44
use std::ops::Div;
55
use std::sync::Arc;
66

7-
use arkiv_sdk::client::{ArkivClient, TransactionConfig};
8-
97
use crate::protocol::callback::{CallbackFuture, OutputFuture};
108
use crate::protocol::callback::{CallbackHandler, CallbackMessage, HandlerSlot};
9+
use arkiv_sdk::client::{ArkivClient, TransactionConfig};
1110

1211
use super::error::DiscoveryInitError;
1312
use super::{Discovery, DiscoveryImpl};

0 commit comments

Comments
 (0)