码说MapReduce

  MapReduce框架作为Hadoop发展初期的核心计算框架,为大数据处理技术飞速演进提供了基石。在Hadoop生态圈中,MapReduce框架由于其成熟稳定的性能,仍然是离线批处理技术的主力。以我们的北京移动大数据集群为例,Hive、SparkSql是支撑探索性数据查询的主要工具,其简单易懂的SQL语句查询,可以使具备基础数据库管理能力的人员轻松上手,完美地支撑了实时数据查询需求。

  在我最初使用Java写MapReduce程序之前,总有一个疑问:既然可以用SQL这么通俗易懂的语句直接操作数据,而且不需要过多了解MapReduce执行过程,为什么还要费力地用Java垒代码,去了解MapReduce的底层执行过程。什么样的应用场景需要我们来开发MapReduce呢?

  首先,Sql非常适用于处理结构化数据,对于非结构化数据以及需要特殊函数处理的数据比如文本数据,Sql则会力不从心。举一个小例子,从海量文本数据中提取各种字符编码并翻译为中文,过程中还涉及自动识别是utf-8还是ANSI亦或是其他编码格式,这个需求用MapReduce程序实现起来更为合理;另外,在处理业务逻辑较为复杂的任务时,使用Sql很难实现,其执行效率方面也很难满足业务需求。举例来说,我们需要将业务日志中的域名识别为相应的互联网应用,现实操作中需要分多种情况使用多重判断进行规则匹配,并剔除钓鱼网站和fake url,使用SQL很难实现业务逻辑。再例如,使用Sql进行多表join并叠加复杂的数学运算时,其效率也很难满足业务需求。

  在我们的机器学习工具开发过程中,为了使用原有数据建立特征向量,我们需要对原有表结构进行转化,需要迭代原始数据生成具有较多特征值的特征向量。原始数据量为13亿条,共13.2GB,我们尝试使用Hive SQL进行实现,经过测试,任务执行时间过长无法满足需求。而使用MapReduce编写两个Job实现业务逻辑,同时使用哈希算法优化字符串查询效率,最终处理时长为15分钟。应对这些复杂情况,使用MapReduce编程可以使我们获得更多对程序实现的控制和方法选择,通过底层算法优化实现效率提升。

  基于不同的业务场景,结合不同工具特点,我们采用SQL脚本和MapReduce开发程序结合的策略,使日常数据处理任务在效率上得到了很好地满足。在我们平台中,MapReduce程序承担了如关键字提取、应用匹配和标签规则运算等近30%的日常数据处理任务。

  总之,我们在实际应用中依据灵活性和效率来选择是否自己开发程序。

  概览MapReduce

  认识MapReduce先从架构入手,在此我们一图以蔽之:

物联网

  图 1

  现在广泛使用的MapReduce v2基于YARN架构,其角色包括Resource Manager(RM)、NodeManager(NM)、Application Master(AM)。RM由Master主机承担,主要负责任务调度和资源调配,NM和AM由各工作节点Slave承担,负责任务的处理和资源读写,其计算单位抽象为container。MapReduce的计算流程可以抽象为Splitting、Mapping、Shuffling、Reducing阶段,其中shuffling包括了Grouping、Sorting、Partitioning过程。以WordCount为例,如下图:

物联网

  图 2

  在掌握了MapReduce架构和原理的基础上,从代码的角度认识MapReduce才是程序员的正确打开方式。

  开发MapReduce

  MapReduce程序中,Map和Reduce逻辑功能分别通过扩展Mapper类和Reducer类实现。具体在实现过程中,我们在主类中将Mapper和Reducer类扩展并作为内部类调用,最后通过main函数定义输入输出以及Job配置,从而作为程序主入口。

  Map实现

  Mapper类扩展需要实现map方法,如下:

物联网

  根据需求可以扩展setup、cleanup和自定义方法等,扩展Mapper类时需要声明键值对类型,如 Mapper< NullWritable,Writable,IntWritable,Text >,依次分别为输入输出< key,value >类型,其中< NullWritable,Writable >是orc文件格式输入< key,value >类型。

  需要强调的是,MapReduce中所有输入输出字段类型都必须实现Writable或者WritableComparable类型,这是因为MapReduce中磁盘读写和节点数据传输过程涉及到数据的序列化和反序列化,需要通过这两类来实现。经常用到的IntWritable、LongWritable、Text等都是实现自WritableComparable类,如果需要,我们也可以扩展这两类实现自定义数据类型。例如,在通过MapReduce实现两表和多表Join的过程中,我通过实现WritableComparable类自定义Map输出的key字段类型,来实现对于Grouping和Sorting阶段不同比较字段的控制。