Skip to content

Commit a88a05f

Browse files
authored
Merge pull request #441 from InterestingLab/kid-xiong.wd2.docs.flink
Kid xiong.wd2.docs.flink
2 parents bec498b + fddfaae commit a88a05f

File tree

59 files changed

+559
-327
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+559
-327
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,6 @@ apidoc
3030

3131
Test.java
3232
Test.scala
33-
33+
test.conf
34+
log4j.properties
3435
spark-warehouse

bin/start-waterdrop-flink.sh

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ while (( "$#" )); do
88
shift 2
99
;;
1010

11+
-i|--variable)
12+
variable=$2
13+
java_property_value="-D${variable}"
14+
variables_substitution="${java_property_value} ${variables_substitution}"
15+
shift 2
16+
;;
17+
1118
*) # preserve positional arguments
1219
PARAMS="$PARAMS $1"
1320
shift
@@ -31,8 +38,12 @@ assemblyJarName=$(find ${PLUGINS_DIR} -name waterdrop-core*.jar)
3138

3239
source ${CONF_DIR}/waterdrop-env.sh
3340

34-
echo ${assemblyJarName}
35-
set -x
41+
string_trim() {
42+
echo $1 | awk '{$1=$1;print}'
43+
}
44+
45+
export JVM_ARGS=$(string_trim "${variables_substitution}")
46+
3647
exec ${FLINK_HOME}/bin/flink run \
3748
${PARAMS} \
3849
-c io.github.interestinglab.waterdrop.WaterdropFlink \

