Claude Code Plugins

Community-maintained marketplace

Feedback

43-agent-module-architecture

@TencentBlueKing/bk-ci
2.5k
0

Agent 构建机模块架构指南(Go 语言),涵盖 Agent 启动流程、心跳机制、任务领取执行、升级更新、与 Dispatch 交互。当用户开发 Agent 功能、修改心跳逻辑、处理任务执行或实现 Agent 升级时使用。

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name 43-agent-module-architecture
description Agent 构建机模块架构指南(Go 语言),涵盖 Agent 启动流程、心跳机制、任务领取执行、升级更新、与 Dispatch 交互。当用户开发 Agent 功能、修改心跳逻辑、处理任务执行或实现 Agent 升级时使用。

Agent 构建机模块架构指南

模块定位: Agent 是 BK-CI 的构建机核心组件,由 Go 语言编写,负责与后端服务通信、接收构建任务、拉起 Worker 进程执行构建。

一、模块概述

1.1 核心职责

职责 说明
进程管理 Daemon 守护 Agent 进程,确保持续运行
任务调度 从 Dispatch 服务拉取构建任务并执行
Worker 管理 拉起 Worker(Kotlin JAR)执行实际构建逻辑
心跳上报 定期向后端上报 Agent 状态和环境信息
自动升级 检测并自动升级 Agent、Worker、JDK
数据采集 通过 Telegraf 采集构建机指标数据
Docker 构建 支持 Docker 容器化构建(Linux)

1.2 与 Worker 的关系

┌─────────────────────────────────────────────────────────────┐
│                    构建机 (Build Machine)                    │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────┐     守护      ┌─────────┐                      │
│  │ Daemon  │ ───────────▶ │  Agent  │                      │
│  │  (Go)   │              │  (Go)   │                      │
│  └─────────┘              └────┬────┘                      │
│                                │ 拉起                       │
│                                ▼                            │
│                          ┌─────────┐                        │
│                          │ Worker  │                        │
│                          │(Kotlin) │                        │
│                          └────┬────┘                        │
│                               │ 执行                        │
│                               ▼                             │
│                    ┌──────────────────────┐                │
│                    │ 插件任务 / 脚本任务   │                │
│                    └──────────────────────┘                │
└─────────────────────────────────────────────────────────────┘
  • Agent (Go): 负责进程调度、与后端通信、环境管理
  • Worker (Kotlin): 负责具体构建任务执行、插件运行、日志上报

二、目录结构

src/agent/
├── agent/                          # 主 Agent 模块
│   ├── src/
│   │   ├── cmd/                    # 入口程序
│   │   │   ├── agent/main.go       # Agent 主程序入口
│   │   │   ├── daemon/main.go      # Daemon 守护进程入口
│   │   │   ├── installer/main.go   # 安装程序入口
│   │   │   └── upgrader/main.go    # 升级程序入口
│   │   ├── pkg/                    # 核心包
│   │   │   ├── agent/              # Agent 核心逻辑
│   │   │   ├── api/                # API 客户端
│   │   │   ├── collector/          # 数据采集
│   │   │   ├── config/             # 配置管理
│   │   │   ├── cron/               # 定时任务
│   │   │   ├── i18n/               # 国际化
│   │   │   ├── imagedebug/         # Docker 镜像调试
│   │   │   ├── job/                # 构建任务管理
│   │   │   ├── job_docker/         # Docker 构建
│   │   │   ├── pipeline/           # Pipeline 任务
│   │   │   ├── upgrade/            # 升级逻辑
│   │   │   ├── upgrader/           # 升级器实现
│   │   │   └── util/               # 工具函数
│   │   └── third_components/       # 第三方组件管理
│   ├── go.mod
│   ├── Makefile
│   └── README.md
├── agent-slim/                     # 轻量版 Agent
│   └── cmd/slim.go
└── common/                         # 公共工具库
    └── utils/
        ├── fileutil/
        └── slice.go

三、核心组件详解

3.1 Daemon 守护进程

文件: src/cmd/daemon/main.go

Daemon 负责守护 Agent 进程,确保其持续运行:

