06_zookeeper原生Java API使用

摘要:
【Zookeeper构造方法概述】/***客户端和zk服务端的连接是一个异步的过程*当连接成功后,客户端会收到一个watch通知**ZooKeeper(StringconnectString,intsessionTimeout,Watcherwatcher,*longsessionId,byte[]sessionPasswd,booleancanBeReadOnly)*参数介绍*connectSt

【Zookeeper构造方法概述】

/**
     * 客户端和zk服务端的连接是一个异步的过程
     * 当连接成功后,客户端会收到一个watch通知
     *
     * ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
     *          long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
     * 参数介绍
     * connectString:连接服务器的ip字符串
     *      比如:"192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"
     *      可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群
     *      也可以在ip后加路径
     * sessionTimeout:超时时间,心跳收不到了,那就超时
     * watcher:通知事件,如果有对应的事件触发,则会收到一个通知:如果不需要,那就设为null
     * sessionId:会话的id
     * sessionPasswd:会话密码,当会话丢失后,可以依据sessionId和sessionPasswd重新获取会话
     * canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写,
     *                此时数据被读取到的可能是旧数据,一般设置为false,不推荐使用
     *
     */
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

【Zookeeper API 客户端连接服务端例子】

package com.zk.demo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

/**
 * Created by HigginCui on 2018/9/20.
 */
public classZkConnect implements Watcher{

    public static final String zkServerPath = "127.0.0.1:2181";

    public static final Integer timeout = 5000;

    /**
     * 客户端和zk服务端的连接是一个异步的过程
     * 当连接成功后,客户端会收到一个watch通知
     */
    public static voidmain(String[] args) throws Exception{
        ZooKeeper zk = new ZooKeeper(zkServerPath,timeout,newZkConnect());

        for (int i=0;i<20;i++) {
            Thread.sleep(10);  //休眠10ms,在这个过程中,连接状态会从CONNECTING--->CONNECTED
            System.out.println(i+"---"+zk.getState());
        }

    }

    @Override
    public voidprocess(WatchedEvent watchedEvent) {
        System.err.println("收到zk的watch通知----");
    }
}

【运行结果】

06_zookeeper原生Java API使用第1张

【使用CountDownLatch优化zk连接过程】

packagecom.zk.demo;

importorg.apache.zookeeper.WatchedEvent;
importorg.apache.zookeeper.Watcher;
importorg.apache.zookeeper.ZooKeeper;

importjava.util.concurrent.CountDownLatch;

public class ZkConnect implementsWatcher{

    public static final String zkServerPath = "127.0.0.1:2181";

    public static final Integer timeout = 5000;

    private static CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throwsException{
        ZooKeeper zk = new ZooKeeper(zkServerPath,timeout,newZkConnect());

        System.out.println("连接状态---" +zk.getState());
        latch.await();
        System.out.println("连接状态---" +zk.getState());

    }

    @Override
    public voidprocess(WatchedEvent watchedEvent) {
        System.err.println("收到zk的watch通知----");
        latch.countDown();
    }
}

【运行结果】

06_zookeeper原生Java API使用第2张

【创建节点】

/*** 
* String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
 * 
 * 同步或异步创建节点,都不支持子节点的递归操作,异步有一个callBack方法
 * 参数
 * path:创建的路径
 * data:存储的数据,byte[]类型
 * acl:权限控制策略
 *      Ids.OPEN_ACL_UNSAFE ---> world:anyone:crdwa
 *      CREATE_ALL_ACL ---> auth:user:password:cdrwa
 * createMode:节点类型,是一个枚举
 *      CreateMode.PERSISTENT:           持久节点
 *      CreateMode.PERSISTENT_SEQUENTIAL:持久顺序节点
 *      CreateMode.EPHEMERAL:            临时节点
 *      CreateMode.EPHEMERAL_SEQUENTIAL: 临时顺序节点
 */

【创建一个临时节点】

packagecom.zk.demo;

import org.apache.zookeeper.*;
importjava.util.concurrent.CountDownLatch;

/*** Created by HigginCui on 2018/9/21.
 */
