Flink-StarRocks详解:第六部分-即席查询大案例解析(第56天)

大数据飞总 2024-08-09 09:05:02 阅读 76

系列文章目录

数仓场景:即席查询案例

6.1 场景介绍

6.2 方案架构

6.3 方案特点

6.4 操作流程

6.4.1 步骤一:创建MySQL源数据表

6.4.2 步骤二:创建StarRocks表

6.4.3 步骤三:执行Flink任务,启动数据流

6.4.4 步骤四:验证数据

文章目录

系列文章目录前言6. 数仓场景:即席查询案例6.1 场景介绍6.2 方案架构6.3 方案特点

6.4 操作流程6.4.1 步骤一:创建MySQL源数据表6.4.2 步骤二:创建StarRocks表6.4.3 步骤三:执行Flink任务,启动数据流6.4.4 步骤四:验证数据


前言

本文为Flink-StarRocks详解后续章节:主要详解StarRocks数仓场景:即席查询大案例

6. 数仓场景:即席查询案例

本文通过示例介绍如何基于EMR Serverless StarRocks的视图能力构建数仓场景-即席查询解决方案。

6.1 场景介绍

随着向量化、CBO(Cost Based Optimizer,基于代价的优化器)、单机多核调度等技术的应用,StarRocks的计算能力逐步提升。很多时候在使用StarRocks进行数仓分层建模时,大部分将数据建模到DWD层(基础整合层)或DWS层(维度宽度)。在实际业务中,运用StarRocks的计算能力,可以直接查询DWD或DWS层数据,还可以灵活地交互式即席查询。

6.2 方案架构

使用StarRocks实现数仓场景即席查询的基本架构如下图所示。

在这里插入图片描述

整体数据流如下:

(1)Flink清洗导入Kafka的日志或者通过Flink-CDC-StarRocks工具读取MySQL Binlog导入StarRocks。根据需要选用明细、聚合、更新或主键各种模型,只物理落地ODS层(格式整理层)。

(2)向上采用StarRocks View视图能力,利用StarRocks向量化极速查询和CBO优化器满足多表关联、嵌套子查询等复杂SQL,查询时现场计算指标结果,保证指标上卷和下钻高度同源一致。

6.3 方案特点

该方案主要特点是,计算逻辑在StarRocks侧(现场查询),适用于业务库高频数据更新的场景,实体数据只在ODS或DWD层存储。

 方案优势

 灵活性强,可随时根据业务逻辑调整View。

 指标修正简单,上层都是View逻辑封装,只需要更新底表数据。

 方案缺点

当View的逻辑较为复杂,数据量较多时,查询性能较低。

 适用场景

 数据来源于数据库和埋点系统,适合对QPS要求不高,对灵活性要求比较高,且计算资源较为充足的场景。

 实时要求非常高,要求写入即可查,更新即反馈。适合有即席查询需求,且资源较为充足,查询复杂度较低的场景。

6.4 操作流程

6.4.1 步骤一:创建MySQL源数据表

(1)登录DMS

(2)创建库和表

<code>CREATE DATABASE IF NOT EXISTS flink_cdc;

create table flink_cdc.orders (

order_id INT NOT NULL AUTO_INCREMENT,

order_revenue FLOAT NOT NULL,

order_region VARCHAR(40) NOT NULL,

customer_id INT NOT NULL,

PRIMARY KEY ( order_id )

);

create table flink_cdc.customers (

customer_id INT NOT NULL,

customer_age INT NOT NULL,

customer_name VARCHAR(40) NOT NULL,

PRIMARY KEY ( customer_id )

);

6.4.2 步骤二:创建StarRocks表

(1)登录EMR StarRocks Manager

(2)创建库和表

CREATE DATABASE IF NOT EXISTS `flink_cdc`;

CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` (

`customer_id` INT NOT NULL COMMENT "",

`customer_age` FLOAT NOT NULL COMMENT "",

`customer_name` STRING NOT NULL COMMENT ""

) ENGINE=olap

PRIMARY KEY(`customer_id`)

COMMENT ""

DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1

PROPERTIES (

"replication_num" = "1"

);

CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` (

`order_id` INT NOT NULL COMMENT "",

`order_revenue` FLOAT NOT NULL COMMENT "",

`order_region` STRING NOT NULL COMMENT "",

`customer_id` INT NOT NULL COMMENT ""

) ENGINE=olap

PRIMARY KEY(`order_id`)

COMMENT ""

DISTRIBUTED BY HASH(`order_id`) BUCKETS 1

PROPERTIES (

"replication_num" = "1"

);

(3)基于ODS表创建DWD视图

CREATE VIEW flink_cdc.dwd_order_customer_valid (

order_id,

order_revenue,

order_region,

customer_id,

customer_age,

customer_name

)

AS

SELECT o.order_id, o.order_revenue, o.order_region, c.customer_id, c.customer_age, c.customer_name

FROM flink_cdc.customers c JOIN flink_cdc.orders o

ON c.customer_id=o.customer_id

WHERE c.customer_id != -1;

(4)基于DWD表创建DWS视图

CREATE VIEW flink_cdc.dws_agg_by_region (

order_region,

order_cnt,

order_total_revenue)

AS

SELECT order_region, count(order_region), sum(order_revenue)

FROM flink_cdc.dwd_order_customer_valid