// Unix 实现:通过文件锁检测 Agent 是否存活
func watch(isDebug bool) {
    totalLock := flock.New(fmt.Sprintf("%s/%s.lock", systemutil.GetRuntimeDir(), systemutil.TotalLock))
    
    // 首次立即检查
    totalLock.Lock()
    doCheckAndLaunchAgent(isDebug)
    totalLock.Unlock()
    
    // 定时检查(5秒间隔)
    checkTimeTicker := time.NewTicker(agentCheckGap)
    for ; ; totalLock.Unlock() {
        select {
        case <-checkTimeTicker.C:
            if err := totalLock.Lock(); err != nil {
                continue
            }
            doCheckAndLaunchAgent(isDebug)
        }
    }
}

// 检查并拉起 Agent
func doCheckAndLaunchAgent(isDebug bool) {
    agentLock := flock.New(fmt.Sprintf("%s/agent.lock", systemutil.GetRuntimeDir()))
    locked, err := agentLock.TryLock()
    if err == nil && locked {
        // 能获取锁说明 Agent 未运行,需要拉起
        logs.Warn("agent is not available, will launch it")
        process, err := launch(workDir+"/"+config.AgentFileClientLinux, isDebug)
        if err != nil {
            logs.WithError(err).Error("launch agent failed")
        }
    }
}

Windows 实现: 使用 github.com/kardianos/service 库实现 Windows Service

3.2 Agent 核心流程

文件: src/pkg/agent/agent.go

func Run(isDebug bool) {
    // 1. 初始化配置
    config.Init(isDebug)
    third_components.Init()
    
    // 2. 初始化国际化
    i18n.InitAgentI18n()
    
    // 3. 上报启动(重试直到成功)
    _, err := job.AgentStartup()
    if err != nil {
        for {
            _, err = job.AgentStartup()
            if err == nil {
                break
            }
            time.Sleep(5 * time.Second)
        }
    }
    
    // 4. 启动后台任务
    go collector.Collect()      // 数据采集
    go cron.CleanJob()          // 定期清理
    go cron.CleanDebugContainer() // 清理调试容器
    
    // 5. 主循环:Ask 请求
    for {
        doAsk()
        config.LoadAgentIp()
        time.Sleep(5 * time.Second)
    }
}

3.3 Ask 统一请求模式

Agent 使用 Ask 模式统一处理多种任务:

func doAsk() {
    // 构建 Ask 请求
    enable := genAskEnable()
    heart, upgrad := genHeartInfoAndUpgrade(enable.Upgrade, exiterror)
    
    result, err := api.Ask(&api.AskInfo{
        Enable:  enable,      // 启用的功能
        Heart:   heart,       // 心跳信息
        Upgrade: upgrad,      // 升级信息
    })
    
    // 处理响应
    resp := new(api.AskResp)
    util.ParseJsonToData(result.Data, &resp)
    
    // 执行各类任务
    doAgentJob(enable, resp)
}

func doAgentJob(enable api.AskEnable, resp *api.AskResp) {
    // 心跳响应处理
    if resp.Heart != nil {
        go agentHeartbeat(resp.Heart)
    }
    
    // 构建任务
    hasBuild := (enable.Build != api.NoneBuildType) && (resp.Build != nil)
    if hasBuild {
        go job.DoBuild(resp.Build)
    }
    
    // 升级任务
    if enable.Upgrade && resp.Upgrade != nil {
        go upgrade.AgentUpgrade(resp.Upgrade, hasBuild)
    }
    
    // Pipeline 任务
    if enable.Pipeline && resp.Pipeline != nil {
        go pipeline.RunPipeline(resp.Pipeline)
    }
    
    // Docker 调试
    if enable.DockerDebug && resp.Debug != nil {
        go imagedebug.DoImageDebug(resp.Debug)
    }
}

3.4 构建任务执行

文件: src/pkg/job/build.go

// DoBuild 执行构建任务
func DoBuild(buildInfo *api.ThirdPartyBuildInfo) {
    // 获取任务锁
    BuildTotalManager.Lock.Lock()
    
    // 检查并发数
    dockerCanRun, normalCanRun := CheckParallelTaskCount()
    
    if buildInfo.DockerBuildInfo != nil && dockerCanRun {
        // Docker 构建
        GBuildDockerManager.AddBuild(buildInfo.BuildId, &api.ThirdPartyDockerTaskInfo{...})
        BuildTotalManager.Lock.Unlock()
        runDockerBuild(buildInfo)
        return
    }
    
    if normalCanRun {
        // 普通构建
        GBuildManager.AddPreInstance(buildInfo.BuildId)
        BuildTotalManager.Lock.Unlock()
        runBuild(buildInfo)
    }
}

