深入浅出Flink:功能、特性及核心实现

深入浅出系列

深入浅出Flink:功能、特性及核心实现

Flink 是一个开源的流处理框架,旨在处理无界和有界数据流,凭借其流批一体的设计、高性能的执行引擎和完善的容错机制,成为当前实时计算领域的主流技术。本文介绍了 Flink 的功能、特点及其核心架构与算法。

一、核心功能

1. 流处理 (Stream Processing)

A. 实时数据处理: 支持高吞吐、低延迟的实时数据处理,延迟可低至亚毫秒级,吞吐量可达每秒百万级事件,满足企业级实时业务需求(如实时监控、实时风控),实现毫秒级延迟的事件处理。

B. 有状态计算: 在流中维护和管理状态,提供内置的容错性状态存储,支持算子状态的持久化和故障恢复,无需依赖外部存储即可实现有状态计算(如累计计数、会话维护、实时聚合)。

C. 事件时间处理: 基于事件产生时间而非处理时间,原生支持基于事件时间(Event Time)的窗口计算,以数据实际发生的时间为准进行计算,确保计算结果的准确性和可复现性(如跨时区数据处理、乱序数据校准)。

D. 精确一次语义 (Exactly-Once): 端到端的一致性保证,通过检查点(Checkpoint)机制,保证数据处理的精确一次语义(Exactly-Once),即每条数据被且仅被正确处理一次,结合两阶段提交(2PC)实现端到端全链路一致性。

E. 支持多种数据源接入,包括 Kafka、Pulsar、RabbitMQ 等消息队列,以及 CDC(变更数据捕获)、日志文件、Socket 等,适配各类实时数据场景。

2. 批处理 (Batch Processing)

A. 统一批流引擎: 将批处理视为流处理的特例,把有界数据流(如历史业务数据、离线报表数据、批量日志)当作“有限长度的无界流”处理,统一流批处理模型,同一API处理有界和无界数据。

B. 批处理优化: 针对有界数据的特殊优化策略,复用流处理的核心引擎和 API,无需单独开发批处理逻辑,实现“一套代码、两种场景”,简化开发流程,降低维护成本,彻底解决传统 Lambda 架构的复杂性;支持数据分区复用、任务并行度动态调整,确保批处理任务的高效执行,性能不逊于传统批处理框架(如 Hadoop MapReduce)。

3. 复杂事件处理 (CEP – Complex Event Processing)

A. 模式匹配: 在事件流中检测复杂模式,通过 CEP 库提供的 API,可自定义复杂事件模式(如序列模式、组合模式),实现对实时事件流的复杂规则匹配,适用于实时风控、异常检测等场景。

B. 时间窗口关联: 跨时间窗口的事件关联分析,结合窗口机制,实现不同时间窗口内事件的关联计算,精准捕捉跨时段的复杂业务场景(如用户连续操作行为分析、设备异常序列检测)。

4. 机器学习集成 (FlinkML)

A. 实时特征工程: 流式特征提取与转换,集成 Flink ML 机器学习库,提供常用的特征处理算子,可实时从数据流中提取、转换特征,支撑在线机器学习模型的特征输入。

B. 在线学习: 模型实时更新与推理,支持基于实时数据流的在线机器学习,可实时更新模型参数,适用于实时推荐、实时风控等场景;支持与第三方机器学习框架(如 TensorFlow、PyTorch)集成,实现端到端的实时智能分析。

C. 提供常用的机器学习算法(如分类、回归、聚类、推荐),覆盖主流机器学习场景,适配流式数据处理需求。

5. 图计算 (Gelly)

A. 批量图处理: 基于迭代的图算法,内置 Gelly 图处理库,支持批量图计算任务(如最短路径、图聚合、社区发现),可处理大规模静态图数据,适用于社交网络分析、知识图谱构建等场景。

B. 流式图处理: 动态图更新与分析,支持动态图的实时更新(如节点、边的新增、删除),可实时分析动态变化的图数据,适配实时社交网络、实时知识图谱等场景。

6. SQL与Table API

A. 统一SQL引擎: 流批一体SQL查询,提供高级 API(SQL、Table API),采用标准 SQL 语法,支持流式 SQL 查询和批式 SQL 查询,统一流批 SQL 执行引擎,实现“一套SQL,流批通用”。

B. 声明式API: 高层次的表操作抽象,Table API 作为声明式 API,屏蔽底层执行细节,简化实时数据处理开发,无需编写复杂的底层代码,降低开发门槛,适合数据分析人员和业务开发人员使用。

C. Table API 与 DataStream API 可相互转换,支持混合使用,既可以利用 SQL 的便捷性,也可以通过 DataStream API 实现复杂的业务逻辑。

D. 支持标准 SQL 函数、自定义函数(UDF、UDTF、UDAF),以及与 Hive 等数据仓库的集成,实现流批一体的 SQL 分析。

二、核心特点

1. 低延迟高吞吐:每秒处理数百万事件,延迟可达毫秒级;基于原生流处理引擎,采用流水线执行模式,结合算子链优化,减少线程切换和网络开销,确保高吞吐与低延迟并存。

2. 精确一次语义:端到端Exactly-Once状态一致性;通过Checkpoint分布式快照和两阶段提交(2PC)机制,确保数据从数据源到输出端全链路不丢不重,适用于金融、交易等核心场景。

3. 有状态计算:内置强大的分布式状态管理机制;支持键控状态、算子状态等多种状态类型,搭配多种状态后端,可支撑TB级超大状态,支持状态TTL、压缩、增量快照等优化。

4. 事件时间语义:支持乱序事件和迟到数据处理;通过Watermark水位线机制感知事件时间进度,可灵活处理乱序数据,支持迟到数据侧输出、窗口允许迟到等策略,确保计算结果准确。

5. 背压机制:自动流量控制防止系统过载;采用Credit-based Flow Control(基于信用值的流控)机制,实时反馈缓冲区状态,避免数据堆积导致的OOM,确保系统稳定运行。

6. 容错恢复:基于Checkpoint的快速故障恢复;采用Chandy-Lamport分布式快照算法,异步生成全局一致性快照,故障后可快速恢复状态,支持并行恢复、本地恢复,提升恢复效率。

7. 水平扩展:无缝扩展到数千个节点;基于Key Group机制实现状态动态重分布,支持任务在线扩缩容,可通过增加TaskManager节点和Slot数量,线性提升处理能力。

8. 统一批流:同一套API和引擎处理批与流;将批视为有界流、流视为无界流,复用核心引擎、API和容错机制,一套代码可适配两种场景,简化架构并降低维护成本。

9. 易用性:支持多种编程语言开发,包括 Java、Scala、Python(Flink Python API,又称 PyFlink)、SQL,适配不同开发人员的技术栈,降低学习和使用成本;提供完善的开发工具和监控体系,如 Flink Web UI,便于问题排查和性能优化。

