服务器之家:专注于服务器技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|数据库技术|

服务器之家 - 数据库 - MongoDB - 通用MapReduce程序复制HBase表数据

通用MapReduce程序复制HBase表数据

2020-05-20 16:29Angelababy_huan MongoDB

这篇文章主要为大家详细介绍了通用MapReduce程序复制HBase表数据,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

编写MR程序,让其可以适合大部分的HBase表数据导入到HBase表数据。其中包括可以设置版本数、可以设置输入表的列导入设置(选取其中某几列)、可以设置输出表的列导出设置(选取其中某几列)。

原始表test1数据如下:

通用MapReduce程序复制HBase表数据

每个row key都有两个版本的数据,这里只显示了row key为1的数据

 在hbase shell 中创建数据表:

?
1
2
3
4
5
6
7
create 'test2',{NAME => 'cf1',VERSIONS => 10}  // 保存无版本、无列导入设置、无列导出设置的数据
create 'test3',{NAME => 'cf1',VERSIONS => 10}  // 保存无版本、无列导入设置、有列导出设置的数据
create 'test4',{NAME => 'cf1',VERSIONS => 10}  // 保存无版本、有列导入设置、无列导出设置的数据
create 'test5',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、无列导入设置、无列导出设置的数据
create 'test6',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、无列导入设置、有列导出设置的数据
create 'test7',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、有列导入设置、无列导出设置的数据
create 'test8',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、有列导入设置、有列导出设置的数据

main函数入口:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package GeneralHBaseToHBase;
import org.apache.hadoop.util.ToolRunner;
public class DriverTest {
 public static void main(String[] args) throws Exception {
 // 无版本设置、无列导入设置,无列导出设置
 String[] myArgs1= new String[]{
 "test1", // 输入表
 "test2", // 输出表
 "0"// 版本大小数,如果值为0,则为默认从输入表导出最新的数据到输出表
 "-1", // 列导入设置,如果为-1 ,则没有设置列导入
 "-1" // 列导出设置,如果为-1,则没有设置列导出
 };
 ToolRunner.run(HBaseDriver.getConfiguration(),
 new HBaseDriver(),
 myArgs1);
 // 无版本设置、有列导入设置,无列导出设置
 String[] myArgs2= new String[]{
 "test1",
 "test3",
 "0",
 "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
 "-1"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(),
 new HBaseDriver(),
 myArgs2);
 // 无版本设置,无列导入设置,有列导出设置
 String[] myArgs3= new String[]{
 "test1",
 "test4",
 "0",
 "-1",
 "cf1:c1,cf1:c10,cf1:c14"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(),
 new HBaseDriver(),
 myArgs3);
 // 有版本设置,无列导入设置,无列导出设置
 String[] myArgs4= new String[]{
 "test1",
 "test5",
 "2",
 "-1",
 "-1"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(),
 new HBaseDriver(),
 myArgs4);
 // 有版本设置、有列导入设置,无列导出设置
 String[] myArgs5= new String[]{
 "test1",
 "test6",
 "2",
 "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
 "-1"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(),
 new HBaseDriver(),
 myArgs5);
 
 // 有版本设置、无列导入设置,有列导出设置
 String[] myArgs6= new String[]{
 "test1",
 "test7",
 "2",
 "-1",
 "cf1:c1,cf1:c10,cf1:c14"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(),
 new HBaseDriver(),
 myArgs6);
 // 有版本设置、有列导入设置,有列导出设置
 String[] myArgs7= new String[]{
 "test1",
 "test8",
 "2",
 "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
 "cf1:c1,cf1:c10,cf1:c14"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(),
 new HBaseDriver(),
 myArgs7);
 }
 
}

