Python 与 Hadoop(MapReduce)

Python Hadoop

Posted by BY tuo on October 24, 2019

前言

MapReduce是Hadoop的编程模型。Hadoop使用Java开发,所以MapReduce一般也使用java进行开发。Hadoop 提供了一个叫做Hadoop Streaming的工具,使用这个工具理论上支持使用任何可执行程序或者脚本基于map/reduce模型处理Hadoop集群上的数据。

使用Python开发MapReduce应用有两种方法:

第一种使用hadoop streaming工具调用python脚本

第二种使用jython解释器调用python脚本。 jython脚本,最终要还是要编译为字节码,在JVM上执行。使用Jython相当于使用Python的语法,写java程序,算是结合了python和java的优势,但是jython和python并不完全兼容,而且2015.5后Jython就没有再更新,使用起来风险太大。

本文介绍使用Hadoop streaming和python相结合开发MapReduce程序

环境

Hadoop 目前使用的环境是 CDH 5.15.0 Python 目前使用的环境是 Python 3.6.4 (需要每个节点都安装相同的版本和模块)

示例程序介绍

读取文本–》寻找评论内容–》使用snownlp产生情感得分–》将结果放入HDFS

Map程序示例

mapper.py

#!/usr/bin/env python

import sys
from snownlp import SnowNLP

for line in sys.stdin:
    words = line.split("\001")
    for cellVal in words:
        val = cellVal.split("\002");
        if (str(val[0]).__contains__("comment_content")):
            try:
                score = round(SnowNLP(val[1]).sentiments, 4)
                print('emotion\002{0}\001{1}'.format(str(score),line))
            except ZeroDivisionError:
                print('emotion\002{0}\001{1}'.format("0",line))

Reduce程序示例

reducer.py

#!/usr/bin/env python
import sys

curr_word = None
curr_count = 0

for line in sys.stdin:
        if(line!=''):
                print(line)

运行命令

在文件目录运行 目前运行的服务器都是Hadoop主节点

hadoop jar /opt/cloudera/parcels/CDH-5.15.0-1.cdh5.15.0.p0.21/jars/hadoop-streaming-2.6.0-cdh5.15.0.jar -files ‘mapper.py,reducer.py’ -numReduceTasks 20 -input /xx/2019-10-14 -jobconf mapreduce.map.memory.mb=8049 -jobconf mapreduce.reduce.memory.mb=8049 -output /data/testout -mapper “python mapper.py” -reducer “python reducer.py”

参数讲解:

    Hadoop streaming 是一个jar(hadoop-streaming-2.6.0-cdh5.15.0.jar)包,可以再hadoop安装目录下找到,不同的版本路径可能不通。
    -files 参数需要上传的资源文件,这里只有map和reduce两个脚本
    -numReduceTasks   启动reduce task的数量
    -input          输入目录,原始文本
    -jobconf        后面跟着的是MapReduce的参数 
                mapreduce.reduce.memory.mb  每个reduce程序占用的内存
                mapreduce.map.memory.mb     每个map程序占用的内存
    -output: mapreduc 计算结果输出目录,mapreduce回自动创建,如果在执行时此路径已经存在整个mapreduce job将失败
    -mapper: mapper task调用的命令
    -reducer:reducer task调用的命令 ### 监控MR运行状况和报错处理

    MapReduce监控页面:http://synhadoop124:8088/cluster/apps
    
    怎么寻找自己的job页面
    运行成功命令之后会有类似 下面的代码
        19/10/24 15:39:34 INFO client.RMProxy: Connecting to ResourceManager at synhadoop124/192.168.0.124:8032
        19/10/24 15:39:34 INFO client.RMProxy: Connecting to ResourceManager at synhadoop124/192.168.0.124:8032
        19/10/24 15:39:36 INFO mapred.FileInputFormat: Total input paths to process : 422
        19/10/24 15:39:36 INFO mapreduce.JobSubmitter: number of splits:422
        19/10/24 15:39:37 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1571218839574_0172
        19/10/24 15:39:37 INFO impl.YarnClientImpl: Submitted application application_1571218839574_0172
        19/10/24 15:39:37 INFO mapreduce.Job: The url to track the job: http://synhadoop124:8088/proxy/application_1571218839574_0172/
        19/10/24 15:39:37 INFO mapreduce.Job: Running job: job_1571218839574_0172
        19/10/24 15:39:43 INFO mapreduce.Job: Job job_1571218839574_0172 running in uber mode : false
        19/10/24 15:39:43 INFO mapreduce.Job:  map 0% reduce 0%
        19/10/24 15:40:27 INFO mapreduce.Job:  map 1% reduce 0%

mapreduce.Job: Job job_1571218839574_0172 1571218839574_0172 在监控页面搜索该值就可以看到你正在运行的job