10. 可扩展性:支持自定义算子、自定义状态后端、自定义数据源和 Sink,可根据业务需求扩展 Flink 的功能;支持多种部署模式,适配不同的运维环境,支持云原生部署。

三、核心架构

1. 运行时架构

Flink 运行时采用 Master-Worker 主从架构,主要由 JobManager、TaskManager、ResourceManager 和 Client 四大组件组成,各组件分工明确、协同工作,确保任务高效执行和集群稳定运行。

核心组件包括:

JobManager
A. 职责:集群协调、作业调度、故障恢复;将用户程序编译为执行计划,调度任务到TaskManager,协调Checkpoint,检测故障并触发恢复。
B. 关键算法/机制:Chandy-Lamport 分布式快照、作业调度算法

TaskManager
A. 职责:执行任务、维护本地状态;接收JobManager分配的任务,执行算子逻辑,管理本地状态,负责任务间网络传输。
B. 关键算法/机制:Actor模型 消息传递、状态后端存储、算子链优化

ResourceManager
A. 职责:资源分配、动态扩缩容;管理集群资源,接收JobManager的资源申请,分配Task Slot,回收空闲资源。
B. 关键算法/机制:Slot共享机制、延迟调度、动态资源分配算法

CheckpointCoordinator
A. 职责:协调分布式快照;触发Checkpoint,协调各算子完成快照,确认全局快照成功。
B. 关键算法/机制:Barrier对齐/非对齐算法、异步快照机制

Client
A. 职责:提交作业、编译执行计划;将用户代码转换为JobGraph,提交给Dispatcher,提供日志查看和任务监控功能。
B. 关键算法/机制:执行计划编译、作业提交机制

2. 数据流引擎架构

Flink 程序的执行过程可抽象为数据流图,由 Source、Transformation、Sink 三大核心算子组成,数据从 Source 进入,经过一系列 Transformation 处理,最终由 Sink 输出到外部系统,形成完整的数据流链路;核心优化包括算子链合并、数据分区策略、背压机制,确保数据流高效、稳定传输。

A. Source(数据源算子):读取外部数据,转换为Flink可处理的数据流,支持并行读取,常用Source包括Kafka Source、CDC Source等。

B. Transformation(数据处理算子):对数据流进行过滤、转换、聚合、关联等处理,支持并行执行,常用算子包括Map、KeyBy、Window等,可通过算子链优化减少开销。

C. Sink(数据输出算子):将处理后的数据输出到外部系统,支持事务性输出,确保Exactly-Once语义,常用Sink包括Kafka Sink、JDBC Sink等。

3. 状态后端架构

状态后端是 Flink 用于存储和管理状态的核心组件,负责状态的持久化、读取和恢复,不同的状态后端适用于不同的业务场景,可通过配置灵活选择。

MemoryStateBackend
A. 存储介质:JVM Heap
B. 适用场景:测试、小状态(KB级、MB级),轻量快速,无磁盘I/O开销,非生产环境适用。

FsStateBackend
A. 存储介质:本地磁盘 + 异步HDFS
B. 适用场景:大状态、高吞吐,兼顾性能与可靠性,适合生产环境中的中小规模任务(GB级)。

RocksDBStateBackend
A. 存储介质:RocksDB (LSM-Tree)
B. 适用场景:超大状态、增量Checkpoint,支持TB级状态,是生产环境的主流选择,适配高吞吐、大状态场景。

RocksDBStateBackend 核心架构
基于LSM-Tree(日志结构合并树)实现,核心结构分为四层,兼顾性能与存储容量:

A. MemTable (Active/Immutable):内存跳表,采用O(logN)写入速度,Active MemTable用于接收新写入的状态数据,满额后转为Immutable MemTable,等待刷盘。

B. Level 0:直接从Immutable MemTable刷盘生成,文件间可能存在重叠,读取时需遍历多个文件。

C. Level 1-N:大小层结构,层内文件不重叠,层间容量呈十倍差异,确保读取效率。

D. Compaction:合并排序机制,定期将低层文件合并到高层,减少读放大,优化读取性能。

Flink针对RocksDB的特定优化:支持状态TTL(Time-To-Live)自动清理过期状态;支持State Migration实现状态格式版本兼容;支持基于SST文件的增量Checkpoint,仅存储状态变更,减少快照开销。

4. 时间语义架构

Flink 支持三种时间语义,核心基于事件时间构建,通过Watermark机制实现乱序数据处理,确保时间语义的准确性和灵活性:

A. 事件时间(Event Time):数据实际发生的时间,是Flink默认且推荐的时间语义,确保计算结果可复现、可对账,适用于跨时区、乱序数据场景。

B. 处理时间(Process Time):数据被算子处理的时间,延迟最低,但受集群负载影响,结果不可复现,适用于对结果准确性要求不高的场景。

C. 摄入时间(Ingestion Time):数据进入Flink系统的时间,介于事件时间和处理时间之间,兼顾延迟与准确性。

D. 核心支撑:Watermark水位线机制,用于感知事件时间进度,触发窗口计算;窗口机制,用于按时间或数量对数据流进行分窗处理,实现聚合计算。

四、核心算法

1. 分布式快照算法 (Checkpointing)

核心算法:Chandy-Lamport 算法 (Flink改进版),是Flink Checkpoint机制的底层核心,用于在分布式系统中捕获全局一致性状态,为容错恢复和Exactly-Once语义提供支撑。

算法流程:

A. Checkpoint Coordinator 向所有Source注入 Barrier,Barrier作为快照边界,与数据流并行传输。

B. Barrier 随数据流传播,将数据流分为前后两个快照周期,确保快照数据的一致性。

C. 算子收到所有输入的Barrier后,异步快照本地状态,不阻塞正常的数据处理。

D. 状态持久化到分布式存储 (HDFS/S3),根据状态后端类型选择存储介质。

E. 算子完成快照后,通知 Coordinator 完成,Coordinator 确认所有算子快照完成后,标记该Checkpoint成功。

优化变体:

A. 对齐Checkpoint (Aligned): 阻塞等待所有输入流的Barrier到达,保证精确一次语义,适用于对一致性要求高的场景。

B. 非对齐Checkpoint (Unaligned): Barrier超越数据,优先完成快照,减少反压对快照的影响,适用于高吞吐、高反压场景。

关键优化技术:

A. 增量Checkpoint: 仅存储状态变更 (基于RocksDB的增量备份),大幅减少快照数据量和存储开销。

B. 本地恢复: 优先从本地磁盘恢复状态,减少网络传输,提升恢复效率。

C. 异步快照: 状态拷贝与数据处理并行,不影响任务的低延迟特性。

2. 水印与窗口算法 (Watermark & Windowing)

水印传播算法

水印生成策略:

A. Periodic Watermarks: 周期性地生成当前最大时间戳 – 延迟,适用于大多数乱序场景,可灵活调整周期和延迟。

