前言

最近在做一个物联网设备配置化开发管理的平台,遇到这样一个场景:要能根据数据库中的配置文件,组建出不同的物联网设备任务执行队列,通过mqtt协议发布任务对应topic控制每个任务的异步执行,错误重试,还需要有和框架共用的日志记录等操作。因为需求比较特殊,找了好多现成的轮子感觉都不太顺手,因此打算DIY一个简单的基于事件驱动的分布式任务队列。

考虑到的问题

  • mqtt比http更极端的是,它不太方便让开发者知道一个topic是否真的被收到。(即使可以通过调整Qos确保订阅者一定能收到消息,但是无法从一次请求中得出任务执行的状态和结果。mqtt over websocket? 长连接的Qos更难保证,而且物联网设备使用场景复杂,很难说保证稳定的链接)
  • 任务队列和整个应用是需要异步隔离的,也就是说,不能因为在执行一个任务队列,而阻塞掉Node的一个worker进程。如果不把任务队列单独抽离出来,就会导致调用执行任务的进程一直阻塞。虽然把任务队列全丢给agent进程可以解决worker阻塞的问题,但是由于agent只有单个进程,无法实现较高的并发,同时也会导致agent/master不稳定。
  • 任务调用的过程都是基于TCP异步请求的,而无状态的连接让我们无法及时得知远端设备运行状况,不方便状态跟踪,日志记录和调试。
  • 任务队列可能会同时存在多个,并且有基于配置的动态生成能力,多个任务中,可能用到了相同的设备。因此需要锁的机制来确保设备状态的隔离性。
  • 任务可能出现失败,需要设计重试机制。(这个队列并不是一个事务,回滚暂时不考虑,因为物联网设备在执行任务的时候大多都会带来物理上的改变。如果想记录物理状态并原子化,开发成本会急剧上升)

架构设计

