位置: IT常识 - 正文

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

编辑:rootadmin
大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出 3.6.1OutputFormat接口实现类

推荐整理分享大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架),希望有所帮助,仅作参考,欢迎阅读内容。

文章相关热门搜索词:大数据框架结构,大数据技术框架有哪些,大数据处理框架,大数据整合的框架,大数据 框架,大数据 框架,大数据框架基础教程,大数据框架基础教程,内容如对您有帮助,希望把文章链接给更多的朋友!

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

1、文本输出TextOutputFormat

默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,疑问TextOutputFormat调用toString()方法把他们转换为字符串。

2、SequenceFileOutputFormat

将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

3、自定义OutputFormat

根据用户需求,自定义实现输出。

3.6.2自定义OutputFormat

1、使用场景

为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。

例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同的目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

2、自定义OUtputFormat步骤

(1)自定义一个类继承FileOutputFormat。

(2)改写RecordWriter,具体改写输出数据的方法write()。

3.6.3自定义OutputFormat案例实操

1、需求

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

(1)输入数据

http://www.baidu.comhttp://www.google.comhttp://cn.bing.comhttp://www.atguigu.comhttp://www.sohu.comhttp://www.sina.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sindsafa.com

(2)期望输出数据

http://www.atguigu.comhttp://cn.bing.comhttp://www.baidu.comhttp://www.google.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sina.comhttp://www.sindsafa.comhttp://www.sohu.com

2、需求分析

3、案例实操

(1)编写FilterMapper类

package com.cuiyf41.output;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 写出 context.write(value, NullWritable.get()); }}

(2)编写FilterReducer类

package com.cuiyf41.output;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> { Text k = new Text(); @Override protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 1 获取一行 String line = key.toString(); // 2 拼接 line = line + "\r\n"; // 3 设置key k.set(line); // 4 输出 context.write(k, NullWritable.get()); }}

(3)自定义一个OutputFormat类

package com.atguigu.mapreduce.outputformat;import java.io.IOException;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)throws IOException, InterruptedException {// 创建一个RecordWriterreturn new FilterRecordWriter(job);}}

(4)编写RecordWriter类

package com.cuiyf41.output;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream atguiguOut = null; FSDataOutputStream otherOut = null; public FilterRecordWriter(TaskAttemptContext job) { // 1 获取文件系统 FileSystem fs; try { fs = FileSystem.get(job.getConfiguration()); // 2 创建输出文件路径 Path atguiguPath = new Path("e:/atguigu.log"); Path otherPath = new Path("e:/other.log"); // 3 创建输出流 atguiguOut = fs.create(atguiguPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判断是否包含“atguigu”输出到不同文件 if (key.toString().contains("atguigu")) { atguiguOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭资源 IOUtils.closeStream(atguiguOut); IOUtils.closeStream(otherOut); }}

(5)编写FilterDriver类

package com.cuiyf41.output;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FilterDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "e:/input/log.txt", "e:/output2" }; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FilterDriver.class); job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要将自定义的输出格式组件设置到job中 job.setOutputFormatClass(FilterOutputFormat.class); Path input = new Path(args[0]); Path output = new Path(args[1]); // 如果输出路径存在,则进行删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output,true); } FileInputFormat.setInputPaths(job, input); // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, output); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
本文链接地址:https://www.jiuchutong.com/zhishi/299790.html 转载请保留说明!

上一篇:谷歌研究员走火入魔事件曝光:认为AI已具备人格,被罚带薪休假,聊天记录让网友San值狂掉...(谷歌研究院)