B. Punctuated Watermarks: 基于特定事件触发,当检测到特定标记事件时生成水印,适用于数据乱序程度不稳定的场景。

C. 允许迟到数据: 通过sideOutputLateData()方法,将迟到数据收集到侧输出流,避免数据丢失,同时不影响窗口正常计算。

水印传播规则:

A. 多输入流: 取所有输入流的最小水印 (Min-Watermark),确保所有输入流的数据都被正确处理。

B. 广播: 广播水印到所有下游算子,确保下游所有并行实例的时间进度一致。

C. 分区: 按分区维护水印,不同分区的水印独立传播,适配数据分区处理场景。

窗口机制

窗口类型:

A. Tumbling Window (滚动窗口): 固定大小,不重叠,如每5分钟一个窗口,适用于周期性统计(如每小时统计订单量)。

B. Sliding Window (滑动窗口): 固定大小,可重叠,如每5分钟一个窗口,滑动步长为2分钟,适用于需要连续统计的场景(如实时监控最近5分钟的异常数据)。

C. Session Window (会话窗口): 动态大小,基于活动间隙(无数据到达的时间)划分窗口,当会话间隔内无数据时,窗口关闭,适用于用户会话分析(如用户一次浏览行为)。

D. Global Window (全局窗口): 全局统一窗口,所有数据进入同一个窗口,需自定义触发机制,适用于特殊业务场景。

窗口触发与清理:

A. Trigger: 决定何时计算并发射窗口结果,支持内置触发(如水印触发)和自定义触发。

B. Evictor: 窗口计算前/后移除数据,可自定义数据清理规则,减少内存开销。

C. AllowedLateness: 允许迟到数据更新窗口,配置窗口允许迟到时间,超出时间的迟到数据将被侧输出。

3. 网络流控算法 (Backpressure)

核心算法:Credit-based Flow Control (基于信用值的流控),用于自动流量控制,防止系统过载,确保数据流稳定传输。

机制:

A. 接收方 (InputGate) 维护可用缓冲区数量 (Credit),表示可接收的数据量。

B. 发送方 (ResultPartition) 仅发送Credit允许的数据量,不超过接收方缓冲区上限。

C. 接收方处理完数据后,返还Credit给发送方,更新可用缓冲区数量。

D. 零Credit时发送方停止发送,形成反压,避免数据堆积导致OOM。

优势:

A. 精确控制: 基于实际缓冲区状态而非延迟估计,流控更精准。

B. 无级联反压: 精确到子分区级别,避免反压在集群内级联扩散。

C. 快速响应: 实时反馈缓冲区状态,快速调整发送速率,确保系统稳定。

4. 状态管理算法

RocksDB 调优算法

基于LSM-Tree结构的核心优化,适配Flink超大状态存储需求:

LSM-Tree 结构:

A. MemTable (Active/Immutable): 内存跳表,O(logN)写入速度,Active MemTable接收新数据,Immutable MemTable等待刷盘。

B. Level 0: 直接从Immutable MemTable刷盘,文件间可能重叠,读取需遍历多个文件。

C. Level 1-N: 大小层,层内文件不重叠,层间十倍大小差,优化读取效率。

D. Compaction: 合并排序,减少读放大,定期将低层文件合并到高层,清理过期数据。

Flink特定优化:

A. TTL (Time-To-Live): 状态过期自动清理,减少无效状态存储开销。

B. State Migration: 状态格式版本兼容,支持Flink版本升级时的状态平滑迁移。

C. Incremental Checkpoint: 基于SST文件的增量备份,仅存储状态变更,提升快照效率。

状态恢复与分区算法

A. 状态恢复算法: 基于Checkpoint或Savepoint快照数据,通过状态后端读取快照文件,将每个算子的状态恢复到故障前的一致状态;对于Keyed State,通过Key Group机制将状态均匀分配到新的Task中,实现并行恢复,支持增量恢复、并行恢复优化。

B. 状态分区算法: 基于Key Group机制,将Keyed State按Key的哈希值划分为多个Key Group,每个Key Group对应一个Task Slot,实现状态的并行存储和处理;当任务扩缩容时,Key Group会重新分配,确保状态的均衡分布。

5. 调度算法 (Scheduling)

核心优化:延迟调度与槽位共享,提升资源利用率和任务执行效率。

Slot Sharing Group:

A. 将不同Task放入同一Slot,减少网络传输,提升资源利用率。

B. 默认规则: 相同并行度的算子链可共享同一个Slot,无需额外配置。

调度策略:

A. Eager Scheduling: 立即分配所有资源,适用于小规模、短任务,启动速度快。

B. Lazy from Sources: 按需分配,从Source开始逐步分配资源,适用于大规模、长任务,提升资源利用率。

C. Region-based: 基于Pipeline Region的细粒度调度,将作业划分为多个Region,按依赖关系调度,提升并行度和执行效率。

6. 数据分区算法 (Partitioning)

Forward(正向分区)
A. 算法描述:一对一,同一Slot内传输,无数据分发
B. 适用场景:算子链优化,相邻算子合并执行场景

Shuffle(随机分区)
A. 算法描述:随机均匀分布,将数据随机发送到下游Task
B. 适用场景:负载均衡,需要均匀分配数据的场景

Rebalance(轮询分区)
A. 算法描述:Round-Robin轮询,依次将数据发送到下游Task
B. 适用场景:均匀分配数据,提升整体吞吐量

Rescale(本地轮询分区)
A. 算法描述:本地轮询,仅在同一TaskManager内分发数据
B. 适用场景:并行度改变、本地数据处理,减少网络传输

Broadcast(广播分区)
A. 算法描述:复制到所有并行实例,每个下游Task都接收完整数据
B. 适用场景:小数据广播(如配置数据)、全局规则分发

KeyBy(按Key分区)
A. 算法描述:Hash(Key) % parallelism,相同Key的数据进入同一个Task
B. 适用场景:分组聚合、Keyed State管理,确保同一Key的状态一致

Custom(自定义分区)
A. 算法描述:用户自定义Partitioner,按业务规则分发数据
B. 适用场景:特殊业务需求,需自定义数据分发逻辑

KeyBy Hash算法: 对数据的Key进行哈希计算,得到哈希值后对并行度取模,确保相同Key的数据流进入同一个Task,从而保证Keyed State的一致性和连续性;哈希函数采用高效的一致性哈希,减少数据倾斜。

五、容错与一致性机制

1. 故障恢复机制

Flink的故障恢复机制基于Checkpoint和Savepoint,结合主从架构的高可用设计,确保故障后快速恢复,不影响业务连续性:

A. JobManager高可用(HA):通过ZooKeeper等协调工具实现主备JobManager切换,避免单点故障,确保集群7×24小时稳定运行。

B. Checkpoint恢复:故障发生后,JobManager从最近成功的Checkpoint中读取全局状态快照,重新调度任务,TaskManager从状态后端恢复本地状态,继续执行任务,无需重新处理全部数据。

