位置: IT常识 - 正文

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

编辑:rootadmin
大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出 3.6.1OutputFormat接口实现类

推荐整理分享大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架),希望有所帮助,仅作参考,欢迎阅读内容。

文章相关热门搜索词:大数据框架结构,大数据技术框架有哪些,大数据处理框架,大数据整合的框架,大数据 框架,大数据 框架,大数据框架基础教程,大数据框架基础教程,内容如对您有帮助,希望把文章链接给更多的朋友!

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

1、文本输出TextOutputFormat

默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,疑问TextOutputFormat调用toString()方法把他们转换为字符串。

2、SequenceFileOutputFormat

将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

3、自定义OutputFormat

根据用户需求,自定义实现输出。

3.6.2自定义OutputFormat

1、使用场景

为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。

例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同的目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

2、自定义OUtputFormat步骤

(1)自定义一个类继承FileOutputFormat。

(2)改写RecordWriter,具体改写输出数据的方法write()。

3.6.3自定义OutputFormat案例实操

1、需求

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

(1)输入数据

http://www.baidu.comhttp://www.google.comhttp://cn.bing.comhttp://www.atguigu.comhttp://www.sohu.comhttp://www.sina.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sindsafa.com

(2)期望输出数据

http://www.atguigu.comhttp://cn.bing.comhttp://www.baidu.comhttp://www.google.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sina.comhttp://www.sindsafa.comhttp://www.sohu.com

2、需求分析

3、案例实操

(1)编写FilterMapper类

package com.cuiyf41.output;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 写出 context.write(value, NullWritable.get()); }}

(2)编写FilterReducer类

package com.cuiyf41.output;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> { Text k = new Text(); @Override protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 1 获取一行 String line = key.toString(); // 2 拼接 line = line + "\r\n"; // 3 设置key k.set(line); // 4 输出 context.write(k, NullWritable.get()); }}

(3)自定义一个OutputFormat类

package com.atguigu.mapreduce.outputformat;import java.io.IOException;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)throws IOException, InterruptedException {// 创建一个RecordWriterreturn new FilterRecordWriter(job);}}

(4)编写RecordWriter类

package com.cuiyf41.output;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream atguiguOut = null; FSDataOutputStream otherOut = null; public FilterRecordWriter(TaskAttemptContext job) { // 1 获取文件系统 FileSystem fs; try { fs = FileSystem.get(job.getConfiguration()); // 2 创建输出文件路径 Path atguiguPath = new Path("e:/atguigu.log"); Path otherPath = new Path("e:/other.log"); // 3 创建输出流 atguiguOut = fs.create(atguiguPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判断是否包含“atguigu”输出到不同文件 if (key.toString().contains("atguigu")) { atguiguOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭资源 IOUtils.closeStream(atguiguOut); IOUtils.closeStream(otherOut); }}

(5)编写FilterDriver类

package com.cuiyf41.output;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FilterDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "e:/input/log.txt", "e:/output2" }; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FilterDriver.class); job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要将自定义的输出格式组件设置到job中 job.setOutputFormatClass(FilterOutputFormat.class); Path input = new Path(args[0]); Path output = new Path(args[1]); // 如果输出路径存在,则进行删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output,true); } FileInputFormat.setInputPaths(job, input); // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, output); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
本文链接地址:https://www.jiuchutong.com/zhishi/299790.html 转载请保留说明!

上一篇:谷歌研究员走火入魔事件曝光:认为AI已具备人格,被罚带薪休假,聊天记录让网友San值狂掉...(谷歌研究院)

下一篇:猿创征文|【React 三】组件实例的三大属性(state、props、refs)(关于猿猴的作文)

  • 自查补缴增值税的会计处理
  • 未实际处置资产损失税前扣除金额的分录?
  • 预计产品质量保证损失是什么意思
  • 两年前发票怎么红冲
  • 记账凭证附单据是有哪些
  • 网上社保增减员操作流程
  • 员工单独购买工伤保险可以吗
  • 同一法人的两家企业可以以同一投标人身份投标吗
  • 报废固定资产产生的净损益属于利得吗
  • 医院员工工资表模板
  • 房地产企业增值税纳税义务发生时间
  • 销项负数发票如何勾选
  • 企业购买银行理财产品如何进行账务处理
  • 核定征收的公司用做账么
  • 小规模企业差额征收税率
  • 互联网税务行动计划五大板块
  • 营运资金的范畴包括哪些
  • 劳务服务公司业务范围
  • 开票3个点13个点是什么意思
  • 个税滞纳金如何调增所得税费用
  • 工程项目处理废旧钢材怎么入账的
  • 电子发票怎么打成A4纸一半
  • 现金为负数时怎么做账
  • 车位销售代理合同
  • microsoft edge怎么设置关闭所有网页
  • 如何在excel中链接图片对方能显示
  • 在window系统中哪些用户可以查看日志
  • 第三方支付销售
  • linux管理员账户名
  • 一些出口商为什么要倾销
  • 罚款收入营业外收入还是其他应收款
  • 安装win7提示版本过低
  • 工装费用会计分录
  • php定义字符串的三种方式
  • lama羊驼
  • .patch文件
  • 土地使用税退税的会计分录
  • 当月注销外管证个税账户也会注销吗
  • 金税盘怎样设置收款人复核人
  • 本期到期债务计算公式
  • react高阶组件
  • phpcms模板下载
  • 长期待摊费用的账务处理
  • 个人所得税进什么科目
  • 资产总额的季度平均值
  • 待认证进项税额月末怎么处理
  • 不开票收入要交印花税吗
  • 行政单位无偿划拨资产账务处理
  • 一般纳税人资格证书怎么查询
  • 非居民企业所得税征收方式鉴定表
  • 一个企业只有收入怎么办
  • 在建工程中发生的费用
  • 土地使用税怎么征收标准
  • 土地使用税和房产税是一起的吗
  • 企业在计划内应生产的工业产品
  • mysql搜索字段内容
  • 设置u盘为只读
  • ie11安装方法
  • VirtualBox安装64位系统报错的处理方法
  • linux下4种kill某个用户所有进程的方法
  • netddeclnt.exe - netddeclnt是什么进程 有什么用
  • 升级win8.1后vpn出现错误720该怎么办?
  • 基于stm32的100个毕业设计
  • [置顶]马粥街残酷史
  • python根据ip获取主机名
  • python数据结构教程
  • ecmascript6 官方文档
  • 基于vue的购物系统
  • unity销毁预制体
  • linux实现shell代码
  • python获取数据的方法
  • androidstudio性能检测工具
  • python字典常用操作
  • 国家税务总局咨询热线电话
  • 广东国税局发票查询平台
  • 国家税务局总局河南官网
  • 税务局如何查询社保明细
  • 交强险和车船税网上购买
  • 福建省泉州市石狮市新服装城
  • 冯炜教授现况如何
  • 免责声明:网站部分图片文字素材来源于网络,如有侵权,请及时告知,我们会第一时间删除,谢谢! 邮箱:opceo@qq.com

    鄂ICP备2023003026号

    网站地图: 企业信息 工商信息 财税知识 网络常识 编程技术

    友情链接: 武汉网站建设