common/src/main/java/io/github/interestinglab/waterdrop/common/config/CheckConfigUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ public class CheckConfigUtil {
1111

1212
public static CheckResult check(Config config, String... params) {
1313
for (String param : params) {
14-
if (!config.hasPath(param) || config.getString(param) == null || config.getString(param).trim().isEmpty()) {
15-
return new CheckResult(false, "please specify [" + param + "] as non-empty string");
14+
if (!config.hasPath(param) || config.getAnyRef(param) == null) {
15+
return new CheckResult(false, "please specify [" + param + "] as non-empty");
1616
}
1717
}
1818
return new CheckResult(true,"");
419 KB
Loading
86.5 KB
Loading

docs/zh-cn/images/flink/yarn.jpg

94.3 KB
Loading

docs/zh-cn/v2/flink/commands/start-waterdrop-flink.sh.md

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,55 @@
22

33

44
```bash
5-
bin/start-waterdrop-flink.sh -c config-path [other params]
5+
bin/start-waterdrop-flink.sh -c config-path -i key=value [other params]
66
```
7-
> 使用 `-c`或者`--config`来指定配置文件的路径。其余参数参考flink原始参数,查看flink参数方法:`flink run -h`,参数可以根据需求任意添加,如`-m yarn-cluster`则指定为on yarn模式。
7+
> 使用 `-c`或者`--config`来指定配置文件的路径。
8+
9+
> 使用 `-i` 或者 `--variable` 来指定配置文件中的变量,可以配置多个
10+
11+
```
12+
env {
13+
execution.parallelism = 1
14+
}
15+
16+
source {
17+
FakeSourceStream {
18+
result_table_name = "fake"
19+
field_name = "name,age"
20+
}
21+
}
22+
23+
transform {
24+
sql {
25+
sql = "select name,age from fake where name='"${my_name}"'"
26+
}
27+
}
28+
29+
sink {
30+
ConsoleSink {}
31+
}
32+
```
33+
34+
```bash
35+
bin/start-waterdrop-flink.sh -c config-path -i my_name=kid-xiong
36+
```
37+
这样指定将会把配置文件中的`"${my_name}"`替换为`kid-xiong`
38+
39+
> 其余参数参考flink原始参数,查看flink参数方法:`flink run -h`,参数可以根据需求任意添加,如`-m yarn-cluster`则指定为on yarn模式。
40+
41+
```bash
42+
flink run -h
43+
```
44+
* flink standalone 可配置的参数
45+
![standalone](../../../images/flink/standalone.jpg)
46+
例如:-p 2 指定作业并行度为2
47+
```bash
48+
bin/start-waterdrop-flink.sh -p 2 -c config-path
49+
```
50+
51+
* flink yarn-cluster 可配置参数
52+
![yarn-cluster](../../../images/flink/yarn.jpg)
53+
例如:-m yarn-cluster -ynm waterdrop 指定作业在运行在yarn上,并且yarn webUI的名称为waterdrop
54+
```bash
55+
bin/start-waterdrop-flink.sh -m yarn-cluster -ynm waterdrop -c config-path
56+
```

docs/zh-cn/v2/flink/configuration/README.md

Lines changed: 76 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2,69 +2,96 @@
22

33
# 通用配置
44

5-
## 核心概念
6-
7-
8-
---
95

106
## 配置文件
117

128
一个完整的Waterdrop配置包含`env`, `source`, `transform`, `sink`, 即:
139

1410
```
15-
env {
16-
...
17-
}
11+
env {
12+
...
13+
}
14+
15+
source {
16+
...
17+
}
18+
19+
transform {
20+
...
21+
}
22+
23+
sink {
24+
...
25+
}
1826
19-
source {
20-
...
21-
}
22-
23-
transform {
24-
...
25-
}
27+
```
2628

27-
sink {
28-
...
29-
}
29+
* `env`是flink任务的相关的配置,例如设置时间为event-time还是process-time
3030

31+
```
32+
env {
33+
execution.parallelism = 1 #设置任务的整体并行度为1
34+
execution.checkpoint.interval = 10000 #设置任务checkpoint的频率
35+
execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" #设置checkpoint的路径
36+
}
3137
```
3238

33-
* `env`是运行环境相关的配置,以下是env可配置项,具体含义可以参照[flink官网配置](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/execution_configuration.html)
39+
* 以下是env可配置项,具体含义可以参照[flink官网配置](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/execution_configuration.html)
3440
```java
35-
public class ConfigKeyName {
36-
public final static String TIME_CHARACTERISTIC = "execution.time-characteristic";
37-
public final static String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
38-
public final static String PARALLELISM = "execution.parallelism";
39-
public final static String MAX_PARALLELISM = "execution.max-parallelism";
40-
public final static String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
41-
public final static String CHECKPOINT_MODE = "execution.checkpoint.mode";
42-
public final static String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
43-
public final static String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
44-
public final static String MAX_CONCURRENT_CHECKPOINTS = "execution.max-concurrent-checkpoints";
45-
public final static String CHECKPOINT_CLEANUP_MODE = "execution.checkpoint.cleanup-mode";
46-
public final static String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpoint.min-pause";
47-
public final static String FAIL_ON_CHECKPOINTING_ERRORS = "execution.checkpoint.fail-on-error";
48-
public final static String RESTART_STRATEGY = "execution.restart.strategy";
49-
public final static String RESTART_ATTEMPTS = "execution.restart.attempts";
50-
public final static String RESTART_DELAY_BETWEEN_ATTEMPTS = "execution.restart.delayBetweenAttempts";
51-
public final static String RESTART_FAILURE_INTERVAL = "execution.restart.failureInterval";
52-
public final static String RESTART_FAILURE_RATE = "execution.restart.failureRate";
53-
public final static String RESTART_DELAY_INTERVAL = "execution.restart.delayInterval";
54-
public final static String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention";
55-
public final static String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
56-
public final static String STATE_BACKEND = "execution.state.backend";
57-
58-
}
41+
public class ConfigKeyName {
42+
//the time characteristic for all streams create from this environment, e.g., processing-time,event-time,ingestion-time
43+
public final static String TIME_CHARACTERISTIC = "execution.time-characteristic";
44+
//the maximum time frequency (milliseconds) for the flushing of the output buffers
45+
public final static String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
46+
//the parallelism for operations executed through this environment
47+
public final static String PARALLELISM = "execution.parallelism";
48+
//the maximum degree of parallelism to be used for the program
49+
public final static String MAX_PARALLELISM = "execution.max-parallelism";
50+
//enables checkpointing for the streaming job,time interval between state checkpoints in milliseconds
51+
public final static String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
52+
//the checkpointing mode (exactly-once vs. at-least-once)
53+
public final static String CHECKPOINT_MODE = "execution.checkpoint.mode";
54+
//the maximum time that a checkpoint may take before being discarded
55+
public final static String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
56+
//a file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
57+
public final static String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
58+
//the maximum number of checkpoint attempts that may be in progress at the same time
59+
public final static String MAX_CONCURRENT_CHECKPOINTS = "execution.max-concurrent-checkpoints";
60+
//enables checkpoints to be persisted externally,delete externalized checkpoints on job cancellation (e.g., true,false)
61+
public final static String CHECKPOINT_CLEANUP_MODE = "execution.checkpoint.cleanup-mode";
62+
//the minimal pause before the next checkpoint is triggere
63+
public final static String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpoint.min-pause";
64+
//the tolerable checkpoint failure number
65+
public final static String FAIL_ON_CHECKPOINTING_ERRORS = "execution.checkpoint.fail-on-error";
66+
//the restart strategy to be used for recovery (e.g., 'no' , 'fixed-delay', 'failure-rate')
67+
//no -> no restart strategy
68+
//fixed-delay -> fixed delay restart strategy
69+
//failure-rate -> failure rate restart strategy
70+
public final static String RESTART_STRATEGY = "execution.restart.strategy";
71+
//number of restart attempts for the fixed delay restart strategy
72+
public final static String RESTART_ATTEMPTS = "execution.restart.attempts";
73+
//delay in-between restart attempts for the delay restart strategy
74+
public final static String RESTART_DELAY_BETWEEN_ATTEMPTS = "execution.restart.delayBetweenAttempts";
75+
//time interval for failures
76+
public final static String RESTART_FAILURE_INTERVAL = "execution.restart.failureInterval";
77+
//maximum number of restarts in given interval for the failure rate restart strategy
78+
public final static String RESTART_FAILURE_RATE = "execution.restart.failureRate";
79+
//delay in-between restart attempts for the failure rate restart strategy
80+
public final static String RESTART_DELAY_INTERVAL = "execution.restart.delayInterval";
81+
//the maximum time interval for which idle state is retained
82+
public final static String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention";
83+
//the minimum time interval for which idle state is retained
84+
public final static String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
85+
//the state backend ('rocksdb','fs')
86+
public final static String STATE_BACKEND = "execution.state.backend";
87+
88+
}
5989
```
6090

6191

62-
* `source`可配置任意的input插件及其参数,具体参数随不同的input插件而变化。
63-
64-
* `transform`可配置任意的filter插件及其参数,具体参数随不同的filter插件而变化。
65-
66-
transform中的多个插件按配置顺序形成了数据处理的pipeline, 默认上一个transform的输出是下一个transform的输入。但也可以通过source_table_name控制。
92+
* `source`可配置任意的source插件及其参数,具体参数随不同的source插件而变化。
6793

68-
* `sink`可配置任意的output插件及其参数,具体参数随不同的output插件而变化。
94+
* `transform`可配置任意的transform插件及其参数,具体参数随不同的transform插件而变化。transform中的多个插件按配置顺序形成了数据处理的pipeline, 默认上一个transform的输出是下一个transform的输入,但也可以通过source_table_name控制。
95+
* `transform`处理完的数据,会发送给`sink`中配置的每个插件。
96+
* `sink`可配置任意的sink插件及其参数,具体参数随不同的sink插件而变化。
6997

70-
* `transform`处理完的数据,会发送给`sink`中配置的每个插件。

docs/zh-cn/v2/flink/configuration/sink-plugins/Console.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,8 @@ None
1414

1515
```
1616
ConsoleSink{}
17-
```
17+
```
18+
19+
### Note
20+
flink的console输出在flink的web UI
21+
![flink_console](../../../../images/flink/flink-console.png)
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
## Source plugin : Socket [Flink]
2+
3+
* Author: InterestingLab
4+
* Homepage: https://interestinglab.github.io/waterdrop
5+
* Version: 2.0.0
6+
7+
### Description
8+
> Socket作为数据源
9+
10+
### Options
11+
| name | type | required | default value |
12+
| --- | --- | --- | --- |
13+
| [host](#host-string) | string | no | localhost |
14+
| [port](#port-int) | int | no | 9999 |
15+
16+
##### host [string]
17+
18+
socket server hostname
19+
20+
##### port [int]
21+
22+
socket server port
23+
24+
### Examples
25+
```
26+
source {
27+
SocketStream{
28+
result_table_name = "socket"
29+
field_name = "info"
30+
}
31+
}
32+
```

0 commit comments

Comments
 (0)