본문으로 바로가기

IMDb demo 프로그램 ①

 

1. 기능

이 프로그램은 영화 목록을 텍스트 파일로 받아서

장르별로 영화의 갯수를 세어 output으로 반환하는 기능을 가지고 있다.

 

2. movie.dat ( input 데이터 )

다음은 hdfs 의 input 폴더 안에 들어갈 텍스트 데이터이고, 한줄씩 Map의 input으로 전달된다.

순번::영화제목(개봉년도)::장르|장르|장르...

위와 같은 형식으로 작성되어 있다.

 

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
10::GoldenEye (1995)::Action|Adventure|Thriller
11::American President, The (1995)::Comedy|Drama|Romance
12::Dracula: Dead and Loving It (1995)::Comedy|Horror
13::Balto (1995)::Animation|Children's
14::Nixon (1995)::Drama
15::Cutthroat Island (1995)::Action|Adventure|Romance
16::Casino (1995)::Drama|Thriller
17::Sense and Sensibility (1995)::Drama|Romance
...

 

3. output.dat ( output 데이터 )

hdfs 의 movieoutput 폴더 하위에 생성될 출력파일이다.

다음과 같이 장르와 해당 장르의 영화 갯수가 차례대로 적혀있어야 한다.

( 장르가 적힌 순서는 무관하다 )

 

Thriller 182
Romance 108
Children's 58
...

 

4. Map

순번::영화제목(개봉년도)::장르|장르|장르...

1) input 텍스트를 Tokenizer로 자른다.

2) 장르를 key 로 value 는 1로 write 한다.

 

 

5. shuffle (Hadoop engine 에서 자동으로 수행 - 직접 작성하지 않음)

(key, value) -> (장르, 1,1,1,1,1)

위와 같이 장르를 key값으로 하고 같은 장르를 가진 value 들을 모두 1로 한 entity 가 생성되고

이 entitiy들이 Reduce의 input으로 전달된다.

 

 

6. Reduce

Reduce 의 input 으로 들어온 key (장르) 는 그대로 output의 key로 설정하고

value인 1을 모두 더해서 그 합을 output의 value로 설정하여 write한다.

 

 

7. 소스코드

import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.GenericOptionsParser;

public class IMDB
{
	public static class IMDBMapper extends Mapper<Object, Text, Text, IntWritable>
	{
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException
		{
			String s = value.toString();
			String[] token = s.split("::");
			int len = token.length;
			StringTokenizer itr = new StringTokenizer(token[len-1],"|");
			while (itr.hasMoreTokens())
			{
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
		
	}

	public static class IMDBReducer extends Reducer<Text, IntWritable, Text, IntWritable>
	{
		private IntWritable result = new IntWritable();
		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
		{
			int sum = 0;
			for (IntWritable val : values)
			{
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}
	public static void main (String[] args) throws Exception
	{
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length != 2)
		{
			System.err.println("Usage: imdbdemo <in> <out>");
			System.exit(2);
		}
		Job job = new Job(conf, "imdb");
		job.setJarByClass(IMDB.class);
		job.setMapperClass(IMDBMapper.class);
		job.setCombinerClass(IMDBReducer.class);
		// MAP DEBUGING
		//job.setNumReduceTasks(0);  
		job.setReducerClass(IMDBReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		FileSystem.get(job.getConfiguration()).delete( new Path(otherArgs[1]), true);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
반응형