服务器之家:专注于服务器技术及软件下载分享
分类导航

云服务器|WEB服务器|FTP服务器|邮件服务器|虚拟服务器|服务器安全|DNS服务器|服务器其它|

RPC 服务器之【多进程描述符传递】高阶模型

2019-07-13 09:54码洞老钱 服务器其它

今天老师要给大家介绍一个比较特别的 RPC 服务器模型,这个模型不同于 Nginx、不同于 Redis、不同于 Apache、不同于 Tornado、不同于 Netty,它的原型是 Node Cluster 的多进程并发模型。

今天老师要给大家介绍一个比较特别的 RPC 服务器模型,这个模型不同于 Nginx、不同于 Redis、不同于 Apache、不同于 Tornado、不同于 Netty,它的原型是 Node Cluster 的多进程并发模型。

RPC 服务器之【多进程描述符传递】高阶模型

Nginx 并发模型

我们知道 Nginx 的并发模型是一个多进程并发模型,它的 Master 进程在绑定监听地址端口后 fork 出了多个 Slave 进程共同竞争处理这个服务端套接字接收到的很多客户端连接。

RPC 服务器之【多进程描述符传递】高阶模型

这多个 Slave 进程会共享同一个处于操作系统内核态的套接字队列,操作系统的网络模块在处理完三次握手后就会将套接字塞进这个队列。这是一个生产者消费者模型,生产者是操作系统的网络模块,消费者是多个 Slave 进程,队列中的对象是客户端套接字。

这种模型在负载均衡上有一个缺点,那就是套接字分配不均匀,形成了类似于贫富分化的局面,也就是「闲者愈闲,忙者愈忙」的状态。这是因为当多个进程竞争同一个套接字队列时,操作系统采用了 LIFO 的策略,最后一个来 accept 的进程最优先拿到 套接字。越是繁忙的进程越是有更多的机会调用 accept,它能拿到的套接字也就越多。

RPC 服务器之【多进程描述符传递】高阶模型

Node Cluster 并发模型

Node Cluster 为了解决负载均衡问题,它采用了不同的策略。它也是多进程并发模型,Master 进程会 fork 出多个子进程来处理客户端套接字。但是不存在竞争问题,因为负责 accept 套接字的只能是 Master 进程,Slave 进程只负责处理客户端套接字请求。那就存在一个问题,Master 进程拿到的客户端套接字如何传递给 Slave 进程。

RPC 服务器之【多进程描述符传递】高阶模型

这时,神奇的 sendmsg 登场了。它是操作系统提供的系统调用,可以在不同的进程之间传递文件描述符。sendmsg 会搭乘一个特殊的「管道」将 Master 进程的套接字描述符传递到 Slave 进程,Slave 进程通过 recvmsg 系统调用从这个「管道」中将描述符取出来。这个「管道」比较特殊,它是 Unix 域套接字。普通的套接字可以跨机器传输消息,Unix 域套接字只能在同一个机器的不同进程之间传递消息。同管道一样,Unix 域套接字也分为有名套接字和无名套接字,有名套接字会在文件系统指定一个路径名,无关进程之间都可以通过这个路径来访问 Unix 域套接字。而无名套接字一般用于父子进程之间,父进程会通过 socketpair 调用来创建套接字,然后 fork 出来子进程,这样子进程也会同时持有这个套接字的引用。后续父子进程就可以通过这个套接字互相通信。

RPC 服务器之【多进程描述符传递】高阶模型

注意这里的传递描述符,本质上不是传递,而是复制。父进程的描述符并不会在 sendmsg 自动关闭自动消失,子进程收到的描述符和父进程的描述符也不是同一个整数值。但是父子进程的描述符都会指向同一个内核套接字对象。

有了描述符的传递能力,父进程就可以将 accept 到的客户端套接字轮流传递给多个 Slave 进程,负载均衡的目标就可以顺利实现了。

接下来我们就是用 Python 代码来撸一遍 Node Cluster 的并发模型。因为 sendmsg 和 recvmsg 方法到了 Python3.5 才内置进来,所以下面的代码需要使用 Python3.5+才可以运行。

我们看 sendmsg 方法的定义


  1. socket.sendmsg(buffers[, ancdata[, flags[, address]]]) 

