@@ -662,13 +662,13 @@ public void testGetMultiTableJobMetrics() {
662662 Assertions .assertEquals (
663663 totalCount .get (SOURCE_RECEIVED_COUNT ),
664664 tableCount .entrySet ().stream ()
665- .filter (e -> e .getKey ().startsWith (SOURCE_RECEIVED_COUNT ))
665+ .filter (e -> e .getKey ().startsWith (SOURCE_RECEIVED_COUNT + "#" ))
666666 .mapToLong (Map .Entry ::getValue )
667667 .sum ());
668668 Assertions .assertEquals (
669669 totalCount .get (SINK_WRITE_COUNT ),
670670 tableCount .entrySet ().stream ()
671- .filter (e -> e .getKey ().startsWith (SINK_WRITE_COUNT ))
671+ .filter (e -> e .getKey ().startsWith (SINK_WRITE_COUNT + "#" ))
672672 .mapToLong (Map .Entry ::getValue )
673673 .sum ());
674674 Assertions .assertEquals (
@@ -684,18 +684,59 @@ public void testGetMultiTableJobMetrics() {
684684 .mapToLong (Map .Entry ::getValue )
685685 .sum ());
686686 // Instantaneous rates in the same direction are directly added
687- Assertions .assertEquals (
688- totalCount .get (SOURCE_RECEIVED_QPS ),
689- tableCount .entrySet ().stream ()
690- .filter (e -> e .getKey ().startsWith (SOURCE_RECEIVED_QPS + "#" ))
691- .mapToLong (Map .Entry ::getValue )
692- .sum ());
693- Assertions .assertEquals (
694- totalCount .get (SINK_WRITE_QPS ),
695- tableCount .entrySet ().stream ()
696- .filter (e -> e .getKey ().startsWith (SINK_WRITE_QPS + "#" ))
697- .mapToLong (Map .Entry ::getValue )
698- .sum ());
687+ // The size does not fluctuate more than %2 of the total value
688+ Assertions .assertTrue (
689+ Math .abs (
690+ totalCount .get (SOURCE_RECEIVED_QPS )
691+ - tableCount .entrySet ().stream ()
692+ .filter (
693+ e ->
694+ e .getKey ()
695+ .startsWith (
696+ SOURCE_RECEIVED_QPS
697+ + "#" ))
698+ .mapToLong (Map .Entry ::getValue )
699+ .sum ())
700+ < totalCount .get (SOURCE_RECEIVED_QPS ) * 0.02 );
701+ Assertions .assertTrue (
702+ Math .abs (
703+ totalCount .get (SINK_WRITE_QPS )
704+ - tableCount .entrySet ().stream ()
705+ .filter (
706+ e ->
707+ e .getKey ()
708+ .startsWith (
709+ SINK_WRITE_QPS
710+ + "#" ))
711+ .mapToLong (Map .Entry ::getValue )
712+ .sum ())
713+ < totalCount .get (SINK_WRITE_QPS ) * 0.02 );
714+ Assertions .assertTrue (
715+ Math .abs (
716+ totalCount .get (SOURCE_RECEIVED_BYTES_PER_SECONDS )
717+ - tableCount .entrySet ().stream ()
718+ .filter (
719+ e ->
720+ e .getKey ()
721+ .startsWith (
722+ SOURCE_RECEIVED_BYTES_PER_SECONDS
723+ + "#" ))
724+ .mapToLong (Map .Entry ::getValue )
725+ .sum ())
726+ < totalCount .get (SOURCE_RECEIVED_BYTES_PER_SECONDS ) * 0.02 );
727+ Assertions .assertTrue (
728+ Math .abs (
729+ totalCount .get (SINK_WRITE_BYTES_PER_SECONDS )
730+ - tableCount .entrySet ().stream ()
731+ .filter (
732+ e ->
733+ e .getKey ()
734+ .startsWith (
735+ SINK_WRITE_BYTES_PER_SECONDS
736+ + "#" ))
737+ .mapToLong (Map .Entry ::getValue )
738+ .sum ())
739+ < totalCount .get (SINK_WRITE_BYTES_PER_SECONDS ) * 0.02 );
699740
700741 } catch (ExecutionException | InterruptedException | JsonProcessingException e ) {
701742 throw new RuntimeException (e );
0 commit comments