SparkSQL创建RDD:UDAF(UserDefinedAggregatedFunction)用户自定义聚合函数【Java版纯代码】_java 自定义rdd-程序员宅基地

要实现8个方法,8个方法中,最为重要的有3个:

initialize:初始化,在给,map端每一个分区的每一个key进行初始化,给0

update:在map端聚合

merge: 在reduce端聚合

Java版代码:

package com.bjsxt;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.parse.HiveParser_SelectClauseParser.selectClause_return;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
 * 用户自定义聚合函数
 * @author Administrator
 *
 */
public class UDAF {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        List<String> list = Arrays.asList("zhangsan", "lisi", "wangwu", "zhangsan", "zhangsan", "lisi", "wangwu");
        JavaRDD<String> parallelize = sc.parallelize(list);
        JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {

            /**
             * map是一对一的类型
             * 进去的是String类型,出来的是row类型
             */
            @Override
            public Row call(String s) throws Exception {

                return RowFactory.create(s);
            }
        });
        List<StructField> fields = new ArrayList<StructField>();
        /**
         * 创建名为name的区域,类型为String
         */
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        /**
         * 创建schema        
         */
        StructType schema = DataTypes.createStructType(fields);
        /**
         * 将rdd和schema相聚和
         */
        DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
        /**
         * 创建一个用户名为user的表格
         */
        df.registerTempTable("user");
        /**
         * 注册一个UDAF函数,实现统计相同值的个数 
         * 注意: 这里可以自定义一个类,继承UserDefinedAggregatedFunction类也是可以的
         */
        sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {
            /**
             * initialize相当于初始化:
             * map端每个元素的初试值都为零
             * reduce端的每个元素的初始值都为零
             * update相当于map端的聚合
             * merge相当于reduce端的聚合
             *map端的 merge的好处:
             *1.减少了suffer磁盘的数据量
             *2.减少了reduce端拉取的数据量
             *3.减少了reduce端的聚合次数
             */
            /**
             * 方法 用户自定义聚合函数
             */

            /**
             * ** 在进行聚合造作的时候,所要处理的数据的结果的类型
             * 
             * @return
             */

            @Override
            public StructType bufferSchema() {
                // TODO Auto-generated method stub
                return DataTypes.createStructType(
                        Arrays.asList(DataTypes.createStructField("bfferxx", DataTypes.IntegerType, true)));
            }

            /**
             * 指定UDAF函数计算后,返回的结果类型
             * 
             * @return
             */
            @Override
            public DataType dataType() {
                // TODO Auto-generated method stub
                return DataTypes.IntegerType;
            }

            /**
             * 确保一致性,一般用true 用以标记针对给定的一组输入 UDAF是否纵使生成相同的结果,
             * 
             * @return
             */
            @Override
            public boolean deterministic() {
                // TODO Auto-generated method stub
                return true;
            }

            /**
             * 最后返回一个和dataType方法的类型要一致的类型 返回UDAF最后的计算结果
             * 
             * @param arg0
             * @return row是已经分好组的key
             */
            @Override
            public Object evaluate(Row row) {

                return row.getInt(0);
            }

            /**
             * 初始化一个内部自定义的值 在Aggregate之前每组数据的初始化结果
             * 
             * @param buffer
             * 在map端每一个分区,中的每一个key做初始化,里边的值都为零
             * initialize不仅作用在map端,初始化元素为零
             * 而且还作用在reduce端,初始化reduce端的每个元素的值也为零
             */
            @Override
            public void initialize(MutableAggregationBuffer buffer) {
                buffer.update(0, 0);

            }

            /**
             * 指定输入字段的字段及类型
             * 
             * @return
             */
            @Override
            public StructType inputSchema() {

                return DataTypes.createStructType(
                        Arrays.asList(DataTypes.createStructField("namexxx", DataTypes.StringType, true)

                        ));

            }

            /**
             * 合并update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据会在多个节点上处理
             * 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来 buffer1.getInt(0):大聚合的时候,上一次聚合后的值
             * buffer2.getInt(0):这次计算传入进来的update的结果 这里即是:最后在分布式节点上完成传后需要进行全局级别的merge操作
             * 
             * @param arg0
             * @param arg1
             *merge 作用在reduce端,将所有的数据拉取在一起
             *reduce端跨分区,跨节点
             * 
             */
            @Override
            public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
                buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
                /**
                 * 难不成各个节点的名字叫buffer吗?
                 */
            }

            /**
             * 更新,可以认为是一个一个地将组内的字段传递进来的,实现拼接的逻辑 buffer.getInt(0)获取的是上一次聚合后的值
             * 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小的聚合 大聚合发生在reduce端
             * 这里即是:在进行聚合的时候,每当有新的值进来,对分组后得值如何进行计算
             * 
             * @param arg0
             * @param arg1
             *            update相当于map端的聚合 作用在每一个分区的每一个小组
             */
            @Override
            public void update(MutableAggregationBuffer buffer, Row arg1) {
                buffer.update(0, buffer.getInt(0) + 1);

            }

        });
        /**
         * 真正的SQL查询语句
         */
        sqlContext.sql("select name,StringCount(name) as strCount from user group by name").show();
        sc.stop();
    }
}