driver:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package GeneralHBaseToHBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import util.JarUtil;
 
 
public class HBaseDriver extends Configured implements Tool{
 public static String FROMTABLE=""; //导入表
 public static String TOTABLE=""; //导出表
 public static String SETVERSION=""; //是否设置版本
 // args => {FromTable,ToTable,SetVersion,ColumnFromTable,ColumnToTable}
 @Override
 public int run(String[] args) throws Exception {
 if(args.length!=5){
 System.err.println("Usage:\n demo.job.HBaseDriver <input> <inputTable> "
  + "<output> <outputTable>"
  +"< versions >"
  + " <set columns from inputTable> like <cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14> or <-1> "
  + "<set columns from outputTable> like <cf1:c1,cf1:c10,cf1:c14> or <-1>");
 return -1;
 }
 Configuration conf = getConf();
 FROMTABLE = args[0];
 TOTABLE = args[1];
 SETVERSION = args[2];
 conf.set("SETVERSION", SETVERSION);
 if(!args[3].equals("-1")){
 conf.set("COLUMNFROMTABLE", args[3]);
 }
 if(!args[4].equals("-1")){
 conf.set("COLUMNTOTABLE", args[4]);
 }
 String jobName ="From table "+FROMTABLE+ " ,Import to "+ TOTABLE;
 Job job = Job.getInstance(conf, jobName);
 job.setJarByClass(HBaseDriver.class);
 Scan scan = new Scan();
 // 判断是否需要设置版本
 if(SETVERSION != "0" || SETVERSION != "1"){
 scan.setMaxVersions(Integer.parseInt(SETVERSION));
 }
 // 设置HBase表输入:表名、scan、Mapper类、mapper输出键类型、mapper输出值类型
 TableMapReduceUtil.initTableMapperJob(
 FROMTABLE,
 scan,
 HBaseToHBaseMapper.class,
 ImmutableBytesWritable.class,
 Put.class,
 job);
 // 设置HBase表输出:表名,reducer类
 TableMapReduceUtil.initTableReducerJob(TOTABLE, null, job);
 // 没有 reducers, 直接写入到 输出文件
  job.setNumReduceTasks(0);
 
  return job.waitForCompletion(true) ? 0 : 1;
  
 }
 private static Configuration configuration;
 public static Configuration getConfiguration(){
 if(configuration==null){
 /**
 * TODO 了解如何直接从Windows提交代码到Hadoop集群
 *  并修改其中的配置为实际配置
 */
 configuration = new Configuration();
 configuration.setBoolean("mapreduce.app-submission.cross-platform", true);// 配置使用跨平台提交任务
 configuration.set("fs.defaultFS", "hdfs://master:8020");// 指定namenode
 configuration.set("mapreduce.framework.name", "yarn"); // 指定使用yarn框架
 configuration.set("yarn.resourcemanager.address", "master:8032"); // 指定resourcemanager
 configuration.set("yarn.resourcemanager.scheduler.address", "master:8030");// 指定资源分配器
 configuration.set("mapreduce.jobhistory.address", "master:10020");// 指定historyserver
 configuration.set("hbase.master", "master:16000");
 configuration.set("hbase.rootdir", "hdfs://master:8020/hbase");
 configuration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");
 configuration.set("hbase.zookeeper.property.clientPort", "2181");
 //TODO 需export->jar file ; 设置正确的jar包所在位置
 configuration.set("mapreduce.job.jar",JarUtil.jar(HBaseDriver.class));// 设置jar包路径
 }
 
 return configuration;
 }
 
 
}

