位置: IT常识 - 正文

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

编辑:rootadmin
大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出 3.6.1OutputFormat接口实现类

推荐整理分享大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架),希望有所帮助,仅作参考,欢迎阅读内容。

文章相关热门搜索词:大数据框架结构,大数据技术框架有哪些,大数据处理框架,大数据整合的框架,大数据 框架,大数据 框架,大数据框架基础教程,大数据框架基础教程,内容如对您有帮助,希望把文章链接给更多的朋友!

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

1、文本输出TextOutputFormat

默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,疑问TextOutputFormat调用toString()方法把他们转换为字符串。

2、SequenceFileOutputFormat

将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

3、自定义OutputFormat

根据用户需求,自定义实现输出。

3.6.2自定义OutputFormat

1、使用场景

为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。

例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同的目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

2、自定义OUtputFormat步骤

(1)自定义一个类继承FileOutputFormat。

(2)改写RecordWriter,具体改写输出数据的方法write()。

3.6.3自定义OutputFormat案例实操

1、需求

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

(1)输入数据

http://www.baidu.comhttp://www.google.comhttp://cn.bing.comhttp://www.atguigu.comhttp://www.sohu.comhttp://www.sina.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sindsafa.com

(2)期望输出数据

http://www.atguigu.comhttp://cn.bing.comhttp://www.baidu.comhttp://www.google.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sina.comhttp://www.sindsafa.comhttp://www.sohu.com

2、需求分析

3、案例实操

(1)编写FilterMapper类

package com.cuiyf41.output;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 写出 context.write(value, NullWritable.get()); }}

(2)编写FilterReducer类

package com.cuiyf41.output;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> { Text k = new Text(); @Override protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 1 获取一行 String line = key.toString(); // 2 拼接 line = line + "\r\n"; // 3 设置key k.set(line); // 4 输出 context.write(k, NullWritable.get()); }}

(3)自定义一个OutputFormat类

package com.atguigu.mapreduce.outputformat;import java.io.IOException;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)throws IOException, InterruptedException {// 创建一个RecordWriterreturn new FilterRecordWriter(job);}}

(4)编写RecordWriter类

package com.cuiyf41.output;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream atguiguOut = null; FSDataOutputStream otherOut = null; public FilterRecordWriter(TaskAttemptContext job) { // 1 获取文件系统 FileSystem fs; try { fs = FileSystem.get(job.getConfiguration()); // 2 创建输出文件路径 Path atguiguPath = new Path("e:/atguigu.log"); Path otherPath = new Path("e:/other.log"); // 3 创建输出流 atguiguOut = fs.create(atguiguPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判断是否包含“atguigu”输出到不同文件 if (key.toString().contains("atguigu")) { atguiguOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭资源 IOUtils.closeStream(atguiguOut); IOUtils.closeStream(otherOut); }}

(5)编写FilterDriver类

package com.cuiyf41.output;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FilterDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "e:/input/log.txt", "e:/output2" }; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FilterDriver.class); job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要将自定义的输出格式组件设置到job中 job.setOutputFormatClass(FilterOutputFormat.class); Path input = new Path(args[0]); Path output = new Path(args[1]); // 如果输出路径存在,则进行删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output,true); } FileInputFormat.setInputPaths(job, input); // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, output); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
本文链接地址:https://www.jiuchutong.com/zhishi/299790.html 转载请保留说明!

上一篇:谷歌研究员走火入魔事件曝光:认为AI已具备人格,被罚带薪休假,聊天记录让网友San值狂掉...(谷歌研究院)

