| name | 43-agent-module-architecture |
| description | Agent 构建机模块架构指南(Go 语言),涵盖 Agent 启动流程、心跳机制、任务领取执行、升级更新、与 Dispatch 交互。当用户开发 Agent 功能、修改心跳逻辑、处理任务执行或实现 Agent 升级时使用。 |
Agent 构建机模块架构指南
模块定位: Agent 是 BK-CI 的构建机核心组件,由 Go 语言编写,负责与后端服务通信、接收构建任务、拉起 Worker 进程执行构建。
一、模块概述
1.1 核心职责
| 职责 | 说明 |
|---|---|
| 进程管理 | Daemon 守护 Agent 进程,确保持续运行 |
| 任务调度 | 从 Dispatch 服务拉取构建任务并执行 |
| Worker 管理 | 拉起 Worker(Kotlin JAR)执行实际构建逻辑 |
| 心跳上报 | 定期向后端上报 Agent 状态和环境信息 |
| 自动升级 | 检测并自动升级 Agent、Worker、JDK |
| 数据采集 | 通过 Telegraf 采集构建机指标数据 |
| Docker 构建 | 支持 Docker 容器化构建(Linux) |
1.2 与 Worker 的关系
┌─────────────────────────────────────────────────────────────┐
│ 构建机 (Build Machine) │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────┐ 守护 ┌─────────┐ │
│ │ Daemon │ ───────────▶ │ Agent │ │
│ │ (Go) │ │ (Go) │ │
│ └─────────┘ └────┬────┘ │
│ │ 拉起 │
│ ▼ │
│ ┌─────────┐ │
│ │ Worker │ │
│ │(Kotlin) │ │
│ └────┬────┘ │
│ │ 执行 │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ 插件任务 / 脚本任务 │ │
│ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
- Agent (Go): 负责进程调度、与后端通信、环境管理
- Worker (Kotlin): 负责具体构建任务执行、插件运行、日志上报
二、目录结构
src/agent/
├── agent/ # 主 Agent 模块
│ ├── src/
│ │ ├── cmd/ # 入口程序
│ │ │ ├── agent/main.go # Agent 主程序入口
│ │ │ ├── daemon/main.go # Daemon 守护进程入口
│ │ │ ├── installer/main.go # 安装程序入口
│ │ │ └── upgrader/main.go # 升级程序入口
│ │ ├── pkg/ # 核心包
│ │ │ ├── agent/ # Agent 核心逻辑
│ │ │ ├── api/ # API 客户端
│ │ │ ├── collector/ # 数据采集
│ │ │ ├── config/ # 配置管理
│ │ │ ├── cron/ # 定时任务
│ │ │ ├── i18n/ # 国际化
│ │ │ ├── imagedebug/ # Docker 镜像调试
│ │ │ ├── job/ # 构建任务管理
│ │ │ ├── job_docker/ # Docker 构建
│ │ │ ├── pipeline/ # Pipeline 任务
│ │ │ ├── upgrade/ # 升级逻辑
│ │ │ ├── upgrader/ # 升级器实现
│ │ │ └── util/ # 工具函数
│ │ └── third_components/ # 第三方组件管理
│ ├── go.mod
│ ├── Makefile
│ └── README.md
├── agent-slim/ # 轻量版 Agent
│ └── cmd/slim.go
└── common/ # 公共工具库
└── utils/
├── fileutil/
└── slice.go
三、核心组件详解
3.1 Daemon 守护进程
文件: src/cmd/daemon/main.go
Daemon 负责守护 Agent 进程,确保其持续运行:
// Unix 实现:通过文件锁检测 Agent 是否存活
func watch(isDebug bool) {
totalLock := flock.New(fmt.Sprintf("%s/%s.lock", systemutil.GetRuntimeDir(), systemutil.TotalLock))
// 首次立即检查
totalLock.Lock()
doCheckAndLaunchAgent(isDebug)
totalLock.Unlock()
// 定时检查(5秒间隔)
checkTimeTicker := time.NewTicker(agentCheckGap)
for ; ; totalLock.Unlock() {
select {
case <-checkTimeTicker.C:
if err := totalLock.Lock(); err != nil {
continue
}
doCheckAndLaunchAgent(isDebug)
}
}
}
// 检查并拉起 Agent
func doCheckAndLaunchAgent(isDebug bool) {
agentLock := flock.New(fmt.Sprintf("%s/agent.lock", systemutil.GetRuntimeDir()))
locked, err := agentLock.TryLock()
if err == nil && locked {
// 能获取锁说明 Agent 未运行,需要拉起
logs.Warn("agent is not available, will launch it")
process, err := launch(workDir+"/"+config.AgentFileClientLinux, isDebug)
if err != nil {
logs.WithError(err).Error("launch agent failed")
}
}
}
Windows 实现: 使用 github.com/kardianos/service 库实现 Windows Service
3.2 Agent 核心流程
文件: src/pkg/agent/agent.go
func Run(isDebug bool) {
// 1. 初始化配置
config.Init(isDebug)
third_components.Init()
// 2. 初始化国际化
i18n.InitAgentI18n()
// 3. 上报启动(重试直到成功)
_, err := job.AgentStartup()
if err != nil {
for {
_, err = job.AgentStartup()
if err == nil {
break
}
time.Sleep(5 * time.Second)
}
}
// 4. 启动后台任务
go collector.Collect() // 数据采集
go cron.CleanJob() // 定期清理
go cron.CleanDebugContainer() // 清理调试容器
// 5. 主循环:Ask 请求
for {
doAsk()
config.LoadAgentIp()
time.Sleep(5 * time.Second)
}
}
3.3 Ask 统一请求模式
Agent 使用 Ask 模式统一处理多种任务:
func doAsk() {
// 构建 Ask 请求
enable := genAskEnable()
heart, upgrad := genHeartInfoAndUpgrade(enable.Upgrade, exiterror)
result, err := api.Ask(&api.AskInfo{
Enable: enable, // 启用的功能
Heart: heart, // 心跳信息
Upgrade: upgrad, // 升级信息
})
// 处理响应
resp := new(api.AskResp)
util.ParseJsonToData(result.Data, &resp)
// 执行各类任务
doAgentJob(enable, resp)
}
func doAgentJob(enable api.AskEnable, resp *api.AskResp) {
// 心跳响应处理
if resp.Heart != nil {
go agentHeartbeat(resp.Heart)
}
// 构建任务
hasBuild := (enable.Build != api.NoneBuildType) && (resp.Build != nil)
if hasBuild {
go job.DoBuild(resp.Build)
}
// 升级任务
if enable.Upgrade && resp.Upgrade != nil {
go upgrade.AgentUpgrade(resp.Upgrade, hasBuild)
}
// Pipeline 任务
if enable.Pipeline && resp.Pipeline != nil {
go pipeline.RunPipeline(resp.Pipeline)
}
// Docker 调试
if enable.DockerDebug && resp.Debug != nil {
go imagedebug.DoImageDebug(resp.Debug)
}
}
3.4 构建任务执行
文件: src/pkg/job/build.go
// DoBuild 执行构建任务
func DoBuild(buildInfo *api.ThirdPartyBuildInfo) {
// 获取任务锁
BuildTotalManager.Lock.Lock()
// 检查并发数
dockerCanRun, normalCanRun := CheckParallelTaskCount()
if buildInfo.DockerBuildInfo != nil && dockerCanRun {
// Docker 构建
GBuildDockerManager.AddBuild(buildInfo.BuildId, &api.ThirdPartyDockerTaskInfo{...})
BuildTotalManager.Lock.Unlock()
runDockerBuild(buildInfo)
return
}
if normalCanRun {
// 普通构建
GBuildManager.AddPreInstance(buildInfo.BuildId)
BuildTotalManager.Lock.Unlock()
runBuild(buildInfo)
}
}
// runBuild 启动 Worker 进程
func runBuild(buildInfo *api.ThirdPartyBuildInfo) error {
// 检查 worker.jar 是否存在
agentJarPath := config.BuildAgentJarPath()
if !fileutil.Exists(agentJarPath) {
// 尝试自愈
upgradeWorkerFile := systemutil.GetUpgradeDir() + "/" + config.WorkAgentFile
if fileutil.Exists(upgradeWorkerFile) {
fileutil.CopyFile(upgradeWorkerFile, agentJarPath, true)
}
}
// 设置环境变量
goEnv := map[string]string{
"DEVOPS_AGENT_VERSION": config.AgentVersion,
"DEVOPS_WORKER_VERSION": third_components.Worker.GetVersion(),
"DEVOPS_PROJECT_ID": buildInfo.ProjectId,
"DEVOPS_BUILD_ID": buildInfo.BuildId,
"DEVOPS_VM_SEQ_ID": buildInfo.VmSeqId,
"DEVOPS_FILE_GATEWAY": config.GAgentConfig.FileGateway,
"DEVOPS_GATEWAY": config.GetGateWay(),
"BK_CI_LOCALE_LANGUAGE": config.GAgentConfig.Language,
"DEVOPS_AGENT_JDK_8_PATH": third_components.Jdk.Jdk8.GetJavaOrNull(),
"DEVOPS_AGENT_JDK_17_PATH": third_components.Jdk.Jdk17.GetJavaOrNull(),
}
// 创建临时目录并启动构建
tmpDir, _ := systemutil.MkBuildTmpDir()
doBuild(buildInfo, tmpDir, workDir, goEnv, runUser)
}
3.5 配置管理
文件: src/pkg/config/config.go
Agent 配置从 .agent.properties 文件加载:
// 配置键定义
const (
KeyProjectId = "devops.project.id"
KeyAgentId = "devops.agent.id"
KeySecretKey = "devops.agent.secret.key"
KeyDevopsGateway = "landun.gateway"
KeyDevopsFileGateway = "landun.fileGateway"
KeyTaskCount = "devops.parallel.task.count"
KeyEnvType = "landun.env"
KeySlaveUser = "devops.slave.user"
KeyDockerTaskCount = "devops.docker.parallel.task.count"
KeyLanguage = "devops.language"
// ...
)
// AgentConfig 配置结构
type AgentConfig struct {
Gateway string
FileGateway string
BuildType string
ProjectId string
AgentId string
SecretKey string
ParallelTaskCount int
DockerParallelTaskCount int
EnableDockerBuild bool
Language string
// ...
}
// AgentEnv 环境信息
type AgentEnv struct {
OsName string
agentIp string
HostName string
AgentVersion string
AgentInstallPath string
OsVersion string
CPUProductInfo string
GPUProductInfo string
}
3.6 API 客户端
文件: src/pkg/api/api.go
// 构建 URL
func buildUrl(url string) string {
return config.GetGateWay() + url
}
// Agent 启动上报
func AgentStartup() (*httputil.DevopsResult, error) {
url := buildUrl("/ms/environment/api/buildAgent/agent/thirdPartyAgent/startup")
startInfo := &ThirdPartyAgentStartInfo{
HostName: config.GAgentEnv.HostName,
HostIp: config.GAgentEnv.GetAgentIp(),
DetectOs: config.GAgentEnv.OsName,
MasterVersion: config.AgentVersion,
SlaveVersion: third_components.Worker.GetVersion(),
}
return httputil.NewHttpClient().Post(url).Body(startInfo, false).
SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(nil).IntoDevopsResult()
}
// 构建完成上报
func WorkerBuildFinish(buildInfo *ThirdPartyBuildWithStatus) (*httputil.DevopsResult, error) {
url := buildUrl("/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/workerBuildFinish")
return httputil.NewHttpClient().Post(url).Body(buildInfo, false).
SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(nil).IntoDevopsResult()
}
// Ask 统一请求
func Ask(info *AskInfo) (*httputil.AgentResult, error) {
url := buildUrl("/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/ask")
return httputil.NewHttpClient().Post(url).Body(info, bodyEq).
SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(askRequest.Resp).IntoAgentResult()
}
3.7 升级机制
文件: src/pkg/upgrade/upgrade.go
// AgentUpgrade 升级主逻辑
func AgentUpgrade(upgradeItem *api.UpgradeItem, hasBuild bool) {
upItems := &upgradeItems{
Agent: upgradeItem.Agent,
Worker: upgradeItem.Worker,
Jdk: upgradeItem.Jdk,
DockerInitFile: upgradeItem.DockerInitFile,
}
if upItems.NoChange() {
return
}
// 有构建任务时跳过升级
if hasBuild {
return
}
// 获取任务锁,确保无任务运行
if !job.BuildTotalManager.Lock.TryLock() {
return
}
defer job.BuildTotalManager.Lock.Unlock()
if job.CheckRunningJob() {
return
}
// 下载升级文件
downloadUpgradeFiles(upItems)
// 执行升级
DoUpgradeOperation(upItems)
}
3.8 数据采集
文件: src/pkg/collector/collector.go
使用 Telegraf 进行数据采集:
func Collect() {
if config.GAgentConfig.CollectorOn == false {
logs.Info("agent collector off")
return
}
for {
ctx, cancel := context.WithCancel(context.Background())
go func() {
// 监听 IP 变化事件
ipData := <-ipChan.DChan
cancel()
}()
doAgentCollect(ctx)
}
}
func doAgentCollect(ctx context.Context) {
// 生成 Telegraf 配置
configContent, _ := genTelegrafConfig()
// 初始化 Telegraf Agent
tAgent, _ := getTelegrafAgent(configContent.Bytes(), logFile)
// 运行采集
for {
tAgent.Run(ctx)
time.Sleep(telegrafRelaunchTime)
}
}
四、数据类型定义
4.1 构建信息
文件: src/pkg/api/type.go
// 构建任务类型
type BuildJobType string
const (
AllBuildType BuildJobType = "ALL"
DockerBuildType BuildJobType = "DOCKER"
BinaryBuildType BuildJobType = "BINARY"
NoneBuildType BuildJobType = "NONE"
)
// 第三方构建信息
type ThirdPartyBuildInfo struct {
ProjectId string `json:"projectId"`
BuildId string `json:"buildId"`
VmSeqId string `json:"vmSeqId"`
Workspace string `json:"workspace"`
PipelineId string `json:"pipelineId"`
DockerBuildInfo *ThirdPartyDockerBuildInfo `json:"dockerBuildInfo"`
ExecuteCount *int `json:"executeCount"`
ContainerHashId string `json:"containerHashId"`
}
// Docker 构建信息
type ThirdPartyDockerBuildInfo struct {
AgentId string `json:"agentId"`
SecretKey string `json:"secretKey"`
Image string `json:"image"`
Credential Credential `json:"credential"`
Options DockerOptions `json:"options"`
ImagePullPolicy string `json:"imagePullPolicy"`
}
4.2 心跳信息
// Agent 心跳信息
type AgentHeartbeatInfo struct {
MasterVersion string `json:"masterVersion"`
SlaveVersion string `json:"slaveVersion"`
HostName string `json:"hostName"`
AgentIp string `json:"agentIp"`
ParallelTaskCount int `json:"parallelTaskCount"`
AgentInstallPath string `json:"agentInstallPath"`
StartedUser string `json:"startedUser"`
TaskList []ThirdPartyTaskInfo `json:"taskList"`
DockerParallelTaskCount int `json:"dockerParallelTaskCount"`
DockerTaskList []ThirdPartyDockerTaskInfo `json:"dockerTaskList"`
}
// 心跳响应
type AgentHeartbeatResponse struct {
MasterVersion string `json:"masterVersion"`
SlaveVersion string `json:"slaveVersion"`
AgentStatus string `json:"agentStatus"`
ParallelTaskCount int `json:"parallelTaskCount"`
Envs map[string]string `json:"envs"`
Gateway string `json:"gateway"`
FileGateway string `json:"fileGateway"`
DockerParallelTaskCount int `json:"dockerParallelTaskCount"`
Language string `json:"language"`
}
五、跨平台支持
5.1 平台特定代码
Agent 通过 Go 的构建标签支持多平台:
src/pkg/config/
├── config.go # 通用配置
├── config_darwin.go # macOS 特定
├── config_linux.go # Linux 特定
└── config_win.go # Windows 特定
src/pkg/upgrader/
├── upgrader_darwin.go # macOS 升级器
├── upgrader_unix.go # Unix 升级器
└── upgrader_win.go # Windows 升级器
5.2 构建命令
# Linux
make clean build_linux
# macOS
make clean build_macos
# Windows
build_windows.bat
生成的二进制文件:
devopsDaemon_linux/devopsDaemon_macos/devopsDaemon.exedevopsAgent_linux/devopsAgent_macos/devopsAgent.exeupgrader_linux/upgrader_macos/upgrader.exe
六、与后端服务交互
6.1 API 端点
| 服务 | 端点 | 用途 |
|---|---|---|
| Environment | /ms/environment/api/buildAgent/agent/thirdPartyAgent/startup |
Agent 启动上报 |
| Dispatch | /ms/dispatch/api/buildAgent/agent/thirdPartyAgent/ask |
统一 Ask 请求 |
| Dispatch | /ms/dispatch/api/buildAgent/agent/thirdPartyAgent/workerBuildFinish |
构建完成上报 |
| Environment | /ms/environment/api/buildAgent/agent/thirdPartyAgent/agents/pipelines |
Pipeline 任务 |
| Environment | /ms/environment/api/buildAgent/agent/thirdPartyAgent/upgrade/files/download |
下载升级文件 |
6.2 认证头
func (a *AgentConfig) GetAuthHeaderMap() map[string]string {
return map[string]string{
"X-DEVOPS-BUILD-TYPE": a.BuildType,
"X-DEVOPS-PROJECT-ID": a.ProjectId,
"X-DEVOPS-AGENT-ID": a.AgentId,
"X-DEVOPS-AGENT-SECRET-KEY": a.SecretKey,
}
}
七、开发规范
7.1 错误处理
// 标准错误检查
if err != nil {
logs.WithError(err).Error("operation failed")
return errors.Wrap(err, "context message")
}
// Panic 恢复
defer func() {
if err := recover(); err != nil {
logs.Error("panic: ", err)
}
}()
7.2 日志规范
// 日志级别
logs.Debug("debug message")
logs.Info("info message")
logs.Infof("formatted: %s", value)
logs.Warn("warning message")
logs.Error("error message")
logs.WithError(err).Error("error with context")
7.3 并发模式
// 启动 goroutine
go collector.Collect()
go cron.CleanJob()
// 使用锁保护共享资源
BuildTotalManager.Lock.Lock()
defer BuildTotalManager.Lock.Unlock()
// 使用文件锁进行进程间同步
agentLock := flock.New(fmt.Sprintf("%s/agent.lock", runtimeDir))
locked, err := agentLock.TryLock()
7.4 新增功能开发
- 新增 API 调用:在
src/pkg/api/api.go添加函数 - 新增数据类型:在
src/pkg/api/type.go定义结构体 - 新增配置项:在
src/pkg/config/config.go添加常量和字段 - 新增后台任务:在
doAgentJob()中添加处理逻辑
八、控制脚本
# Linux 示例
scripts/linux/install.sh # 安装
scripts/linux/start.sh # 启动
scripts/linux/stop.sh # 停止
scripts/linux/uninstall.sh # 卸载
九、相关模块
| 模块 | 关系 | 说明 |
|---|---|---|
| Worker | 下游 | Agent 拉起 Worker 执行构建 |
| Environment | 上游 | Agent 状态管理、心跳上报 |
| Dispatch | 上游 | 构建任务分发 |
| Log | 下游 | 构建日志上报(通过 Worker) |