Presto KAFKA 连接器
 
 
 用于 Presto 的 Kafka 连接器允许使用 Presto 访问来自 Apache Kafka 的数据。
 
先决条件
 
 下载并安装以下 Apache 项目的最新版本。
 
Apache ZooKeeper 
Apache kafka 
启动 ZooKeeper
 
 使用以下命令启动 ZooKeeper 服务器。
 
 
  
  $ bin/zookeeper-server-start.sh config/zookeeper.properties
 
   
  
 现在,ZooKeeper 在 2181 上启动端口。
 
启动kafka
 
 使用以下命令在另一个终端中启动 Kafka。
 
 
  
  $ bin/kafka-server-start.sh config/server.properties
 
   
  
 kafka 启动后,使用的端口号为 9092、
 
TPCH 数据
 
下载 tpch-kafka
 
 
  
  $  curl-o kafka-tpch 
https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_ 
0811-1.0.sh 
 
   
  
 现在您已经使用上述命令从 Maven 中心下载了加载器。您将得到如下类似的响应。
 
 
  
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current 
                                 Dload  Upload   Total   Spent    Left  Speed 
  0     0    0     0    0     0      0      0--:--:-- 0:00:01--:--:--    0  
  5 21.6M    5 1279k    0     0  83898      0  0:04:30  0:00:15  0:04:15  129k
  6 21.6M    6 1407k    0     0  86656      0  0:04:21  0:00:16  0:04:05  131k  
 24 21.6M   24 5439k    0     0   124k      0  0:02:57  0:00:43  0:02:14  175k 
 24 21.6M   24 5439k    0     0   124k      0  0:02:58  0:00:43  0:02:15  160k 
 25 21.6M   25 5736k    0     0   128k      0  0:02:52  0:00:44  0:02:08  181k 
 ………………………..
 
   
  
 然后,使用以下命令使其可执行,
 
 
运行 tpch-kafka
 
 使用以下命令运行 kafka-tpch 程序以使用 tpch 数据预加载多个主题。
 
查询
 
 
  
  $ ./kafka-tpch load--brokers localhost:9092--prefix tpch.--tpch-type tiny 
 
   
  
结果
 
 
  
  2016-07-13T16:15:52.083+0530 INFO main io.airlift.log.Logging Logging 
to stderr
2016-07-13T16:15:52.124+0530 INFO main de.softwareforge.kafka.LoadCommand
Processing tables: [customer, orders, lineitem, part, partsupp, supplier,
nation, region]
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-1
de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-2
de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-3
de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-4
de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
………………………
……………………….
 
   
  
 现在,Kafka 表 customer、orders、supplier 等都使用 tpch 加载。
 
添加配置设置
 
 让我们在 Presto 服务器上添加以下 Kafka 连接器配置设置。
 
 
  
  connector.name = kafka  
kafka.nodes = localhost:9092  
kafka.table-names = tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp, 
tpch.supplier,tpch.nation,tpch.region  
kafka.hide-internal-columns = false 
 
   
  
 在上面的配置中,Kafka表是使用Kafka-tpch程序加载的。
 
启动 Presto CLI
 
 使用以下命令启动 Presto CLI,
 
 
  
  $ ./presto--server localhost:8080--catalog kafka —schema tpch;
 
   
  
 此处
 "tpch"是Kafka连接器的架构,您将收到如下响应。
 
 
列出表格
 
 关注wing 查询列出了 
 "tpch" 模式中的所有表。
 
查询
 
 
  
  presto:tpch> show tables;
 
   
  
结果
 
 
  
    Table 
----------
 customer 
 lineitem 
 nation 
 orders
 part 
 partsupp 
 region 
 supplier 
 
   
  
描述客户表
 
 以下查询描述了
 "客户"表。
 
查询
 
 
  
  presto:tpch> describe customer; 
 
   
  
结果
 
 
  
    Column           |  Type   |                   Comment 
-------------------+---------+---------------------------------------------
 _partition_id     | bigint  | Partition Id 
 _partition_offset | bigint  | Offset for the message within the partition 
 _segment_start    | bigint  | Segment start offset 
 _segment_end      | bigint  | Segment end offset 
 _segment_count    | bigint  | Running message count per segment 
 _key              | varchar | Key text 
 _key_corrupt      | boolean | Key data is corrupt 
 _key_length       | bigint  | Total number of key bytes 
 _message          | varchar | Message text 
 _message_corrupt  | boolean | Message data is corrupt 
 _message_length   | bigint  | Total number of message bytes