C. Savepoint恢复:手动触发的Savepoint可用于任务迁移、版本升级、集群扩容等场景,恢复时可指定Savepoint路径,实现任务断点续跑。

D. 本地恢复优化:优先从TaskManager本地磁盘恢复状态,减少网络传输,提升恢复效率;对于RocksDB状态后端,可直接读取本地RocksDB文件恢复状态。

2. 两阶段提交 (2PC) – 端到端Exactly-Once

用于实现端到端的Exactly-Once语义,协调Flink内部状态与外部系统的事务,确保数据从数据源到输出端全链路不丢不重。

参与方:
A. Coordinator: Flink JobManager,负责协调整个事务流程,触发Checkpoint和事务提交/回滚。
B. Transaction Manager: 外部系统 (Kafka/DB),负责管理外部系统的事务,接收Flink的提交/回滚指令。
C. Participants: Flink Sink算子,负责与外部系统交互,执行预提交、提交、回滚操作。

阶段:
A. Pre-commit: Sink算子将处理后的数据刷写至外部系统,预提交事务,此时数据处于不可见状态。
B. Checkpoint: Flink执行Checkpoint,生成全局状态快照,确保Flink内部状态与外部系统预提交数据一致。
C. Commit: Checkpoint成功后,Coordinator通知所有Sink算子和Transaction Manager,正式提交事务,数据变为可见状态。
D. Abort: Checkpoint失败时,Coordinator通知所有参与者回滚事务,丢弃预提交的数据,确保数据一致性。

支持的外部系统: Kafka (0.11+)、JDBC(MySQL、PostgreSQL等)、HDFS等支持事务的存储系统。

六、性能优化技术

1. 算子链优化 (Operator Chaining)

将相邻的算子合并为一个任务执行,减少线程切换、序列化/反序列化和网络传输开销,提升任务执行效率。

条件:
A. 相同并行度,确保算子间数据传输无需重新分区。
B. 一对一分区 (Forward),数据无需跨Slot、跨节点传输。
C. 同一Slot Sharing Group,确保算子可共享同一个Slot资源。
D. 无用户自定义的断链配置,用户未手动禁止算子链合并。

效果:
A. 减少线程切换,降低CPU开销。
B. 减少序列化/反序列化操作,提升数据传输效率。
C. 减少网络传输,避免跨节点、跨Slot的数据传输开销。

2. 异步Checkpoint调优
A. 异步快照:将状态快照的生成与数据处理并行执行,仅在Barrier对齐时产生极短停顿,不影响任务的低延迟特性。

B. 增量Checkpoint:仅存储状态的变更部分,而非全量状态,大幅减少快照数据量和存储开销,尤其适合超大状态场景。

C. Checkpoint并行度:配置Checkpoint的并行度,多个Task同时执行快照生成,提升快照效率。

D. Checkpoint间隔优化:根据业务延迟需求和状态大小,合理设置Checkpoint间隔,平衡容错性和性能。

3. 内存管理

Flink采用自主内存管理机制,脱离JVM堆内存限制,减少GC压力,避免OOM,确保任务长时间稳定运行。

内存区域:
A. Network Memory: 网络缓冲,用于任务间数据传输,基于Credit-based流控机制管理,确保网络传输稳定。
B. Managed Memory: 管理内存,供RocksDB、排序、哈希等操作使用,可灵活配置大小,支持堆外内存。
C. JVM Heap: JVM堆内存,用于存储用户对象及非RocksDB的状态数据,通过内存优化减少GC停顿。

优化:
A. 堆外内存减少GC压力,将大量数据存储在堆外,避免JVM GC对任务执行的影响。
B. 自主内存管理避免OOM,通过内存分区、内存限额等机制,合理分配内存资源,防止内存溢出。
C. 内存复用:对排序、哈希等操作的内存进行复用,提升内存利用率。

七、生态集成架构

Flink拥有完善的生态系统,可与各类数据存储、消息队列、计算框架集成,适配不同业务场景,降低开发和运维成本:

1. 消息队列集成:支持Kafka、Pulsar、RabbitMQ等主流消息队列,可作为Source读取数据或作为Sink输出数据,支持事务性输出。

2. 数据存储集成:支持HDFS、HBase、Elasticsearch、Redis、MySQL、PostgreSQL等,可读取数据进行处理或输出处理结果。

3. 数据仓库集成:与Hive深度集成,支持Hive SQL查询、Hive表读写,实现流批一体的数仓建设。

4. 机器学习框架集成:支持与TensorFlow、PyTorch等第三方机器学习框架集成,实现实时特征工程、在线模型推理。

5. 部署平台集成:支持Standalone、YARN、Kubernetes、Mesos等部署模式,适配云原生、容器化运维场景。

6. 监控工具集成:支持与Prometheus、Grafana、ELK等监控工具集成,实时监控任务执行状态、吞吐量、延迟等指标。

八、版本演进关键特性

1.0版本:稳定流处理API,奠定Flink流处理的基础,提供基本的流处理能力和容错机制。

1.2版本:Async I/O,支持异步访问外部存储,不阻塞计算;Table API初步引入,提供声明式查询能力。

1.4版本:端到端Exactly-Once语义正式支持;非对齐Checkpoint预览,优化高反压场景的快照效率。

1.9版本:统一Table API (Blink Planner合并),提升SQL执行效率,实现流批一体的SQL查询。

1.11版本:原生Kubernetes支持,适配云原生部署;内存配置简化,降低运维成本。

1.12版本:纯SQL流批一体,DataStream API批执行,彻底统一流批处理引擎;PyFlink性能提升。

1.13版本:被动扩缩容,支持根据负载自动调整任务并行度;SQL MATCH_RECOGNIZE,增强CEP SQL能力。

1.14版本:内存网络缓冲解耦,提升内存利用率;检查点改进,优化故障恢复效率。

1.15版本:检查点进一步改进,支持增量Checkpoint优化;云原生优化,提升Kubernetes部署体验。

1.16+版本:自适应调度,根据任务负载动态调整资源分配;云原生自动伸缩,适配弹性云环境。

九、总结

Flink通过分层架构设计(API层→Table层→Runtime层)、Chandy-Lamport分布式快照算法、LSM-Tree状态管理、Credit-based流控等核心技术,实现了低延迟、高吞吐、精确一次的流批一体计算能力。其事件时间语义和背压机制是区别于其他流处理引擎的关键差异化优势。

Flink 作为开源的流批一体计算框架,其核心优势在于将无界流和有界流统一到同一套处理模型中,凭借完善的核心功能(流处理、批处理、CEP、机器学习、图计算等)、优秀的核心特点、坚实的核心架构和高效的核心算法,成为当前实时计算领域的首选技术。

无论是实时数仓、金融风控、物联网监控,还是实时推荐、机器学习,Flink 都能凭借其灵活的 API、强大的处理能力、良好的可扩展性和完善的生态集成,适配各类业务场景,助力企业实现实时化、智能化的数据处理。

