diff --git a/ems-core/src/main/java/it/integry/ems/controller/EmsController.java b/ems-core/src/main/java/it/integry/ems/controller/EmsController.java index be28421608..1c293ff97b 100644 --- a/ems-core/src/main/java/it/integry/ems/controller/EmsController.java +++ b/ems-core/src/main/java/it/integry/ems/controller/EmsController.java @@ -705,7 +705,7 @@ public class EmsController { if (ordinaSuPriorita) entityList = entityPropertyHolder.getEntityChain(entityList, multiDBTransactionManager.getPrimaryConnection()); - Integer prevTransactionGroupId = null; + Long prevTransactionGroupId = null; for (EntityBase entity : entityList) { entity.setTransactionGroupId(prevTransactionGroupId); diff --git a/ems-core/src/main/java/it/integry/ems/service/EntityProcessor.java b/ems-core/src/main/java/it/integry/ems/service/EntityProcessor.java index 3c535d6cbc..8fce41c407 100644 --- a/ems-core/src/main/java/it/integry/ems/service/EntityProcessor.java +++ b/ems-core/src/main/java/it/integry/ems/service/EntityProcessor.java @@ -498,7 +498,7 @@ public class EntityProcessor { entities = entityPropertyHolder.getEntityChain(entities, multiDBTransactionManager.getPrimaryDatasource().getConnection()); } - Integer prevTransactionGroupId = null; + Long prevTransactionGroupId = null; for (int i = 0; i < entities.size(); i++) { EntityBase entity = entities.get(i); diff --git a/ems-core/src/main/java/it/integry/ems/sync/AsyncManager.java b/ems-core/src/main/java/it/integry/ems/sync/AsyncManager.java index bc1b99cc40..dc3186f0c9 100644 --- a/ems-core/src/main/java/it/integry/ems/sync/AsyncManager.java +++ b/ems-core/src/main/java/it/integry/ems/sync/AsyncManager.java @@ -1,18 +1,21 @@ package it.integry.ems.sync; +import com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement; import it.integry.annotations.PostContextConstruct; -import it.integry.common.var.CommonConstants; import it.integry.ems.json.JSONObjectMapper; import it.integry.ems.looper.service.LooperService; import it.integry.ems.settings.Model.AvailableConnectionsModel; import it.integry.ems.settings.Model.SettingsModel; import it.integry.ems.settings.SettingsController; import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager; -import it.integry.ems.utility.UtilityDebug; import it.integry.ems_model.base.EntityBase; import it.integry.ems_model.entity.StbPublicationsDetail; +import it.integry.ems_model.entity.StbTransactionLog; +import it.integry.ems_model.types.OperationType; import it.integry.ems_model.utility.UtilityDB; +import it.integry.ems_model.utility.UtilityLocalDate; import it.integry.ems_model.utility.UtilityString; +import javafx.util.Pair; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.josql.Query; @@ -21,11 +24,12 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.context.ContextLoader; -import java.io.StringWriter; import java.sql.Connection; -import java.sql.PreparedStatement; -import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; @Component @@ -45,12 +49,14 @@ public class AsyncManager { private final static HashMap> cachedSetup = new HashMap<>(); + private final static ConcurrentLinkedQueue> toBeSavedQueue = new ConcurrentLinkedQueue<>(); @PostContextConstruct public void init() { - if (!UtilityDebug.isDebugExecution() && !UtilityDebug.isIntegryServer()) { - looperService.add(this::internalCachePublicationsSetup, 5 * 60 * 1000, AsyncManager.class.getName()); - } + //if (!UtilityDebug.isDebugExecution() && !UtilityDebug.isIntegryServer()) { + looperService.add(this::internalCachePublicationsSetup, 5 * 60 * 1000, "sync-setup-cache"); + looperService.add(this::consumeToBeSavedQueue, 20 * 1000, "sync-flush-data"); + //} } private void internalCachePublicationsSetup() { @@ -130,49 +136,88 @@ public class AsyncManager { } - public static void saveNewTransaction(Connection connection, EntityBase entityBase, int transactionGroupId) throws Exception { - - String transactionDate = new SimpleDateFormat(CommonConstants.DATETIME_FORMAT_YMD).format(new Date()); - int transactionId = 0; - - Integer tmpTransactionId = UtilityDB.executeSimpleQueryOnlyFirstRowFirstColumn( - connection, "SELECT max(transaction_id) as max_id from stb_transaction_log_db"); - - if (tmpTransactionId != null) transactionId = tmpTransactionId; - transactionId++; //Incremento l'ID + public static Long saveNewTransaction(Connection connection, String dbName, EntityBase entityBase, long publicationId, Long transactionGroupId) throws Exception { + //TODO: Calc transaction group ID here + if (transactionGroupId == null) transactionGroupId = getNextTransactionGroupId(connection, dbName); JSONObjectMapper jsonObjectMapper = ContextLoader.getCurrentWebApplicationContext().getBean(JSONObjectMapper.class); - StringWriter writer = new StringWriter(); - jsonObjectMapper.writeValue(writer, entityBase); - - String entityName = entityBase.getTableName().toUpperCase(); - - String sql = - "INSERT INTO stb_transaction_log_db(transaction_id, transaction_group_id, transaction_date, user_name, transaction_json, entity_name_list) " + - "VALUES (" + transactionId + "," + transactionGroupId + ",'" + transactionDate + "', null, '%s', " + UtilityDB.valueToString(entityName) + ")"; - - String jsonString = writer.toString().replace("'", "''"); - - sql = String.format(sql, jsonString); - - PreparedStatement psExec = connection.prepareStatement(sql); - psExec.executeUpdate(); - psExec.close(); - - } - - - public static int getNextTransactionGroupId(Connection connection) throws Exception { - int transactionGroupId = 0; - - Integer tmpTransactionGroupId = UtilityDB.executeSimpleQueryOnlyFirstRowFirstColumn( - connection, "SELECT MAX(transaction_group_id) AS max_id from stb_transaction_log_db"); - - if (tmpTransactionGroupId != null) transactionGroupId = tmpTransactionGroupId; - transactionGroupId++; //Incremento l'ID + StbTransactionLog stbTransactionLog = new StbTransactionLog() + .setCreatedAt(UtilityLocalDate.getNowTime()) + .setEntities(entityBase.getTableName().toUpperCase()) + .setEntitiesJson(jsonObjectMapper.writeValueAsString(entityBase)) + .setPublicationGroupId(publicationId) + .setUserName(entityBase.getUsername()) + .setGroupId(transactionGroupId); + stbTransactionLog.setOperation(OperationType.INSERT); + toBeSavedQueue.add(new Pair<>(dbName, stbTransactionLog)); return transactionGroupId; } + + public static long getNextTransactionGroupId(Connection connection, String dbName) throws Exception { + + final long maxToBeProcessed = toBeSavedQueue.stream().filter(x -> x.getKey().equalsIgnoreCase(dbName)) + .map(x -> x.getValue().getGroupId()) + .max(Long::compare) + .orElse(0L); + + long transactionGroupId = 0; + + if (maxToBeProcessed <= 0) { + Long tmpTransactionGroupId = UtilityDB.executeSimpleQueryOnlyFirstRowFirstColumn( + connection, "SELECT MAX(group_id) AS max_id FROM " + StbTransactionLog.ENTITY); + + tmpTransactionGroupId = tmpTransactionGroupId != 0 ? tmpTransactionGroupId : 0L; + + transactionGroupId = Math.max(maxToBeProcessed, tmpTransactionGroupId); + } else { + transactionGroupId = maxToBeProcessed; + } + + transactionGroupId++; //Incremento l'ID + return transactionGroupId; + } + + + private void consumeToBeSavedQueue() { + List dbNamesToConnect = toBeSavedQueue.stream().map(Pair::getKey).distinct().collect(Collectors.toList()); + + final List databaseConnections = settingsModel.getAvailableConnections().stream() + .filter(x -> dbNamesToConnect.stream().anyMatch(y -> x.getDbName().equalsIgnoreCase(y))) + .collect(Collectors.toList()); + + String insertSQL = "INSERT INTO " + StbTransactionLog.ENTITY + " (publication_group_id, created_at, user_name, entities, entities_json, group_id)" + + " VALUES (?, ?, ?, ?, ?, ?)"; + + Pair itemToSave = null; + while ((itemToSave = toBeSavedQueue.poll()) != null) { + final Pair finalItemToSave = itemToSave; + final StbTransactionLog stbTransactionLog = finalItemToSave.getValue(); + + AvailableConnectionsModel connectionModel = databaseConnections.stream() + .filter(x -> x.getDbName().equalsIgnoreCase(finalItemToSave.getKey())) + .findFirst() + .get(); + + try (MultiDBTransactionManager multiDBTransactionManager = new MultiDBTransactionManager(connectionModel); + SQLServerPreparedStatement insertBulkPs = (SQLServerPreparedStatement) multiDBTransactionManager.prepareStatement(insertSQL)) { + + insertBulkPs.setLong(1, stbTransactionLog.getGroupId()); + insertBulkPs.setObject(2, stbTransactionLog.getCreatedAt()); + insertBulkPs.setString(3, stbTransactionLog.getUserName()); + insertBulkPs.setString(4, stbTransactionLog.getEntities()); + insertBulkPs.setString(5, stbTransactionLog.getEntitiesJson()); + insertBulkPs.setLong(6, stbTransactionLog.getGroupId()); + + + insertBulkPs.executeLargeUpdate(); + + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + } } diff --git a/ems-core/src/main/java/it/integry/ems/sync/MultiDBTransaction/MultiDBTransactionManager.java b/ems-core/src/main/java/it/integry/ems/sync/MultiDBTransaction/MultiDBTransactionManager.java index 403b90c5a5..04c32f7574 100644 --- a/ems-core/src/main/java/it/integry/ems/sync/MultiDBTransaction/MultiDBTransactionManager.java +++ b/ems-core/src/main/java/it/integry/ems/sync/MultiDBTransaction/MultiDBTransactionManager.java @@ -46,6 +46,10 @@ public class MultiDBTransactionManager implements AutoCloseable{ public MultiDBTransactionManager() { } + public MultiDBTransactionManager(AvailableConnectionsModel connectionsModel) throws Exception { + this(connectionsModel.getProfileName(), true); + } + public MultiDBTransactionManager(String profileDb) throws Exception { this(profileDb, true); } diff --git a/ems-core/src/main/java/it/integry/ems_model/base/EntityBase.java b/ems-core/src/main/java/it/integry/ems_model/base/EntityBase.java index b5cea009c9..7f7df79f56 100644 --- a/ems-core/src/main/java/it/integry/ems_model/base/EntityBase.java +++ b/ems-core/src/main/java/it/integry/ems_model/base/EntityBase.java @@ -113,7 +113,7 @@ public abstract class EntityBase implements Serializable, Cloneable, EntityInter //ID del gruppo di transazioni per le Sync OFFLINE @JsonIgnore - private Integer transactionGroupId = null; + private Long transactionGroupId = null; @JsonIgnore private int queryTimeoutSeconds = 60 * 30; @@ -927,8 +927,7 @@ public abstract class EntityBase implements Serializable, Cloneable, EntityInter long publicationId = AsyncManager.getPublicationIdIfExists(dbName, this); if (publicationId > 0) { - if (transactionGroupId == null) transactionGroupId = AsyncManager.getNextTransactionGroupId(connection); - AsyncManager.saveNewTransaction(connection, this, transactionGroupId); + transactionGroupId = AsyncManager.saveNewTransaction(connection, dbName, this, publicationId, transactionGroupId); } } } @@ -1988,11 +1987,11 @@ public abstract class EntityBase implements Serializable, Cloneable, EntityInter return builder.toString(); } - public Integer getTransactionGroupId() { + public Long getTransactionGroupId() { return transactionGroupId; } - public void setTransactionGroupId(Integer transactionGroupId) { + public void setTransactionGroupId(Long transactionGroupId) { this.transactionGroupId = transactionGroupId; } @@ -2005,8 +2004,7 @@ public abstract class EntityBase implements Serializable, Cloneable, EntityInter try { field.setAccessible(true); idRiga = (Integer) field.get(entityBase); - } catch (Exception e) { - + } catch (Exception ignored) { } return idRiga; } diff --git a/ems-core/src/main/java/it/integry/ems_model/base/EntityInterface.java b/ems-core/src/main/java/it/integry/ems_model/base/EntityInterface.java index 5672afc87e..904735468f 100644 --- a/ems-core/src/main/java/it/integry/ems_model/base/EntityInterface.java +++ b/ems-core/src/main/java/it/integry/ems_model/base/EntityInterface.java @@ -83,9 +83,9 @@ public interface EntityInterface { EntityException getException(); - Integer getTransactionGroupId(); + Long getTransactionGroupId(); - void setTransactionGroupId(Integer transactionGroupId); + void setTransactionGroupId(Long transactionGroupId); void resetTransactionGroupId(); diff --git a/ems-core/src/main/java/it/integry/ems_model/entity/StbTransactionLog.java b/ems-core/src/main/java/it/integry/ems_model/entity/StbTransactionLog.java index a4daf6bad3..6c661b0615 100644 --- a/ems-core/src/main/java/it/integry/ems_model/entity/StbTransactionLog.java +++ b/ems-core/src/main/java/it/integry/ems_model/entity/StbTransactionLog.java @@ -39,7 +39,7 @@ public class StbTransactionLog extends EntityBase { private String entitiesJson; @SqlField(value = "group_id", nullable = false) - private int groupId; + private long groupId; public long getId() { return id; @@ -95,11 +95,11 @@ public class StbTransactionLog extends EntityBase { return this; } - public int getGroupId() { + public long getGroupId() { return groupId; } - public StbTransactionLog setGroupId(int groupId) { + public StbTransactionLog setGroupId(long groupId) { this.groupId = groupId; return this; }