Sqoop教程

Zookeeper API

ZooKeeper 有针对 Java 和 C 的官方 API 绑定。ZooKeeper 社区为大多数语言(.NET、python 等)提供非官方 API。使用 ZooKeeper API,应用程序可以连接、交互、操作数据、协调并最终与 ZooKeeper 集成断开连接。
ZooKeeper API 具有丰富的功能集,可以以简单且安全的方式获取 ZooKeeper 集成的所有功能。 ZooKeeper API 提供同步和异步方法。
ZooKeeper 集成和 ZooKeeper API 在各个方面完全互补,它以极大的方式使开发人员受益。让我们在本章中讨论 Java 绑定。

ZooKeeper API 基础

与 ZooKeeper 集成交互的应用程序被称为 ZooKeeper Client 或简称为 Client
Znode 是 ZooKeeper ensemble 的核心组件,ZooKeeper API 提供了一小组方法来使用 ZooKeeper ensemble 操作 znode 的所有细节。
客户端应该按照下面给出的步骤与 ZooKeeper 集合进行清晰干净的交互。
连接到 ZooKeeper 集合。 ZooKeeper 集成为客户端分配一个会话 ID。 定期向服务器发送心跳。否则,ZooKeeper 集合使会话 ID 过期,客户端需要重新连接。 只要会话 ID 处于活动状态,就获取/设置 znode。 在所有任务完成后,断开与 ZooKeeper 整体的连接。如果客户端长时间处于非活动状态,ZooKeeper 集成将自动断开客户端连接。

Java 绑定

让我们了解本章中最重要的 ZooKeeper API 集。 ZooKeeper API 的核心部分是 ZooKeeper 类。它提供了在其构造函数中连接 ZooKeeper 集成的选项,并具有以下方法-
connect-连接到 ZooKeeper 集合 create-创建一个 znode exists-检查 znode 是否存在及其信息 getData-从特定 znode 获取数据 setData-在特定 znode 中设置数据 getChildren-获取特定 znode 中可用的所有子节点 delete-获取特定的 znode 及其所有子节点 close-关闭连接

连接到 ZooKeeper Ensemble

ZooKeeper 类通过其构造函数提供连接功能。构造函数的签名如下-
ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
哪里,
connectionString-ZooKeeper 集成主机。 sessionTimeout-以毫秒为单位的会话超时。 watcher-一个实现"Watcher"接口的对象。ZooKeeper 集成通过 watcher 对象返回连接状态。
让我们创建一个新的辅助类 ZooKeeperConnection 并添加一个方法 connectconnect 方法创建一个 ZooKeeper 对象,连接到 ZooKeeper 集合,然后返回该对象。
这里 CountDownLatch 用于停止(等待)主进程,直到客户端与 ZooKeeper 集合连接。
ZooKeeper 集成通过 Watcher 回调回复连接状态。一旦客户端连接到 ZooKeeper 集合,Watcher 回调将被调用,并且 Watcher 回调调用 CountDownLatchcountDown 方法释放锁, await 在主进程中。
这是与 ZooKeeper 集成连接的完整代码。

编码:ZooKeeperConnection.java

// import java classes
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
// import zookeeper classes
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
public class ZooKeeperConnection {
   // declare zookeeper instance to access ZooKeeper ensemble
   private ZooKeeper zoo;
   final CountDownLatch connectedSignal = new CountDownLatch(1);
   // Method to connect zookeeper ensemble.
   public ZooKeeper connect(String host) throws IOException,InterruptedException {
  
      zoo = new ZooKeeper(host,5000,new Watcher() {
    
         public void process(WatchedEvent we) {
            if (we.getState() == KeeperState.SyncConnected) {
               connectedSignal.countDown();
            }
         }
      });
    
      connectedSignal.await();
      return zoo;
   }
   // Method to disconnect from zookeeper server
   public void close() throws InterruptedException {
      zoo.close();
   }
}
保存上面的代码,在下一节连接ZooKeeper ensemble时会用到。

创建一个Znode

ZooKeeper 类提供了 create 方法来在 ZooKeeper 集合中创建一个新的 znode。 create 方法的签名如下-
create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
哪里,
path-Znode 路径。例如,/myapp1、/myapp2、/myapp1/mydata1、myapp2/mydata1/myanothersubdata data-存储在指定 znode 路径中的数据 acl-要创建的节点的访问控制列表。 ZooKeeper API 提供了一个静态接口 ZooDefs.Ids 来获取一些基本的 acl 列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE 返回打开 znode 的 acl 列表。 createMode-节点的类型,临时的、顺序的或两者兼而有之。这是一个枚举
让我们创建一个新的 Java 应用程序来检查 ZooKeeper API 的 create 功能。创建文件 ZKCreate.java。在 main 方法中,创建一个 ZooKeeperConnection 类型的对象并调用 connect 方法连接到 ZooKeeper 集合。
connect 方法将返回 ZooKeeper 对象 zk。现在,使用自定义 路径数据调用 zk对象的 create方法。
创建znode的完整程序代码如下-

