Search before asking
- I had searched in the issues and found no similar issues.
What happened
The problem seems similar to #6854, but I can't find a solution.
The version is 2.3.5 with #6688 merged.
When I set sink.enable-2pc=false, the job runs fine. But when I set sink.enable-2pc=true, the job fails with the exception below (a minimal sketch of the failing buffer handoff follows the trace):
25/03/17 16:27:46 INFO InternalParquetRecordReader: Assembled and processed 13847816 records from 13 columns in 77705 ms: 178.2101 rec/ms, 2316.7312 cell/ms
25/03/17 16:27:46 INFO InternalParquetRecordReader: time spent so far 4% reading (3902 ms) and 95% processing (77705 ms)
25/03/17 16:27:46 INFO InternalParquetRecordReader: at row 13847816. reading next block
25/03/17 16:27:46 INFO InternalParquetRecordReader: block read in memory in 455 ms. row count = 4088733
25/03/17 16:28:06 INFO RetryExec: I/O exception (java.net.SocketException) caught when processing request to {}->http://dataarch-bjy-d5-154.idchb1az2.hb1.kwaidc.com:8040: Broken pipe (Write failed)
25/03/17 16:28:09 ERROR DorisSinkWriter: stream load finished unexpectedly, interrupt worker thread! org.apache.http.client.ClientProtocolException
25/03/17 16:28:09 ERROR Utils: Aborting task
java.lang.RuntimeException: java.lang.InterruptedException
at org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.write(RecordStream.java:57)
at org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.writeRecord(DorisStreamLoad.java:194)
at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:135)
at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:52)
at org.apache.seatunnel.translation.spark.sink.writer.SparkDataWriter.write(SparkDataWriter.java:66)
at org.apache.seatunnel.translation.spark.sink.writer.SparkDataWriter.write(SparkDataWriter.java:41)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1422)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:95)
at org.apache.spark.scheduler.Task.run(Task.scala:124)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:495)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1388)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
at org.apache.seatunnel.connectors.doris.sink.writer.RecordBuffer.write(RecordBuffer.java:93)
at org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.write(RecordStream.java:55)
... 19 more
25/03/17 16:28:09 ERROR DataWritingSparkTask: Aborting commit for partition 4 (task 4, attempt 0, stage 0.0)
25/03/17 16:28:09 INFO DorisStreamLoad: abort for labelSuffix s_distribute_shelf_zsm_2da1507d273144e384e7bdbe1c35aad6_4. start chkId 1.
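For context on the trace: the writer thread and the HTTP stream-load thread hand buffers to each other through blocking queues, and the "Broken pipe" failure on the stream-load side interrupts the writer while it is blocked in take(). Below is a minimal sketch of that handoff (my simplified illustration, not the actual SeaTunnel source; the class shape and queue names are assumptions):

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch (NOT the real SeaTunnel code) of the RecordBuffer-style handoff,
// showing why the writer dies with RuntimeException(InterruptedException)
// when the stream-load thread fails.
public class RecordBufferSketch {
    private static final int CAPACITY = 4;

    // Writer fills buffers and hands them to the HTTP stream-load thread here.
    private final BlockingQueue<ByteBuffer> readQueue = new ArrayBlockingQueue<>(CAPACITY);
    // Pool of empty, reusable buffers; the writer blocks here when the consumer stalls.
    private final BlockingQueue<ByteBuffer> writeQueue = new ArrayBlockingQueue<>(CAPACITY);

    public RecordBufferSketch(int bufferSize) {
        for (int i = 0; i < CAPACITY; i++) {
            writeQueue.offer(ByteBuffer.allocate(bufferSize));
        }
    }

    public void write(byte[] record) {
        try {
            // Blocks when no free buffer is available. When the stream-load
            // thread hits "Broken pipe" and interrupts this writer thread,
            // take() throws InterruptedException right here -- matching the
            // RecordBuffer.write frame in the stack trace above.
            ByteBuffer buffer = writeQueue.take();
            buffer.clear();
            buffer.put(record); // sketch: assumes a record fits in one buffer
            readQueue.put(buffer);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // The caller re-wraps this, producing the top-level
            // java.lang.RuntimeException: java.lang.InterruptedException.
            throw new RuntimeException(e);
        }
    }
}
```

Read this way, the InterruptedException in RecordBuffer.write is only the writer being told to stop; the real failure is the Broken pipe / ClientProtocolException on the stream-load thread.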
SeaTunnel Version
2.3.5, with #6688 merged.
SeaTunnel Config
env {
  parallelism = ${TASK_NUM}
  job.mode = "BATCH"
  spark.app.name = "hive2bleem-template"
}

source {
  Hive {
    table_name = ${HIVE_TBL}
    metastore_uri = ${HIVE_URL}
    read_partitions = ${PARTS}
    read_columns = ${COLS}
  }
}

sink {
  Doris {
    table.identifier = ${BLEEM_TBL}
    fenodes = ${BLEEM_FE}
    username = "root"
    password = ""
    sink.label-prefix = "s"
    sink.enable-2pc = ${enable-2pc}
    doris.batch.size = 200000
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
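For reference, sink.enable-2pc=true makes the sink use Doris's two-phase stream load, which is why the failure ends with the "abort for labelSuffix ..." line: the pending transaction has to be aborted by label. A hedged sketch of such an abort call against the FE HTTP API (endpoint and headers per the Doris _stream_load_2pc docs; exact behavior may vary by Doris version, and the host, database, and label below are placeholder assumptions, not values from this job):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

// Hedged sketch: abort a pending 2PC stream-load transaction by label.
public class StreamLoad2pcAbortSketch {
    public static void main(String[] args) throws Exception {
        String feHttp = "http://doris-fe-host:8030"; // assumption: FE HTTP address
        String db = "example_db";                    // assumption: target database
        String label = "your-2pc-load-label";        // assumption: label of the pending txn

        HttpPut abort = new HttpPut(feHttp + "/api/" + db + "/_stream_load_2pc");
        String auth = Base64.getEncoder()
                .encodeToString("root:".getBytes(StandardCharsets.UTF_8));
        abort.setHeader("Authorization", "Basic " + auth);
        abort.setHeader("label", label);
        abort.setHeader("txn_operation", "abort"); // "commit" would finalize instead

        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(abort)) {
            // Doris answers with a JSON status body, e.g. {"status": "Success", ...}
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}
```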
Running Command
sparkV2 job

Error Exception
(Identical to the stack trace under "What happened" above.)
Zeta or Flink or Spark Version
No response
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct