位置: IT常识 - 正文

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

编辑:rootadmin
大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出 3.6.1OutputFormat接口实现类

推荐整理分享大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架),希望有所帮助,仅作参考,欢迎阅读内容。

文章相关热门搜索词:大数据框架结构,大数据技术框架有哪些,大数据处理框架,大数据整合的框架,大数据 框架,大数据 框架,大数据框架基础教程,大数据框架基础教程,内容如对您有帮助,希望把文章链接给更多的朋友!

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

1、文本输出TextOutputFormat

默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,疑问TextOutputFormat调用toString()方法把他们转换为字符串。

2、SequenceFileOutputFormat

将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

3、自定义OutputFormat

根据用户需求,自定义实现输出。

3.6.2自定义OutputFormat

1、使用场景

为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。

例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同的目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

2、自定义OUtputFormat步骤

(1)自定义一个类继承FileOutputFormat。

(2)改写RecordWriter,具体改写输出数据的方法write()。

3.6.3自定义OutputFormat案例实操

1、需求

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

(1)输入数据

http://www.baidu.comhttp://www.google.comhttp://cn.bing.comhttp://www.atguigu.comhttp://www.sohu.comhttp://www.sina.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sindsafa.com

(2)期望输出数据

http://www.atguigu.comhttp://cn.bing.comhttp://www.baidu.comhttp://www.google.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sina.comhttp://www.sindsafa.comhttp://www.sohu.com

2、需求分析

3、案例实操

(1)编写FilterMapper类

package com.cuiyf41.output;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 写出 context.write(value, NullWritable.get()); }}

(2)编写FilterReducer类

package com.cuiyf41.output;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> { Text k = new Text(); @Override protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 1 获取一行 String line = key.toString(); // 2 拼接 line = line + "\r\n"; // 3 设置key k.set(line); // 4 输出 context.write(k, NullWritable.get()); }}

(3)自定义一个OutputFormat类

package com.atguigu.mapreduce.outputformat;import java.io.IOException;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)throws IOException, InterruptedException {// 创建一个RecordWriterreturn new FilterRecordWriter(job);}}

(4)编写RecordWriter类

package com.cuiyf41.output;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream atguiguOut = null; FSDataOutputStream otherOut = null; public FilterRecordWriter(TaskAttemptContext job) { // 1 获取文件系统 FileSystem fs; try { fs = FileSystem.get(job.getConfiguration()); // 2 创建输出文件路径 Path atguiguPath = new Path("e:/atguigu.log"); Path otherPath = new Path("e:/other.log"); // 3 创建输出流 atguiguOut = fs.create(atguiguPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判断是否包含“atguigu”输出到不同文件 if (key.toString().contains("atguigu")) { atguiguOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭资源 IOUtils.closeStream(atguiguOut); IOUtils.closeStream(otherOut); }}

(5)编写FilterDriver类

package com.cuiyf41.output;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FilterDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "e:/input/log.txt", "e:/output2" }; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FilterDriver.class); job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要将自定义的输出格式组件设置到job中 job.setOutputFormatClass(FilterOutputFormat.class); Path input = new Path(args[0]); Path output = new Path(args[1]); // 如果输出路径存在,则进行删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output,true); } FileInputFormat.setInputPaths(job, input); // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, output); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
本文链接地址:https://www.jiuchutong.com/zhishi/299790.html 转载请保留说明!

上一篇:谷歌研究员走火入魔事件曝光:认为AI已具备人格,被罚带薪休假,聊天记录让网友San值狂掉...(谷歌研究院)

