记录flink + docker, 部署jar,执行任务程序
首先安装flink,这里以1.13.1举例(docker安装的话,jobmanager和taskmanager需要分开部署)flink-docker-github-repository (这个参数flink文档中没找到)关于query.server.port: 6125不用去管,这个不填也会默认自动生成: 12yml中的内容不说了, 下面贴几张关于Flink生产配置最佳实践的图, 钉钉公开课趣头条实时平台负责人分享https://www.bilibili.com/video/BV1iE411r7S6 Flink在yarn上运行,每个TaskManager的slot个数怎么设置?经验公式:slot个数tm个数=并行度并行度=kafka的分区个数(10分区)slot的个数要小于yarn设置的单个container最大可以申请的cpu核数(5个 8-36个)。那么就是...
记录开发中遇到关于flink的一些错误(长期)
2022-01-20 09:25:58,308 WARN org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application:org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. Please make sure your program doesn’t call an eager execution function [collect, print, printToErr, count] 这是因为当 flink 作业是以 detached...
关于记录集合里面的记录数
记录集合里面的记录数 该数量是指组件与组件之间通信的【缓存队列】的size大小,Kettle内部用List实现该缓存队列,每一条语句都会被封装成一个 RowSet对象,每个组件之间都会有个List<RowSet>队列,源step每次会往该队列写一条数据,目标step每次会从队列读取一条数据。 kettle里面转换是并行的,数据是一条一条流经每个组件,队列大小采用默认的10000条即可。 但是当使用到【阻塞数据直至步骤完成】、【阻塞数据】相关组件时,则需要根据业务数据量扩大该size的大小。 否则,数据会一直在某两个组件之间的缓存队列中存放,当数量达到一定大小时,就会卡主不动。 缓存队列的大小对作业没有影响,查看源码发现,缓存队列由inputRowSet 和 outputRowset组成,底部实现是ArrayList,初始化创建的时候,默认容量为10,并不会直接创建设定长度的 数组。并且使用ArrayList的特性:当要添加的数据量超过数组的容量时候,ArrayList会动态扩容,size扩大1.5倍
记录kettle安装以及支持clickhouse进行开发
kettle下载地址 选择需要的版本,进入这个路径,下载这个ce 解压 kettle 安装包下载clickhouse所需驱动包 将 clickhouse-plugins文件夹复制到 kettle 的 data-integration\plugins文件夹里 复制驱动包里的其余jar包,粘贴到 kettle 的 data-integration\libswt\xxx 目录下(根据自己的系统而定,linux环境使用uname -r 先看下系统) 启动kettle编辑工具,双击 data-integration 目录下 Spoon.bat 启动
记录kettle安装以及支持clickhouse进行开发(基于kettle9.2)
todo 待改造,目前这种可以配置变量的地方却无法配置变量(不识别,只能写死),之后尝试改下 目前kettle最新的版本是9.2,但是es仅支持到6.4.2,而且也无法配置用户名和密码还有schema,所以需要对es-bulk-insert组件改造下下载源码(kettle...
Flink-es-conector7-ElasticsearchSink
截至1.13.1,官方文档所提供的方式已经废弃12345678910111213141516171819202122232425262728293031HttpHost httpHost = new HttpHost(esHost, esPort, esScheme);List<HttpHost> httpPosts = new ArrayList<>();httpPosts.add(httpHost);RestClientFactory restClientFactory = new RestClientFactory() { @Override public void configureRestClientBuilder(RestClientBuilder restClientBuilder) { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); ...
job和trans的参数详解
pan调用示例 sh pan.sh -rep=initech_pdi_repo -user=pgibbons -pass=lumburghsux -trans=TPS_reports_2011 -param:db_ip=$db_ip -param:db_name=$db_name 参数 参数说明 rep 企业或数据库存储库名称 user 仓库用户名 pass 仓库密码 trans 要启动的转换的名称(在存储库中显示) dir 包含转换的存储库目录,包括前导斜杠 file 如果调用的是本地 KTR 文件,则为文件名,如果不在本地目录中,则包括路径 level 日志级别(基本、详细、调试、行级别、错误、无) logfile 将日志输出写入的本地文件名 listdir 列出指定存储库中的目录 listtrans 列出指定存储库目录中的转换 listrep 列出可用的存储库 exprep 将所有存储库对象导出到一个 XML 文件 norep 防止 Pan 登录到存储库。如果您已经设置了...
Kettle是什么
kettle中文网 免费开源的基于java的企业级ETL工具ETL(Extract-Transform-Load的缩写,即数据抽取、转换、装载的过程),对于企业或行业应用来说,我们经常会遇到各种数据的处理,转换,迁移,所以了解并掌握一种etl工具的使用,必不可少,这里我介绍一个我在工作中使用了3年左右的ETL工具Kettle,本着好东西不独享的想法,跟大家分享碰撞交流一下!在使用中我感觉这个工具真的很强大,支持图形化的GUI设计界面,然后可以以工作流的形式流转,在做一些简单或复杂的数据抽取、质量检测、数据清洗、数据转换、数据过滤等方面有着比较稳定的表现,其中最主要的我们通过熟练的应用它,减少了非常多的研发工作量,提高了我们的工作效率
一些开发中需要注意的问题点
注意点1234567891011121314151617181920212223242526272829303132333435361. 过滤记录或者字段选择等组件, 无法出现上面步骤中的字段的时候,尝试在上面步骤中再次点击确定2. 两个数据流join前必须先进行排序操作。3. 数据流从计算器或join操作流出,并将流入另一join操作之前,需要插入一个字段选择插件。4. 表输入插件中,填写查询语句时,语句末尾不能加分号。~~ kettle8.2以上无此问题5. sql脚本插件中,填写sql语句时,语句末尾必须加分号,否则不会执行。~~ kettle8.2以上无此问题6. 引用变量时,采用${变量名}的方式引用,以免出现问题。此种引用方式还可以在变量内容后接其他字符串,例如:${key}_20151217会被解析为:value_2015217。7. job中执行多个trans时,如何确定trans执行的先后顺序?根据连接线的先后顺序执行,如果需要优先执行则先连接从开始到该trans的连线即可。8....
记一次使用coGroup产生的问题
记一次使用coGroup产生的问题(Caused by: org.apache.flink.util.TraversableOnceException: The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed)