-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Closed
Labels
Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
Elasticsearch can automatically create indexes based on templates, such as creating indexes by year based on time, which eliminates the need for savemode. Therefore, the savemode of sink-es should be supported.
SeaTunnel Version
dev
SeaTunnel Config
env {
execution.parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
checkpoint.timeout = 600000
read_limit.rows_per_second=2000
}
source {
SqlServer-CDC {
result_table_name = "**"
username = "sa"
password = "****"
database-names = ["**"]
table-names = ["**.dbo.**"]
base-url = "jdbc:sqlserver://127.0.0.1:1433;databaseName=**"
}
}
transform {
Sql {
query = """
SELECT
'test_1' AS source,
FORMATDATETIME ( createTime, 'yyyy' ) AS index_key,
CONCAT ( 's_', CAST ( id AS string ) ) AS id
FROM
test_content
"""
}
}
sink {
Elasticsearch {
hosts = ["127.0.0.1:9200"]
index = "test_${index_key}"
username="elastic"
password="****"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode = "APPEND_DATA"
primary_keys=["id"]
}
}
Running Command
seatunnel.sh -c config/test_es.confError Exception
2024-08-19 13:34:32,351 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:211)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.JobException: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:377)
at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:383)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:242)
at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:499)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: Illegal character in path at index 17: /test_${index_key}
at org.elasticsearch.client.RestClient.buildUri(RestClient.java:569)
at org.elasticsearch.client.RestClient$InternalRequest.<init>(RestClient.java:699)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:234)
at org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient.createIndex(EsRestClient.java:438)
at org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient.createIndex(EsRestClient.java:428)
at org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog.createTable(ElasticSearchCatalog.java:185)
at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createTable(DefaultSaveModeHandler.java:186)
at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:112)
at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:69)
at org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:38)
at org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36)
at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:375)
... 20 more
Caused by: java.net.URISyntaxException: Illegal character in path at index 17: /test_${index_key}
at java.net.URI$Parser.fail(URI.java:2845)
at java.net.URI$Parser.checkChars(URI.java:3018)
at java.net.URI$Parser.parseHierarchical(URI.java:3102)
at java.net.URI$Parser.parse(URI.java:3060)
at java.net.URI.<init>(URI.java:588)
at org.apache.http.client.utils.URIBuilder.<init>(URIBuilder.java:82)
at org.elasticsearch.client.RestClient.buildUri(RestClient.java:563)
... 31 more
at com.hazelcast.spi.impl.AbstractInvocationFuture.wrapInCompletionException(AbstractInvocationFuture.java:1347)
at com.hazelcast.spi.impl.AbstractInvocationFuture.cascadeException(AbstractInvocationFuture.java:1340)
at com.hazelcast.spi.impl.AbstractInvocationFuture.access$200(AbstractInvocationFuture.java:65)
at com.hazelcast.spi.impl.AbstractInvocationFuture$ApplyNode.execute(AbstractInvocationFuture.java:1478)
at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockOtherNode(AbstractInvocationFuture.java:797)
at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockAll(AbstractInvocationFuture.java:759)
at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1235)
at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionally(AbstractInvocationFuture.java:709)
at com.hazelcast.client.impl.spi.impl.ClientInvocation.completeExceptionally(ClientInvocation.java:294)
at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyExceptionWithOwnedPermission(ClientInvocation.java:321)
at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyException(ClientInvocation.java:304)
at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.handleResponse(ClientResponseHandlerSupplier.java:164)
at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.process(ClientResponseHandlerSupplier.java:141)
at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.access$300(ClientResponseHandlerSupplier.java:60)
at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:251)
at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:243)
at com.hazelcast.client.impl.connection.tcp.TcpClientConnection.handleClientMessage(TcpClientConnection.java:245)
at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.handleMessage(ClientMessageDecoder.java:135)
at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.onRead(ClientMessageDecoder.java:89)
at com.hazelcast.internal.networking.nio.NioInboundPipeline.process(NioInboundPipeline.java:136)
at com.hazelcast.internal.networking.nio.NioThread.processSelectionKey(NioThread.java:383)
at com.hazelcast.internal.networking.nio.NioThread.processSelectionKeys(NioThread.java:368)
at com.hazelcast.internal.networking.nio.NioThread.selectLoop(NioThread.java:294)
at com.hazelcast.internal.networking.nio.NioThread.executeRun(NioThread.java:249)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.seatunnel.engine.common.exception.JobException: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:377)
at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:383)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:242)
at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:499)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Zeta or Flink or Spark Version
No response
Java or Scala Version
jdk1.8
Screenshots
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct