- 浏览: 196458 次
- 性别:
- 来自: 广州
文章分类
最新评论
-
永立s:
这篇博客帮我解决了一个问题,十分感谢.
HBase表增加snappy压缩 -
BlackWing:
日志是job运行日志,看你怎么配置了,一般就在hadoop安装 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误 -
heymaomao:
heymaomao 写道有两个问题,想请教下楼主 第一是日志楼 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误 -
heymaomao:
有两个问题,想请教下楼主 第一是日志楼主到底看的是哪个日志文件 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误 -
atomduan:
本地的Unix 进程创建失败,检查下服务器内存是否够用,是不是 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误
转载请标明出处:http://blackwing.iteye.com/blog/1991380
hbase自带了ImportTsv类,可以直接把tsv格式(官方教材显示,是\t分割各个字段的文本格式)生成HFile,并且使用另外一个类org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles直接把HFile移动到hbase对应的hdfs目录。
PS:网上看到一个XD说,直接生成HFile并入库HBase效率不如先生成HFile,再通过LoadIncrementalHFiles移动文件到hbase目录高,这点没有验证,我的做法也是先生成,再move。
官方教材在此:
但ImportTsv功能对我来说不适合,例如文件格式为:
ImportTsv导入的命令为:
它生成的表格式为:
而我要求的格式是:
所以还是自己写MR处理数据方便。
Mapper:
job:
生成的HFile文件在hdfs的/output目录下,已经根据cf名称建好文件目录:
其中:
根据其源码知道,会自动为job设置好以下参数:
HFileOutputFormat只支持写单个column family,如果有多个cf,则需要写多个job来实现了。
hbase自带了ImportTsv类,可以直接把tsv格式(官方教材显示,是\t分割各个字段的文本格式)生成HFile,并且使用另外一个类org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles直接把HFile移动到hbase对应的hdfs目录。
PS:网上看到一个XD说,直接生成HFile并入库HBase效率不如先生成HFile,再通过LoadIncrementalHFiles移动文件到hbase目录高,这点没有验证,我的做法也是先生成,再move。
官方教材在此:
http://hbase.apache.org/book/ops_mgt.html#importtsv
但ImportTsv功能对我来说不适合,例如文件格式为:
topsid uid roler_num typ time 10 111111 255 0 1386553377000
ImportTsv导入的命令为:
bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,kq:topsid,kq:uid,kq:roler_num,kq:type -Dimporttsv.bulk.output=hdfs://storefile-outputdir <hdfs-data-inputdir>
它生成的表格式为:
row : 10 cf : kq qualifier: topsid value: 10 .....
而我要求的格式是:
row : 10-111111-255 cf : kq qualifier: 0 value: 1
所以还是自己写MR处理数据方便。
Mapper:
/* * adminOnOff.log 文件格式: * topsid uid roler_num typ time * */ public class HFileImportMapper2 extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> { protected SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); protected final String CF_KQ="kq";//考勤 protected final int ONE=1; @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String line = value.toString(); System.out.println("line : "+line); String[] datas = line.split("\\s+"); // row格式为:yyyyMMdd-sid-uid-role_num-timestamp-typ String row = sdf.format(new Date(Long.parseLong(datas[4]))) + "-" + datas[0] + "-" + datas[1] + "-" + datas[2] + "-" + datas[4] + "-" + datas[3]; ImmutableBytesWritable rowkey = new ImmutableBytesWritable( Bytes.toBytes(row)); KeyValue kv = new KeyValue(Bytes.toBytes(row),this.CF_KQ.getBytes(), datas[3].getBytes(),Bytes.toBytes(this.ONE)); context.write(rowkey, kv); } }
job:
public class GenHFile2 { public static void main(String[] args) { Configuration conf = new Configuration(); conf.addResource("myConf.xml"); String input = conf.get("input"); String output = conf.get("output"); String tableName = conf.get("source_table"); System.out.println("table : "+tableName); HTable table; try { //运行前,删除已存在的中间输出目录 try { FileSystem fs = FileSystem.get(URI.create(output), conf); fs.delete(new Path(output),true); fs.close(); } catch (IOException e1) { e1.printStackTrace(); } table = new HTable(conf,tableName.getBytes()); Job job = new Job(conf); job.setJobName("Generate HFile"); job.setJarByClass(HFileImportMapper2.class); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(HFileImportMapper2.class); FileInputFormat.setInputPaths(job, input); //job.setReducerClass(KeyValueSortReducer.class); //job.setMapOutputKeyClass(ImmutableBytesWritable.class); //job.setMapOutputValueClass(KeyValue.class); job.getConfiguration().set("mapred.mapoutput.key.class", "org.apache.hadoop.hbase.io.ImmutableBytesWritable"); job.getConfiguration().set("mapred.mapoutput.value.class", "org.apache.hadoop.hbase.KeyValue"); //job.setOutputFormatClass(HFileOutputFormat.class); FileOutputFormat.setOutputPath(job, new Path(output)); //job.setPartitionerClass(SimpleTotalOrderPartitioner.class); HFileOutputFormat.configureIncrementalLoad(job,table); try { job.waitForCompletion(true); } catch (InterruptedException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } } catch (IOException e) { e.printStackTrace(); } } }
生成的HFile文件在hdfs的/output目录下,已经根据cf名称建好文件目录:
hdfs://namenode/output/kq/601c5029fb264dc8869a635043c24560
其中:
HFileOutputFormat.configureIncrementalLoad(job,table);
根据其源码知道,会自动为job设置好以下参数:
public static void configureIncrementalLoad(Job job, HTable table) throws IOException { Configuration conf = job.getConfiguration(); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); job.setOutputFormatClass(HFileOutputFormat.class); // Based on the configured map output class, set the correct reducer to properly // sort the incoming values. // TODO it would be nice to pick one or the other of these formats. if (KeyValue.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(KeyValueSortReducer.class); } else if (Put.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(PutSortReducer.class); } else if (Text.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(TextSortReducer.class); } else { LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); } conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), KeyValueSerialization.class.getName()); // Use table's region boundaries for TOP split points. LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName())); List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table); LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count"); job.setNumReduceTasks(startKeys.size()); configurePartitioner(job, startKeys); // Set compression algorithms based on column families configureCompression(table, conf); configureBloomType(table, conf); configureBlockSize(table, conf); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.initCredentials(job); LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured."); }
HFileOutputFormat只支持写单个column family,如果有多个cf,则需要写多个job来实现了。
发表评论
-
新版hadoop MultipleOutputs多文件输出
2015-03-11 14:22 3472转载请标明出处:http://blackwing.iteye. ... -
解决直接读HFile时因表数据写入而导致文件目录变化问题
2015-03-02 18:22 1491转载请标明出处:http://blackwing.iteye. ... -
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误
2014-08-21 18:18 23658转载请标明出处:http://blackwing.iteye. ... -
LoadIncrementalHFiles是copy而不是move的疑惑
2013-12-19 10:57 4062转载请标明出处:http://blackwing.iteye. ... -
NullPointerException SerializationFactory.getSerializer解决
2013-12-04 17:30 1587转载请标明出处:http://blackwing.iteye. ... -
Hadoop的Text类getBytes字节数据put到HBase后有多余字符串问题
2013-11-21 15:53 2099转载请标明出处:http://blackwing.iteye. ... -
编译YCSB 解决Not a host:port pair问题
2013-09-18 17:25 1928转载请标明出处:http://blackwing.iteye. ... -
HBase使用SNAPPY压缩遇到compression test fail问题解决
2013-09-18 10:51 10782转载请标明出处:http://blackwing.iteye. ... -
HBase表增加snappy压缩
2013-09-13 17:54 4292转载请标明来源:http://blackwing.iteye. ... -
hadoop 1.0.3增加snappy压缩
2013-09-11 17:27 1767转载请标明来源:http://blackwing.iteye. ... -
把hadoop的metrics加入ganglia监控
2013-09-04 17:02 1545hadoop的metrics加入ganglia其实是很简单的, ... -
ROOT不在线的另外一种原因及解决办法
2013-07-29 14:28 1460转载请声明出处:http://blackwing.iteye. ... -
enable和disable表时出现表未disable/enable异常处理
2013-07-29 11:42 4892转载请标明出处:http://blackwing.iteye. ... -
MultithreadedMapper多线程读取数据
2013-04-27 15:51 0由于当前业务需求读取HBase表时,会存在数据倾斜,大部分数据 ... -
shuffle & sort解释
2013-04-16 17:31 1209转载请标明出处:http://blackwing.iteye. ... -
hadoop的 IncompatibleClassChangeError
2013-02-06 17:26 2048开发环境中,使用的是官方版的hadoop 1.0.1版,而集群 ... -
HBase的start key和end key疑惑
2013-02-05 15:57 4858转载请标明来源:http://blackwing.iteye. ... -
HBase的coprocessor分拆HRegion
2013-02-04 15:15 3269引用转载请注明出处,文章链接:http://blackwing ... -
分拆TableSplit 让多个mapper同时读取
2013-01-06 18:13 2548默认情况下,一个region是一个tableSplit,对应一 ... -
GET查询HBase无结果时 Result的size也不为空
2012-11-28 11:15 2030用Get查询hbase某个row时,就算该row不存在,但还是 ...
相关推荐
MapReduce生成HFile入库到HBase 可能需要的jar包,一共有3个 可以直接放在每台机器的${HADOOP_HOME}/lib下 hadoopHadoop 1.1.2 + hbase 0.94.6.1
本文将HBase-2.2.1安装在Hadoop-3.1.2上,关于Hadoop-3.1.2的安装,请参见《基于zookeeper-3.5.5安装hadoop-3.1.2》一文。安装环境为64位CentOS-Linux 7.2版本。 本文将在HBase官方提供的quickstart.html文件的指导...
hadoop-2.7.2-hbase-jar.tar.gz hadoop-2.7.2-hbase-jar.tar.gz hadoop-2.7.2-hbase-jar.tar.gz
hadoop2.73-eclipse开发hbase所需要的所有jar包,便于大家下载使用
hadoop集群配置流程以及用到的配置文件,hadoop2.8.4、hbase2.1.0、zookeeper3.4.12
hadoop1.1.2操作例子 包括hbase hive mapreduce相应的jar包
Hadoop深入浅出之HBASE介绍.pptx
Hadoop+Zookeeper+HBase环境搭建,详细步骤和实例,从零开始搭建Hadoop集群
NULL 博文链接:https://liuyq1991.iteye.com/blog/2109364
该文档保护了目前比较流行的大数据平台的原理过程梳理。Hadoop,Hive,Hbase,Spark,MapReduce,Storm
hadoop_hadoop-2.7.2-hbase-jar.rar hadoop_hadoop-2.7.2-hbase-jar.rar
基于hadoop+hbase+springboot实现的分布式网盘系统,适合本科毕业设计 资源包含的整个demo在Hadoop,和Hbase环境搭建好了,可以启动起来。 技术选型 1.Hadoop 2.Hbase 3.SpringBoot ...... 系统实现的功能 1.用户...
Hadoop(Hbase)的安装部署与配置实验
初学Hadoop时试验搭建很多次的集群部署方案,步骤很详细。hadoop-2.6.5.tar.gz zookeeper-3.4.10.tar.gz Hbase1.2.6 ,两个nameNode+三dataNode
数据仓库hadoop+zookeeper+hbase集群安装方法记录,自己搭建纯手写的记录。相关软件请自行下载
Docker(Hadoop_3.3.1+HBase_2.4.16+Zookeeper_3.7.1+Hive_3.1.3 )配置文件 搭建集群环境
Hadoop3.1.1集成hbase2.1.1,很详细的介绍了Hadoop3.1.1集成hbase2.1.1的教程,希望对大家有用。
采用Docker Swarm集群方式, 部署Hadoop3.x + HBase2.x的真正分布式集群环境,趟坑无数, 配置文件已整理好,内置Dockerfile构建文件、docker-compose脚本文件、hbase安装包、hadoop配置文件等。可以根据生产环境, ...
hadoop,hbase,hive版本整合兼容性最全,最详细说明【适用于任何版本】,避免下载后才发现不兼容的坑
Hadoop+Hbase搭建云存储总结