| name | 29-2-process-service-layer |
| description | Process 模块 Service 业务层详细分析,涵盖流水线服务、构建服务、模板服务、业务逻辑实现。当用户开发 Process 业务逻辑、实现流水线功能、处理构建流程或扩展服务能力时使用。 |
Process 模块 Service 业务层详细分析
模块路径:
src/backend/ci/core/process/biz-process/src/main/kotlin/com/tencent/devops/process/service/和src/backend/ci/core/process/biz-base/src/main/kotlin/com/tencent/devops/process/engine/service/
一、Service 层架构概述
Process 模块的 Service 层采用双层架构设计:
┌─────────────────────────────────────────────────────────────────────────┐
│ Facade Service 层 (biz-process) │
│ 职责:业务编排、权限校验、参数转换、调用 Engine Service │
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │PipelineInfoFacade│ │PipelineListFacade│ │PipelineBuildFacade│ │
│ │Service (80KB) │ │Service (97KB) │ │Service (137KB) │ │
│ └──────────────────┘ └──────────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Engine Service 层 (biz-base) │
│ 职责:核心业务逻辑、数据访问、事件分发 │
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │PipelineRepository│ │PipelineRuntime │ │PipelineContainer │ │
│ │Service (110KB) │ │Service (101KB) │ │Service (50KB) │ │
│ └──────────────────┘ └──────────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
二、定时触发系统
2.1 Quartz 定时调度架构
┌─────────────────────────────────────────────────────────────────────────┐
│ Quartz 调度器 │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ PipelineQuartzService ││
│ │ @Scheduled(initialDelay=20000, fixedDelay=3000000) ││
│ │ reloadTimer() ─► 加载所有定时任务到 Quartz ││
│ └─────────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ PipelineQuartzJob │
│ execute(context) ─► PipelineJobBean.execute() │
│ ├─► 获取定时器配置 (pipelineTimerService.get()) │
│ ├─► 检查项目路由标签 │
│ ├─► Redis 分布式锁防重复触发 │
│ └─► 发送 PipelineTimerBuildEvent │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ PipelineTimerBuildListener │
│ 消费 PipelineTimerBuildEvent │
│ ├─► 获取流水线模型 │
│ ├─► 解析启动参数 │
│ └─► 调用 pipelineBuildFacadeService.startPipeline() │
└─────────────────────────────────────────────────────────────────────────┘
2.2 核心类说明
PipelineQuartzService - 定时调度服务
@Service
class PipelineQuartzService @Autowired constructor(
private val pipelineTimerService: PipelineTimerService,
private val schedulerManager: SchedulerManager
) {
// 每 50 分钟重新加载定时任务
@Scheduled(initialDelay = 20000, fixedDelay = 3000000)
fun reloadTimer() {
// 遍历所有定时器,添加到 Quartz
pipelineTimerService.list(start, limit).forEach { timer ->
timer.crontabExpressions.forEach { crontab ->
addJob(projectId, pipelineId, crontab, taskId)
}
}
}
fun addJob(projectId: String, pipelineId: String, crontab: String, taskId: String) {
val md5 = DigestUtils.md5Hex(crontab)
val comboKey = "${pipelineId}_${md5}_${projectId}_$taskId"
schedulerManager.addJob(
key = comboKey,
cronExpression = crontab,
jobBeanClass = PipelineQuartzJob::class.java,
projectId = projectId,
pipelineId = pipelineId,
taskId = taskId,
md5 = md5
)
}
}
PipelineJobBean - Quartz Job 执行器
class PipelineJobBean(
private val pipelineEventDispatcher: PipelineEventDispatcher,
private val pipelineTimerService: PipelineTimerService,
private val redisOperation: RedisOperation
) {
fun execute(context: JobExecutionContext?) {
// 1. 获取定时器配置
val pipelineTimer = pipelineTimerService.get(projectId, pipelineId, taskId)
// 2. Redis 锁防止重复触发
val redisLock = PipelineTimerTriggerLock(redisOperation, pipelineLockKey, scheduledFireTime)
if (redisLock.tryLock()) {
// 3. 发送定时构建事件
pipelineEventDispatcher.dispatch(
PipelineTimerBuildEvent(
source = "timer_trigger",
projectId = pipelineTimer.projectId,
pipelineId = pipelineId,
userId = pipelineTimer.startUser,
channelCode = pipelineTimer.channelCode,
taskId = taskId,
startParam = pipelineTimer.startParam
)
)
}
}
}
三、拦截器链机制
3.1 拦截器架构
┌─────────────────────────────────────────────────────────────────────────┐
│ PipelineInterceptorChain │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ filter(InterceptData) ─► 依次执行拦截器 ││
│ │ 如果任一拦截器返回 isNotOk(),则中断并返回错误 ││
│ └─────────────────────────────────────────────────────────────────────┘│
│ │ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────────┐ │
│ │ RunLock │→ │ Queue │→ │ TimerTriggerScmChange │ │
│ │ Interceptor │ │ Interceptor │ │ Interceptor │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
3.2 拦截器接口
interface PipelineInterceptor {
fun execute(task: InterceptData): Response<BuildStatus>
}
3.3 拦截器链实现
class PipelineInterceptorChain(private val filters: List<PipelineInterceptor>) {
fun filter(any: InterceptData): Response<BuildStatus> {
var result = Response<BuildStatus>(OK)
run lit@{
filters.forEach {
result = it.execute(any)
if (result.isNotOk()) {
return@lit // 中断链
}
}
}
return result
}
}
3.4 拦截器配置
@Configuration
class PipelineInterceptorConfigure {
@Bean
fun pipelineInterceptorChain(
runLockInterceptor: RunLockInterceptor,
queueInterceptor: QueueInterceptor,
timerTriggerScmChangeInterceptor: TimerTriggerScmChangeInterceptor
): PipelineInterceptorChain {
return PipelineInterceptorChain(
listOf(
runLockInterceptor, // 1. 运行锁检查
queueInterceptor, // 2. 队列/并发控制
timerTriggerScmChangeInterceptor // 3. 定时触发源码变更检查
)
)
}
}
3.5 各拦截器职责
| 拦截器 | 职责 | 返回结果 |
|---|---|---|
RunLockInterceptor |
检查流水线是否被锁定 | LOCK 时拒绝,SINGLE/SINGLE_LOCK 时排队 |
QueueInterceptor |
并发控制、队列管理 | 队列满时拒绝,超出并发时取消旧构建 |
TimerTriggerScmChangeInterceptor |
定时触发时检查源码是否变更 | 无变更时跳过构建 |
3.6 RunLockInterceptor 详解
@Component
class RunLockInterceptor @Autowired constructor(
private val pipelineRuntimeService: PipelineRuntimeService
) : PipelineInterceptor {
override fun execute(task: InterceptData): Response<BuildStatus> {
val runLockType = task.runLockType
return when {
// 流水线被锁定,拒绝执行
runLockType == PipelineRunLockType.LOCK ->
Response(ERROR_PIPELINE_LOCK, "流水线已锁定")
// 串行模式,检查是否有正在运行的构建
runLockType == PipelineRunLockType.SINGLE -> {
val runningCount = pipelineRuntimeService.getRunningCount(projectId, pipelineId)
if (runningCount >= 1) {
Response(BuildStatus.QUEUE) // 排队
} else {
Response(BuildStatus.RUNNING) // 可执行
}
}
// 并发组模式
runLockType == PipelineRunLockType.GROUP_LOCK -> {
// 检查同一并发组是否有正在运行的构建
checkConcurrencyGroup(task)
}
else -> Response(BuildStatus.RUNNING)
}
}
}
3.7 QueueInterceptor 详解
@Component
class QueueInterceptor @Autowired constructor(
private val pipelineRuntimeService: PipelineRuntimeService,
private val pipelineEventDispatcher: PipelineEventDispatcher
) : PipelineInterceptor {
override fun execute(task: InterceptData): Response<BuildStatus> {
val buildSummary = pipelineRuntimeService.getBuildSummaryRecord(projectId, pipelineId)
return when {
// 队列已满
buildSummary.queueCount >= task.maxQueueSize -> {
if (task.cancelAllowed) {
// 取消最早排队的构建
outQueueCancelBySingle(projectId, pipelineId, task)
Response(BuildStatus.RUNNING)
} else {
Response(ERROR_PIPELINE_QUEUE_FULL, "队列已满")
}
}
// 运行数+排队数超过最大限制
(buildSummary.runningCount + buildSummary.queueCount) >= maxLimit ->
Response(ERROR_PIPELINE_QUEUE_FULL, "超过最大并发限制")
else -> Response(BuildStatus.RUNNING)
}
}
private fun outQueueCancelBySingle(projectId: String, pipelineId: String, task: InterceptData) {
// 取消最早排队的构建
val buildInfo = pipelineRuntimeExtService.popNextQueueBuildInfo(projectId, pipelineId)
if (buildInfo != null) {
pipelineEventDispatcher.dispatch(
PipelineBuildCancelEvent(
source = "QueueInterceptor",
projectId = projectId,
pipelineId = pipelineId,
buildId = buildInfo.buildId,
status = BuildStatus.CANCELED
)
)
}
}
}
四、Facade Service 层详解 (biz-process)
2.1 目录结构
biz-process/src/main/kotlin/com/tencent/devops/process/service/
├── PipelineInfoFacadeService.kt # 流水线信息管理门面 (80KB)
├── PipelineListFacadeService.kt # 流水线列表查询门面 (97KB)
├── PipelineVersionFacadeService.kt # 版本管理门面 (37KB)
├── ParamFacadeService.kt # 参数管理门面 (18KB)
├── builds/
│ ├── PipelineBuildFacadeService.kt # 构建管理门面 (137KB) - 最大
│ ├── PipelineBuildRetryService.kt # 重试服务 (25KB)
│ └── PipelinePauseBuildFacadeService.kt # 暂停服务 (11KB)
├── template/
│ ├── TemplateFacadeService.kt # 模板管理门面 (120KB)
│ └── TemplateSettingService.kt # 模板配置 (7KB)
├── view/
│ ├── PipelineViewService.kt # 视图服务 (34KB)
│ └── PipelineViewGroupService.kt # 视图分组 (44KB)
└── webhook/
└── PipelineBuildWebhookService.kt # Webhook 服务 (33KB)
2.2 PipelineInfoFacadeService - 流水线信息管理
文件: PipelineInfoFacadeService.kt (80KB)
职责: 流水线 CRUD 操作的业务编排层
@Service
class PipelineInfoFacadeService @Autowired constructor(
private val dslContext: DSLContext,
private val templateDao: TemplateDao,
private val pipelineSettingFacadeService: PipelineSettingFacadeService,
private val pipelineRepositoryService: PipelineRepositoryService, // Engine Service
private val pipelineGroupService: PipelineGroupService,
private val pipelineViewGroupService: PipelineViewGroupService,
private val pipelinePermissionService: PipelinePermissionService,
private val modelCheckPlugin: ModelCheckPlugin,
private val client: Client,
// ... 更多依赖
) {
// 核心方法
}
核心方法:
| 方法 | 功能 | 调用链 |
|---|---|---|
createPipeline() |
创建流水线 | 权限校验 → 模型校验 → PipelineRepositoryService.deployPipeline() |
editPipeline() |
编辑流水线 | 权限校验 → 版本检查 → PipelineRepositoryService.deployPipeline() |
copyPipeline() |
复制流水线 | 获取模型 → 修改名称 → createPipeline() |
deletePipeline() |
删除流水线 | 权限校验 → PipelineRepositoryService.deletePipeline() |
getPipeline() |
获取流水线 | PipelineRepositoryService.getModel() |
exportPipeline() |
导出流水线 | 获取模型 → JSON/YAML 转换 → 返回文件流 |
importPipeline() |
导入流水线 | 解析文件 → 模型校验 → createPipeline() |
创建流水线流程:
fun createPipeline(
userId: String,
projectId: String,
model: Model,
channelCode: ChannelCode,
checkPermission: Boolean = true
): String {
// 1. 权限校验
if (checkPermission) {
pipelinePermissionService.checkPipelinePermission(
userId = userId,
projectId = projectId,
permission = AuthPermission.CREATE
)
}
// 2. 项目流水线数量限制检查
val pipelineCount = pipelineInfoDao.countByProject(dslContext, projectId)
if (pipelineCount >= maxPipelinePerProject) {
throw ErrorCodeException(ERROR_MAX_PIPELINE_COUNT_PER_PROJECT)
}
// 3. 模型校验(插件存在性、参数合法性等)
modelCheckPlugin.checkModelIntegrity(model, projectId)
// 4. 调用 Engine Service 保存
val pipelineId = pipelineRepositoryService.deployPipeline(
model = model,
projectId = projectId,
signPipelineId = null, // 新建时为空
userId = userId,
channelCode = channelCode,
create = true
).pipelineId
// 5. 创建权限资源
pipelinePermissionService.createResource(
userId = userId,
projectId = projectId,
pipelineId = pipelineId,
pipelineName = model.name
)
// 6. 记录操作日志
operationLogService.addOperationLog(
userId = userId,
projectId = projectId,
pipelineId = pipelineId,
operationType = OperationLogType.CREATE
)
return pipelineId
}
2.3 PipelineBuildFacadeService - 构建管理
文件: builds/PipelineBuildFacadeService.kt (137KB) - 最大的 Service 文件
职责: 构建启动、停止、重试、查询等业务编排
@Service
class PipelineBuildFacadeService(
private val pipelineEventDispatcher: PipelineEventDispatcher,
private val pipelineInterceptorChain: PipelineInterceptorChain,
private val pipelineBuildService: PipelineBuildService,
private val pipelineRepositoryService: PipelineRepositoryService,
private val pipelineRuntimeService: PipelineRuntimeService,
private val buildVariableService: BuildVariableService,
private val pipelineTaskService: PipelineTaskService,
private val pipelineContainerService: PipelineContainerService,
private val pipelineStageService: PipelineStageService,
private val redisOperation: RedisOperation,
// ... 更多依赖
) {
companion object {
private val logger = LoggerFactory.getLogger(PipelineBuildFacadeService::class.java)
}
}
核心方法:
| 方法 | 功能 | 说明 |
|---|---|---|
buildManualStartup() |
手动启动构建 | 入口方法,处理参数后调用 startPipeline() |
startPipeline() |
启动流水线 | 核心启动逻辑,调用拦截器链 |
buildManualShutdown() |
手动停止构建 | 发送取消事件 |
buildManualReview() |
人工审核 | 处理审核通过/拒绝 |
retry() |
重试构建 | 支持整体重试或指定任务重试 |
getBuildDetail() |
获取构建详情 | 返回 ModelDetail |
getBuildStatus() |
获取构建状态 | 返回 BuildHistoryWithVars |
getHistoryBuild() |
获取构建历史 | 分页查询 |
getBuildVars() |
获取构建变量 | 返回变量键值对 |
启动构建流程:
fun startPipeline(
userId: String,
readyToBuildPipelineInfo: PipelineInfo,
startType: StartType,
startParams: MutableMap<String, String>,
channelCode: ChannelCode,
isMobile: Boolean,
model: Model,
frequencyLimit: Boolean = true
): BuildId {
val projectId = readyToBuildPipelineInfo.projectId
val pipelineId = readyToBuildPipelineInfo.pipelineId
// 1. 构建拦截器链校验(频率限制、并发控制等)
val interceptResult = pipelineInterceptorChain.filter(
InterceptData(
pipelineInfo = readyToBuildPipelineInfo,
model = model,
startType = startType,
frequencyLimit = frequencyLimit
)
)
if (interceptResult.isNotOk()) {
throw ErrorCodeException(errorCode = interceptResult.status.toString())
}
// 2. 调用 Runtime Service 创建构建记录
val buildId = pipelineRuntimeService.startBuild(
pipelineInfo = readyToBuildPipelineInfo,
fullModel = model,
startParams = startParams,
startType = startType,
buildNo = buildNo
)
// 3. 发送构建启动事件到 MQ
pipelineEventDispatcher.dispatch(
PipelineBuildStartEvent(
source = "startPipeline",
projectId = projectId,
pipelineId = pipelineId,
userId = userId,
buildId = buildId.id,
taskId = "",
status = BuildStatus.QUEUE,
actionType = ActionType.START
)
)
return buildId
}
2.4 PipelineListFacadeService - 列表查询
文件: PipelineListFacadeService.kt (97KB)
职责: 流水线列表查询、搜索、过滤
核心方法:
| 方法 | 功能 |
|---|---|
listPipelineInfo() |
分页查询流水线列表 |
searchByPipelineName() |
按名称搜索 |
listPipelinesByViewId() |
按视图查询 |
listDeletedPipeline() |
查询已删除流水线 |
getPipelineStatus() |
批量获取状态 |
2.5 TemplateFacadeService - 模板管理
文件: template/TemplateFacadeService.kt (120KB)
职责: 流水线模板的创建、实例化、更新
核心方法:
| 方法 | 功能 |
|---|---|
createTemplate() |
创建模板 |
updateTemplate() |
更新模板 |
deleteTemplate() |
删除模板 |
createTemplateInstances() |
批量实例化模板 |
updateTemplateInstances() |
批量更新实例 |
三、Engine Service 层详解 (biz-base)
3.1 目录结构
biz-base/src/main/kotlin/com/tencent/devops/process/engine/service/
├── PipelineRepositoryService.kt # 流水线存储服务 (110KB) - 核心
├── PipelineRuntimeService.kt # 运行时服务 (101KB) - 核心
├── PipelineContainerService.kt # 容器管理 (50KB)
├── PipelineStageService.kt # 阶段管理 (40KB)
├── PipelineTaskService.kt # 任务管理 (32KB)
├── PipelineBuildDetailService.kt # 构建详情 (16KB)
├── PipelineSettingService.kt # 配置服务 (6KB)
├── detail/
│ ├── BaseBuildDetailService.kt # 详情基类 (12KB)
│ ├── StageBuildDetailService.kt # 阶段详情 (11KB)
│ ├── ContainerBuildDetailService.kt # 容器详情 (12KB)
│ └── TaskBuildDetailService.kt # 任务详情 (25KB)
├── record/
│ ├── PipelineBuildRecordService.kt # 构建记录 (36KB)
│ ├── StageBuildRecordService.kt # 阶段记录 (21KB)
│ ├── ContainerBuildRecordService.kt # 容器记录 (22KB)
│ └── TaskBuildRecordService.kt # 任务记录 (30KB)
└── vmbuild/
└── EngineVMBuildService.kt # 构建机服务 (60KB)
3.2 PipelineRepositoryService - 流水线存储
文件: PipelineRepositoryService.kt (110KB) - 最核心的 Engine Service
职责: 流水线模型的存储、版本管理、部署
@Service
class PipelineRepositoryService constructor(
private val pipelineEventDispatcher: PipelineEventDispatcher,
private val modelContainerIdGenerator: ModelContainerIdGenerator,
private val pipelineIdGenerator: PipelineIdGenerator,
private val modelTaskIdGenerator: ModelTaskIdGenerator,
private val dslContext: DSLContext,
private val pipelineInfoDao: PipelineInfoDao,
private val pipelineResourceDao: PipelineResourceDao,
private val pipelineModelTaskDao: PipelineModelTaskDao,
private val pipelineSettingDao: PipelineSettingDao,
private val pipelineBuildSummaryDao: PipelineBuildSummaryDao,
// ... 更多 DAO
) {
companion object {
private const val MAX_LEN_FOR_NAME = 255
private val logger = LoggerFactory.getLogger(PipelineRepositoryService::class.java)
}
}
核心方法:
| 方法 | 功能 | 操作的表 |
|---|---|---|
deployPipeline() |
部署流水线(创建/更新) | T_PIPELINE_INFO, T_PIPELINE_RESOURCE, T_PIPELINE_MODEL_TASK |
getPipelineInfo() |
获取流水线信息 | T_PIPELINE_INFO |
getModel() |
获取流水线模型 | T_PIPELINE_RESOURCE |
deletePipeline() |
删除流水线 | 软删除,更新 DELETE 标记 |
restorePipeline() |
恢复流水线 | 恢复 DELETE 标记 |
listPipelineIdByProject() |
按项目列出流水线 | T_PIPELINE_INFO |
saveSetting() |
保存流水线配置 | T_PIPELINE_SETTING |
deployPipeline 核心逻辑:
fun deployPipeline(
model: Model,
projectId: String,
signPipelineId: String?,
userId: String,
channelCode: ChannelCode,
create: Boolean,
updateLastModifyUser: Boolean = true
): DeployPipelineResult {
// 1. 生成/获取 pipelineId
val pipelineId = signPipelineId ?: pipelineIdGenerator.getNextId()
// 2. 为模型中的元素生成 ID
model.stages.forEachIndexed { stageIndex, stage ->
stage.id = stage.id ?: modelContainerIdGenerator.getNextId()
stage.containers.forEachIndexed { containerIndex, container ->
container.id = container.id ?: modelContainerIdGenerator.getNextId()
container.elements.forEach { element ->
element.id = element.id ?: modelTaskIdGenerator.getNextId()
}
}
}
// 3. 数据库事务操作
val version = dslContext.transactionResult { configuration ->
val context = DSL.using(configuration)
if (create) {
// 创建流水线信息
pipelineInfoDao.create(
dslContext = context,
pipelineId = pipelineId,
projectId = projectId,
version = 1,
pipelineName = model.name,
userId = userId,
channelCode = channelCode
)
// 创建构建摘要记录
pipelineBuildSummaryDao.create(context, projectId, pipelineId)
} else {
// 更新版本号
pipelineInfoDao.update(
dslContext = context,
projectId = projectId,
pipelineId = pipelineId,
userId = userId,
version = newVersion
)
}
// 保存流水线模型(JSON)
pipelineResourceDao.create(
dslContext = context,
projectId = projectId,
pipelineId = pipelineId,
version = version,
model = JsonUtil.toJson(model)
)
// 保存模型任务索引(用于搜索)
saveModelTasks(context, model, projectId, pipelineId, version)
version
}
// 4. 发送流水线创建/更新事件
if (create) {
pipelineEventDispatcher.dispatch(
PipelineCreateEvent(projectId, pipelineId, userId)
)
} else {
pipelineEventDispatcher.dispatch(
PipelineUpdateEvent(projectId, pipelineId, userId)
)
}
return DeployPipelineResult(pipelineId, version)
}
3.3 PipelineRuntimeService - 运行时服务
文件: PipelineRuntimeService.kt (101KB) - 构建运行时核心
职责: 构建的创建、状态管理、查询
@Service
class PipelineRuntimeService @Autowired constructor(
private val pipelineEventDispatcher: PipelineEventDispatcher,
private val dslContext: DSLContext,
private val pipelineInfoDao: PipelineInfoDao,
private val pipelineBuildDao: PipelineBuildDao,
private val pipelineBuildSummaryDao: PipelineBuildSummaryDao,
private val pipelineStageService: PipelineStageService,
private val pipelineContainerService: PipelineContainerService,
private val pipelineTaskService: PipelineTaskService,
private val buildVariableService: BuildVariableService,
// ... 更多依赖
) {
companion object {
private val logger = LoggerFactory.getLogger(PipelineRuntimeService::class.java)
}
}
核心方法:
| 方法 | 功能 | 说明 |
|---|---|---|
startBuild() |
启动构建 | 创建构建记录、Stage/Container/Task 记录 |
getBuildInfo() |
获取构建信息 | 返回 BuildInfo |
updateBuildStatus() |
更新构建状态 | 状态机转换 |
getBuildHistoryByIds() |
批量获取历史 | 返回 List<BuildHistory> |
cancelBuild() |
取消构建 | 发送取消事件 |
finishBuild() |
结束构建 | 更新最终状态、发送通知 |
startBuild 核心逻辑:
fun startBuild(
pipelineInfo: PipelineInfo,
fullModel: Model,
startParams: Map<String, String>,
startType: StartType,
buildNo: Int?
): BuildId {
val projectId = pipelineInfo.projectId
val pipelineId = pipelineInfo.pipelineId
// 1. 生成构建 ID
val buildId = "b-${UUIDUtil.generate()}"
// 2. 计算构建号
val buildNum = pipelineBuildSummaryDao.updateBuildNum(
dslContext, projectId, pipelineId
)
// 3. 创建构建记录
pipelineBuildDao.create(
dslContext = dslContext,
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
version = pipelineInfo.version,
buildNum = buildNum,
trigger = startType.name,
status = BuildStatus.QUEUE,
startUser = startParams[PIPELINE_START_USER_ID] ?: "",
triggerUser = startParams[PIPELINE_START_USER_NAME] ?: ""
)
// 4. 创建 Stage 记录
fullModel.stages.forEachIndexed { index, stage ->
pipelineStageService.createStage(
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
stage = stage,
seq = index
)
// 5. 创建 Container 记录
stage.containers.forEachIndexed { containerIndex, container ->
pipelineContainerService.createContainer(
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
stageId = stage.id!!,
container = container,
seq = containerIndex
)
// 6. 创建 Task 记录
container.elements.forEachIndexed { taskIndex, element ->
pipelineTaskService.createTask(
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
stageId = stage.id!!,
containerId = container.id!!,
element = element,
seq = taskIndex
)
}
}
}
// 7. 保存构建变量
buildVariableService.batchSetVariable(
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
variables = startParams
)
return BuildId(buildId)
}
3.4 PipelineContainerService - 容器管理
文件: PipelineContainerService.kt (50KB)
职责: Job(容器)的创建、状态管理
核心方法:
| 方法 | 功能 |
|---|---|
createContainer() |
创建容器记录 |
getContainer() |
获取容器信息 |
updateContainerStatus() |
更新容器状态 |
listContainers() |
列出阶段下的容器 |
listContainerBuildTasks() |
列出容器下的任务 |
3.5 PipelineStageService - 阶段管理
文件: PipelineStageService.kt (40KB)
职责: Stage 的创建、状态管理
核心方法:
| 方法 | 功能 |
|---|---|
createStage() |
创建阶段记录 |
getStage() |
获取阶段信息 |
updateStageStatus() |
更新阶段状态 |
listStages() |
列出构建的所有阶段 |
getPrevStage() |
获取前一个阶段 |
getNextStage() |
获取下一个阶段 |
3.6 PipelineTaskService - 任务管理
文件: PipelineTaskService.kt (32KB)
职责: Task(插件)的创建、状态管理
核心方法:
| 方法 | 功能 |
|---|---|
createTask() |
创建任务记录 |
getBuildTask() |
获取任务信息 |
updateTaskStatus() |
更新任务状态 |
listContainerBuildTasks() |
列出容器下的任务 |
getNextPendingTask() |
获取下一个待执行任务 |
3.7 EngineVMBuildService - 构建机服务
文件: vmbuild/EngineVMBuildService.kt (60KB)
职责: 与构建机(Agent)的交互
核心方法:
| 方法 | 功能 |
|---|---|
setStartUpVMStatus() |
设置构建机启动状态 |
claimBuildTask() |
领取待执行任务 |
completeClaimBuildTask() |
完成任务回调 |
heartbeat() |
心跳上报 |
endBuild() |
结束构建 |
四、Service 层调用关系
┌─────────────────────────────────────────────────────────────────────────┐
│ ResourceImpl (API 实现) │
│ UserPipelineResourceImpl | UserBuildResourceImpl | ... │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Facade Service │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ PipelineInfoFacadeService │ │
│ │ ├─► pipelinePermissionService.checkPermission() │ │
│ │ ├─► modelCheckPlugin.checkModelIntegrity() │ │
│ │ └─► pipelineRepositoryService.deployPipeline() │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ PipelineBuildFacadeService │ │
│ │ ├─► pipelineInterceptorChain.filter() │ │
│ │ ├─► pipelineRuntimeService.startBuild() │ │
│ │ └─► pipelineEventDispatcher.dispatch(StartEvent) │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Engine Service │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ PipelineRepositoryService │ │
│ │ ├─► pipelineInfoDao.create/update() │ │
│ │ ├─► pipelineResourceDao.create() │ │
│ │ └─► pipelineEventDispatcher.dispatch(CreateEvent) │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ PipelineRuntimeService │ │
│ │ ├─► pipelineBuildDao.create() │ │
│ │ ├─► pipelineStageService.createStage() │ │
│ │ ├─► pipelineContainerService.createContainer() │ │
│ │ └─► pipelineTaskService.createTask() │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ DAO 层 │
│ PipelineInfoDao | PipelineBuildDao | PipelineResourceDao | ... │
└─────────────────────────────────────────────────────────────────────────┘
五、开发规范
5.1 Facade Service 规范
@Service
class XxxFacadeService @Autowired constructor(
private val permissionService: PipelinePermissionService, // 权限服务
private val engineService: XxxEngineService, // Engine Service
private val client: Client // 微服务客户端
) {
// 1. 权限校验
// 2. 参数校验/转换
// 3. 调用 Engine Service
// 4. 记录操作日志
}
5.2 Engine Service 规范
@Service
class XxxEngineService @Autowired constructor(
private val dslContext: DSLContext,
private val xxxDao: XxxDao,
private val pipelineEventDispatcher: PipelineEventDispatcher
) {
// 1. 数据库事务操作
// 2. 事件分发
// 3. 不做权限校验(由 Facade 层处理)
}
5.3 新增 Service 检查清单
- Facade Service 放在
biz-process/service/ - Engine Service 放在
biz-base/engine/service/ - Facade Service 负责权限校验
- Engine Service 负责数据访问和事件分发
- 使用构造器注入依赖
- 添加
@Service注解
版本: 1.1.0 | 更新日期: 2025-12-10 | 补充: 添加定时触发系统和拦截器链机制