下一篇:猿创征文|【React 三】组件实例的三大属性(state、props、refs)(关于猿猴的作文)

  • 朋友圈删除别人的评论对方能看到吗(朋友圈删除别人评论)

    朋友圈删除别人的评论对方能看到吗(朋友圈删除别人评论)

  • 淘宝开团是什么意思啊(淘宝开团是什么意思,不能直接买吗)

    淘宝开团是什么意思啊(淘宝开团是什么意思,不能直接买吗)

  • 华为手机自动扣费怎么关闭(华为手机自动扣费微信怎么关闭)

    华为手机自动扣费怎么关闭(华为手机自动扣费微信怎么关闭)

  • 抖音里的歌曲在哪里下载(抖音里的歌曲在哪里找到)

    抖音里的歌曲在哪里下载(抖音里的歌曲在哪里找到)

  • 拼团是什么意思(南航拼团是什么意思)

    拼团是什么意思(南航拼团是什么意思)

  • 话费充值错误怎么申请退款吗(话费充值错误怎么办理)

    话费充值错误怎么申请退款吗(话费充值错误怎么办理)

  • 亚马逊会员日是什么时候(亚马逊会员日是几天)

    亚马逊会员日是什么时候(亚马逊会员日是几天)

  • bmp是图片文件的后缀名吗(bmp是图片文件的扩展名吗)

    bmp是图片文件的后缀名吗(bmp是图片文件的扩展名吗)

  • 没有充电器怎么办(没有充电器怎么给苹果手机充电)

    没有充电器怎么办(没有充电器怎么给苹果手机充电)

  • 8p听筒声音小免提正常(苹果8p听筒小声)

    8p听筒声音小免提正常(苹果8p听筒小声)

  • 没登微信为什么有步数(没登微信为什么能用快捷键)

    没登微信为什么有步数(没登微信为什么能用快捷键)

  • 手机自带软件误删怎么恢复(手机自带软件被误删?看这里一键找回!)

    手机自带软件误删怎么恢复(手机自带软件被误删?看这里一键找回!)

  • 苹果11可以用快充会有影响吗(苹果11可以用快冲吗)

    苹果11可以用快充会有影响吗(苹果11可以用快冲吗)

  • 微商城是什么(微商城是什么平台)

    微商城是什么(微商城是什么平台)

  • 使用域名系统的原因(域名系统采用)

    使用域名系统的原因(域名系统采用)

  • xr自动调节亮度怎么关(xr自动调节屏幕亮度)

    xr自动调节亮度怎么关(xr自动调节屏幕亮度)

  • iphone xs大陆上市时间(苹果xs上架)

    iphone xs大陆上市时间(苹果xs上架)

  • 红米dc调光怎么开(红米10ⅹ5g屏幕dc调光)

    红米dc调光怎么开(红米10ⅹ5g屏幕dc调光)

  • 竖向文本框里的字怎么居中(竖向文本框的括号怎么向下)

    竖向文本框里的字怎么居中(竖向文本框的括号怎么向下)

  • 小米悬浮球设置方法(小米悬浮球设置皮肤)

    小米悬浮球设置方法(小米悬浮球设置皮肤)

  • 荣耀20比v20强在哪(荣耀20对比v20哪个好)

    荣耀20比v20强在哪(荣耀20对比v20哪个好)

  • input标签属性有哪些(input标签常用属性)

    input标签属性有哪些(input标签常用属性)

  • 新版qq小耳朵是什么(新版qq小耳朵是什么意思)

    新版qq小耳朵是什么(新版qq小耳朵是什么意思)

  • 猎豹浏览器如何小窗口播放(猎豹浏览器如何倍速播放)

    猎豹浏览器如何小窗口播放(猎豹浏览器如何倍速播放)

  • 【网络安全】实操XSS订单系统漏洞(利用盲打)(网络安全实验室)

    【网络安全】实操XSS订单系统漏洞(利用盲打)(网络安全实验室)

  • 一般纳税人结转税额怎么做会计分录
  • 盈利能力也可以反映短期偿债能力
  • 向农业生产者收购原木进项税怎么算
  • 失控发票一定要补税吗
  • 防伪开票系统技术维护费怎么做分录
  • 出差加油算什么费用
  • 个体户开票超过500万能注销吗
  • 个税系统如何升级到最新版本
  • 通用定额发票属于哪一类
  • 固定资产折旧方法有几种
  • 月末本年利润余额怎么算
  • 收到政府划拨的固定资产会计处理
  • 权益净利率如何算
  • 企业注销合适还是转让出去合适
  • 其他应收款在什么方
  • 堤围费是什么意思
  • 滴滴出行客运服务费 事业单位 报销
  • 在会计上跨季度增值税专用发票怎么进行红冲?
  • 无票收入报税后,后期又开票出去怎么处理?
  • 企业注销时未分配利润怎么处理
  • 资金筹集业务的会计处理
  • 工程款分配方式
  • 制造费用明细科目有哪些
  • 应收冲预收是什么意思
  • 给员工小孩买礼物合适吗
  • 微信聊天记录备份和恢复
  • 承租厂房需要缴纳什么税
  • 公司收到补偿款是利好还是利空
  • thinkphp获取数据库数据
  • laravel实战教程
  • 训练自己的GPT模型 中文改英文
  • php生成证书图片
  • 计算机视觉的应用
  • 入库税款异常怎么处理
  • 收缴违约金
  • 开票日期已超过开票截止日期是什么意思
  • 累计折旧是当月提还是下月提
  • phpcms怎么样
  • 小规模印花税可以按次申报吗
  • 产品检测费计入成本吗
  • 下列支出计入销售费用账户借方的是
  • 入伙退伙协议要盖章吗
  • 申请开立账户的请示
  • 二手车经销管理办法
  • 下月初可以认证上月的发票么
  • 两个独立核算单位可以共用一个账户吗
  • 运输公司汽车折旧能直接进主营成本吗
  • 会务费要纳税调整吗
  • 固定资产净值如何处理
  • 开发间接费用的折旧包括哪些
  • 所有者权益科目有哪些
  • 公司资产业务
  • 享受所得税减免优惠的生产性外商投资企业包括
  • 建筑图纸设计费多少钱
  • 发票入账的时候能作废吗
  • 车辆报废取得的收入增值税怎么算
  • 主营业务成本的增加在哪一方
  • linux重置开机密码
  • mysql 5.7.17 winx64安装配置教程
  • mac怎么复制文件路径地址
  • vc6运行程序
  • centos 虚拟化
  • scair.exe是什么程序
  • win7如何开启飞行模式
  • mac使用命令
  • win8应用商店无法连接网络
  • win10系统下如何安装谷歌浏览器插件?win10系统下谷歌浏览器插件的安装方法图文教程
  • windows显示内部版本
  • win8应用错误怎么办
  • 简述在windows中创建用户的步骤
  • linux常用桌面环境
  • ecxel进度条
  • android 界面滑动
  • 深入了解求助者时,首先要进行工作是
  • python module import
  • jQuery+ajax实现文章点赞功能的方法
  • linux中的shell命令
  • 出口退税是指对出口货物退还其在国内
  • 北京地税局报税流程
  • 泉州市税务局投诉电话
  • 免责声明:网站部分图片文字素材来源于网络,如有侵权,请及时告知,我们会第一时间删除,谢谢! 邮箱:opceo@qq.com

    鄂ICP备2023003026号

    网站地图: 企业信息 工商信息 财税知识 网络常识 编程技术

    友情链接: 武汉网站建设