발번역 주의 요망, 내용 중 오류 발견시 연락 요망 = =
# What is Cascading?
Cascading은 분산컴퓨팅 클러스터 혹은 단일컴퓨터 노드 환경에서 질의를 정의&공유 하는 등의 작업을 하고
data processing workflow를 실행하는 data processing API이다.
단일컴퓨터 노드 환경(local mode)는 test코드와 workflow를 cluster에 배포 전에 효율적으로 테스트 할 수 있다.
분산컴퓨팅 환경에서는 Apache Hadoop plaform에서 이용 할 수 있다.
이를 이용해 쉽게 Hadoop Application을 개발 하고 Job을 생성 하고 스케쥴링 할 수 있다.
쉽게 설명 하자면
HDFS 등에 분산되어있는 데이터를 Cascading을 이용하여 쉽게 추출/정제를 하고 work flow를 생성 할 수 있다.
User는 Map/Reduce를 직접 구현하지 않아도 Casading의 Each, GroupBy, Aggregator, Filter 등을 이용하여 Hadoop Job에서 Mapper와 Reducer를 작동 시킬 수 있다.
또한 기 구현 된 Operation 중에 원하는 Operation이 없다면 Interface를 상속받아 구현 할 수 있다.
# Example - word count
source = Input 데이터의 위치와 원하는 형태의 스키마로 Tap 생성
sink = 저장하려는 스키마와, OuputPath, 싱크모드를 지정한 Tap 생성
assembly= 파이프를 만들어서 wordcount로 명명하고,
RegularExpression Generator로 각각의 라인을 단어로 쪼개서 word라고 명명된 필드에 넣음
word로 groupBy하여 count함
Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) ); Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE ); // the 'head' of the pipe assembly Pipe assembly = new Pipe( "wordcount" ); // For each input Tuple // parse out each word into a new Tuple with the field name "word" // regular expressions are optional in Cascading String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)"; Function function = new RegexGenerator( new Fields( "word" ), regex ); assembly = new Each( assembly, new Fields( "line" ), function ); // group the Tuple stream by the "word" value assembly = new GroupBy( assembly, new Fields( "word" ) ); // For every Tuple group // count the number of occurrences of "word" and store result in // a field named "count" Aggregator count = new Count( new Fields( "count" ) ); assembly = new Every( assembly, count ); // initialize app properties, tell Hadoop which jar file to use Properties properties = new Properties(); AppProps.setApplicationJarClass( properties, Main.class ); // plan a new Flow from the assembly using the source and sink Taps // with the above properties FlowConnector flowConnector = new HadoopFlowConnector( properties ); Flow flow = flowConnector.connect( "word-count", source, sink, assembly ); // execute the flow, block until complete flow.complete(); |
# Comparison of pipe types
Pipe type
|
Purpose
|
Input
|
Output
|
Pipe
|
instantiate a pipe; create or
name a branch
|
name |
a (named) pipe
|
SubAssembly |
create nested subassemblies
|
|
|
Each |
apply a filter or function, or
branch a stream
|
tuple stream (grouped or not) |
a tuple stream, optionally
filtered or transformed
|
Merge |
merge two or more streams
with identical fields |
two or more tuple streams
|
a tuple stream, unsorted
|
GroupBy
|
sort/group on field values;
optionally merge two or
more streams with identical
fields
|
one or more tuple streams
with identical fields
|
a single tuple stream,
grouped on key field(s) with
optional secondary sort |
Every |
apply aggregator or buffer
operation |
grouped tuple stream
|
a tuple stream plus new
fields with operation results
|
CoGroup |
join 1 or more streams on
matching field values
|
one or more tuple streams
|
a single tuple stream, joined
on key field(s) |
HashJoin |
join 1 or more streams on
matching field values
|
one or more tuple streams |
a tuple stream in arbitrary
order |
# 결론
M/R 코드는 딱 한번 짜봤는데 간단한 M/R조차 개발하는게 쉽지 않았다.
Cascading은 쉽게 work flow를 생성하여 ETL 업무를 하는데 적격이다.
또한 Cascading API는 Java뿐아니라 Scala, Clojure, Groove, JRuby, Jython 등에서도 사용 할 수 있다.
그러나 work flow가 필요 없는 간단한 질의의 경우 Impala나 Shark, Hive, Pig 등이 더 유리 할 수 있다.
개발 전엔 항상 데이터 크기, 모양, 해상도, 작업 복잡도, 실시간이냐 아니냐 등
여러가지를 잘 따져보고 자신의 상황에 맞는 것을 사용해야됨!!