编码:ZKCreate.java

import java.io.IOException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
public class ZKCreate {
   // create static instance for zookeeper class.
   private static ZooKeeper zk;
   // create static instance for ZooKeeperConnection class.
   private static ZooKeeperConnection conn;
   // Method to create znode in zookeeper ensemble
   public static void create(String path, byte[] data) throws 
      KeeperException,InterruptedException {
      zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);
   }
   public static void main(String[] args) {
      // znode path
      String path = "/MyFirstZnode"; // Assign path to znode
      // data in byte array
      byte[] data = "My first zookeeper app”.getBytes(); // Declare data
    
      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         create(path, data); // Create the data to the specified path
         conn.close();
      } catch (Exception e) {
         System.out.println(e.getMessage()); //catch error message
      }
   }
}
一旦应用程序被编译和执行,一个带有指定数据的 znode 将在 ZooKeeper ensemble 中创建。您可以使用 ZooKeeper CLI zkCli.sh 检查它。
cd /path/to/zookeeper
bin/zkCli.sh
>>> get /MyFirstZnode

Exists – 检查 Znode 是否存在

ZooKeeper 类提供了 exists 方法来检查 znode 的存在。如果指定的 znode 存在,则它返回 znode 的元数据。 exists 方法的签名如下-
exists(String path, boolean watcher)
哪里,
路径-Znode路径 watcher-布尔值,用于指定是否观看指定的 znode
让我们创建一个新的 Java 应用程序来检查 ZooKeeper API 的"存在​​"功能。创建一个文件 "ZKExists.java"。在main方法中,使用 "ZooKeeperConnection"对象创建ZooKeeper对象, "zk"。然后,使用自定义的 "path"调用 "zk"对象的 "exists"方法。完整列表如下-

编码:ZKExists.java

import java.io.IOException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
public class ZKExists {
   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;
   // Method to check existence of znode and its status, if znode is available.
   public static Stat znode_exists(String path) throws
      KeeperException,InterruptedException {
      return zk.exists(path, true);
   }
   public static void main(String[] args) throws InterruptedException,KeeperException {
      String path = "/MyFirstZnode"; // Assign znode to the specified path
      
      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         Stat stat = znode_exists(path); // Stat checks the path of the znode
        
         if(stat != null) {
            System.out.println("Node exists and the node version is " +
            stat.getVersion());
         } else {
            System.out.println("Node does not exists");
         }
        
      } catch(Exception e) {
         System.out.println(e.getMessage()); // Catches error messages
      }
   }
}
编译并执行应用程序后,您将获得以下输出。
Node exists and the node version is 1.

getData 方法

ZooKeeper 类提供了 getData 方法来获取附加在指定 znode 中的数据及其状态。 getData 方法的签名如下-
getData(String path, Watcher watcher, Stat stat)
哪里,
path-Znode 路径。 watcher-Watcher 类型的回调函数。当指定 znode 的数据发生变化时,ZooKeeper ensemble 将通过 Watcher 回调进行通知。这是一次性通知。 stat-返回 znode 的元数据。
让我们创建一个新的 Java 应用程序来了解 ZooKeeper API 的 getData 功能。创建文件 ZKGetData.java。在 main 方法中,使用 ZooKeeperConnection 对象创建一个 ZooKeeper 对象 zk。然后,使用自定义路径调用zk对象的 getData方法。
这里是从指定节点获取数据的完整程序代码-

编码:ZKGetData.java

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
public class ZKGetData {
   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;
   public static Stat znode_exists(String path) throws 
      KeeperException,InterruptedException {
      return zk.exists(path,true);
   }
   public static void main(String[] args) throws InterruptedException, KeeperException {
      String path = "/MyFirstZnode";
      final CountDownLatch connectedSignal = new CountDownLatch(1);
    
      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         Stat stat = znode_exists(path);
      
         if(stat != null) {
            byte[] b = zk.getData(path, new Watcher() {
        
               public void process(WatchedEvent we) {
          
                  if (we.getType() == Event.EventType.None) {
                     switch(we.getState()) {
                        case Expired:
                        connectedSignal.countDown();
                        break;
                     }
              
                  } else {
                     String path = "/MyFirstZnode";
              
                     try {
                        byte[] bn = zk.getData(path,
                        false, null);
                        String data = new String(bn,
                        "UTF-8");
                        System.out.println(data);
                        connectedSignal.countDown();
              
                     } catch(Exception ex) {
                        System.out.println(ex.getMessage());
                     }
                  }
               }
            }, null);
        
            String data = new String(b, "UTF-8");
            System.out.println(data);
            connectedSignal.await();
        
         } else {
            System.out.println("Node does not exists");
         }
      } catch(Exception e) {
        System.out.println(e.getMessage());
      }
   }
}
一旦应用程序被编译并执行,您将得到以下输出
My first zookeeper app
应用程序将等待来自 ZooKeeper 集合的进一步通知。使用 ZooKeeper CLI zkCli.sh 更改指定 znode 的数据。
cd /path/to/zookeeper
bin/zkCli.sh
>>> set /MyFirstZnode Hello
现在,应用程序将打印以下输出并退出。
Hello

