位置: IT常识 - 正文

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

编辑:rootadmin
大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出 3.6.1OutputFormat接口实现类

推荐整理分享大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架),希望有所帮助,仅作参考,欢迎阅读内容。

文章相关热门搜索词:大数据框架结构,大数据技术框架有哪些,大数据处理框架,大数据整合的框架,大数据 框架,大数据 框架,大数据框架基础教程,大数据框架基础教程,内容如对您有帮助,希望把文章链接给更多的朋友!

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

1、文本输出TextOutputFormat

默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,疑问TextOutputFormat调用toString()方法把他们转换为字符串。

2、SequenceFileOutputFormat

将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

3、自定义OutputFormat

根据用户需求,自定义实现输出。

3.6.2自定义OutputFormat

1、使用场景

为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。

例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同的目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

2、自定义OUtputFormat步骤

(1)自定义一个类继承FileOutputFormat。

(2)改写RecordWriter,具体改写输出数据的方法write()。

3.6.3自定义OutputFormat案例实操

1、需求

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

(1)输入数据

http://www.baidu.comhttp://www.google.comhttp://cn.bing.comhttp://www.atguigu.comhttp://www.sohu.comhttp://www.sina.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sindsafa.com

(2)期望输出数据

http://www.atguigu.comhttp://cn.bing.comhttp://www.baidu.comhttp://www.google.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sina.comhttp://www.sindsafa.comhttp://www.sohu.com

2、需求分析

3、案例实操

(1)编写FilterMapper类

package com.cuiyf41.output;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 写出 context.write(value, NullWritable.get()); }}

(2)编写FilterReducer类

package com.cuiyf41.output;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> { Text k = new Text(); @Override protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 1 获取一行 String line = key.toString(); // 2 拼接 line = line + "\r\n"; // 3 设置key k.set(line); // 4 输出 context.write(k, NullWritable.get()); }}

(3)自定义一个OutputFormat类

package com.atguigu.mapreduce.outputformat;import java.io.IOException;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)throws IOException, InterruptedException {// 创建一个RecordWriterreturn new FilterRecordWriter(job);}}

(4)编写RecordWriter类

package com.cuiyf41.output;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream atguiguOut = null; FSDataOutputStream otherOut = null; public FilterRecordWriter(TaskAttemptContext job) { // 1 获取文件系统 FileSystem fs; try { fs = FileSystem.get(job.getConfiguration()); // 2 创建输出文件路径 Path atguiguPath = new Path("e:/atguigu.log"); Path otherPath = new Path("e:/other.log"); // 3 创建输出流 atguiguOut = fs.create(atguiguPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判断是否包含“atguigu”输出到不同文件 if (key.toString().contains("atguigu")) { atguiguOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭资源 IOUtils.closeStream(atguiguOut); IOUtils.closeStream(otherOut); }}

(5)编写FilterDriver类

package com.cuiyf41.output;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FilterDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "e:/input/log.txt", "e:/output2" }; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FilterDriver.class); job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要将自定义的输出格式组件设置到job中 job.setOutputFormatClass(FilterOutputFormat.class); Path input = new Path(args[0]); Path output = new Path(args[1]); // 如果输出路径存在,则进行删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output,true); } FileInputFormat.setInputPaths(job, input); // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, output); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
本文链接地址:https://www.jiuchutong.com/zhishi/299790.html 转载请保留说明!

上一篇:谷歌研究员走火入魔事件曝光:认为AI已具备人格,被罚带薪休假,聊天记录让网友San值狂掉...(谷歌研究院)

