柚子快報邀請碼778899分享:數據庫 Hbase與MR的交互
柚子快報邀請碼778899分享:數據庫 Hbase與MR的交互
Hbase與MR的交互
?
小白的Hbase學習筆記
?
目錄
Hbase與MR的交互
1.從Hbase中讀取學生表信息,統計每個班級下的人數,并將最后的結果寫入HDFS或者本地文件系統
2.統計每個年齡的人數
1)寫入一列數據
2)寫入兩列數據
?
?
1.從Hbase中讀取學生表信息,統計每個班級下的人數,并將最后的結果寫入HDFS或者本地文件系統
?
package com.shujia.hbaseMROP;
import com.google.common.collect.Table;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
//需求:
// 從Hbase中讀取學(xué)生表信息 統(tǒng)計(jì)每個(gè)班級(jí)下的人數(shù) 并將最后的結(jié)果寫(xiě)入HDFS或者本地文件系統(tǒng)
public class Hbase2HDFS {
//添加依賴
//Map端
//Map端輸入的應(yīng)該是班級(jí)名稱 -> 1
public static class myHbaseMapper extends TableMapper
@Override
protected void map(ImmutableBytesWritable key, Result value, Mapper
String row = Bytes.toString(key.get());
String rowKey = Bytes.toString(value.getRow());
System.out.println("row:"+row+"rowKey:"+rowKey);
byte[] bytes = value.getValue(
Bytes.toBytes("info")
, Bytes.toBytes("clazz")
);
if(bytes != null){
String clazz = Bytes.toString(bytes);
context.write(new Text(clazz),new IntWritable(1));
}
}
}
//reduce端
public static class myReducer extends Reducer
@Override
protected void reduce(Text key, Iterable
int num=0;
for (IntWritable value : values) {
num += value.get();
}
context.write(key,new IntWritable(num));
}
}
//driver
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
//Job連接Hbase
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,master:2181");//如果端口不是默認(rèn)2181 那么可以輸入
Job job = Job.getInstance(conf);
job.setJobName("Hbase2HDFS");
job.setJarByClass(Hbase2HDFS.class);
//設(shè)置Mapper
/**
* TableName table,
* Scan scan,
* Class extends TableMapper> mapper,
* Class> outputKeyClass,
* Class> outputValueClass, Job job) throws IOException
*/
TableMapReduceUtil.initTableMapperJob(
TableName.valueOf("jan:tbl1")
,new Scan()
,myHbaseMapper.class
,Text.class
,IntWritable.class
,job
);
//設(shè)置Reducer
job.setReducerClass(myReducer.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//輸入(上面連接zookeeper)
// 輸出的配置
FileOutputFormat.setOutputPath(job,new Path("D:\\IDEA_workplace\\hbasedemo15\\hbaseOut\\out"));
//提交
job.waitForCompletion(true);
}
}
?
?
2.統計每個年齡的人數
?
1)寫入一列數據
?
package com.shujia.hbaseMROP;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
//需求:
// 統(tǒng)計(jì)每個(gè)年齡的人數(shù)
public class Hbase2Hbase {
//一個(gè)MR程序包含三部分
//Mapper
public static class ReadHbaseMapper extends TableMapper
@Override
protected void map(ImmutableBytesWritable key, Result value, Mapper
byte[] age_value = value.getValue(
Bytes.toBytes("info")
, Bytes.toBytes("age")
);
if(age_value != null){
context.write(new Text(Bytes.toString(age_value)),new IntWritable(1));
}
}
}
//Reducer
public static class WriteHbaseReducer extends TableReducer
@Override
protected void reduce(Text key, Iterable
int num=0;
for (IntWritable value : values) {
num +=value.get();
}
Put put = new Put(Bytes.toBytes(key.toString()));
//在Hbase中創(chuàng)建存儲(chǔ)數(shù)據(jù)的表
//create 'jan:age_cnt',{NAME => 'info1',VERSIONS => 1}
//寫(xiě)入一列數(shù)據(jù)
put.addColumn(
Bytes.toBytes("info1")
,Bytes.toBytes("cnt")
,Bytes.toBytes(num)
);
//寫(xiě)入兩行數(shù)據(jù)
/**
* put.addColumn(
* Bytes.toBytes("info1")
* ,Bytes.toBytes("age")
* ,Bytes.toBytes(key.toString())
* );
*/
context.write(NullWritable.get(),put);
}
}
//driver
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum","node1,node2,master");
Job job = Job.getInstance(conf);
job.setJobName("Hbase2Hbase");
job.setJarByClass(Hbase2Hbase.class);
//設(shè)置Mapper
/**
* TableName table,
* Scan scan,
* Class extends TableMapper> mapper,
* Class> outputKeyClass,
* Class> outputValueClass,
* Job job
*/
TableMapReduceUtil.initTableMapperJob(
TableName.valueOf("jan:tbl1")
,new Scan()
,ReadHbaseMapper.class
,Text.class
,IntWritable.class
,job
);
//設(shè)置Reducer
TableMapReduceUtil.initTableReducerJob(
"jan:age_cnt"
,WriteHbaseReducer.class
,job
);
//提交
job.waitForCompletion(true);
/**
* scan 'jan:age_cnt'
*
* truncate 'jan:age_cnt'
*/
}
}
?
scan 'jan:age_cnt'
truncate 'jan:age_cnt'
?
?
?
2)寫入兩列數據
?
package com.shujia.hbaseMROP;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
//需求:
// 統(tǒng)計(jì)每個(gè)年齡的人數(shù)
public class Hbase2Hbase {
//一個(gè)MR程序包含三部分
//Mapper
public static class ReadHbaseMapper extends TableMapper
@Override
protected void map(ImmutableBytesWritable key, Result value, Mapper
byte[] age_value = value.getValue(
Bytes.toBytes("info")
, Bytes.toBytes("age")
);
if(age_value != null){
context.write(new Text(Bytes.toString(age_value)),new IntWritable(1));
}
}
}
//Reducer
public static class WriteHbaseReducer extends TableReducer
@Override
protected void reduce(Text key, Iterable
int num=0;
for (IntWritable value : values) {
num +=value.get();
}
Put put = new Put(Bytes.toBytes(key.toString()));
//在Hbase中創(chuàng)建存儲(chǔ)數(shù)據(jù)的表
//create 'jan:age_cnt',{NAME => 'info1',VERSIONS => 1}
//寫(xiě)入一列數(shù)據(jù)
put.addColumn(
Bytes.toBytes("info1")
,Bytes.toBytes("cnt")
,Bytes.toBytes(num)
);
//寫(xiě)入兩行數(shù)據(jù)
put.addColumn(
Bytes.toBytes("info1")
,Bytes.toBytes("age")
,Bytes.toBytes(key.toString())
);
context.write(NullWritable.get(),put);
}
}
//driver
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum","node1,node2,master");
Job job = Job.getInstance(conf);
job.setJobName("Hbase2Hbase");
job.setJarByClass(Hbase2Hbase.class);
//設(shè)置Mapper
/**
* TableName table,
* Scan scan,
* Class extends TableMapper> mapper,
* Class> outputKeyClass,
* Class> outputValueClass,
* Job job
*/
TableMapReduceUtil.initTableMapperJob(
TableName.valueOf("jan:tbl1")
,new Scan()
,ReadHbaseMapper.class
,Text.class
,IntWritable.class
,job
);
//設(shè)置Reducer
TableMapReduceUtil.initTableReducerJob(
"jan:age_cnt"
,WriteHbaseReducer.class
,job
);
//提交
job.waitForCompletion(true);
/**
* scan 'jan:age_cnt'
*
* truncate 'jan:age_cnt'
*/
}
}
?
?
柚子快報邀請碼778899分享:數據庫 Hbase與MR的交互
相關閱讀
本文內容根據網絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉載請注明,如有侵權,聯系刪除。