setData 方法

ZooKeeper 类提供了 setData 方法来修改附加在指定 znode 中的数据。 setData 方法的签名如下-
setData(String path, byte[] data, int version)
哪里,
路径-Znode路径 data-存储在指定 znode 路径中的数据。 version-znode 的当前版本。每当数据发生变化时,ZooKeeper 都会更新 znode 的版本号。
现在让我们创建一个新的 Java 应用程序来了解 ZooKeeper API 的 setData 功能。创建文件 ZKSetData.java。在 main 方法中,使用 ZooKeeperConnection 对象创建一个 ZooKeeper 对象 zk。然后,使用指定的路径、新数据和节点版本调用 zk 对象的 setData 方法。
这是修改指定znode中附加数据的完整程序代码。

代码:ZKSetData.java

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import java.io.IOException;
public class ZKSetData {
   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;
   // Method to update the data in a znode. Similar to getData but without watcher.
   public static void update(String path, byte[] data) throws
      KeeperException,InterruptedException {
      zk.setData(path, data, zk.exists(path,true).getVersion());
   }
   public static void main(String[] args) throws InterruptedException,KeeperException {
      String path= "/MyFirstZnode";
      byte[] data = "Success".getBytes(); //Assign data which is to be updated.
    
      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         update(path, data); // Update znode data to the specified path
      } catch(Exception e) {
         System.out.println(e.getMessage());
      }
   }
}
应用程序编译执行后,指定znode的数据会发生变化,可以使用ZooKeeper CLI查看, zkCli.sh
cd /path/to/zookeeper
bin/zkCli.sh
>>> get /MyFirstZnode

getChildren 方法

ZooKeeper 类提供了 getChildren 方法来获取特定 znode 的所有子节点。 getChildren 方法的签名如下-
getChildren(String path, Watcher watcher)
哪里,
path-Znode 路径。 watcher-"Watcher"类型的回调函数。ZooKeeper 集成将在指定的 znode 被删除或 znode 下的子节点被创建/删除时发出通知。这是一次性通知。

编码:ZKGetChildren.java

import java.io.IOException;
import java.util.*;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
public class ZKGetChildren {
   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;
   // Method to check existence of znode and its status, if znode is available.
   public static Stat znode_exists(String path) throws 
      KeeperException,InterruptedException {
      return zk.exists(path,true);
   }
   public static void main(String[] args) throws InterruptedException,KeeperException {
      String path = "/MyFirstZnode"; // Assign path to the znode
    
      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         Stat stat = znode_exists(path); // Stat checks the path
         if(stat!= null) {
            //“getChildren” method-get all the children of znode.It has two
            args, path and watch
            List <String> children = zk.getChildren(path, false);
            for(int i = 0; i < children.size(); i++)
            System.out.println(children.get(i)); //Print children's
         } else {
            System.out.println("Node does not exists");
         }
      } catch(Exception e) {
         System.out.println(e.getMessage());
      }
   }
}
在运行程序之前,让我们使用 ZooKeeper CLI zkCli.sh/MyFirstZnode 创建两个子节点。
cd /path/to/zookeeper
bin/zkCli.sh
>>> create /MyFirstZnode/myfirstsubnode Hi
>>> create /MyFirstZnode/mysecondsubmode Hi
现在,编译g 并运行程序将输出上面创建的znodes。
myfirstsubnode
mysecondsubnode

删除一个Znode

ZooKeeper 类提供了 delete 方法来删​​除指定的 znode。 delete 方法的签名如下-
delete(String path, int version)
哪里,
path-Znode 路径。 version-znode 的当前版本。
让我们创建一个新的 Java 应用程序来了解 ZooKeeper API 的 删除功能。创建文件 ZKDelete.java。在 main 方法中,使用 ZooKeeperConnection 对象创建一个 ZooKeeper 对象 zk。然后,使用指定的 路径和节点版本调用 zk对象的 delete方法。
删除一个znode的完整程序代码如下-

编码:ZKDelete.java

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
public class ZKDelete {
   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;
   // Method to check existence of znode and its status, if znode is available.
   public static void delete(String path) throws KeeperException,InterruptedException {
      zk.delete(path,zk.exists(path,true).getVersion());
   }
   public static void main(String[] args) throws InterruptedException,KeeperException {
      String path = "/MyFirstZnode"; //Assign path to the znode
    
      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         delete(path); //delete the node with the specified path
      } catch(Exception e) {
         System.out.println(e.getMessage()); // catches error messages
      }
   }
}
昵称: 邮箱:
Copyright © 2022 立地货 All Rights Reserved.
备案号:京ICP备14037608号-4