Skip to content

Commit 74f7432

Browse files
hailin0chaorongzhi
authored andcommitted
[Feature][Core] Support event listener for job (apache#6419)
1 parent 244dc8a commit 74f7432

File tree

100 files changed

+1883
-115
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

100 files changed

+1883
-115
lines changed

docs/en/concept/event-listener.md

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# Event Listener
2+
3+
## Introduction
4+
5+
The SeaTunnel provides a rich event listening feature that allows you to manage the status at which data is synchronized.
6+
This functionality is crucial when you need to listen job running status(`org.apache.seatunnel.api.event`).
7+
This document will guide you through the usage of these parameters and how to leverage them effectively.
8+
9+
## Support Those Engines
10+
11+
> SeaTunnel Zeta<br/>
12+
> Flink<br/>
13+
> Spark<br/>
14+
15+
## API
16+
17+
The event API is defined in the `org.apache.seatunnel.api.event` package.
18+
19+
### Event Data API
20+
21+
- `org.apache.seatunnel.api.event.Event` - The interface for event data.
22+
- `org.apache.seatunnel.api.event.EventType` - The enum for event type.
23+
24+
### Event Listener API
25+
26+
You can customize event handler, such as sending events to external systems
27+
28+
- `org.apache.seatunnel.api.event.EventHandler` - The interface for event handler, SPI will automatically load subclass from the classpath.
29+
30+
### Event Collect API
31+
32+
- `org.apache.seatunnel.api.source.SourceSplitEnumerator` - Attached event listener API to report events from `SourceSplitEnumerator`.
33+
34+
```java
35+
package org.apache.seatunnel.api.source;
36+
37+
public interface SourceSplitEnumerator {
38+
39+
interface Context {
40+
41+
/**
42+
* Get the {@link org.apache.seatunnel.api.event.EventListener} of this enumerator.
43+
*
44+
* @return
45+
*/
46+
EventListener getEventListener();
47+
}
48+
}
49+
```
50+
51+
- `org.apache.seatunnel.api.source.SourceReader` - Attached event listener API to report events from `SourceReader`.
52+
53+
```java
54+
package org.apache.seatunnel.api.source;
55+
56+
public interface SourceReader {
57+
58+
interface Context {
59+
60+
/**
61+
* Get the {@link org.apache.seatunnel.api.event.EventListener} of this reader.
62+
*
63+
* @return
64+
*/
65+
EventListener getEventListener();
66+
}
67+
}
68+
```
69+
70+
- `org.apache.seatunnel.api.sink.SinkWriter` - Attached event listener API to report events from `SinkWriter`.
71+
72+
```java
73+
package org.apache.seatunnel.api.sink;
74+
75+
public interface SinkWriter {
76+
77+
interface Context {
78+
79+
/**
80+
* Get the {@link org.apache.seatunnel.api.event.EventListener} of this writer.
81+
*
82+
* @return
83+
*/
84+
EventListener getEventListener();
85+
}
86+
}
87+
```
88+
89+
## Configuration Listener
90+
91+
To use the event listening feature, you need to configure engine config.
92+
93+
### Zeta Engine
94+
95+
Example config in your config file(seatunnel.yaml):
96+
97+
```
98+
seatunnel:
99+
engine:
100+
event-report-http:
101+
url: "http://example.com:1024/event/report"
102+
headers:
103+
Content-Type: application/json
104+
```
105+
106+
### Flink Engine
107+
108+
You can define the implementation class of `org.apache.seatunnel.api.event.EventHandler` interface and add to the classpath to automatically load it through SPI.
109+
110+
Support flink version: 1.14.0+
111+
112+
Example: `org.apache.seatunnel.api.event.LoggingEventHandler`
113+
114+
### Spark Engine
115+
116+
You can define the implementation class of `org.apache.seatunnel.api.event.EventHandler` interface and add to the classpath to automatically load it through SPI.

docs/sidebars.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ const sidebars = {
9090
"concept/connector-v2-features",
9191
'concept/schema-feature',
9292
'concept/JobEnvConfig',
93-
'concept/speed-limit'
93+
'concept/speed-limit',
94+
'concept/event-listener'
9495
]
9596
},
9697
"Connector-v2-release-state",
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.event;
19+
20+
import lombok.AllArgsConstructor;
21+
import lombok.extern.slf4j.Slf4j;
22+
23+
import java.util.List;
24+
25+
@Slf4j
26+
@AllArgsConstructor
27+
public class DefaultEventProcessor implements EventListener, EventProcessor {
28+
private final String jobId;
29+
private final List<EventHandler> handlers;
30+
31+
public DefaultEventProcessor() {
32+
this(DefaultEventProcessor.class.getClassLoader());
33+
}
34+
35+
public DefaultEventProcessor(String jobId) {
36+
this(jobId, EventProcessor.loadEventHandlers(DefaultEventProcessor.class.getClassLoader()));
37+
}
38+
39+
public DefaultEventProcessor(ClassLoader classLoader) {
40+
this(null, EventProcessor.loadEventHandlers(classLoader));
41+
}
42+
43+
@Override
44+
public void process(Event event) {
45+
handlers.forEach(listener -> listener.handle(event));
46+
}
47+
48+
@Override
49+
public void onEvent(Event event) {
50+
if (jobId != null) {
51+
event.setJobId(jobId);
52+
}
53+
process(event);
54+
}
55+
56+
@Override
57+
public void close() throws Exception {
58+
log.info("Closing event handlers.");
59+
EventProcessor.close(handlers);
60+
}
61+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.event;
19+
20+
import java.io.Serializable;
21+
22+
public interface Event extends Serializable {
23+
24+
long getCreatedTime();
25+
26+
void setJobId(String jobId);
27+
28+
String getJobId();
29+
30+
EventType getEventType();
31+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.event;
19+
20+
import java.io.Serializable;
21+
22+
public interface EventHandler extends Serializable, AutoCloseable {
23+
24+
/**
25+
* Receive and handle the event data.
26+
*
27+
* @param event
28+
*/
29+
void handle(Event event);
30+
31+
@Override
32+
default void close() throws Exception {}
33+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.event;
19+
20+
import java.io.Serializable;
21+
22+
public interface EventListener extends Serializable {
23+
void onEvent(Event event);
24+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.event;
19+
20+
import java.util.LinkedList;
21+
import java.util.List;
22+
import java.util.ServiceConfigurationError;
23+
import java.util.ServiceLoader;
24+
25+
public interface EventProcessor extends AutoCloseable {
26+
void process(Event event);
27+
28+
static List<EventHandler> loadEventHandlers(ClassLoader classLoader) {
29+
try {
30+
List<EventHandler> result = new LinkedList<>();
31+
ServiceLoader.load(EventHandler.class, classLoader)
32+
.iterator()
33+
.forEachRemaining(result::add);
34+
return result;
35+
} catch (ServiceConfigurationError e) {
36+
throw new RuntimeException("Could not load service provider for event handlers.", e);
37+
}
38+
}
39+
40+
static void close(List<EventHandler> handlers) throws Exception {
41+
if (handlers != null) {
42+
for (EventHandler handler : handlers) {
43+
handler.close();
44+
}
45+
}
46+
}
47+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.event;
19+
20+
public enum EventType {
21+
SCHEMA_CHANGE_ADD_COLUMN,
22+
SCHEMA_CHANGE_DROP_COLUMN,
23+
SCHEMA_CHANGE_MODIFY_COLUMN,
24+
SCHEMA_CHANGE_CHANGE_COLUMN,
25+
SCHEMA_CHANGE_UPDATE_COLUMNS,
26+
SCHEMA_CHANGE_RENAME_TABLE,
27+
LIFECYCLE_ENUMERATOR_OPEN,
28+
LIFECYCLE_ENUMERATOR_CLOSE,
29+
LIFECYCLE_READER_OPEN,
30+
LIFECYCLE_READER_CLOSE,
31+
LIFECYCLE_WRITER_CLOSE,
32+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.event;
19+
20+
public interface LifecycleEvent extends Event {}

0 commit comments

Comments
 (0)