Hadoop MapReduce in Detail: Processing Massive Numbers of Small Files
Preface

Storing a huge number of small files on HDFS is very costly in NameNode memory: every file gets its own metadata record, and the NameNode must load the metadata for all files at startup, so the more files there are, the greater the overhead on the NameNode. One option is to compress the small files into a single file before uploading to HDFS, so that only one metadata record is needed, which greatly reduces the NameNode's memory footprint (one way to do the merge is sketched right after the list below). For MapReduce computation, Hadoop ships with the following compression formats built in:
- DEFLATE
- gzip
- bzip2
- LZO
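To make the merge-and-compress idea concrete, here is a minimal sketch. It is not part of the original article's code, and the local directory and HDFS target path are hypothetical placeholders; it streams every file in a local directory into one gzip file on HDFS, so the NameNode ends up tracking a single metadata record instead of thousands:

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class SmallFilesToGzip {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
        File localDir = new File("/tmp/smallfiles"); // hypothetical local directory
        Path target = new Path("/user/xiaoxiang/datasets/gzipfiles/merged.gz"); // hypothetical target
        // wrap the HDFS output stream with the gzip codec, then append each small file
        OutputStream out = codec.createOutputStream(fs.create(target));
        try {
            for (File f : localDir.listFiles()) {
                InputStream in = new FileInputStream(f);
                try {
                    IOUtils.copyBytes(in, out, 4096, false); // false: keep 'out' open across files
                } finally {
                    in.close();
                }
            }
        } finally {
            out.close();
        }
    }
}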
The cost of using compressed files for MapReduce computation is the time spent on decompression, which is worth weighing in specific application scenarios. For the massive-small-files scenario, however, compressing the small files buys us data locality: gzip is not splittable, so each compressed file is consumed as a single InputSplit by a single map task. If hundreds or thousands of small files compress down to a single block, that block necessarily resides on one DataNode, and the map task can run on that node with no data transferred across the network. (The input side needs no code changes for this to work, as the sketch below shows.)
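When the job reads compressed input, TextInputFormat consults CompressionCodecFactory, which matches the file's extension against the registered codecs and decompresses transparently. A minimal sketch of that lookup, with an example file name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookup {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // the factory maps file extensions to registered codecs (.gz -> GzipCodec)
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(new Path("data_10m.gz"));
        System.out.println(codec == null ? "not compressed" : codec.getClass().getName());
    }
}

Because gzip is not splittable, each .gz input file becomes exactly one InputSplit and one map task; that is why the job run below launches three map tasks for its three input files.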
If instead the small files are uploaded to HDFS directly, their hundreds or thousands of small blocks end up scattered across different DataNodes, and the computation may have to "move the data" before it can run. With only a few files, you may not notice the network transfer cost beyond the NameNode memory overhead, but once the number of small files reaches a certain scale it becomes very significant. Below, we compress the small files in gzip format, upload them to HDFS, and run a MapReduce job over them. A single class implements both the basic map task and reduce task, as shown here: (original author: 时延军, http://shiyanjun.cn)
package org.shirdrn.kodz.inaction.hadoop.smallfiles.compression;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GzipFilesMaxCostComputation {

    public static class GzipFilesMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        private final static LongWritable costValue = new LongWritable(0);
        private Text code = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // a line such as 'SG 253654006139495 253654006164392 619850464'
            String line = value.toString();
            String[] array = line.split("\\s");
            if (array.length == 4) {
                String countryCode = array[0];
                String strCost = array[3];
                long cost = 0L;
                try {
                    cost = Long.parseLong(strCost);
                } catch (NumberFormatException e) {
                    cost = 0L;
                }
                if (cost != 0) {
                    code.set(countryCode);
                    costValue.set(cost);
                    context.write(code, costValue);
                }
            }
        }
    }

    public static class GzipFilesReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // emit the maximum cost seen for each country code
            long max = 0L;
            Iterator<LongWritable> iter = values.iterator();
            while (iter.hasNext()) {
                LongWritable current = iter.next();
                if (current.get() > max) {
                    max = current.get();
                }
            }
            context.write(key, new LongWritable(max));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: gzipmaxcost <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "gzip maxcost");
        // gzip-compress the job output (Hadoop 1.x property names)
        job.getConfiguration().setBoolean("mapred.output.compress", true);
        job.getConfiguration().setClass("mapred.output.compression.codec", GzipCodec.class,
                CompressionCodec.class);
        job.setJarByClass(GzipFilesMaxCostComputation.class);
        job.setMapperClass(GzipFilesMapper.class);
        job.setCombinerClass(GzipFilesReducer.class);
        job.setReducerClass(GzipFilesReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        int exitFlag = job.waitForCompletion(true) ? 0 : 1;
        System.exit(exitFlag);
    }
}
The program above just computes a maximum value, so the implementation is fairly simple, and it gzip-compresses its output files. Additionally, if a large volume of map output has to be copied to the reducers, you can also enable intermediate compression when configuring the Job; the output-compression options appear in the code above, and the map-output options are sketched next.
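Those intermediate-compression settings are a two-line addition inside main(), right after the Job is constructed (the needed imports are already in the class above). This sketch uses the Hadoop 1.x property names consistent with the code above; newer releases call them mapreduce.map.output.compress and mapreduce.map.output.compress.codec:

// compress the intermediate map output that is shuffled to the reducers
job.getConfiguration().setBoolean("mapred.compress.map.output", true);
job.getConfiguration().setClass("mapred.map.output.compression.codec",
        GzipCodec.class, CompressionCodec.class);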
Now let's walk through running the program:
- Prepare the data
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ du -sh ../dataset/gzipfiles/*
147M ../dataset/gzipfiles/data_10m.gz
43M ../dataset/gzipfiles/data_50000_1.gz
16M ../dataset/gzipfiles/data_50000_2.gz
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -mkdir /user/xiaoxiang/datasets/gzipfiles
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -copyFromLocal ../dataset/gzipfiles/* /user/xiaoxiang/datasets/gzipfiles
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -ls /user/xiaoxiang/datasets/gzipfiles
Found 3 items
-rw-r--r-- 3 xiaoxiang supergroup 153719349 2013-03-24 12:56 /user/xiaoxiang/datasets/gzipfiles/data_10m.gz
-rw-r--r-- 3 xiaoxiang supergroup 44476101 2013-03-24 12:56 /user/xiaoxiang/datasets/gzipfiles/data_50000_1.gz
-rw-r--r-- 3 xiaoxiang supergroup 15935178 2013-03-24 12:56 /user/xiaoxiang/datasets/gzipfiles/data_50000_2.gz
- Run the program
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop jar gzip-compression.jar org.shirdrn.kodz.inaction.hadoop.smallfiles.compression.GzipFilesMaxCostComputation /user/xiaoxiang/datasets/gzipfiles /user/xiaoxiang/output/smallfiles/gzip
13/03/24 13:06:28 INFO input.FileInputFormat: Total input paths to process : 3
13/03/24 13:06:28 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/03/24 13:06:28 WARN snappy.LoadSnappy: Snappy native library not loaded
13/03/24 13:06:28 INFO mapred.JobClient: Running job: job_201303111631_0039
13/03/24 13:06:29 INFO mapred.JobClient: map 0% reduce 0%
13/03/24 13:06:55 INFO mapred.JobClient: map 33% reduce 0%
13/03/24 13:07:04 INFO mapred.JobClient: map 66% reduce 11%
13/03/24 13:07:13 INFO mapred.JobClient: map 66% reduce 22%
13/03/24 13:07:25 INFO mapred.JobClient: map 100% reduce 22%
13/03/24 13:07:31 INFO mapred.JobClient: map 100% reduce 100%
13/03/24 13:07:36 INFO mapred.JobClient: Job complete: job_201303111631_0039
13/03/24 13:07:36 INFO mapred.JobClient: Counters: 29
13/03/24 13:07:36 INFO mapred.JobClient: Job Counters
13/03/24 13:07:36 INFO mapred.JobClient: Launched reduce tasks=1
13/03/24 13:07:36 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=78231
13/03/24 13:07:36 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/03/24 13:07:36 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/03/24 13:07:36 INFO mapred.JobClient: Launched map tasks=3
13/03/24 13:07:36 INFO mapred.JobClient: Data-local map tasks=3
13/03/24 13:07:36 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=34413
13/03/24 13:07:36 INFO mapred.JobClient: File Output Format Counters
13/03/24 13:07:36 INFO mapred.JobClient: Bytes Written=1337
13/03/24 13:07:36 INFO mapred.JobClient: FileSystemCounters
13/03/24 13:07:36 INFO mapred.JobClient: FILE_BYTES_READ=288127
13/03/24 13:07:36 INFO mapred.JobClient: HDFS_BYTES_READ=214131026
13/03/24 13:07:36 INFO mapred.JobClient: FILE_BYTES_WRITTEN=385721
13/03/24 13:07:36 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=1337
13/03/24 13:07:36 INFO mapred.JobClient: File Input Format Counters
13/03/24 13:07:36 INFO mapred.JobClient: Bytes Read=214130628
13/03/24 13:07:36 INFO mapred.JobClient: Map-Reduce Framework
13/03/24 13:07:36 INFO mapred.JobClient: Map output materialized bytes=9105
13/03/24 13:07:36 INFO mapred.JobClient: Map input records=14080003
13/03/24 13:07:36 INFO mapred.JobClient: Reduce shuffle bytes=6070
13/03/24 13:07:36 INFO mapred.JobClient: Spilled Records=22834
13/03/24 13:07:36 INFO mapred.JobClient: Map output bytes=154878493
13/03/24 13:07:36 INFO mapred.JobClient: CPU time spent (ms)=90200
13/03/24 13:07:36 INFO mapred.JobClient: Total committed heap usage (bytes)=688193536
13/03/24 13:07:36 INFO mapred.JobClient: Combine input records=14092911
13/03/24 13:07:36 INFO mapred.JobClient: SPLIT_RAW_BYTES=398
13/03/24 13:07:36 INFO mapred.JobClient: Reduce input records=699
13/03/24 13:07:36 INFO mapred.JobClient: Reduce input groups=233
13/03/24 13:07:36 INFO mapred.JobClient: Combine output records=13747
13/03/24 13:07:36 INFO mapred.JobClient: Physical memory (bytes) snapshot=765448192
13/03/24 13:07:36 INFO mapred.JobClient: Reduce output records=233
13/03/24 13:07:36 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2211237888
13/03/24 13:07:36 INFO mapred.JobClient: Map output records=14079863
- Check the results
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -ls /user/xiaoxiang/output/smallfiles/gzip
Found 3 items
-rw-r--r-- 3 xiaoxiang supergroup 0 2013-03-24 13:07 /user/xiaoxiang/output/smallfiles/gzip/_SUCCESS
drwxr-xr-x - xiaoxiang supergroup 0 2013-03-24 13:06 /user/xiaoxiang/output/smallfiles/gzip/_logs
-rw-r--r-- 3 xiaoxiang supergroup 1337 2013-03-24 13:07 /user/xiaoxiang/output/smallfiles/gzip/part-r-00000.gz
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -copyToLocal /user/xiaoxiang/output/smallfiles/gzip/part-r-00000.gz ./
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ gunzip -c ./part-r-00000.gz
AD 999974516
AE 999938630
AF 999996180
AG 999991085
AI 999989595
AL 999998489
AM 999978568
AO 999989628
AQ 999995031
AR 999999563
AS 999935982
AT 999999909
AU 999937089
AW 999965784
AZ 999996557
BA 999994828
BB 999992177
BD 999992272
BE 999925057
BF 999999220
BG 999971528
BH 999994900
BI 999982573
BJ 999977886
BM 999991925
BN 999986630
BO 999995482
BR 999989947
BS 999983475
BT 999992685
BW 999984222
BY 999998496
BZ 999997173
CA 999991096
CC 999969761
CD 999978139
CF 999995342
CG 999957938
CH 999997524
CI 999998864
CK 999968719
CL 999967083
CM 999998369
CN 999975367
CO 999999167
CR 999980097
CU 999976352
CV 999990543
CW 999996327
CX 999987579
CY 999982925
CZ 999993908
DE 999985416
DJ 999997438
DK 999963312
DM 999941706
DO 999992176
DZ 999973610
EC 999971018
EE 999960984
EG 999980522
ER 999980425
ES 999949155
ET 999987033
FI 999989788
FJ 999990686
FK 999977799
FM 999994183
FO 999988472
FR 999988342
GA 999982099
GB 999970658
GD 999996318
GE 999991970
GF 999982024
GH 999941039
GI 999995295
GL 999948726
GM 999984872
GN 999992209
GP 999996090
GQ 999988635
GR 999999672
GT 999981025
GU 999975956
GW 999962551
GY 999999881
HK 999970084
HN 999972628
HR 999986688
HT 999970913
HU 999997568
ID 999994762
IE 999996686
IL 999982184
IM 999987831
IN 999973935
IO 999984611
IQ 999990126
IR 999986780
IS 999973585
IT 999997239
JM 999986629
JO 999982595
JP 999985598
KE 999996012
KG 999991556
KH 999975644
KI 999994328
KM 999989895
KN 999991068
KP 999967939
KR 999992162
KW 999924295
KY 999985907
KZ 999992835
LA 999989151
LB 999989233
LC 999994793
LI 999986863
LK 999989876
LR 999984906
LS 999957706
LT 999999688
LU 999999823
LV 999981633
LY 999992365
MA 999993880
MC 999978886
MD 999997483
MG 999996602
MH 999989668
MK 999983468
ML 999990079
MM 999989010
MN 999969051
MO 999978283
MP 999995848
MQ 999913110
MR 999982303
MS 999997548
MT 999982604
MU 999988632
MV 999975914
MW 999991903
MX 999978066
MY 999995010
MZ 999981189
NA 999976735
NC 999961053
NE 999990091
NF 999989399
NG 999985037
NI 999965733
NL 999988890
NO 999993122
NP 999972410
NR 999956464
NU 999987046
NZ 999998214
OM 999967428
PA 999944775
PE 999998598
PF 999959978
PG 999987347
PH 999981534
PK 999954268
PL 999996619
PM 999998975
PR 999978127
PT 999993404
PW 999991278
PY 999993590
QA 999995061
RE 999998518
RO 999994148
RS 999999923
RU 999995809
RW 999980184
SA 999973822
SB 999972832
SC 999991021
SD 999963744
SE 999972256
SG 999977637
SH 999999068
SI 999980580
SK 999998152
SL 999999269
SM 999941188
SN 999990278
SO 999978960
SR 999997483
ST 999980447
SV 999999945
SX 999938671
SY 999990666
SZ 999992537
TC 999969904
TD 999999303
TG 999977640
TH 999979255
TJ 999983666
TK 999971131
TM 999958998
TN 999979170
TO 999959971
TP 999986796
TR 999996679
TT 999984435
TV 999974536
TW 999975092
TZ 999992734
UA 999972948
UG 999980070
UM 999998377
US 999918442
UY 999989662
UZ 999982762
VA 999987372
VC 999991495
VE 999997971
VG 999954576
VI 999990063
VN 999974393
VU 999976113
WF 999961299
WS 999970242
YE 999984650
YT 999994707
ZA 999998692
ZM 999993331
ZW 999943540
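Rather than copying the file out of HDFS and running gunzip, the compressed result can also be read directly in Java. A minimal sketch, assuming the same output path as above:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class ReadGzipResult {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path result = new Path("/user/xiaoxiang/output/smallfiles/gzip/part-r-00000.gz");
        // resolve the codec from the .gz extension, then wrap the raw HDFS stream
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(result);
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(codec.createInputStream(fs.open(result))));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        } finally {
            reader.close();
        }
    }
}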