Published on

skynet定时器队列

skynet每次创建定时器都会产生一个协程, 在服务有打量高频定时器的情况下会产生很多消息, 实际上可以每个服务都只使用一个定时器队列, 同时只存在一个, 因为更久的定时器暂时是否运行都无所谓

local skynet = require "skynet"
local minheap = require "minheap"

local timer_queue = {}

--- 创建新的定时器队列
--- handler(event) 在事件到期时调用,由调用方提供业务逻辑
function timer_queue.new(handler)
	if not handler then
		error("handler is required")
	end
	local self = {
		_heap = minheap.new(),
		_store = {},
		_counter = 0,
		_handler = handler,
		_co = nil,
	}
	setmetatable(self, { __index = timer_queue })
	return self
end

--- 推送事件,at_time 为 skynet.now() 时间戳,返回 token 用于取消
--- 自动唤醒事件循环,调用方无需额外调用 wakeup
function timer_queue:push(event, at_time)
	self._counter = self._counter + 1
	local seq = self._counter
	self._store[seq] = event
	self._heap:push(seq, at_time)
	self:_wakeup()
	return seq
end

--- 取消单个事件(O(log n))
function timer_queue:cancel(seq)
	self._heap:remove(seq)
	self._store[seq] = nil
end

--- 取消多个事件
function timer_queue:cancel_all(seqs)
	for _, seq in ipairs(seqs) do
		self:cancel(seq)
	end
end

--- 启动事件循环,需通过 skynet.fork 调用,只能启动一次
function timer_queue:start()
	if self._co then
		error("timer_queue:start() already called")
	end
	self._co = coroutine.running()
	self:_loop()
end

----------------------------------------------------------------------
-- 内部方法
----------------------------------------------------------------------

--- 事件循环
function timer_queue:_loop()
	while true do
		self:_dispatch()
		self:_wait_next()
	end
end

--- 分发所有到期事件(xpcall 保护,单个事件异常不影响后续事件)
function timer_queue:_dispatch()
	local now = skynet.now()
	while not self._heap:empty() do
		local top = self._heap:peek()
		if top.priority > now then
			break
		end
		self._heap:pop()
		local ev = self._store[top.value]
		self._store[top.value] = nil
		if ev then
			local ok, err = xpcall(self._handler, debug.traceback, ev)
			if not ok then
				skynet.error("[timer_queue] handler error:", err)
			end
		end
	end
end

--- 等待下一个事件到期(或永久等待直到 push 唤醒)
function timer_queue:_wait_next()
	if self._heap:empty() then
		skynet.wait()
		return
	end
	local delay = self._heap:peek().priority - skynet.now()
	if delay > 0 then
		skynet.sleep(delay)
	end
end

--- 唤醒事件循环
function timer_queue:_wakeup()
	if self._co then
		skynet.wakeup(self._co)
	end
end

return timer_queue