下一篇:猿创征文|【React 三】组件实例的三大属性(state、props、refs)(关于猿猴的作文)

  • 怎样去推广自己的网店及技巧(怎样去推广自己的知识付费)

    怎样去推广自己的网店及技巧(怎样去推广自己的知识付费)

  • 拼多多补偿金会退到哪里(拼多多补偿金会返还吗)

    拼多多补偿金会退到哪里(拼多多补偿金会返还吗)

  • 华为手机怎样给分身微信改图标(华为手机怎样给软件设置密码)

    华为手机怎样给分身微信改图标(华为手机怎样给软件设置密码)

  • 1KB的准确数值是(1KB的准确数值是多少)

    1KB的准确数值是(1KB的准确数值是多少)

  • 钉钉开摄像头分屏会被发现吗(钉钉开摄像头分屏老师能看见吗)

    钉钉开摄像头分屏会被发现吗(钉钉开摄像头分屏老师能看见吗)

  • 网易云黑胶会员能看到访客吗(网易云黑胶会员怎么取消自动续费)

    网易云黑胶会员能看到访客吗(网易云黑胶会员怎么取消自动续费)

  • 京东自营与京东超市的区别(京东自营与京东自营官方旗舰店的区别)

    京东自营与京东超市的区别(京东自营与京东自营官方旗舰店的区别)

  • autoit error是什么意思(autoit error line0)

    autoit error是什么意思(autoit error line0)

  • 华为手机把天气删除了怎么移回来(华为手机把天气放桌面)

    华为手机把天气删除了怎么移回来(华为手机把天气放桌面)

  • OPPO手机信息允许通知为什么打不开(oppo手机短信息设置)

    OPPO手机信息允许通知为什么打不开(oppo手机短信息设置)

  • 主板尺寸分类(主板尺寸分类24.4×19.0cm)

    主板尺寸分类(主板尺寸分类24.4×19.0cm)

  • 苹果x要不要升级ios13.3

    苹果x要不要升级ios13.3

  • vivo下载管理器在哪里打开(vivo浏览器下载安装)

    vivo下载管理器在哪里打开(vivo浏览器下载安装)

  • 支付宝关注的公众号在哪里(支付宝关注的公众号怎么找)

    支付宝关注的公众号在哪里(支付宝关注的公众号怎么找)

  • 摄像机离线了怎么处理(摄像机离线了怎么连接)

    摄像机离线了怎么处理(摄像机离线了怎么连接)

  • 乐视手机怎么清理垃圾(乐视手机怎么清除密码)

    乐视手机怎么清理垃圾(乐视手机怎么清除密码)

  • 华为p20pro广角怎么用(华为p20p广角拍照)

    华为p20pro广角怎么用(华为p20p广角拍照)

  • 荣耀v20返回键在哪(荣耀v20怎么返回)

    荣耀v20返回键在哪(荣耀v20怎么返回)

  • 荣耀20i怎么关闭后台程序(荣耀20i怎么关闭智慧助手)

    荣耀20i怎么关闭后台程序(荣耀20i怎么关闭智慧助手)

  • 微信群发会封号吗(群发彩信会封号吗)

    微信群发会封号吗(群发彩信会封号吗)

  • 什么叫原生开发(原生开发小程序 什么意思)

    什么叫原生开发(原生开发小程序 什么意思)

  • Win10 KB5004945 更新后多个品牌打印机无法使用的解决方法

    Win10 KB5004945 更新后多个品牌打印机无法使用的解决方法

  • vue 如何让PC端不同屏幕大小分辨率自适应(屏幕自适应)(postcss-px2rem、px2rem-loader、lib-flexible)(vue适配pc)

    vue 如何让PC端不同屏幕大小分辨率自适应(屏幕自适应)(postcss-px2rem、px2rem-loader、lib-flexible)(vue适配pc)

  • Pytorch训练模型损失Loss为Nan或者无穷大(INF)原因(pytorch训练模型计算f1)

    Pytorch训练模型损失Loss为Nan或者无穷大(INF)原因(pytorch训练模型计算f1)

  • 车船税是每个月交还是每年交
  • 所得税的会计分录怎么做
  • 固定资产抵扣增值税何时开始
  • 库存现金和银行存款的区别
  • 车辆购置税多缴了能退吗
  • 事业单位其他收入的会计分录
  • 同一控制吸收合并会计处理
  • 固定资产尚未投入怎么办
  • 购置股权账务处理流程
  • 一般销售商品业务
  • 开个人普发票需要什么资料?
  • 预收电费怎么做会计分录
  • 开专票需要对方是一般纳税人吗
  • 会计七月份忙吗
  • 发票验旧后可领新发票吗
  • 金蝶标准版期末结转过不去
  • 筹备期间的开办企业是指
  • 增资后工商怎么变更
  • 个税扣除每个月更新吗
  • 增值税红字发票是什么意思
  • 补交以前年度个税怎么做账
  • 苹果mac电脑下刻字怎么弄
  • 绿宝树的养殖方法和注意事项视频
  • 在Linux系统中安装Samba服务器
  • macOS Big Sur 11.1 开发者预览版 Beta 2推送更新
  • 支付的费用没有发票能入费用科目吗
  • 企业发行股票的溢价收入应计入
  • 前端获取post请求返回数据
  • 投资性房地产期末公允价值大于账面价值
  • 境外汇款所得税缴纳规定
  • 差旅费包干什么意思
  • php功能实现
  • 非关联企业借款利息扣除
  • 镶嵌在巨石之间的英文
  • css字体溢出
  • 芒果sdk
  • 2023前端面试题目
  • 小微企业能申请留抵退税吗?
  • 增值税免税收入账务处理
  • python怎么设置窗口界面
  • 循环logo
  • python多进程间通信
  • 一个人失恋了该怎么安慰她
  • 票据粘贴顺序是什么
  • mongodb operator
  • 房地产企业增值税预征率
  • 工商年报纳税总额从哪看
  • 年末未缴增值税应符合计算逻辑
  • 电子承兑汇票到期怎么兑现
  • 出资入股是什么意思
  • 房贷利息抵个税怎么申请
  • 关于小规模纳税人的说法正确的是
  • 为什么其他权益工具投资处置价差计留存收益
  • 购买的税控设备
  • 体检入账的原始凭证是
  • 款已付未收到发票
  • 弥补上年亏损的分录 所得税
  • mysql查看当前执行的sql
  • 两台电脑如何共享网络
  • 硬盘磁盘保护
  • 组策略 guest
  • linux怎么和window系统共用
  • 手机上的安全模式是什么
  • win 8系统怎么样
  • win1020h2正式版下载
  • 在linux操作系统中,/etc/rc.d/init.d
  • EGL简介
  • 获取linux命令执行结果
  • css美化表单 案例
  • 在js里写html
  • jquery遍历li
  • node.js ajax
  • 用python写爬虫
  • python嵌套语句
  • 南京上班时间2021
  • 2020年职工探亲路费报销最新规定
  • 深圳买新房契税一般什么时候交比较好
  • 国家税务总局大连电子税务局
  • 在北京电子税务局怎么缴纳社保费
  • 珠海斗门井岸镇邮编
  • 免责声明:网站部分图片文字素材来源于网络,如有侵权,请及时告知,我们会第一时间删除,谢谢! 邮箱:opceo@qq.com

    鄂ICP备2023003026号

    网站地图: 企业信息 工商信息 财税知识 网络常识 编程技术

    友情链接: 武汉网站建设