Getting data from a very large table


I have a very large table in a MySQL database: the Users table contains 200 million records.

I run the following query using JDBC:

public List<Pair<Long, String>> getUsersAll() throws SQLException {
    Connection cnn = null;
    CallableStatement cs = null;
    ResultSet rs = null;
    final List<Pair<Long, String>> res = new ArrayList<>();
    try {
        cnn = dataSource.getConnection();
        cs = cnn.prepareCall("select UserPropertyKindId, login from TEST.users;");
        rs = cs.executeQuery();
        while (rs.next()) {
            res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
        }
        return res;
    } catch (SQLException ex) {
        throw ex;
    } finally {
        DbUtils.closeQuietly(cnn, cs, rs);
    }
}

Next, I process the result:

List<Pair<Long, String>> users = dao.getUsersAll();
if (CollectionUtils.isNotEmpty(users)) {
    for (List<Pair<Long, String>> partition : Lists.partition(users, 2000)) {
        InconsistsUsers.InconsistsUsersCallable callable =
                new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(partition));
        processExecutor.submit(callable);
    }
}

But since the table is very large and the entire result is loaded into memory, my application crashes with this error:

com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet successfully received from the server was 105,619 milliseconds ago.

How can I receive the data in parts and process it as it arrives, so that the whole result is not loaded into memory at once? Perhaps it is possible to create a cursor, push the data into a non-blocking queue, and process it as it arrives. How can this be done?

UPDATE:

My DB structure: https://www.db-fiddle.com/f/v377ZHkG1YZcdQsETtPm9L/3

Current algorithm:

1. Get all users from the Users table: select UserPropertyKindId, login from Users;

2. Split the result into partitions of 2000 pairs and submit them to a ThreadPoolTaskExecutor:

List<Pair<Long, String>> users = dao.getUsersAll();
if (CollectionUtils.isNotEmpty(users)) {
    for (List<Pair<Long, String>> partition : Lists.partition(users, 2000)) {
        InconsistsUsers.InconsistsUsersCallable callable =
                new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(partition));
        processExecutor.submit(callable);
    }
}

3. In the callable, make two queries for each pair:

First query:

select distinct entityId from UserPropertyValue where userPropertyKindId = ? and value = ? -- value is the login from the Users table

Second query:

 SELECT UserIds from UserPropertyIndex where UserPropertyKindId = ? and Value = ? 

Two cases are possible:

  1. The result of the first query is empty: log it, send a notification, and continue with the next pair;
  2. The result of the second query does not equal the result of the first (the varbinary data is decoded; it stores encoded entityId's): log it, send a notification, and go to the next pair (a rough sketch of this check follows below).
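For reference, here is a rough, hypothetical sketch of the per-pair check. The SQL comes from the question above; decodeUserIds and the logging/notification steps are placeholders, not the real implementation:

// Hypothetical sketch of the per-pair consistency check described above.
// decodeUserIds(...) stands in for whatever decodes the varbinary UserIds column.
private void checkPair(Connection cnn, long kindId, String login) throws SQLException {
    Set<Long> fromValue = new HashSet<>();
    try (PreparedStatement ps = cnn.prepareStatement(
            "select distinct entityId from UserPropertyValue"
                + " where userPropertyKindId = ? and value = ?")) {
        ps.setLong(1, kindId);
        ps.setString(2, login);
        try (ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                fromValue.add(rs.getLong(1));
            }
        }
    }
    if (fromValue.isEmpty()) {
        // case 1: log, send a notification, continue with the next pair
        return;
    }

    Set<Long> fromIndex = new HashSet<>();
    try (PreparedStatement ps = cnn.prepareStatement(
            "select UserIds from UserPropertyIndex"
                + " where UserPropertyKindId = ? and Value = ?")) {
        ps.setLong(1, kindId);
        ps.setString(2, login);
        try (ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                fromIndex.addAll(decodeUserIds(rs.getBytes(1)));
            }
        }
    }
    if (!fromIndex.equals(fromValue)) {
        // case 2: the two results differ -> log and send a notification
    }
}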

I can't change the structure of the database. All the manipulation has to be done on the Java side.

 


You should handle this on several levels:

JDBC driver fetch size

JDBC has a Statement.setFetchSize() method, which indicates how many rows the JDBC driver will pre-fetch before you read them from the ResultSet. Note that MySQL's JDBC driver doesn't really implement this correctly, but you can set setFetchSize(Integer.MIN_VALUE) to prevent it from fetching all rows in one go. See also this answer here.
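A minimal sketch of that approach, reusing the dataSource and query from the question (with MySQL Connector/J the statement needs to be forward-only and read-only for streaming to apply):

// Sketch: stream rows from MySQL instead of buffering the whole result set.
// A forward-only, read-only statement plus setFetchSize(Integer.MIN_VALUE)
// makes Connector/J deliver rows one at a time.
try (Connection cnn = dataSource.getConnection();
     PreparedStatement ps = cnn.prepareStatement(
             "select UserPropertyKindId, login from TEST.users",
             ResultSet.TYPE_FORWARD_ONLY,
             ResultSet.CONCUR_READ_ONLY)) {
    ps.setFetchSize(Integer.MIN_VALUE);
    try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
            // handle each row here as it arrives, instead of adding it to a list
        }
    }
}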

Note that you can also activate cursor-based fetching on your connection using the useCursorFetch property.
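For example, the property can go on the connection URL and be combined with a positive fetch size; in this sketch the host, port, schema and credentials are placeholders:

// Sketch: cursor-based fetching via the useCursorFetch connection property.
String url = "jdbc:mysql://localhost:3306/TEST?useCursorFetch=true";
try (Connection cnn = DriverManager.getConnection(url, "user", "password");
     PreparedStatement ps = cnn.prepareStatement(
             "select UserPropertyKindId, login from TEST.users")) {
    ps.setFetchSize(2000); // rows fetched per round trip with a server-side cursor
    try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
            // handle each row as it arrives
        }
    }
}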

Your own logic

You should not put the entire list of users in memory. What you're doing right now is collecting all the rows from JDBC and then partitioning your list later on using Lists.partition(users, 2000). This is going in the right direction, but you're not doing it right yet. Instead, do:

try (ResultSet rs = cs.executeQuery()) {
    while (rs.next()) {
        res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));

        // Process a batch of rows:
        if (res.size() >= 2000) {
            process(res);
            res.clear();
        }
    }
}

// Process the remaining rows
process(res);

The important message here is to not load all rows in memory and then process them in batches, but to process them directly while streaming rows from JDBC.
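Put together, the DAO might stream rows and submit a batch to the executor every 2000 rows rather than returning one huge list. This is only a sketch that reuses the names from the question (dataSource, processExecutor, InconsistsUsers.InconsistsUsersCallable):

// Sketch: stream from MySQL and hand off batches of 2000 rows as they are read.
public void streamAndProcessUsers() throws SQLException {
    final List<Pair<Long, String>> batch = new ArrayList<>(2000);
    try (Connection cnn = dataSource.getConnection();
         PreparedStatement ps = cnn.prepareStatement(
                 "select UserPropertyKindId, login from TEST.users",
                 ResultSet.TYPE_FORWARD_ONLY,
                 ResultSet.CONCUR_READ_ONLY)) {
        ps.setFetchSize(Integer.MIN_VALUE); // stream rows instead of buffering them
        try (ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                batch.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
                if (batch.size() >= 2000) {
                    // hand the full batch to the existing executor, then start a new one
                    processExecutor.submit(
                            new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(batch)));
                    batch.clear();
                }
            }
        }
    }
    if (!batch.isEmpty()) {
        processExecutor.submit(
                new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(batch)));
    }
}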
