一、Java RMI介绍

   Java RMI 指的是远程方法调用 (Remote Method Invocation)。它是一种机制,能够让在某个 Java 虚拟机上的对象调用另一个 Java 虚拟机中的对象上的方法。可以用此方法调用的任何对象必须实现该远程接口。

Java RMI不是什么新技术(在Java1.1的时代都有了),但却是是非常重要的底层技术,大名鼎鼎的EJB都是建立在rmi基础之上的,现在还有一些开源的远程调用组件,其底层技术也是rmi。

二、Java RMI的简单案例

2.1、 定义一个远程接口,必须继承Remote接口,其中需要远程调用的方法必须抛出RemoteException异常

package demo.zookeeper.remoting.common;

import java.rmi.Remote;
import java.rmi.RemoteException;

public interface HelloService extends Remote {

    String sayHello(String name) throws RemoteException;
}

2.2、远程的接口的实现

package demo.zookeeper.remoting.server;

import demo.zookeeper.remoting.common.HelloService;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

public class HelloServiceImpl extends UnicastRemoteObject implements HelloService {

    private static final long serialVersionUID = 1L;

    protected HelloServiceImpl() throws RemoteException {
    }

    @Override
    public String sayHello(String name) throws RemoteException {
        return String.format("Hello %s", name);
    }
}

2.3、创建RMI注册表,启动RMI服务,并将远程对象注册到RMI注册表中。

package demo.zookeeper.remoting.server;

import java.rmi.Naming;
import java.rmi.registry.LocateRegistry;

public class RmiServer {

    public static void main(String[] args) throws Exception {
        int port = 1099;
        //绑定的URL标准格式为:rmi://host:port/name(其中协议名可以省略,下面两种写法都是正确的)
        String url = "rmi://localhost:1099/demo.zookeeper.remoting.server.HelloServiceImpl";
        //String url = "//localhost:1099/demo.zookeeper.remoting.server.HelloServiceImpl";
        //本地主机上的远程对象注册表Registry的实例,并指定端口为1099,这一步必不可少(Java默认端口是1099),必不可缺的一步,缺少注册表创建,则无法绑定对象到远程注册表上 
        LocateRegistry.createRegistry(port);
        //把远程对象注册到RMI注册服务器上 
        Naming.rebind(url, new HelloServiceImpl());
        System.out.println(">>>>:远程绑定成功!");
    }
}

2.4、客户端测试,在客户端调用远程对象上的远程方法,并返回结果。

package demo.zookeeper.remoting.client;

import demo.zookeeper.remoting.common.HelloService;
import java.rmi.Naming;

public class RmiClient {

    public static void main(String[] args) throws Exception {
        String url = "rmi://localhost:1099/demo.zookeeper.remoting.server.HelloServiceImpl";
        //在RMI服务注册表中查找名称为helloService的对象,
        HelloService helloService = (HelloService) Naming.lookup(url);
        //并调用其上的方法 
        String result = helloService.sayHello("Jack");
        System.out.println(result);
    }
}

思考: 上面是一个客户端对应一个服务端, 服务端启动1099端口,如果使用zookeeper高可用, 启动多个服务端, 客户端连接任意一个服务端,都可以获取数据, 如果一个server挂了, 也不受影响。

架构改进:

这里写图片描述

三、存储到zookeeper

zk工具类, 由于获取zk对象, 以及创建节点

package com.chb.zookeeper.util;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
public class zkUtils {
    /**
     *  用于等待 SyncConnected 事件触发后继续执行当前线程
     */
    private static CountDownLatch latch = new CountDownLatch(1);

    /**
     * 连接zookeeper集群
     * @return
     */
    public static ZooKeeper zkConnect() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_CONNECTION_STR, Constant.ZK_SESSION_TIMEOUT, new Watcher() {//监听者
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {//server成功通过数据,   
                        // 唤醒当前正在执行的线程
                        latch.countDown();
                    }
                }
            });
            latch.await();//是当前线程处于等待状态
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return zk;
    }


    /**
     * 创建zk Node
     * @param zk   Zookeeper对象
     * @param path 节点目录
     * @param data   url
     */
    public static  void createNode(ZooKeeper zk ,String path,  String data) {
        try {
            System.out.println("url:"+data+" path:" +path);
            zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }


    }
}

3.1、server