如果觉得这篇文章对你有帮助,欢迎点赞、收藏,也可以在评论区留言,聊聊你在使用Flink 时遇到的问题~

深入浅出Spark:功能、特性与核心实现

深入浅出系列

深入浅出Spark:功能、特性与核心实现

在大数据处理领域,Spark早已成为不可或缺的核心引擎。自2009年诞生于加州大学伯克利分校的AMPLab,到2014年成为Apache基金会顶级项目,Spark凭借其卓越的性能和灵活的架构,逐步取代传统MapReduce,成为数千家企业(包括80%的财富500强)处理大规模数据的首选框架。今天,我们就来全面拆解Spark的核心功能、独特特点、核心架构、数据抽象、算法机制、核心组件、优化技术、生态集成及演进趋势,带你读懂这款“大规模数据分析的统一引擎”背后的底层逻辑。

一、核心功能:覆盖全场景大数据处理需求

Spark的核心价值在于“统一”与“高效”,打破了传统大数据处理中各类场景的壁垒,提供一套完整技术栈,无需切换框架即可完成从数据采集到分析、建模、部署的全流程,核心涵盖五大功能:

1. 批处理计算

A. 大规模数据集的离线计算:专注于PB级静态数据的离线处理,广泛应用于历史日志分析、离线报表生成、批量数据ETL等场景,替代传统MapReduce实现高效离线计算。

B. 支持复杂的数据转换和分析:通过丰富的算子(map、reduce、join、filter等),可轻松实现多步骤、复杂逻辑的数据转换与深度分析,适配各类离线业务需求。

2. 流处理

A. 实时数据流处理:支持Kafka、Flume等多种实时数据源,能够持续接收并处理用户行为日志、实时交易数据、物联网设备数据等,满足实时监控、实时风控等需求。

B. 微批处理模式:通过Spark Streaming将实时流切分为短小批处理作业,实现高吞吐量、可容错的实时处理,延迟可低至秒级。

C. 结构化流处理:基于Structured Streaming实现,将流数据视为无限增长的表,支持SQL查询,实现批流语法统一,提升流处理易用性和一致性。

3. 交互式查询

A. Spark SQL支持SQL查询:内置Spark SQL组件,可直接编写标准SQL语句对结构化数据进行查询,无需编写复杂分布式代码,适配数据分析师的使用习惯。

B. 低延迟的交互式分析:依托内存计算和优化引擎,即便面对TB级结构化数据,也能快速返回查询结果,支持Spark Shell交互式编程,便于开发者实时探索数据。

4. 机器学习

A. MLlib机器学习库:Spark内置的分布式机器学习库,封装了丰富的算法,无需手动实现分布式逻辑,降低大规模机器学习开发门槛。

B. 支持完整的机器学习流程:覆盖特征工程、模型训练、模型评估、模型部署全流程,适配分类、回归、聚类、协同过滤等各类数据挖掘场景。

5. 图计算

A. GraphX图计算库:专门用于处理海量图数据的组件,适配社交网络、知识图谱、路网数据、金融关联网络等场景。

B. 支持图算法和图处理:提供PageRank、最短路径、连通分量等经典图算法,以及顶点操作、边操作、图遍历等基础功能,实现大规模图数据的高效处理。

二、核心特点:五大优势奠定行业地位

Spark之所以能成为大数据处理的事实标准,核心在于具备高性能、易用性、通用性、容错性、兼容性五大核心特点,相互支撑适配不同规模、不同场景的需求:

1. 高性能

A. 基于内存计算,比Hadoop MapReduce快10-100倍:中间结果优先驻留内存,避免频繁磁盘IO,大幅提升迭代计算和多步骤计算的效率。

B. 支持DAG执行引擎:替代MapReduce固定的“Map→Shuffle→Reduce”流程,可根据任务逻辑动态优化执行计划,减少不必要的计算步骤。

2. 易用性

A. 支持多种语言(Scala, Java, Python, R):兼容主流编程语言,开发者可使用熟悉的语言进行开发,无需学习新语法,降低学习成本。

B. 丰富的API和高级算子:封装复杂的分布式计算逻辑,通过简单的API调用即可实现复杂数据处理,代码量比Hadoop大幅减少。

3. 通用性

A. 一站式解决多种计算场景:批处理、流处理、交互式查询、机器学习、图计算共享底层引擎,无需维护多套独立系统。

B. 统一的技术栈:各功能模块无缝集成,减少数据在不同框架间的传输开销,提升整体处理效率,实现“一站式”大数据处理。

4. 容错性

A. 基于RDD的容错机制:通过RDD Lineage(血统)记录数据生成过程,数据丢失后可反向追溯重算,无需额外数据复制。

B. 支持数据复制和检查点:关键数据可配置多副本存储,同时支持Checkpoint机制,将数据持久化至外部存储,截断长血统链,降低容错成本。

5. 兼容性

A. 支持多种数据源(HDFS, HBase, Cassandra等):可灵活读取和写入不同存储介质、不同格式的数据,适配各类数据存储场景。

B. 与Hadoop生态系统无缝集成:可直接复用Hadoop的存储资源(HDFS)和集群资源(YARN),无需改造现有系统,降低迁移和部署成本。

三、核心架构:构建高效分布式计算骨架

Spark采用分层架构设计,由集群管理器、执行引擎架构、存储体系三部分组成,各组件分工明确、协同工作,支撑各类功能稳定运行:

1. 集群管理器

负责整个集群的资源分配和管理,连接Driver和Worker节点,支持四种部署模式,适配不同基础设施环境:

A. Standalone:Spark自带的独立集群模式,部署简单、配置便捷,适合小规模集群或测试环境。

B. YARN:Hadoop生态中的资源管理框架,Spark可作为YARN的应用运行,适合大规模生产环境,与Hadoop生态无缝兼容。

C. Mesos:通用集群资源管理框架,支持多种应用(Spark、Hadoop等)的资源调度,适合多租户、多应用共存场景。

D. Kubernetes:容器化集群管理平台,实现Spark容器化部署、弹性伸缩,适配云原生环境。

2. 执行引擎架构(主从模式)

采用经典主从(Master-Slave)模式,由多个组件协同完成任务调度、分配和执行:

A. Driver Program: 主控程序,整个Spark应用的“大脑”,运行用户main函数,负责生成执行计划、调度任务、监控执行状态。

B. SparkContext: 应用入口点,Driver核心组件,负责创建RDD、启动任务、与Cluster Manager通信申请资源,管理应用生命周期。

C. Cluster Manager: 资源管理器,集群资源管理的“中枢”,负责CPU、内存等资源的统一分配和管理,监控Executor状态。

D. Worker Node: 工作节点,集群中的从节点,负责运行Executor进程,提供计算资源,接收并执行Driver分配的任务。

E. Executor: 执行进程,运行在Worker Node上的独立JVM进程,负责执行具体Task任务,管理本地数据缓存,与其他Executor交换数据。

