From 9097b1cdc00cc2845cd1a18bfb6d35609166ac5d Mon Sep 17 00:00:00 2001
From: sangjanai
Date: Wed, 26 Feb 2025 08:54:50 +0700
Subject: [PATCH] chore: refactor background tasks processing

---
 engine/controllers/models.cc        |  9 +++--
 engine/main.cc                      |  8 +++--
 engine/services/hardware_service.cc | 19 +++++-----
 engine/services/model_service.cc    | 54 +++++++++++++++++++++++++++--
 engine/services/model_service.h     | 20 +++++++----
 engine/utils/task_queue.h           | 45 ++++++++++++++++++++++++
 6 files changed, 127 insertions(+), 28 deletions(-)
 create mode 100644 engine/utils/task_queue.h

diff --git a/engine/controllers/models.cc b/engine/controllers/models.cc
index 7439a5df5..86b749ce6 100644
--- a/engine/controllers/models.cc
+++ b/engine/controllers/models.cc
@@ -218,11 +218,10 @@ void Models::ListModel(
         obj["id"] = model_entry.model;
         obj["model"] = model_entry.model;
         obj["status"] = "downloaded";
-        // TODO(sang) Temporarily remove this estimation
-        // auto es = model_service_->GetEstimation(model_entry.model);
-        // if (es.has_value() && !!es.value()) {
-        //   obj["recommendation"] = hardware::ToJson(*(es.value()));
-        // }
+        auto es = model_service_->GetEstimation(model_entry.model);
+        if (es.has_value()) {
+          obj["recommendation"] = hardware::ToJson(*es);
+        }
         data.append(std::move(obj));
         yaml_handler.Reset();
       } else if (model_config.engine == kPythonEngine) {
diff --git a/engine/main.cc b/engine/main.cc
index 122ea094a..2f60916a6 100644
--- a/engine/main.cc
+++ b/engine/main.cc
@@ -37,6 +37,7 @@
 #include "utils/file_manager_utils.h"
 #include "utils/logging_utils.h"
 #include "utils/system_info_utils.h"
+#include "utils/task_queue.h"
 
 #if defined(__APPLE__) && defined(__MACH__)
 #include <libgen.h>  // for dirname()
@@ -177,8 +178,11 @@ void RunServer(std::optional<std::string> host, std::optional<int> port,
       download_service, dylib_path_manager, db_service);
   auto inference_svc = std::make_shared<InferenceService>(engine_service);
   auto model_src_svc = std::make_shared<ModelSourceService>(db_service);
-  auto model_service = std::make_shared<ModelService>(
-      db_service, hw_service, download_service, inference_svc, engine_service);
+  cortex::TaskQueue task_queue(
+      std::min(2u, std::thread::hardware_concurrency()), "background_task");
+  auto model_service =
+      std::make_shared<ModelService>(db_service, hw_service, download_service,
+                                     inference_svc, engine_service, task_queue);
   inference_svc->SetModelService(model_service);
 
   auto file_watcher_srv = std::make_shared<FileWatcherService>(
diff --git a/engine/services/hardware_service.cc b/engine/services/hardware_service.cc
index 56ecadd6d..2ec8c20b5 100644
--- a/engine/services/hardware_service.cc
+++ b/engine/services/hardware_service.cc
@@ -207,9 +207,6 @@ bool HardwareService::Restart(const std::string& host, int port) {
       if (!TryConnectToServer(host, port)) {
        return false;
       }
-      std::cout << "Server started" << std::endl;
-      std::cout << "API Documentation available at: http://" << host << ":"
-                << port << std::endl;
     }
 #endif
 
@@ -327,18 +324,18 @@ void HardwareService::UpdateHardwareInfos() {
     // Note: only support NVIDIA for now, so hardware_id = software_id
     if (db_service_->HasHardwareEntry(gpu.uuid)) {
       auto res = db_service_->UpdateHardwareEntry(gpu.uuid, std::stoi(gpu.id),
-                                                   std::stoi(gpu.id));
+                                                  std::stoi(gpu.id));
       if (res.has_error()) {
         CTL_WRN(res.error());
       }
     } else {
-      auto res =
-          db_service_->AddHardwareEntry(HwEntry{.uuid = gpu.uuid,
-                                                .type = "gpu",
-                                                .hardware_id = std::stoi(gpu.id),
-                                                .software_id = std::stoi(gpu.id),
-                                                .activated = true,
-                                                .priority = INT_MAX});
+      auto res = db_service_->AddHardwareEntry(
+          HwEntry{.uuid = gpu.uuid,
+                  .type = "gpu",
+                  .hardware_id = std::stoi(gpu.id),
+                  .software_id = std::stoi(gpu.id),
+                  .activated = true,
+                  .priority = INT_MAX});
       if (res.has_error()) {
         CTL_WRN(res.error());
       }
diff --git a/engine/services/model_service.cc b/engine/services/model_service.cc
index 979cf9342..ab4a0080f 100644
--- a/engine/services/model_service.cc
+++ b/engine/services/model_service.cc
@@ -143,6 +143,21 @@ cpp::result<DownloadTask, std::string> GetDownloadTask(
 }
 }  // namespace
 
+ModelService::ModelService(std::shared_ptr<DatabaseService> db_service,
+                           std::shared_ptr<HardwareService> hw_service,
+                           std::shared_ptr<DownloadService> download_service,
+                           std::shared_ptr<InferenceService> inference_service,
+                           std::shared_ptr<EngineServiceI> engine_svc,
+                           cortex::TaskQueue& task_queue)
+    : db_service_(db_service),
+      hw_service_(hw_service),
+      download_service_{download_service},
+      inference_svc_(inference_service),
+      engine_svc_(engine_svc),
+      task_queue_(task_queue) {
+  ProcessBgrTasks();
+};
+
 void ModelService::ForceIndexingModelList() {
   CTL_INF("Force indexing model list");
 
@@ -331,8 +346,17 @@ cpp::result<std::string, std::string> ModelService::HandleDownloadUrlAsync(
   return download_service_->AddTask(downloadTask, on_finished);
 }
 
+std::optional<hardware::Estimation> ModelService::GetEstimation(
+    const std::string& model_handle) {
+  std::lock_guard l(es_mtx_);
+  if (auto it = es_.find(model_handle); it != es_.end()) {
+    return it->second;
+  }
+  return std::nullopt;
+}
+
 cpp::result<std::optional<hardware::Estimation>, std::string>
-ModelService::GetEstimation(const std::string& model_handle,
+ModelService::EstimateModel(const std::string& model_handle,
                             const std::string& kv_cache, int n_batch,
                             int n_ubatch) {
   namespace fs = std::filesystem;
@@ -548,7 +572,7 @@ ModelService::DownloadModelFromCortexsoAsync(
         // Close the file
         pyvenv_cfg.close();
         // Add executable permission to python
-        set_permission_utils::SetExecutePermissionsRecursive(venv_path);
+        (void)set_permission_utils::SetExecutePermissionsRecursive(venv_path);
       } else {
         CTL_ERR("Failed to extract venv.zip");
       };
@@ -828,7 +852,7 @@ cpp::result<StartModelResult, std::string> ModelService::StartModel(
       CTL_WRN("Error: " + res.error());
       for (auto& depend : depends) {
         if (depend != model_handle) {
-          StopModel(depend);
+          auto sr = StopModel(depend);
         }
       }
       return cpp::fail("Model failed to start dependency '" + depend +
@@ -1381,4 +1405,28 @@ std::string ModelService::GetEngineByModelId(
   auto mc = yaml_handler.GetModelConfig();
   CTL_DBG(mc.engine);
   return mc.engine;
-}
\ No newline at end of file
+}
+
+void ModelService::ProcessBgrTasks() {
+  CTL_INF("Start processing background tasks");
+  auto cb = [this] {
+    CTL_DBG("Estimate model resource usage");
+    auto list_entry = db_service_->LoadModelList();
+    if (list_entry) {
+      for (const auto& model_entry : list_entry.value()) {
+        // Only process local models
+        if (model_entry.status == cortex::db::ModelStatus::Downloaded) {
+          auto es = EstimateModel(model_entry.model);
+          if (es.has_value()) {
+            std::lock_guard l(es_mtx_);
+            es_[model_entry.model] = es.value();
+          }
+        }
+      }
+    }
+  };
+
+  auto clone = cb;
+  task_queue_.RunInQueue(std::move(cb));
+  task_queue_.RunEvery(std::chrono::seconds(10), std::move(clone));
+}
\ No newline at end of file
diff --git a/engine/services/model_service.h b/engine/services/model_service.h
index 17f2c0ddb..7525f022e 100644
--- a/engine/services/model_service.h
+++ b/engine/services/model_service.h
@@ -10,6 +10,7 @@
 #include "services/download_service.h"
 #include "services/hardware_service.h"
 #include "utils/hardware/gguf/gguf_file_estimate.h"
+#include "utils/task_queue.h"
 
 class InferenceService;
 
@@ -35,12 +36,8 @@ class ModelService {
                         std::shared_ptr<HardwareService> hw_service,
                         std::shared_ptr<DownloadService> download_service,
                         std::shared_ptr<InferenceService> inference_service,
-                        std::shared_ptr<EngineServiceI> engine_svc)
-      : db_service_(db_service),
-        hw_service_(hw_service),
-        download_service_{download_service},
-        inference_svc_(inference_service),
-        engine_svc_(engine_svc) {};
+                        std::shared_ptr<EngineServiceI> engine_svc,
+                        cortex::TaskQueue& task_queue);
 
   cpp::result<std::string, std::string> AbortDownloadModel(
       const std::string& task_id);
@@ -81,7 +78,10 @@ class ModelService {
 
   bool HasModel(const std::string& id) const;
 
-  cpp::result<std::optional<hardware::Estimation>, std::string> GetEstimation(
+  std::optional<hardware::Estimation> GetEstimation(
+      const std::string& model_handle);
+
+  cpp::result<std::optional<hardware::Estimation>, std::string> EstimateModel(
       const std::string& model_handle, const std::string& kv_cache = "f16",
       int n_batch = 2048, int n_ubatch = 2048);
 
@@ -112,6 +112,8 @@ class ModelService {
       const std::string& model_path, int ngl, int ctx_len, int n_batch = 2048,
       int n_ubatch = 2048, const std::string& kv_cache_type = "f16");
 
+  void ProcessBgrTasks();
+
   std::shared_ptr<DatabaseService> db_service_;
   std::shared_ptr<HardwareService> hw_service_;
   std::shared_ptr<DownloadService> download_service_;
@@ -124,4 +126,8 @@ class ModelService {
    */
   std::unordered_map<std::string, std::shared_ptr<ModelMetadata>>
       loaded_model_metadata_map_;
+
+  std::mutex es_mtx_;
+  std::unordered_map<std::string, std::optional<hardware::Estimation>> es_;
+  cortex::TaskQueue& task_queue_;
 };
diff --git a/engine/utils/task_queue.h b/engine/utils/task_queue.h
new file mode 100644
index 000000000..911a7b307
--- /dev/null
+++ b/engine/utils/task_queue.h
@@ -0,0 +1,45 @@
+#pragma once
+#include <memory>
+#include <string>
+#include "trantor/net/EventLoopThreadPool.h"
+
+namespace cortex {
+class TaskQueue {
+ public:
+  TaskQueue(size_t num_threads, const std::string& name)
+      : ev_loop_pool_(
+            std::make_unique<trantor::EventLoopThreadPool>(num_threads, name)) {
+    ev_loop_pool_->start();
+  }
+  ~TaskQueue() {}
+
+  template <typename Functor>
+  void RunInQueue(Functor&& f) {
+    if (ev_loop_pool_) {
+      ev_loop_pool_->getNextLoop()->runInLoop(std::forward<Functor>(f));
+    }
+  }
+
+  template <typename Functor>
+  uint64_t RunEvery(const std::chrono::duration<double>& interval,
+                    Functor&& cb) {
+    if (ev_loop_pool_) {
+      return ev_loop_pool_->getNextLoop()->runEvery(interval,
+                                                    std::forward<Functor>(cb));
+    }
+    return 0;
+  }
+
+  template <typename Functor>
+  uint64_t RunAfter(const std::chrono::duration<double>& delay, Functor&& cb) {
+    if (ev_loop_pool_) {
+      return ev_loop_pool_->getNextLoop()->runAfter(delay,
+                                                    std::forward<Functor>(cb));
+    }
+    return 0;
+  }
+
+ private:
+  std::unique_ptr<trantor::EventLoopThreadPool> ev_loop_pool_ = nullptr;
+};
+}  // namespace cortex
\ No newline at end of file
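
Usage note: the cortex::TaskQueue introduced above is a thin wrapper over
trantor::EventLoopThreadPool, so callables handed to RunInQueue / RunEvery /
RunAfter execute on the pool's event-loop threads, and RunEvery re-arms until
the pool is destroyed. A minimal sketch of the intended usage, mirroring the
wiring in main.cc; the thread count, queue name, messages, and sleep are
illustrative assumptions, not taken from this patch:

    // Hypothetical standalone example; assumes trantor is on the include path.
    #include <chrono>
    #include <iostream>
    #include <thread>
    #include "utils/task_queue.h"

    int main() {
      // Two event-loop threads, matching the cap chosen in RunServer().
      cortex::TaskQueue q(2, "background_task");

      // One-shot task: posted to the next loop picked round-robin.
      q.RunInQueue([] { std::cout << "one-shot task\n"; });

      // Periodic task: fires every 10 seconds, like ProcessBgrTasks().
      q.RunEvery(std::chrono::seconds(10), [] { std::cout << "tick\n"; });

      // Keep the process alive long enough to observe one tick.
      std::this_thread::sleep_for(std::chrono::seconds(11));
      return 0;
    }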
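
Design note: the estimation flow now follows a cache-aside pattern. The request
path (Models::ListModel -> GetEstimation) is a lock-guarded map lookup, while
the expensive EstimateModel runs on the task queue, once at startup and then
every 10 seconds, refreshing the map. A self-contained sketch of the same
pattern; Estimation, EstimationCache, and Estimate are hypothetical stand-ins,
not the cortex types:

    #include <mutex>
    #include <optional>
    #include <string>
    #include <unordered_map>

    struct Estimation {  // stand-in for hardware::Estimation
      int recommended_ngl;
    };

    class EstimationCache {
     public:
      // Request path: cheap lookup only, never recomputes.
      std::optional<Estimation> Get(const std::string& model) {
        std::lock_guard<std::mutex> l(mtx_);
        if (auto it = cache_.find(model); it != cache_.end()) {
          return it->second;
        }
        return std::nullopt;
      }

      // Background path: the expensive estimate runs outside the lock;
      // only the map update is guarded, so readers stall only briefly.
      void Refresh(const std::string& model) {
        Estimation e = Estimate(model);  // placeholder for EstimateModel()
        std::lock_guard<std::mutex> l(mtx_);
        cache_[model] = e;
      }

     private:
      // Dummy estimator standing in for the real GGUF-based calculation.
      static Estimation Estimate(const std::string&) { return {99}; }

      std::mutex mtx_;
      std::unordered_map<std::string, Estimation> cache_;
    };

One consequence worth noting: list responses served before the first background
pass completes simply omit the "recommendation" field, since the cache starts
empty.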