- Published on
skynet harbor
- Authors

- Name
- Ushen
不同的服务之间的通信方式之一就是通过harbor,当消息的高8为与本地地址不一样时会投放消息到harbor,转发到目的地的harbor手中。
早期和现在的harbor结构
云风的 BLOG重新设计并实现了 skynet 的 harbor 模块
harbor可以从config文件配置是否开启,每个服务的harbor通过一个master去管理。
配置的standalone和master分别表示master监听的端口以及skynet服务连接的master
skynet节点会通过master与其他的skynet节点一一建立单向通道,每两个节点就会建立两条通道
现在的skynet_master.c已经被删除,被cmaster.lua和cslave.lua取代,但可以通过早期skynet版本找到
struct name {
struct name * next;
char key[GLOBALNAME_LENGTH];
uint32_t hash;
uint32_t value;
};
struct namemap {
struct name *node[HASH_SIZE];
};
struct master {
struct skynet_context *ctx;
int remote_fd[REMOTE_MAX];
bool connected[REMOTE_MAX];
char * remote_addr[REMOTE_MAX];
struct namemap map;
};
struct master *
master_create() {
struct master *m = skynet_malloc(sizeof(*m));
int i;
for (i=0;i<REMOTE_MAX;i++) {
m->remote_fd[i] = -1;
m->remote_addr[i] = NULL;
m->connected[i] = false;
}
memset(&m->map, 0, sizeof(m->map));
return m;
}
master的结构主要是存储skynet节点的连接,以及名字的键对
static void
_connect_to(struct master *m, int id) {
assert(m->connected[id] == false);
struct skynet_context * ctx = m->ctx;
const char *ipaddress = m->remote_addr[id];
char * portstr = strchr(ipaddress,':');
if (portstr==NULL) {
skynet_error(ctx, "Harbor %d : address invalid (%s)",id, ipaddress);
return;
}
int sz = portstr - ipaddress;
char tmp[sz + 1];
memcpy(tmp,ipaddress,sz);
tmp[sz] = '\0';
int port = strtol(portstr+1,NULL,10);
skynet_error(ctx, "Master connect to harbor(%d) %s:%d", id, tmp, port);
m->remote_fd[id] = skynet_socket_connect(ctx, tmp, port);
}
首先skynet节点汇报自己的harbor_id,然后master会连入skynet
static void
_send_to(struct master *m, int id, const void * buf, int sz, uint32_t handle) {
uint8_t * buffer= (uint8_t *)skynet_malloc(4 + sz + 12);
to_bigendian(buffer, sz+12);
memcpy(buffer+4, buf, sz);
to_bigendian(buffer+4+sz, 0);
to_bigendian(buffer+4+sz+4, handle);
to_bigendian(buffer+4+sz+8, 0);
sz += 4 + 12;
if (skynet_socket_send(m->ctx, m->remote_fd[id], buffer, sz)) {
skynet_error(m->ctx, "Harbor %d : send error", id);
}
}
static void
_broadcast(struct master *m, const char *name, size_t sz, uint32_t handle) {
int i;
for (i=1;i<REMOTE_MAX;i++) {
int fd = m->remote_fd[i];
if (fd < 0 || m->connected[i]==false)
continue;
_send_to(m, i , name, sz, handle);
}
}
static void
on_connected(struct master *m, int id) {
_broadcast(m, m->remote_addr[id], strlen(m->remote_addr[id]), id);
m->connected[id] = true;
int i;
for (i=1;i<REMOTE_MAX;i++) {
if (i == id)
continue;
const char * addr = m->remote_addr[i];
if (addr == NULL || m->connected[i] == false) {
continue;
}
_send_to(m, id , addr, strlen(addr), i);
}
}
然后skynet会连接master,master被连接之后,通知个个skynet节点连接到这个新的节点,然后新的节点再连接到其他的节点。
harbor做的事情也很简单,通过检查高8位地址,来判断把消息投放到哪里
int
skynet_harbor_message_isremote(uint32_t handle) {
assert(HARBOR != ~0);
int h = (handle & ~HANDLE_MASK);
return h != HARBOR && h !=0;
}
if (skynet_harbor_message_isremote(destination)) {
struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
rmsg->destination.handle = destination;
rmsg->message = data;
rmsg->sz = sz & MESSAGE_TYPE_MASK;
rmsg->type = sz >> MESSAGE_TYPE_SHIFT;
skynet_harbor_send(rmsg, source, session);
}
每次收到消息会根据这个函数判断是不是远程调用,是的话调用到skynet_harbor_send
void
skynet_harbor_send(struct remote_message *rmsg, uint32_t source, int session) {
assert(invalid_type(rmsg->type) && REMOTE);
skynet_context_send(REMOTE, rmsg, sizeof(*rmsg) , source, PTYPE_SYSTEM , session);
}