F. Task: 最小执行单元,每个Task对应一个RDD分区的处理逻辑,由Executor线程池并发执行。

3. 存储体系

采用多级别存储协同模式,兼顾计算效率和数据可靠性,支撑数据存储和缓存需求:

A. 内存存储:核心存储级别,用于缓存频繁访问的RDD数据和计算中间结果,减少磁盘IO,提升计算速度。

B. 磁盘存储:用于持久化不需要频繁访问但需长期保存的数据(如Checkpoint数据、RDD磁盘持久化副本),避免内存溢出,保障数据可靠性。

C. 外部存储系统集成:与HDFS、HBase、Cassandra等外部存储系统无缝集成,可直接读取和写入数据,无需额外数据迁移。

四、核心数据抽象:Spark数据处理的基础

数据抽象是Spark进行数据处理的核心基础,提供三层核心抽象,分别适配不同数据处理场景,层层优化易用性和效率:

1. RDD (Resilient Distributed Datasets)

A. 弹性分布式数据集:Spark最基础、最核心的数据抽象,是所有功能的基石,适用于各类批处理场景。

B. 核心数据抽象基础:支撑Spark所有上层组件(Spark SQL、MLlib等)的运行,定义了数据的分布式存储和处理规范。

C. 特性:不可变(一旦创建无法修改,转换操作生成新RDD)、分区(数据分片并行处理)、容错(通过Lineage机制实现高效容错)。

2. DataFrame/Dataset

A. 结构化数据抽象:基于RDD构建,带有Schema(数据结构)信息,类似于关系型数据库的表,适配结构化数据处理场景。

B. 支持SQL查询:兼容Spark SQL,可直接通过SQL语句进行查询分析,提升结构化数据处理的易用性。

C. 类型安全(Dataset):Dataset是DataFrame的增强版,支持编译时类型检查,避免运行时数据类型异常,采用Tungsten二进制编码,兼顾效率与类型安全。

3. DStream

A. 离散化流:Spark Streaming的核心数据抽象,用于处理实时流数据。

B. 流处理核心抽象:本质是一系列连续的RDD集合,将实时流按时间片切分为微批,通过RDD批处理操作实现实时流处理。

五、核心算法与机制:支撑Spark高效运行的底层逻辑

Spark的高效运行,离不开一系列核心算法与机制的支撑,覆盖调度、内存管理、容错、Shuffle、查询优化等多个维度,进一步降低计算开销、提升可靠性:

1. 调度算法

A. DAG调度器

A. 阶段划分:以宽依赖(Shuffle操作)为边界,将用户代码构建的DAG划分为多个执行阶段(Stage),窄依赖操作归属于同一个Stage。

B. 任务调度:根据Stage依赖关系,按顺序调度各Stage执行,确保任务执行的有序性和高效性。

B. 任务调度器

A. 数据本地性优化:优先将任务分配到数据所在节点,减少跨节点网络传输,降低IO开销,提升执行效率。

B. 任务分片:将每个Stage的任务均匀分片,分配到不同Executor,避免单个Executor负载过重,实现负载均衡。

2. 内存管理

A. 统一内存管理器:将内存统一管理,避免内存碎片化,可根据任务负载动态调整各区域内存占比,提升内存利用率。

B. 堆内/堆外内存管理:堆内内存(JVM堆内存)用于存储RDD缓存、计算中间结果;堆外内存用于存储Shuffle中间数据等,避免JVM堆内存限制,减少GC耗时。

C. 内存分区
Storage Memory(存储内存):用于缓存RDD数据和广播变量,支撑内存计算。
Execution Memory(执行内存):用于任务计算过程中的中间数据存储,保障计算高效执行。
User Memory(用户内存):用于存储用户自定义数据结构,满足用户个性化需求。
Reserved Memory(预留内存):用于Spark内部开销,确保系统稳定运行。

3. 容错机制

A. Lineage(血统)机制:RDD记录数据的生成过程(血统),当某个分区数据丢失或节点故障时,可通过血统反向追溯,重新计算该分区,无需重跑整个作业。

B. Checkpoint机制:主动将RDD数据持久化至HDFS等外部存储,截断长血统链,减少容错时的重算成本,适用于迭代次数多的作业。

C. 数据复制策略:对关键数据(如Shuffle中间数据、Checkpoint数据)配置多副本存储,数据丢失后可快速恢复,提升数据可靠性。

4. Shuffle机制

A. Hash Shuffle:早期Shuffle机制,根据Key的Hash值分配到不同Reducer,实现简单,但数据量大时会产生大量小文件,增加IO和网络开销。

B. Sort Shuffle:对Hash Shuffle优化,先对数据排序再合并小文件,减少文件数量,降低IO和网络开销,适用于大规模数据场景。

C. Tungsten Shuffle优化:基于Tungsten执行引擎,采用堆外内存存储Shuffle数据,优化序列化和传输方式,进一步提升Shuffle效率。

5. 查询优化

A. Catalyst优化器

逻辑计划优化:将SQL解析为抽象语法树(AST),转换为逻辑计划后,通过谓词下推、列裁剪、常量折叠等规则优化,减少数据处理量。

物理计划优化:将优化后的逻辑计划转换为多个可选物理计划,根据数据统计信息估算成本,选择最优执行计划。

代码生成:将最优物理计划动态编译为原生机器码,替代JVM解释执行,提升执行速度。

B. Tungsten执行引擎

堆外内存管理:采用Unsafe Row二进制堆外内存格式,减少GC开销,提升存储密度。

缓存感知计算:根据数据缓存情况动态调整执行计划,充分利用缓存资源,减少重复计算。

代码生成优化:全阶段代码生成,将多个算子融合为单一代码块,消除虚函数调用,提升CPU利用率。

6. 流处理算法

A. 微批处理调度:将实时流切分为连续微批,每个微批作为批处理作业执行,平衡吞吐量和延迟。

B. 状态管理:支持流处理过程中的状态保存和更新,如累计计数、窗口聚合结果等,满足复杂实时分析需求。

C. 窗口操作:支持滑动窗口、滚动窗口等,对指定时间窗口内的流数据进行聚合分析,适配实时监控场景。

D. 水印机制:设置水印时间,自动识别并丢弃超过水印时间的延迟数据,处理事件时间乱序问题,确保结果时效性。

7. 机器学习算法

A. 分布式梯度下降:用于逻辑回归、线性回归等算法的模型训练,将梯度下降任务分布式执行,提升训练速度。

B. 模型并行:将机器学习模型拆分为多个部分,分配到不同节点并行训练,适用于大型模型训练。

C. 特征工程算法:包括特征提取、特征转换、特征选择等,如TF-IDF、Word2Vec、标准化等,提升模型性能。

D. 超参数调优:提供网格搜索、随机搜索等方法,自动寻找最优超参数组合,提升模型泛化能力。

8. 图计算算法

