This repository was archived by the owner on Jul 4, 2025. It is now read-only.

chore: refactor background tasks processing #2031

Merged 4 commits on Feb 27, 2025
9 changes: 4 additions & 5 deletions engine/controllers/models.cc
@@ -218,11 +218,10 @@ void Models::ListModel(
obj["id"] = model_entry.model;
obj["model"] = model_entry.model;
obj["status"] = "downloaded";
// TODO(sang) Temporarily remove this estimation
// auto es = model_service_->GetEstimation(model_entry.model);
// if (es.has_value() && !!es.value()) {
// obj["recommendation"] = hardware::ToJson(*(es.value()));
// }
auto es = model_service_->GetEstimation(model_entry.model);
if (es.has_value()) {
obj["recommendation"] = hardware::ToJson(*es);
}
data.append(std::move(obj));
yaml_handler.Reset();
} else if (model_config.engine == kPythonEngine) {
8 changes: 6 additions & 2 deletions engine/main.cc
@@ -37,6 +37,7 @@
#include "utils/file_manager_utils.h"
#include "utils/logging_utils.h"
#include "utils/system_info_utils.h"
#include "utils/task_queue.h"

#if defined(__APPLE__) && defined(__MACH__)
#include <libgen.h> // for dirname()
@@ -177,8 +178,11 @@ void RunServer(std::optional<std::string> host, std::optional<int> port,
download_service, dylib_path_manager, db_service);
auto inference_svc = std::make_shared<InferenceService>(engine_service);
auto model_src_svc = std::make_shared<ModelSourceService>(db_service);
auto model_service = std::make_shared<ModelService>(
db_service, hw_service, download_service, inference_svc, engine_service);
cortex::TaskQueue task_queue(
std::min(2u, std::thread::hardware_concurrency()), "background_task");
auto model_service =
std::make_shared<ModelService>(db_service, hw_service, download_service,
inference_svc, engine_service, task_queue);
inference_svc->SetModelService(model_service);

auto file_watcher_srv = std::make_shared<FileWatcherService>(
4 changes: 1 addition & 3 deletions engine/services/hardware_service.cc
@@ -207,9 +207,6 @@ bool HardwareService::Restart(const std::string& host, int port) {
if (!TryConnectToServer(host, port)) {
return false;
}
std::cout << "Server started" << std::endl;
std::cout << "API Documentation available at: http://" << host << ":"
<< port << std::endl;
}

#endif
@@ -348,6 +345,7 @@ void HardwareService::UpdateHardwareInfos() {
return false;
return true;
};

auto res = db_service_->AddHardwareEntry(
HwEntry{.uuid = gpu.uuid,
.type = "gpu",
54 changes: 51 additions & 3 deletions engine/services/model_service.cc
@@ -143,6 +143,21 @@ cpp::result<DownloadTask, std::string> GetDownloadTask(
}
} // namespace

ModelService::ModelService(std::shared_ptr<DatabaseService> db_service,
std::shared_ptr<HardwareService> hw_service,
std::shared_ptr<DownloadService> download_service,
std::shared_ptr<InferenceService> inference_service,
std::shared_ptr<EngineServiceI> engine_svc,
cortex::TaskQueue& task_queue)
: db_service_(db_service),
hw_service_(hw_service),
download_service_{download_service},
inference_svc_(inference_service),
engine_svc_(engine_svc),
task_queue_(task_queue) {
ProcessBgrTasks();
};

void ModelService::ForceIndexingModelList() {
CTL_INF("Force indexing model list");

@@ -331,8 +346,17 @@ cpp::result<DownloadTask, std::string> ModelService::HandleDownloadUrlAsync(
return download_service_->AddTask(downloadTask, on_finished);
}

std::optional<hardware::Estimation> ModelService::GetEstimation(
const std::string& model_handle) {
std::lock_guard l(es_mtx_);
if (auto it = es_.find(model_handle); it != es_.end()) {
return it->second;
}
return std::nullopt;
}

cpp::result<std::optional<hardware::Estimation>, std::string>
ModelService::GetEstimation(const std::string& model_handle,
ModelService::EstimateModel(const std::string& model_handle,
const std::string& kv_cache, int n_batch,
int n_ubatch) {
namespace fs = std::filesystem;
@@ -548,7 +572,7 @@ ModelService::DownloadModelFromCortexsoAsync(
// Close the file
pyvenv_cfg.close();
// Add executable permission to python
set_permission_utils::SetExecutePermissionsRecursive(venv_path);
(void)set_permission_utils::SetExecutePermissionsRecursive(venv_path);
} else {
CTL_ERR("Failed to extract venv.zip");
};
@@ -828,7 +852,7 @@ cpp::result<StartModelResult, std::string> ModelService::StartModel(
CTL_WRN("Error: " + res.error());
for (auto& depend : depends) {
if (depend != model_handle) {
StopModel(depend);
auto sr = StopModel(depend);
}
}
return cpp::fail("Model failed to start dependency '" + depend +
@@ -1390,4 +1414,28 @@ std::string ModelService::GetEngineByModelId(
auto mc = yaml_handler.GetModelConfig();
CTL_DBG(mc.engine);
return mc.engine;
}

void ModelService::ProcessBgrTasks() {
CTL_INF("Start processing background tasks")
auto cb = [this] {
CTL_DBG("Estimate model resource usage");
auto list_entry = db_service_->LoadModelList();
if (list_entry) {
for (const auto& model_entry : list_entry.value()) {
// Only process local models
if (model_entry.status == cortex::db::ModelStatus::Downloaded) {
auto es = EstimateModel(model_entry.model);
if (es.has_value()) {
std::lock_guard l(es_mtx_);
es_[model_entry.model] = es.value();
}
}
}
}
};

auto clone = cb;
task_queue_.RunInQueue(std::move(cb));
task_queue_.RunEvery(std::chrono::seconds(10), std::move(clone));
}
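
Note on the scheduling pattern above (illustrative, not part of the diff): RunInQueue and RunEvery are both handed the functor with std::move at the call sites, so the callback is copied into `clone` first; the estimation job therefore runs once immediately on a background thread and then repeats every 10 seconds. A minimal sketch of that pattern, assuming only the TaskQueue API introduced by this PR:

#include <chrono>
#include <functional>
#include "utils/task_queue.h"

// Sketch: schedule the same job once right away and then periodically.
// The call sites move the functor, so a usable copy is taken up front.
inline void ScheduleNowAndEvery10s(cortex::TaskQueue& q, std::function<void()> cb) {
  auto clone = cb;                      // copy kept for the periodic schedule
  q.RunInQueue(std::move(cb));          // run once on a pool thread
  q.RunEvery(std::chrono::seconds(10),  // then repeat every 10 seconds
             std::move(clone));
}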
20 changes: 13 additions & 7 deletions engine/services/model_service.h
@@ -10,6 +10,7 @@
#include "services/download_service.h"
#include "services/hardware_service.h"
#include "utils/hardware/gguf/gguf_file_estimate.h"
#include "utils/task_queue.h"

class InferenceService;

@@ -35,12 +36,8 @@ class ModelService {
std::shared_ptr<HardwareService> hw_service,
std::shared_ptr<DownloadService> download_service,
std::shared_ptr<InferenceService> inference_service,
std::shared_ptr<EngineServiceI> engine_svc)
: db_service_(db_service),
hw_service_(hw_service),
download_service_{download_service},
inference_svc_(inference_service),
engine_svc_(engine_svc) {};
std::shared_ptr<EngineServiceI> engine_svc,
cortex::TaskQueue& task_queue);

cpp::result<std::string, std::string> AbortDownloadModel(
const std::string& task_id);
@@ -81,7 +78,10 @@

bool HasModel(const std::string& id) const;

cpp::result<std::optional<hardware::Estimation>, std::string> GetEstimation(
std::optional<hardware::Estimation> GetEstimation(
const std::string& model_handle);

cpp::result<std::optional<hardware::Estimation>, std::string> EstimateModel(
const std::string& model_handle, const std::string& kv_cache = "f16",
int n_batch = 2048, int n_ubatch = 2048);

@@ -112,6 +112,8 @@
const std::string& model_path, int ngl, int ctx_len, int n_batch = 2048,
int n_ubatch = 2048, const std::string& kv_cache_type = "f16");

void ProcessBgrTasks();

int GetCpuThreads() const;

std::shared_ptr<DatabaseService> db_service_;
@@ -126,4 +128,8 @@
*/
std::unordered_map<std::string, std::shared_ptr<ModelMetadata>>
loaded_model_metadata_map_;

std::mutex es_mtx_;
std::unordered_map<std::string, std::optional<hardware::Estimation>> es_;
cortex::TaskQueue& task_queue_;
};
45 changes: 45 additions & 0 deletions engine/utils/task_queue.h
@@ -0,0 +1,45 @@
#pragma once
#include <memory>
#include <string>
#include "trantor/net/EventLoopThreadPool.h"

namespace cortex {
class TaskQueue {
public:
TaskQueue(size_t num_threads, const std::string& name)
: ev_loop_pool_(
std::make_unique<trantor::EventLoopThreadPool>(num_threads, name)) {
ev_loop_pool_->start();
}
~TaskQueue() {}

template <typename Functor>
void RunInQueue(Functor&& f) {
if (ev_loop_pool_) {
ev_loop_pool_->getNextLoop()->runInLoop(std::forward<Functor>(f));
}
}

template <typename Functor>
uint64_t RunEvery(const std::chrono::duration<double>& interval,
Functor&& cb) {
if (ev_loop_pool_) {
return ev_loop_pool_->getNextLoop()->runEvery(interval,
std::forward<Functor>(cb));
}
return 0;
}

template <typename Functor>
uint64_t RunAfter(const std::chrono::duration<double>& delay, Functor&& cb) {
if (ev_loop_pool_) {
return ev_loop_pool_->getNextLoop()->runAfter(delay,
std::forward<Functor>(cb));
}
return 0;
}

private:
std::unique_ptr<trantor::EventLoopThreadPool> ev_loop_pool_ = nullptr;
};
} // namespace cortex
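
For reference, a minimal usage sketch of this wrapper (illustrative only, not part of the commit): work submitted from outside the pool is queued onto trantor event-loop threads, and RunEvery/RunAfter return the underlying timer id, or 0 when no pool exists. The thread count and name below mirror the pool created in main.cc.

#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>
#include "utils/task_queue.h"

int main() {
  // Two worker threads named "background_task", as in main.cc.
  cortex::TaskQueue queue(2, "background_task");

  // Fire-and-forget: executed on one of the pool's event-loop threads.
  queue.RunInQueue([] { std::cout << "one-off task\n"; });

  // Repeats every 10 seconds; the returned id identifies the trantor timer.
  uint64_t timer_id = queue.RunEvery(std::chrono::seconds(10),
                                     [] { std::cout << "periodic task\n"; });
  (void)timer_id;

  // Runs once after a 5 second delay.
  queue.RunAfter(std::chrono::seconds(5), [] { std::cout << "delayed task\n"; });

  // Keep the process alive long enough for the demo tasks to fire.
  std::this_thread::sleep_for(std::chrono::seconds(12));
  return 0;
}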