服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|

Netty + ZooKeeper 实现简单的服务注册与发现

2019-07-08 12:51fengzhizi715 JAVA教程

服务注册和发现一直是分布式的核心组件。本文介绍了借助 ZooKeeper 做注册中心,如何实现一个简单的服务注册和发现。,需要的朋友可以参考下

一. 背景

最近的一个项目:我们的系统接收到上游系统的派单任务后,会推送到指定的门店的相关设备,并进行相应的业务处理。

二. Netty 的使用

在接收到派单任务之后,通过 Netty 推送到指定门店相关的设备。在我们的系统中 Netty 实现了消息推送、长连接以及心跳机制。

Netty + ZooKeeper 实现简单的服务注册与发现

2.1 Netty Server 端:

每个 Netty 服务端通过 ConcurrentHashMap 保存了客户端的 clientId 以及它连接的 SocketChannel。

服务器端向客户端发送消息时,只要获取 clientId 对应的 SocketChannel,往 SocketChannel 里写入相应的 message 即可。

EventLoopGroup boss = new NioEventLoopGroup(1);
  EventLoopGroup worker = new NioEventLoopGroup();
  ServerBootstrap bootstrap = new ServerBootstrap();
  bootstrap.group(boss, worker)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 128)
    .option(ChannelOption.TCP_NODELAY, true)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childHandler(new ChannelInitializer() {
     @Override
     protected void initChannel(Channel channel) throws Exception {
      ChannelPipeline p = channel.pipeline();
      p.addLast(new MessageEncoder());
      p.addLast(new MessageDecoder());
      p.addLast(new PushServerHandler());
     }
    });
  ChannelFuture future = bootstrap.bind(host,port).sync();
  if (future.isSuccess()) {
   logger.info("server start...");
  }

2.2 Netty Client 端:

客户端用于接收服务端的消息,随即进行业务处理。客户端还有心跳机制,它通过 IdleEvent 事件定时向服务端放送 Ping 消息以此来检测 SocketChannel 是否中断。

public PushClientBootstrap(String host, int port) throws InterruptedException {
  this.host = host;
  this.port = port;
  start(host,port);
 }
 private void start(String host, int port) throws InterruptedException {
  bootstrap = new Bootstrap();
  bootstrap.channel(NioSocketChannel.class)
    .option(ChannelOption.SO_KEEPALIVE, true)
    .group(workGroup)
    .remoteAddress(host, port)
    .handler(new ChannelInitializer(){
     @Override
     protected void initChannel(Channel channel) throws Exception {
      ChannelPipeline p = channel.pipeline();
      p.addLast(new IdleStateHandler(20, 10, 0)); // IdleStateHandler 用于检测心跳
      p.addLast(new MessageDecoder());
      p.addLast(new MessageEncoder());
      p.addLast(new PushClientHandler());
     }
    });
  doConnect(port, host);
 }
 /**
  * 建立连接,并且可以实现自动重连.
  * @param port port.
  * @param host host.
  * @throws InterruptedException InterruptedException.
  */
 private void doConnect(int port, String host) throws InterruptedException {
  if (socketChannel != null && socketChannel.isActive()) {
   return;
  }
  final int portConnect = port;
  final String hostConnect = host;
  ChannelFuture future = bootstrap.connect(host, port);
  future.addListener(new ChannelFutureListener() {
   @Override
   public void operationComplete(ChannelFuture futureListener) throws Exception {
    if (futureListener.isSuccess()) {
     socketChannel = (SocketChannel) futureListener.channel();
     logger.info("Connect to server successfully!");
    } else {
     logger.info("Failed to connect to server, try connect after 10s");
     futureListener.channel().eventLoop().schedule(new Runnable() {
      @Override
      public void run() {
       try {
        doConnect(portConnect, hostConnect);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      }
     }, 10, TimeUnit.SECONDS);
    }
   }
  }).sync();
 }

三. 借助 ZooKeeper 实现简单的服务注册与发现

3.1 服务注册

服务注册本质上是为了解耦服务提供者和服务消费者。服务注册是一个高可用强一致性的服务发现存储仓库,主要用来存储服务的api和地址对应关系。为了高可用,服务注册中心一般为一个集群,并且能够保证分布式一致性。目前常用的有 ZooKeeper、Etcd 等等。

在我们项目中采用了 ZooKeeper 实现服务注册。

public class ServiceRegistry {
 private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);
 private CountDownLatch latch = new CountDownLatch(1);
 private String registryAddress;
 public ServiceRegistry(String registryAddress) {
  this.registryAddress = registryAddress;
 }
 public void register(String data) {
  if (data != null) {
   ZooKeeper zk = connectServer();
   if (zk != null) {
    createNode(zk, data);
   }
  }
 }
 /**
  * 连接 zookeeper 服务器
  * @return
  */
 private ZooKeeper connectServer() {
  ZooKeeper zk = null;
  try {
   zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
     if (event.getState() == Event.KeeperState.SyncConnected) {
      latch.countDown();
     }
    }
   });
   latch.await();
  } catch (IOException | InterruptedException e) {
   logger.error("", e);
  }
  return zk;
 }
 /**
  * 创建节点
  * @param zk
  * @param data
  */
 private void createNode(ZooKeeper zk, String data) {
  try {
   byte[] bytes = data.getBytes();
   String path = zk.create(Constants.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
   logger.debug("create zookeeper node ({} => {})", path, data);
  } catch (KeeperException | InterruptedException e) {
   logger.error("", e);
  }
 }
}