mapper:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
package GeneralHBaseToHBase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.NavigableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HBaseToHBaseMapper extends TableMapper<ImmutableBytesWritable, Put> {
 Logger log = LoggerFactory.getLogger(HBaseToHBaseMapper.class);
 private static int versionNum = 0;
 private static String[] columnFromTable = null;
 private static String[] columnToTable = null;
 private static String column1 = null;
 private static String column2 = null;
 @Override
 protected void setup(Context context)
 throws IOException, InterruptedException {
 Configuration conf = context.getConfiguration();
 versionNum = Integer.parseInt(conf.get("SETVERSION", "0"));
 column1 = conf.get("COLUMNFROMTABLE",null);
 if(!(column1 == null)){
 columnFromTable = column1.split(",");
 }
 column2 = conf.get("COLUMNTOTABLE",null);
 if(!(column2 == null)){
 columnToTable = column2.split(",");
 }
 }
 @Override
 protected void map(ImmutableBytesWritable key, Result value,
 Context context)
 throws IOException, InterruptedException {
 context.write(key, resultToPut(key,value));
 }
 /***
 * 把key,value转换为Put
 * @param key
 * @param value
 * @return
 * @throws IOException
 */
 private Put resultToPut(ImmutableBytesWritable key, Result value) throws IOException {
 HashMap<String, String> fTableMap = new HashMap<>();
 HashMap<String, String> tTableMap = new HashMap<>();
 Put put = new Put(key.get());
 if(! (columnFromTable == null || columnFromTable.length == 0)){
 fTableMap = getFamilyAndColumn(columnFromTable);
 }
 if(! (columnToTable == null || columnToTable.length == 0)){
 tTableMap = getFamilyAndColumn(columnToTable);
 }
 if(versionNum==0){     
 if(fTableMap.size() == 0){  
 if(tTableMap.size() == 0){
  for (Cell kv : value.rawCells()) {
  put.add(kv); // 没有设置版本,没有设置列导入,没有设置列导出
  }
  return put;
 } else{
  return getPut(put, value, tTableMap); // 无版本、无列导入、有列导出
 }
 } else {
 if(tTableMap.size() == 0){
  return getPut(put, value, fTableMap);// 无版本、有列导入、无列导出
 } else {
  return getPut(put, value, tTableMap);// 无版本、有列导入、有列导出
 }
 }
 } else{
 if(fTableMap.size() == 0){
 if(tTableMap.size() == 0){
  return getPut1(put, value); // 有版本,无列导入,无列导出
 }else{
  return getPut2(put, value, tTableMap); //有版本,无列导入,有列导出
 }
 }else{
 if(tTableMap.size() == 0){
  return getPut2(put,value,fTableMap);// 有版本,有列导入,无列导出
 }else{
  return getPut2(put,value,tTableMap); // 有版本,有列导入,有列导出
 }
 }
 }
 }
 /***
 * 无版本设置的情况下,对于有列导入或者列导出
 * @param put
 * @param value
 * @param tableMap
 * @return
 * @throws IOException
 */
 
 private Put getPut(Put put,Result value,HashMap<String, String> tableMap) throws IOException{
 for(Cell kv : value.rawCells()){
 byte[] family = kv.getFamily();
 if(tableMap.containsKey(new String(family))){
 String columnStr = tableMap.get(new String(family));
 ArrayList<String> columnBy = toByte(columnStr);
 if(columnBy.contains(new String(kv.getQualifier()))){
  put.add(kv); //没有设置版本,没有设置列导入,有设置列导出
 }
 }
 }
 return put;
 }
 /***
 * (有版本,无列导入,有列导出)或者(有版本,有列导入,无列导出)
 * @param put
 * @param value
 * @param tTableMap
 * @return
 */
 private Put getPut2(Put put,Result value,HashMap<String, String> tableMap){
 NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map=value.getMap();
  for(byte[] family:map.keySet()){
   if(tableMap.containsKey(new String(family))){
   String columnStr = tableMap.get(new String(family));
   log.info("@@@@@@@@@@@"+new String(family)+" "+columnStr);
 ArrayList<String> columnBy = toByte(columnStr);
   NavigableMap<byte[], NavigableMap<Long, byte[]>> familyMap = map.get(family);//列簇作为key获取其中的列相关数据
    for(byte[] column:familyMap.keySet()){        //根据列名循坏
     log.info("!!!!!!!!!!!"+new String(column));
     if(columnBy.contains(new String(column))){
     NavigableMap<Long, byte[]> valuesMap = familyMap.get(column);
      for(Entry<Long, byte[]> s:valuesMap.entrySet()){//获取列对应的不同版本数据,默认最新的一个
      System.out.println("***:"+new String(family)+" "+new String(column)+" "+s.getKey()+" "+new String(s.getValue()));
      put.addColumn(family, column, s.getKey(),s.getValue());
      }
     }
    }
   }
   
  }
 return put;
 }
 /***
 * 有版本、无列导入、无列导出
 * @param put
 * @param value
 * @return
 */
 private Put getPut1(Put put,Result value){
 NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map=value.getMap();
  for(byte[] family:map.keySet()){
   NavigableMap<byte[], NavigableMap<Long, byte[]>> familyMap = map.get(family);//列簇作为key获取其中的列相关数据
   for(byte[] column:familyMap.keySet()){        //根据列名循坏
    NavigableMap<Long, byte[]> valuesMap = familyMap.get(column);
    for(Entry<Long, byte[]> s:valuesMap.entrySet()){    //获取列对应的不同版本数据,默认最新的一个
     put.addColumn(family, column, s.getKey(),s.getValue());
    }
   }
  }
  return put;
 }
 // str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}
 /***
 * 得到列簇名与列名的k,v形式的map
 * @param str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}
 * @return map => {"cf1" => "c1,c2,c10,c11,c14"}
 */
 private static HashMap<String, String> getFamilyAndColumn(String[] str){
 HashMap<String, String> map = new HashMap<>();
 HashSet<String> set = new HashSet<>();
 for(String s : str){
 set.add(s.split(":")[0]);
 }
 Object[] ob = set.toArray();
 for(int i=0; i<ob.length;i++){
 String family = String.valueOf(ob[i]);
 String columns = "";
 for(int j=0;j < str.length;j++){
 if(family.equals(str[j].split(":")[0])){
  columns += str[j].split(":")[1]+",";
 }
 }
 map.put(family, columns.substring(0, columns.length()-1));
 }
 return map;
 }
 
 private static ArrayList<String> toByte(String s){
 ArrayList<String> b = new ArrayList<>();
 String[] sarr = s.split(",");
 for(int i=0;i<sarr.length;i++){
 b.add(sarr[i]);
 }
 return b;
 }
}

