Published on

skynet消息队列

Authors
  • avatar
    Name
    Ushen
    Twitter

skynet用的是actor模型, 里面的mq就是actor的邮箱

有一个总的队列 global_queue, 每一个服务有一个自己的队列message_queue

结构体为

struct skynet_message {
	uint32_t source;
	int session;
	void * data;
	size_t sz;
};

struct message_queue {
	struct spinlock lock;
	uint32_t handle;
	int cap;
	int head;
	int tail;
	int release;
	int in_global;
	int overload;
	int overload_threshold;
	struct skynet_message *queue;
	struct message_queue *next;
};

struct global_queue {
	struct message_queue *head;
	struct message_queue *tail;
	struct spinlock lock;
};

每次我们call或者send的时候都会往message_queue去插入一条消息,这个消息的机构就是skynet_message

这里的queue其实就是一个数组, 根据两个指针head和tail不断移动还有一个cap来运行,一个环形队列

看插入和取出的方法, 里面还包含关于overload的用途

void 
skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
	assert(message);
	SPIN_LOCK(q)

	q->queue[q->tail] = *message;
	if (++ q->tail >= q->cap) {
		q->tail = 0;
	}

	if (q->head == q->tail) {
		expand_queue(q);
	}

	if (q->in_global == 0) {
		q->in_global = MQ_IN_GLOBAL;
		skynet_globalmq_push(q);
	}
	
	SPIN_UNLOCK(q)
}


int
skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {
	int ret = 1;
	SPIN_LOCK(q)

	if (q->head != q->tail) {
		*message = q->queue[q->head++];
		ret = 0;
		int head = q->head;
		int tail = q->tail;
		int cap = q->cap;

		if (head >= cap) {
			q->head = head = 0;
		}
		int length = tail - head;
		if (length < 0) {
			length += cap;
		}
		while (length > q->overload_threshold) {
			q->overload = length;
			q->overload_threshold *= 2;
		}
	} else {
		// reset overload_threshold when queue is empty
		q->overload_threshold = MQ_OVERLOAD;
	}

	if (ret) {
		q->in_global = 0;
	}
	
	SPIN_UNLOCK(q)

	return ret;
}

global_queue和mesage_queue都有锁,每次操作的时候会自旋获取, 具体实现为

static inline void
spinlock_lock(struct spinlock *lock) {
	for (;;) {
		if (!atomic_test_and_set_(&lock->lock))
			return;
		while (atomic_load_relaxed_(&lock->lock))
			atomic_pause_();
	}
}

#define SPIN_LOCK(q) spinlock_lock(&(q)->lock);

扩容也是像大多结构一样,每次都*2, 把消息复制到一个新的数组, 然后增加cap, 这里没锁,主要是这里只在推送的时候去调用, push那里锁住了

static void
expand_queue(struct message_queue *q) {
	struct skynet_message *new_queue = skynet_malloc(sizeof(struct skynet_message) * q->cap * 2);
	int i;
	for (i=0;i<q->cap;i++) {
		new_queue[i] = q->queue[(q->head + i) % q->cap];
	}
	q->head = 0;
	q->tail = q->cap;
	q->cap *= 2;
	
	skynet_free(q->queue);
	q->queue = new_queue;
}

关于如何运作global_queue和message_queue取到消息并执行任务, 在skynet_server