- Published on
skynet service
- Authors

- Name
- Ushen
这个模块主要是用于skynet_context的创建销毁发送等管理和消息处理。
struct skynet_context {
void * instance;
struct skynet_module * mod;
void * cb_ud;
skynet_cb cb;
struct message_queue *queue;
ATOM_POINTER logfile;
uint64_t cpu_cost; // in microsec
uint64_t cpu_start; // in microsec
char result[32];
uint32_t handle;
int session_id;
ATOM_INT ref;
int message_count;
bool init;
bool endless;
bool profile;
CHECKCALLING_DECL
};
struct skynet_node {
ATOM_INT total;
int init;
uint32_t monitor_exit;
pthread_key_t handle_key;
bool profile; // default is on
};
static struct skynet_node G_NODE;
context创建的时候会根据模块名字查询地址mod
创建模块实例instance
注册handle
创建新的消息队列queue
worker线程会不断执行skynet_context_message_dispatch函数。
struct message_queue *
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
if (q == NULL) {
q = skynet_globalmq_pop();
if (q==NULL)
return NULL;
}
uint32_t handle = skynet_mq_handle(q);
struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) {
struct drop_t d = { handle };
skynet_mq_release(q, drop_message, &d);
return skynet_globalmq_pop();
}
int i,n=1;
struct skynet_message msg;
for (i=0;i<n;i++) {
if (skynet_mq_pop(q,&msg)) {
skynet_context_release(ctx);
return skynet_globalmq_pop();
} else if (i==0 && weight >= 0) {
n = skynet_mq_length(q);
n >>= weight;
}
int overload = skynet_mq_overload(q);
if (overload) {
skynet_error(ctx, "May overload, message queue length = %d", overload);
}
skynet_monitor_trigger(sm, msg.source , handle);
if (ctx->cb == NULL) {
skynet_free(msg.data);
} else {
dispatch_message(ctx, &msg);
}
skynet_monitor_trigger(sm, 0,0);
}
assert(q == ctx->queue);
struct message_queue *nq = skynet_globalmq_pop();
if (nq) {
// If global mq is not empty , push q back, and return next queue (nq)
// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
skynet_globalmq_push(q);
q = nq;
}
skynet_context_release(ctx);
return q;
}
这个函数的返回值是下一个队列。
首先拿到q的handle,再根据handle拿到context
循环n次,这个n次代表着消费多少条消息
取决于权重weight,weight在skynet_start的时候被初始化。
然后判断是否过载触发警告,
触发monitor监控。
static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
assert(ctx->init);
CHECKCALLING_BEGIN(ctx)
pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
int type = msg->sz >> MESSAGE_TYPE_SHIFT;
size_t sz = msg->sz & MESSAGE_TYPE_MASK;
FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile);
if (f) {
skynet_log_output(f, msg->source, type, msg->session, msg->data, sz);
}
++ctx->message_count;
int reserve_msg;
if (ctx->profile) {
ctx->cpu_start = skynet_thread_time();
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
ctx->cpu_cost += cost_time;
} else {
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
}
if (!reserve_msg) {
skynet_free(msg->data);
}
CHECKCALLING_END(ctx)
}
首先尝试上锁,然后给线程设置键值,打日志等等。
调用ctx->cb得到返回消息,ctx->cb是各模块注册的消息处理函数。最后释放锁。
在这个函数注册cb
void
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
context->cb = cb;
context->cb_ud = ud;
}
消息发送时,会根据地址来判断是本地发送还是交给港口
int
skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
if ((sz & MESSAGE_TYPE_MASK) != sz) {
skynet_error(context, "The message to %x is too large", destination);
if (type & PTYPE_TAG_DONTCOPY) {
skynet_free(data);
}
return -2;
}
_filter_args(context, type, &session, (void **)&data, &sz);
if (source == 0) {
source = context->handle;
}
if (destination == 0) {
if (data) {
skynet_error(context, "Destination address can't be 0");
skynet_free(data);
return -1;
}
return session;
}
if (skynet_harbor_message_isremote(destination)) {
struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
rmsg->destination.handle = destination;
rmsg->message = data;
rmsg->sz = sz & MESSAGE_TYPE_MASK;
rmsg->type = sz >> MESSAGE_TYPE_SHIFT;
skynet_harbor_send(rmsg, source, session);
} else {
struct skynet_message smsg;
smsg.source = source;
smsg.session = session;
smsg.data = data;
smsg.sz = sz;
if (skynet_context_push(destination, &smsg)) {
skynet_free(data);
return -1;
}
}
return session;
}
港口推送在harbor里 本地推送就是往context里的queue放一条消息进去
int
skynet_context_push(uint32_t handle, struct skynet_message *message) {
struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) {
return -1;
}
skynet_mq_push(ctx->queue, message);
skynet_context_release(ctx);
return 0;
}