5、Flink SQL管理平台flink-streaming-platform-web安装搭建

Rainbow酱 2024-08-04 13:03:01 阅读 85

一、下载和环境

1、下载源码

无情(朱慧培)/flink-streaming-platform-web

2、环境要求

hadoop版本 2+flink 版本 1.12.0 官方地址:

Gitee.com

jdk版本 jdk1.8mysql版本 5.6+scala版本 2.12

scala安装

kafka版本 1.0+

Kafka安装

二、编译

目前web客户端支持的flink版本是1.15.3,如果需要调整flink版本可下载源码 然后修改pom里面的版本号

<code>vim flink-streaming-platform-web-master\pom.xml

<flink.version>1.17.0</flink.version>

<scala.binary.version>2.12</scala.binary.version>

可能调整后导致flink引用的上下不兼容 需要你手动解决下

保存后打包

mvn clean package -Dmaven.test.skip=true

错误:::danger

在windows上执行assembly任务,但是它包含了相对于linux的路径

:::

修改<code>flink-streaming-platform-web-master\deployer\src\main\assembly\dev.xml

最后打好的包在 <code>/flink-streaming-platform-web-master/deployer/target

包名是:flink-streaming-platform-web.tar.gz

三、部署

1、将打包好的文件放入到172服务器上,由于依赖Flink,必须与Flink同一服务器

2、解压

<code>cd /home

tar -xvzf /home/flink-streaming-platform-web.tar.gz

3、修改配置文件:application.properties

vim /home/flink-streaming-platform-web/conf/application.properties

####jdbc信息

server.port=9084

spring.datasource.url=jdbc:mysql://10.9.70.172:3306/flink_web?serverTimezone=CTT&characterEncoding=utf8&useUnicode=true&useSSL=false&autoReconnect=true&zeroDateTimeBehavior=round&allowMultiQueries=true&allowPublicKeyRetrieval=true

spring.datasource.username=root

spring.datasource.password=Stwc.2wsx

4、启动服务

cd /home/flink-streaming-platform-web/bin/

sh deploy.sh start

image.png

5、访问

9084端口在浏览器访问

http://10.9.70.172:9084/

登录号:admin 密码 123456

image.png

用户密码过于简单,可以通过修改数据库中表user的密码

下为按照源码提供的加密方式加密的密码,密码为“<code>Stwc.2wsx”

185c9df7a8eedd40e3ec2a2404941549

登录成功

6、配置flink web平台

页面上点击系统设置,进入配置页面:

<code>/opt/software/flink/flink-1.17.0/

/home/flink-streaming-platform-web/

http://10.9.70.166:8088/

http://10.9.70.172:8081/

http://10.9.70.172:8081/;http://10.9.70.166:8081/

:::warning

这里的参数意义:

Flink客户端目录:就是安装的Flink目录;Flink管理平台目录:就是下载的flink-streaming-platform-web放的目录;yarn RM http地址:就是yarn.resourcemanager.webapp.address,通常是8088端口;

经过测试,配置这3个参数即可使用。

:::

image.png

7、添加flink插件

Just a moment…

上传到三台服务器 <code>/opt/software/flink/flink-1.17.0/lib/ 上

7、运行demo

在<code>172的mysql数据库flink_web中创建表

