- Published on
skynet消息队列
- Authors

- Name
- Ushen
skynet用的是actor模型, 里面的mq就是actor的邮箱
有一个总的队列 global_queue, 每一个服务有一个自己的队列message_queue
结构体为
struct skynet_message {
uint32_t source;
int session;
void * data;
size_t sz;
};
struct message_queue {
struct spinlock lock;
uint32_t handle;
int cap;
int head;
int tail;
int release;
int in_global;
int overload;
int overload_threshold;
struct skynet_message *queue;
struct message_queue *next;
};
struct global_queue {
struct message_queue *head;
struct message_queue *tail;
struct spinlock lock;
};
每次我们call或者send的时候都会往message_queue去插入一条消息,这个消息的机构就是skynet_message
这里的queue其实就是一个数组, 根据两个指针head和tail不断移动还有一个cap来运行,一个环形队列
看插入和取出的方法, 里面还包含关于overload的用途
void
skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
assert(message);
SPIN_LOCK(q)
q->queue[q->tail] = *message;
if (++ q->tail >= q->cap) {
q->tail = 0;
}
if (q->head == q->tail) {
expand_queue(q);
}
if (q->in_global == 0) {
q->in_global = MQ_IN_GLOBAL;
skynet_globalmq_push(q);
}
SPIN_UNLOCK(q)
}
int
skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {
int ret = 1;
SPIN_LOCK(q)
if (q->head != q->tail) {
*message = q->queue[q->head++];
ret = 0;
int head = q->head;
int tail = q->tail;
int cap = q->cap;
if (head >= cap) {
q->head = head = 0;
}
int length = tail - head;
if (length < 0) {
length += cap;
}
while (length > q->overload_threshold) {
q->overload = length;
q->overload_threshold *= 2;
}
} else {
// reset overload_threshold when queue is empty
q->overload_threshold = MQ_OVERLOAD;
}
if (ret) {
q->in_global = 0;
}
SPIN_UNLOCK(q)
return ret;
}
global_queue和mesage_queue都有锁,每次操作的时候会自旋获取, 具体实现为
static inline void
spinlock_lock(struct spinlock *lock) {
for (;;) {
if (!atomic_test_and_set_(&lock->lock))
return;
while (atomic_load_relaxed_(&lock->lock))
atomic_pause_();
}
}
#define SPIN_LOCK(q) spinlock_lock(&(q)->lock);
扩容也是像大多结构一样,每次都*2, 把消息复制到一个新的数组, 然后增加cap, 这里没锁,主要是这里只在推送的时候去调用, push那里锁住了
static void
expand_queue(struct message_queue *q) {
struct skynet_message *new_queue = skynet_malloc(sizeof(struct skynet_message) * q->cap * 2);
int i;
for (i=0;i<q->cap;i++) {
new_queue[i] = q->queue[(q->head + i) % q->cap];
}
q->head = 0;
q->tail = q->cap;
q->cap *= 2;
skynet_free(q->queue);
q->queue = new_queue;
}
关于如何运作global_queue和message_queue取到消息并执行任务, 在skynet_server里