Scala版代码:

class MyUDAF extends UserDefinedAggregateFunction  {
  // 聚合操作时,所处理的数据的类型
  def bufferSchema: StructType = {
    DataTypes.createStructType(Array(DataTypes.createStructField("aaa", IntegerType, true)))
  }
  // 最终函数返回值的类型
  def dataType: DataType = {
    DataTypes.IntegerType
  }

  def deterministic: Boolean = {
    true
  }
  // 最后返回一个最终的聚合值     要和dataType的类型一一对应
  def evaluate(buffer: Row): Any = {
    buffer.getAs[Int](0)
  }
  // 为每个分组的数据执行初始化值
  def initialize(buffer: MutableAggregationBuffer): Unit = {
     buffer(0) = 0
  }
  //输入数据的类型
  def inputSchema: StructType = {
    DataTypes.createStructType(Array(DataTypes.createStructField("input", StringType, true)))
  }
  // 最后merger的时候,在各个节点上的聚合值,要进行merge,也就是合并
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0)+buffer2.getAs[Int](0) 
  }
  // 每个组,有新的值进来的时候,进行分组对应的聚合值的计算
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0)+1
  }
}

object UDAF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("udaf")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd = sc.makeRDD(Array("zhangsan","lisi","wangwu","zhangsan","lisi"))
    val rowRDD = rdd.map { x => {RowFactory.create(x)} }
    
    val schema = DataTypes.createStructType(Array(DataTypes.createStructField("name", StringType, true)))
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.show()
    df.registerTempTable("user")
    /**
     * 注册一个udaf函数
     */
    sqlContext.udf.register("StringCount", new MyUDAF())
    sqlContext.sql("select name ,StringCount(name) from user group by name").show()
    sc.stop()
  }
}

亲,鼓励一下我呗。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/wyqwilliam/article/details/81271805

智能推荐

ionic tabs_tabs ionic-程序员宅基地

文章浏览阅读417次。https://ionicframework.com/docs/ionicons/搜索图标原文链接:http://blog.maptoface.com/post/124_tabs ionic

Java统一异常处理--实战篇-程序员宅基地

文章浏览阅读4k次,点赞2次,收藏20次。文章目录背景什么是统一异常处理目标统一异常处理实战用 Assert(断言) 替换 throw exceptionAssert善解人意的Enum定义统一异常处理器类异常处理器说明handleServletExceptionhandleBindExceptionhandleValidExceptionhandleBusinessException、handleBaseExceptionhandleException异于常人的404统一返回结果验证统一异常处理主要代码开始验证捕获自定义异常捕获进入 Control_java统一异常处理

python字符串处理之数字求和_python处理字符串:将字符串中的数字相加求和-程序员宅基地

文章浏览阅读916次。计算字符串中所有数字的和,字符串中有数字和字母组合而成如果出现连续数字,按照一个数操作具体解释在代码行里:def sum_str(str1):len1=len(str1) #首先将字符串str1的长度赋值给len1sum = n = 0 #建立一个值为0的空变量sun #建立一个值为0的空变量nfor i in range(len1): ..._输入一个只包含数学宇符的字符品输出字符串对应的数字总和

阿里云服务器上搭建Discuz论坛_如何用阿里云搭建dz-程序员宅基地

文章浏览阅读8.7k次。前沿: 搭建discuz论坛,需要搭建软件环境(也就是其运行的环境)本实例采用的是XAMMP软件站集成环境目的:搭建Discuz3论坛搭建步骤: 第一步:下载XAMPP(Apache+Mysql+PHP+RERL)百度云下载地址:XAMPP点击下载 windows到服务器之间的文件传输采用xftp; 终端shell软件采用xshell,然后将下载的x_如何用阿里云搭建dz