CREATE TABLE sync_test_1 (

`day_time` varchar(64) NOT NULL,

`total_gmv` bigint(11) DEFAULT NULL,

PRIMARY KEY (`day_time`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

在web页面新建sql流任务:

<code>-p 2 -Dtaskmanager.numberOfTaskSlots=2 -Dyarn.application.queue=root.default

create table flink_test_1 (

id BIGINT,

day_time VARCHAR,

amnount BIGINT,

proctime AS PROCTIME ()

)

with (

'connector' = 'kafka',

'topic' = 'flink-connector',

'properties.bootstrap.servers' = 'JSYF3:9092',

'properties.group.id' = 'flink_gp_test1',

'scan.startup.mode' = 'earliest-offset',

'format' = 'json',

'json.fail-on-missing-field' = 'false',

'json.ignore-parse-errors' = 'true'

);

CREATE TABLE sync_test_1 (

day_time string,

total_gmv bigint,

PRIMARY KEY (day_time) NOT ENFORCED

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://10.9.70.172:3306/flink_web?characterEncoding=UTF-8',

'table-name' = 'sync_test_1',

'username' = 'root',

'password' = 'Stwc.2wsx'

);

INSERT INTO sync_test_1

SELECT day_time,SUM(amnount) AS total_gmv

FROM flink_test_1

GROUP BY day_time;

创建好任务后,启动任务吧。

启动后可在<code>yarn的8088端口页面看到起了一个application,名称是新建任务填写的名称加flink@前缀

错误:::danger

1:Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/OffsetResetStrategy

:::

这是缺少kafka-clients的jar包,只需将下载的kafka-clients jar包放在lib目录下重启flink,提交任务

:::danger

2:Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The cluster does not have the requested resources for the JobManager available!

Maximum Memory: 1024MB Requested: 4096MB. Please check the ‘yarn.scheduler.maximum-allocation-mb’ and the ‘yarn.nodemanager.resource.memory-mb’ configuration values

:::

内存不够,修改Flink的配置

vim /opt/software/hadoop/hadoop-3.3.5/etc/hadoop/yarn-site.xml

<property>

<name>yarn.nodemanager.resource.memory-mb</name>

<value>4096</value>

</property>

<!-- 每个容器请求的最小内存资源(以MB为单位)。-->

<property>

<name>yarn.scheduler.minimum-allocation-mb</name>

<value>512</value>

</property>

<!-- 每个容器请求的最大内存资源(以MB为单位)。-->

<property>

<name>yarn.scheduler.maximum-allocation-mb</name>

<value>2048</value>

</property>

重启yarn

任务也启动成功

kafka控制台往主题里面写数据,主题不存在会自动创建

<code>/home/kafka/kafka_2.12-3.4.0/bin/kafka-console-producer.sh --broker-list 10.9.70.172:9092 --topic flink-connector

{ "day_time":"2021-01-01","amnount":"2022","id":1}

{ "day_time":"2021-01-01","amnount":"2022","id":1}

{ "day_time":"2021-01-01","amnount":"2022","id":2}

{ "day_time":"2021-01-02","amnount":"2000","id":3}

/home/kafka/kafka_2.12-3.4.0/bin/kafka-run-class kafka.tools.ConsumerOffsetChecker --zookeeper JSYF3:2181 --group flink_gp_test1 --topic flink-connector

cd /home/kafka/kafka_2.12-3.4.0/bin/

./kafka-consumer-groups.sh --describe --group flink_gp_test1 --zookeeper JSYF3:2181

./kafka-topics.sh --list --bootstrap-server JSYF3:9092

image.png

image.png

【问题】

1、如何找到Flink错误日志

如果启动失败

查看文件

<code>/opt/software/flink/flink-1.17.0/log/flink-root-client-jsyf3.log

如果还是没有具体的错误信息

提示让你如看<code>application中的文件

http://10.9.70.166:8088/cluster/apps

2、Flink yarn Could not start rest endpoint on any port in port range 8081

此错误是说端口被占用。

对应的配置是 <code>flink-conf.yaml中的rest.bind-port

rest.bind-port不设置,则Rest Server默认绑定到rest.port端口(8081)。

rest.bind-port可以设置成列表格式如50100,50101,也可设置成范围格式如50100-50200。推荐范围格式,避免端口冲突。

vim /opt/software/flink/flink-1.17.0/conf/flink-conf.yaml

rest.bind-port: 8080-8090

【参考】

Dinky

部署 | Dinky

Flink SQL管理平台flink-streaming-platform-web安装搭建

Flink SQL管理平台flink-streaming-platform-web安装搭建



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。