国内服务器由于网络环境无法正常使用github管理项目
解决方式:
- 添加github-client源
- apt update & apt install gh
- gh auth login. # choose ssh
enjoy!
国内服务器由于网络环境无法正常使用github管理项目
解决方式:
enjoy!
Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.
基于内存,分布式计算,函数式处理
弹性分布式数据集:不可变,分布式抽象数据。使用时可以显示的将其换存在内存中以提高速度
Transformation
将一个RDD转换为另一个RDD,transfromation具有lazy load特性。需要遇到action算子时才会执行
Transformation | 含义 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 先按分区聚合 再总的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 对k/y的RDD进行操作 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 与sortByKey类似,但是更灵活 第一个参数是根据什么排序 第二个是怎么排序 false倒序 第三个排序后分区数 默认与原RDD一样 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD 相当于内连接(求交集) |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable |
cartesian(otherDataset) | 两个RDD的笛卡尔积 的成很多个K/V |
pipe(command, [envVars]) | 调用外部程序 |
coalesce(numPartitions**)** | 重新分区 第一个参数是要分多少区,第二个参数是否shuffle 默认false 少分区变多分区 true 多分区变少分区 false |
repartition(numPartitions) | 重新分区 必须shuffle 参数是要分多少区 少变多 |
repartitionAndSortWithinPartitions(partitioner) | 重新分区+排序 比先分区再排序效率高 对K/V的RDD进行操作 |
foldByKey(zeroValue)(seqOp) | 该函数用于K/V做折叠,合并处理 ,与aggregate类似 第一个括号的参数应用于每个V值 第二括号函数是聚合例如:_+_ |
combineByKey | 合并相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) |
partitionBy****(partitioner) | 对RDD进行分区 partitioner是分区器 例如new HashPartition(2 |
cache | RDD缓存,可以避免重复计算从而减少时间,区别:cache内部调用了persist算子,cache默认就一个缓存级别MEMORY-ONLY ,而persist则可以选择缓存级别 |
persist | |
Subtract****(rdd) | 返回前rdd元素不在后rdd的rdd |
leftOuterJoin | leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。 |
rightOuterJoin | rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可 |
subtractByKey | substractByKey和基本转换操作中的subtract类似只不过这里是针对K的,返回在主RDD中出现,并且不在otherRDD中出现的元素 |
Action
触发代码执行,一段spark代码至少需要一个action
Action | 含义 |
---|---|
reduce(func) | 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 |
saveAsObjectFile(path) | |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func进行更新。 |
aggregate | 先对分区进行操作,在总体操作 |
reduceByKeyLocally | |
lookup | |
top | |
fold | |
foreachPartition |
Application
driver程序以及集群执行的代码
driver
创建sc,加载数据集,处理数据以及展示流程
集群节点
driver:创建sc,启动spark级集群计算任务
master:master节点
worker:集群任务执行节点,启动executor进程
Executor:执行应用程序以及汇报执行状态
cluster manager:集群资源管理器,负责申请资源调度任务,如yarn
jobs
RDD中的action,每个action变为一个job,然后DAGScheduler回分解stage执行
stage
一个job拆分为多组task,每组是一个stage
task
送到executor执行的工作单元,两类:shuffleMapTask:输出shffle所需要的数据;resultTask:无需shuffle直接返回处理结果
partition
数据划分
cores
worker执行进程
memory
内存设置
shuffle
stage之间的数据拷贝
窄依赖:每个RDD只会被一个子RDD所依赖,例如map、filter、union等操作都会产生窄依赖;(独生子女)
宽依赖:每个RDD被多个子RDD所依赖,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖;(超生)
关于join的依赖关系:
两个RDD在进行join操作时,一个RDD的partition仅仅和另一个RDD中已知个数的Partition进行join,那么这种类型的join操作就是窄依赖。否则就是宽依赖。
DAGScheduler遇到action job时,会根据宽窄依赖决定stage划分。如果遇到窄依赖会将该action加入当前stage,遇到宽依赖则创建新的stage去执行
Git commit message specification
https://www.conventionalcommits.org/en/v1.0.0/
https://www.ruanyifeng.com/blog/2016/01/commit_message_change_log.html
1 | <type>[optional scope]: <description> |
Types:
1 | abaliable types: |
1 | feat: allow provided config object to extend other configs |
!
to draw attention to breaking change1 | feat!: send an email to the customer when a product is shipped |
!
to draw attention to breaking change1 | feat(api)!: send an email to the customer when a product is shipped |
!
and BREAKING CHANGE footer1 | chore!: drop support for Node 6 |
1 | docs: correct spelling of CHANGELOG |
1 | feat(lang): add Polish language |
1 | fix: prevent racing of requests |
idea plugins: git commit template
hangelog generate & commit message validate
如果我们要在分布式计算里面分发大对象,例如:字典,集合,黑白名单等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么知识每个executor拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。
1 | # create |
1、能不能将一个RDD使用广播变量广播出去?
不能,因为RDD是不存储数据的。**可以将RDD的结果广播出去。**
2、 广播变量只能在Driver端定义,不能在Executor端定义。
3、 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。
5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
在spark应用程序中,我们经常会有这样的需求,如异常监控,调试,记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。
1 | # create |
累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。
linux安装ss客户端与kcp并开启全局代理
1 | # 行复制:光标移动到需要复制的行,输入下面命令 |
Git 是一个开源的分布式版本控制系统,用于敏捷高效地处理任何或小或大的项目。Git 是 Linus Torvalds 为了帮助管理 Linux 内核开发而开发的一个开放源码的版本控制软件。Git 与常用的版本控制工具 CVS, Subversion 等不同,它采用了分布式版本库的方式,不必服务器端软件支持。
Git 一般只添加数据, 你执行的 Git 操作,几乎只往 Git 数据库中增加数据。 很难让Git 执行任何不可逆操作,或者让它以任何方式清除数据。 同别的 VCS 一样,未提交更新时有可能丢失或弄乱修改的内容;但是一旦你提交快照到 Git 中,就难以再丢失数据,特别是如果你定期的推送数据库到其它仓库的话。
本文介绍如何使用Markdown绘制UML图