// runBuild 启动 Worker 进程
func runBuild(buildInfo *api.ThirdPartyBuildInfo) error {
    // 检查 worker.jar 是否存在
    agentJarPath := config.BuildAgentJarPath()
    if !fileutil.Exists(agentJarPath) {
        // 尝试自愈
        upgradeWorkerFile := systemutil.GetUpgradeDir() + "/" + config.WorkAgentFile
        if fileutil.Exists(upgradeWorkerFile) {
            fileutil.CopyFile(upgradeWorkerFile, agentJarPath, true)
        }
    }
    
    // 设置环境变量
    goEnv := map[string]string{
        "DEVOPS_AGENT_VERSION":     config.AgentVersion,
        "DEVOPS_WORKER_VERSION":    third_components.Worker.GetVersion(),
        "DEVOPS_PROJECT_ID":        buildInfo.ProjectId,
        "DEVOPS_BUILD_ID":          buildInfo.BuildId,
        "DEVOPS_VM_SEQ_ID":         buildInfo.VmSeqId,
        "DEVOPS_FILE_GATEWAY":      config.GAgentConfig.FileGateway,
        "DEVOPS_GATEWAY":           config.GetGateWay(),
        "BK_CI_LOCALE_LANGUAGE":    config.GAgentConfig.Language,
        "DEVOPS_AGENT_JDK_8_PATH":  third_components.Jdk.Jdk8.GetJavaOrNull(),
        "DEVOPS_AGENT_JDK_17_PATH": third_components.Jdk.Jdk17.GetJavaOrNull(),
    }
    
    // 创建临时目录并启动构建
    tmpDir, _ := systemutil.MkBuildTmpDir()
    doBuild(buildInfo, tmpDir, workDir, goEnv, runUser)
}

3.5 配置管理

文件: src/pkg/config/config.go

Agent 配置从 .agent.properties 文件加载:

// 配置键定义
const (
    KeyProjectId         = "devops.project.id"
    KeyAgentId           = "devops.agent.id"
    KeySecretKey         = "devops.agent.secret.key"
    KeyDevopsGateway     = "landun.gateway"
    KeyDevopsFileGateway = "landun.fileGateway"
    KeyTaskCount         = "devops.parallel.task.count"
    KeyEnvType           = "landun.env"
    KeySlaveUser         = "devops.slave.user"
    KeyDockerTaskCount   = "devops.docker.parallel.task.count"
    KeyLanguage          = "devops.language"
    // ...
)

// AgentConfig 配置结构
type AgentConfig struct {
    Gateway                 string
    FileGateway             string
    BuildType               string
    ProjectId               string
    AgentId                 string
    SecretKey               string
    ParallelTaskCount       int
    DockerParallelTaskCount int
    EnableDockerBuild       bool
    Language                string
    // ...
}

// AgentEnv 环境信息
type AgentEnv struct {
    OsName           string
    agentIp          string
    HostName         string
    AgentVersion     string
    AgentInstallPath string
    OsVersion        string
    CPUProductInfo   string
    GPUProductInfo   string
}

3.6 API 客户端

文件: src/pkg/api/api.go

// 构建 URL
func buildUrl(url string) string {
    return config.GetGateWay() + url
}

// Agent 启动上报
func AgentStartup() (*httputil.DevopsResult, error) {
    url := buildUrl("/ms/environment/api/buildAgent/agent/thirdPartyAgent/startup")
    startInfo := &ThirdPartyAgentStartInfo{
        HostName:      config.GAgentEnv.HostName,
        HostIp:        config.GAgentEnv.GetAgentIp(),
        DetectOs:      config.GAgentEnv.OsName,
        MasterVersion: config.AgentVersion,
        SlaveVersion:  third_components.Worker.GetVersion(),
    }
    return httputil.NewHttpClient().Post(url).Body(startInfo, false).
        SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(nil).IntoDevopsResult()
}

// 构建完成上报
func WorkerBuildFinish(buildInfo *ThirdPartyBuildWithStatus) (*httputil.DevopsResult, error) {
    url := buildUrl("/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/workerBuildFinish")
    return httputil.NewHttpClient().Post(url).Body(buildInfo, false).
        SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(nil).IntoDevopsResult()
}