百度地图api离线开发(示例源代码)_百度地图离线api-程序员宅基地

文章浏览阅读5.8k次。相关教程: 1、如何搭建WEB离线地图开发环境 2、下载离线地图数据(金字塔瓦片数据) 3、离线地图二次开发接口(离线地图API) 4、离线地图API接口实例DEMO 5、离线地图完整演示实例说明:1.当前版本支持 谷歌电子/卫星地图瓦片、高德地图、阿里云地图、超图、腾讯地图等(只需下载该地图源的瓦片拷贝到指定目录即可);2.效果预览演示地址:..._百度地图离线api

ubuntu20.04.3安装Qt6.22操作步骤_ubuntu安装qt6-程序员宅基地

文章浏览阅读1.1w次,点赞5次,收藏37次。ubuntu20.04.3安装Qt6.22 LTS的操作步骤_ubuntu安装qt6

随便推点

嵌入式基础知识总结_嵌入式软件基础知识点总结-程序员宅基地

文章浏览阅读7.2k次,点赞16次,收藏80次。提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档文章目录前言一、选择题二、填空题三、简答题四、综合题2.读入数据总结前言 本篇为嵌入式知识点总结,花费了大概一两天时间整理的,现在分享给大家!!!一、选择题1.以下哪个不是嵌入式系统的特点?( )A.面向特定应用 B.高质量高可靠 C.可裁剪性 D.具备二次开发能力 解析:嵌入式系统本身不具备二次开发能力,即_嵌入式软件基础知识点总结

[学习] 鸿洋大大的万能适配器(1)_鸿洋 commonadapter-程序员宅基地

文章浏览阅读953次。总结一下从 ViewHolder 开始学习public ViewHolder(Context context, View itemView) { super(itemView); mContext = context; mConvertView = itemView; mViews = new SparseArra..._鸿洋 commonadapter

数据集下载-程序员宅基地

文章浏览阅读3.6k次。数据集下载_数据集下载

Android APK反编译就这么简单 详解(附图)_[置顶] android apk反编译就这么简单 详解(附图)-程序员宅基地

文章浏览阅读343次。在学习Android开发的过程你,你往往会去借鉴别人的应用是怎么开发的,那些漂亮的动画和精致的布局可能会让你爱不释手,作为一个开发者,你可能会很想知道这些效果界面是怎么去实现的,这时,你便可以对改应用的APK进行反编译查看。下面是我参考了一些文章后简单的教程详解。(注:反编译不是让各位开发者去对一个应用破解搞重装什么的,主要目的是为了促进开发者学习,借鉴好的代码,提升自我开发水平。)_[置顶] android apk反编译就这么简单 详解(附图)

正点原子DS100拆解全过程-硬件工程师必备-程序员宅基地

文章浏览阅读4.2k次,点赞4次,收藏18次。前言:之前一篇只针对正点原子DS100手持示波器的使用介绍文章。可作为一个电子工程师,光使用不是我们的风格哈,我们还要拆开看看电路。开拆外壳首先,看下图,DS100不是使用螺母进行固定的,而是通过结构上的卡扣进行固定,所以大家拆的时候要小心,别拆开之后,卡扣都坏了。顺手再提一句,其实我这次开拆也是准备修它呢?因为电源按键在我使用的时候,因为过于使劲导致焊锡松动了。所以准备补点焊建议经过拆卸之后,发现受损按键的部分是全靠焊盘和焊接的连接支撑受力,所以当使用者稍微使劲按压,这个键极易被损坏._ds100拆解

突破伪静态的四种注入方法_伪静态怎么防止被注入js-程序员宅基地

文章浏览阅读2.5k次。伪静态,主要是为了隐藏传递的参数名,伪静态只是一种URL重写的手段,既然能接受参数输入,所以并不能防止注入。目前来看,防止注入的最有效的方法就是使用LINQ。常规的伪静态页面如下:http://www.2cto.com /play/Diablo.html, 在看到之前先要确定这个页面是静态还是伪静态,鉴别方法很多。 例如关联的动态页面是game.php ,那么当用户访问后程序会自动转换_伪静态怎么防止被注入js