GROUP BY order_region;

6.4.3 步骤三:执行Flink任务,启动数据流

(1)打开阿里云flink控制台

(2)创建MySQL CDC映射表

注意:hostname等需要根据自己的实际情况进行修改。

CREATE DATABASE IF NOT EXISTS `vvp`.`flinkcdc`;

CREATE TABLE IF NOT EXISTS `vvp`.`flinkcdc`.`customers_src` (

`customer_id` INT NOT NULL,

`customer_age` FLOAT NOT NULL,

`customer_name` STRING NOT NULL,

PRIMARY KEY(`customer_id`) NOT ENFORCED

) with (

'connector' = 'mysql',

'hostname' = 'rm-cn-x0r3fp1lj000qa.rwlb.rds.aliyuncs.com',

'port' = '3306',

'username' = 'xxxxxx',

'password' = 'xxxxxx',

'database-name' = 'flink_cdc',

'table-name' = 'customers'

);

CREATE TABLE IF NOT EXISTS `vvp`.`flinkcdc`.`orders_src` (

`order_id` INT NOT NULL,

`order_revenue` FLOAT NOT NULL,

`order_region` STRING NOT NULL,

`customer_id` INT NOT NULL,

PRIMARY KEY(`order_id`) NOT ENFORCED

) with (

'connector' = 'mysql',

'hostname' = 'rm-cn-x0r3fp1lj000qa.rwlb.rds.aliyuncs.com',

'port' = '3306',

'username' = 'xxxx',

'password' = 'xxxxxx!',

'database-name' = 'flink_cdc',

'table-name' = 'orders'

);

(3)创建StarRocks映射表

注意:jdbc-url、load-url等需要根据自己的实际情况进行修改。查询位置为EMR控制台-》StarRocks-》点击实例-》实例详情

CREATE TABLE IF NOT EXISTS `vvp`.`flinkcdc`.`customers_sink` (

`customer_id` INT NOT NULL,

`customer_age` FLOAT NOT NULL,

`customer_name` STRING NOT NULL,

PRIMARY KEY(`customer_id`)

NOT ENFORCED

) with (

'connector' = 'starrocks'

,'jdbc-url' = 'jdbc:mysql://fe-c-838fd7a4db1550a6-internal.starrocks.aliyuncs.com:9030'

,'load-url' = 'fe-c-838fd7a4db1550a6-internal.starrocks.aliyuncs.com:8030'

,'database-name' = 'flink_cdc'

,'table-name' = 'customers'

,'username' = 'xxxxxx'

,'password' = 'xxxxxx'

,'sink.buffer-flush.interval-ms' = '5000'

,'sink.semantic' = 'exactly-once'

);

CREATE TABLE IF NOT EXISTS `vvp`.`flinkcdc`.`orders_sink` (

`order_id` INT NOT NULL,

`order_revenue` FLOAT NOT NULL,

`order_region` STRING NOT NULL,

`customer_id` INT NOT NULL,

PRIMARY KEY(`order_id`)

NOT ENFORCED

) with (

'connector' = 'starrocks'

,'jdbc-url' = 'jdbc:mysql://fe-c-838fd7a4db1550a6-internal.starrocks.aliyuncs.com:9030'

,'load-url' = 'fe-c-838fd7a4db1550a6-internal.starrocks.aliyuncs.com:8030'

,'database-name' = 'flink_cdc'

,'table-name' = 'orders'

,'username' = 'xxxxxx''

,'password' = 'xxxxxx'

,'sink.buffer-flush.interval-ms' = '5000'

,'sink.semantic' = 'exactly-once'

);

 参数含义

在这里插入图片描述

在这里插入图片描述

(4)将MySQL数据插入到StarRocks

以下代码写到一个流作业中,然后部署运行。

需要无状态启动,并且设置checkpoint周期为5秒

在这里插入图片描述

<code>

BEGIN STATEMENT SET;

INSERT INTO `vvp`.`flinkcdc`.`customers_sink` SELECT * FROM `vvp`.`flinkcdc`.`customers_src`;

INSERT INTO `vvp`.`flinkcdc`.`orders_sink` SELECT * FROM `vvp`.`flinkcdc`.`orders_src`;

END;

6.4.4 步骤四:验证数据

(1)在RDS数据库窗口执行以下命令,向表orders和customers中插入数据。

INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,"beijing",1);

INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,"beijing",1);

INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, "emr_test");

(2)在EMR StarRocks Manager中进行查询

1)查看orders表信息

select * from flink_cdc.orders;

在这里插入图片描述

2)查看customers表信息

<code>select * from flink_cdc.customers;

在这里插入图片描述

3)查询DWD层数据

<code>select * from flink_cdc.dwd_order_customer_valid;

在这里插入图片描述

4)查询DWS层数据

<code>select * from flink_cdc.dws_agg_by_region;

在这里插入图片描述

(3)在RDS数据库窗口执行以下命令,从orders表中删除一条记录

<code>DELETE FROM flink_cdc.orders where order_id = 2;

(4)再次在EMR StarRocks Manager中进行查询,查看变化

1)查询DWD层数据

select * from flink_cdc.dwd_order_customer_valid;

在这里插入图片描述

2)查询DWS层数据

<code>select * from flink_cdc.dws_agg_by_region;

在这里插入图片描述



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。