4848import java .util .List ;
4949import java .util .Map ;
5050
51+ import static org .apache .seatunnel .api .table .type .CommonOptions .EVENT_TIME ;
52+
5153@ DisabledOnOs (
5254 value = OS .WINDOWS ,
5355 disabledReason =
@@ -346,24 +348,33 @@ void testCanalJsonSink() throws IOException {
346348 Collections .emptyList (),
347349 "comment" );
348350
351+ Map <String , Object > rowOptions = new HashMap <>();
352+ rowOptions .put (EVENT_TIME .getName (), 1L );
353+
349354 SeaTunnelRow row1 = new SeaTunnelRow (new Object [] {1L , "A" , 100 });
350355 row1 .setRowKind (RowKind .INSERT );
351356 row1 .setTableId (TablePath .DEFAULT .getFullName ());
357+ row1 .setOptions (rowOptions );
352358 SeaTunnelRow row2 = new SeaTunnelRow (new Object [] {2L , "B" , 100 });
353359 row2 .setRowKind (RowKind .INSERT );
354360 row2 .setTableId (TablePath .DEFAULT .getFullName ());
361+ row2 .setOptions (rowOptions );
355362 SeaTunnelRow row3 = new SeaTunnelRow (new Object [] {3L , "C" , 100 });
356363 row3 .setRowKind (RowKind .INSERT );
357364 row3 .setTableId (TablePath .DEFAULT .getFullName ());
365+ row3 .setOptions (rowOptions );
358366 SeaTunnelRow row1UpdateBefore = new SeaTunnelRow (new Object [] {1L , "A" , 100 });
359367 row1UpdateBefore .setTableId (TablePath .DEFAULT .getFullName ());
360368 row1UpdateBefore .setRowKind (RowKind .UPDATE_BEFORE );
369+ row1UpdateBefore .setOptions (rowOptions );
361370 SeaTunnelRow row1UpdateAfter = new SeaTunnelRow (new Object [] {1L , "A_1" , 100 });
362371 row1UpdateAfter .setTableId (TablePath .DEFAULT .getFullName ());
363372 row1UpdateAfter .setRowKind (RowKind .UPDATE_AFTER );
373+ row1UpdateAfter .setOptions (rowOptions );
364374 SeaTunnelRow row2Delete = new SeaTunnelRow (new Object [] {2L , "B" , 100 });
365375 row2Delete .setTableId (TablePath .DEFAULT .getFullName ());
366376 row2Delete .setRowKind (RowKind .DELETE );
377+ row2Delete .setOptions (rowOptions );
367378
368379 SinkFlowTestUtils .runBatchWithCheckpointDisabled (
369380 catalogTable ,
@@ -379,22 +390,22 @@ void testCanalJsonSink() throws IOException {
379390 String dataStr = FileUtils .readFileToStr (path );
380391 Assertions .assertTrue (
381392 dataStr .contains (
382- "{\" data\" :[{\" a\" :1,\" b\" :\" A\" ,\" c\" :100}],\" type\" :\" INSERT\" }" ));
393+ "{\" data\" :[{\" a\" :1,\" b\" :\" A\" ,\" c\" :100}],\" type\" :\" INSERT\" , \" database \" : \" default \" , \" table \" : \" default \" , \" ts \" :1 }" ));
383394 Assertions .assertTrue (
384395 dataStr .contains (
385- "{\" data\" :[{\" a\" :2,\" b\" :\" B\" ,\" c\" :100}],\" type\" :\" INSERT\" }" ));
396+ "{\" data\" :[{\" a\" :2,\" b\" :\" B\" ,\" c\" :100}],\" type\" :\" INSERT\" , \" database \" : \" default \" , \" table \" : \" default \" , \" ts \" :1 }" ));
386397 Assertions .assertTrue (
387398 dataStr .contains (
388- "{\" data\" :[{\" a\" :3,\" b\" :\" C\" ,\" c\" :100}],\" type\" :\" INSERT\" }" ));
399+ "{\" data\" :[{\" a\" :3,\" b\" :\" C\" ,\" c\" :100}],\" type\" :\" INSERT\" , \" database \" : \" default \" , \" table \" : \" default \" , \" ts \" :1 }" ));
389400 Assertions .assertTrue (
390401 dataStr .contains (
391- "{\" data\" :[{\" a\" :1,\" b\" :\" A\" ,\" c\" :100}],\" type\" :\" DELETE\" }" ));
402+ "{\" data\" :[{\" a\" :1,\" b\" :\" A\" ,\" c\" :100}],\" type\" :\" DELETE\" , \" database \" : \" default \" , \" table \" : \" default \" , \" ts \" :1 }" ));
392403 Assertions .assertTrue (
393404 dataStr .contains (
394- "{\" data\" :[{\" a\" :1,\" b\" :\" A_1\" ,\" c\" :100}],\" type\" :\" INSERT\" }" ));
405+ "{\" data\" :[{\" a\" :1,\" b\" :\" A_1\" ,\" c\" :100}],\" type\" :\" INSERT\" , \" database \" : \" default \" , \" table \" : \" default \" , \" ts \" :1 }" ));
395406 Assertions .assertTrue (
396407 dataStr .contains (
397- "{\" data\" :[{\" a\" :2,\" b\" :\" B\" ,\" c\" :100}],\" type\" :\" DELETE\" }" ));
408+ "{\" data\" :[{\" a\" :2,\" b\" :\" B\" ,\" c\" :100}],\" type\" :\" DELETE\" , \" database \" : \" default \" , \" table \" : \" default \" , \" ts \" :1 }" ));
398409 }
399410
400411 @ Test
@@ -431,24 +442,33 @@ void testDebeziumJsonSink() throws IOException {
431442 Collections .emptyList (),
432443 "comment" );
433444
445+ Map <String , Object > rowOptions = new HashMap <>();
446+ rowOptions .put (EVENT_TIME .getName (), 1L );
447+
434448 SeaTunnelRow row1 = new SeaTunnelRow (new Object [] {1L , "A" , 100 });
435449 row1 .setRowKind (RowKind .INSERT );
436450 row1 .setTableId (TablePath .DEFAULT .getFullName ());
451+ row1 .setOptions (rowOptions );
437452 SeaTunnelRow row2 = new SeaTunnelRow (new Object [] {2L , "B" , 100 });
438453 row2 .setRowKind (RowKind .INSERT );
439454 row2 .setTableId (TablePath .DEFAULT .getFullName ());
455+ row2 .setOptions (rowOptions );
440456 SeaTunnelRow row3 = new SeaTunnelRow (new Object [] {3L , "C" , 100 });
441457 row3 .setRowKind (RowKind .INSERT );
442458 row3 .setTableId (TablePath .DEFAULT .getFullName ());
459+ row3 .setOptions (rowOptions );
443460 SeaTunnelRow row1UpdateBefore = new SeaTunnelRow (new Object [] {1L , "A" , 100 });
444461 row1UpdateBefore .setTableId (TablePath .DEFAULT .getFullName ());
445462 row1UpdateBefore .setRowKind (RowKind .UPDATE_BEFORE );
463+ row1UpdateBefore .setOptions (rowOptions );
446464 SeaTunnelRow row1UpdateAfter = new SeaTunnelRow (new Object [] {1L , "A_1" , 100 });
447465 row1UpdateAfter .setTableId (TablePath .DEFAULT .getFullName ());
448466 row1UpdateAfter .setRowKind (RowKind .UPDATE_AFTER );
467+ row1UpdateAfter .setOptions (rowOptions );
449468 SeaTunnelRow row2Delete = new SeaTunnelRow (new Object [] {2L , "B" , 100 });
450469 row2Delete .setTableId (TablePath .DEFAULT .getFullName ());
451470 row2Delete .setRowKind (RowKind .DELETE );
471+ row2Delete .setOptions (rowOptions );
452472
453473 SinkFlowTestUtils .runBatchWithCheckpointDisabled (
454474 catalogTable ,
@@ -464,22 +484,22 @@ void testDebeziumJsonSink() throws IOException {
464484 String dataStr = FileUtils .readFileToStr (path );
465485 Assertions .assertTrue (
466486 dataStr .contains (
467- "{\" before\" :null,\" after\" :{\" a\" :1,\" b\" :\" A\" ,\" c\" :100},\" op\" :\" c\" }" ));
487+ "{\" before\" :null,\" after\" :{\" a\" :1,\" b\" :\" A\" ,\" c\" :100},\" op\" :\" c\" , \" source \" :{ \" schema \" : \" default \" , \" database \" : \" default \" , \" table \" : \" default \" }, \" ts_ms \" :1 }" ));
468488 Assertions .assertTrue (
469489 dataStr .contains (
470- "{\" before\" :null,\" after\" :{\" a\" :2,\" b\" :\" B\" ,\" c\" :100},\" op\" :\" c\" }" ));
490+ "{\" before\" :null,\" after\" :{\" a\" :2,\" b\" :\" B\" ,\" c\" :100},\" op\" :\" c\" , \" source \" :{ \" schema \" : \" default \" , \" database \" : \" default \" , \" table \" : \" default \" }, \" ts_ms \" :1 }" ));
471491 Assertions .assertTrue (
472492 dataStr .contains (
473- "{\" before\" :null,\" after\" :{\" a\" :3,\" b\" :\" C\" ,\" c\" :100},\" op\" :\" c\" }" ));
493+ "{\" before\" :null,\" after\" :{\" a\" :3,\" b\" :\" C\" ,\" c\" :100},\" op\" :\" c\" , \" source \" :{ \" schema \" : \" default \" , \" database \" : \" default \" , \" table \" : \" default \" }, \" ts_ms \" :1 }" ));
474494 Assertions .assertTrue (
475495 dataStr .contains (
476- "{\" before\" :{\" a\" :1,\" b\" :\" A\" ,\" c\" :100},\" after\" :null,\" op\" :\" d\" }" ));
496+ "{\" before\" :{\" a\" :1,\" b\" :\" A\" ,\" c\" :100},\" after\" :null,\" op\" :\" d\" , \" source \" :{ \" schema \" : \" default \" , \" database \" : \" default \" , \" table \" : \" default \" }, \" ts_ms \" :1 }" ));
477497 Assertions .assertTrue (
478498 dataStr .contains (
479- "{\" before\" :null,\" after\" :{\" a\" :1,\" b\" :\" A_1\" ,\" c\" :100},\" op\" :\" c\" }" ));
499+ "{\" before\" :null,\" after\" :{\" a\" :1,\" b\" :\" A_1\" ,\" c\" :100},\" op\" :\" c\" , \" source \" :{ \" schema \" : \" default \" , \" database \" : \" default \" , \" table \" : \" default \" }, \" ts_ms \" :1 }" ));
480500 Assertions .assertTrue (
481501 dataStr .contains (
482- "{\" before\" :{\" a\" :2,\" b\" :\" B\" ,\" c\" :100},\" after\" :null,\" op\" :\" d\" }" ));
502+ "{\" before\" :{\" a\" :2,\" b\" :\" B\" ,\" c\" :100},\" after\" :null,\" op\" :\" d\" , \" source \" :{ \" schema \" : \" default \" , \" database \" : \" default \" , \" table \" : \" default \" }, \" ts_ms \" :1 }" ));
483503 }
484504
485505 @ Test
@@ -515,25 +535,33 @@ void testMaxWellJsonSink() throws IOException {
515535 Collections .emptyMap (),
516536 Collections .emptyList (),
517537 "comment" );
538+ Map <String , Object > rowOptions = new HashMap <>();
539+ rowOptions .put (EVENT_TIME .getName (), 1L );
518540
519541 SeaTunnelRow row1 = new SeaTunnelRow (new Object [] {1L , "A" , 100 });
520542 row1 .setRowKind (RowKind .INSERT );
521543 row1 .setTableId (TablePath .DEFAULT .getFullName ());
544+ row1 .setOptions (rowOptions );
522545 SeaTunnelRow row2 = new SeaTunnelRow (new Object [] {2L , "B" , 100 });
523546 row2 .setRowKind (RowKind .INSERT );
524547 row2 .setTableId (TablePath .DEFAULT .getFullName ());
548+ row2 .setOptions (rowOptions );
525549 SeaTunnelRow row3 = new SeaTunnelRow (new Object [] {3L , "C" , 100 });
526550 row3 .setRowKind (RowKind .INSERT );
527551 row3 .setTableId (TablePath .DEFAULT .getFullName ());
552+ row3 .setOptions (rowOptions );
528553 SeaTunnelRow row1UpdateBefore = new SeaTunnelRow (new Object [] {1L , "A" , 100 });
529554 row1UpdateBefore .setTableId (TablePath .DEFAULT .getFullName ());
530555 row1UpdateBefore .setRowKind (RowKind .UPDATE_BEFORE );
556+ row1UpdateBefore .setOptions (rowOptions );
531557 SeaTunnelRow row1UpdateAfter = new SeaTunnelRow (new Object [] {1L , "A_1" , 100 });
532558 row1UpdateAfter .setTableId (TablePath .DEFAULT .getFullName ());
533559 row1UpdateAfter .setRowKind (RowKind .UPDATE_AFTER );
560+ row1UpdateAfter .setOptions (rowOptions );
534561 SeaTunnelRow row2Delete = new SeaTunnelRow (new Object [] {2L , "B" , 100 });
535562 row2Delete .setTableId (TablePath .DEFAULT .getFullName ());
536563 row2Delete .setRowKind (RowKind .DELETE );
564+ row2Delete .setOptions (rowOptions );
537565
538566 SinkFlowTestUtils .runBatchWithCheckpointDisabled (
539567 catalogTable ,
@@ -548,17 +576,22 @@ void testMaxWellJsonSink() throws IOException {
548576 Path path = Paths .get ("/tmp/seatunnel/LocalFileTest/maxwell_json_file.maxwell_json" );
549577 String dataStr = FileUtils .readFileToStr (path );
550578 Assertions .assertTrue (
551- dataStr .contains ("{\" data\" :{\" a\" :1,\" b\" :\" A\" ,\" c\" :100},\" type\" :\" INSERT\" }" ));
579+ dataStr .contains (
580+ "{\" data\" :{\" a\" :1,\" b\" :\" A\" ,\" c\" :100},\" type\" :\" INSERT\" ,\" database\" :\" default\" ,\" table\" :\" default\" ,\" ts\" :1}" ));
552581 Assertions .assertTrue (
553- dataStr .contains ("{\" data\" :{\" a\" :2,\" b\" :\" B\" ,\" c\" :100},\" type\" :\" INSERT\" }" ));
582+ dataStr .contains (
583+ "{\" data\" :{\" a\" :2,\" b\" :\" B\" ,\" c\" :100},\" type\" :\" INSERT\" ,\" database\" :\" default\" ,\" table\" :\" default\" ,\" ts\" :1}" ));
554584 Assertions .assertTrue (
555- dataStr .contains ("{\" data\" :{\" a\" :3,\" b\" :\" C\" ,\" c\" :100},\" type\" :\" INSERT\" }" ));
585+ dataStr .contains (
586+ "{\" data\" :{\" a\" :3,\" b\" :\" C\" ,\" c\" :100},\" type\" :\" INSERT\" ,\" database\" :\" default\" ,\" table\" :\" default\" ,\" ts\" :1}" ));
556587 Assertions .assertTrue (
557- dataStr .contains ("{\" data\" :{\" a\" :1,\" b\" :\" A\" ,\" c\" :100},\" type\" :\" DELETE\" }" ));
588+ dataStr .contains (
589+ "{\" data\" :{\" a\" :1,\" b\" :\" A\" ,\" c\" :100},\" type\" :\" DELETE\" ,\" database\" :\" default\" ,\" table\" :\" default\" ,\" ts\" :1}" ));
558590 Assertions .assertTrue (
559591 dataStr .contains (
560- "{\" data\" :{\" a\" :1,\" b\" :\" A_1\" ,\" c\" :100},\" type\" :\" INSERT\" }" ));
592+ "{\" data\" :{\" a\" :1,\" b\" :\" A_1\" ,\" c\" :100},\" type\" :\" INSERT\" , \" database \" : \" default \" , \" table \" : \" default \" , \" ts \" :1 }" ));
561593 Assertions .assertTrue (
562- dataStr .contains ("{\" data\" :{\" a\" :2,\" b\" :\" B\" ,\" c\" :100},\" type\" :\" DELETE\" }" ));
594+ dataStr .contains (
595+ "{\" data\" :{\" a\" :2,\" b\" :\" B\" ,\" c\" :100},\" type\" :\" DELETE\" ,\" database\" :\" default\" ,\" table\" :\" default\" ,\" ts\" :1}" ));
563596 }
564597}
0 commit comments