-
Notifications
You must be signed in to change notification settings - Fork 6.9k
postgresql binary support #36372
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
postgresql binary support #36372
Conversation
|
@terrymanu @RaigorJiang import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.bind.protocol.util.codec.decoder.PgBinaryObj;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.bind.protocol.util.codec.encoder.*;
import org.postgresql.core.Oid;
import org.postgresql.core.QueryExecutor;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.jdbc.PgStatement;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.Arrays;
import java.util.TimeZone;
public class ClientTest {
public static void main(String[] args) throws IOException {
String url = "jdbc:postgresql://localhost:3307/test";
String username = "root";
String password = "root";
Short[][] param1 = new Short[2][4];
param1[0][0] = 1;
param1[0][1] = 2;
param1[1][0] = 3;
Integer[][] param2 = new Integer[2][4];
param2[0][0] = 1;
param2[0][1] = 2;
param2[1][0] = 3;
Long[][] param3 = new Long[2][4];
param3[0][0] = 1L;
param3[0][1] = 2L;
param3[1][0] = 3L;
Float[][] param4 = new Float[2][4];
param4[0][0] = 1.1F;
param4[0][1] = 2.2F;
param4[1][0] = Float.NaN;
param4[1][1] = Float.POSITIVE_INFINITY;
param4[1][2] = Float.NEGATIVE_INFINITY;
Double[][] param5 = new Double[2][4];
param5[0][0] = 1.1;
param5[0][1] = 2.2;
param5[1][0] = Double.NaN;
param5[1][1] = Double.POSITIVE_INFINITY;
param5[1][2] = Double.NEGATIVE_INFINITY;
Number[][] param6 = new Number[2][4];
param6[0][0] = new BigDecimal("1.1");
param6[0][1] = new BigDecimal("2.2");
param6[1][0] = Double.NaN;
param6[1][1] = Double.POSITIVE_INFINITY;
param6[1][2] = Double.NEGATIVE_INFINITY;
Boolean[][] param7 = new Boolean[2][4];
param7[0][0] = true;
param7[0][1] = false;
String[][] param8 = new String[2][4];
param8[0][0] = "aaa";
param8[0][1] = "bbbb";
String[][] param9 = new String[2][4];
param9[0][0] = "aaa";
param9[0][1] = "bbbb";
byte[][][] param10 = new byte[2][4][];
param10[0][0] = new byte[10];
param10[0][1] = "aaa".getBytes(StandardCharsets.UTF_8);
String query = "SELECT\n" +
" ?::int2[] AS int2_array, -- smallint\n" +
" ?::int4[] AS int4_array, -- integer\n" +
" ?::int8[] AS int8_array, -- bigint\n" +
" ?::float4[] AS float4_array, -- real\n" +
" ?::float8[] AS float8_array, -- double precision\n" +
" ?::numeric[] AS numeric_array, -- numeric / decimal\n" +
" ?::bool[] AS bool_array, -- boolean\n" +
" ?::text[] AS text_array, -- text\n" +
" ?::varchar[] AS varchar_array, -- varchar\n" +
" ?::bytea[] AS bytea_array, -- bytea\n" +
" ?::date[] AS date_array, -- date\n" +
" ?::time[] AS time_array, -- time without time zone\n" +
" ?::timestamp[] AS timestamp_array, -- timestamp\n" +
" ?::bool AS boolv, -- bool\n" +
" ?::bytea AS byteav, -- bytea\n" +
" ?::date AS datev, -- date\n" +
" ?::float8 AS float8v, -- float8\n" +
" ?::float4 AS float4v, -- float4\n" +
" ?::int2 AS int2v, -- int2\n" +
" ?::int4 AS int4v, -- int4\n" +
" ?::int8 AS int8v, -- int8\n" +
" ?::numeric AS numericv, -- numeric\n" +
" ?::text AS textv, -- text\n" +
" ?::varchar AS varcharv, -- varchar\n" +
" ?::time AS timev, -- time\n" +
" ?::timestamp AS timestampv -- timestamp\n";
int[] arrayOids = new int[]{Oid.INT2_ARRAY, Oid.INT4_ARRAY, Oid.INT8_ARRAY, Oid.FLOAT4_ARRAY, Oid.FLOAT8_ARRAY, Oid.NUMERIC_ARRAY, Oid.BOOL_ARRAY, Oid.TEXT_ARRAY, Oid.VARCHAR_ARRAY, Oid.BYTEA_ARRAY};
String[] arrayOidNames = new String[]{"int2[]", "int4[]", "int8[]", "float4[]", "float8[]", "numeric[]", "bool[]", "text[]", "varchar[]", "bytea[]"};
byte[][] sendResult = new byte[26][];
Object[] params = new Object[]{param1, param2, param3, param4, param5, param6, param7, param8, param9, param10};
try (Connection connection = DriverManager.getConnection(url, username, password)) {
PgConnection pgConnection = (PgConnection) connection;
QueryExecutor queryExecutor = pgConnection.getQueryExecutor();
queryExecutor.addBinarySendOid(Oid.DATE);
queryExecutor.addBinarySendOid(Oid.DATE_ARRAY);
queryExecutor.addBinarySendOid(Oid.TIME);
queryExecutor.addBinarySendOid(Oid.TIME_ARRAY);
queryExecutor.addBinarySendOid(Oid.BOOL_ARRAY);
queryExecutor.addBinarySendOid(Oid.TIMESTAMP);
queryExecutor.addBinarySendOid(Oid.TIMESTAMP_ARRAY);
queryExecutor.addBinarySendOid(Oid.NUMERIC_ARRAY);
queryExecutor.addBinarySendOid(Oid.VARCHAR);
queryExecutor.addBinarySendOid(Oid.TEXT);
queryExecutor.addBinarySendOid(Oid.BOOL);
queryExecutor.addBinaryReceiveOid(Oid.DATE);
queryExecutor.addBinaryReceiveOid(Oid.DATE_ARRAY);
queryExecutor.addBinaryReceiveOid(Oid.TIME);
queryExecutor.addBinaryReceiveOid(Oid.TIME_ARRAY);
queryExecutor.addBinaryReceiveOid(Oid.BOOL_ARRAY);
queryExecutor.addBinaryReceiveOid(Oid.TIMESTAMP);
queryExecutor.addBinaryReceiveOid(Oid.TIMESTAMP_ARRAY);
queryExecutor.addBinaryReceiveOid(Oid.NUMERIC_ARRAY);
queryExecutor.addBinaryReceiveOid(Oid.VARCHAR);
queryExecutor.addBinaryReceiveOid(Oid.TEXT);
queryExecutor.addBinaryReceiveOid(Oid.BOOL);
PreparedStatement preparedStatement = connection.prepareStatement(query);
PgStatement pgStatement = (PgStatement) preparedStatement;
pgStatement.setPrepareThreshold(-1);
for (int i = 0; i < arrayOids.length; i++) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ArrayEncoding.getArrayEncoder(params[i]).toBinaryRepresentation((Object[]) params[i], arrayOids[i], baos);
byte[] byteArray = baos.toByteArray();
PgBinaryObj pgBinaryObj = new PgBinaryObj(byteArray);
pgBinaryObj.setType(arrayOidNames[i]);
sendResult[i] = byteArray;
preparedStatement.setObject(i + 1, pgBinaryObj);
}
Date[][] dates = new Date[2][4];
//1970-01-01
dates[0][0] = new Date(-TimeZone.getDefault().getRawOffset());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ArrayEncoding.getArrayEncoder(dates).toBinaryRepresentation(dates, Oid.DATE_ARRAY, baos);
byte[] byteArray = baos.toByteArray();
PgBinaryObj dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("date[]");
sendResult[10] = byteArray;
preparedStatement.setObject(11, dateBinary);
baos.reset();
Time[][] times = new Time[2][4];
//time0
times[0][0] = new Time(-TimeZone.getDefault().getRawOffset());
ArrayEncoding.getArrayEncoder(times).toBinaryRepresentation(times, Oid.TIME_ARRAY, baos);
byteArray = baos.toByteArray();
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("time[]");
sendResult[11] = byteArray;
preparedStatement.setObject(12, dateBinary);
baos.reset();
Timestamp[][] timestamps = new Timestamp[2][4];
//timestamps0 1970-01-01
timestamps[0][0] = new Timestamp(-TimeZone.getDefault().getRawOffset());
ArrayEncoding.getArrayEncoder(timestamps).toBinaryRepresentation(timestamps, Oid.TIMESTAMP_ARRAY, baos);
byteArray = baos.toByteArray();
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("timestamp[]");
sendResult[12] = byteArray;
preparedStatement.setObject(13, dateBinary);
baos.reset();
BooleanArrayEncoder.INSTANCE.write(true,baos);
byteArray = baos.toByteArray();
byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length);
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("bool");
sendResult[13] = byteArray;
preparedStatement.setObject(14,dateBinary);
byteArray = new byte[10];
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("bytea");
sendResult[14] = byteArray;
preparedStatement.setObject(15, dateBinary);
baos.reset();
DateArrayEncoder.INSTANCE.write(new Date(-TimeZone.getDefault().getRawOffset()),baos);
byteArray = baos.toByteArray();
byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length);
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("date");
sendResult[15] = byteArray;
preparedStatement.setObject(16, dateBinary);
baos.reset();
Float8ArrayEncoder.INSTANCE.write(1.2,baos);
byteArray = baos.toByteArray();
byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length);
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("float8");
sendResult[16] = byteArray;
preparedStatement.setObject(17, dateBinary);
baos.reset();
Float4ArrayEncoder.INSTANCE.write(1.2f,baos);
byteArray = baos.toByteArray();
byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length);
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("float4");
sendResult[17] = byteArray;
preparedStatement.setObject(18, dateBinary);
baos.reset();
Int2ArrayEncoder.INSTANCE.write((short) 1,baos);
byteArray = baos.toByteArray();
byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length);
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("int2");
sendResult[18] = byteArray;
preparedStatement.setObject(19, dateBinary);
baos.reset();
Int4ArrayEncoder.INSTANCE.write( 1,baos);
byteArray = baos.toByteArray();
byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length);
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("int4");
sendResult[19] = byteArray;
preparedStatement.setObject(20, dateBinary);
baos.reset();
Int8ArrayEncoder.INSTANCE.write(1L,baos);
byteArray = baos.toByteArray();
byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length);
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("int8");
sendResult[20] = byteArray;
preparedStatement.setObject(21, dateBinary);
baos.reset();
NumericArrayEncoder.INSTANCE.write(new BigDecimal("1.2"),baos);
byteArray = baos.toByteArray();
byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length );
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("numeric");
sendResult[21] = byteArray;
preparedStatement.setObject(22, dateBinary);
baos.reset();
StringArrayEncoder.INSTANCE.write("testString",baos);
byteArray = baos.toByteArray();
byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length );
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("text");
sendResult[22] = byteArray;
preparedStatement.setObject(23, dateBinary);
baos.reset();
StringArrayEncoder.INSTANCE.write("testVarchar",baos);
byteArray = baos.toByteArray();
byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length );
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("varchar");
sendResult[23] = byteArray;
preparedStatement.setObject(24, dateBinary);
baos.reset();
TimeArrayEncoder.INSTANCE.write(new Time(-TimeZone.getDefault().getRawOffset()),baos);
byteArray = baos.toByteArray();
byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length );
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("time");
sendResult[24] = byteArray;
preparedStatement.setObject(25, dateBinary);
baos.reset();
TimestampArrayEncoder.INSTANCE.write(new Timestamp(-TimeZone.getDefault().getRawOffset()),baos);
byteArray = baos.toByteArray();
byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length );
dateBinary = new PgBinaryObj(byteArray);
dateBinary.setType("timestamp");
sendResult[25] = byteArray;
preparedStatement.setObject(26, dateBinary);
//
// baos.reset();
// Int2ArrayEncoder.INSTANCE.write((short) 1,baos);
// byteArray = baos.toByteArray();
// byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length );
// dateBinary = new PgBinaryObj(byteArray);
// dateBinary.setType("int2");
// sendResult[18] = byteArray;
preparedStatement.execute();
ResultSet resultSet = preparedStatement.getResultSet();
resultSet.next();
Object value = resultSet.getObject(6);
// value.toString();
value = resultSet.getObject(11);
value = resultSet.getObject(12);
value = resultSet.getObject(13);
value = resultSet.getObject(14);
for (int col = 1; col <= resultSet.getMetaData().getColumnCount(); col++) {
value = resultSet.getBytes(col);
boolean equals = new String((byte[]) value, StandardCharsets.US_ASCII).equals(new String((byte[]) sendResult[col - 1], StandardCharsets.US_ASCII));
System.out.println("col:"+col+ equals);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
} |
|
Additionally, this PR contains some unused Decoder classes. I will keep them if we need them, but if not, I will remove them. |
|
[ERROR] /home/shenfeng/github/shardingsphere/database/protocol/type/postgresql/src/main/java/org/apache/shardingsphere/database/protocol/postgresql/packet/command/query/extended/bind/protocol/PostgreSQLArrayParameterDecoder.java:168:18: Control variable 'i' is modified. [ModifiedControlVariable] Are you serious? |
@terrymanu @RaigorJiang Hello, all test cases passed except for one Spotless exception. Can we proceed with the code review? By the way, I'd also like to discuss how to add my integration tests.
|
|
Hi @terrymanu @RaigorJiang, any updates on this? Thanks! |
...rotocol/dialect/postgresql/src/main/java/org/postgresql/jdbc/ShardingSpherePgArrayUtils.java
Outdated
Show resolved
Hide resolved
...rotocol/dialect/postgresql/src/main/java/org/postgresql/jdbc/ShardingSpherePgArrayUtils.java
Outdated
Show resolved
Hide resolved
...stgresql/packet/command/query/extended/bind/protocol/PostgreSQLArrayBinaryProtocolValue.java
Show resolved
Hide resolved
.../postgresql/packet/command/query/extended/bind/protocol/PostgreSQLArrayParameterDecoder.java
Outdated
Show resolved
Hide resolved
...esql/packet/command/query/extended/bind/protocol/PostgreSQLArrayBinaryProtocolValueTest.java
Outdated
Show resolved
Hide resolved
|
Hi @ShenFeng312 , thank you for your great work. Your thoroughness is evident in this pull request. Thank you! |
|
PTAL @RaigorJiang @terrymanu |
RaigorJiang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @ShenFeng312 , every time I read this pull request, I'm amazed at the effort you put into it. I salute you.
This pull request involves handling multiple data types, affecting many files and complex logic, especially since you also deleted some files and performed some refactoring.
I plan to spend more time researching these type implementations and again request that you adjust the code style. There are still many areas that don't conform to ShardingSphere code standards, such as comments and blank lines.
Please refer to: https://shardingsphere.apache.org/community/en/involved/conduct/code/
|
Hi! @RaigorJiang If you don’t mind, I’d really appreciate your help in pointing out any remaining style problems so that I can fully align with the project standards. Thanks again for your support! |
|
ping @RaigorJiang @terrymanu |
|
Hi @RaigorJiang @terrymanu I just wanted to check if there are any updates on this PR? |
RaigorJiang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @ShenFeng312
I discussed this with several committers familiar with the Proxy protocol, and we reached the following consensus:
-
The current PR involves too many changes, including adjustments to data processing logic and abstraction levels, making review difficult;
-
The code has many non-standard aspects, including naming conventions and comments;
-
There are clearly incorrect parts of the business logic, such as
PostgreSQLStringBinaryProtocolValue#write.
Based on the above information, we speculate that some code in this PR was generated by a large model? Some code appears to have not been rigorously considered.
Therefore, we suggest you withdraw this PR and submit it multiple times with small-scale adjustments, ensuring the correctness of the code logic and comments, as well as consistency in style.
| return ((byte[]) value).length; | ||
| } | ||
| return value.toString().getBytes(payload.getCharset()).length; | ||
| return -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is ColumnLength a fixed value of -1? This is incorrect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For performance considerations, I believe it's reasonable to return -1 when the length cannot be determined, and then obtain the value when writing. I have noted the reason for the change in the interface.
| @Override | ||
| public Object read(final PostgreSQLPacketPayload payload, final int parameterValueLength) { | ||
| byte[] bytes = new byte[parameterValueLength]; | ||
| // 获取pg 时间戳 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Chinese comments are not accepted.
| import java.nio.charset.Charset; | ||
|
|
||
| /** | ||
| * postgresql array decoder. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Specialized terms and sentences should all be capitalized.
| import java.nio.charset.Charset; | ||
|
|
||
| /** | ||
| * BooleanArrayDecoder. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Words in the comments should be separated, not camelCase.
| } else { | ||
| payload.writeStringEOF(value.toString()); | ||
| byte[] bytes = value.toString().getBytes(payload.getCharset()); | ||
| payload.writeInt4(bytes.length); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PostgreSQL DataRowPacket has a unified write length operation (based on getColumnLength()), so individual value implementations do not need to call writeInt4.
This is clearly incorrect logic.
I don't think you have carefully reviewed this PR. The serialization and deserialization logic for arrays in this PR comes from part of the code in pgjdbc. I have abstracted and optimized it to support both binary and text formats. As a result, it carries the coding style of pg-jdbc. @RaigorJiang |
@ShenFeng312 Furthermore, you didn't specify the source of the copy in the comments of class. So, when pgjdbc is updated or bugs are fixed, who will maintain the copied code in ShardingSphere? I noticed that you mainly use Finally, we cannot assume that because the code is copied, we can break ShardingSphere's coding standards; this is unsustainable. |
@RaigorJiang |
Thank you. If you can resubmit and improve it, that would of course be the best. The original intention of submitting this PR was to demonstrate how to correctly solve the serialization and deserialization issues. Feel free to use the code in this PR and make modifications anywhere. I have no intention of insisting that you must use my code.@RaigorJiang |
|
But please note, your implementation in #32848 is incorrect. PG's |
@ShenFeng312 Okay, I will test it based on the information you provided. Thank you for your prompt feedback. |
For performance considerations, I believe it's reasonable to return -1 when the length cannot be determined, and then obtain the value when writing. I have noted the reason for the change in the interface. If we convert the string into a byte array just to get its length, I think that’s a waste of performance. It's completely unnecessary, because when writing we already obtain its byte array anyway |

For #35830
Changes proposed in this pull request:
Before committing this PR, I'm sure that I have checked the following options:
./mvnw clean install -B -T1C -Dmaven.javadoc.skip -Dmaven.jacoco.skip -e.