Published on

skynet harbor

Authors
  • avatar
    Name
    Ushen
    Twitter

不同的服务之间的通信方式之一就是通过harbor,当消息的高8为与本地地址不一样时会投放消息到harbor,转发到目的地的harbor手中。

早期和现在的harbor结构

云风的 BLOG重新设计并实现了 skynet 的 harbor 模块

harbor可以从config文件配置是否开启,每个服务的harbor通过一个master去管理。

配置的standalone和master分别表示master监听的端口以及skynet服务连接的master

skynet节点会通过master与其他的skynet节点一一建立单向通道,每两个节点就会建立两条通道

现在的skynet_master.c已经被删除,被cmaster.lua和cslave.lua取代,但可以通过早期skynet版本找到

struct name {
    struct name * next;
    char key[GLOBALNAME_LENGTH];
    uint32_t hash;
    uint32_t value;
};

struct namemap {
    struct name *node[HASH_SIZE];
};

struct master {
    struct skynet_context *ctx;
    int remote_fd[REMOTE_MAX];
    bool connected[REMOTE_MAX];
    char * remote_addr[REMOTE_MAX];
    struct namemap map;
};

struct master *
master_create() {
    struct master *m = skynet_malloc(sizeof(*m));
    int i;
    for (i=0;i<REMOTE_MAX;i++) {
        m->remote_fd[i] = -1;
        m->remote_addr[i] = NULL;
        m->connected[i] = false;
    }
    memset(&m->map, 0, sizeof(m->map));
    return m;
}

master的结构主要是存储skynet节点的连接,以及名字的键对

static void
_connect_to(struct master *m, int id) {
    assert(m->connected[id] == false);
    struct skynet_context * ctx = m->ctx;
    const char *ipaddress = m->remote_addr[id];
    char * portstr = strchr(ipaddress,':');
    if (portstr==NULL) {
        skynet_error(ctx, "Harbor %d : address invalid (%s)",id, ipaddress);
        return;
    }
    int sz = portstr - ipaddress;
    char tmp[sz + 1];
    memcpy(tmp,ipaddress,sz);
    tmp[sz] = '\0';
    int port = strtol(portstr+1,NULL,10);
    skynet_error(ctx, "Master connect to harbor(%d) %s:%d", id, tmp, port);
    m->remote_fd[id] = skynet_socket_connect(ctx, tmp, port);
}

首先skynet节点汇报自己的harbor_id,然后master会连入skynet

static void
_send_to(struct master *m, int id, const void * buf, int sz, uint32_t handle) {
    uint8_t * buffer= (uint8_t *)skynet_malloc(4 + sz + 12);
    to_bigendian(buffer, sz+12);
    memcpy(buffer+4, buf, sz);
    to_bigendian(buffer+4+sz, 0);
    to_bigendian(buffer+4+sz+4, handle);
    to_bigendian(buffer+4+sz+8, 0);

    sz += 4 + 12;

    if (skynet_socket_send(m->ctx, m->remote_fd[id], buffer, sz)) {
        skynet_error(m->ctx, "Harbor %d : send error", id);
    }
}

static void
_broadcast(struct master *m, const char *name, size_t sz, uint32_t handle) {
    int i;
    for (i=1;i<REMOTE_MAX;i++) {
        int fd = m->remote_fd[i];
        if (fd < 0 || m->connected[i]==false)
            continue;
        _send_to(m, i , name, sz, handle);
    }
}


static void
on_connected(struct master *m, int id) {
    _broadcast(m, m->remote_addr[id], strlen(m->remote_addr[id]), id);
    m->connected[id] = true;
    int i;
    for (i=1;i<REMOTE_MAX;i++) {
        if (i == id)
            continue;
        const char * addr = m->remote_addr[i];
        if (addr == NULL || m->connected[i] == false) {
            continue;
        }
        _send_to(m, id , addr, strlen(addr), i);
    }
}

然后skynet会连接master,master被连接之后,通知个个skynet节点连接到这个新的节点,然后新的节点再连接到其他的节点。

harbor做的事情也很简单,通过检查高8位地址,来判断把消息投放到哪里

int 
skynet_harbor_message_isremote(uint32_t handle) {
    assert(HARBOR != ~0);
    int h = (handle & ~HANDLE_MASK);
    return h != HARBOR && h !=0;
}


if (skynet_harbor_message_isremote(destination)) {
    struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
    rmsg->destination.handle = destination;
    rmsg->message = data;
    rmsg->sz = sz & MESSAGE_TYPE_MASK;
    rmsg->type = sz >> MESSAGE_TYPE_SHIFT;
    skynet_harbor_send(rmsg, source, session);
}

每次收到消息会根据这个函数判断是不是远程调用,是的话调用到skynet_harbor_send

void 
skynet_harbor_send(struct remote_message *rmsg, uint32_t source, int session) {
    assert(invalid_type(rmsg->type) && REMOTE);
    skynet_context_send(REMOTE, rmsg, sizeof(*rmsg) , source, PTYPE_SYSTEM , session);
}