记录flink + docker, 部署jar,执行任务程序
首先安装flink,这里以1.13.1举例(docker安装的话,jobmanager和taskmanager需要分开部署)flink-docker-github-repository (这个参数flink文档中没找到)关于query.server.port: 6125不用去管,这个不填也会默认自动生成: 12yml中的内容不说了, 下面贴几张关于Flink生产配置最佳实践的图, 钉钉公开课趣头条实时平台负责人分享https://www.bilibili.com/video/BV1iE411r7S6 Flink在yarn上运行,每个TaskManager的slot个数怎么设置?经验公式:slot个数tm个数=并行度并行度=kafka的分区个数(10分区)slot的个数要小于yarn设置的单个container最大可以申请的cpu核数(5个 8-36个)。那么就是...
记录开发中遇到关于flink的一些错误(长期)
2022-01-20 09:25:58,308 WARN org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application:org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count] 这是因为当 flink 作业是以 detached...
Flink-es-conector7-ElasticsearchSink
截至1.13.1,官方文档所提供的方式已经废弃12345678910111213141516171819202122232425262728293031HttpHost httpHost = new HttpHost(esHost, esPort, esScheme);List<HttpHost> httpPosts = new ArrayList<>();httpPosts.add(httpHost);RestClientFactory restClientFactory = new RestClientFactory() { @Override public void configureRestClientBuilder(RestClientBuilder restClientBuilder) { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); ...
记一次使用coGroup产生的问题
记一次使用coGroup产生的问题(Caused by: org.apache.flink.util.TraversableOnceException: The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed)
flink问题:esotericsoftware.kryo.serializers.CollectionSerializer.read NullPointerException
(flink)问题:esotericsoftware.kryo.serializers.CollectionSerializer.read NullPointerException 异常如下 123456789101112131415161718192021222324252627282930313233com.esotericsoftware.kryo.KryoException: java.lang.NullPointerExceptionSerialization trace:values (org.apache.avro.generic.GenericData$Record) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at...
记录一次flink的数据转换解决方式
12345678910111213/** * { * 1:[2,3,4], * 2:[3,4,5] * } * ↓ * { * 2:[1], * 3:[1,2], * 4:[1,2], * 5:[2] * } */ flink由上转成下面的数据格式 打散 1234562:1,3:1,4:1,3:2,4:2,5:2 2.再根据key分组并且reduce 12345678910111213141516171819202122232425262728DataSet<Tuple2<String, String>> test = ...test.flatMap(new FlatMapFunction<Tuple2<String, String>, Tuple2<String, String>>() { @Override public void flatMap(Tuple2<String, String>...
flink简述
...
关于DataRowException, internal schema representation is probably ...异常的调试记录
异常:DataRowException, internal schema representation is probably out of sync with real database schema 123456789起初异常信息打印的不全, 但是浑然不知,后来在打印信息里发现flink并没有打印出应该打印的日志,然后去查了下,是缺少依赖包包加上<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> <!-- 注意,若无type为jar则报错--> <type>jar</type></dependency> 全部异常信息 1234567891011121314151617181920212223242526272829303132333435ERROR...
记录flink的安装及简单使用
先查看centos中自带的jdk并卸载 1234567[root@root ~]# rpm -qa | grep java //查看tzdata-java-2016c-1.el6.noarchjava-1.6.0-openjdk-1.6.0.38-1.13.10.4.el6.x86_64java-1.7.0-openjdk-1.7.0.99-2.6.5.1.el6.x86_64[root@root ~]# rpm -e --allmatches --nodeps java-1.6.0-openjdk-1.6.0.38-1.13.10.4.el6.x86_64 //卸载[root@root ~]# rpm -e --allmatches --nodeps java-1.7.0-openjdk-1.7.0.99-2.6.5.1.el6.x86_64 //卸载[root@root ~]# rpm -qa | grep java //再次查看 开发版openjdk(非开发版还需单独额外安装jps等工具) 1yum install -y ...
记一次利用Semaphore处理大批次数据计算的解决方案
先描述下信号量的意义 1234567891011Semaphore是一个计数信号量。 在概念上,信号量维持一组许可证。如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。 每个release()添加许可证,潜在地释放阻塞获取方。但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行。 信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源。在这里插入图片描述Semaphore实现的功能就类似有3个停车位,假如有6个人要停车,那么同时只能停多少辆车?同时只能有3个人能够占用,当3个人中...