有了服务注册,在 Netty 服务端启动之后,将 Netty 服务端的 ip 和 port 注册到 ZooKeeper。

EventLoopGroup boss = new NioEventLoopGroup(1);
  EventLoopGroup worker = new NioEventLoopGroup();
  ServerBootstrap bootstrap = new ServerBootstrap();
  bootstrap.group(boss, worker)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 128)
    .option(ChannelOption.TCP_NODELAY, true)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childHandler(new ChannelInitializer() {
     @Override
     protected void initChannel(Channel channel) throws Exception {
      ChannelPipeline p = channel.pipeline();
      p.addLast(new MessageEncoder());
      p.addLast(new MessageDecoder());
      p.addLast(new PushServerHandler());
     }
    });
  ChannelFuture future = bootstrap.bind(host,port).sync();
  if (future.isSuccess()) {
   logger.info("server start...");
  }
  if (serviceRegistry != null) {
   serviceRegistry.register(host + ":" + port);
  }

3.2 服务发现

这里我们采用的是客户端的服务发现,即服务发现机制由客户端实现。

客户端在和服务端建立连接之前,通过查询注册中心的方式来获取服务端的地址。如果存在有多个 Netty 服务端的话,可以做服务的负载均衡。在我们的项目中只采用了简单的随机法进行负载。

public class ServiceDiscovery {
 private static final Logger logger = LoggerFactory.getLogger(ServiceDiscovery.class);
 private CountDownLatch latch = new CountDownLatch(1);
 private volatile List<String> serviceAddressList = new ArrayList<>();
 private String registryAddress; // 注册中心的地址
 public ServiceDiscovery(String registryAddress) {
  this.registryAddress = registryAddress;
  ZooKeeper zk = connectServer();
  if (zk != null) {
   watchNode(zk);
  }
 }
 /**
  * 通过服务发现,获取服务提供方的地址
  * @return
  */
 public String discover() {
  String data = null;
  int size = serviceAddressList.size();
  if (size > 0) {
   if (size == 1) { //只有一个服务提供方
    data = serviceAddressList.get(0);
    logger.info("unique service address : {}", data);
   } else {   //使用随机分配法。简单的负载均衡法
    data = serviceAddressList.get(ThreadLocalRandom.current().nextInt(size));
    logger.info("choose an address : {}", data);
   }
  }
  return data;
 }
 /**
  * 连接 zookeeper
  * @return
  */
 private ZooKeeper connectServer() {
  ZooKeeper zk = null;
  try {
   zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
     if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
      latch.countDown();
     }
    }
   });
   latch.await();
  } catch (IOException | InterruptedException e) {
   logger.error("", e);
  }
  return zk;
 }
 /**
  * 获取服务地址列表
  * @param zk
  */
 private void watchNode(final ZooKeeper zk) {
  try {
   //获取子节点列表
   List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY_PATH, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
     if (event.getType() == Event.EventType.NodeChildrenChanged) {
      //发生子节点变化时再次调用此方法更新服务地址
      watchNode(zk);
     }
    }
   });
   List<String> dataList = new ArrayList<>();
   for (String node : nodeList) {
    byte[] bytes = zk.getData(Constants.ZK_REGISTRY_PATH + "/" + node, false, null);
    dataList.add(new String(bytes));
   }
   logger.debug("node data: {}", dataList);
   this.serviceAddressList = dataList;
  } catch (KeeperException | InterruptedException e) {
   logger.error("", e);
  }
 }
}