A. Pregel API:基于Pregel模型的图计算API,支持分布式图计算,适配复杂图遍历和聚合任务。

B. Graph并行算法:包括PageRank、最短路径、连通分量、三角计数等经典图算法,采用并行计算方式提升效率。

C. 图分区策略:提供顶点切割、边切割等分区策略,将图数据均匀分配到不同节点,减少跨节点数据传输。

六、核心组件:Spark功能的具体载体

Spark的各类功能通过六大核心组件实现,各组件基于Spark Core构建,分工明确、无缝集成,构成完整技术栈:

A. Spark Core: 核心引擎,负责RDD创建、转换、行动操作,以及任务调度、内存管理、容错等核心功能,是所有其他组件的基础。

B. Spark SQL: 结构化数据处理组件,支持SQL查询和DataFrame/Dataset API,集成Catalyst优化器,适配结构化数据处理场景。

C. Spark Streaming: 流处理组件,基于DStream实现微批流处理,Structured Streaming支持端到端一致性,适配实时场景。

D. MLlib: 分布式机器学习库,提供丰富算法和特征工程工具,支持完整机器学习流程。

E. GraphX: 图并行计算组件,提供图数据抽象、图算子和经典图算法,适配大规模图数据处理。

F. SparkR: R语言接口,允许R语言开发者使用Spark核心功能,拓展Spark用户群体。

七、优化技术:进一步提升Spark执行效率

Spark通过多种优化技术,进一步降低计算开销、提升资源利用率,保障作业高效执行,核心优化技术包括:

A. 数据本地性优化:调度算法优先将任务分配到数据所在节点,减少跨节点网络传输,降低IO开销。

B. 序列化优化(Kryo序列化):采用Kryo序列化机制,比Java序列化快10倍,减少数据存储体积和网络传输开销。

C. 动态资源分配:根据作业负载动态调整Executor数量和资源分配,避免资源浪费,提升集群利用率。

D. 推测执行:对执行速度异常缓慢的Task(慢任务)重新调度,避免单个慢任务拖慢整个作业进度。

E. 数据压缩:对Shuffle数据、持久化数据进行压缩,减少磁盘存储和网络传输开销。

F. 广播变量和累加器:广播变量将小数据广播到所有节点,避免重复传输;累加器用于分布式环境下的计数和求和,提升计算效率。

八、生态系统集成:拓展Spark应用边界

Spark具备良好的生态兼容性,能够与各类大数据工具、存储系统、云平台集成,进一步拓展应用场景,核心集成包括:

A. 与Hadoop生态系统集成:无缝兼容HDFS、YARN、HBase、Hive等Hadoop组件,可直接复用Hadoop生态资源,降低部署成本。

B. 数据源连接器:支持JDBC、ODBC、Kafka、Flume等多种数据源连接器,可灵活读取和写入各类数据。

C. 第三方库支持:支持与TensorFlow、PyTorch等深度学习库,以及Pandas、NumPy等数据分析库集成,拓展数据处理和建模能力。

D. 云平台集成(AWS, Azure, GCP):适配主流云平台,支持Spark在AWS EMR、Azure HDInsight、GCP Dataproc等云服务上部署,实现弹性伸缩和便捷管理。

九、关键架构对比:Spark vs 传统MapReduce

Spark之所以能取代传统MapReduce成为大数据处理主流框架,核心在于其在多个维度的显著优势,具体对比如下:

维度 传统MapReduce Apache Spark
计算模型 磁盘迭代(Map → Shuffle → Reduce),中间结果频繁落盘 内存迭代 + DAG流水线,中间结果优先驻留内存
容错机制 任务重试 + 数据复制,容错成本高 Lineage重算 + Checkpoint,无需额外数据复制,容错高效
延迟 高(分钟级),不适用于实时场景 低(秒级/毫秒级),支持批处理、流处理、交互查询
编程抽象 仅支持Map/Reduce函数,编程复杂度高 RDD/DataFrame/Dataset + 丰富算子,编程简洁、易用
优化器 无专门优化器,执行效率低 Catalyst + Tungsten双重优化,大幅提升执行效率
适用场景 仅适用于离线批处理,场景单一 批处理 + 流处理 + 迭代计算 + 交互查询,全场景适配

十、演进趋势(Spark 3.x+)

随着大数据技术的不断发展,Spark 3.x及以上版本持续优化,聚焦性能提升、生态适配和功能扩展,核心演进趋势如下:

A. 自适应查询执行(AQE):作业运行时动态优化Join策略、分区合并、数据倾斜处理,无需人工干预,进一步提升查询性能。

B. 动态分区裁剪(DPP):在星型模型等场景下,自动裁剪事实表的无用分区,减少数据扫描量,提升查询效率。

C. GPU加速:支持RAPIDS Accelerator,利用GPU加速SQL查询和DataFrame处理,适配大规模、高并发场景。

D. ANSI SQL兼容:完整支持SQL:2003标准,提升SQL查询的兼容性和易用性,降低数据分析师的学习成本。

E. Kubernetes原生:Spark on K8s成为主流部署模式,实现容器化部署、弹性伸缩,适配云原生环境,提升集群可管理性和可扩展性。

综上,Apache Spark通过全场景核心功能、五大核心特点、分层核心架构、灵活数据抽象、高效算法机制、完整组件栈、实用优化技术和广泛生态集成,构建了高效、灵活、统一的大数据处理框架。无论是企业级大规模数据处理,还是开发者日常数据探索,Spark都能提供高效、便捷的解决方案,同时持续演进适配云原生、GPU加速等新趋势,成为大数据领域不可替代的核心引擎。

如果觉得这篇文章对你有帮助,欢迎点赞、收藏,也可以在评论区留言,聊聊你在使用Spark时遇到的问题~

Ubuntu18搭建CDH6环境03

1、确保cdt01可以ssh联通cdt02和cdt03

#这个userid与可以无密码使用sudo的userid相同
ssh -l userid cdh02
ssh -l userid cdh03

2、浏览器访问(以后都是界面了)
http://172.16.172.101:7180
用户名:admin
密码:admin

3、根据引导界面,新建Cluster
将172.16.172.101-172.16.172.103都安装好cloudera-manager-agent

4、根据引导界面,选用需要的软件进行安装
安装时,注意合理分配角色,也就是合理分配内存资源

5、依次安装
hdfs
zookeeper
hbase
yarn
hive
spark

6、安装完毕

PS:
1、如果出现找不到jdbc driver的情况

sudo apt-get install libmysql-java

Ubuntu18搭建CDH6环境02

1、cdt01安装

#添加cloudera仓库
wget https://archive.cloudera.com/cm6/6.3.0/ubuntu1804/apt/archive.key
sudo apt-key add archive.key
wget https://archive.cloudera.com/cm6/6.3.0/ubuntu1804/apt/cloudera-manager.list
sudo mv cloudera-manager.list /etc/apt/sources.list.d/

