Refactoring gestione delle sincronizzazioni in fase di salvataggio di una entity
This commit is contained in:
@@ -705,7 +705,7 @@ public class EmsController {
|
|||||||
if (ordinaSuPriorita)
|
if (ordinaSuPriorita)
|
||||||
entityList = entityPropertyHolder.getEntityChain(entityList, multiDBTransactionManager.getPrimaryConnection());
|
entityList = entityPropertyHolder.getEntityChain(entityList, multiDBTransactionManager.getPrimaryConnection());
|
||||||
|
|
||||||
Integer prevTransactionGroupId = null;
|
Long prevTransactionGroupId = null;
|
||||||
|
|
||||||
for (EntityBase entity : entityList) {
|
for (EntityBase entity : entityList) {
|
||||||
entity.setTransactionGroupId(prevTransactionGroupId);
|
entity.setTransactionGroupId(prevTransactionGroupId);
|
||||||
|
|||||||
@@ -498,7 +498,7 @@ public class EntityProcessor {
|
|||||||
entities = entityPropertyHolder.getEntityChain(entities, multiDBTransactionManager.getPrimaryDatasource().getConnection());
|
entities = entityPropertyHolder.getEntityChain(entities, multiDBTransactionManager.getPrimaryDatasource().getConnection());
|
||||||
}
|
}
|
||||||
|
|
||||||
Integer prevTransactionGroupId = null;
|
Long prevTransactionGroupId = null;
|
||||||
|
|
||||||
for (int i = 0; i < entities.size(); i++) {
|
for (int i = 0; i < entities.size(); i++) {
|
||||||
EntityBase entity = entities.get(i);
|
EntityBase entity = entities.get(i);
|
||||||
|
|||||||
@@ -1,18 +1,21 @@
|
|||||||
package it.integry.ems.sync;
|
package it.integry.ems.sync;
|
||||||
|
|
||||||
|
import com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement;
|
||||||
import it.integry.annotations.PostContextConstruct;
|
import it.integry.annotations.PostContextConstruct;
|
||||||
import it.integry.common.var.CommonConstants;
|
|
||||||
import it.integry.ems.json.JSONObjectMapper;
|
import it.integry.ems.json.JSONObjectMapper;
|
||||||
import it.integry.ems.looper.service.LooperService;
|
import it.integry.ems.looper.service.LooperService;
|
||||||
import it.integry.ems.settings.Model.AvailableConnectionsModel;
|
import it.integry.ems.settings.Model.AvailableConnectionsModel;
|
||||||
import it.integry.ems.settings.Model.SettingsModel;
|
import it.integry.ems.settings.Model.SettingsModel;
|
||||||
import it.integry.ems.settings.SettingsController;
|
import it.integry.ems.settings.SettingsController;
|
||||||
import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager;
|
import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager;
|
||||||
import it.integry.ems.utility.UtilityDebug;
|
|
||||||
import it.integry.ems_model.base.EntityBase;
|
import it.integry.ems_model.base.EntityBase;
|
||||||
import it.integry.ems_model.entity.StbPublicationsDetail;
|
import it.integry.ems_model.entity.StbPublicationsDetail;
|
||||||
|
import it.integry.ems_model.entity.StbTransactionLog;
|
||||||
|
import it.integry.ems_model.types.OperationType;
|
||||||
import it.integry.ems_model.utility.UtilityDB;
|
import it.integry.ems_model.utility.UtilityDB;
|
||||||
|
import it.integry.ems_model.utility.UtilityLocalDate;
|
||||||
import it.integry.ems_model.utility.UtilityString;
|
import it.integry.ems_model.utility.UtilityString;
|
||||||
|
import javafx.util.Pair;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.josql.Query;
|
import org.josql.Query;
|
||||||
@@ -21,11 +24,12 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.web.context.ContextLoader;
|
import org.springframework.web.context.ContextLoader;
|
||||||
|
|
||||||
import java.io.StringWriter;
|
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.PreparedStatement;
|
import java.util.ArrayList;
|
||||||
import java.text.SimpleDateFormat;
|
import java.util.HashMap;
|
||||||
import java.util.*;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
@@ -45,12 +49,14 @@ public class AsyncManager {
|
|||||||
|
|
||||||
private final static HashMap<String, List<StbPublicationsDetail>> cachedSetup = new HashMap<>();
|
private final static HashMap<String, List<StbPublicationsDetail>> cachedSetup = new HashMap<>();
|
||||||
|
|
||||||
|
private final static ConcurrentLinkedQueue<Pair<String, StbTransactionLog>> toBeSavedQueue = new ConcurrentLinkedQueue<>();
|
||||||
|
|
||||||
@PostContextConstruct
|
@PostContextConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
if (!UtilityDebug.isDebugExecution() && !UtilityDebug.isIntegryServer()) {
|
//if (!UtilityDebug.isDebugExecution() && !UtilityDebug.isIntegryServer()) {
|
||||||
looperService.add(this::internalCachePublicationsSetup, 5 * 60 * 1000, AsyncManager.class.getName());
|
looperService.add(this::internalCachePublicationsSetup, 5 * 60 * 1000, "sync-setup-cache");
|
||||||
}
|
looperService.add(this::consumeToBeSavedQueue, 20 * 1000, "sync-flush-data");
|
||||||
|
//}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void internalCachePublicationsSetup() {
|
private void internalCachePublicationsSetup() {
|
||||||
@@ -130,49 +136,88 @@ public class AsyncManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static void saveNewTransaction(Connection connection, EntityBase entityBase, int transactionGroupId) throws Exception {
|
public static Long saveNewTransaction(Connection connection, String dbName, EntityBase entityBase, long publicationId, Long transactionGroupId) throws Exception {
|
||||||
|
//TODO: Calc transaction group ID here
|
||||||
String transactionDate = new SimpleDateFormat(CommonConstants.DATETIME_FORMAT_YMD).format(new Date());
|
if (transactionGroupId == null) transactionGroupId = getNextTransactionGroupId(connection, dbName);
|
||||||
int transactionId = 0;
|
|
||||||
|
|
||||||
Integer tmpTransactionId = UtilityDB.executeSimpleQueryOnlyFirstRowFirstColumn(
|
|
||||||
connection, "SELECT max(transaction_id) as max_id from stb_transaction_log_db");
|
|
||||||
|
|
||||||
if (tmpTransactionId != null) transactionId = tmpTransactionId;
|
|
||||||
transactionId++; //Incremento l'ID
|
|
||||||
|
|
||||||
JSONObjectMapper jsonObjectMapper = ContextLoader.getCurrentWebApplicationContext().getBean(JSONObjectMapper.class);
|
JSONObjectMapper jsonObjectMapper = ContextLoader.getCurrentWebApplicationContext().getBean(JSONObjectMapper.class);
|
||||||
|
|
||||||
StringWriter writer = new StringWriter();
|
StbTransactionLog stbTransactionLog = new StbTransactionLog()
|
||||||
jsonObjectMapper.writeValue(writer, entityBase);
|
.setCreatedAt(UtilityLocalDate.getNowTime())
|
||||||
|
.setEntities(entityBase.getTableName().toUpperCase())
|
||||||
String entityName = entityBase.getTableName().toUpperCase();
|
.setEntitiesJson(jsonObjectMapper.writeValueAsString(entityBase))
|
||||||
|
.setPublicationGroupId(publicationId)
|
||||||
String sql =
|
.setUserName(entityBase.getUsername())
|
||||||
"INSERT INTO stb_transaction_log_db(transaction_id, transaction_group_id, transaction_date, user_name, transaction_json, entity_name_list) " +
|
.setGroupId(transactionGroupId);
|
||||||
"VALUES (" + transactionId + "," + transactionGroupId + ",'" + transactionDate + "', null, '%s', " + UtilityDB.valueToString(entityName) + ")";
|
stbTransactionLog.setOperation(OperationType.INSERT);
|
||||||
|
|
||||||
String jsonString = writer.toString().replace("'", "''");
|
|
||||||
|
|
||||||
sql = String.format(sql, jsonString);
|
|
||||||
|
|
||||||
PreparedStatement psExec = connection.prepareStatement(sql);
|
|
||||||
psExec.executeUpdate();
|
|
||||||
psExec.close();
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public static int getNextTransactionGroupId(Connection connection) throws Exception {
|
|
||||||
int transactionGroupId = 0;
|
|
||||||
|
|
||||||
Integer tmpTransactionGroupId = UtilityDB.executeSimpleQueryOnlyFirstRowFirstColumn(
|
|
||||||
connection, "SELECT MAX(transaction_group_id) AS max_id from stb_transaction_log_db");
|
|
||||||
|
|
||||||
if (tmpTransactionGroupId != null) transactionGroupId = tmpTransactionGroupId;
|
|
||||||
transactionGroupId++; //Incremento l'ID
|
|
||||||
|
|
||||||
|
toBeSavedQueue.add(new Pair<>(dbName, stbTransactionLog));
|
||||||
return transactionGroupId;
|
return transactionGroupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static long getNextTransactionGroupId(Connection connection, String dbName) throws Exception {
|
||||||
|
|
||||||
|
final long maxToBeProcessed = toBeSavedQueue.stream().filter(x -> x.getKey().equalsIgnoreCase(dbName))
|
||||||
|
.map(x -> x.getValue().getGroupId())
|
||||||
|
.max(Long::compare)
|
||||||
|
.orElse(0L);
|
||||||
|
|
||||||
|
long transactionGroupId = 0;
|
||||||
|
|
||||||
|
if (maxToBeProcessed <= 0) {
|
||||||
|
Long tmpTransactionGroupId = UtilityDB.executeSimpleQueryOnlyFirstRowFirstColumn(
|
||||||
|
connection, "SELECT MAX(group_id) AS max_id FROM " + StbTransactionLog.ENTITY);
|
||||||
|
|
||||||
|
tmpTransactionGroupId = tmpTransactionGroupId != 0 ? tmpTransactionGroupId : 0L;
|
||||||
|
|
||||||
|
transactionGroupId = Math.max(maxToBeProcessed, tmpTransactionGroupId);
|
||||||
|
} else {
|
||||||
|
transactionGroupId = maxToBeProcessed;
|
||||||
|
}
|
||||||
|
|
||||||
|
transactionGroupId++; //Incremento l'ID
|
||||||
|
return transactionGroupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void consumeToBeSavedQueue() {
|
||||||
|
List<String> dbNamesToConnect = toBeSavedQueue.stream().map(Pair::getKey).distinct().collect(Collectors.toList());
|
||||||
|
|
||||||
|
final List<AvailableConnectionsModel> databaseConnections = settingsModel.getAvailableConnections().stream()
|
||||||
|
.filter(x -> dbNamesToConnect.stream().anyMatch(y -> x.getDbName().equalsIgnoreCase(y)))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
String insertSQL = "INSERT INTO " + StbTransactionLog.ENTITY + " (publication_group_id, created_at, user_name, entities, entities_json, group_id)" +
|
||||||
|
" VALUES (?, ?, ?, ?, ?, ?)";
|
||||||
|
|
||||||
|
Pair<String, StbTransactionLog> itemToSave = null;
|
||||||
|
while ((itemToSave = toBeSavedQueue.poll()) != null) {
|
||||||
|
final Pair<String, StbTransactionLog> finalItemToSave = itemToSave;
|
||||||
|
final StbTransactionLog stbTransactionLog = finalItemToSave.getValue();
|
||||||
|
|
||||||
|
AvailableConnectionsModel connectionModel = databaseConnections.stream()
|
||||||
|
.filter(x -> x.getDbName().equalsIgnoreCase(finalItemToSave.getKey()))
|
||||||
|
.findFirst()
|
||||||
|
.get();
|
||||||
|
|
||||||
|
try (MultiDBTransactionManager multiDBTransactionManager = new MultiDBTransactionManager(connectionModel);
|
||||||
|
SQLServerPreparedStatement insertBulkPs = (SQLServerPreparedStatement) multiDBTransactionManager.prepareStatement(insertSQL)) {
|
||||||
|
|
||||||
|
insertBulkPs.setLong(1, stbTransactionLog.getGroupId());
|
||||||
|
insertBulkPs.setObject(2, stbTransactionLog.getCreatedAt());
|
||||||
|
insertBulkPs.setString(3, stbTransactionLog.getUserName());
|
||||||
|
insertBulkPs.setString(4, stbTransactionLog.getEntities());
|
||||||
|
insertBulkPs.setString(5, stbTransactionLog.getEntitiesJson());
|
||||||
|
insertBulkPs.setLong(6, stbTransactionLog.getGroupId());
|
||||||
|
|
||||||
|
|
||||||
|
insertBulkPs.executeLargeUpdate();
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,6 +46,10 @@ public class MultiDBTransactionManager implements AutoCloseable{
|
|||||||
public MultiDBTransactionManager() {
|
public MultiDBTransactionManager() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MultiDBTransactionManager(AvailableConnectionsModel connectionsModel) throws Exception {
|
||||||
|
this(connectionsModel.getProfileName(), true);
|
||||||
|
}
|
||||||
|
|
||||||
public MultiDBTransactionManager(String profileDb) throws Exception {
|
public MultiDBTransactionManager(String profileDb) throws Exception {
|
||||||
this(profileDb, true);
|
this(profileDb, true);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -113,7 +113,7 @@ public abstract class EntityBase implements Serializable, Cloneable, EntityInter
|
|||||||
|
|
||||||
//ID del gruppo di transazioni per le Sync OFFLINE
|
//ID del gruppo di transazioni per le Sync OFFLINE
|
||||||
@JsonIgnore
|
@JsonIgnore
|
||||||
private Integer transactionGroupId = null;
|
private Long transactionGroupId = null;
|
||||||
|
|
||||||
@JsonIgnore
|
@JsonIgnore
|
||||||
private int queryTimeoutSeconds = 60 * 30;
|
private int queryTimeoutSeconds = 60 * 30;
|
||||||
@@ -927,8 +927,7 @@ public abstract class EntityBase implements Serializable, Cloneable, EntityInter
|
|||||||
long publicationId = AsyncManager.getPublicationIdIfExists(dbName, this);
|
long publicationId = AsyncManager.getPublicationIdIfExists(dbName, this);
|
||||||
|
|
||||||
if (publicationId > 0) {
|
if (publicationId > 0) {
|
||||||
if (transactionGroupId == null) transactionGroupId = AsyncManager.getNextTransactionGroupId(connection);
|
transactionGroupId = AsyncManager.saveNewTransaction(connection, dbName, this, publicationId, transactionGroupId);
|
||||||
AsyncManager.saveNewTransaction(connection, this, transactionGroupId);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1988,11 +1987,11 @@ public abstract class EntityBase implements Serializable, Cloneable, EntityInter
|
|||||||
return builder.toString();
|
return builder.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getTransactionGroupId() {
|
public Long getTransactionGroupId() {
|
||||||
return transactionGroupId;
|
return transactionGroupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setTransactionGroupId(Integer transactionGroupId) {
|
public void setTransactionGroupId(Long transactionGroupId) {
|
||||||
this.transactionGroupId = transactionGroupId;
|
this.transactionGroupId = transactionGroupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2005,8 +2004,7 @@ public abstract class EntityBase implements Serializable, Cloneable, EntityInter
|
|||||||
try {
|
try {
|
||||||
field.setAccessible(true);
|
field.setAccessible(true);
|
||||||
idRiga = (Integer) field.get(entityBase);
|
idRiga = (Integer) field.get(entityBase);
|
||||||
} catch (Exception e) {
|
} catch (Exception ignored) {
|
||||||
|
|
||||||
}
|
}
|
||||||
return idRiga;
|
return idRiga;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -83,9 +83,9 @@ public interface EntityInterface {
|
|||||||
|
|
||||||
EntityException getException();
|
EntityException getException();
|
||||||
|
|
||||||
Integer getTransactionGroupId();
|
Long getTransactionGroupId();
|
||||||
|
|
||||||
void setTransactionGroupId(Integer transactionGroupId);
|
void setTransactionGroupId(Long transactionGroupId);
|
||||||
|
|
||||||
void resetTransactionGroupId();
|
void resetTransactionGroupId();
|
||||||
|
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ public class StbTransactionLog extends EntityBase {
|
|||||||
private String entitiesJson;
|
private String entitiesJson;
|
||||||
|
|
||||||
@SqlField(value = "group_id", nullable = false)
|
@SqlField(value = "group_id", nullable = false)
|
||||||
private int groupId;
|
private long groupId;
|
||||||
|
|
||||||
public long getId() {
|
public long getId() {
|
||||||
return id;
|
return id;
|
||||||
@@ -95,11 +95,11 @@ public class StbTransactionLog extends EntityBase {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getGroupId() {
|
public long getGroupId() {
|
||||||
return groupId;
|
return groupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public StbTransactionLog setGroupId(int groupId) {
|
public StbTransactionLog setGroupId(long groupId) {
|
||||||
this.groupId = groupId;
|
this.groupId = groupId;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user