Merge branch 'master' into develop
This commit is contained in:
@@ -430,6 +430,10 @@ public class UtilityDB {
|
||||
}
|
||||
|
||||
public static void executeStatement(Connection connection, String... sqls) throws SQLException {
|
||||
executeStatement(connection, Arrays.asList(sqls));
|
||||
}
|
||||
|
||||
public static void executeStatement(Connection connection, List<String> sqls) throws SQLException {
|
||||
Statement statement = connection.createStatement();
|
||||
|
||||
for (String sql : sqls) {
|
||||
|
||||
@@ -6,9 +6,11 @@ import com.annimon.stream.Stream;
|
||||
import it.integry.ems.exception.PrimaryDatabaseNotPresentException;
|
||||
import it.integry.ems.production.dto.MRP.*;
|
||||
import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager;
|
||||
import it.integry.ems_model.entity.CtbScad;
|
||||
import it.integry.ems_model.service.SetupGest;
|
||||
import it.integry.ems_model.utility.*;
|
||||
import it.integry.ems_model.utility.UtilityBigDecimal;
|
||||
import it.integry.ems_model.utility.UtilityDB;
|
||||
import it.integry.ems_model.utility.UtilityHashMap;
|
||||
import it.integry.ems_model.utility.UtilityString;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@@ -104,7 +106,8 @@ public class MrpDailyMaterialReqService {
|
||||
int id = 0;
|
||||
logger.debug(MrpDailyMaterialReqService.class.getSimpleName() + " - articoli caricati: " + datiArt.size());
|
||||
|
||||
Map<String, List<MrpDailyMaterialReqDetDTO>> listArticoli = Stream.of(mrpDailyMaterialReqDetDTO).collect(Collectors.groupingBy(MrpDailyMaterialReqDetDTO::getCodMart));
|
||||
Map<String, List<MrpDailyMaterialReqDetDTO>> listArticoli = Stream.of(mrpDailyMaterialReqDetDTO)
|
||||
.collect(Collectors.groupingBy(MrpDailyMaterialReqDetDTO::getCodMart));
|
||||
|
||||
for (MrpDailyMaterialReqDTO art : datiArt) {
|
||||
boolean inclusiSospesi = getSetupIncluseSospesi(art.getCodMgrp(), setupGruppi);
|
||||
|
||||
@@ -70,7 +70,6 @@ public class ExchangeColliImportService {
|
||||
MultiDBTransactionManager exchangeMultiDb,
|
||||
RequestDataDTO requestDataDTO,
|
||||
ExchangeImportSchemaManagerService.SchemaType colliSchemaType) throws Exception {
|
||||
|
||||
boolean useTempTable = true;
|
||||
|
||||
try {
|
||||
@@ -91,88 +90,108 @@ public class ExchangeColliImportService {
|
||||
testataTableName = null;
|
||||
}
|
||||
|
||||
List<MtbColt> exchangeImportedMtbColts = importColliLavorazione(
|
||||
exchangeMultiDb.getPrimaryConnection(),
|
||||
UtilityLocalDate.getNow().minusWeeks(1),
|
||||
UtilityLocalDate.getNow(),
|
||||
true, false, testataTableName, righeTableName);
|
||||
exchangeImportDataManagerService.prepareData(exchangeMultiDb.getPrimaryConnection(), useTempTable, exchangeImportSchemaManagerService.getTablesBySchemaType(colliSchemaType));
|
||||
|
||||
List<MtbColt> exchangeUpdatedMtbColts = importColliLavorazione(
|
||||
exchangeMultiDb.getPrimaryConnection(),
|
||||
UtilityLocalDate.getNow().minusWeeks(1),
|
||||
UtilityLocalDate.getNow(),
|
||||
false, useTempTable, testataTableName, righeTableName);
|
||||
AtomicInteger dataCount = new AtomicInteger(0);
|
||||
AtomicInteger importedCounter = new AtomicInteger(0);
|
||||
|
||||
List<EquatableEntityInterface> importedMtbColts = exchangeImportedMtbColts.stream()
|
||||
.map(x -> (EquatableEntityInterface) x)
|
||||
.collect(Collectors.toList());
|
||||
LocalDate startDate = LocalDate.of(2024, 7, 1);
|
||||
|
||||
List<EquatableEntityInterface> updatedMtbColts = exchangeUpdatedMtbColts.stream()
|
||||
.map(x -> (EquatableEntityInterface) x)
|
||||
.collect(Collectors.toList());
|
||||
Result<Object> firstErrorObjectIfPresent = null;
|
||||
|
||||
List<EquatableEntityInterface> allMtbColts = exchangeImportDataManagerService
|
||||
.runSync(MtbColt.class, importedMtbColts, updatedMtbColts);
|
||||
while (startDate.isBefore(UtilityLocalDate.getNow())) {
|
||||
|
||||
allMtbColts.forEach(x -> {
|
||||
x.setOperation(x.getOperation() == OperationType.INSERT ? OperationType.INSERT_OR_UPDATE : x.getOperation());
|
||||
((MtbColt) x).getMtbColr().forEach(y -> y.setOperation(y.getOperation() == OperationType.INSERT ? OperationType.INSERT_OR_UPDATE : y.getOperation()));
|
||||
});
|
||||
final LocalDate tempStartDate = startDate;
|
||||
final LocalDate tempEndDate = startDate;
|
||||
|
||||
startDate = startDate.plusDays(1);
|
||||
|
||||
AtomicInteger importedCounter = new AtomicInteger();
|
||||
List<MtbColt> exchangeImportedMtbColts = importColliLavorazione(
|
||||
exchangeMultiDb.getPrimaryConnection(),
|
||||
tempStartDate,
|
||||
tempEndDate,
|
||||
true, false, testataTableName, righeTableName);
|
||||
|
||||
List<RunnableThrowable> calls = new ArrayList<>();
|
||||
for (EquatableEntityInterface dataToSave : allMtbColts) {
|
||||
List<MtbColt> exchangeUpdatedMtbColts = importColliLavorazione(
|
||||
exchangeMultiDb.getPrimaryConnection(),
|
||||
tempStartDate,
|
||||
tempEndDate,
|
||||
false, useTempTable, testataTableName, righeTableName);
|
||||
|
||||
calls.add(() -> {
|
||||
List<EquatableEntityInterface> importedMtbColts = exchangeImportedMtbColts.stream()
|
||||
.map(x -> (EquatableEntityInterface) x)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
logger.debug("Importati {} colli di {}", importedCounter.incrementAndGet(), allMtbColts.size());
|
||||
try (MultiDBTransactionManager exchangeMultiDbThread = new MultiDBTransactionManager(exchangeMultiDb.getPrimaryDatasource().getProfile(), false);
|
||||
MultiDBTransactionManager internalMultiDbThread = new MultiDBTransactionManager(internalMultiDb.getPrimaryDatasource().getProfile(), false)) {
|
||||
List<EquatableEntityInterface> updatedMtbColts = exchangeUpdatedMtbColts.stream()
|
||||
.map(x -> (EquatableEntityInterface) x)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
MtbColt mtbColtToSave = (MtbColt) dataToSave;
|
||||
if (mtbColtToSave.hasDocument() && mtbColtToSave.getOperation() == OperationType.DELETE) {
|
||||
MtbColt mtbColtRemoveDocument = (MtbColt) mtbColtToSave.clone();
|
||||
mtbColtRemoveDocument.setMtbColr(new ArrayList<>())
|
||||
.setCodDtip(EmsRestConstants.NULL)
|
||||
.setSerDoc(EmsRestConstants.NULL)
|
||||
.setDataDoc(EmsRestConstants.DATE_NULL)
|
||||
.setOperation(OperationType.UPDATE);
|
||||
List<EquatableEntityInterface> allMtbColts = exchangeImportDataManagerService
|
||||
.runSync(MtbColt.class, importedMtbColts, updatedMtbColts);
|
||||
|
||||
entityProcessor.processEntity(mtbColtRemoveDocument, true, true, ROSSOGARGANO_EXCHANGE_USER, internalMultiDbThread, requestDataDTO);
|
||||
dataCount.addAndGet(allMtbColts.size());
|
||||
|
||||
mtbColtToSave.setCodDtip(EmsRestConstants.NULL)
|
||||
.setSerDoc(EmsRestConstants.NULL)
|
||||
.setDataDoc(EmsRestConstants.DATE_NULL);
|
||||
}
|
||||
|
||||
entityProcessor.processEntity(mtbColtToSave, true, true, ROSSOGARGANO_EXCHANGE_USER, internalMultiDbThread, requestDataDTO);
|
||||
singleUpdateImported(exchangeMultiDbThread.getPrimaryConnection(), mtbColtToSave, testataTableName, useTempTable);
|
||||
singleUpdateImported(exchangeMultiDbThread.getPrimaryConnection(), mtbColtToSave, righeTableName, useTempTable);
|
||||
|
||||
internalMultiDbThread.commitAll();
|
||||
exchangeMultiDbThread.commitAll();
|
||||
} catch (Exception ex) {
|
||||
MtbColt collo = (MtbColt) dataToSave;
|
||||
logger.error("Errore durante l'importazione del collo [num: " + collo.getNumCollo() + ", " +
|
||||
"data: " + UtilityLocalDate.formatDate(collo.getDataCollo(), CommonConstants.DATE_FORMAT_DMY) + ", " +
|
||||
"serie: " + collo.getSerCollo() + ", " +
|
||||
"gestione: " + collo.getGestione() + "]",
|
||||
ex);
|
||||
|
||||
throw ex;
|
||||
}
|
||||
allMtbColts.forEach(x -> {
|
||||
x.setOperation(x.getOperation() == OperationType.INSERT ? OperationType.INSERT_OR_UPDATE : x.getOperation());
|
||||
((MtbColt) x).getMtbColr().forEach(y -> y.setOperation(y.getOperation() == OperationType.INSERT ? OperationType.INSERT_OR_UPDATE : y.getOperation()));
|
||||
});
|
||||
|
||||
|
||||
List<RunnableThrowable> calls = new ArrayList<>();
|
||||
for (EquatableEntityInterface dataToSave : allMtbColts) {
|
||||
|
||||
calls.add(() -> {
|
||||
|
||||
logger.debug("Importati {} colli di {}", importedCounter.incrementAndGet(), dataCount);
|
||||
try (MultiDBTransactionManager exchangeMultiDbThread = new MultiDBTransactionManager(exchangeMultiDb.getPrimaryDatasource().getProfile(), false);
|
||||
MultiDBTransactionManager internalMultiDbThread = new MultiDBTransactionManager(internalMultiDb.getPrimaryDatasource().getProfile(), false)) {
|
||||
|
||||
MtbColt mtbColtToSave = (MtbColt) dataToSave;
|
||||
if (mtbColtToSave.hasDocument() && mtbColtToSave.getOperation() == OperationType.DELETE) {
|
||||
MtbColt mtbColtRemoveDocument = (MtbColt) mtbColtToSave.clone();
|
||||
mtbColtRemoveDocument.setMtbColr(new ArrayList<>())
|
||||
.setCodDtip(EmsRestConstants.NULL)
|
||||
.setSerDoc(EmsRestConstants.NULL)
|
||||
.setDataDoc(EmsRestConstants.DATE_NULL)
|
||||
.setNumDoc(EmsRestConstants.INTEGER_NULL)
|
||||
.setOperation(OperationType.UPDATE);
|
||||
|
||||
entityProcessor.processEntity(mtbColtRemoveDocument, true, true, ROSSOGARGANO_EXCHANGE_USER, internalMultiDbThread, requestDataDTO);
|
||||
|
||||
mtbColtToSave.setCodDtip(null)
|
||||
.setSerDoc(null)
|
||||
.setDataDoc(null)
|
||||
.setNumDoc(null);
|
||||
}
|
||||
|
||||
entityProcessor.processEntity(mtbColtToSave, true, true, ROSSOGARGANO_EXCHANGE_USER, internalMultiDbThread, requestDataDTO);
|
||||
singleUpdateImported(exchangeMultiDbThread.getPrimaryConnection(), mtbColtToSave, testataTableName, useTempTable);
|
||||
singleUpdateImported(exchangeMultiDbThread.getPrimaryConnection(), mtbColtToSave, righeTableName, useTempTable);
|
||||
|
||||
internalMultiDbThread.commitAll();
|
||||
exchangeMultiDbThread.commitAll();
|
||||
} catch (Exception ex) {
|
||||
MtbColt collo = (MtbColt) dataToSave;
|
||||
logger.error("Errore durante l'importazione del collo [num: " + collo.getNumCollo() + ", " +
|
||||
"data: " + UtilityLocalDate.formatDate(collo.getDataCollo(), CommonConstants.DATE_FORMAT_DMY) + ", " +
|
||||
"serie: " + collo.getSerCollo() + ", " +
|
||||
"gestione: " + collo.getGestione() + "]",
|
||||
ex);
|
||||
|
||||
throw ex;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
final ArrayList<Result<Object>> results = UtilityThread.executeParallel(calls);
|
||||
|
||||
firstErrorObjectIfPresent = results.stream()
|
||||
.filter(x -> x instanceof Result.Error)
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
|
||||
}
|
||||
|
||||
final ArrayList<Result<Object>> results = UtilityThread.executeParallel(calls);
|
||||
|
||||
final Result<Object> firstErrorObjectIfPresent = results.stream()
|
||||
.filter(x -> x instanceof Result.Error)
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
|
||||
if (firstErrorObjectIfPresent != null) throw ((Result.Error) firstErrorObjectIfPresent).getError();
|
||||
} finally {
|
||||
if (useTempTable)
|
||||
@@ -181,7 +200,8 @@ public class ExchangeColliImportService {
|
||||
}
|
||||
|
||||
private List<MtbColt> importColliLavorazione(Connection connection,
|
||||
LocalDate minDate, LocalDate maxDate, boolean retrieveAlreadyImported, boolean useTempTable, String testataTableName, String righeTableName) throws Exception {
|
||||
LocalDate minDate, LocalDate maxDate, boolean retrieveAlreadyImported, boolean useTempTable, String
|
||||
testataTableName, String righeTableName) throws Exception {
|
||||
|
||||
String mtbColtLavOriginalName = testataTableName;
|
||||
String mtbColtLavTableName = mtbColtLavOriginalName + (useTempTable ? "_tmp" : "");
|
||||
@@ -189,16 +209,6 @@ public class ExchangeColliImportService {
|
||||
String mtbColrLavTableName = mtbColrLavOriginalName + (useTempTable ? "_tmp" : "");
|
||||
|
||||
|
||||
if (useTempTable) {
|
||||
UtilityDB.executeStatement(connection,
|
||||
"INSERT INTO " + mtbColtLavTableName +
|
||||
" SELECT * FROM " + mtbColtLavOriginalName,
|
||||
"INSERT INTO " + mtbColrLavTableName +
|
||||
" SELECT * FROM " + mtbColrLavOriginalName
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
final List<MtbColt> mtbColtLav = exchangeImportDataManagerService.retrieveDataFromExchange(connection, MtbColt.class,
|
||||
mtbColtLavTableName, Query.format("data_collo BETWEEN {} AND {}", minDate, maxDate), retrieveAlreadyImported);
|
||||
|
||||
@@ -222,7 +232,8 @@ public class ExchangeColliImportService {
|
||||
}
|
||||
|
||||
|
||||
private void singleUpdateImported(Connection connection, MtbColt importedDataKey, String tableName, boolean useTempTable) throws Exception {
|
||||
private void singleUpdateImported(Connection connection, MtbColt importedDataKey, String tableName,
|
||||
boolean useTempTable) throws Exception {
|
||||
final HashMap<String, Object> importedKey = new HashMap<String, Object>() {{
|
||||
put("data_collo", importedDataKey.getDataCollo());
|
||||
put("ser_collo", importedDataKey.getSerCollo());
|
||||
@@ -234,7 +245,8 @@ public class ExchangeColliImportService {
|
||||
}
|
||||
|
||||
|
||||
public List<CertificatiSinfoOneDTO> getCertificati(String profileDb, String codAnag, LocalDate dataCert, String codMart, String codVdes, String partitaMag) throws Exception {
|
||||
public List<CertificatiSinfoOneDTO> getCertificati(String profileDb, String codAnag, LocalDate dataCert, String
|
||||
codMart, String codVdes, String partitaMag) throws Exception {
|
||||
|
||||
try (MultiDBTransactionManager multiDBTransactionManager = new MultiDBTransactionManager(profileDb)) {
|
||||
|
||||
|
||||
@@ -12,10 +12,8 @@ import org.springframework.stereotype.Service;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.sql.Connection;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.sql.SQLException;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
@@ -23,6 +21,20 @@ import java.util.stream.Collectors;
|
||||
public class ExchangeImportDataManagerService {
|
||||
|
||||
|
||||
public void prepareData(Connection connection, boolean useTempTable, String... tables) throws SQLException {
|
||||
prepareData(connection, useTempTable, Arrays.asList(tables));
|
||||
}
|
||||
|
||||
public void prepareData(Connection connection, boolean useTempTable, List<String> tables) throws SQLException {
|
||||
if (useTempTable) {
|
||||
final List<String> sqls = tables.stream()
|
||||
.map(x -> String.format("INSERT INTO %s_tmp SELECT * FROM %s", x, x))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
UtilityDB.executeStatement(connection, sqls);
|
||||
}
|
||||
}
|
||||
|
||||
public <T extends EquatableEntityInterface> List<T> retrieveDataFromExchange(Connection connection,
|
||||
Class<T> clazz,
|
||||
String tableName,
|
||||
|
||||
@@ -73,6 +73,10 @@ public class ExchangeImportSchemaManagerService {
|
||||
put(SchemaType.ValorizzazioneCertificati, Collections.singletonList("valorizzazione_certificati"));
|
||||
}};
|
||||
|
||||
public List<String> getTablesBySchemaType(SchemaType schemaType) {
|
||||
return schemaToTableBinding.get(schemaType);
|
||||
}
|
||||
|
||||
public void syncSchema(Connection connection, SchemaType schemaType, boolean createTempTablesToo) throws Exception {
|
||||
|
||||
SQLServerDBSchemaManager sqlServerDBSchemaManager = new SQLServerDBSchemaManager(connection);
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package it.integry.ems.system.exchange.service;
|
||||
|
||||
import it.integry.ems.expansion.RunnableThrowable;
|
||||
import it.integry.ems.javabeans.RequestDataDTO;
|
||||
import it.integry.ems.production.agribook.AgribookFieldService;
|
||||
import it.integry.ems.production.agribook.model.AgribookNewFieldRequestDTO;
|
||||
@@ -23,7 +22,10 @@ import org.springframework.stereotype.Service;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.time.LocalDate;
|
||||
import java.util.*;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -50,11 +52,15 @@ public class ExchangeOrdiniImportService {
|
||||
|
||||
public void importOrdiniLavorazione(MultiDBTransactionManager internalMultiDb, MultiDBTransactionManager exchangeMultiDb, RequestDataDTO requestDataDTO) throws Exception {
|
||||
|
||||
ExchangeImportSchemaManagerService.SchemaType schemaType = ExchangeImportSchemaManagerService.SchemaType.OrdiniLavorazione;
|
||||
|
||||
boolean useTempTable = true;
|
||||
|
||||
|
||||
try {
|
||||
exchangeImportSchemaManagerService.syncSchema(exchangeMultiDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.OrdiniLavorazione, useTempTable);
|
||||
exchangeImportSchemaManagerService.syncSchema(exchangeMultiDb.getPrimaryConnection(), schemaType, useTempTable);
|
||||
exchangeImportDataManagerService.prepareData(exchangeMultiDb.getPrimaryConnection(), useTempTable,
|
||||
exchangeImportSchemaManagerService.getTablesBySchemaType(schemaType));
|
||||
|
||||
final List<DtbOrdt> exchangeImportedMtbColts = importOrdiniLavorazione(
|
||||
exchangeMultiDb.getPrimaryConnection(),
|
||||
@@ -93,17 +99,13 @@ public class ExchangeOrdiniImportService {
|
||||
);
|
||||
|
||||
final Exception[] firstExceptionToThrow = {null};
|
||||
|
||||
final AtomicInteger[] importedCounter = {new AtomicInteger()};
|
||||
final AtomicInteger importedCounter = new AtomicInteger();
|
||||
|
||||
for (List<EquatableEntityInterface> listToProcess : splittedOrders) {
|
||||
List<RunnableThrowable> calls = new ArrayList<>();
|
||||
|
||||
|
||||
for (EquatableEntityInterface dataToSave : listToProcess) {
|
||||
//calls.add(() -> {
|
||||
|
||||
logger.debug("Importati {} ordini di {}", importedCounter[0].incrementAndGet(), allData.size());
|
||||
logger.debug("Importati {} ordini di {}", importedCounter.incrementAndGet(), allData.size());
|
||||
try {
|
||||
entityProcessor.processEntity(dataToSave, true, true, ROSSOGARGANO_EXCHANGE_USER, internalMultiDb, requestDataDTO);
|
||||
|
||||
@@ -118,15 +120,12 @@ public class ExchangeOrdiniImportService {
|
||||
logger.error("Errore durante l'importazione dell'ordine [num: " + order.getNumOrd() + "," +
|
||||
"data: " + order.getDataOrd() + "," +
|
||||
"gestione: " + order.getGestione() + "]", ex);
|
||||
|
||||
internalMultiDb.rollbackAll();
|
||||
//throw ex;
|
||||
exchangeMultiDb.rollbackAll();
|
||||
}
|
||||
//});
|
||||
|
||||
}
|
||||
|
||||
|
||||
//UtilityThread.executeParallel(calls);
|
||||
}
|
||||
|
||||
if (firstExceptionToThrow[0] != null) throw firstExceptionToThrow[0];
|
||||
@@ -145,15 +144,6 @@ public class ExchangeOrdiniImportService {
|
||||
String dtbOrdrTableName = dtbOrdrOriginalTableName + (useTempTable ? "_tmp" : "");
|
||||
|
||||
|
||||
if (useTempTable) {
|
||||
UtilityDB.executeStatement(connection,
|
||||
"INSERT INTO " + dtbOrdtTableName +
|
||||
" SELECT * FROM " + dtbOrdtOriginalTableName,
|
||||
"INSERT INTO " + dtbOrdrTableName +
|
||||
" SELECT * FROM " + dtbOrdrOriginalTableName
|
||||
);
|
||||
}
|
||||
|
||||
List<DtbOrdt> dtbOrdtLav = exchangeImportDataManagerService.retrieveDataFromExchange(connection, DtbOrdt.class,
|
||||
dtbOrdtTableName, Query.format("data_ord BETWEEN {} AND {}", minDate, maxDate), retrieveAlreadyImported);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user