前言
最近在做一个物联网设备配置化开发管理的平台,遇到这样一个场景:要能根据数据库中的配置文件,组建出不同的物联网设备任务执行队列,通过mqtt协议发布任务对应topic控制每个任务的异步执行,错误重试,还需要有和框架共用的日志记录等操作。因为需求比较特殊,找了好多现成的轮子感觉都不太顺手,因此打算DIY一个简单的基于事件驱动的分布式任务队列。
考虑到的问题
- mqtt比http更极端的是,它不太方便让开发者知道一个topic是否真的被收到。(即使可以通过调整Qos确保订阅者一定能收到消息,但是无法从一次请求中得出任务执行的状态和结果。mqtt over websocket? 长连接的Qos更难保证,而且物联网设备使用场景复杂,很难说保证稳定的链接)
- 任务队列和整个应用是需要异步隔离的,也就是说,不能因为在执行一个任务队列,而阻塞掉Node的一个worker进程。如果不把任务队列单独抽离出来,就会导致调用执行任务的进程一直阻塞。虽然把任务队列全丢给agent进程可以解决worker阻塞的问题,但是由于agent只有单个进程,无法实现较高的并发,同时也会导致agent/master不稳定。
- 任务调用的过程都是基于TCP异步请求的,而无状态的连接让我们无法及时得知远端设备运行状况,不方便状态跟踪,日志记录和调试。
- 任务队列可能会同时存在多个,并且有基于配置的动态生成能力,多个任务中,可能用到了相同的设备。因此需要锁的机制来确保设备状态的隔离性。
- 任务可能出现失败,需要设计重试机制。(这个队列并不是一个事务,回滚暂时不考虑,因为物联网设备在执行任务的时候大多都会带来物理上的改变。如果想记录物理状态并原子化,开发成本会急剧上升)
架构设计
先放个整体图
- Redis
- 使用两个Redis数据库,分别记录任务队列和设备状态:
clients: { job: { // 任务队列,key: string value: Array<Job> port: 6379, host: '192.168.137.30', password: '123456', db: 1, }, device: { // 设备状态,key: string value: DeviceStatus port: 6379, host: '192.168.137.30', password: '123456', db: 2, }, },
- 任务队列:用任务名称作为key,value为任务描述的数组。一个任务描述,在这里就是一个对象,包含了mqtt的topic和message。通过发布mqtt包的方式,调用远端设备执行任务,任务参数通过message传入。
- 设备状态:设备名作为key,状态为一个对象。目前只考虑记录online、locked两个状态,均为Boolean,分别代表设备是否在线,是否上锁。
- Node
- 设备状态:因为在应用启动的时候我就把设备状态加载进redis了,所以说理论上只用提供查询和修改两个方法即可。不过存储的东西除了online和locked之外,还额外存了一个pendingQueue的数组,具体作用后面会说到。
export default class DeviceService extends Service { async index(connectionName: string) { return ((await this.ctx.service.tool.redis.get('device', connectionName)) as IDeviceStatus) || {} } async update(connectionName: string, online?: boolean, locked?: boolean, pendingQueue?: string[]) { const statusWillUpdate = await this.index(connectionName) if (typeof online === 'boolean') statusWillUpdate.online = online if (typeof locked === 'boolean') statusWillUpdate.locked = locked if (pendingQueue) statusWillUpdate.pendingQueue = pendingQueue return await this.ctx.service.tool.redis.set('device', connectionName, statusWillUpdate) } }
- 任务编排、触发器:任务编排与一般的CRUD业务没有区别,单纯的把相关信息存入数据库就ok。触发器则是在某些条件下(如定时任务,收到某个http/mqtt请求),将一组或多组任务放入任务队列中,同时尝试执行队首任务。
- 任务队列:封装一些对任务队列维护的方法(CRUD)。创建任务队列时,以UUID为key,存入一个序列化后的队列对象。包括:当前任务的指针,当前队列状态(pending,running,completed)
- 设计当前任务指针的目的是记录该队列执行的位置,因为多个队列在执行的时候可能会出现设备占用冲突的情况。
- 如果某个队列中,将要被执行的任务发现设备被锁住,那么指针位置就保持不变,但是队列的状态变为pending,直到设备被释放,类似于进程中断,挂起等待资源释放。(这里可能会引申出一个新的问题,如果当多个队列同时等待一个设备,就需要分配执行顺序,后面dispatcher会介绍)
export default class QueueService extends Service { async index(uuid: string) { const queue = JSON.parse((await this.ctx.service.tool.redis.get('device', uuid)) || '{}') as ITaskQueue return queue } async create(jobs: IJobWork[]) { const queueItem = {} as ITaskQueue queueItem.status = 'pending' queueItem.jobPointer = 0 queueItem.jobs = jobs return await this.ctx.service.tool.redis.set('job', uuidv4(), JSON.stringify(queueItem)) } async update(uuid: string, status?: 'pending' | 'running' | 'completed', jobPointer?: number) { const itemWillUpdate = await this.index(uuid) if (status) itemWillUpdate.status = status if (jobPointer) itemWillUpdate.jobPointer = jobPointer return await this.ctx.service.tool.redis.set('job', uuid, JSON.stringify(itemWillUpdate)) } async destroy(uuid: string) { return await this.ctx.service.tool.redis.delete('job', uuid) } }
- 执行模块:分两个子模块,一个监听器,一组对外的api。
- 监听器:当收到一条任务完成消息的时候,去检查完成任务的设备,如果该设备的pendingQueue不为空,那么除了执行当前任务队列的下一个任务之外,还要执行在pendingQueue中队首所对应的任务队列。否则就直接执行当前队列的下一个
export default class MqttController extends Controller { public async index() { const ctx = this.ctx as IPluginContext const { topic, message } = ctx.req this.service.tool.redis.setArr('mqtt', topic, message, 86400) const apiDoneReg = /.*(?<=done)$/ let currentQueueIsDone = false if (apiDoneReg.test(topic)) { // 收到任务执行完成的信息 const { uuid } = message const queue = await ctx.service.taskQueue.queue.index(uuid) const { jobs, jobPointer } = queue const device = await ctx.service.taskQueue.device.index(jobs[jobPointer].connectionName) // 先解除锁,pop监视器栈 Promise.all([ ctx.service.taskQueue.device.update(jobs[jobPointer].connectionName, undefined, false), ctx.service.taskQueue.monitor.pop(uuid), ]) if (!ctx.req.message.success) { // 执行任务未成功 ctx .getLogger('taskLogger') .error(`[Job Execution Error ${message.uuid} ] device: ${message.device} error: ${message.error}`) // todo: send error message } if (ctx.service.mqtt.job.isFinished(queue)) { // 队列已完成 currentQueueIsDone = true await ctx.service.taskQueue.queue.update(uuid, 'completed', undefined) } // 当前设备状态 if (device.pendingQueue.length === 0) { // queue is empty // dispatch next directly if (!currentQueueIsDone) { await ctx.service.taskQueue.queue.update(uuid, undefined, jobPointer + 1) await ctx.service.taskQueue.dispatcher.dispatch(uuid) } } else { // queue is not empty // dispatch pendingQueue first await ctx.service.taskQueue.dispatcher.dispatchPending(jobs[jobPointer].connectionName) if (!currentQueueIsDone) { await ctx.service.taskQueue.queue.update(uuid, undefined, jobPointer + 1) await ctx.service.taskQueue.dispatcher.dispatch(uuid) } } } } }
- 普通的执行模块:用于发布执行队列中任务的指令,传入一个队列的uuid,尝试执行该队列中指针指向的任务。在执行之前要判断设备状态,如果设备被锁住则不会立即执行,而是将这个队列push到设备状态的pendingQueue数组中。
/** * 尝试执行任务队列中指针指向的任务 * @param uuid 任务队列id */ async dispatch(uuid: string) { const queue = await this.ctx.service.taskQueue.queue.index(uuid) if (JSON.stringify(queue) === '{}') { // 找不到执行队列 this.ctx.app.getLogger('taskLogger').error(`[Job Execution Error ${uuid} ] Job Queue Not Found: ${queue}`) return } const { jobPointer, status } = queue if (status === 'completed') { // 当前队列已完成 this.ctx.app.getLogger('taskLogger').error(`[Job Execution Error ${uuid} ] Job Queue Is Not Pending: ${status}`) return } const jobWillBeDispatched = queue.jobs[jobPointer] const deviceStatus = await this.ctx.service.taskQueue.device.index(jobWillBeDispatched.connectionName) if (!deviceStatus.online) { // 执行当前job的设备不在线 this.ctx.app .getLogger('taskLogger') .error(`[Job Execution Error ${uuid} ] Device Is Offline: ${jobWillBeDispatched.connectionName}`) return } if (deviceStatus.locked) { // 执行当前job的设备被锁住 deviceStatus.pendingQueue.push(uuid) Promise.all([ this.ctx.service.taskQueue.queue.update(uuid, 'pending'), this.ctx.service.taskQueue.device.update( jobWillBeDispatched.connectionName, undefined, undefined, deviceStatus.pendingQueue ), ]).then(() => { this.ctx.app .getLogger('taskLogger') .info( `[Job Execution Info ${uuid} ] Job ${ jobWillBeDispatched.name || 'name not found' } Is Pending Because Device Is Locked` ) }) return } // 没任何问题,直接执行当前job const job = queue.jobs[jobPointer] await this.ctx.service.taskQueue.queue.update(uuid, 'running') await this.ctx.service.taskQueue.device.update(jobWillBeDispatched.connectionName, undefined, true) this.ctx.app.mqttClient.publish(job.apiTopic, JSON.stringify({ ...job.args, uuid }), async () => { this.ctx.app .getLogger('taskLogger') .info(`[Job Execution Info ${uuid} ] Job ${jobWillBeDispatched.name || 'name not found'} Is Running`) await this.ctx.service.taskQueue.monitor.push({ uuid, connectionName: job.connectionName, apiTopic: job.apiTopic, args: job.args, }) }) }
- 专门执行pendingQueue中任务的模块:其实是直接调用了上一个模块,不过附加了额外操作。查找执行任务,更新pendingQueue等
async dispatchPending(connectionName: string) { // 通过设备名称找到队列并执行,同时更新pendingQueue const device = await this.ctx.service.taskQueue.device.index(connectionName) const { pendingQueue, locked, online } = device const jobWillBeDispatched = pendingQueue[0] const newQueue = pendingQueue.slice(1, pendingQueue.length) await this.dispatch(jobWillBeDispatched) await this.ctx.service.taskQueue.device.update(connectionName, undefined, false, newQueue) }
- 对外的api:通过http请求方式,组装任务并保存任务队列,调起上文中的普通执行模块。
export default class DispatcherController extends Controller { async index() { const { ctx } = this const { jobId } = ctx.request.body const body = {} as IBody const jobQueue = await ctx.service.mqtt.job.assembleJob(jobId) // 组装任务 const queueId = await ctx.service.taskQueue.queue.create(jobQueue) // 生成队列 await ctx.service.taskQueue.dispatcher.dispatch(queueId) // 执行队列 body.code = 200 body.error = 0 body.msg = 'job started successfully' body.data = {} ctx.body = body } }
后续(测试完再写一篇)
- 监视器:定时检查任务执行情况,防止失败的任务阻塞队列。
- 重试机制具体实现
- 队列执行日志持久化
Comments | 0 条评论