diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index 17e58b6f7..79cced863 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "dc05b0d5d14def9120f2bdfa82aec1fac3b5e388003989fd56263827f1fab85b", + "checksum": "0a147d75b6943fe7fce1c7d185a8c1576d83eebf42e9e15a9f046cae1ff72dbb", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", @@ -28565,35 +28565,29 @@ ], "crate_features": { "common": [ + "elf", + "errno", "general", "ioctl", "no_std" ], "selects": { "aarch64-unknown-linux-gnu": [ - "elf", - "errno", "prctl", "std", "system" ], "arm-unknown-linux-gnueabi": [ - "elf", - "errno", "prctl", "std", "system" ], "armv7-unknown-linux-gnueabi": [ - "elf", - "errno", "prctl", "std", "system" ], "i686-unknown-linux-gnu": [ - "elf", - "errno", "prctl", "std", "system" @@ -28609,8 +28603,6 @@ "system" ], "x86_64-unknown-linux-gnu": [ - "elf", - "errno", "prctl", "std", "system" @@ -35831,6 +35823,78 @@ }, "license": "MIT" }, + "qualifier 0.4.3": { + "name": "qualifier", + "version": "0.4.3", + "repository": null, + "targets": [], + "library_target_name": null, + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "deps": { + "common": [ + { + "id": "anyhow 1.0.86", + "target": "anyhow" + }, + { + "id": "clap 4.5.9", + "target": "clap" + }, + { + "id": "dirs 5.0.1", + "target": "dirs" + }, + { + "id": "ic-nervous-system-common-test-keys 0.9.0", + "target": "ic_nervous_system_common_test_keys" + }, + { + "id": "itertools 0.13.0", + "target": "itertools" + }, + { + "id": "log 0.4.22", + "target": "log" + }, + { + "id": "pretty_env_logger 0.5.0", + "target": "pretty_env_logger" + }, + { + "id": "reqwest 0.12.5", + "target": "reqwest" + }, + { + "id": "serde 1.0.204", + "target": "serde" + }, + { + "id": "serde_json 1.0.120", + "target": "serde_json" + }, + { + "id": "tokio 1.38.1", + "target": "tokio" + }, + { + "id": "tokio-util 0.7.11", + "target": "tokio_util" + }, + { + "id": "url 2.5.2", + "target": "url" + } + ], + "selects": {} + }, + "edition": "2021", + "version": "0.4.3" + }, + "license": null + }, "quick-xml 0.23.1": { "name": "quick-xml", "version": "0.23.1", @@ -50321,6 +50385,7 @@ "np-notifications 0.4.3": "rs/np-notifications", "obs-canister-clients 0.4.3": "rs/ic-observability/obs-canister-clients", "prometheus-config-updater 0.4.3": "rs/ic-observability/prometheus-config-updater", + "qualifier 0.4.3": "rs/qualifier", "rollout-controller 0.4.3": "rs/rollout-controller", "service-discovery 0.4.3": "rs/ic-observability/service-discovery", "slack-notifications 0.4.3": "rs/slack-notifications", diff --git a/Cargo.lock b/Cargo.lock index 1bd4f6075..8dc66da47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7148,6 +7148,29 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "qualifier" +version = "0.4.3" +dependencies = [ + "anyhow", + "clap 4.5.9", + "dirs", + "dre", + "ic-canisters", + "ic-management-backend", + "ic-management-types", + "ic-nervous-system-common-test-keys", + "itertools 0.13.0", + "log", + "pretty_env_logger", + "reqwest", + "serde", + "serde_json", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "quick-xml" version = "0.23.1" diff --git a/Cargo.toml b/Cargo.toml index 028db033a..39075a4de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "rs/rollout-controller", "rs/slack-notifications", "rs/dre-canisters/trustworthy-node-metrics/src/trustworthy-node-metrics", + "rs/qualifier", "rs/dre-canisters/trustworthy-node-metrics/src/trustworthy-node-metrics-types", ] @@ -109,6 +110,7 @@ ic-base-types = { git = "https://github.com/dfinity/ic.git", rev = "5ba1412f9175 ic-canister-client = { git = "https://github.com/dfinity/ic.git", rev = "5ba1412f9175d987661ae3c0d8dbd1ac3e092b7d" } ic-canister-client-sender = { git = "https://github.com/dfinity/ic.git", rev = "5ba1412f9175d987661ae3c0d8dbd1ac3e092b7d" } ic-canisters = { path = "rs/ic-canisters" } +ic-nervous-system-common-test-keys = { git = "https://github.com/dfinity/ic.git", rev = "5ba1412f9175d987661ae3c0d8dbd1ac3e092b7d" } ic-cdk = "0.15.0" ic-config = { git = "https://github.com/dfinity/ic.git", rev = "5ba1412f9175d987661ae3c0d8dbd1ac3e092b7d" } ic-crypto-utils-threshold-sig-der = { git = "https://github.com/dfinity/ic.git", rev = "5ba1412f9175d987661ae3c0d8dbd1ac3e092b7d" } diff --git a/bazel/external_crates.bzl b/bazel/external_crates.bzl index af18bd169..1d444aee4 100644 --- a/bazel/external_crates.bzl +++ b/bazel/external_crates.bzl @@ -66,6 +66,7 @@ def external_crates_repository(): "//rs/slack-notifications:Cargo.toml", "//rs/rollout-controller:Cargo.toml", "//rs/dre-canisters/trustworthy-node-metrics/src/trustworthy-node-metrics:Cargo.toml", + "//rs/qualifier:Cargo.toml", "//rs/dre-canisters/trustworthy-node-metrics/src/trustworthy-node-metrics-types:Cargo.toml", ], splicing_config = splicing_config( diff --git a/rs/cli/src/commands/mod.rs b/rs/cli/src/commands/mod.rs index 168354567..3bb1c4540 100644 --- a/rs/cli/src/commands/mod.rs +++ b/rs/cli/src/commands/mod.rs @@ -28,14 +28,14 @@ mod api_boundary_nodes; mod completions; mod der_to_principal; mod firewall; -mod get; +pub mod get; mod heal; pub mod hostos; mod node_metrics; mod nodes; mod proposals; mod propose; -mod qualify; +pub mod qualify; mod registry; mod subnet; mod update_unassigned_nodes; @@ -170,7 +170,7 @@ pub trait ExecutableCommand { fn validate(&self, cmd: &mut Command); - async fn execute(&self, ctx: DreContext) -> anyhow::Result<()>; + fn execute(&self, ctx: DreContext) -> impl std::future::Future>; fn validate_min_nakamoto_coefficients(cmd: &mut clap::Command, min_nakamoto_coefficients: &[String]) { let _ = Self::_parse_min_nakamoto_coefficients_inner(Some(cmd), min_nakamoto_coefficients); diff --git a/rs/cli/src/commands/qualify/execute.rs b/rs/cli/src/commands/qualify/execute.rs index d6ca37ebf..5c0574cb0 100644 --- a/rs/cli/src/commands/qualify/execute.rs +++ b/rs/cli/src/commands/qualify/execute.rs @@ -12,26 +12,26 @@ use crate::{ pub struct Execute { /// Version which is to be qualified #[clap(long, short)] - version: String, + pub version: String, /// Starting version for the network. /// /// If left empty, the tool will use the current NNS version #[clap(long, short)] - from_version: Option, + pub from_version: Option, /// Specify the steps to run /// A range can be: `4`, `3..`, `..3, `1..3` #[clap(long)] - step_range: Option, + pub step_range: Option, /// Name of the deployment used for prometheus querying of `ic` label: `staging`, `from-config`... #[clap(long)] - deployment_name: String, + pub deployment_name: String, /// Prometheus compliant endpoint #[clap(long)] - prometheus_endpoint: String, + pub prometheus_endpoint: String, } impl ExecutableCommand for Execute { diff --git a/rs/cli/src/commands/qualify/mod.rs b/rs/cli/src/commands/qualify/mod.rs index b82ecefd8..43a142d1d 100644 --- a/rs/cli/src/commands/qualify/mod.rs +++ b/rs/cli/src/commands/qualify/mod.rs @@ -4,17 +4,17 @@ use list::List; use super::ExecutableCommand; -mod execute; +pub mod execute; mod list; #[derive(Args, Debug)] pub struct QualifyCmd { #[clap(subcommand)] - subcommand: QualifyCommands, + pub subcommand: QualifyCommands, } #[derive(Subcommand, Debug)] -enum QualifyCommands { +pub enum QualifyCommands { /// List all steps present in the qualification List(List), /// Execute the qualification diff --git a/rs/cli/src/commands/upgrade.rs b/rs/cli/src/commands/upgrade.rs index c120a3b77..d20257cb9 100644 --- a/rs/cli/src/commands/upgrade.rs +++ b/rs/cli/src/commands/upgrade.rs @@ -6,7 +6,7 @@ use tokio::task::JoinHandle; use super::{ExecutableCommand, IcAdminRequirement}; -#[derive(Args, Debug)] +#[derive(Args, Debug, Default)] pub struct Upgrade { /// Version to which the tool should be upgraded, if omitted /// the latest version will be used @@ -15,10 +15,6 @@ pub struct Upgrade { } impl Upgrade { - pub fn new() -> Self { - Self { version: None } - } - pub async fn run(&self) -> anyhow::Result { let version = self.version.clone(); tokio::task::spawn_blocking(move || Self::check_latest_release(env!("CARGO_PKG_VERSION"), true, version)).await? diff --git a/rs/cli/src/lib.rs b/rs/cli/src/lib.rs index a0552f989..a4829e382 100644 --- a/rs/cli/src/lib.rs +++ b/rs/cli/src/lib.rs @@ -1,3 +1,9 @@ #![allow(dead_code)] pub mod auth; +pub mod commands; +pub mod ctx; pub mod ic_admin; +mod operations; +mod qualification; +mod runner; +mod subnet_manager; diff --git a/rs/cli/src/main.rs b/rs/cli/src/main.rs index 4f973f689..d98c657cf 100644 --- a/rs/cli/src/main.rs +++ b/rs/cli/src/main.rs @@ -42,7 +42,7 @@ async fn main() -> anyhow::Result<()> { let r = args.execute(ctx).await; - let handle = Upgrade::new().check(); + let handle = Upgrade::default().check(); let maybe_update_status = handle.await?; match maybe_update_status { Ok(s) => match s { diff --git a/rs/cli/src/operations/hostos_rollout.rs b/rs/cli/src/operations/hostos_rollout.rs index 673033569..50cc66d2f 100644 --- a/rs/cli/src/operations/hostos_rollout.rs +++ b/rs/cli/src/operations/hostos_rollout.rs @@ -578,7 +578,7 @@ pub mod test { let open_proposals: Vec = vec![]; - let network = Network::new("mainnet", &vec![]).await.unwrap(); + let network = Network::new("mainnet", &[]).await.unwrap(); let nns_urls = network.get_nns_urls(); let hostos_rollout = HostosRollout::new( Arc::new(union.clone()), diff --git a/rs/cli/src/qualification/mod.rs b/rs/cli/src/qualification/mod.rs index f48f9e628..088114ced 100644 --- a/rs/cli/src/qualification/mod.rs +++ b/rs/cli/src/qualification/mod.rs @@ -287,14 +287,22 @@ impl QualificationExecutor { } print_text(format!("Executing step {}: `{}`", ordered_step.index, ordered_step.step.name())); - ordered_step.step.execute(&self.dre_ctx).await?; + let step_future = || async { ordered_step.step.execute(&self.dre_ctx).await }; + step_future.retry(&ExponentialBuilder::default()).await?; print_text(format!("Executed step {}: `{}`", ordered_step.index, ordered_step.step.name())); let registry = self.dre_ctx.registry().await; print_text(format!("Syncing with registry after step {}", ordered_step.index)); let sync_registry = || async { registry.sync_with_nns().await }; - sync_registry.retry(&ExponentialBuilder::default()).await?; + // If the system subnet downgraded it could be some time until it boots up + sync_registry + .retry( + &ExponentialBuilder::default() + .with_max_times(10) + .with_max_delay(Duration::from_secs(5 * 60)), + ) + .await?; } print_text(format!("Qualification of {} finished successfully!", self.to_version)); diff --git a/rs/cli/src/qualification/run_workload_test.rs b/rs/cli/src/qualification/run_workload_test.rs index f578fbd08..d3c80a2ab 100644 --- a/rs/cli/src/qualification/run_workload_test.rs +++ b/rs/cli/src/qualification/run_workload_test.rs @@ -99,16 +99,13 @@ async fn ensure_finalization_rate_for_subnet( common_labels ); let query = format!("avg(rate({}[{}s]))", query_selector, duration); - print_text(format!("Running query: {}", query)); - let response = client + let request = client .get(prom_endpoint) .header("Accept", "application/json") - .query(&[("time", end_timestamp.to_string()), ("query", query)]) - .send() - .await? - .error_for_status()? - .json::() - .await?; + .query(&[("time", end_timestamp.to_string()), ("query", query)]); + print_text(format!("Running query: {:?}", request)); + let response = request.send().await?.error_for_status()?.json::().await?; + print_text(format!("Received response: \n{}", serde_json::to_string_pretty(&response)?)); let finalization_rate = response["data"]["result"][0]["value"][1] .as_str() diff --git a/rs/qualifier/BUILD.bazel b/rs/qualifier/BUILD.bazel new file mode 100644 index 000000000..d6d1d3acb --- /dev/null +++ b/rs/qualifier/BUILD.bazel @@ -0,0 +1,23 @@ +load("@crate_index_dre//:defs.bzl", "aliases", "all_crate_deps") +load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test") +load("@//rs:oci_images.bzl", "rust_binary_oci_image_rules") + +DEPS = [ + "//rs/cli:dre-lib", + "//rs/ic-canisters", + "//rs/ic-management-types", + "//rs/ic-management-backend:ic-management-backend-lib" +] + +rust_binary( + name = "qualifier", + srcs = glob(["src/**/*.rs"]), + aliases = aliases(), + proc_macro_deps = all_crate_deps( + proc_macro = True, + ), + stamp = 1, + deps = all_crate_deps( + normal = True, + ) + DEPS, +) diff --git a/rs/qualifier/Cargo.toml b/rs/qualifier/Cargo.toml new file mode 100644 index 000000000..ee266b6fd --- /dev/null +++ b/rs/qualifier/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "qualifier" +version.workspace = true +edition.workspace = true +authors.workspace = true +description.workspace = true +documentation.workspace = true + +[dependencies] +anyhow = { workspace = true } +clap = { workspace = true } +tokio = { workspace = true } +log = { workspace = true } +pretty_env_logger = { workspace = true } +ic-nervous-system-common-test-keys = { workspace = true } +dirs = { workspace = true } +dre = { path = "../cli" } +ic-management-types = { path = "../ic-management-types" } +ic-canisters = { path = "../ic-canisters" } +serde_json = { workspace = true } +serde = { workspace = true } +tokio-util = { workspace = true } +itertools = { workspace = true } +reqwest = { workspace = true } +url = { workspace = true } +ic-management-backend = { path = "../ic-management-backend" } diff --git a/rs/qualifier/src/cli.rs b/rs/qualifier/src/cli.rs new file mode 100644 index 000000000..e856b2c83 --- /dev/null +++ b/rs/qualifier/src/cli.rs @@ -0,0 +1,105 @@ +use std::{path::PathBuf, process::Stdio, str::FromStr}; + +use clap::Parser; + +use ic_nervous_system_common_test_keys::TEST_NEURON_1_OWNER_KEYPAIR; +use tokio::process::Command; +const TEST_NEURON_1_IDENTITY_PATH: &str = ".config/dfx/identity/test_neuron_1/identity.pem"; + +#[derive(Parser)] +#[clap(about, version)] +pub struct Args { + /// Version to qualify + pub version_to_qualify: String, + + /// Specify a version from which the qualification + /// should start. The default will be the same + /// version as the NNS + #[clap(long)] + pub initial_version: Option, + + /// Path which contains the layout of the network to + /// be deployed. The default value will be a network + /// consisting of: + /// 2 application subnets (4 nodes per subnet) + /// 1 system subnet (4 nodes) + /// 4 unassigned nodes + #[clap(long)] + pub config_override: Option, + + #[clap(long, default_value = dirs::cache_dir().unwrap().join("git/ic").display().to_string())] + pub ic_repo_path: PathBuf, + + /// Skip the pulling of ic repo which is mostly useful + /// for development since each change on master will + /// result in rebuilding of image + #[clap(long)] + pub skip_pull: bool, +} + +impl Args { + pub fn ensure_key(&self) -> anyhow::Result<(u64, PathBuf)> { + let key_pair = &TEST_NEURON_1_OWNER_KEYPAIR; + let path = dirs::home_dir() + .ok_or(anyhow::anyhow!("No home dir present"))? + .join(PathBuf::from_str(TEST_NEURON_1_IDENTITY_PATH)?); + let dir = path.parent().ok_or(anyhow::anyhow!("No parent dir for path: {}", path.display()))?; + if !dir.exists() { + std::fs::create_dir_all(dir)?; + } + + std::fs::write(&path, key_pair.to_pem()).map_err(|e| anyhow::anyhow!(e))?; + Ok((449479075714955186, path)) + } + + pub async fn ensure_git(&self) -> anyhow::Result<()> { + std::fs::create_dir_all(&self.ic_repo_path)?; + + let git_dir = &self.ic_repo_path.join(".git"); + if !git_dir.exists() { + if !Command::new("git") + .args(["clone", "https://github.com/dfinity/ic.git", "."]) + .current_dir(&self.ic_repo_path) + .stderr(Stdio::null()) + .stdout(Stdio::null()) + .status() + .await? + .success() + { + anyhow::bail!("Failed to clone ic repo") + } + + return Ok(()); + } + + if !Command::new("git") + .args(["switch", "master", "-f"]) + .current_dir(&self.ic_repo_path) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .await? + .success() + { + anyhow::bail!("Failed to switch branch to master") + } + + if self.skip_pull { + return Ok(()); + } + + if !Command::new("git") + .args(["pull"]) + .current_dir(&self.ic_repo_path) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .await? + .success() + { + anyhow::bail!("Failed to pull master branch") + } + + Ok(()) + } +} diff --git a/rs/qualifier/src/ict_util.rs b/rs/qualifier/src/ict_util.rs new file mode 100644 index 000000000..959bd8128 --- /dev/null +++ b/rs/qualifier/src/ict_util.rs @@ -0,0 +1,105 @@ +use std::{path::PathBuf, process::Stdio, str::FromStr}; + +use itertools::Itertools; +use log::info; +use serde_json::Value; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + process::{ChildStdout, Command}, + sync::mpsc::Sender, +}; +use tokio_util::sync::CancellationToken; + +use crate::Message; + +pub async fn ict(ic_git: PathBuf, config: String, token: CancellationToken, sender: Sender) -> anyhow::Result<()> { + let ic_config = PathBuf::from_str("/tmp/ic_config.json")?; + std::fs::write(&ic_config, &config)?; + + let command = "gitlab-ci/container/container-run.sh"; + let args = &[ + "ict", + "testnet", + "create", + "--lifetime-mins", + "180", + "--from-ic-config-path", + &ic_config.display().to_string(), + ]; + + info!("Running command: {} {}", command, args.iter().join(" ")); + let mut child = Command::new(command) + .args(args) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(&ic_git) + .spawn()?; + + if let Some(taken) = child.stdout.as_mut() { + wait_data(taken, token.clone(), sender).await?; + } + + token.cancelled().await; + info!("Received shutdown, killing testnet"); + child.kill().await?; + + Ok(()) +} + +async fn wait_data(stdout: &mut ChildStdout, token: CancellationToken, sender: Sender) -> anyhow::Result<()> { + let mut stdout_reader = BufReader::new(stdout).lines(); + + let target = "Testnet is being deployed, please wait ..."; + let logs; + info!("Finding logs file..."); + loop { + let line = stdout_reader.next_line().await?; + if let Some(text) = line { + if text.contains(target) { + let path = text + .split(target) + .last() + .ok_or(anyhow::anyhow!("Failed to parse output"))? + .trim() + .to_string(); + logs = path; + break; + } + } + + if token.is_cancelled() { + return Ok(()); + } + } + + sender + .send(Message::Log(logs)) + .await + .map_err(|_| anyhow::anyhow!("Failed to send data across channels"))?; + + info!("Building testnet..."); + let mut whole_config = vec![]; + loop { + let line = stdout_reader.next_line().await?; + if let Some(line) = line { + whole_config.push(line.trim().to_string()); + let config = whole_config.iter().join(""); + + if serde_json::from_str::(&config).is_ok() { + break; + } + } + + if token.is_cancelled() { + return Ok(()); + } + } + + let config = whole_config.iter().join(""); + sender + .send(Message::Config(config)) + .await + .map_err(|_| anyhow::anyhow!("Failed to send data across channels"))?; + + Ok(()) +} diff --git a/rs/qualifier/src/main.rs b/rs/qualifier/src/main.rs new file mode 100644 index 000000000..ecefef8dc --- /dev/null +++ b/rs/qualifier/src/main.rs @@ -0,0 +1,186 @@ +use std::{fmt::Display, time::Duration}; + +use clap::Parser; +use cli::Args; +use dre::{ + auth::{Auth, Neuron}, + ic_admin::{download_ic_admin, should_update_ic_admin, IcAdminWrapper}, +}; +use ic_canisters::governance::governance_canister_version; +use ic_management_types::Network; +use ict_util::ict; +use log::info; +use qualify_util::qualify; +use reqwest::ClientBuilder; +use serde_json::Value; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +mod cli; +mod ict_util; +mod qualify_util; + +const NETWORK_NAME: &str = "configured-testnet"; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + init_logger(); + + // Check if farm is reachable. If not, error + let client = ClientBuilder::new().timeout(Duration::from_secs(30)).build()?; + client + .get("https://kibana.testnet.dfinity.network") + .send() + .await + .map_err(|e| anyhow::anyhow!("Checking connectivity failed: {}", e.to_string()))? + .error_for_status() + .map_err(|e| anyhow::anyhow!("Checking connectivity failed: {}", e.to_string()))?; + + let args = Args::parse(); + info!("Running qualification for {}", args.version_to_qualify); + info!("Generating keys for farm testnets..."); + let (neuron_id, private_key_pem) = args.ensure_key()?; + info!("Principal key created"); + + // Take in one version and figure out what is the base version + // + // To find the initial version we could take NNS version? + let initial_version = if let Some(ref v) = args.initial_version { + v.to_string() + } else { + info!("Fetching the version of NNS which will be used as starting point"); + let mainnet = Network::new_unchecked("mainnet", &[])?; + let ic_admin_path = match should_update_ic_admin()? { + (true, _) => { + let govn_canister_version = governance_canister_version(mainnet.get_nns_urls()).await?; + download_ic_admin(Some(govn_canister_version.stringified_hash)).await? + } + (false, s) => s, + }; + let mainnet_anonymous_neuron = Neuron { + auth: Auth::Anonymous, + neuron_id: 0, + include_proposer: false, + }; + let ic_admin = IcAdminWrapper::new(mainnet, Some(ic_admin_path), true, mainnet_anonymous_neuron, false); + let response = ic_admin + .run_passthrough_get( + &[ + "subnet".to_string(), + "tdb26-jop6k-aogll-7ltgs-eruif-6kk7m-qpktf-gdiqx-mxtrf-vb5e6-eqe".to_string(), + ], + true, + ) + .await?; + + let initial_version = serde_json::from_str::(&response)?; + let initial_version = initial_version["records"][0]["value"]["replica_version_id"] + .as_str() + .ok_or(anyhow::anyhow!("Couldn't parse subnet record"))?; + initial_version.to_string() + }; + + if initial_version == args.version_to_qualify { + anyhow::bail!("Initial version and version to qualify are the same") + } + info!("Initial version that will be used: {}", initial_version); + + // Generate configuration for `ict` including the initial version + // + // We could take in a file and mutate it and copy it to /tmp folder + let config = if let Some(ref path) = args.config_override { + let contents = std::fs::read_to_string(path)?; + let mut config = serde_json::from_str::(&contents)?; + config["initial_version"] = serde_json::Value::String(initial_version.to_owned()); + + serde_json::to_string_pretty(&config)? + } else { + let config = format!( + r#"{{ + "subnets": [ + {{ + "subnet_type": "application", + "num_nodes": 4 + }}, + {{ + "subnet_type": "application", + "num_nodes": 4 + }}, + {{ + "subnet_type": "system", + "num_nodes": 4 + }} + ], + "num_unassigned_nodes": 4, + "initial_version": "{}" + }}"#, + &initial_version + ); + + // Validate that the string is valid json + serde_json::to_string_pretty(&serde_json::from_str::(&config)?)? + }; + info!("Using configuration: \n{}", config); + + args.ensure_git().await?; + + // Run ict and capture its output + // + // Its important to parse the output correctly so we get the path to + // log of the tool if something fails, on top of that we should + // aggregate the output of the command which contains the json dump + // of topology to parse it and get the nns urls and other links. Also + // we have to extract the neuron pem file to use with dre + let token = CancellationToken::new(); + let (sender, mut receiver) = mpsc::channel(2); + let handle = tokio::spawn(ict(args.ic_repo_path.clone(), config, token.clone(), sender)); + + qualify( + &mut receiver, + private_key_pem, + neuron_id, + NETWORK_NAME, + initial_version, + args.version_to_qualify.to_string(), + ) + .await?; + + info!("Finished qualifier run for: {}", args.version_to_qualify); + + token.cancel(); + handle.await??; + Ok(()) +} + +fn init_logger() { + match std::env::var("RUST_LOG") { + Ok(val) => std::env::set_var("LOG_LEVEL", val), + Err(_) => { + if std::env::var("LOG_LEVEL").is_err() { + // Default logging level is: info generally, warn for mio and actix_server + // You can override defaults by setting environment variables + // RUST_LOG or LOG_LEVEL + std::env::set_var("LOG_LEVEL", "info,mio::=warn,actix_server::=warn") + } + } + } + pretty_env_logger::init_custom_env("LOG_LEVEL"); +} + +pub enum Message { + Log(String), + Config(String), +} + +impl Display for Message { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match &self { + Message::Log(p) => p, + Message::Config(c) => c, + } + ) + } +} diff --git a/rs/qualifier/src/qualify_util.rs b/rs/qualifier/src/qualify_util.rs new file mode 100644 index 000000000..06525847f --- /dev/null +++ b/rs/qualifier/src/qualify_util.rs @@ -0,0 +1,134 @@ +use std::{path::PathBuf, str::FromStr}; + +use anyhow::Error; +use dre::{ + commands::{qualify::execute::Execute, Args, ExecutableCommand}, + ctx::DreContext, +}; +use ic_management_backend::registry::local_registry_path; +use ic_management_types::Network; +use itertools::Itertools; +use log::info; +use serde_json::Value; +use tokio::sync::mpsc::Receiver; +use url::Url; + +use crate::Message; + +pub async fn qualify( + receiver: &mut Receiver, + private_key_pem: PathBuf, + neuron_id: u64, + network_name: &str, + from_version: String, + to_version: String, +) -> anyhow::Result<()> { + // Run dre to qualify with correct parameters + info!("Awaiting logs path..."); + let data = receiver.recv().await.ok_or(anyhow::anyhow!("Failed to recv data"))?; + + info!("Received logs: {}", data); + + info!("Awaiting config..."); + let data = receiver.recv().await.ok_or(anyhow::anyhow!("Failed to recv data"))?; + + let config = match data { + Message::Log(_) => anyhow::bail!("Expected `Config` but found `Log`"), + Message::Config(c) => c, + }; + let config = Config::from_str(&config)?; + + info!("Received following config: {:#?}", config); + info!("Running qualification..."); + + // At this point we are going to run so we need to remove previous + // registry stored on the disk + let reg_path = local_registry_path(&Network::new_unchecked(network_name, &config.nns_urls).unwrap()); + if reg_path.exists() { + info!("Detected registry from previous runs on path: {}", reg_path.display()); + std::fs::remove_dir_all(®_path)?; + info!("Removed registry from previous runs"); + } + + let args = Args { + hsm_pin: None, + hsm_slot: None, + hsm_key_id: None, + private_key_pem: Some(private_key_pem), + neuron_id: Some(neuron_id), + ic_admin: None, + yes: true, + dry_run: false, + network: network_name.to_string(), + nns_urls: config.nns_urls, + subcommands: dre::commands::Subcommands::Qualify(dre::commands::qualify::QualifyCmd { + subcommand: dre::commands::qualify::QualifyCommands::Execute(Execute { + version: to_version, + from_version: Some(from_version), + step_range: None, + deployment_name: config.deployment_name, + prometheus_endpoint: config.prometheus_url, + }), + }), + verbose: false, + no_sync: false, + }; + let ctx = DreContext::from_args(&args).await?; + + args.execute(ctx).await +} + +#[derive(Debug)] +#[allow(dead_code)] +struct Config { + deployment_name: String, + kibana_url: String, + nns_urls: Vec, + prometheus_url: String, +} + +impl FromStr for Config { + type Err = Error; + + fn from_str(s: &str) -> Result { + let parsed = serde_json::from_str::(s)?; + + let system = parsed["ic_topology"]["subnets"] + .as_array() + .ok_or(anyhow::anyhow!("Failed to find 'ic_topology.subnets'"))? + .iter() + .find(|elem| elem["subnet_type"].as_str().eq(&Some("system"))) + .ok_or(anyhow::anyhow!("Didn't find system subnet"))?; + + let nns_urls = system["nodes"] + .as_array() + .ok_or(anyhow::anyhow!("Didn't find nodes within system subnet"))? + .iter() + .map(|n| n["ipv6"].as_str().ok_or(anyhow::anyhow!("Didn't find ipv6 within node"))) + .collect_vec(); + + if nns_urls.iter().any(|res| res.is_err()) { + anyhow::bail!("Failed to deserialize nns urls") + } + + let deployment_name = parsed["farm"]["group"] + .as_str() + .ok_or(anyhow::anyhow!("Failed to find 'farm.group'"))? + .to_string(); + + let config = Self { + prometheus_url: format!("http://prometheus.{}.testnet.farm.dfinity.systems/api/v1/query", deployment_name), + deployment_name, + kibana_url: parsed["kibana_url"]["url"] + .as_str() + .ok_or(anyhow::anyhow!("Failed to find 'kibana_url.url'"))? + .to_string(), + nns_urls: nns_urls + .into_iter() + .map(|n| Url::from_str(&format!("http://[{}]:8080/", n.unwrap())).unwrap()) + .collect_vec(), + }; + + Ok(config) + } +}