#更新软件清单
sudo apt-get update

#安装jdk8
sudo apt-get install openjdk-8-jdk

#安装cloudera
sudo apt-get install cloudera-manager-daemons cloudera-manager-agent cloudera-manager-server

2、安装及配置mysql
2.1、安装mysql

sudo apt-get install mysql-server mysql-client libmysqlclient-dev libmysql-java

2.2、停止mysql

sudo service mysql stop

2.3、删除不需要的文件

sudo rm /var/lib/mysql/ib_logfile0
sudo rm /var/lib/mysql/ib_logfile1

2.4、修改配置文件

sudo vi /etc/mysql/mysql.conf.d/mysqld.cnf

#修改或添加以下信息
[mysqld]
transaction-isolation = READ-COMMITTED
max_allowed_packet = 32M
max_connections = 300
innodb_flush_method = O_DIRECT

2.5、启动mysql

sudo service mysql start

2.6、初始化mysql

sudo mysql_secure_installation

3、创建数据库并授权

sudo mysql -uroot -p
-- 创建数据库
-- Cloudera Manager Server
CREATE DATABASE scm DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
-- Activity Monitor
CREATE DATABASE amon DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
-- Reports Manager
CREATE DATABASE rman DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
-- Hue
CREATE DATABASE hue DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
-- Hive Metastore Server
CREATE DATABASE hive DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
-- Sentry Server
CREATE DATABASE sentry DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
-- Cloudera Navigator Audit Server
CREATE DATABASE nav DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
-- Cloudera Navigator Metadata Server
CREATE DATABASE navms DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
-- Oozie
CREATE DATABASE oozie DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;

#创建用户并授权
GRANT ALL ON scm.* TO 'scm'@'%' IDENTIFIED BY 'scm123456';
GRANT ALL ON amon.* TO 'amon'@'%' IDENTIFIED BY 'amon123456';
GRANT ALL ON rman.* TO 'rman'@'%' IDENTIFIED BY 'rman123456';
GRANT ALL ON hue.* TO 'hue'@'%' IDENTIFIED BY 'hue123456';
GRANT ALL ON hive.* TO 'hive'@'%' IDENTIFIED BY 'hive123456';
GRANT ALL ON sentry.* TO 'sentry'@'%' IDENTIFIED BY 'sentry123456';
GRANT ALL ON nav.* TO 'nav'@'%' IDENTIFIED BY 'nav123456';
GRANT ALL ON navms.* TO 'navms'@'%' IDENTIFIED BY 'navms123456';
GRANT ALL ON oozie.* TO 'oozie'@'%' IDENTIFIED BY 'oozie123456';

4、初始化数据库

sudo /opt/cloudera/cm/schema/scm_prepare_database.sh mysql scm scm scm123456
sudo /opt/cloudera/cm/schema/scm_prepare_database.sh mysql amon amon amon123456
sudo /opt/cloudera/cm/schema/scm_prepare_database.sh mysql rman rman rman123456
sudo /opt/cloudera/cm/schema/scm_prepare_database.sh mysql hue hue hue123456
sudo /opt/cloudera/cm/schema/scm_prepare_database.sh mysql hive hive hive123456
sudo /opt/cloudera/cm/schema/scm_prepare_database.sh mysql sentry sentry sentry123456
sudo /opt/cloudera/cm/schema/scm_prepare_database.sh mysql nav nav nav123456
sudo /opt/cloudera/cm/schema/scm_prepare_database.sh mysql navms navms navms123456
sudo /opt/cloudera/cm/schema/scm_prepare_database.sh mysql oozie oozie oozie123456

5、启动

#启动cloudera-scm-server
sudo systemctl start cloudera-scm-server

#查看启动日志,等待Jetty启动完成
sudo tail -f /var/log/cloudera-scm-server/cloudera-scm-server.log

6、启动
浏览器访问
http://172.16.172.101:7180
用户名:admin
密码:admin

Ubuntu18搭建CDH6环境01

1、环境准备

VirtualBox 6
Ubuntu 18
Cloudera CDH 6.3

2、虚拟机安装Ubuntu18,配置为
1CPU
4G内存
300G硬盘
两块网卡,一块为HostOnly,一块为NAT

3、将虚拟机克隆为三份
如果是手工拷贝,记得修改硬盘UUID、虚拟机UUID、网卡硬件ID

4、设置IP地址、hostname及hosts文件

机器名 HostOnly IP
cdh01 172.16.172.101
cdh02 172.16.172.102
cdh03 172.16.172.103

5、允许无密码使用sudo,至少修改cdh02和cdh03

#edit /etc/sudoers
userid ALL=(ALL:ALL) NOPASSWD: ALL

Redash环境搭建(Ubuntu)

1、下载安装脚本

wget -O bootstrap.sh https://raw.githubusercontent.com/getredash/redash/master/setup/ubuntu/bootstrap.sh

2、运行脚本

chmod +x bootstrap.sh
sudo ./bootstrap.sh

3、脚本执行成功后,直接访问nginx就好了
http://ip:80
实际上是代理了这个网站
http://localhost:5000

4、常见问题
在执行过程中,遇到下载失败的情况,就直接把文件下载到本地,改一下路径,重新运行脚本就好了
我在运行脚本的过程中,遇到了缺少schema的提示,删除了数据库redash及用户redash,重新运行脚本就好了

Superset环境搭建(Ubuntu)

1、安装依赖包

sudo apt-get install build-essential libssl-dev libffi-dev python-dev python-pip libsasl2-dev libldap2-dev
pip install virtualenv

2、使用虚拟环境

#新建沙盒
virtualenv supersetenv
#进入沙盒
source bin/activate

3、安装

#升级安装工具,安装服务
pip install --upgrade setuptools pip
pip install superset
#新增管理员用户
fabmanager create-admin --app superset
#重置管理员密码
#fabmanager reset-password admin --app superset
#升级数据库
superset db upgrade
#加载测试数据
superset load_examples
#初始化
superset init
#运行Server
superset runserver

4、此时,只需要访问http://ip:8088就可以登陆了

5、安装驱动

#mysql
apt-get install libmysqlclient-dev
pip install mysqlclient
#oracle
#pip install cx_Oracle
#mssql
#pip install pymssql

6、关闭

#Ctrl+C关闭Server
#退出沙盒
deactivate
#删除沙盒
#rmvirtualenv supersetenv

7、升级

#进入沙盒
source bin/activate
#升级Server
pip install superset --upgrade
#升级DB
superset db upgrade
#初始化
superset init
#退出沙盒
deactivate

Spark环境搭建05

这里主要说一下Spark的SQL操作,如何从mysql导入数据。

#拷贝了mysql的驱动到jar文件下面
#建立DF
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3307").option("dbtable", "hive.TBLS").option("user", "hive").option("password", "hive").load()

#schema
jdbcDF.schema

#count
jdbcDF.count()

#show
jdbcDF.show()