본문으로 바로가기

MapSideJoin - 분산 캐쉬 Distributed Cache

category 개발/Hadoop 2021. 6. 10. 03:13

분산 캐쉬 Distributed Cache

 

1. 개념 

작은 사이즈의 읽기전용 파일 (사전, meta data) 를 task tracker의 working directory에 복사하는 메커니즘

-> 이를 inmemory에 올림

Distributed Cache로 사용될 파일은 먼저 HDFS에 복사되고 그 위치가 사용되어야 함

default 최대크기 == 10GB (실질적으로 100MB가 적당)

2. 사용 전략

 1) 어떤 파일?

- 크기가 작은 파일, HDFS에 올라가 있는 파일 (relation_b.dat)

 

2) 분산 캐쉬(Distributed Cache) 사용 방법

- 프로그램의 인자로 Distributed Cache 파일의 HDFS상 위치 넘김

- Main에서 이 위치를 등록 (** 주의 local이 아닌 HDFS상 path)

1
DistributedCache.addCacheFile(new URI("data/relation_b"). job.getConfiguration());
cs

- Mapper의 setup 에서 위치를 참조하여 파일 오픈 (read -> inmemory)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
    public static class MapSideJoinMapper extends Mapper<Object, Text, Text, Text>
    {
        // HashTable
        Hashtable<StringString> joinMap = new Hashtable<StringString>();
 
        public void map(Object key, Text value, Context context) throws IOExcepton, InterruptedException
        {...}
 
        protected void setup(Context context) throws IOException, InterruptedException
        {
            // read cache
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            BufferedReader br = new BufferedReader(new FileReader(cacheFiles[0].toString()));
 
            String line = br.readLine();
            while(line != null)
            {
                StringTokenizer itr = new StringTokenizer(line, "|");
                String category = itr.nextToken();
                String category_name = itr.nextToken();
 
                // HashMap put (k, v) -> (category, category_name)
                joinMap.put(category, category_name);
 
                // read nextLine
                line = br.readLine();
            }
        }
 
    }
cs

 

** cache로 등록된 relation_b파일은 task실행 전에 HDFS에서 로컬 파일 시스템으로 복사되므로,

로컬 textfile을 읽는 코드를 작성하고, ID를 키로 하여 해쉬맵에 저장

 

3) Mapper

- map() 에서 읽혀지는 물건 종류 코드가 hashtable에 존재하는지 확인

1
String value = price + "\t" + joinMap.get(category);
cs
// Hashtable
<"K", "Kitchen">
<"B", "Baby">
...
: 이런식으로 저장되어 있으므로, 코드 "K"를 넣으면 "Kitchen" 반환

 

- 존재하면 -> (key : ID , val : 가격 + "\t" + 물건 종류) emit

 

** MapSideJoin에서는 Reducer를 사용하지 않으므로 Reducer은 생략

 

3. 전체 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class MapSideJoin
{
    public static class MapSideJoinMapper extends Mapper<Object, Text, Text, Text>
    {
        // HashTable
        Hashtable<StringString> joinMap = new Hashtable<StringString>();
 
        public void map(Object key, Text value, Context context) throws IOExcepton, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString(), "|");
            Text outputKey = new Text();
            Text outputVal = new Text();
 
            String ID = itr.nextToken();
            String price = itr.nextToken();
            String category = itr.nextToken();
            String val = price + "\t" + joinMap.get(category);
 
            outputKey.set(ID);
           outputVal.set(val);
            context.write(outputKey, outputVal);
        }
 
        protected void setup(Context context) throws IOException, InterruptedException
        {
            // read cache
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            BufferedReader br = new BufferedReader(new FileReader(cacheFiles[0].toString()));
 
            String line = br.readLine();
            while(line != null)
            {
                StringTokenizer itr = new StringTokenizer(line, "|");
                String category = itr.nextToken();
                String category_name = itr.nextToken();
 
                // HashMap put (k, v) -> (category, category_name)
                joinMap.put(category, category_name);
 
                // read nextLine
                line = br.readLine();
            }
        }
 
    }
 
    public static void main(String[] args) throws Exception
    {
        // 기본 설정 code 생략
        // cache 
        DistributedCache.addCacheFile(new URI("/join_data/relation_b"), job.getConfiguration());
    }
}
cs

관련글

내용


참고자료

반응형