Netty 客户端启动之后,通过服务发现获取 Netty 服务端的 ip 和 port。

/**
  * 支持通过服务发现来获取 Socket 服务端的 host、port
  * @param discoveryAddress
  * @throws InterruptedException
  */
 public PushClientBootstrap(String discoveryAddress) throws InterruptedException {

  serviceDiscovery = new ServiceDiscovery(discoveryAddress);
  serverAddress = serviceDiscovery.discover();

  if (serverAddress!=null) {
   String[] array = serverAddress.split(":");
   if (array!=null && array.length==2) {

    String host = array[0];
    int port = Integer.parseInt(array[1]);

    start(host,port);
   }
  }
 }

四. 总结

服务注册和发现一直是分布式的核心组件。本文介绍了借助 ZooKeeper 做注册中心,如何实现一个简单的服务注册和发现。其实,注册中心的选择有很多,例如 Etcd、Eureka 等等。选择符合我们业务需求的才是最重要的。

以上所述是小编给大家介绍的Netty + ZooKeeper 实现简单的服务注册与发现,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

延伸 · 阅读

精彩推荐
  • JAVA教程阿里云主机上安装jdk 某库出现问题的解决方法

    阿里云主机上安装jdk 某库出现问题的解决方法

    今天安装jdk到阿里云服务上,首先看下阿里云是32位还是64位的,如果是32位下载32位的包,如果是64位的下载64位的包,下面与大家分享下安装过程中遇到问题的解决方法...

    服务器之家1842019-06-18
  • JAVA教程Java 中引入内部类的意义?

    Java 中引入内部类的意义?

    这篇文章主要介绍了Java 中引入内部类的意义?文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,,需要的朋友可以参考下...

    胖君4162019-06-28
  • JAVA教程windows下zookeeper配置java环境变量的方法

    windows下zookeeper配置java环境变量的方法

    今天小编就为大家分享一篇关于windows下zookeeper配置java环境变量的方法,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧...

    langwang19933072019-06-20
  • JAVA教程使用JMX监控Zookeeper状态Java API

    使用JMX监控Zookeeper状态Java API

    今天小编就为大家分享一篇关于使用JMX监控Zookeeper状态Java API,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧...

    Scub2612019-06-20
  • JAVA教程Drools Fusion(CEP)定义及使用方法讲解

    Drools Fusion(CEP)定义及使用方法讲解

    今天小编就为大家分享一篇关于Drools Fusion(CEP)定义及使用方法讲解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧...

    双斜杠少年3352019-06-24
  • JAVA教程Scala中的mkString的具体使用方法

    Scala中的mkString的具体使用方法

    这篇文章主要介绍了Scala中的mkString的具体方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    LittleLawson2712019-07-08
  • JAVA教程java与php的区别浅析

    java与php的区别浅析

    在本篇文章里小编给大家整理了关于java与php的区别以及相关知识点,有兴趣的朋友们学习下。...

    laozhang3762019-06-24
  • JAVA教程java中JVM中如何存取数据和相关信息详解

    java中JVM中如何存取数据和相关信息详解

    这篇文章主要介绍了JVM中如何存取数据和相关信息详解,Java源代码文件(.java后缀)会被Java编译器编译为字节码文件,然后由JVM中的类加载器加载各个类的字节码文件,加载完毕之后,交由JVM执行引擎执行。JVM中怎么存取数据和相关...

    易水人去丶明月如霜4532019-06-27