// Ask 统一请求
func Ask(info *AskInfo) (*httputil.AgentResult, error) {
    url := buildUrl("/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/ask")
    return httputil.NewHttpClient().Post(url).Body(info, bodyEq).
        SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(askRequest.Resp).IntoAgentResult()
}

3.7 升级机制

文件: src/pkg/upgrade/upgrade.go

// AgentUpgrade 升级主逻辑
func AgentUpgrade(upgradeItem *api.UpgradeItem, hasBuild bool) {
    upItems := &upgradeItems{
        Agent:          upgradeItem.Agent,
        Worker:         upgradeItem.Worker,
        Jdk:            upgradeItem.Jdk,
        DockerInitFile: upgradeItem.DockerInitFile,
    }
    
    if upItems.NoChange() {
        return
    }
    
    // 有构建任务时跳过升级
    if hasBuild {
        return
    }
    
    // 获取任务锁,确保无任务运行
    if !job.BuildTotalManager.Lock.TryLock() {
        return
    }
    defer job.BuildTotalManager.Lock.Unlock()
    
    if job.CheckRunningJob() {
        return
    }
    
    // 下载升级文件
    downloadUpgradeFiles(upItems)
    
    // 执行升级
    DoUpgradeOperation(upItems)
}

3.8 数据采集

文件: src/pkg/collector/collector.go

使用 Telegraf 进行数据采集:

func Collect() {
    if config.GAgentConfig.CollectorOn == false {
        logs.Info("agent collector off")
        return
    }
    
    for {
        ctx, cancel := context.WithCancel(context.Background())
        go func() {
            // 监听 IP 变化事件
            ipData := <-ipChan.DChan
            cancel()
        }()
        doAgentCollect(ctx)
    }
}

func doAgentCollect(ctx context.Context) {
    // 生成 Telegraf 配置
    configContent, _ := genTelegrafConfig()
    
    // 初始化 Telegraf Agent
    tAgent, _ := getTelegrafAgent(configContent.Bytes(), logFile)
    
    // 运行采集
    for {
        tAgent.Run(ctx)
        time.Sleep(telegrafRelaunchTime)
    }
}

四、数据类型定义

4.1 构建信息

文件: src/pkg/api/type.go

// 构建任务类型
type BuildJobType string

const (
    AllBuildType    BuildJobType = "ALL"
    DockerBuildType BuildJobType = "DOCKER"
    BinaryBuildType BuildJobType = "BINARY"
    NoneBuildType   BuildJobType = "NONE"
)

// 第三方构建信息
type ThirdPartyBuildInfo struct {
    ProjectId       string                     `json:"projectId"`
    BuildId         string                     `json:"buildId"`
    VmSeqId         string                     `json:"vmSeqId"`
    Workspace       string                     `json:"workspace"`
    PipelineId      string                     `json:"pipelineId"`
    DockerBuildInfo *ThirdPartyDockerBuildInfo `json:"dockerBuildInfo"`
    ExecuteCount    *int                       `json:"executeCount"`
    ContainerHashId string                     `json:"containerHashId"`
}

// Docker 构建信息
type ThirdPartyDockerBuildInfo struct {
    AgentId         string        `json:"agentId"`
    SecretKey       string        `json:"secretKey"`
    Image           string        `json:"image"`
    Credential      Credential    `json:"credential"`
    Options         DockerOptions `json:"options"`
    ImagePullPolicy string        `json:"imagePullPolicy"`
}

4.2 心跳信息

// Agent 心跳信息
type AgentHeartbeatInfo struct {
    MasterVersion           string                     `json:"masterVersion"`
    SlaveVersion            string                     `json:"slaveVersion"`
    HostName                string                     `json:"hostName"`
    AgentIp                 string                     `json:"agentIp"`
    ParallelTaskCount       int                        `json:"parallelTaskCount"`
    AgentInstallPath        string                     `json:"agentInstallPath"`
    StartedUser             string                     `json:"startedUser"`
    TaskList                []ThirdPartyTaskInfo       `json:"taskList"`
    DockerParallelTaskCount int                        `json:"dockerParallelTaskCount"`
    DockerTaskList          []ThirdPartyDockerTaskInfo `json:"dockerTaskList"`
}

