《《Spark大数据技术与应用案例教程》教案第5课查询考试成绩排名前三的学生信息.docx》由会员分享,可在线阅读,更多相关《《Spark大数据技术与应用案例教程》教案第5课查询考试成绩排名前三的学生信息.docx(7页珍藏版)》请在课桌文档上搜索。
1、课时2课时(90min)教学目标知识技能目标:(1)掌握SparkRDD的转换操作和行动操作(2)理解RDD持久化和分区素质目标:培养学生熟练SparkRDD操作、持久化和分区的方法,提高学生专业知识技术能力教学重难点教学重点:SparkRDD的操作,RDD持久化和分区教学睚点:SparkRDD分区教学方法案例分析法、问答法、讨论法、i井授法教学用具电脑、投影仪、多媒体课件、教材教学过程主要教学内容及步骤课前任务【教师】布置课前任务,和学生负责人取得联系,让其提醒同学通过文旌课堂APP或其他学习软件,完成课前任务杳阅资料请大家了解SparkRDD的操作。【学生】完成课前任务考勤【教师】使用文旌
2、课堂APP进行签到【学生】班干部报请假人员及原因问题导入【教师】提出以下问题:SparkRDD有哪些操作?【鞋】思考、传授新知【教师】通过学生的回答引入新知,介绍SParkRDD操作、RDD持久化和分区的相关知识一、SParkRDD操作【教师】讲解SparkRDD操作的相关知识SparkRDD提供了一系列的操作方法,用于操作分布式数据集。RDD操作可以分为转换(transformation)操作和行动(action)操作。1.SparkRDD的转换操作转换操作是指将一个RDD转换成另一个RDD的操作,它们主要用于处理和清洗数据。常用的转换操作有m叩。、filter()xflatMap()xSo
3、nBy()、Uniono和dislincl()等,详细说明如表2-3所示。表2-3转换操作的详细说明转换操作说明map(func)将一个RDD中的每个元素都应用一个指定的函数func,返回一个新的RDDfilter(func)将一个RDD中的每个元素都应用一个指定的函数func,筛选出符合条件的元素,返回一个新的RDDflatMap(func)与m叩0类似,但flatMap()可以将每个元素映射到多个输出结果中sortBy(keyfunc,ascending.按照指定规则对RDD中的元素进行排序,并返回一个新的RDDe课题查询考试成绩排名前三的学生信息numPartitions)其中,参数ke
4、yfunc表示计算键(key)的函数;ascending(可选)用于指定键的排列顺序,默认值为TrUe,即升序排列;IWmPartiIiOns(可选)表示分区数,默认排序后的分区个数和排序之前的分区个数相等union(otherRDD)将两个RDD合并为一个新的RDD,使得新的RDD包含原来两个RDD中的所有元素distinct()去除RDD中重复的元素,返回一个新的RDD【教师】通过例子,使学生掌握SparkRDD的转换操作【例2-4创建一个RDD,执行map()操作,将RDD中的每个元素都加2,输出结果如图2-10所示。然后,执行SortByo操作,对RDD的所有元素进行排序,输出结果如图
5、2-11所示。最后,执行filter。操作,过滤出RDD中的偶数,输出结果如图2-12所示。hadoopbogon$pyspark跄!J建一个包含数字的RDDrdd=sc.parallelize(6,11,1,8,2,9,4,5)# 将RDD中的每个元素加2add-rdd=rdd.map(lambdax:x+2)# 输出加2后的RDD元素add_rdd.collect()麻序Sorted.numbers=add-rdd.sortBy(lambdax:x)# 输出排序后的RDD元素sorted_numbers.collect()# 过滤出RDD中的偶数filtered_rdd=sorted_nu
6、mbers.filter(lambdax:x%2=0)# 输出RDD中的偶数元素filtered_rdd.collect()addrdd.collect()sortednumbers.collect()(8,13,3,10,4,11,6,73,4,6,7,8,10,11,13图2-10加2后的RDD元素图2-11排序后的RDD元素filteredrdd.collect()4,6,8,101图2-12RDD中的偶数元素【例2-5】读取本地文件7usrlocalSPark/mycode/hello_spark.txt”中的数据创建RDDe然后,执行fatMap()操作,以空格为分隔符将每行字符串分
7、割成一个个单词,输出结果如图2-13所hadoopbogon$pyspark机卖取hello_spark.txt文件中的数据创建RDDIines=Sc.iextFile(file:/usr/local/spark/mycoWOrdSjdd=lines.flatMap(lambdaline:line.split()# 输出分割成单词后的RDD元素# WOrdS_rdd.(breach(PrinI)words.rdd.foreach(print)HelloSparkIloveSpark图2-13分割成单词后的RDD元素【例2-6创建两个RDD(即rddl和rdd2),执行union。操作合并rdd
8、l和rdd2,输出结果如图2-14所示。然后,执行distinct。操作,去除RDD中重复的元素,输出结果如图2-15所示.hadoop()bogon$pyspark rdd1=sc.parallelize(apple,banana,orange)rdd2=sc.parallelize(pear,grape,apple)蛤并RDDrdd3=rddl.union(rdd2)懒出合并后的RDD元素rdd3.collect() 去重 rdd-distinct=rdd3.distinct() 输出去重后的RDD元素rdd_distinct.collect()rdd3.collect()apple,ba
9、nana,orange,pear,grape,apple图2-14合并后的RDD元素rdd.distinct.COllectO(,orange,pear,applelgrape,banana图2-15去重后的RDD元素二、SParkRDD的行动操作【教师】讲解SparkRDD行动操作的相关知识行动操作是指对RDD数据集进行实际计算并返回结果的操作。常用的行动操作有CoUnl()、ColIeCt()、first。、take。、reduce。和foreach()等,详细说明如表2-4所示.表2-4行动操作的详细说明行动操作说明count()返回RDD中元素的数量collect()将RDD中的所有元
10、素收集到一个数组中,并返回该数组first()返回RDD中的第一个元素take(n)返回RDD中的前n个元素reduce(func)使用指定的函数func对RDD中的元素进行聚合计算,返回最终结果foreach(func)对RDD中的每个元素应用指定的函数func【教师】通过例子,使学生掌握SparkRDD的行动操作【例2-7创建RDD,执行不同的行动操作,代码和运行结果如图2-16所示。rdd=sc.parallelize(l,2,3,4.j)rdd.count()rdd.first()rdd.take(4)(L2,3.4)rdd.reduce(lambdaatb:aeb)120rdd.co
11、llect()1.2t3,4,53图2-16执行不同行动操作的代码和运行结果【高手点拨】Tambdaa,b:a*b是使用Python的Iambda语法定义的一个lambda函数。该函数接受两个参数a和b,并对它们执行乘法(*)操作.例2-7中,依次取出RDD的元素1和2,执行乘法操作得到2,并将得到的2赋值给a,取出RDD的元素3赋值给b,与2相乘得到6。以此执行,直到RDD的元素全部取出为止。三、SParkRDD持久化【教师】讲解SParkRDD持久化的相关知识在迭代计算中,通常需要多次使用同一组数据。如果需要多次使用同一个RDD,则每次调用都需要执行与该RDD相关的一系列转换操作,这可能导
12、致计算机消耗较大的资源。为了避免重复计算的情况,可以在Spark中设置RDD持久化.RDD持久化操作是指将一个RDD标记为持久化,首次遇到行动操作触发计算时,该RDD将会缓存在计算节点的内存中以便后续的行动操作重用。持久化RDD的方法主要有PeNiSl()和cache()使用PerSiSt()方法可以以不同的存储级别存储持久化的RDD,PySpark中可用的存储级别如表2-5所示。CaChe()方法是使用默认存储级别的快捷方法,使用该方法会调用PerSiSt(MEMORY_ONLY)。表2-5PySPark中可用的存储级别存储级别所需内存空间程度所需CPU计算时间程度是否在内存中是否在磁盘上说
13、明MEMORY.ONLY高低是否默认级别,将RDD作为反序列化的对象存储在JVM内存中.若内存不足,则按照一定原则替换缓存中的内容MEMORY_AND_DISK高中等部分部分将RDD作为反序列化的对象存储在JVM内存中。若内存不足,超出的分区会被存储在磁盘上DISK.ONLY低高否是仅将RDD分区存储在磁盘上MEM0RY_0NLY_2MEMORY_AND_DISK_2DISK_ONLY_2与上面级别相同,但在两个集群节点上复制RDD分区【教师】通过例子,加深学生对SparkRDD持久化的理解【例2-8使用CaCheo方法对RDD进行持久化,输出结果如图2-17所示。hadoopbogon$py
14、sparkrdd=sc.parallelize(JHadoop,Spark,RDD)# 调用PerSiSI(MEMORY_ONLY),由于RDD还没有计算,此时并不会缓存RDDrdd.cache()# 首次执行行动操作,触发一次从头到尾的计算,这时CaChe()被执行,把RDD放入缓存中print(rdd.count()# 第二次执行行动操作,不触发从头到尾的计算,只需要重复使用上面缓存中的RDDprint(,.join(rdd.collect()|print(rdd.count()3print(,.join(rdd.collect。)Hadoop,Spark,RDD图2/7输出结果【学生】聆
15、听讲解、认真记录【高手点拨】持久化RDD会占用内存空间,当某个RDD不再使用时,可以使用U叩ersist()方法将该RDD从缓存中移除.例如,执行代码rdd.unpersist()移除例2-8中缓存的RDDe三、SParkRDD分区【教师】讲解SparkRDD分区的相关知识SparkRDD分区是指将RDD分成多个分区,分别保存在不同的节点上,如图2/8所示。进行RDD分区的主要目的是提高作业的并行度和容错性,减少网络传输开销,优化内存利用率,从而充分利用集群中的计算资源,加快数据处理速度。图2-18SParkRDD分区RDD分区的原则是使分区的个数尽可能等于集群中的CPU核心数。RDD分区的常
16、用方法有以下几种.1.创建RDD时进行分区创建RDD时,可以通过ParaIIeHZe(C,numSIices)方法中的numSlices参数或textFiIe(name,minPartitions)的minPartitions参数指定RDD的分区数。如果没有指定分区数,则根据当前可用的CPU核心数设置默认分区数。【教师】通过例子,使学生掌握创建RDD时进彳亍分区的方法【例2-9使用ParalleIiZeO方法分别创建RDD,然后分别使用getNumPartitions()方法输出分区数,对比默认分区与手动设置分区的分区数,如图2-19所示。rdd=sc.parallelized1,2,3,4,
17、5)rdd.getNumPartitions()4rdd.numSlice=sc.parallelize(1,2,3,4,5,2)rddnumslice.getNumPartitions()2图2-19使用ParaHeIiZe()方法创建的RDD的分区数对比【例2-10使用textFile()方法分别创建RDD,然后分别使用getNumPartitions。方法输出分区数,对比默认分区与手动设置分区的分区数,如图220所示。Words=SC.tetFile(file:/usr/local/spark/mycode/hello_spark.txt,)words.getNumPartitions(
18、)2words_numSlice=sc.textFile(file:/usr/local/spark/mycode/hello.spark.txt,3)words.numslice.getNumPartitions()4图2-20使用IeXtFiIe()方法创建的RDD的分区数对比【高手点拨】当手动设置了最小分区数为3时,实际生成的分区数可能会调整为比3大的分区数。这是由于Spark根据数据处理的需求和性能优化的考虑,对分区数量进行了动态调整,但这不会影响数据的处理结果。2.创建RDD后重分区创建RDD后,也可以重新对RDD进行分区。Spark提供的CoaleSCe()和repartition
19、。方法可以重新对任何类型的RDD进行简单分区。CoaleSCe()方法使用哈希分区方式减少分区数量。该方法的基本格式如下。coalesce(numPartitions,shuffle)其中,参数的含义如下。(1)numPartitions:想要减少到的分区数。(2)shuffle:表示是否需要进行数据的洗牌。当shuffle为False(默认值)时,如果重新设置的分区数大于RDD现有的分区数,则RDD的分区数不变;当shuffle为TrUe时,如果重新设置的分区数大于RDD现有的分区数,RDD新的分区数也能重设成功。repartition。方法本质上是CoaIeSCeo方法的简单实现。该方法的
20、基本格式如下。repartition(numPartitions)【教师】通过例子,使学生掌握创建RDD后重分区的方法【例2-11创建RDD并设置分区数为2然后使用coalesce()方法并设置不同的shuffle参数值,重新对RDD进行区,输出结果如图2-21所示。rdd=sc.parallelize(1,2,3,4,5,2)rdd.getNumPartitions()2rdd.coalesce=rdd.coalesce(4)rdd.coalesce.getNumPartitions()2rdd_coalescel=rdd.coalesce(4,True)rdd-coalescel.getN
21、umPartitions()4图2-21CoaIeSCeO方法重设分区数【例2-12创建RDD并设置分区数为2,使用repartition0方法对RDD进行重分区,分区数为1,输出结果如图2-22所示。rdd=sc.parallelize(1,2,3,4,5,2)rdd.getNumPartitions()2rdd_repartition=rdd.repartition(l)rdd_repartition.getNumPartitions()1图2-22repartition。方法重设分区数【学生】聆听、思考、理解、记录课堂实践【教师】介绍“查询考试成绩排名前三的学生信息”的大概流程,安排学生
22、扫描微课二维码观看视频“查询考试成绩排名前三的学生信息“(详见教材),并要求学生进行相应操作打开PyChaEl,在rdd目录下新建,top_three_information.pyw文件,然后在该文件中编写应用程序,输出考试成绩排名前三的学生信息(包括学号、姓名、班级和考试成绩)。实现步骤如下。步骤IA配置Spark应用程序并创建SparkContext对象。步骤2A使用IeXtFiIe。方法读取HDFS文件中的考试成绩数据创建RDDe步骤3A执行map()操作按照分割符分割RDD中的数据元素,返回新的RDDe步骤4A执行firs.)操作提取出首行数据(表头),然后执行MlerO操作过滤掉首行
23、数据,只保留考试成绩数据。步骤5A执行SonBy()操作按照降序对考试成绩进行排序,然后执行IakeO操作获取前三名的成绩.步骤6A使用Printo方法输出学生的信息。首先输出表头,然后循环打印前三名学生的学号、姓名、班级和考试成绩。步骤7使用StOPo方法停止SparkContext对象,释放执行资源。.(详见教材)【学生】自行扫码观看配套微课,按照要求进行操作,如遇问题可询问老师【教师】巡堂辅导,及时解决学生遇到的问题课堂小结【教师】简要总结本节课的要点SparkRDD操作SparkRDD持久化SparkRDD分区【学生】总结回顾知识点作业布置【教师】布置课后作业(1)完成项目二项目实训中与本课相关的习题;(2)根据课堂所学知识,课后自己上机练习”查询考试成绩排名前三的学生信息”的操作。【学生】完成课后任务教学反思