3.1.1、在单节点上创建server的步骤主要有创建注册表对象,将本机远程对象注册到RMI服务器上

        //本地主机上的远程对象注册表Registry的实例,并指定端口为1099,这一步必不可少(Java默认端口是1099),必不可缺的一步,缺少注册表创建,则无法绑定对象到远程注册表上 
        LocateRegistry.createRegistry(port);
        //把远程对象注册到RMI注册服务器上 
        Naming.rebind(url, new HelloServiceImpl());

3.1.2、使用zookeeper构建高可用的RMI服务器

每启动一个RMI Server 将远程对象的url信息注册到zookeeper上的节点

    /**
     * 根据传入的Remote对象引用,rmi ip port 信息构建成url
     * 
     * @param remote
     * @param host
     * @param port
     * @return
     */
    private static String publishService(Remote remote, String host, int port) {
        String url = String.format("rmi://%s:%d/%s", host, port, remote.getClass().getName());
        try {
            //本地主机上的远程对象注册表Registry的实例,并指定端口为1099,这一步必不可少(Java默认端口是1099),必不可缺的一步,缺少注册表创建,则无法绑定对象到远程注册表上 
            LocateRegistry.createRegistry(port);
            //把远程对象注册到RMI注册服务器上 
            Naming.rebind(url, remote);
            System.out.println("publish rmi service (url: {})"+url);
        } catch (RemoteException e) {
            e.printStackTrace();
        } catch (MalformedURLException e) {
            e.printStackTrace();
        }
        return url;
    }

###将url信息写到zookeeper节点
            if (zk != null) {
                System.out.println("zk 连接成功!");
                //将url信息写到 zookeeper节点上
                zkUtils.createNode(zk, Constant.ZK_PROVIDER_PATH, url);
            }

3.2、测试server

3.2.1、没有创建一级节点,出现下面错误,不会自动创建一级节点

org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /registry/provider

3.2.2创建/registry/,

这里写图片描述

3.2.3、再次开启server , 修改端口启动多个server

[zk: localhost:2181(CONNECTED) 29] ls /registry    
[provider0000000001, provider0000000002, provider0000000000]
[zk: localhost:2181(CONNECTED) 30] get /registry/provider0000000000
rmi://192.198.1.127:11233/com.chb.zookeeper.util.HelloServiceImpl
cZxid = 0xf0000002d
ctime = Tue Jun 20 22:29:05 CST 2017
mZxid = 0xf0000002d
mtime = Tue Jun 20 22:29:05 CST 2017
pZxid = 0xf0000002d
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15cc5cbe0b00005
dataLength = 65
numChildren = 0
[zk: localhost:2181(CONNECTED) 31] 

3.2.4、server完整代码

package com.chb.zookeeper.server;

import java.rmi.RemoteException;

import com.chb.zookeeper.util.HelloService;
import com.chb.zookeeper.util.HelloServiceImpl;

/**
 * RMI server端
 * @author 12285
 */