先放个整体图
mqtt task queue

  1. Redis
    • 使用两个Redis数据库,分别记录任务队列和设备状态:
    clients: {
      job: { // 任务队列,key: string value: Array<Job>
        port: 6379,
        host: '192.168.137.30',
        password: '123456',
        db: 1,
      },
      device: { // 设备状态,key: string value: DeviceStatus
        port: 6379,
        host: '192.168.137.30',
        password: '123456',
        db: 2,
      },
    },
    
    • 任务队列:用任务名称作为key,value为任务描述的数组。一个任务描述,在这里就是一个对象,包含了mqtt的topic和message。通过发布mqtt包的方式,调用远端设备执行任务,任务参数通过message传入。
    • 设备状态:设备名作为key,状态为一个对象。目前只考虑记录online、locked两个状态,均为Boolean,分别代表设备是否在线,是否上锁。

  1. Node
    • 设备状态:因为在应用启动的时候我就把设备状态加载进redis了,所以说理论上只用提供查询和修改两个方法即可。不过存储的东西除了online和locked之外,还额外存了一个pendingQueue的数组,具体作用后面会说到。
      export default class DeviceService extends Service {
        async index(connectionName: string) {
          return ((await this.ctx.service.tool.redis.get('device', connectionName)) as IDeviceStatus) || {}
        }
    
        async update(connectionName: string, online?: boolean, locked?: boolean, pendingQueue?: string[]) {
          const statusWillUpdate = await this.index(connectionName)
          if (typeof online === 'boolean') statusWillUpdate.online = online
          if (typeof locked === 'boolean') statusWillUpdate.locked = locked
          if (pendingQueue) statusWillUpdate.pendingQueue = pendingQueue
          return await this.ctx.service.tool.redis.set('device', connectionName, statusWillUpdate)
        }
      }
    
    • 任务编排、触发器:任务编排与一般的CRUD业务没有区别,单纯的把相关信息存入数据库就ok。触发器则是在某些条件下(如定时任务,收到某个http/mqtt请求),将一组或多组任务放入任务队列中,同时尝试执行队首任务。
    • 任务队列:封装一些对任务队列维护的方法(CRUD)。创建任务队列时,以UUID为key,存入一个序列化后的队列对象。包括:当前任务的指针,当前队列状态(pending,running,completed)
      • 设计当前任务指针的目的是记录该队列执行的位置,因为多个队列在执行的时候可能会出现设备占用冲突的情况。
      • 如果某个队列中,将要被执行的任务发现设备被锁住,那么指针位置就保持不变,但是队列的状态变为pending,直到设备被释放,类似于进程中断,挂起等待资源释放。(这里可能会引申出一个新的问题,如果当多个队列同时等待一个设备,就需要分配执行顺序,后面dispatcher会介绍)
    export default class QueueService extends Service {
      async index(uuid: string) {
        const queue = JSON.parse((await this.ctx.service.tool.redis.get('device', uuid)) || '{}') as ITaskQueue
        return queue
      }
    
      async create(jobs: IJobWork[]) {
        const queueItem = {} as ITaskQueue
        queueItem.status = 'pending'
        queueItem.jobPointer = 0
        queueItem.jobs = jobs
    
        return await this.ctx.service.tool.redis.set('job', uuidv4(), JSON.stringify(queueItem))
      }
    
      async update(uuid: string, status?: 'pending' | 'running' | 'completed', jobPointer?: number) {
        const itemWillUpdate = await this.index(uuid)
        if (status) itemWillUpdate.status = status
        if (jobPointer) itemWillUpdate.jobPointer = jobPointer
    
        return await this.ctx.service.tool.redis.set('job', uuid, JSON.stringify(itemWillUpdate))
      }
    
      async destroy(uuid: string) {
        return await this.ctx.service.tool.redis.delete('job', uuid)
      }
    }
    
    
    • 执行模块:分两个子模块,一个监听器,一组对外的api。
      • 监听器:当收到一条任务完成消息的时候,去检查完成任务的设备,如果该设备的pendingQueue不为空,那么除了执行当前任务队列的下一个任务之外,还要执行在pendingQueue中队首所对应的任务队列。否则就直接执行当前队列的下一个
      export default class MqttController extends Controller {
        public async index() {
          const ctx = this.ctx as IPluginContext
          const { topic, message } = ctx.req
          this.service.tool.redis.setArr('mqtt', topic, message, 86400)
      
          const apiDoneReg = /.*(?<=done)$/
          let currentQueueIsDone = false
          if (apiDoneReg.test(topic)) {
            // 收到任务执行完成的信息
            const { uuid } = message
            const queue = await ctx.service.taskQueue.queue.index(uuid)
            const { jobs, jobPointer } = queue
            const device = await ctx.service.taskQueue.device.index(jobs[jobPointer].connectionName)
      
            // 先解除锁,pop监视器栈
            Promise.all([
              ctx.service.taskQueue.device.update(jobs[jobPointer].connectionName, undefined, false),
              ctx.service.taskQueue.monitor.pop(uuid),
            ])
      
            if (!ctx.req.message.success) {
              // 执行任务未成功
              ctx
                .getLogger('taskLogger')
                .error(`[Job Execution Error ${message.uuid} ] device: ${message.device} error: ${message.error}`)
              // todo: send error message
            }
            if (ctx.service.mqtt.job.isFinished(queue)) {
              // 队列已完成
              currentQueueIsDone = true
              await ctx.service.taskQueue.queue.update(uuid, 'completed', undefined)
            }
      
            // 当前设备状态
            if (device.pendingQueue.length === 0) {
              // queue is empty
              // dispatch next directly
              if (!currentQueueIsDone) {
                await ctx.service.taskQueue.queue.update(uuid, undefined, jobPointer + 1)
                await ctx.service.taskQueue.dispatcher.dispatch(uuid)
              }
            } else {
              // queue is not empty
              // dispatch pendingQueue first
              await ctx.service.taskQueue.dispatcher.dispatchPending(jobs[jobPointer].connectionName)
              if (!currentQueueIsDone) {
                await ctx.service.taskQueue.queue.update(uuid, undefined, jobPointer + 1)
                await ctx.service.taskQueue.dispatcher.dispatch(uuid)
              }
            }
          }
        }
      }
      
      
      • 普通的执行模块:用于发布执行队列中任务的指令,传入一个队列的uuid,尝试执行该队列中指针指向的任务。在执行之前要判断设备状态,如果设备被锁住则不会立即执行,而是将这个队列push到设备状态的pendingQueue数组中。
      /**
       * 尝试执行任务队列中指针指向的任务
       * @param uuid 任务队列id
       */
      async dispatch(uuid: string) {
        const queue = await this.ctx.service.taskQueue.queue.index(uuid)
        if (JSON.stringify(queue) === '{}') {
          // 找不到执行队列
          this.ctx.app.getLogger('taskLogger').error(`[Job Execution Error ${uuid} ] Job Queue Not Found: ${queue}`)
          return
        }
      
        const { jobPointer, status } = queue
        if (status === 'completed') {
          // 当前队列已完成
          this.ctx.app.getLogger('taskLogger').error(`[Job Execution Error ${uuid} ] Job Queue Is Not Pending: ${status}`)
          return
        }
      
        const jobWillBeDispatched = queue.jobs[jobPointer]
        const deviceStatus = await this.ctx.service.taskQueue.device.index(jobWillBeDispatched.connectionName)
        if (!deviceStatus.online) {
          // 执行当前job的设备不在线
          this.ctx.app
            .getLogger('taskLogger')
            .error(`[Job Execution Error ${uuid} ] Device Is Offline: ${jobWillBeDispatched.connectionName}`)
          return
        }
      
        if (deviceStatus.locked) {
          // 执行当前job的设备被锁住
          deviceStatus.pendingQueue.push(uuid)
          Promise.all([
            this.ctx.service.taskQueue.queue.update(uuid, 'pending'),
            this.ctx.service.taskQueue.device.update(
              jobWillBeDispatched.connectionName,
              undefined,
              undefined,
              deviceStatus.pendingQueue
            ),
          ]).then(() => {
            this.ctx.app
              .getLogger('taskLogger')
              .info(
                `[Job Execution Info ${uuid} ] Job ${
                  jobWillBeDispatched.name || 'name not found'
                } Is Pending Because Device Is Locked`
              )
          })
          return
        }
      
        // 没任何问题,直接执行当前job
        const job = queue.jobs[jobPointer]
        await this.ctx.service.taskQueue.queue.update(uuid, 'running')
        await this.ctx.service.taskQueue.device.update(jobWillBeDispatched.connectionName, undefined, true)
        this.ctx.app.mqttClient.publish(job.apiTopic, JSON.stringify({ ...job.args, uuid }), async () => {
          this.ctx.app
            .getLogger('taskLogger')
            .info(`[Job Execution Info ${uuid} ] Job ${jobWillBeDispatched.name || 'name not found'} Is Running`)
          await this.ctx.service.taskQueue.monitor.push({
            uuid,
            connectionName: job.connectionName,
            apiTopic: job.apiTopic,
            args: job.args,
          })
        })
      }
      
      • 专门执行pendingQueue中任务的模块:其实是直接调用了上一个模块,不过附加了额外操作。查找执行任务,更新pendingQueue等
      async dispatchPending(connectionName: string) {
        // 通过设备名称找到队列并执行,同时更新pendingQueue
        const device = await this.ctx.service.taskQueue.device.index(connectionName)
        const { pendingQueue, locked, online } = device
        const jobWillBeDispatched = pendingQueue[0]
        const newQueue = pendingQueue.slice(1, pendingQueue.length)
        await this.dispatch(jobWillBeDispatched)
        await this.ctx.service.taskQueue.device.update(connectionName, undefined, false, newQueue)
      }
      
      • 对外的api:通过http请求方式,组装任务并保存任务队列,调起上文中的普通执行模块。
      export default class DispatcherController extends Controller {
        async index() {
          const { ctx } = this
          const { jobId } = ctx.request.body
          const body = {} as IBody
          const jobQueue = await ctx.service.mqtt.job.assembleJob(jobId) // 组装任务
          const queueId = await ctx.service.taskQueue.queue.create(jobQueue) // 生成队列
          await ctx.service.taskQueue.dispatcher.dispatch(queueId) // 执行队列
      
          body.code = 200
          body.error = 0
          body.msg = 'job started successfully'
          body.data = {}
          ctx.body = body
        }
      }
      

后续(测试完再写一篇)

  • 监视器:定时检查任务执行情况,防止失败的任务阻塞队列。
  • 重试机制具体实现
  • 队列执行日志持久化

What is broken can be reforged.