분산 캐쉬 Distributed Cache
1. 개념
작은 사이즈의 읽기전용 파일 (사전, meta data) 를 task tracker의 working directory에 복사하는 메커니즘
-> 이를 inmemory에 올림
Distributed Cache로 사용될 파일은 먼저 HDFS에 복사되고 그 위치가 사용되어야 함
default 최대크기 == 10GB (실질적으로 100MB가 적당)
2. 사용 전략
1) 어떤 파일?
- 크기가 작은 파일, HDFS에 올라가 있는 파일 (relation_b.dat)
2) 분산 캐쉬(Distributed Cache) 사용 방법
- 프로그램의 인자로 Distributed Cache 파일의 HDFS상 위치 넘김
- Main에서 이 위치를 등록 (** 주의 local이 아닌 HDFS상 path)
1
|
DistributedCache.addCacheFile(new URI("data/relation_b"). job.getConfiguration());
|
cs |
- Mapper의 setup 에서 위치를 참조하여 파일 오픈 (read -> inmemory)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public static class MapSideJoinMapper extends Mapper<Object, Text, Text, Text>
{
// HashTable
Hashtable<String, String> joinMap = new Hashtable<String, String>();
public void map(Object key, Text value, Context context) throws IOExcepton, InterruptedException
{...}
protected void setup(Context context) throws IOException, InterruptedException
{
// read cache
Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
BufferedReader br = new BufferedReader(new FileReader(cacheFiles[0].toString()));
String line = br.readLine();
while(line != null)
{
StringTokenizer itr = new StringTokenizer(line, "|");
String category = itr.nextToken();
String category_name = itr.nextToken();
// HashMap put (k, v) -> (category, category_name)
joinMap.put(category, category_name);
// read nextLine
line = br.readLine();
}
}
}
|
cs |
** cache로 등록된 relation_b파일은 task실행 전에 HDFS에서 로컬 파일 시스템으로 복사되므로,
로컬 textfile을 읽는 코드를 작성하고, ID를 키로 하여 해쉬맵에 저장
3) Mapper
- map() 에서 읽혀지는 물건 종류 코드가 hashtable에 존재하는지 확인
1
|
String value = price + "\t" + joinMap.get(category);
|
cs |
// Hashtable <"K", "Kitchen"> <"B", "Baby"> ... : 이런식으로 저장되어 있으므로, 코드 "K"를 넣으면 "Kitchen" 반환 |
- 존재하면 -> (key : ID , val : 가격 + "\t" + 물건 종류) emit
** MapSideJoin에서는 Reducer를 사용하지 않으므로 Reducer은 생략
3. 전체 코드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
public class MapSideJoin
{
public static class MapSideJoinMapper extends Mapper<Object, Text, Text, Text>
{
// HashTable
Hashtable<String, String> joinMap = new Hashtable<String, String>();
public void map(Object key, Text value, Context context) throws IOExcepton, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString(), "|");
Text outputKey = new Text();
Text outputVal = new Text();
String ID = itr.nextToken();
String price = itr.nextToken();
String category = itr.nextToken();
String val = price + "\t" + joinMap.get(category);
outputKey.set(ID);
outputVal.set(val);
context.write(outputKey, outputVal);
}
protected void setup(Context context) throws IOException, InterruptedException
{
// read cache
Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
BufferedReader br = new BufferedReader(new FileReader(cacheFiles[0].toString()));
String line = br.readLine();
while(line != null)
{
StringTokenizer itr = new StringTokenizer(line, "|");
String category = itr.nextToken();
String category_name = itr.nextToken();
// HashMap put (k, v) -> (category, category_name)
joinMap.put(category, category_name);
// read nextLine
line = br.readLine();
}
}
}
public static void main(String[] args) throws Exception
{
// 기본 설정 code 생략
// cache
DistributedCache.addCacheFile(new URI("/join_data/relation_b"), job.getConfiguration());
}
}
|
cs |
관련글
내용
참고자료
반응형
'개발 > Hadoop' 카테고리의 다른 글
[ Hadoop : 하둡 ] IMDb demo 프로그램 ② - MapReduce 작성 (1) | 2021.05.24 |
---|---|
[ Hadoop : 하둡 ] Uber demo 프로그램 - MapReduce (0) | 2021.05.08 |
[ Hadoop : 하둡 ] IMDb demo 프로그램 ① - MapReduce 작성 (0) | 2021.05.08 |