public class Server {
    public static void main(String[] args) {
        //当前RMI服务器的ip
        String host = "192.198.1.127";
        int port = 11235;
        try {
            HelloService helloService = new HelloServiceImpl();

            ServiceProvider serviceProvider = new ServiceProvider();

            serviceProvider.publish(helloService, host, port);

            Thread.sleep(Long.MAX_VALUE);
        } catch (RemoteException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

###3.2.5、ServiceProvider

package com.chb.zookeeper.server;

import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;

import org.apache.zookeeper.ZooKeeper;

import com.chb.zookeeper.util.Constant;
import com.chb.zookeeper.util.zkUtils;

/**
 * 发布RMI服务, 并将RMI信息注册到Zookeep中
 * @author 12285
 */
public class ServiceProvider {
    /**
     * 发布 RMI 服务并注册 RMI 地址到 ZooKeeper 中
     * @param remote
     * @param host
     * @param port
     */
    public void publish(Remote remote, String host, int port){
        String url = publishService(remote, host, port);
        if (url != null) {
            //连接Zookeeper
            ZooKeeper zk = zkUtils.zkConnect();
            if (zk != null) {
                System.out.println("zk 连接成功!");
                zkUtils.createNode(zk, Constant.ZK_PROVIDER_PATH, url);
            }
        }
    }

    /**
     * 根据传入的Remote对象引用,rmi ip port 信息构建成url
     * 
     * @param remote
     * @param host
     * @param port
     * @return
     */
    private static String publishService(Remote remote, String host, int port) {
        String url = String.format("rmi://%s:%d/%s", host, port, remote.getClass().getName());
        try {
            //本地主机上的远程对象注册表Registry的实例,并指定端口为1099,这一步必不可少(Java默认端口是1099),必不可缺的一步,缺少注册表创建,则无法绑定对象到远程注册表上 
            LocateRegistry.createRegistry(port);
            //把远程对象注册到RMI注册服务器上 
            Naming.rebind(url, remote);
            System.out.println("publish rmi service (url: {})"+url);
        } catch (RemoteException e) {
            e.printStackTrace();
        } catch (MalformedURLException e) {
            e.printStackTrace();
        }
        return url;
    }


}

3.3、Client

client 不在直接获取一个端口的rmi服务, 而是在zookeeper节点遍历,获取rmi服务信息

    /**
     * 查找RMI服务
     */
    public <T extends Remote> T lookup() {
        T service = null;
        int size = urlList.size();
        String url = null;
        if (size == 1) {
            url = urlList.get(0);// 若 urlList 中只有一个元素,则直接获取该元素
        }else if(size > 1){
            url = urlList.get(ThreadLocalRandom.current().nextInt(size));// 若 urlList 中存在多个元素,则随机获取一个元素
        }
        service  = lookupService(url);
        return service;
    }

3.3.1、ServiceConsumer

package com.chb.zookeeper.client;

import java.net.MalformedURLException;
import java.rmi.ConnectException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event;

import com.chb.zookeeper.util.Constant;
import com.chb.zookeeper.util.zkUtils;

public class ServiceConsumer {
    /**
     *  定义一个 volatile 成员变量,用于保存最新的 RMI 地址(考虑到该变量或许会被其它线程所修改,一旦修改后,该变量的值会影响到所有线程)
     */
    private volatile List<String> urlList = new ArrayList<>(); 

    public ServiceConsumer() {
        ZooKeeper zk = null;
        zk = zkUtils.zkConnect();
        if (zk != null) {
            watchNode(zk); // 观察 /registry 节点的所有子节点并更新 urlList 成员变量

        }
    }
    /**
     * 观察 /registry 节点的所有子节点并更新 urlList 成员变量
     * @param zk
     */
    private void watchNode(final ZooKeeper zk) {
        try {
            List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        watchNode(zk); // 若子节点有变化,则重新调用该方法(为了获取最新子节点中的数据)
                    }
                }
            });
            //存放/registry所有子节点的数据
            List<String> dataList = new ArrayList<String>();
            for (String node : nodeList) {
                byte[] data = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null); // 获取 /registry 的子节点中的数据
                dataList.add(new String(data));
            }
            urlList = dataList;
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }
    /**
     * 查找RMI服务
     */
    public <T extends Remote> T lookup() {
        T service = null;
        int size = urlList.size();
        String url = null;
        if (size == 1) {
            url = urlList.get(0);// 若 urlList 中只有一个元素,则直接获取该元素
        }else if(size > 1){
            url = urlList.get(ThreadLocalRandom.current().nextInt(size));// 若 urlList 中存在多个元素,则随机获取一个元素
        }
        service  = lookupService(url);
        return service;
    }
    /**
     *  在 JNDI 中查找 RMI 远程服务对象
     */
    @SuppressWarnings("unchecked")
    private <T> T lookupService(String url) {
        T remote = null;
        try {
            remote = (T) Naming.lookup(url);
        } catch (MalformedURLException | RemoteException | NotBoundException e) {
            if (e instanceof ConnectException) {
                System.out.println("zk 连接中断");
                 // 若连接中断,则使用 urlList 中第一个 RMI 地址来查找(这是一种简单的重试方式,确保不会抛出异常)
                if (urlList.size() != 0) {
                    url = urlList.get(0);
                    return lookupService(url);
                }
            }
        }
        return remote;
    }




}

3.3.2、Client实现

package com.chb.zookeeper.client;

import java.rmi.RemoteException;

import com.chb.zookeeper.util.HelloService;

public class Client {
    public static void main(String[] args) {
        ServiceConsumer serviceConsumer = new ServiceConsumer();

        HelloService helloService = serviceConsumer.lookup();

        try {
            String result = helloService.sayHello("chb");

            System.out.println(result);

            Thread.sleep(3000);
        } catch (RemoteException | InterruptedException e) {
            e.printStackTrace();
        }

    }
}
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