前言
MapReduce是Hadoop的编程模型。Hadoop使用Java开发,所以MapReduce一般也使用java进行开发。Hadoop 提供了一个叫做Hadoop Streaming的工具,使用这个工具理论上支持使用任何可执行程序或者脚本基于map/reduce模型处理Hadoop集群上的数据。
使用Python开发MapReduce应用有两种方法:
第一种使用hadoop streaming工具调用python脚本
第二种使用jython解释器调用python脚本。 jython脚本,最终要还是要编译为字节码,在JVM上执行。使用Jython相当于使用Python的语法,写java程序,算是结合了python和java的优势,但是jython和python并不完全兼容,而且2015.5后Jython就没有再更新,使用起来风险太大。
本文介绍使用Hadoop streaming和python相结合开发MapReduce程序
环境
Hadoop 目前使用的环境是 CDH 5.15.0 Python 目前使用的环境是 Python 3.6.4 (需要每个节点都安装相同的版本和模块)
示例程序介绍
读取文本–》寻找评论内容–》使用snownlp产生情感得分–》将结果放入HDFS
Map程序示例
mapper.py
#!/usr/bin/env python
import sys
from snownlp import SnowNLP
for line in sys.stdin:
words = line.split("\001")
for cellVal in words:
val = cellVal.split("\002");
if (str(val[0]).__contains__("comment_content")):
try:
score = round(SnowNLP(val[1]).sentiments, 4)
print('emotion\002{0}\001{1}'.format(str(score),line))
except ZeroDivisionError:
print('emotion\002{0}\001{1}'.format("0",line))
Reduce程序示例
reducer.py
#!/usr/bin/env python
import sys
curr_word = None
curr_count = 0
for line in sys.stdin:
if(line!=''):
print(line)
运行命令
在文件目录运行 目前运行的服务器都是Hadoop主节点
hadoop jar /opt/cloudera/parcels/CDH-5.15.0-1.cdh5.15.0.p0.21/jars/hadoop-streaming-2.6.0-cdh5.15.0.jar -files ‘mapper.py,reducer.py’ -numReduceTasks 20 -input /xx/2019-10-14 -jobconf mapreduce.map.memory.mb=8049 -jobconf mapreduce.reduce.memory.mb=8049 -output /data/testout -mapper “python mapper.py” -reducer “python reducer.py”
参数讲解:
Hadoop streaming 是一个jar(hadoop-streaming-2.6.0-cdh5.15.0.jar)包,可以再hadoop安装目录下找到,不同的版本路径可能不通。
-files 参数需要上传的资源文件,这里只有map和reduce两个脚本
-numReduceTasks 启动reduce task的数量
-input 输入目录,原始文本
-jobconf 后面跟着的是MapReduce的参数
mapreduce.reduce.memory.mb 每个reduce程序占用的内存
mapreduce.map.memory.mb 每个map程序占用的内存
-output: mapreduc 计算结果输出目录,mapreduce回自动创建,如果在执行时此路径已经存在整个mapreduce job将失败
-mapper: mapper task调用的命令
-reducer:reducer task调用的命令 ### 监控MR运行状况和报错处理
MapReduce监控页面:http://synhadoop124:8088/cluster/apps
怎么寻找自己的job页面
运行成功命令之后会有类似 下面的代码
19/10/24 15:39:34 INFO client.RMProxy: Connecting to ResourceManager at synhadoop124/192.168.0.124:8032
19/10/24 15:39:34 INFO client.RMProxy: Connecting to ResourceManager at synhadoop124/192.168.0.124:8032
19/10/24 15:39:36 INFO mapred.FileInputFormat: Total input paths to process : 422
19/10/24 15:39:36 INFO mapreduce.JobSubmitter: number of splits:422
19/10/24 15:39:37 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1571218839574_0172
19/10/24 15:39:37 INFO impl.YarnClientImpl: Submitted application application_1571218839574_0172
19/10/24 15:39:37 INFO mapreduce.Job: The url to track the job: http://synhadoop124:8088/proxy/application_1571218839574_0172/
19/10/24 15:39:37 INFO mapreduce.Job: Running job: job_1571218839574_0172
19/10/24 15:39:43 INFO mapreduce.Job: Job job_1571218839574_0172 running in uber mode : false
19/10/24 15:39:43 INFO mapreduce.Job: map 0% reduce 0%
19/10/24 15:40:27 INFO mapreduce.Job: map 1% reduce 0%
mapreduce.Job: Job job_1571218839574_0172 1571218839574_0172 在监控页面搜索该值就可以看到你正在运行的job