IMDb demo 프로그램 ①
1. 기능
이 프로그램은 영화 목록을 텍스트 파일로 받아서
장르별로 영화의 갯수를 세어 output으로 반환하는 기능을 가지고 있다.
2. movie.dat ( input 데이터 )
다음은 hdfs 의 input 폴더 안에 들어갈 텍스트 데이터이고, 한줄씩 Map의 input으로 전달된다.
순번::영화제목(개봉년도)::장르|장르|장르...
위와 같은 형식으로 작성되어 있다.
1::Toy Story (1995)::Animation|Children's|Comedy 2::Jumanji (1995)::Adventure|Children's|Fantasy 3::Grumpier Old Men (1995)::Comedy|Romance 4::Waiting to Exhale (1995)::Comedy|Drama 5::Father of the Bride Part II (1995)::Comedy 6::Heat (1995)::Action|Crime|Thriller 7::Sabrina (1995)::Comedy|Romance 8::Tom and Huck (1995)::Adventure|Children's 9::Sudden Death (1995)::Action 10::GoldenEye (1995)::Action|Adventure|Thriller 11::American President, The (1995)::Comedy|Drama|Romance 12::Dracula: Dead and Loving It (1995)::Comedy|Horror 13::Balto (1995)::Animation|Children's 14::Nixon (1995)::Drama 15::Cutthroat Island (1995)::Action|Adventure|Romance 16::Casino (1995)::Drama|Thriller 17::Sense and Sensibility (1995)::Drama|Romance ... |
3. output.dat ( output 데이터 )
hdfs 의 movieoutput 폴더 하위에 생성될 출력파일이다.
다음과 같이 장르와 해당 장르의 영화 갯수가 차례대로 적혀있어야 한다.
( 장르가 적힌 순서는 무관하다 )
Thriller 182 Romance 108 Children's 58 ... |
4. Map
순번::영화제목(개봉년도)::장르|장르|장르...
1) input 텍스트를 Tokenizer로 자른다.
2) 장르를 key 로 value 는 1로 write 한다.
5. shuffle (Hadoop engine 에서 자동으로 수행 - 직접 작성하지 않음)
(key, value) -> (장르, 1,1,1,1,1)
위와 같이 장르를 key값으로 하고 같은 장르를 가진 value 들을 모두 1로 한 entity 가 생성되고
이 entitiy들이 Reduce의 input으로 전달된다.
6. Reduce
Reduce 의 input 으로 들어온 key (장르) 는 그대로 output의 key로 설정하고
value인 1을 모두 더해서 그 합을 output의 value로 설정하여 write한다.
7. 소스코드
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.GenericOptionsParser;
public class IMDB
{
public static class IMDBMapper extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
String s = value.toString();
String[] token = s.split("::");
int len = token.length;
StringTokenizer itr = new StringTokenizer(token[len-1],"|");
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IMDBReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main (String[] args) throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2)
{
System.err.println("Usage: imdbdemo <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "imdb");
job.setJarByClass(IMDB.class);
job.setMapperClass(IMDBMapper.class);
job.setCombinerClass(IMDBReducer.class);
// MAP DEBUGING
//job.setNumReduceTasks(0);
job.setReducerClass(IMDBReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
FileSystem.get(job.getConfiguration()).delete( new Path(otherArgs[1]), true);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
반응형
'개발 > Hadoop' 카테고리의 다른 글
MapSideJoin - 분산 캐쉬 Distributed Cache (0) | 2021.06.10 |
---|---|
[ Hadoop : 하둡 ] IMDb demo 프로그램 ② - MapReduce 작성 (1) | 2021.05.24 |
[ Hadoop : 하둡 ] Uber demo 프로그램 - MapReduce (0) | 2021.05.08 |