public class ZkNodeOperator implementsWatcher{


    private static ZooKeeper zooKeeper = null;

    public static final String zkServerPath = "127.0.0.1:2181";

    public static final Integer timeout = 5000;

    private static CountDownLatch latch = new CountDownLatch(1);

    publicZkNodeOperator() {
    }

    private static voidinit()  {
        try{
            zooKeeper = new ZooKeeper(zkServerPath,timeout,newZkNodeOperator());
            latch.await();
        }catch(Exception e){
            e.printStackTrace();
            if(zooKeeper!=null){
                try{
                    zooKeeper.close();
                }catch(InterruptedException e1){
                    e1.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throwsException{
        init();
//创建/testnode临时节点,包含数据为haha,权限是OPEN_ACL_UNSAFE,类型为临时节点 zooKeeper.create(
"/testnode","haha".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL); } /*** 观察者 */@Override public voidprocess(WatchedEvent event) { System.err.println("收到zk的watch通知----" +event.getPath()+"---"+event.getState()); latch.countDown(); } }

【运行结果 直接看打开zkCli.sh连接】

06_zookeeper原生Java API使用第3张

【创建持久节点,并且产生一个回调通知】

packagecom.zk.demo;

import org.apache.zookeeper.*;
importjava.util.concurrent.CountDownLatch;

/*** Created by HigginCui on 2018/9/21.
 */
public class ZkNodeOperator implementsWatcher{


    private static ZooKeeper zooKeeper = null;

    public static final String zkServerPath = "127.0.0.1:2181";

    public static final Integer timeout = 5000;

    private static CountDownLatch latch = new CountDownLatch(1);

    publicZkNodeOperator() {
    }

    private static voidinit()  {
        try{
            zooKeeper = new ZooKeeper(zkServerPath,timeout,newZkNodeOperator());
            latch.await();
        }catch(Exception e){
            e.printStackTrace();
            if(zooKeeper!=null){
                try{
                    zooKeeper.close();
                }catch(InterruptedException e1){
                    e1.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throwsException{
        init();
        //create(String path, byte[] data, List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
        String ctx = "{'create':'success'}";
        zooKeeper.create("/testnode","haha".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,newCreateNodeCallBack(),ctx);

        System.in.read();  //线程停在这里,等待回调线程的打印结果
}


    /*** 观察者
     */@Override
    public voidprocess(WatchedEvent event) {
        System.err.println("收到zk的watch通知----" +event.getPath()+"---"+event.getState());
        latch.countDown();

    }
}

/*** 创建节点回调通知类
 */
class CreateNodeCallBack implementsAsyncCallback.StringCallback{
    @Override
    public void processResult(inti, String path, Object ctx, String s1) {
        System.out.println("【回调方法】:创建节点的路径:"+path);
        System.out.println("【回调方法】:ctx==="+(String)ctx);
    }
}

【运行结果——控制台打印】

06_zookeeper原生Java API使用第4张

【运行结果——zk客户端连接】

06_zookeeper原生Java API使用第5张

【修改节点的数据】

/*** path:节点路径
* data:修改后的数据
* version:版本号,这里的版本号必须是正要修改的节点数据的版本号!!
*/Stat setData(String path, byte[] data, int version)

修改节点代码示例

Stat stat = zooKeeper.setData("/testnode", "Higgin".getBytes(), 1);
System.out.println("修改后数据的版本号为---"+stat.getVersion());

【运行结果】

先看下对应的数据版本号信息

06_zookeeper原生Java API使用第6张

看下控制台运行结果

06_zookeeper原生Java API使用第7张

修改下代码,将版本号改为0,与zk上要修改的数据的版本号保持一致

Stat stat = zooKeeper.setData("/testnode", "Higgin".getBytes(), 0);
System.out.println("修改后数据的版本号为---"+stat.getVersion());

【查看运行结果】

06_zookeeper原生Java API使用第8张

06_zookeeper原生Java API使用第9张

【删除节点】

/**
* 删除节点
* 注意:version版本号必须和要删除的节点数据的版本号一致
*/
public void delete(String path, int version)

【删除实例】

zooKeeper.delete("/testnode",1);

【运行结果】

删除前,先看下对应节点数据

06_zookeeper原生Java API使用第10张

执行删除代码后,可以看到节点已经不存在了

06_zookeeper原生Java API使用第11张

【获取节点数据】

public byte[] getData(String path, boolean watch, Stat stat) 
public byte[] getData(String path, Watcher watcher, Stat stat)
public voidgetData(String path, Watcher watcher, DataCallback cb, Object ctx)
public void getData(String path, boolean watch, DataCallback cb, Object ctx)

[获取节点数据实例]

byte[] dateBytes = zooKeeper.getData("/testnode", true, null);
String str = newString(dateBytes);
System.err.println("获取的数据为==="+str);

[运行结果]

06_zookeeper原生Java API使用第12张

【获取子节点的列表数据】

public List<String>getChildren(String path, Watcher watcher)
public List<String>getChildren(String path, boolean watch, Stat stat)
public List<String>getChildren(String path, boolean watch, Stat stat) 
public voidgetChildren(String path, Watcher watcher, ChildrenCallback cb, Object ctx)
......

[获取子节点列表实例]

List<String> childList = zooKeeper.getChildren("/testnode", null);
for(String child : childList) {
    System.err.println("子节点=="+child);
}

[ 运行结果 ]

先看下zk上对应的数据

06_zookeeper原生Java API使用第13张

[ 控制台运行结果 ]

06_zookeeper原生Java API使用第14张

免责声明:文章转载自《06_zookeeper原生Java API使用》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇mybatis-plus报错解决Invalid bound statement (not found)错误ASP漏洞+SQL注入的入侵方法下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

C#第三方Aspose.Words.dll导出Word(书签模板)方式说明

项目有遇到需要导出Word,在别人写好的基础上去修改样式,导出后发现样式不正确不整齐,于是采用模板的方式重新导出 1.模板word文件的制作,本人用office2013,在设计好需要的整个表格之后,在你需要替换的位置"插入"--书签 并命名,此命名需要在程序中进行替换 将做好的模板word文件放在程序目录下 2.引用Aspose.Words.dll 3.新...

【转载】C/C++中的char,wchar,TCHAR

点击这里查看原文章 总体简介:由于字符编码的不同,在C++中有三种对于字符类型:char, wchar_t , TCHAR。其实TCHAR不能算作一种类型,他紧紧是一个宏。我们都知道,宏在预编译的时候会被替换成相应的内容。TCHAR 在使用多字节编码时被定义成char,在Unicode编码时定义成wchar_t。 1.VC++中的char,wchar_t...

写入DLL文件

using System;using System.Collections.Generic;using System.ComponentModel;using System.Data;using System.Drawing;using System.Text;using System.Windows.Forms;using System.Runtime....

Java对文件的16进制读取和操作

大家可以参考一下源代码的相关部分注释,然后写出自己的16进制处理程序。有几个重点地方:16进制字符串-》10进制数 int input = Integer.parseInt("Str", 16)10进制整数-》16进制字符串 String hex = Integer.toHexString(int)文件读取方法 作为2进制文件直接读取,一个byte为单位的...

CORS跨域实现思路及相关解决方案

本篇包括以下内容: CORS 定义 CORS 对比 JSONP CORS,BROWSER支持情况 主要用途 Ajax请求跨域资源的异常 CORS 实现思路 安全说明 CORS 几种解决方案 自定义CORSFilter Nginx 配置支持Ajax跨域 支持多域名配置的CORS Filter keyword:cors,跨域,ajax,403,fi...

java 如何在pdf中生成表格

1、目标   在pdf中生成一个可变表头的表格,并向其中填充数据。通过泛型动态的生成表头,通过反射动态获取实体类(我这里是User)的get方法动态获得数据,从而达到动态生成表格。   每天生成一个文件夹存储生成的pdf文件(文件夹的命名是年月日时间戳),如:20151110   生成的文件可能在毫秒级别,故文件的命名规则是"到毫秒的时间戳-uuid",如...