HCatalog教程

HCatalog 输入输出格式

HCatInputFormatHCatOutputFormat 接口用于从 HDFS 读取数据,处理后使用 MapReduce 作业将结果数据写入 HDFS。让我们详细说明输入和输出格式接口。

HCatInputFormat

HCatInputFormat 与 MapReduce 作业一起用于从 HCatalog 管理的表中读取数据。 HCatInputFormat 公开了一个 Hadoop 0.20 MapReduce API,用于读取数据,就像数据已发布到表中一样。
方法名称和描述
public static HCatInputFormat setInput(Job job, String dbName, String tableName)throws IOException
设置用于作业的输入。它使用给定的输入规范查询 Metastore,并将匹配的分区序列化到 MapReduce 任务的作业配置中。
public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException
设置用于作业的输入。它使用给定的输入规范查询 Metastore,并将匹配的分区序列化到 MapReduce 任务的作业配置中。
public HCatInputFormat setFilter(String filter)throws IOException
在输入表上设置过滤器。
public HCatInputFormat setProperties(Properties properties) throws IOException
为输入格式设置属性。
HCatInputFormat API 包括以下方法-
setInput setOutputSchema getTableSchema
要使用 HCatInputFormat 读取数据,首先使用正在读取的表中的必要信息实例化一个 InputJobInfo,然后调用 setInput 输入作业信息
您可以使用 setOutputSchema 方法来包含一个 投影模式,以指定输出字段。如果未指定架构,则将返回表中的所有列。您可以使用 getTableSchema 方法来确定指定输入表的表架构。

HCatOutputFormat

HCatOutputFormat 与 MapReduce 作业一起使用以将数据写入 HCatalog 管理的表。 HCatOutputFormat 公开了一个 Hadoop 0.20 MapReduce API,用于将数据写入表。当 MapReduce 作业使用 HCatOutputFormat 写入输出时,将使用为表配置的默认 OutputFormat,并在作业完成后将新分区发布到表中。
方法名称和描述
public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException
设置有关为作业写入的输出的信息。它查询元数据服务器以查找要用于表的 StorageHandler。如果分区已经发布,则会引发错误。
public static void setSchema (Configuration conf, HCatSchema schema) throws IOException
为写出到分区的数据设置架构。如果未调用,则默认情况下将表架构用于分区。
public RecordWriter <WritableComparable<?>, HCatRecord> getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException
获取作业的记录编写器。它使用 StorageHandler 的默认 OutputFormat 来获取记录编写器。
public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException
获取此输出格式的输出提交者。它确保正确提交输出。
HCatOutputFormat API 包括以下方法-
setInput setSchema getTableSchema
对 HCatOutputFormat 的第一次调用必须是 setOutput;任何其他调用都会抛出异常,说明输出格式未初始化。
正在写出的数据的架构由 setSchema 方法指定。您必须调用此方法,提供您正在编写的数据模式。如果您的数据具有与表架构相同的架构,您可以使用 HCatOutputFormat.getTableSchema() 获取表架构,然后将其传递给 setSchema()

示例

下面的 MapReduce 程序从一个表中读取数据,它假定在第二列("第 1 列")中有一个整数,并计算它找到的每个不同值的实例数。也就是说,它相当于" select col1, count(*) from $table group by col1;"。
例如,如果第二列中的值为 {1, 1, 1, 3, 3, 5},则程序将产生以下值和计数的输出-
1, 3
3, 2
5, 1
现在让我们看看程序代码-
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;
import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;
public class GroupByAge extends Configured implements Tool {
   public static class Map extends Mapper<WritableComparable, 
      HCatRecord, IntWritable, IntWritable> {
      int age;
    
      @Override
      protected void map(
         WritableComparable key, HCatRecord value,
         org.apache.hadoop.mapreduce.Mapper<WritableComparable,
         HCatRecord, IntWritable, IntWritable>.Context context
      )throws IOException, InterruptedException {
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }
  
   public static class Reduce extends Reducer<IntWritable, IntWritable,
      WritableComparable, HCatRecord> {
      @Override
      protected void reduce(
         IntWritable key, java.lang.Iterable<IntWritable> values,
         org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
         WritableComparable, HCatRecord>.Context context
      )throws IOException ,InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();
      
         while (iter.hasNext()) {
            sum++;
            iter.next();
         }
      
         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }
  
   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();
    
      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;
      String principalID = System
    
      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null)
      conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
      Job job = new Job(conf, "GroupByAge");
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));
      // initialize HCatOutputFormat
      job.setInputFormatClass(HCatInputFormat.class);
      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
    
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);
    
      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing:" + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);
      return (job.waitForCompletion(true) ? 0 : 1);
   }
  
   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}
在编译上述程序之前,您必须下载一些 jars 并将它们添加到此应用程序的 classpath。您需要下载所有 Hive jar 和 HCatalog jar(HCatalog-core-0.5.0.jar、hive-metastore-0.10.0.jar、libthrift-0.7.0.jar、hive-exec-0.10.0.jar、 libfb303-0.7.0.jar、jdo2-api-2.3-ec.jar、slf4j-api-1.6.1.jar)。
使用以下命令将那些 jar 文件从 local 复制到 HDFS 并将它们添加到 classpath .
bin/hadoop fs-copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs-copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs-copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs-copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs-copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs-copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs-copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp
export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar
使用以下命令编译并执行给定的程序。
$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive
现在,检查您的输出目录(hdfs:user/tmp/hive)是否有输出(part_0000、part_0001)。
昵称: 邮箱:
Copyright © 2022 立地货 All Rights Reserved.
备案号:京ICP备14037608号-4