Migliorata gestione del transaction group id nelle sincronizzazioni
This commit is contained in:
@@ -0,0 +1,47 @@
|
|||||||
|
package it.integry.ems.migration.model;
|
||||||
|
|
||||||
|
import it.integry.ems.migration._base.BaseMigration;
|
||||||
|
import it.integry.ems.migration._base.MigrationModelInterface;
|
||||||
|
import it.integry.ems_model.entity.StbPublications;
|
||||||
|
import it.integry.ems_model.types.OperationType;
|
||||||
|
import it.integry.ems_model.utility.UtilityDB;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class Migration_20241021113736 extends BaseMigration implements MigrationModelInterface {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void up() throws Exception {
|
||||||
|
if (isHistoryDB())
|
||||||
|
return;
|
||||||
|
|
||||||
|
String sql = "ALTER TABLE dbo.stb_publications" +
|
||||||
|
" ADD last_group_id BIGINT DEFAULT 0 NOT NULL";
|
||||||
|
|
||||||
|
executeStatement(sql);
|
||||||
|
|
||||||
|
String maxTransactionIdSql = "SELECT MAX(group_id) AS max_id " +
|
||||||
|
"FROM stb_transaction_log";
|
||||||
|
Long maxTransactionId = UtilityDB.executeSimpleQueryOnlyFirstRowFirstColumn(advancedDataSource.getConnection(), maxTransactionIdSql);
|
||||||
|
maxTransactionId = maxTransactionId == null ? 0 : ++maxTransactionId;
|
||||||
|
|
||||||
|
String retrievePublicationsSql = "SELECT * FROM stb_publications";
|
||||||
|
final List<StbPublications> stbPublications = UtilityDB.executeSimpleQueryDTO(advancedDataSource.getConnection(), retrievePublicationsSql, StbPublications.class);
|
||||||
|
|
||||||
|
if (stbPublications == null || stbPublications.isEmpty())
|
||||||
|
return;
|
||||||
|
|
||||||
|
for (StbPublications stbPublication : stbPublications) {
|
||||||
|
stbPublication.setLastGroupId(maxTransactionId);
|
||||||
|
|
||||||
|
stbPublication.setOperation(OperationType.UPDATE);
|
||||||
|
stbPublication.manageWithParentConnection(advancedDataSource.getConnection());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void down() throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -10,6 +10,7 @@ import it.integry.ems.settings.SettingsController;
|
|||||||
import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager;
|
import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager;
|
||||||
import it.integry.ems.utility.UtilityDebug;
|
import it.integry.ems.utility.UtilityDebug;
|
||||||
import it.integry.ems_model.base.EntityBase;
|
import it.integry.ems_model.base.EntityBase;
|
||||||
|
import it.integry.ems_model.entity.StbPublications;
|
||||||
import it.integry.ems_model.entity.StbPublicationsDetail;
|
import it.integry.ems_model.entity.StbPublicationsDetail;
|
||||||
import it.integry.ems_model.entity.StbTransactionLog;
|
import it.integry.ems_model.entity.StbTransactionLog;
|
||||||
import it.integry.ems_model.types.OperationType;
|
import it.integry.ems_model.types.OperationType;
|
||||||
@@ -41,7 +42,6 @@ public class AsyncManager {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private SettingsController settingsController;
|
private SettingsController settingsController;
|
||||||
|
|
||||||
|
|
||||||
private final static HashMap<String, List<StbPublicationsDetail>> cachedSetup = new HashMap<>();
|
private final static HashMap<String, List<StbPublicationsDetail>> cachedSetup = new HashMap<>();
|
||||||
|
|
||||||
private final static ConcurrentLinkedQueue<Map.Entry<String, StbTransactionLog>> toBeSavedQueue = new ConcurrentLinkedQueue<>();
|
private final static ConcurrentLinkedQueue<Map.Entry<String, StbTransactionLog>> toBeSavedQueue = new ConcurrentLinkedQueue<>();
|
||||||
@@ -82,15 +82,12 @@ public class AsyncManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void cacheSetup(String dbName, String profileName) throws Exception {
|
private void cacheSetup(String dbName, String profileName) throws Exception {
|
||||||
MultiDBTransactionManager multiDBTransactionManager = new MultiDBTransactionManager(profileName, false);
|
|
||||||
|
|
||||||
try {
|
try (MultiDBTransactionManager multiDBTransactionManager = new MultiDBTransactionManager(profileName, false)) {
|
||||||
String sql = "SELECT * FROM " + StbPublicationsDetail.ENTITY + " WHERE active = 1";
|
String sql = "SELECT * FROM " + StbPublicationsDetail.ENTITY + " WHERE active = 1";
|
||||||
List<StbPublicationsDetail> publications = UtilityDB.executeSimpleQueryDTO(multiDBTransactionManager.getPrimaryConnection(), sql, StbPublicationsDetail.class);
|
List<StbPublicationsDetail> publications = UtilityDB.executeSimpleQueryDTO(multiDBTransactionManager.getPrimaryConnection(), sql, StbPublicationsDetail.class);
|
||||||
|
|
||||||
cachedSetup.putIfAbsent(dbName, publications);
|
cachedSetup.putIfAbsent(dbName, publications);
|
||||||
} finally {
|
|
||||||
multiDBTransactionManager.closeAll();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -108,7 +105,7 @@ public class AsyncManager {
|
|||||||
|
|
||||||
//logger.debug("SYNC OFFLINE ABILITATA SU " + tableName);
|
//logger.debug("SYNC OFFLINE ABILITATA SU " + tableName);
|
||||||
|
|
||||||
for (StbPublicationsDetail activePublication : activePublications){
|
for (StbPublicationsDetail activePublication : activePublications) {
|
||||||
List<EntityBase> myObjs = new ArrayList<>();
|
List<EntityBase> myObjs = new ArrayList<>();
|
||||||
myObjs.add(entityBase);
|
myObjs.add(entityBase);
|
||||||
|
|
||||||
@@ -129,13 +126,13 @@ public class AsyncManager {
|
|||||||
|
|
||||||
return activePublication.getStbPublicationId();
|
return activePublication.getStbPublicationId();
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static Long saveNewTransaction(Connection connection, String dbName, EntityBase entityBase, long publicationId, Long transactionGroupId) throws Exception {
|
public static Long saveNewTransaction(Connection connection, String dbName, EntityBase entityBase, long publicationId, Long transactionGroupId) throws Exception {
|
||||||
//TODO: Calc transaction group ID here
|
//TODO: Calc transaction group ID here
|
||||||
if (transactionGroupId == null) transactionGroupId = getNextTransactionGroupId(connection, dbName);
|
if (transactionGroupId == null) transactionGroupId = getNextTransactionGroupId(connection, publicationId);
|
||||||
|
|
||||||
ResponseJSONObjectMapper jsonObjectMapper = ContextLoader.getCurrentWebApplicationContext().getBean(ResponseJSONObjectMapper.class);
|
ResponseJSONObjectMapper jsonObjectMapper = ContextLoader.getCurrentWebApplicationContext().getBean(ResponseJSONObjectMapper.class);
|
||||||
|
|
||||||
@@ -153,28 +150,17 @@ public class AsyncManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static long getNextTransactionGroupId(Connection connection, String dbName) throws Exception {
|
private static long getNextTransactionGroupId(Connection connection, long publicationId) throws Exception {
|
||||||
|
String retrievePublicationSql = "SELECT * FROM " + StbTransactionLog.ENTITY +
|
||||||
|
" WHERE id = " + publicationId;
|
||||||
|
|
||||||
final long maxToBeProcessed = toBeSavedQueue.stream().filter(x -> x.getKey().equalsIgnoreCase(dbName))
|
final StbPublications stbPublication = UtilityDB.executeSimpleQueryOnlyFirstRowDTO(connection, retrievePublicationSql, StbPublications.class);
|
||||||
.map(x -> x.getValue().getGroupId())
|
stbPublication.setLastGroupId(stbPublication.getLastGroupId() + 1);
|
||||||
.max(Long::compare)
|
|
||||||
.orElse(0L);
|
|
||||||
|
|
||||||
long transactionGroupId = 0;
|
stbPublication.setOperation(OperationType.UPDATE);
|
||||||
|
stbPublication.manageWithParentConnection(connection);
|
||||||
|
|
||||||
if (maxToBeProcessed <= 0) {
|
return stbPublication.getLastGroupId();
|
||||||
Long tmpTransactionGroupId = UtilityDB.executeSimpleQueryOnlyFirstRowFirstColumn(
|
|
||||||
connection, "SELECT MAX(group_id) AS max_id FROM " + StbTransactionLog.ENTITY);
|
|
||||||
|
|
||||||
tmpTransactionGroupId = UtilityLong.isNull(tmpTransactionGroupId,0L);
|
|
||||||
|
|
||||||
transactionGroupId = Math.max(maxToBeProcessed, tmpTransactionGroupId);
|
|
||||||
} else {
|
|
||||||
transactionGroupId = maxToBeProcessed;
|
|
||||||
}
|
|
||||||
|
|
||||||
transactionGroupId++; //Incremento l'ID
|
|
||||||
return transactionGroupId;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -4,11 +4,12 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
|
|||||||
import it.integry.ems_model.annotation.*;
|
import it.integry.ems_model.annotation.*;
|
||||||
import it.integry.ems_model.base.EntityBase;
|
import it.integry.ems_model.base.EntityBase;
|
||||||
import it.integry.ems_model.utility.Query;
|
import it.integry.ems_model.utility.Query;
|
||||||
import org.kie.api.definition.type.PropertyReactive;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.kie.api.definition.type.PropertyReactive;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
@Master
|
@Master
|
||||||
@PropertyReactive
|
@PropertyReactive
|
||||||
@@ -30,6 +31,9 @@ public class StbPublications extends EntityBase {
|
|||||||
@SqlField(value = "publication_description", maxLength = 1024, nullable = false)
|
@SqlField(value = "publication_description", maxLength = 1024, nullable = false)
|
||||||
private String publicationDescription;
|
private String publicationDescription;
|
||||||
|
|
||||||
|
@SqlField(value = "last_group_id", defaultObjectValue = "0")
|
||||||
|
private Long lastGroupId;
|
||||||
|
|
||||||
@EntityChild(copyPk = false)
|
@EntityChild(copyPk = false)
|
||||||
private List<StbPublicationsDetail> stbPublicationsDetails = new ArrayList<>();
|
private List<StbPublicationsDetail> stbPublicationsDetails = new ArrayList<>();
|
||||||
|
|
||||||
@@ -55,6 +59,15 @@ public class StbPublications extends EntityBase {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Long getLastGroupId() {
|
||||||
|
return lastGroupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public StbPublications setLastGroupId(Long lastGroupId) {
|
||||||
|
this.lastGroupId = lastGroupId;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public List<StbPublicationsDetail> getStbPublicationsDetails() {
|
public List<StbPublicationsDetail> getStbPublicationsDetails() {
|
||||||
return stbPublicationsDetails;
|
return stbPublicationsDetails;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user