// 心跳响应
type AgentHeartbeatResponse struct {
    MasterVersion           string            `json:"masterVersion"`
    SlaveVersion            string            `json:"slaveVersion"`
    AgentStatus             string            `json:"agentStatus"`
    ParallelTaskCount       int               `json:"parallelTaskCount"`
    Envs                    map[string]string `json:"envs"`
    Gateway                 string            `json:"gateway"`
    FileGateway             string            `json:"fileGateway"`
    DockerParallelTaskCount int               `json:"dockerParallelTaskCount"`
    Language                string            `json:"language"`
}

五、跨平台支持

5.1 平台特定代码

Agent 通过 Go 的构建标签支持多平台:

src/pkg/config/
├── config.go           # 通用配置
├── config_darwin.go    # macOS 特定
├── config_linux.go     # Linux 特定
└── config_win.go       # Windows 特定

src/pkg/upgrader/
├── upgrader_darwin.go  # macOS 升级器
├── upgrader_unix.go    # Unix 升级器
└── upgrader_win.go     # Windows 升级器

5.2 构建命令

# Linux
make clean build_linux

# macOS
make clean build_macos

# Windows
build_windows.bat

生成的二进制文件:

  • devopsDaemon_linux / devopsDaemon_macos / devopsDaemon.exe
  • devopsAgent_linux / devopsAgent_macos / devopsAgent.exe
  • upgrader_linux / upgrader_macos / upgrader.exe

六、与后端服务交互

6.1 API 端点

服务 端点 用途
Environment /ms/environment/api/buildAgent/agent/thirdPartyAgent/startup Agent 启动上报
Dispatch /ms/dispatch/api/buildAgent/agent/thirdPartyAgent/ask 统一 Ask 请求
Dispatch /ms/dispatch/api/buildAgent/agent/thirdPartyAgent/workerBuildFinish 构建完成上报
Environment /ms/environment/api/buildAgent/agent/thirdPartyAgent/agents/pipelines Pipeline 任务
Environment /ms/environment/api/buildAgent/agent/thirdPartyAgent/upgrade/files/download 下载升级文件

6.2 认证头

func (a *AgentConfig) GetAuthHeaderMap() map[string]string {
    return map[string]string{
        "X-DEVOPS-BUILD-TYPE": a.BuildType,
        "X-DEVOPS-PROJECT-ID": a.ProjectId,
        "X-DEVOPS-AGENT-ID":   a.AgentId,
        "X-DEVOPS-AGENT-SECRET-KEY": a.SecretKey,
    }
}

七、开发规范

7.1 错误处理

// 标准错误检查
if err != nil {
    logs.WithError(err).Error("operation failed")
    return errors.Wrap(err, "context message")
}

// Panic 恢复
defer func() {
    if err := recover(); err != nil {
        logs.Error("panic: ", err)
    }
}()

7.2 日志规范

// 日志级别
logs.Debug("debug message")
logs.Info("info message")
logs.Infof("formatted: %s", value)
logs.Warn("warning message")
logs.Error("error message")
logs.WithError(err).Error("error with context")

7.3 并发模式

// 启动 goroutine
go collector.Collect()
go cron.CleanJob()

// 使用锁保护共享资源
BuildTotalManager.Lock.Lock()
defer BuildTotalManager.Lock.Unlock()

// 使用文件锁进行进程间同步
agentLock := flock.New(fmt.Sprintf("%s/agent.lock", runtimeDir))
locked, err := agentLock.TryLock()

7.4 新增功能开发

  1. 新增 API 调用:在 src/pkg/api/api.go 添加函数
  2. 新增数据类型:在 src/pkg/api/type.go 定义结构体
  3. 新增配置项:在 src/pkg/config/config.go 添加常量和字段
  4. 新增后台任务:在 doAgentJob() 中添加处理逻辑

八、控制脚本

# Linux 示例
scripts/linux/install.sh    # 安装
scripts/linux/start.sh      # 启动
scripts/linux/stop.sh       # 停止
scripts/linux/uninstall.sh  # 卸载

九、相关模块

模块 关系 说明
Worker 下游 Agent 拉起 Worker 执行构建
Environment 上游 Agent 状态管理、心跳上报
Dispatch 上游 构建任务分发
Log 下游 构建日志上报(通过 Worker)