Migliorie su EXCHANGE

This commit is contained in:
2024-09-30 10:50:16 +02:00
parent ad2b7b3905
commit f8004fab38
3 changed files with 180 additions and 176 deletions

View File

@@ -96,7 +96,6 @@ public class ExchangeColliImportService {
AtomicInteger importedCounter = new AtomicInteger(0);
LocalDate startDate = LocalDate.of(2024, 7, 1);
Result<Object> firstErrorObjectIfPresent = null;
while (startDate.isBefore(UtilityLocalDate.getNow())) {

View File

@@ -1,6 +1,6 @@
package it.integry.ems.system.exchange.service;
import it.integry.ems.expansion.RunnableThrowable;
import it.integry.ems.dto.Result;
import it.integry.ems.javabeans.RequestDataDTO;
import it.integry.ems.service.EntityProcessor;
import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager;
@@ -9,7 +9,6 @@ import it.integry.ems_model.entity.DtbDocr;
import it.integry.ems_model.entity.DtbDoct;
import it.integry.ems_model.types.OperationType;
import it.integry.ems_model.utility.Query;
import it.integry.ems_model.utility.UtilityDB;
import it.integry.ems_model.utility.UtilityLocalDate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -46,141 +45,147 @@ public class ExchangeDocumentImportService {
public void importTestateDocumentiLavorazione(MultiDBTransactionManager internalMultiDb, MultiDBTransactionManager exchangeMultiDb, RequestDataDTO requestDataDTO) throws Exception {
boolean useTempTable = true;
ExchangeImportSchemaManagerService.SchemaType schemaType = ExchangeImportSchemaManagerService.SchemaType.DocumentiLavorazione;
try {
exchangeImportSchemaManagerService.syncSchema(exchangeMultiDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.DocumentiLavorazione, useTempTable);
exchangeImportSchemaManagerService.syncSchema(exchangeMultiDb.getPrimaryConnection(), schemaType, useTempTable);
exchangeImportDataManagerService.prepareData(exchangeMultiDb.getPrimaryConnection(), useTempTable,
exchangeImportSchemaManagerService.getTablesBySchemaType(schemaType));
final List<DtbDoct> exchangeImportedTestateData = importTestateDocumentiLavorazione(
exchangeMultiDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusMonths(1),
UtilityLocalDate.getNow(),
true, false);
AtomicInteger dataCount = new AtomicInteger(0);
AtomicInteger importedCounter = new AtomicInteger(0);
final List<DtbDoct> exchangeUpdatedTestateData = importTestateDocumentiLavorazione(
exchangeMultiDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusMonths(1),
UtilityLocalDate.getNow(),
false, useTempTable);
LocalDate startDate = LocalDate.of(2024, 7, 1);
Result<DtbDoct> firstErrorObjectIfPresent = null;
while (startDate.isBefore(UtilityLocalDate.getNow())) {
final LocalDate tempStartDate = startDate;
final LocalDate tempEndDate = startDate;
startDate = startDate.plusDays(1);
final List<DtbDoct> exchangeImportedTestateData = importTestateDocumentiLavorazione(
exchangeMultiDb.getPrimaryConnection(),
tempStartDate,
tempEndDate,
true, false);
final List<DtbDoct> exchangeUpdatedTestateData = importTestateDocumentiLavorazione(
exchangeMultiDb.getPrimaryConnection(),
tempStartDate,
tempEndDate,
false, useTempTable);
final List<DtbDocr> exchangeImportedRigheData = importRigheDocumentiLavorazione(
exchangeMultiDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusMonths(1),
UtilityLocalDate.getNow(),
true, false);
final List<DtbDocr> exchangeImportedRigheData = importRigheDocumentiLavorazione(
exchangeMultiDb.getPrimaryConnection(),
tempStartDate,
tempEndDate,
true, false);
final List<DtbDocr> exchangeUpdatedRigheData = importRigheDocumentiLavorazione(
exchangeMultiDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusMonths(1),
UtilityLocalDate.getNow(),
false, useTempTable);
final List<DtbDocr> exchangeUpdatedRigheData = importRigheDocumentiLavorazione(
exchangeMultiDb.getPrimaryConnection(),
tempStartDate,
tempEndDate,
false, useTempTable);
List<EquatableEntityInterface> allTestateData = exchangeImportDataManagerService
.runSync(DtbDoct.class, exchangeImportedTestateData, exchangeUpdatedTestateData);
List<EquatableEntityInterface> allTestateData = exchangeImportDataManagerService
.runSync(DtbDoct.class, exchangeImportedTestateData, exchangeUpdatedTestateData);
List<EquatableEntityInterface> allRigheData = exchangeImportDataManagerService
.runSync(DtbDocr.class, exchangeImportedRigheData, exchangeUpdatedRigheData);
List<EquatableEntityInterface> allRigheData = exchangeImportDataManagerService
.runSync(DtbDocr.class, exchangeImportedRigheData, exchangeUpdatedRigheData);
allTestateData.parallelStream().forEach(x -> {
((DtbDoct) x).setUpdProgMaga(false);
// .setOperation(x.getOperation() == OperationType.INSERT ? OperationType.INSERT_OR_UPDATE : x.getOperation());
// ((DtbDoct) x).getDtbDocr().parallelStream().forEach(y -> y.setOperation(y.getOperation() == OperationType.INSERT ? OperationType.INSERT_OR_UPDATE : y.getOperation()));
allTestateData.parallelStream().forEach(x -> {
((DtbDoct) x).setUpdProgMaga(false);
});
});
allRigheData.stream()
.map(x -> (DtbDocr) x)
.filter(x -> allTestateData.stream()
.map(y -> (DtbDoct) y)
.noneMatch(y -> Objects.hash(x.getDataDoc(), x.getNumDoc(), x.getSerDoc(), x.getCodAnag(), x.getCodDtip()) ==
Objects.hash(y.getDataDoc(), y.getNumDoc(), y.getSerDoc(), y.getCodAnag(), y.getCodDtip()) &&
allRigheData.stream()
.map(x -> (DtbDocr) x)
.filter(x -> allTestateData.stream()
.map(y -> (DtbDoct) y)
.noneMatch(y -> Objects.hash(x.getDataDoc(), x.getNumDoc(), x.getSerDoc(), x.getCodAnag(), x.getCodDtip()) ==
Objects.hash(y.getDataDoc(), y.getNumDoc(), y.getSerDoc(), y.getCodAnag(), y.getCodDtip()) &&
(Objects.equals(x.getCodAnag(), y.getCodAnag()) &&
Objects.equals(x.getCodDtip(), y.getCodDtip()) &&
Objects.equals(x.getDataDoc(), y.getDataDoc()) &&
Objects.equals(x.getNumDoc(), y.getNumDoc()) &&
Objects.equals(x.getSerDoc(), y.getSerDoc()))))
.forEach(x -> {
DtbDoct testata = new DtbDoct()
.setSerDoc(x.getSerDoc())
.setDataDoc(x.getDataDoc())
.setNumDoc(x.getNumDoc())
.setCodDtip(x.getCodDtip())
.setCodAnag(x.getCodAnag())
.setUpdProgMaga(false);
(Objects.equals(x.getCodAnag(), y.getCodAnag()) &&
Objects.equals(x.getCodDtip(), y.getCodDtip()) &&
Objects.equals(x.getDataDoc(), y.getDataDoc()) &&
Objects.equals(x.getNumDoc(), y.getNumDoc()) &&
Objects.equals(x.getSerDoc(), y.getSerDoc()))))
.forEach(x -> {
DtbDoct testata = new DtbDoct()
.setSerDoc(x.getSerDoc())
.setDataDoc(x.getDataDoc())
.setNumDoc(x.getNumDoc())
.setCodDtip(x.getCodDtip())
.setCodAnag(x.getCodAnag())
.setUpdProgMaga(false);
testata.setOperation(OperationType.UPDATE);
allTestateData.add(testata);
});
testata.setOperation(OperationType.UPDATE);
allTestateData.add(testata);
});
allTestateData.parallelStream()
.map(x -> (DtbDoct) x)
.forEach(x ->
x.setDtbDocr(allRigheData.stream()
.map(y -> (DtbDocr) y)
.filter(y ->
Objects.hash(x.getDataDoc(), x.getNumDoc(), x.getSerDoc(), x.getCodAnag(), x.getCodDtip()) ==
Objects.hash(y.getDataDoc(), y.getNumDoc(), y.getSerDoc(), y.getCodAnag(), y.getCodDtip()) &&
allTestateData.parallelStream()
.map(x -> (DtbDoct) x)
.forEach(x ->
x.setDtbDocr(allRigheData.stream()
.map(y -> (DtbDocr) y)
.filter(y ->
Objects.hash(x.getDataDoc(), x.getNumDoc(), x.getSerDoc(), x.getCodAnag(), x.getCodDtip()) ==
Objects.hash(y.getDataDoc(), y.getNumDoc(), y.getSerDoc(), y.getCodAnag(), y.getCodDtip()) &&
(Objects.equals(x.getCodAnag(), y.getCodAnag()) &&
Objects.equals(x.getCodDtip(), y.getCodDtip()) &&
Objects.equals(x.getDataDoc(), y.getDataDoc()) &&
Objects.equals(x.getNumDoc(), y.getNumDoc()) &&
Objects.equals(x.getSerDoc(), y.getSerDoc())))
.collect(Collectors.toList())));
(Objects.equals(x.getCodAnag(), y.getCodAnag()) &&
Objects.equals(x.getCodDtip(), y.getCodDtip()) &&
Objects.equals(x.getDataDoc(), y.getDataDoc()) &&
Objects.equals(x.getNumDoc(), y.getNumDoc()) &&
Objects.equals(x.getSerDoc(), y.getSerDoc())))
.collect(Collectors.toList())));
List<EquatableEntityInterface> allData = allTestateData;
final Exception[] firstExceptionToThrow = {null};
AtomicInteger importedCounter = new AtomicInteger();
List<RunnableThrowable> calls = new ArrayList<>();
dataCount.addAndGet(allTestateData.size());
for (EquatableEntityInterface dataToSave : allData) {
for (EquatableEntityInterface dataToSave : allTestateData) {
DtbDoct document = (DtbDoct) dataToSave;
DtbDoct document = (DtbDoct) dataToSave;
// calls.add(() -> {
logger.debug("Importati {} documenti di {}", importedCounter.incrementAndGet(), allData.size());
try {
//Inserisco prima la testata perché se inserisco testata+righe e la testata ha INSERT,
// le righe vengono salvate con ID riga calcolato e non con ID riga passato
DtbDoct cloneDocument = (DtbDoct) document.clone();
cloneDocument.setDtbDocr(new ArrayList<>());
entityProcessor.processEntity(cloneDocument, true, true, ROSSOGARGANO_EXCHANGE_USER, internalMultiDb, requestDataDTO);
singleUpdateImported(exchangeMultiDb.getPrimaryConnection(), (DtbDoct) dataToSave, useTempTable);
logger.debug("Importati {} documenti di {}", importedCounter.incrementAndGet(), dataCount.get());
try {
//Inserisco prima la testata perché se inserisco testata+righe e la testata ha INSERT,
// le righe vengono salvate con ID riga calcolato e non con ID riga passato
DtbDoct cloneDocument = (DtbDoct) document.clone();
cloneDocument.setDtbDocr(new ArrayList<>());
entityProcessor.processEntity(cloneDocument, true, true, ROSSOGARGANO_EXCHANGE_USER, internalMultiDb, requestDataDTO);
singleUpdateImported(exchangeMultiDb.getPrimaryConnection(), (DtbDoct) dataToSave, useTempTable);
if (!document.getDtbDocr().isEmpty()) {
document.setOperation(OperationType.NO_OP);
entityProcessor.processEntity(document, true, true, ROSSOGARGANO_EXCHANGE_USER, internalMultiDb, requestDataDTO);
singleUpdateImported(exchangeMultiDb.getPrimaryConnection(), document.getDtbDocr(), useTempTable);
if (!document.getDtbDocr().isEmpty()) {
document.setOperation(OperationType.NO_OP);
entityProcessor.processEntity(document, true, true, ROSSOGARGANO_EXCHANGE_USER, internalMultiDb, requestDataDTO);
singleUpdateImported(exchangeMultiDb.getPrimaryConnection(), document.getDtbDocr(), useTempTable);
}
internalMultiDb.commitAll();
exchangeMultiDb.commitAll();
} catch (Exception ex) {
if (firstErrorObjectIfPresent == null) firstErrorObjectIfPresent = new Result.Error<>(ex);
logger.error("Errore durante l'importazione del documento [" +
"num: " + document.getNumDoc() + ", " +
"serie: " + document.getSerDoc() + ", " +
"data: " + document.getDataDoc() + ", " +
"cod anag: " + document.getCodAnag() + ", " +
"cod dtip: " + document.getCodDtip() + "]", ex);
internalMultiDb.rollbackAll();
exchangeMultiDb.rollbackAll();
}
internalMultiDb.commitAll();
exchangeMultiDb.commitAll();
} catch (Exception ex) {
if (firstExceptionToThrow[0] == null) firstExceptionToThrow[0] = ex;
logger.error("Errore durante l'importazione del documento [" +
"num: " + document.getNumDoc() + ", " +
"serie: " + document.getSerDoc() + ", " +
"data: " + document.getDataDoc() + ", " +
"cod anag: " + document.getCodAnag() + ", " +
"cod dtip: " + document.getCodDtip() + "]", ex);
//multiDBTransactionManager.rollbackAll();
//throw ex;
}
// });
}
// UtilityThread.executeParallel(calls);
if (firstExceptionToThrow[0] != null) throw firstExceptionToThrow[0];
if (firstErrorObjectIfPresent != null) throw ((Result.Error) firstErrorObjectIfPresent).getError();
} finally {
if (useTempTable)
exchangeImportSchemaManagerService.deleteTempTables(exchangeMultiDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.DocumentiLavorazione);
exchangeImportSchemaManagerService.deleteTempTables(exchangeMultiDb.getPrimaryConnection(), schemaType);
}
}
@@ -191,13 +196,6 @@ public class ExchangeDocumentImportService {
String dtbDoctLavOriginalName = "dtb_doct_lav";
String dtbDoctLavTableName = dtbDoctLavOriginalName + (useTempTable ? "_tmp" : "");
if (useTempTable) {
UtilityDB.executeStatement(connection,
"INSERT INTO " + dtbDoctLavTableName +
" SELECT * FROM " + dtbDoctLavOriginalName
);
}
final List<DtbDoct> dtbDoctLav = exchangeImportDataManagerService.retrieveDataFromExchange(connection, DtbDoct.class,
dtbDoctLavTableName, Query.format("data_doc BETWEEN {} AND {}", minDate, maxDate), retrieveAlreadyImported);
@@ -216,16 +214,8 @@ public class ExchangeDocumentImportService {
LocalDate minDate, LocalDate maxDate, boolean retrieveAlreadyImported, boolean useTempTable) throws Exception {
String dtbDocrLavOriginalName = "dtb_docr_lav";
String dtbDocrLavTableName = dtbDocrLavOriginalName + (useTempTable ? "_tmp" : "");
if (useTempTable) {
UtilityDB.executeStatement(connection,
"INSERT INTO " + dtbDocrLavTableName +
" SELECT * FROM " + dtbDocrLavOriginalName
);
}
return exchangeImportDataManagerService.retrieveDataFromExchange(connection, DtbDocr.class,
dtbDocrLavTableName, Query.format("data_doc BETWEEN {} AND {}", minDate, maxDate), retrieveAlreadyImported);

View File

@@ -1,5 +1,6 @@
package it.integry.ems.system.exchange.service;
import it.integry.ems.dto.Result;
import it.integry.ems.javabeans.RequestDataDTO;
import it.integry.ems.production.agribook.AgribookFieldService;
import it.integry.ems.production.agribook.model.AgribookNewFieldRequestDTO;
@@ -62,73 +63,87 @@ public class ExchangeOrdiniImportService {
exchangeImportDataManagerService.prepareData(exchangeMultiDb.getPrimaryConnection(), useTempTable,
exchangeImportSchemaManagerService.getTablesBySchemaType(schemaType));
final List<DtbOrdt> exchangeImportedMtbColts = importOrdiniLavorazione(
exchangeMultiDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusMonths(1),
UtilityLocalDate.getNow(),
true, false);
AtomicInteger dataCount = new AtomicInteger(0);
AtomicInteger importedCounter = new AtomicInteger(0);
final List<DtbOrdt> exchangeUpdatedMtbColts = importOrdiniLavorazione(
exchangeMultiDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusMonths(1),
UtilityLocalDate.getNow(),
false, useTempTable);
LocalDate startDate = LocalDate.of(2024, 7, 1);
List<EquatableEntityInterface> allData = exchangeImportDataManagerService
.runSync(DtbOrdt.class, exchangeImportedMtbColts, exchangeUpdatedMtbColts);
Result<DtbOrdt> firstErrorObjectIfPresent = null;
allData.stream()
.map(x -> (DtbOrdt) x)
.filter(x -> x.getGestione().equalsIgnoreCase("L") &&
(x.getOperation() == OperationType.INSERT_OR_UPDATE || x.getOperation() == OperationType.INSERT))
.forEach(x -> {
DtbOrdSteps ordStep =
new DtbOrdSteps()
.setIdRiga(0)
.setCodJfas(x.getCodJfas())
.setQtaProd(x.getQtaProd());
ordStep.setOperation(OperationType.INSERT_OR_UPDATE);
x.getDtbOrdSteps().add(ordStep);
});
while (startDate.isBefore(UtilityLocalDate.getNow())) {
List<List<EquatableEntityInterface>> splittedOrders =
Arrays.asList(
allData.stream().filter(x -> ((DtbOrdt) x).getGestione().equalsIgnoreCase("A")).collect(Collectors.toList()),
allData.stream().filter(x -> !((DtbOrdt) x).getGestione().equalsIgnoreCase("A")).collect(Collectors.toList())
);
final LocalDate tempStartDate = startDate;
final LocalDate tempEndDate = startDate;
final Exception[] firstExceptionToThrow = {null};
final AtomicInteger importedCounter = new AtomicInteger();
startDate = startDate.plusDays(1);
for (List<EquatableEntityInterface> listToProcess : splittedOrders) {
final List<DtbOrdt> exchangeImportedMtbColts = importOrdiniLavorazione(
exchangeMultiDb.getPrimaryConnection(),
tempStartDate,
tempEndDate,
true, false);
for (EquatableEntityInterface dataToSave : listToProcess) {
final List<DtbOrdt> exchangeUpdatedMtbColts = importOrdiniLavorazione(
exchangeMultiDb.getPrimaryConnection(),
tempStartDate,
tempEndDate,
false, useTempTable);
logger.debug("Importati {} ordini di {}", importedCounter.incrementAndGet(), allData.size());
try {
entityProcessor.processEntity(dataToSave, true, true, ROSSOGARGANO_EXCHANGE_USER, internalMultiDb, requestDataDTO);
List<EquatableEntityInterface> allData = exchangeImportDataManagerService
.runSync(DtbOrdt.class, exchangeImportedMtbColts, exchangeUpdatedMtbColts);
dataCount.addAndGet(allData.size());
singleUpdateImported(exchangeMultiDb.getPrimaryConnection(), (DtbOrdt) dataToSave, useTempTable);
internalMultiDb.commitAll();
exchangeMultiDb.commitAll();
} catch (Exception ex) {
if (firstExceptionToThrow[0] == null) firstExceptionToThrow[0] = ex;
allData.stream()
.map(x -> (DtbOrdt) x)
.filter(x -> x.getGestione().equalsIgnoreCase("L") &&
(x.getOperation() == OperationType.INSERT_OR_UPDATE || x.getOperation() == OperationType.INSERT))
.forEach(x -> {
DtbOrdSteps ordStep =
new DtbOrdSteps()
.setIdRiga(0)
.setCodJfas(x.getCodJfas())
.setQtaProd(x.getQtaProd());
DtbOrdt order = (DtbOrdt) dataToSave;
ordStep.setOperation(OperationType.INSERT_OR_UPDATE);
x.getDtbOrdSteps().add(ordStep);
});
logger.error("Errore durante l'importazione dell'ordine [num: " + order.getNumOrd() + "," +
"data: " + order.getDataOrd() + "," +
"gestione: " + order.getGestione() + "]", ex);
List<List<EquatableEntityInterface>> splittedOrders =
Arrays.asList(
allData.stream().filter(x -> ((DtbOrdt) x).getGestione().equalsIgnoreCase("A")).collect(Collectors.toList()),
allData.stream().filter(x -> !((DtbOrdt) x).getGestione().equalsIgnoreCase("A")).collect(Collectors.toList())
);
for (List<EquatableEntityInterface> listToProcess : splittedOrders) {
for (EquatableEntityInterface dataToSave : listToProcess) {
logger.debug("Importati {} ordini di {}", importedCounter.incrementAndGet(), dataCount.get());
try {
entityProcessor.processEntity(dataToSave, true, true, ROSSOGARGANO_EXCHANGE_USER, internalMultiDb, requestDataDTO);
singleUpdateImported(exchangeMultiDb.getPrimaryConnection(), (DtbOrdt) dataToSave, useTempTable);
internalMultiDb.commitAll();
exchangeMultiDb.commitAll();
} catch (Exception ex) {
if (firstErrorObjectIfPresent == null) firstErrorObjectIfPresent = new Result.Error<>(ex);
DtbOrdt order = (DtbOrdt) dataToSave;
logger.error("Errore durante l'importazione dell'ordine [num: " + order.getNumOrd() + "," +
"data: " + order.getDataOrd() + "," +
"gestione: " + order.getGestione() + "]", ex);
internalMultiDb.rollbackAll();
exchangeMultiDb.rollbackAll();
}
internalMultiDb.rollbackAll();
exchangeMultiDb.rollbackAll();
}
}
}
if (firstExceptionToThrow[0] != null) throw firstExceptionToThrow[0];
if (firstErrorObjectIfPresent != null) throw ((Result.Error) firstErrorObjectIfPresent).getError();
} finally {
if (useTempTable)
exchangeImportSchemaManagerService.deleteTempTables(exchangeMultiDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.OrdiniLavorazione);