skynet每次创建定时器都会产生一个协程, 在服务有打量高频定时器的情况下会产生很多消息, 实际上可以每个服务都只使用一个定时器队列, 同时只存在一个, 因为更久的定时器暂时是否运行都无所谓
local skynet = require "skynet"
local minheap = require "minheap"
local timer_queue = {}
--- 创建新的定时器队列
--- handler(event) 在事件到期时调用,由调用方提供业务逻辑
function timer_queue.new(handler)
if not handler then
error("handler is required")
end
local self = {
_heap = minheap.new(),
_store = {},
_counter = 0,
_handler = handler,
_co = nil,
}
setmetatable(self, { __index = timer_queue })
return self
end
--- 推送事件,at_time 为 skynet.now() 时间戳,返回 token 用于取消
--- 自动唤醒事件循环,调用方无需额外调用 wakeup
function timer_queue:push(event, at_time)
self._counter = self._counter + 1
local seq = self._counter
self._store[seq] = event
self._heap:push(seq, at_time)
self:_wakeup()
return seq
end
--- 取消单个事件(O(log n))
function timer_queue:cancel(seq)
self._heap:remove(seq)
self._store[seq] = nil
end
--- 取消多个事件
function timer_queue:cancel_all(seqs)
for _, seq in ipairs(seqs) do
self:cancel(seq)
end
end
--- 启动事件循环,需通过 skynet.fork 调用,只能启动一次
function timer_queue:start()
if self._co then
error("timer_queue:start() already called")
end
self._co = coroutine.running()
self:_loop()
end
----------------------------------------------------------------------
-- 内部方法
----------------------------------------------------------------------
--- 事件循环
function timer_queue:_loop()
while true do
self:_dispatch()
self:_wait_next()
end
end
--- 分发所有到期事件(xpcall 保护,单个事件异常不影响后续事件)
function timer_queue:_dispatch()
local now = skynet.now()
while not self._heap:empty() do
local top = self._heap:peek()
if top.priority > now then
break
end
self._heap:pop()
local ev = self._store[top.value]
self._store[top.value] = nil
if ev then
local ok, err = xpcall(self._handler, debug.traceback, ev)
if not ok then
skynet.error("[timer_queue] handler error:", err)
end
end
end
end
--- 等待下一个事件到期(或永久等待直到 push 唤醒)
function timer_queue:_wait_next()
if self._heap:empty() then
skynet.wait()
return
end
local delay = self._heap:peek().priority - skynet.now()
if delay > 0 then
skynet.sleep(delay)
end
end
--- 唤醒事件循环
function timer_queue:_wakeup()
if self._co then
skynet.wakeup(self._co)
end
end
return timer_queue