我们只需要关心第二个参数 ancdata,描述符是通过ancdata 参数传递的,它的意思是 「辅助数据」,而 buffers 表示需要传递的消息内容,因为消息内容这里没有意义,所以这个字段可以任意填写,但是必须要有内容,如果没有内容,sendmsg 方法就是一个空调用。


  1. import socket, structdef send_fds(sock, fd): return sock.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack("i", fd))])# ancdata 参数是一个三元组的列表,三元组的第一个参数表示网络协议栈级别 level,第二个参数表示辅助数据的类型 type,第三个参数才是携带的数据,level=SOL_SOCKET 表示传递的数据处于 TCP 协议层级,type=SCM_RIGHTS 就表示携带的数据是文件描述符。我们传递的描述符 fd 是一个整数,需要使用 struct 包将它序列化成二进制。 

再看 recvmsg 方法的定义


  1. msg, ancdata, flags, addr = socket.recvmsg(bufsize[, ancbufsize[, flags]]) 

同样,我们只需要关心返回的 ancdata 数据,它里面包含了我们需要的文件描述符。但是需要提供消息体的长度和辅助数据的长度参数。辅助数据的长度比较特殊,需要使用 CMSG_LEN 方法来计算,因为辅助数据里面还有我们看不到的额外的头部信息。


  1. bufsize = 1  # 消息内容的长度 
  2. ancbufsize = socket.CMSG_LEN(struct.calcsize('i'))  # 辅助数据的长度 
  3. msg, ancdata, flags, addr = socket.recvmsg(bufsize, ancbufsize) # 收取消息 
  4. level, type, fd_bytes = ancdata[0] # 取第一个元祖,注意发送消息时我们传递的是一个三元组的列表 
  5. fd = struct.unpack('i', fd_bytes) # 反序列化 

代码实现