程序运行完之后,在hbase shell中查看每个表,看是否数据导入正确:

test2:(无版本、无列导入设置、无列导出设置)

通用MapReduce程序复制HBase表数据

test3 (无版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、无列导出设置)

通用MapReduce程序复制HBase表数据

test4(无版本、无列导入设置、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

通用MapReduce程序复制HBase表数据

test5(有版本、无列导入设置、无列导出设置)

通用MapReduce程序复制HBase表数据

test6(有版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、无列导出设置)

通用MapReduce程序复制HBase表数据

test7(有版本、无列导入设置、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

通用MapReduce程序复制HBase表数据

test8(有版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

通用MapReduce程序复制HBase表数据

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/Angelababy_huan/article/details/53236693

延伸 · 阅读

精彩推荐
  • MongoDBmongodb数据库基础知识之连表查询

    mongodb数据库基础知识之连表查询

    这篇文章主要给大家介绍了关于mongodb数据库基础知识之连表查询的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用mongodb具有一定的参...

    ZJW02155642020-05-22
  • MongoDB在mac系统下安装与配置mongoDB数据库

    在mac系统下安装与配置mongoDB数据库

    这篇文章主要介绍了在mac系统下安装与配置mongoDB数据库的操作步骤,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪...

    CXYhh1219312021-11-14
  • MongoDBMongoDB的索引

    MongoDB的索引

    数据库中的索引就是用来提高查询操作的性能,但是会影响插入、更新和删除的效率,因为数据库不仅要执行这些操作,还要负责索引的更新 ...

    MongoDB教程网2532020-05-12
  • MongoDBMongodb索引的优化

    Mongodb索引的优化

    MongoDB 是一个基于分布式文件存储的数据库。由 C++ 语言编写。接下来通过本文给大家介绍Mongodb索引的优化,本文介绍的非常详细,具有参考借鉴价值,感...

    MRR3252020-05-05
  • MongoDBMongoDB查询之高级操作详解(多条件查询、正则匹配查询等)

    MongoDB查询之高级操作详解(多条件查询、正则匹配查询等)

    这篇文章主要给大家介绍了关于MongoDB查询之高级操作(多条件查询、正则匹配查询等)的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者...

    w田翔3872020-12-19
  • MongoDBWindows下MongoDB配置用户权限实例

    Windows下MongoDB配置用户权限实例

    这篇文章主要介绍了Windows下MongoDB配置用户权限实例,本文实现需要输入用户名、密码才可以访问MongoDB数据库,需要的朋友可以参考下 ...

    MongoDB教程网3082020-04-29
  • MongoDBMongoDB系列教程(五):mongo语法和mysql语法对比学习

    MongoDB系列教程(五):mongo语法和mysql语法对比学习

    这篇文章主要介绍了MongoDB系列教程(五):mongo语法和mysql语法对比学习,本文对熟悉Mysql数据库的同学来说帮助很大,用对比的方式可以快速学习到MongoDB的命...

    MongoDB教程网3252020-05-01
  • MongoDBMongoDB多条件模糊查询示例代码

    MongoDB多条件模糊查询示例代码

    这篇文章主要给大家介绍了关于MongoDB多条件模糊查询的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用MongoDB具有一定的参考学习价值...

    浅夏晴空5902020-05-25