Migliorata export transazioni

This commit is contained in:
2024-01-31 09:40:45 +01:00
parent 0534ddeb3e
commit 252faa7221
10 changed files with 255 additions and 135 deletions

View File

@@ -239,9 +239,9 @@ public class AsyncServiceNew {
TransactionDTO transaction = new TransactionDTO()
.setPublicationId(subscriptionDTO.getPublicationId())
.setEntityName(UtilityHashMap.<String>getValueIfExists(result, "entity_name_list"))
.setErrorMsg(UtilityHashMap.<String>getValueIfExists(result, "error_msg"))
.setEsito(UtilityHashMap.<String>getValueIfExists(result, "esito"))
.setExecDate(UtilityHashMap.<Date>getValueIfExists(result, "exec_date"))
//.setErrorMsg(UtilityHashMap.<String>getValueIfExists(result, "error_msg"))
//.setEsito(UtilityHashMap.<String>getValueIfExists(result, "esito"))
//.setExecDate(UtilityHashMap.<Date>getValueIfExists(result, "exec_date"))
.setTransactionDate(UtilityHashMap.<Date>getValueIfExists(result, "transaction_date"))
.setTransactionGroupId(UtilityHashMap.<Integer>getValueIfExists(result, "transaction_group_id"))
.setTransactionId(UtilityHashMap.<Integer>getValueIfExists(result, "transaction_id"))
@@ -249,9 +249,9 @@ public class AsyncServiceNew {
.setUsername(UtilityHashMap.<String>getValueIfExists(result, "user_name"));
if (!UtilityString.isNullOrEmpty(subscriptionDTO.getWhereCond())) {
Class entityClass = UtilityHashMap.getValueIfExists(EntitySubTypeHolder.getMapType(), subscriptionDTO.getEntityName().toLowerCase());
Class<? extends EntityBase> entityClass = UtilityHashMap.getValueIfExists(EntitySubTypeHolder.getMapType(), subscriptionDTO.getEntityName().toLowerCase());
EntityBase entity = jsonObjectMapper.readValue(transaction.transactionJson, EntityBase.class);
EntityBase entity = jsonObjectMapper.readValue(transaction.getTransactionJson(), EntityBase.class);
List<EntityBase> tmpList = new ArrayList<>();
tmpList.add(entity);

View File

@@ -88,4 +88,9 @@ public abstract class BaseMigration implements MigrationModelInterface {
return generatedId;
}
protected void dropTable(Connection connection, String tableName) throws SQLException {
String dropSql = "DROP TABLE " + tableName;
executeStatement(connection, dropSql);
}
}

View File

@@ -70,11 +70,8 @@ public class Migration_20240124171059 extends BaseMigration implements Migration
private void dropOldTables(@NotNull Connection connection) throws SQLException {
String dropSql = "DROP TABLE stb_publications_detail";
executeStatement(connection, dropSql);
dropSql = "DROP TABLE stb_publications";
executeStatement(connection, dropSql);
dropTable(connection, "stb_publications_detail");
dropTable(connection, "stb_publications");
}
private void createNewTables(@NotNull Connection connection) throws SQLException {

View File

@@ -0,0 +1,38 @@
package it.integry.ems.migration.model;
import it.integry.ems.migration.model._base.BaseMigration;
import it.integry.ems.migration.model._base.MigrationModelInterface;
public class Migration_20240130180106 extends BaseMigration implements MigrationModelInterface {
@Override
public void up() throws Exception {
if (isHistoryDB())
return;
dropTable(advancedDataSource.getConnection(), "stb_transaction_log_db");
dropTable(advancedDataSource.getConnection(), "stb_transaction_log");
String createTransactionTableSql = "\n" +
"CREATE TABLE dbo.stb_transaction_log\n" +
"(\n" +
" id BIGINT IDENTITY\n" +
" CONSTRAINT stb_transaction_log_pk\n" +
" PRIMARY KEY,\n" +
" publication_group_id BIGINT NOT NULL,\n" +
" created_at DATETIME DEFAULT GETDATE() NOT NULL,\n" +
" user_name VARCHAR(40),\n" +
" entities VARCHAR(MAX),\n" +
" entities_json VARCHAR(MAX) NOT NULL,\n" +
" group_id BIGINT NOT NULL\n" +
")";
executeStatement(advancedDataSource.getConnection(), createTransactionTableSql);
}
@Override
public void down() throws Exception {
}
}

View File

@@ -1,6 +1,7 @@
package it.integry.ems.sync;
import it.integry.annotations.PostContextConstruct;
import it.integry.common.var.CommonConstants;
import it.integry.ems.datasource.DataSource;
import it.integry.ems.json.JSONObjectMapper;
import it.integry.ems.looper.service.LooperService;
@@ -9,7 +10,7 @@ import it.integry.ems.sync.dto.ExportHistoryItemDTO;
import it.integry.ems.sync.dto.PublicationDTO;
import it.integry.ems_model.base.EntityBase;
import it.integry.ems_model.base.EntityPropertyHolder;
import it.integry.ems_model.entity.StbTransactionLogDb;
import it.integry.ems_model.entity.StbTransactionLog;
import it.integry.ems_model.types.OperationType;
import it.integry.ems_model.utility.UtilityDB;
import it.integry.ems_model.utility.UtilityLocalDate;
@@ -19,6 +20,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.sql.Connection;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -50,6 +53,7 @@ public class AsyncHistoryManager {
public void addToExportQueue(DataSource dataSource, long groupId, PublicationDTO publication) throws Exception {
tryAddInExecutionList(groupId, new ExportHistoryItemDTO()
.setPublicationGroupId(groupId)
.setPublication(publication)
.setDataSource(dataSource));
}
@@ -111,6 +115,10 @@ public class AsyncHistoryManager {
for (int chunkIndex = 0; chunkIndex < (totalItemCount / chunkSize) + 1; chunkIndex++) {
while (exportHistoryItem.getToProcessQueue().size() > 50000) {
Thread.sleep(10 * 1000);
}
String innerSql = "SELECT *, ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS row_number " +
"FROM " + exportHistoryItem.getPublication().getEntityName();
@@ -152,26 +160,58 @@ public class AsyncHistoryManager {
}
private void internalProcessQueue(ExportHistoryItemDTO entityToExport) throws Exception {
logger.debug("TBS COUNT: " + entityToExport.getToProcessQueue().size());
//logger.debug("TBS COUNT: " + entityToExport.getToProcessQueue().size());
if (entityToExport.getStartDate() == null) entityToExport.setStartDate(UtilityLocalDate.getNowTime());
try (MultiDBTransactionManager multiDBTransactionManager = new MultiDBTransactionManager(entityToExport.getDataSource().getProfile())) {
EntityBase entity;
while ((entity = entityToExport.getToProcessQueue().poll()) != null) {
StbTransactionLogDb stbTransactionLogDb = new StbTransactionLogDb()
.setTransactionDate(UtilityLocalDate.getNowTime())
.setTransactionJson(jsonObjectMapper.writeValueAsString(entity))
.setTransactionGroupIdField(-1)
.setEntityNameList(entityToExport.getPublication().getEntityName());
StbTransactionLog stbTransactionLog = new StbTransactionLog()
.setPublicationGroupId(entityToExport.getPublicationGroupId())
.setCreatedAt(UtilityLocalDate.getNowTime())
.setEntitiesJson(jsonObjectMapper.writeValueAsString(entity))
.setGroupId(-1)
.setEntities(entityToExport.getPublication().getEntityName());
stbTransactionLogDb
stbTransactionLog
.setOperation(OperationType.INSERT);
stbTransactionLogDb.manageWithParentConnection(multiDBTransactionManager.getPrimaryConnection());
stbTransactionLog.manageWithParentConnection(multiDBTransactionManager.getPrimaryConnection());
entityToExport.setSyncronizedItemCount(entityToExport.getSyncronizedItemCount() + 1);
final long inExecutionMinues = ChronoUnit.MINUTES.between(entityToExport.getStartDate(),UtilityLocalDate.getNowTime());
if(inExecutionMinues > 0)
entityToExport.setSyncronizedItemsPerMinute((int) (entityToExport.getSyncronizedItemCount() / inExecutionMinues));
}
}
}
public HashMap<Long, HashMap<String, Object>> getStatus() {
HashMap<Long, HashMap<String, Object>> total = new HashMap<>();
for(Long key : currentlyInExecution.keySet()) {
for(ExportHistoryItemDTO item : currentlyInExecution.get(key)) {
LocalDateTime endDate = item.getStartDate().plusMinutes(item.getTotalItemCount() / item.getSyncronizedItemsPerMinute());
HashMap<String, Object> value = new HashMap<>();
value.put("started_at", CommonConstants.DATETIME_YMD_DASHED_FORMATTER.format(item.getStartDate()));
value.put("enstimated_end", CommonConstants.DATETIME_YMD_DASHED_FORMATTER.format(endDate));
value.put("speed", item.getSyncronizedItemsPerMinute());
value.put("total", item.getTotalItemCount());
value.put("processed", item.getSyncronizedItemCount());
total.put(key, value);
}
}
return total;
}
}

View File

@@ -23,4 +23,9 @@ public class RemoteSyncronizationController {
return ServiceRestResponse.createPositiveResponse();
}
@RequestMapping(value = "publications/{groupId}/status", method = RequestMethod.GET)
public ServiceRestResponse statusPublication(@PathVariable long groupId) throws Exception {
return ServiceRestResponse.createPositiveResponse(remoteSyncronizationService.getPublicationStatus(groupId));
}
}

View File

@@ -3,19 +3,32 @@ package it.integry.ems.sync.dto;
import it.integry.ems.datasource.DataSource;
import it.integry.ems_model.base.EntityBase;
import java.time.LocalDateTime;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class ExportHistoryItemDTO {
private long publicationGroupId;
private DataSource dataSource;
private PublicationDTO publication;
private LocalDateTime startDate;
private long totalItemCount;
private long processedItemCount;
private long syncronizedItemCount;
private int syncronizedItemsPerMinute;
private Queue<EntityBase> toProcessQueue = new ConcurrentLinkedQueue<EntityBase>();
public long getPublicationGroupId() {
return publicationGroupId;
}
public ExportHistoryItemDTO setPublicationGroupId(long publicationGroupId) {
this.publicationGroupId = publicationGroupId;
return this;
}
public DataSource getDataSource() {
return dataSource;
@@ -35,6 +48,15 @@ public class ExportHistoryItemDTO {
return this;
}
public LocalDateTime getStartDate() {
return startDate;
}
public ExportHistoryItemDTO setStartDate(LocalDateTime startDate) {
this.startDate = startDate;
return this;
}
public long getTotalItemCount() {
return totalItemCount;
}
@@ -61,4 +83,22 @@ public class ExportHistoryItemDTO {
this.toProcessQueue = toProcessQueue;
return this;
}
public long getSyncronizedItemCount() {
return syncronizedItemCount;
}
public ExportHistoryItemDTO setSyncronizedItemCount(long syncronizedItemCount) {
this.syncronizedItemCount = syncronizedItemCount;
return this;
}
public int getSyncronizedItemsPerMinute() {
return syncronizedItemsPerMinute;
}
public ExportHistoryItemDTO setSyncronizedItemsPerMinute(int syncronizedItemsPerMinute) {
this.syncronizedItemsPerMinute = syncronizedItemsPerMinute;
return this;
}
}

View File

@@ -14,6 +14,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
@Service
@@ -44,7 +45,9 @@ public class RemoteSyncronizationService {
groupId,
PublicationDTO.fromStbPublicationDetail(stbPublicationsDetail));
}
}
public HashMap<Long, HashMap<String, Object>> getPublicationStatus(long groupId) {
return asyncHistoryManager.getStatus();
}
}

View File

@@ -0,0 +1,106 @@
package it.integry.ems_model.entity;
import com.fasterxml.jackson.annotation.JsonTypeName;
import it.integry.ems_model.annotation.*;
import it.integry.ems_model.base.EntityBase;
import org.kie.api.definition.type.PropertyReactive;
import java.time.LocalDateTime;
@Master
@PropertyReactive
@Table(StbTransactionLog.ENTITY)
@JsonTypeName(StbTransactionLog.ENTITY)
public class StbTransactionLog extends EntityBase {
private static final long serialVersionUID = 1L;
public static final String ENTITY = "stb_transaction_log";
@PK
@Identity
@SqlField(value = "id", nullable = false)
private long id;
@PK
@SqlField(value = "publication_group_id", nullable = false)
private long publicationGroupId;
@SqlField(value = "created_at")
private LocalDateTime createdAt;
@SqlField(value = "user_name")
private String userName;
@SqlField(value = "entities")
private String entities;
@SqlField(value = "entities_json", nullable = false)
private String entitiesJson;
@SqlField(value = "group_id", nullable = false)
private int groupId;
public long getId() {
return id;
}
public StbTransactionLog setId(long id) {
this.id = id;
return this;
}
public long getPublicationGroupId() {
return publicationGroupId;
}
public StbTransactionLog setPublicationGroupId(long publicationGroupId) {
this.publicationGroupId = publicationGroupId;
return this;
}
public LocalDateTime getCreatedAt() {
return createdAt;
}
public StbTransactionLog setCreatedAt(LocalDateTime createdAt) {
this.createdAt = createdAt;
return this;
}
public String getUserName() {
return userName;
}
public StbTransactionLog setUserName(String userName) {
this.userName = userName;
return this;
}
public String getEntities() {
return entities;
}
public StbTransactionLog setEntities(String entities) {
this.entities = entities;
return this;
}
public String getEntitiesJson() {
return entitiesJson;
}
public StbTransactionLog setEntitiesJson(String entitiesJson) {
this.entitiesJson = entitiesJson;
return this;
}
public int getGroupId() {
return groupId;
}
public StbTransactionLog setGroupId(int groupId) {
this.groupId = groupId;
return this;
}
}

View File

@@ -1,114 +0,0 @@
package it.integry.ems_model.entity;
import com.fasterxml.jackson.annotation.JsonTypeName;
import it.integry.ems_model.annotation.*;
import it.integry.ems_model.base.EntityBase;
import org.kie.api.definition.type.PropertyReactive;
import java.time.LocalDateTime;
@Master
@PropertyReactive
@Table(StbTransactionLogDb.ENTITY)
@JsonTypeName(StbTransactionLogDb.ENTITY)
public class StbTransactionLogDb extends EntityBase {
private static final long serialVersionUID = 1L;
public static final String ENTITY = "stb_transaction_log_db";
@PK
@Identity
@SqlField(value = "id", nullable = false)
private long id;
@SqlField(value = "transaction_date")
private LocalDateTime transactionDate;
@SqlField(value = "user_name")
private String userName;
@SqlField(value = "entity_name_list")
private String entityNameList;
// @SqlField(value = "error_msg")
// private String errorMsg;
// @SqlField(value = "esito")
// private String esito;
// @SqlField(value = "exec_date")
// private LocalDateTime execDate;
@SqlField(value = "transaction_json")
private String transactionJson;
@SqlField(value = "transaction_group_id")
private int transactionGroupIdField;
@SqlField(value = "transaction_query")
private String transactionQuery;
public long getId() {
return id;
}
public StbTransactionLogDb setId(long id) {
this.id = id;
return this;
}
public LocalDateTime getTransactionDate() {
return transactionDate;
}
public StbTransactionLogDb setTransactionDate(LocalDateTime transactionDate) {
this.transactionDate = transactionDate;
return this;
}
public String getUserName() {
return userName;
}
public StbTransactionLogDb setUserName(String userName) {
this.userName = userName;
return this;
}
public String getEntityNameList() {
return entityNameList;
}
public StbTransactionLogDb setEntityNameList(String entityNameList) {
this.entityNameList = entityNameList;
return this;
}
public String getTransactionJson() {
return transactionJson;
}
public StbTransactionLogDb setTransactionJson(String transactionJson) {
this.transactionJson = transactionJson;
return this;
}
public int getTransactionGroupIdField() {
return transactionGroupIdField;
}
public StbTransactionLogDb setTransactionGroupIdField(int transactionGroupIdField) {
this.transactionGroupIdField = transactionGroupIdField;
return this;
}
public String getTransactionQuery() {
return transactionQuery;
}
public StbTransactionLogDb setTransactionQuery(String transactionQuery) {
this.transactionQuery = transactionQuery;
return this;
}
}