下一篇:猿创征文|【React 三】组件实例的三大属性(state、props、refs)(关于猿猴的作文)

  • reno2z(reno怎么样)(reno 2手机)

    reno2z(reno怎么样)(reno 2手机)

  • ipadmini2能用电容笔吗(ipadmini2可以用电容笔吗?)

    ipadmini2能用电容笔吗(ipadmini2可以用电容笔吗?)

  • opporeno4pro是否有3.5mm耳机孔(opporeno4pro都有哪一些不一样的功能)

    opporeno4pro是否有3.5mm耳机孔(opporeno4pro都有哪一些不一样的功能)

  • 荣耀手机微信深色模式怎么设置(荣耀手机微信深度清理在哪里)

    荣耀手机微信深色模式怎么设置(荣耀手机微信深度清理在哪里)

  • 固态硬盘一直红灯(固态硬盘亮红灯是什么意思)

    固态硬盘一直红灯(固态硬盘亮红灯是什么意思)

  • 怎么注销qq支付(微信转账错了怎么收回来)

    怎么注销qq支付(微信转账错了怎么收回来)

  • 手机屏幕不亮了但是可以操作是怎么回事(手机屏幕不亮了是什么原因)

    手机屏幕不亮了但是可以操作是怎么回事(手机屏幕不亮了是什么原因)

  • 抖音属于什么系(抖音属于什么系列)

    抖音属于什么系(抖音属于什么系列)

  • 苹果8plus停产了吗(苹果8plus能卖多少钱)

    苹果8plus停产了吗(苹果8plus能卖多少钱)

  • 小米手机wifi已保存但连不上(小米手机WIFI已保存)

    小米手机wifi已保存但连不上(小米手机WIFI已保存)

  • 水冷主机和普通主机的区别(水冷主机和普通的区别)

    水冷主机和普通主机的区别(水冷主机和普通的区别)

  • 小米手机微信没有提示音是怎么回事(小米手机微信没有网络怎么回事)

    小米手机微信没有提示音是怎么回事(小米手机微信没有网络怎么回事)

  • 手机东西怎么传到电脑(手机东西怎么传输到电脑)

    手机东西怎么传到电脑(手机东西怎么传输到电脑)

  • 华为移动服务可以卸载吗(华为移动服务是干什么的可以删除么)

    华为移动服务可以卸载吗(华为移动服务是干什么的可以删除么)

  • 华为nova5pro有耳机插孔吗(华为nove5pro耳机)

    华为nova5pro有耳机插孔吗(华为nove5pro耳机)

  • 美版苹果xsmax支持双卡吗(美版苹果xsmax支持移动4g吗)

    美版苹果xsmax支持双卡吗(美版苹果xsmax支持移动4g吗)

  • 抖音注销后是什么状态(抖音注销后是什么状态,抖音注销7天了还没答复)

    抖音注销后是什么状态(抖音注销后是什么状态,抖音注销7天了还没答复)

  • iphonex如何用nfc读卡(iPhonex如何用5g)

    iphonex如何用nfc读卡(iPhonex如何用5g)

  • 怎么退出亲情号(怎么退出亲情号?)

    怎么退出亲情号(怎么退出亲情号?)

  • 苹果6sp上市时间(苹果6s plus上市价格)

    苹果6sp上市时间(苹果6s plus上市价格)

  • 修改无线密码(falogincn修改无线密码)

    修改无线密码(falogincn修改无线密码)

  • 什么是跨域?及7种跨域解决方法(什么是跨域以及解决方法)

    什么是跨域?及7种跨域解决方法(什么是跨域以及解决方法)

  • python计数排序法是什么(python统计出现次数并排序)

    python计数排序法是什么(python统计出现次数并排序)

  • 企业借出去的钱收不回来
  • 减免税款会计分录
  • 印花税申报缴纳错误,怎么办
  • 什么是价税分离的原因
  • 财务人员办理会计业务
  • 支付给个人的劳务费需要发票吗
  • 会计科目已受控于应收应付系统
  • 劳务公司一般纳税人开票几个点
  • 外资企业撤退
  • 企业购房税费政策
  • 冲销暂估入库如何做会计核算?
  • 保费收入确认的条件
  • 行政机关作出下列行为属于行政复议的范围
  • 绿化工程上税多少
  • 办税员身份怎么办理
  • 特许权使用费代扣代缴增值税
  • 政府土地出让金返还
  • 有限合伙人公司
  • 走物流的货物如何收费
  • 搬运费没有发票能入成本吗?
  • 不同方式的融资租赁业务如何贴花?
  • 1697510006
  • 如何关闭windows10安全中心服务
  • 购买加油卡怎么入账
  • php递归函数详解
  • php如何定义一个常量?常量名有哪些命名规则?
  • php 数组对象
  • 收到银行承兑汇票的账务处理
  • 增值税专用发票有几联?
  • 违反发票管理的处罚
  • 黑白相间表带
  • php获取文件类型
  • 金税盘锁住了怎么办
  • cqi映射表
  • 净资产收益率的概念
  • 零申报怎么做报表
  • 农业公司经营项目有哪些?
  • 勘察设计四个阶段
  • 正则表达式除去指定字母
  • 金税盘抵免增值税怎么做账
  • 固定资产一次性扣除政策
  • 收回多发的工资在上缴财政,可以用应缴财政款科目吗
  • 帝国cms视频教程
  • 电汇跟支票的区别
  • 织梦网站怎么添加关键词
  • 辅助生产车间照明费用计入什么科目
  • 去年的滞纳金没有入账怎么处理
  • 番茄开发票属于蔬菜吗?
  • 哪些合同不需要线上审批
  • 预付款已经开了发票未到货要怎么做账
  • sql语句取并集
  • 开普票需要公对公吗
  • 饭馆增值税
  • 固定资产正常报废与非正常报废的会计处理基本相同
  • 残疾人就业保证金上年工资总额是说上一年度么
  • 什么是补偿性余额,对银行借款的成本有何影响?
  • 预付账款转营业外支出
  • 税控盘服务费怎么填增值税申报表
  • 公司章程上的出资额怎么填
  • 权益类会计科目有哪些科目
  • 销售费用明细科目里有什么
  • sql自定义数据类型
  • mysql死锁的情况
  • winxp如何访问win10
  • windows server 2008 r2怎么用u盘启动
  • ubuntu搭建hexo
  • win8.1进入桌面
  • ssd安装centos7
  • centos7图形安装
  • win8.1删除所有内容并重新安装
  • win2000停止服务
  • nodejs 爬虫
  • linux创建用户的命令是什么
  • cshrc语法
  • 全面理解和准确把握新时代党的建设总要求心得体会
  • javascript学习指南
  • [置顶]JM259194
  • Android的AdapterView及其子类简介-android学习之旅(二十三)
  • 国税系统升级后怎么添加办税人
  • 船舶吨税税收优惠口诀
  • 免责声明:网站部分图片文字素材来源于网络,如有侵权,请及时告知,我们会第一时间删除,谢谢! 邮箱:opceo@qq.com

    鄂ICP备2023003026号

    网站地图: 企业信息 工商信息 财税知识 网络常识 编程技术

    友情链接: 武汉网站建设