下一篇:猿创征文|【React 三】组件实例的三大属性(state、props、refs)(关于猿猴的作文)

  • 小米watchs1pro如何接电话(小米watchs1pro如何安装应用)

    小米watchs1pro如何接电话(小米watchs1pro如何安装应用)

  • 荣耀60怎么设置来电闪光(荣耀60怎么设置三指截屏)

    荣耀60怎么设置来电闪光(荣耀60怎么设置三指截屏)

  • 网易云听歌次数在哪看(网易云听歌次数最多多少次)

    网易云听歌次数在哪看(网易云听歌次数最多多少次)

  • 淘宝店铺评分灰色和红色什么区别(淘宝店铺评分灰分怎么看)

    淘宝店铺评分灰色和红色什么区别(淘宝店铺评分灰分怎么看)

  • 笔记本可以加装固态硬盘吗(笔记本可以加装什么)

    笔记本可以加装固态硬盘吗(笔记本可以加装什么)

  • 抖音背包在哪里查看(抖音背包在哪里打开)

    抖音背包在哪里查看(抖音背包在哪里打开)

  • 华为nova6耗电量快正常吗(华为nova6耗电太快了)

    华为nova6耗电量快正常吗(华为nova6耗电太快了)

  • 抖音互粉会不会降低权重(抖音互粉会不会影响账号)

    抖音互粉会不会降低权重(抖音互粉会不会影响账号)

  • 苹果探探闪退怎么回事(探探闪退怎么回事)

    苹果探探闪退怎么回事(探探闪退怎么回事)

  • 个人在唯品会怎么开店(唯品会个人卖家)

    个人在唯品会怎么开店(唯品会个人卖家)

  • 快手红包快币怎么提现(快手红包快币怎么查看)

    快手红包快币怎么提现(快手红包快币怎么查看)

  • 苹果xr支持5g网络吗(苹果xr支持5g不)

    苹果xr支持5g网络吗(苹果xr支持5g不)

  • 苹果6splus怎么截图啊(苹果6splus怎么截屏长图)

    苹果6splus怎么截图啊(苹果6splus怎么截屏长图)

  • 小米手机的成功是基于什么模式(小米手机的成功是基于什么模式还有发展前行)

    小米手机的成功是基于什么模式(小米手机的成功是基于什么模式还有发展前行)

  • 打印怎么缩印(pdf怎么缩印)

    打印怎么缩印(pdf怎么缩印)

  • 微信看一看里3+朋友看过是什么意思(微信看一看里面的朋友看过是谁在看)

    微信看一看里3+朋友看过是什么意思(微信看一看里面的朋友看过是谁在看)

  • 手机热点怎么看谁连接(手机热点怎么看用了多少流量)

    手机热点怎么看谁连接(手机热点怎么看用了多少流量)

  • 华为mate30怎么设置指纹应用锁(华为mate30怎么设置下面三个键)

    华为mate30怎么设置指纹应用锁(华为mate30怎么设置下面三个键)

  • 怎样退出qq音乐登录(怎样退出qq音乐一起听)

    怎样退出qq音乐登录(怎样退出qq音乐一起听)

  • 红魔3什么时间发布(红魔3什么时候出的)

    红魔3什么时间发布(红魔3什么时候出的)

  • 淘宝卖家怎样拒绝发货(淘宝卖家怎么拒收别人的消息)

    淘宝卖家怎样拒绝发货(淘宝卖家怎么拒收别人的消息)

  • ipsw.em降级怎么安装(ipswme降级如何在手机上安装)

    ipsw.em降级怎么安装(ipswme降级如何在手机上安装)

  • 钉钉签到在哪里(钉钉签到在哪里导出)

    钉钉签到在哪里(钉钉签到在哪里导出)

  • vivosos紧急联络怎么设置(vivo紧急联系人怎么使用)

    vivosos紧急联络怎么设置(vivo紧急联系人怎么使用)

  • word表格中光标怎么移不上去(word表格中光标位置不对)

    word表格中光标怎么移不上去(word表格中光标位置不对)

  • Linux系统中查看执行中的进程占用内存量的方法(linux系统中查看磁盘空间情况命令)

    Linux系统中查看执行中的进程占用内存量的方法(linux系统中查看磁盘空间情况命令)

  • 代扣个税的会计分录怎么做
  • 财务软件属于无形资产的哪一种
  • 季度预缴能不能弥补亏损
  • 学校付学生的现金奖励入什么科目
  • 营业执照缴纳印花税贴花怎么缴纳
  • 无偿受让股权的股东对发起股东没有出资承担责任
  • 什么情况下发票不能冲红
  • 个人所得税返还奖励财务人员做账
  • 专票上的账号打错了
  • 携税宝可以全额抵扣吗
  • 总资产报酬率可以用净利润计算吗
  • 出口佣金可以在企业所得税前扣除吗
  • 建筑类企业可以申报高新吗
  • 小规模纳税人需要做账吗
  • 企业所得税赞助费是否可税前扣除
  • 报销时可以一个人报销吗
  • 购入设备超过多少才允许一次性扣除
  • 购买税控设备,享受何种优惠政策
  • 利息赔款是否缴纳企业所得税?
  • 1697509422
  • 短期投资算债权吗
  • 稿费个人所得税计算方式
  • 利润弥补亏损如何处理
  • 如何解决浏览器禁止访问
  • 攀岗的企业性质有哪些
  • u盘怎么安装软件
  • 软件产品登记证书官费
  • 补充医疗保险是六险吗
  • 损失赔偿金要开什么发票
  • 不能运行windows10的设备有哪些
  • wordpress采集教程
  • 什么叫奖励旅游
  • 废旧物资回收企业所得税优惠政策
  • 出售其他债权投资差额计入
  • 印花税账务处理会计分录
  • 纳税人填写纳税申报表
  • 公司处理固定资产汽车的账务处理
  • 从网上学电脑下载什么软件
  • 织梦使用教程
  • 关于帝国战争的游戏
  • 固定资产与固定资产净值的区别
  • 增值税附表一填写
  • mysql开启远程访问权限
  • 工业用地便宜
  • 经营性应收项目的减少调增 为什么是减
  • 税务局核定税种需要多久
  • 存货和总账不平如何调整
  • 计提贷款损失准备的意义
  • 前期物业管理阶段的工作有哪些
  • 固定资产盘亏是什么意思
  • 自建厂房会计处理
  • 固定资产清理增值税计税依据
  • 开办费企业所得税处理
  • 原始凭证的审核内容
  • 筹资费用和财务费用一样吗
  • 采购未取得发票怎么结转成本
  • 定额发票的使用方法
  • 购车的费用包括哪些费用
  • 冲销应付账款暂估应付账款的分录怎么做
  • 预付账款的账务处理例题
  • 开一般户需要什么理由
  • windons10安装
  • windows7wifi密码
  • ubuntu系统怎么更新
  • mac有线不能上网
  • 获取mac命令
  • freebsd常用命令
  • executor进程
  • win10 rs1是什么版本
  • win10右键图片
  • cocos2dx小游戏
  • js 文件分割
  • linux做ftp
  • javascript中window.event事件用法详解
  • javascript nextSibling 与 getNextElement(node) 使用介绍
  • node.js的理解
  • js类的定义方法
  • 发票事业单位有税号吗
  • 怎么删除天眼查诉讼信息
  • 铁路土地使用税减免
  • 免责声明:网站部分图片文字素材来源于网络,如有侵权,请及时告知,我们会第一时间删除,谢谢! 邮箱:opceo@qq.com

    鄂ICP备2023003026号

    网站地图: 企业信息 工商信息 财税知识 网络常识 编程技术

    友情链接: 武汉网站建设