下面我来献上完整的服务器代码,为了简单起见,我们在 Slave 进程中处理 RPC 请求使用同步模型。


  1. # coding: utf 
  2. # sendmsg recvmsg python3.5+才可以支持 
  3.  
  4. import os 
  5. import json 
  6. import struct 
  7. import socket 
  8.  
  9.  
  10. def handle_conn(conn, addr, handlers): 
  11.     print(addr, "comes"
  12.     while True
  13.         # 简单起见,这里就没有使用循环读取了 
  14.         length_prefix = conn.recv(4) 
  15.         if not length_prefix: 
  16.             print(addr, "bye"
  17.             conn.close() 
  18.             break  # 关闭连接,继续处理下一个连接 
  19.         length, = struct.unpack("I", length_prefix) 
  20.         body = conn.recv(length) 
  21.         request = json.loads(body) 
  22.         in_ = request['in'
  23.         params = request['params'
  24.         print(in_, params) 
  25.         handler = handlers[in_] 
  26.         handler(conn, params) 
  27.  
  28.  
  29. def loop_slave(pr, handlers): 
  30.     while True
  31.         bufsize = 1 
  32.         ancsize = socket.CMSG_LEN(struct.calcsize('i')) 
  33.         msg, ancdata, flags, addr = pr.recvmsg(bufsize, ancsize) 
  34.         cmsg_level, cmsg_type, cmsg_data = ancdata[0] 
  35.         fd = struct.unpack('i', cmsg_data)[0] 
  36.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd) 
  37.         handle_conn(sock, sock.getpeername(), handlers) 
  38.  
  39.  
  40. def ping(conn, params): 
  41.     send_result(conn, "pong", params) 
  42.  
  43.  
  44. def send_result(conn, out, result): 
  45.     response = json.dumps({"out"out"result": result}).encode('utf-8'
  46.     length_prefix = struct.pack("I", len(response)) 
  47.     conn.sendall(length_prefix) 
  48.     conn.sendall(response) 
  49.  
  50.  
  51. def loop_master(serv_sock, pws): 
  52.     idx = 0 
  53.     while True
  54.         sock, addr = serv_sock.accept() 
  55.         pw = pws[idx % len(pws)] 
  56.         # 消息数据,whatever 
  57.         msg = [b'x'
  58.         # 辅助数据,携带描述符 
  59.         ancdata = [( 
  60.             socket.SOL_SOCKET, 
  61.             socket.SCM_RIGHTS, 
  62.             struct.pack('i', sock.fileno()))] 
  63.         pw.sendmsg(msg, ancdata) 
  64.         sock.close()  # 关闭引用 
  65.         idx += 1 
  66.  
  67.  
  68. def prefork(serv_sock, n): 
  69.     pws = [] 
  70.     for i in range(n): 
  71.         # 开辟父子进程通信「管道」 
  72.         pr, pw = socket.socketpair() 
  73.         pid = os.fork() 
  74.         if pid < 0:  # fork error 
  75.             return pws 
  76.         if pid > 0: 
  77.             # 父进程 
  78.             pr.close()  # 父进程不用读 
  79.             pws.append(pw) 
  80.             continue 
  81.         if pid == 0: 
  82.             # 子进程 
  83.             serv_sock.close()  # 关闭引用 
  84.             pw.close()  # 子进程不用写 
  85.             return pr 
  86.     return pws 
  87.  
  88.  
  89. if __name__ == '__main__'
  90.     serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
  91.     serv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
  92.     serv_sock.bind(("localhost", 8080)) 
  93.     serv_sock.listen(1) 
  94.     pws_or_pr = prefork(serv_sock, 10) 
  95.     if hasattr(pws_or_pr, '__len__'): 
  96.         if pws_or_pr: 
  97.             loop_master(serv_sock, pws_or_pr) 
  98.         else
  99.             # fork 全部失败,没有子进程,Game Over 
  100.             serv_sock.close() 
  101.     else
  102.         handlers = { 
  103.             "ping": ping 
  104.         } 
  105.         loop_slave(pws_or_pr, handlers) 

父进程使用 fork 调用创建了多个子进程,然后又使用 socketpair 调用为每一个子进程都创建一个无名套接字用来传递描述符。父进程使用 roundrobin 策略平均分配接收到的客户端套接字。子进程接收到的是一个描述符整数,需要将描述符包装成套接字对象后方可读写。打印对比发送和接收到的描述符,你会发现它们俩的值并不相同,这是因为 sendmsg 将描述符发送到内核后,内核给描述符指向的内核套接字又重新分配了一个新的描述符对象。

延伸 · 阅读

精彩推荐
  • 服务器其它服务器重启不能启动的几种常见的解决方法

    服务器重启不能启动的几种常见的解决方法

    服务器重启不能启动的几种常见的解决方法,碰到此问题的朋友可以参考下。...

    网络2832019-06-15
  • 服务器其它企业为什么要进行服务器托管?

    企业为什么要进行服务器托管?

    随着互联网发展越来越迅速,互联网企业越来越意识到网站的重要性。建立网站离不开服务器的支持。可能很多人会犹豫到底要不要进行 服务器托管 ,那今天小编就来和大家一起分析一下,企业到底要不要进行服务器托管。 首先...

    服务器之家2002019-05-26
  • 服务器其它刀片服务器是什么 刀片服务器的主要特点

    刀片服务器是什么 刀片服务器的主要特点

    刀片服务器 是指在标准高度的机架式机箱内可插装多个卡式的服务器单元,是一种实现HAHD的低成本服务器平台,为特殊应用行业和高密度计算环境专门设计。刀片服务器就像刀片一样,每一块...

    IT百科1622019-05-16
  • 服务器其它win7开机时出现“rpc服务器不可用”之解决办法

    win7开机时出现“rpc服务器不可用”之解决办法

    问题: win7开机时出现rpc服务器不可用 ,正常情况下是开机无密码直接登录,开机按住F8,进入启动菜单后,无论用什么模式启动,都跳出RPC服务器不可用,而且显示之后只有两个选择,一个是返回输入密码,一个是关机,感觉是...

    服务器之家4432019-06-15
  • 服务器其它出现RPC服务器不可用的解决方法

    出现RPC服务器不可用的解决方法

    出现 RPC服务器 不可用 的解决方法 RPC服务器,是指Remote Procedure Call Protocol,中文释义为(RFC-1831)远程过程调用协议:一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协...

    互联网2172019-05-22
  • 服务器其它服务器运维我为什么不特别爱用Web面板

    服务器运维我为什么不特别爱用Web面板

    所谓的服务器 Web 控制面板其实就是通过 Web 端的访问来轻松管理服务器,提升运维效率。例如:创建管理网站、FTP、数据库,拥有可视化文件管理器,可视化软件管理器,可视化 CPU、内存、流...

    图设信息技术3582019-05-28
  • 服务器其它关闭打印机后台导致的RPC服务不可用

    关闭打印机后台导致的RPC服务不可用

    问题说明:做测试的时候标签 打印机 没连接上,任务给挂起了,为了关闭这些任务直接在任务栏里面关闭了打印机后台,结果吃完酸菜鱼回来睡了一觉再运行程序,突然提示 RPC服务不可用 。 于是乎我就在网上搜,找到了很良心...

    海梦思心2832019-06-15
  • 服务器其它RPC服务器不可用怎么办?解决RPC服务器不可用

    RPC服务器不可用怎么办?解决RPC服务器不可用

    最近在写一个程序需要向windows操作系统里面添加一个IP地址;其实就是使用了windows的netsh命令来添加IP地址;但是在添加IP地址的时候却提示RPC服务器不可用;网上各种没有找到解决办法;于是我就自己尝试了;下面就是我如何解决 RPC服务...

    百度经验3512019-06-15