Skip to content

Conversation

@TJxiaobao
Copy link
Contributor

Based on the existing project framework, refactor the scheduling module of hertzbeat-collector-go.

@zqr10159 zqr10159 merged commit 47ef6c3 into main Sep 8, 2025
6 checks passed
@zqr10159 zqr10159 deleted the reconfiguration_scheduling branch September 8, 2025 13:40
// Clear running jobs
r.mu.Lock()
r.runningJobs = make(map[int64]*jobtypes.Job)
r.mu.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go 中锁的写法一般在函数开头
使用 defer 释放资源,defer 总是在退出函数时执行

func (r *Runner) Close() error {
r.mu.Lock()
defer r.mu.Unlock()

r.runningJobs = make(map[int64]*jobtypes.Job)

}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will correct this problem. Thank you for your suggestion!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得这个文件不用删掉,项目确实得有一个地方入口以供测试功能(单测除外


func startRunners(ctx context.Context, cfg *clrServer.Server) error {

// Initialize all collectors before starting runners
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得可以将这部分移动到 job 内部完成 这样项目整体结构会更好理解一些

从 server 启动依赖组件,collector task 也是 job 的一部分?你怎么看 @TJxiaobao

"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)

// init 函数在包被导入时自动执行
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以换成英文注释


// init 函数在包被导入时自动执行
// 集中注册所有协议的工厂函数
func init() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里可能仍然有一些出入,不过问题不大。这种方式也挺好的,显式配置更好

"fmt"
"time"

_ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是预期中的吗?

它会执行 bacis 包下的所有 init 函数

}

// NewResultHandler creates a new result handler
func NewResultHandler(logger logger.Logger) *ResultHandlerImpl {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果这里是实现了一个接口的话,我想这里应该返回的是接口类型,而不是结构体的实现

// 4. Triggering alerts based on thresholds
// 5. Updating monitoring status

if data.Code == 200 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

200 是魔法值,可以用 http 库的 200 常量来判断

}

// NewCommonDispatcher creates a new common dispatcher
func NewCommonDispatcher(logger logger.Logger, metricsCollector MetricsCollector, resultHandler ResultHandler) *CommonDispatcherImpl {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里应该创建一个 ctx 吗?

从 go 的编程风格来说,ctx 应该永远被用作函数的第一个参数实现,而不是放到结构体里面去,这不符合 go 的编程规范(约定俗成

commonDispatcher MetricsTaskDispatcher
cyclicTasks sync.Map // map[int64]*jobtypes.Timeout for cyclic jobs
tempTasks sync.Map // map[int64]*jobtypes.Timeout for one-time jobs
ctx context.Context
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上,ctx 不应该作为结构体的属性

)

// hashedWheelTimer implements a time wheel for efficient timeout management
type hashedWheelTimer struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

小写?

bucket.mu.RUnlock()
}

return map[string]interface{}{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以使用 any 代替 interface {}

if err := r.timeDispatch.AddJob(job); err != nil {
delete(r.runningJobs, job.ID)
r.Logger.Error(err, "failed to add job to time dispatcher", "jobID", job.ID)
return fmt.Errorf("failed to add job to time dispatcher: %w", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

使用 errors.New() 以返回一个 error

// Runner implements the service runner interface
type Runner struct {
Config
timeDispatch TimeDispatcher
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些配置应该都在 Config 中,config 是当前 server 的所有 ctx 配置,runner 只负责将当前 server run 起来

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants