位置: IT常识 - 正文

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

编辑:rootadmin
大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出 3.6.1OutputFormat接口实现类

推荐整理分享大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架),希望有所帮助,仅作参考,欢迎阅读内容。

文章相关热门搜索词:大数据框架结构,大数据技术框架有哪些,大数据处理框架,大数据整合的框架,大数据 框架,大数据 框架,大数据框架基础教程,大数据框架基础教程,内容如对您有帮助,希望把文章链接给更多的朋友!

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

1、文本输出TextOutputFormat

默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,疑问TextOutputFormat调用toString()方法把他们转换为字符串。

2、SequenceFileOutputFormat

将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

3、自定义OutputFormat

根据用户需求,自定义实现输出。

3.6.2自定义OutputFormat

1、使用场景

为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。

例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同的目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

2、自定义OUtputFormat步骤

(1)自定义一个类继承FileOutputFormat。

(2)改写RecordWriter,具体改写输出数据的方法write()。

3.6.3自定义OutputFormat案例实操

1、需求

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

(1)输入数据

http://www.baidu.comhttp://www.google.comhttp://cn.bing.comhttp://www.atguigu.comhttp://www.sohu.comhttp://www.sina.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sindsafa.com

(2)期望输出数据

http://www.atguigu.comhttp://cn.bing.comhttp://www.baidu.comhttp://www.google.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sina.comhttp://www.sindsafa.comhttp://www.sohu.com

2、需求分析

3、案例实操

(1)编写FilterMapper类

package com.cuiyf41.output;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 写出 context.write(value, NullWritable.get()); }}

(2)编写FilterReducer类

package com.cuiyf41.output;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> { Text k = new Text(); @Override protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 1 获取一行 String line = key.toString(); // 2 拼接 line = line + "\r\n"; // 3 设置key k.set(line); // 4 输出 context.write(k, NullWritable.get()); }}

(3)自定义一个OutputFormat类

package com.atguigu.mapreduce.outputformat;import java.io.IOException;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)throws IOException, InterruptedException {// 创建一个RecordWriterreturn new FilterRecordWriter(job);}}

(4)编写RecordWriter类

package com.cuiyf41.output;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream atguiguOut = null; FSDataOutputStream otherOut = null; public FilterRecordWriter(TaskAttemptContext job) { // 1 获取文件系统 FileSystem fs; try { fs = FileSystem.get(job.getConfiguration()); // 2 创建输出文件路径 Path atguiguPath = new Path("e:/atguigu.log"); Path otherPath = new Path("e:/other.log"); // 3 创建输出流 atguiguOut = fs.create(atguiguPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判断是否包含“atguigu”输出到不同文件 if (key.toString().contains("atguigu")) { atguiguOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭资源 IOUtils.closeStream(atguiguOut); IOUtils.closeStream(otherOut); }}

(5)编写FilterDriver类

package com.cuiyf41.output;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FilterDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "e:/input/log.txt", "e:/output2" }; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FilterDriver.class); job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要将自定义的输出格式组件设置到job中 job.setOutputFormatClass(FilterOutputFormat.class); Path input = new Path(args[0]); Path output = new Path(args[1]); // 如果输出路径存在,则进行删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output,true); } FileInputFormat.setInputPaths(job, input); // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, output); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
本文链接地址:https://www.jiuchutong.com/zhishi/299790.html 转载请保留说明!

上一篇:谷歌研究员走火入魔事件曝光:认为AI已具备人格,被罚带薪休假,聊天记录让网友San值狂掉...(谷歌研究院)

下一篇:猿创征文|【React 三】组件实例的三大属性(state、props、refs)(关于猿猴的作文)

  • 小米手环6怎么设置密码(小米手环6怎么调时间)

    小米手环6怎么设置密码(小米手环6怎么调时间)

  • windows移动中心没有无线开关(windows移动中心没有无线网)

    windows移动中心没有无线开关(windows移动中心没有无线网)

  • 手机怎么录指纹(苹果手机怎么录指纹)

    手机怎么录指纹(苹果手机怎么录指纹)

  • 手机突然黑屏一秒后又亮了(手机突然黑屏一闪一闪的怎么回事)

    手机突然黑屏一秒后又亮了(手机突然黑屏一闪一闪的怎么回事)

  • 三星a50s如何关机(三星a51hd怎么关)

    三星a50s如何关机(三星a51hd怎么关)

  • 交易被淘宝关闭是什么意思(交易被淘宝关闭,原因是之前有过退款)

    交易被淘宝关闭是什么意思(交易被淘宝关闭,原因是之前有过退款)

  • uid是什么(uid是什么软件)

    uid是什么(uid是什么软件)

  • iphone11出厂带贴膜吗(苹果11出场带膜吗)

    iphone11出厂带贴膜吗(苹果11出场带膜吗)

  • txupd是什么应用程序

    txupd是什么应用程序

  • 5g双模可以用4g吗(5g双模好吗)

    5g双模可以用4g吗(5g双模好吗)

  • 因特网最初创建时的应用领域是什么(因特网的最早雏形)

    因特网最初创建时的应用领域是什么(因特网的最早雏形)

  • oppo手机怎样把联系人存到卡里(oppo手机怎样把5g网络改成4g网络)

    oppo手机怎样把联系人存到卡里(oppo手机怎样把5g网络改成4g网络)

  • 什么叫做ie系列浏览器(什么叫做ie系列浏览器有哪些)

    什么叫做ie系列浏览器(什么叫做ie系列浏览器有哪些)

  • iphone我的名片怎么改(iphone我的名片怎么设置)

    iphone我的名片怎么改(iphone我的名片怎么设置)

  • 荣耀9x升降摄像头怎么用(荣耀9x升降摄像头卡住了)

    荣耀9x升降摄像头怎么用(荣耀9x升降摄像头卡住了)

  • 厉害的反义词(厉害的反义词和近义词)

    厉害的反义词(厉害的反义词和近义词)

  • 微信加好友能不能隐藏来源(微信加好友能不能看到来源)

    微信加好友能不能隐藏来源(微信加好友能不能看到来源)

  • 苹果8plus和苹果11对比(苹果8plus和苹果11)

    苹果8plus和苹果11对比(苹果8plus和苹果11)

  • 苹果xsmax是几核处理器(苹果xsmax是a十几处理器)

    苹果xsmax是几核处理器(苹果xsmax是a十几处理器)

  • 拼多多用户名是哪一个(拼多多用户名称是什么)

    拼多多用户名是哪一个(拼多多用户名称是什么)

  • 苹果手表时间不准怎么调(苹果手表时间不准怎么回事)

    苹果手表时间不准怎么调(苹果手表时间不准怎么回事)

  • 苹果ipad可以打电话吗(苹果ipad可以打印吗)

    苹果ipad可以打电话吗(苹果ipad可以打印吗)

  • 迅雷尚未验证怎么回事(迅雷无法验证app)

    迅雷尚未验证怎么回事(迅雷无法验证app)

  •  oppo怎么解除禁止安装权限(oppo怎么解除禁止自动读取验证码)

    oppo怎么解除禁止安装权限(oppo怎么解除禁止自动读取验证码)

  • win10神key激活秘钥推荐 附激活工具(win10永久激活码神key一周内)

    win10神key激活秘钥推荐 附激活工具(win10永久激活码神key一周内)

  • service命令  管理系统服务(service运行命令)

    service命令 管理系统服务(service运行命令)

  • zipgrep命令  查找压缩包内指定的文件(linux在查找zip里的文件)

    zipgrep命令 查找压缩包内指定的文件(linux在查找zip里的文件)

  • 未达起征点的增值税要交企业所得税吗
  • 今日头条测试收款如何记账?
  • 公司的现金收入可以直接发工资吗
  • 汇算清缴时发现去年的成本少确认了报表怎么填列
  • 企业年金是否缴税
  • 工程机械租赁税点
  • 欠税后股东能否退股
  • 员工工伤保险报销走什么科目
  • 公众号注册验证方式
  • 票据贴现无手续怎么处理
  • 记账凭证广告费
  • 小规模企业所得税标准
  • 预估入账的收入是什么
  • 金税盘一直更新登陆不进去
  • 物业公司要怎么开水电费证明
  • 存货盘亏计入什么科目批准后
  • 企业所得税期间费用利息收支
  • 上年免税收入转内销补交税分录
  • 公司账户收到车险怎么做账
  • 股权激励会计处理分录和企业所得税
  • 空调作为固定资产吗
  • 以前年度损益调整借贷方向
  • 甲供材料如何纳税
  • 审计报告与汇缴报告
  • 如何查看电脑的型号和配置
  • 或有事项确认预计负债的分录
  • 收到收据的会计分录
  • 从租计征房产税怎么申报
  • 公司收到的应收股利需要分配利润吗
  • 本月购买原材料怎么入账
  • 子公司开票给母公司,冲减利润,怎么避免税务风险
  • 增值税最高开票限额审批时限
  • php yield 异步
  • 专项应付款费用化
  • 公允价值变动损益借贷方向增减
  • uniapp route
  • 代收税款是什么意思
  • torch测定
  • zend框架教程
  • /f命令
  • 帝国cms安装教程
  • 支付的劳务费是什么意思
  • 以固定资产抵债发生的固定资产
  • 个人独资企业用企业资金进行个人消费
  • python天气数据的爬取与分析
  • 印花税核定征收比例取消了吗
  • 呆账冲销是什么意思
  • 抵扣的税可以退掉么?
  • 销货退回的账务如何处理
  • 如何确认固定资产的减值
  • 小微企业开专票需要缴纳多少税
  • 计提的费用收到发票时候怎么做账
  • a公司持有b公司
  • 跨年度发票的报表怎么填
  • 公司车辆购买保险申请
  • 工程前期费用包含哪些
  • 检测费用的会计分录
  • 暂估成本多久要冲掉
  • 本月工资未发个税能不能扣
  • 投标保证金退还
  • 公司赠送客户礼品怎么做账
  • 电子发票是否使用
  • 新手必看教程
  • 企业构建固定资产无形资产和其他长期资产支付
  • 什么是结账?结账前要做哪些准备工作?
  • mysql闪退怎么回事
  • Linq to SQL 插入数据时的一个问题
  • centos 7.2安装
  • xp主题怎么改成win7
  • linux系统编译命令
  • xp如何升级到sp3
  • 内存使用过低
  • win7桌面图标有小方块
  • scrollWidth,clientWidth与offsetWidth的区别
  • css写文字
  • unity mode
  • Android ADB server didn't ACK * failed to start daemon * 简单有效的解决方案
  • unity behavior designer
  • 广州地税官网
  • 十堰市税务大厅
  • 免责声明:网站部分图片文字素材来源于网络,如有侵权,请及时告知,我们会第一时间删除,谢谢! 邮箱:opceo@qq.com

    鄂ICP备2023003026号

    网站地图: 企业信息 工商信息 财税知识 网络常识 编程技术

    友情链接: 武汉网站建设