Fix e migliorie su importazione Exchange

This commit is contained in:
2024-07-11 13:02:33 +02:00
parent 9c1bb13d7a
commit d76fd941a8
20 changed files with 1286 additions and 382 deletions

View File

@@ -0,0 +1,35 @@
package it.integry.ems.system.exchange.controller;
import it.integry.common.var.CommonConstants;
import it.integry.ems.response.ServiceRestResponse;
import it.integry.ems.system.exchange.service.ExchangeArticoliImportService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
@RestController
@Scope("request")
@RequestMapping("exchange/articoli/")
public class ExchangeArticoliImportController {
private final Logger logger = LogManager.getLogger();
@Autowired
private ExchangeArticoliImportService exchangeArticoliImportService;
@RequestMapping(value = "import", method = RequestMethod.GET)
public @ResponseBody
ServiceRestResponse importArticoli(HttpServletRequest request,
@RequestParam(CommonConstants.PROFILE_DB) String configuration) throws Exception {
exchangeArticoliImportService.importArticoli();
return ServiceRestResponse.createPositiveResponse();
}
}

View File

@@ -28,7 +28,7 @@ public class ExchangeDocumentImportController {
ServiceRestResponse importDocumentiLavorazione(HttpServletRequest request,
@RequestParam(CommonConstants.PROFILE_DB) String configuration) throws Exception {
exchangeDocumentImportService.importDocumentiLavorazione();
exchangeDocumentImportService.importTestateDocumentiLavorazione();
return ServiceRestResponse.createPositiveResponse();
}
}

View File

@@ -2,7 +2,7 @@ package it.integry.ems.system.exchange.controller;
import it.integry.common.var.CommonConstants;
import it.integry.ems.response.ServiceRestResponse;
import it.integry.ems.system.exchange.service.ExchangeOrderImportService;
import it.integry.ems.system.exchange.service.ExchangeOrdiniImportService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
@@ -20,7 +20,7 @@ public class ExchangeOrdiniImportController {
private final Logger logger = LogManager.getLogger();
@Autowired
private ExchangeOrderImportService exchangeOrderImportService;
private ExchangeOrdiniImportService exchangeOrdiniImportService;
@@ -29,7 +29,7 @@ public class ExchangeOrdiniImportController {
ServiceRestResponse importLavorazione(HttpServletRequest request,
@RequestParam(CommonConstants.PROFILE_DB) String configuration) throws Exception {
exchangeOrderImportService.importOrdiniLavorazione();
exchangeOrdiniImportService.importOrdiniLavorazione();
return ServiceRestResponse.createPositiveResponse();
}
}

View File

@@ -0,0 +1,277 @@
package it.integry.ems.system.exchange.service;
import it.integry.ems.expansion.RunnableThrowable;
import it.integry.ems.service.EntityProcessor;
import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager;
import it.integry.ems_model.base.EquatableEntityInterface;
import it.integry.ems_model.entity.MtbAart;
import it.integry.ems_model.entity.MtbGrup;
import it.integry.ems_model.entity.MtbSfam;
import it.integry.ems_model.entity.MtbSgrp;
import it.integry.ems_model.types.OperationType;
import it.integry.ems_model.utility.UtilityDB;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@SuppressWarnings("rawtypes")
@Service
@Scope("request")
public class ExchangeArticoliImportService {
//TODO: To be remove, only for fast development
private final String ROSSOGARGANO_EXCHANGE_PROFILEDB = "ROSSO_GARGANO_EXCHANGE";
private final String ROSSOGARGANO_EXCHANGE_USER = "DBA";
@Autowired
private EntityProcessor entityProcessor;
@Autowired
private MultiDBTransactionManager multiDBTransactionManager;
@Autowired
private ExchangeImportSchemaManagerService exchangeImportSchemaManagerService;
@Autowired
private ExchangeImportDataManagerService exchangeImportDataManagerService;
private final Logger logger = LogManager.getLogger();
public void importArticoli() throws Exception {
boolean useTempTable = true;
try (MultiDBTransactionManager exchangeDb = new MultiDBTransactionManager(ROSSOGARGANO_EXCHANGE_PROFILEDB)) {
try {
exchangeImportSchemaManagerService.syncSchema(exchangeDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.Articoli, useTempTable);
importGruppiMerceologici(exchangeDb, useTempTable);
final List<MtbAart> exchangeImportedData = retrieveArticoli(
exchangeDb.getPrimaryConnection(),
true, false);
final List<MtbAart> exchangeUpdatedData = retrieveArticoli(
exchangeDb.getPrimaryConnection(),
false, useTempTable);
List<EquatableEntityInterface> allData = exchangeImportDataManagerService
.runSync(MtbAart.class, exchangeImportedData, exchangeUpdatedData);
allData.stream()
.filter(x -> x.getOperation() == OperationType.INSERT)
.forEach(x -> x.setOperation(OperationType.INSERT_OR_UPDATE));
final Exception[] firstExceptionToThrow = {null};
AtomicInteger importedCounter = new AtomicInteger();
List<RunnableThrowable> calls = new ArrayList<>();
for (EquatableEntityInterface dataToSave : allData) {
// calls.add(() -> {
logger.debug("Importati {} di {}", importedCounter.incrementAndGet(), allData.size());
try {
entityProcessor.processEntity(dataToSave, true, true, ROSSOGARGANO_EXCHANGE_USER, multiDBTransactionManager);
singleUpdateImported(exchangeDb.getPrimaryConnection(), (MtbAart) dataToSave, useTempTable);
multiDBTransactionManager.commitAll();
} catch (Exception ex) {
if (firstExceptionToThrow[0] == null) firstExceptionToThrow[0] = ex;
logger.error("Errore durante l'importazione del documento", ex);
multiDBTransactionManager.rollbackAll();
//throw ex;
}
// });
}
// UtilityThread.executeParallel(calls);
if (firstExceptionToThrow[0] != null) throw firstExceptionToThrow[0];
} finally {
if (useTempTable)
exchangeImportSchemaManagerService.deleteTempTables(exchangeDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.Articoli);
}
}
}
private List<MtbAart> retrieveArticoli(Connection connection, boolean retrieveAlreadyImported, boolean useTempTable) throws Exception {
String originalTableName = "mtb_aart";
String tableName = originalTableName + (useTempTable ? "_tmp" : "");
if (useTempTable) {
UtilityDB.executeStatement(connection,
"INSERT INTO " + tableName +
" SELECT * FROM " + originalTableName
);
}
return exchangeImportDataManagerService.retrieveDataFromExchange(connection, MtbAart.class,
tableName, null, retrieveAlreadyImported);
}
private void singleUpdateImported(Connection connection, MtbAart importedDtbOrdt, boolean useTempTable) throws Exception {
final HashMap<String, Object> importedKey = new HashMap<String, Object>() {{
put("cod_mart", importedDtbOrdt.getCodMart());
}};
exchangeImportDataManagerService.updateImportedStatus(connection, "mtb_aart", importedKey, useTempTable);
connection.commit();
}
private void importGruppiMerceologici(MultiDBTransactionManager exchangeDb, boolean useTempTable) throws Exception {
final List<MtbGrup> exchangeImportedData = retrieveMtbGrup(
exchangeDb.getPrimaryConnection(),
true, false);
final List<MtbGrup> exchangeUpdatedData = retrieveMtbGrup(
exchangeDb.getPrimaryConnection(),
false, useTempTable);
List<EquatableEntityInterface> allData = exchangeImportDataManagerService
.runSync(MtbGrup.class, exchangeImportedData, exchangeUpdatedData);
allData.stream()
.map(x -> (MtbGrup) x)
.forEach(x -> {
if (x.getOperation() == OperationType.INSERT)
x.setOperation(OperationType.INSERT_OR_UPDATE);
x.getMtbSgrp()
.forEach(y -> {
if (y.getOperation() == OperationType.INSERT)
y.setOperation(OperationType.INSERT_OR_UPDATE);
y.getMtbSfam().stream()
.filter(z -> z.getOperation() == OperationType.INSERT)
.forEach(z -> z.setOperation(OperationType.INSERT_OR_UPDATE));
y.setOperation(OperationType.INSERT_OR_UPDATE);
});
});
final Exception[] firstExceptionToThrow = {null};
AtomicInteger importedCounter = new AtomicInteger();
List<RunnableThrowable> calls = new ArrayList<>();
for (EquatableEntityInterface dataToSave : allData) {
// calls.add(() -> {
logger.debug("Importati {} gruppi merceologici di {}", importedCounter.incrementAndGet(), allData.size());
try {
entityProcessor.processEntity(dataToSave, true, true, ROSSOGARGANO_EXCHANGE_USER, multiDBTransactionManager);
singleUpdateImported(exchangeDb.getPrimaryConnection(), (MtbGrup) dataToSave, useTempTable);
multiDBTransactionManager.commitAll();
} catch (Exception ex) {
if (firstExceptionToThrow[0] == null) firstExceptionToThrow[0] = ex;
logger.error("Errore durante l'importazione del gruppo merceologico", ex);
multiDBTransactionManager.rollbackAll();
//throw ex;
}
// });
}
// UtilityThread.executeParallel(calls);
if (firstExceptionToThrow[0] != null) throw firstExceptionToThrow[0];
}
private List<MtbGrup> retrieveMtbGrup(Connection connection, boolean retrieveAlreadyImported, boolean useTempTable) throws Exception {
String mtbGrupOriginalTableName = "mtb_grup";
String mtbGrupTableName = mtbGrupOriginalTableName + (useTempTable ? "_tmp" : "");
String mtbSgrpOriginalTableName = "mtb_sgrp";
String mtbSgrpTableName = mtbSgrpOriginalTableName + (useTempTable ? "_tmp" : "");
String mtbSfamOriginalTableName = "mtb_sfam";
String mtbSfamTableName = mtbSfamOriginalTableName + (useTempTable ? "_tmp" : "");
if (useTempTable) {
UtilityDB.executeStatement(connection,
"INSERT INTO " + mtbGrupTableName +
" SELECT * FROM " + mtbGrupOriginalTableName,
"INSERT INTO " + mtbSgrpTableName +
" SELECT * FROM " + mtbSgrpOriginalTableName,
"INSERT INTO " + mtbSfamTableName +
" SELECT * FROM " + mtbSfamOriginalTableName
);
}
List<MtbGrup> mtbGrups = exchangeImportDataManagerService.retrieveDataFromExchange(connection, MtbGrup.class,
mtbGrupTableName, null, retrieveAlreadyImported);
List<MtbSgrp> mtbSgrps = exchangeImportDataManagerService.retrieveDataFromExchange(connection, MtbSgrp.class,
mtbSgrpTableName, null, retrieveAlreadyImported);
List<MtbSfam> mtbSfams = exchangeImportDataManagerService.retrieveDataFromExchange(connection, MtbSfam.class,
mtbSfamTableName, null, retrieveAlreadyImported);
mtbSgrps.forEach(mtbSgrp -> {
mtbSgrp.setMtbSfam(mtbSfams.stream()
.filter(mtbSfam -> mtbSfam.getCodMgrp().equalsIgnoreCase(mtbSgrp.getCodMgrp()) &&
mtbSfam.getCodMsgr().equalsIgnoreCase(mtbSgrp.getCodMsgr()))
.collect(Collectors.toList()));
});
mtbGrups.forEach(mtbGrup -> {
mtbGrup.setMtbSgrp(mtbSgrps.stream()
.filter(mtbSgrp -> mtbSgrp.getCodMgrp().equalsIgnoreCase(mtbGrup.getCodMgrp()))
.collect(Collectors.toList()));
});
return mtbGrups;
}
private void singleUpdateImported(Connection connection, MtbGrup importedData, boolean useTempTable) throws Exception {
final HashMap<String, Object> importedMtbGrupKey = new HashMap<String, Object>() {{
put("cod_mgrp", importedData.getCodMgrp());
}};
exchangeImportDataManagerService.updateImportedStatus(connection, "mtb_grup", importedMtbGrupKey, useTempTable);
final List<HashMap<String, Object>> importedMtbSgrpKeys = importedData.getMtbSgrp().stream()
.map(x -> new HashMap<String, Object>() {{
put("cod_mgrp", x.getCodMgrp());
put("cod_msgr", x.getCodMsgr());
}})
.collect(Collectors.toList());
exchangeImportDataManagerService.updateImportedStatus(connection, "mtb_sgrp", importedMtbSgrpKeys, useTempTable);
final List<HashMap<String, Object>> importedMtbSfamKeys = importedData.getMtbSgrp().stream()
.flatMap(x -> x.getMtbSfam().stream())
.map(x -> new HashMap<String, Object>() {{
put("cod_mgrp", x.getCodMgrp());
put("cod_msgr", x.getCodMsgr());
put("cod_msfa", x.getCodMsfa());
}})
.collect(Collectors.toList());
exchangeImportDataManagerService.updateImportedStatus(connection, "mtb_sfam", importedMtbSfamKeys, useTempTable);
connection.commit();
}
}

View File

@@ -6,9 +6,9 @@ import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager;
import it.integry.ems_model.base.EquatableEntityInterface;
import it.integry.ems_model.entity.MtbColr;
import it.integry.ems_model.entity.MtbColt;
import it.integry.ems_model.utility.Query;
import it.integry.ems_model.utility.UtilityDB;
import it.integry.ems_model.utility.UtilityLocalDate;
import it.integry.ems_model.utility.UtilityQuery;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
@@ -43,76 +43,106 @@ public class ExchangeColliImportService {
@Autowired
private ExchangeImportDataManagerService exchangeImportDataManagerService;
@Autowired
private ExchangePartiteMagazzinoImportService exchangePartiteMagazzinoImportService;
private final Logger logger = LogManager.getLogger();
public void importColliLavorazione() throws Exception {
boolean useTempTable = true;
try (MultiDBTransactionManager exchangeDb = new MultiDBTransactionManager(ROSSOGARGANO_EXCHANGE_PROFILEDB)) {
exchangeImportSchemaManagerService.syncSchema(exchangeDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.ColliLavorazione);
try {
exchangeImportSchemaManagerService.syncSchema(exchangeDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.ColliLavorazione, useTempTable);
exchangePartiteMagazzinoImportService.importPartiteMagazzinoLavorazione();
final List<MtbColt> exchangeImportedMtbColts = importColliLavorazione(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusMonths(1),
UtilityLocalDate.getNow(),
true);
final List<MtbColt> exchangeImportedMtbColts = importColliLavorazione(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusWeeks(1),
UtilityLocalDate.getNow(),
true, false);
List<MtbColt> exchangeUpdatedMtbColts = importColliLavorazione(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusMonths(1),
UtilityLocalDate.getNow(),
false);
List<MtbColt> exchangeUpdatedMtbColts = importColliLavorazione(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusWeeks(1),
UtilityLocalDate.getNow(),
false, useTempTable);
List<EquatableEntityInterface> importedMtbColts = exchangeImportedMtbColts.stream()
.map(x -> (EquatableEntityInterface) x)
.collect(Collectors.toList());
List<EquatableEntityInterface> importedMtbColts = exchangeImportedMtbColts.stream()
.map(x -> (EquatableEntityInterface) x)
.collect(Collectors.toList());
List<EquatableEntityInterface> updatedMtbColts = exchangeUpdatedMtbColts.stream()
.map(x -> (EquatableEntityInterface) x)
.collect(Collectors.toList());
List<EquatableEntityInterface> updatedMtbColts = exchangeUpdatedMtbColts.stream()
.map(x -> (EquatableEntityInterface) x)
.collect(Collectors.toList());
List<EquatableEntityInterface> allMtbColts = exchangeImportDataManagerService
.runSync(MtbColt.class, importedMtbColts, updatedMtbColts);
List<EquatableEntityInterface> allMtbColts = exchangeImportDataManagerService
.runSync(MtbColt.class, importedMtbColts, updatedMtbColts);
final Exception[] firstExceptionToThrow = {null};
final Exception[] firstExceptionToThrow = {null};
AtomicInteger importedCounter = new AtomicInteger();
List<RunnableThrowable> calls = new ArrayList<>();
AtomicInteger importedCounter = new AtomicInteger();
List<RunnableThrowable> calls = new ArrayList<>();
for (EquatableEntityInterface mtbColtToSave : allMtbColts) {
for (EquatableEntityInterface mtbColtToSave : allMtbColts) {
//calls.add(() -> {
logger.debug("Importati {} di {}", importedCounter.incrementAndGet(), allMtbColts.size());
try {
entityProcessor.processEntity(mtbColtToSave, multiDBTransactionManager);
// calls.add(() -> {
logger.debug("Importati {} di {}", importedCounter.incrementAndGet(), allMtbColts.size());
try {
entityProcessor.processEntity(mtbColtToSave, multiDBTransactionManager);
singleUpdateImported(exchangeDb.getPrimaryConnection(), (MtbColt) mtbColtToSave);
multiDBTransactionManager.commitAll();
} catch (Exception ex) {
if (firstExceptionToThrow[0] == null) firstExceptionToThrow[0] = ex;
singleUpdateImported(exchangeDb.getPrimaryConnection(), (MtbColt) mtbColtToSave, useTempTable);
logger.error("Errore durante l'importazione del collo", ex);
multiDBTransactionManager.rollbackAll();
//throw ex;
multiDBTransactionManager.commitAll();
exchangeDb.commitAll();
} catch (Exception ex) {
if (firstExceptionToThrow[0] == null) firstExceptionToThrow[0] = ex;
logger.error("Errore durante l'importazione del collo", ex);
//multiDBTransactionManager.rollbackAll();
//throw ex;
}
// });
}
//});
}
//UtilityThread.executeParallel(calls);
// UtilityThread.executeParallel(calls);
if (firstExceptionToThrow[0] != null) throw firstExceptionToThrow[0];
if (firstExceptionToThrow[0] != null) throw firstExceptionToThrow[0];
} finally {
if (useTempTable)
exchangeImportSchemaManagerService.deleteTempTables(exchangeDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.ColliLavorazione);
}
}
}
private List<MtbColt> importColliLavorazione(Connection connection,
LocalDate minDate, LocalDate maxDate, boolean retrieveAlreadyImported) throws Exception {
LocalDate minDate, LocalDate maxDate, boolean retrieveAlreadyImported, boolean useTempTable) throws Exception {
String mtbColtLavOriginalName = "mtb_colt_lav";
String mtbColtLavTableName = mtbColtLavOriginalName + (useTempTable ? "_tmp" : "");
String mtbColrLavOriginalName = "mtb_colr_lav";
String mtbColrLavTableName = mtbColrLavOriginalName + (useTempTable ? "_tmp" : "");
if (useTempTable) {
UtilityDB.executeStatement(connection,
"INSERT INTO " + mtbColtLavTableName +
" SELECT * FROM " + mtbColtLavOriginalName,
"INSERT INTO " + mtbColrLavTableName +
" SELECT * FROM " + mtbColrLavOriginalName
);
}
final List<MtbColt> mtbColtLav = exchangeImportDataManagerService.retrieveDataFromExchange(connection, MtbColt.class,
"mtb_colt_lav", "data_collo", minDate, maxDate, retrieveAlreadyImported);
mtbColtLavTableName, Query.format("data_collo BETWEEN {} AND {}", minDate, maxDate), retrieveAlreadyImported);
final List<MtbColr> mtbColrLav = exchangeImportDataManagerService.retrieveDataFromExchange(connection, MtbColr.class,
"mtb_colr_lav", "data_collo", minDate, maxDate, retrieveAlreadyImported);
mtbColrLavTableName, Query.format("data_collo BETWEEN {} AND {}", minDate, maxDate), retrieveAlreadyImported);
mtbColtLav
@@ -134,7 +164,7 @@ public class ExchangeColliImportService {
}
private void singleUpdateImported(Connection connection, MtbColt importedMtbColt) throws Exception {
private void singleUpdateImported(Connection connection, MtbColt importedMtbColt, boolean useTempTable) throws Exception {
final HashMap<String, Object> importedKey = new HashMap<String, Object>() {{
put("data_collo", importedMtbColt.getDataCollo());
put("ser_collo", importedMtbColt.getSerCollo());
@@ -142,22 +172,9 @@ public class ExchangeColliImportService {
put("gestione", importedMtbColt.getGestione());
}};
String whereCondKey = UtilityQuery.concatFieldsInWhereCond(importedKey);
exchangeImportDataManagerService.updateImportedStatus(connection, "mtb_colt_lav", importedKey, useTempTable);
exchangeImportDataManagerService.updateImportedStatus(connection, "mtb_colr_lav", importedKey, useTempTable);
UtilityDB.executeStatement(connection,
"DELETE FROM mtb_colr_lav_prev " +
"WHERE (" + whereCondKey + ")",
"DELETE FROM mtb_colt_lav_prev " +
"WHERE (" + whereCondKey + ")",
"INSERT INTO mtb_colt_lav_prev " +
"SELECT * FROM mtb_colt_lav " +
"WHERE (" + whereCondKey + ")",
"INSERT INTO mtb_colr_lav_prev " +
"SELECT * FROM mtb_colr_lav " +
"WHERE (" + whereCondKey + ")");
}

View File

@@ -6,9 +6,10 @@ import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager;
import it.integry.ems_model.base.EquatableEntityInterface;
import it.integry.ems_model.entity.DtbDocr;
import it.integry.ems_model.entity.DtbDoct;
import it.integry.ems_model.types.OperationType;
import it.integry.ems_model.utility.Query;
import it.integry.ems_model.utility.UtilityDB;
import it.integry.ems_model.utility.UtilityLocalDate;
import it.integry.ems_model.utility.UtilityQuery;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
@@ -20,6 +21,7 @@ import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -30,6 +32,7 @@ public class ExchangeDocumentImportService {
//TODO: To be remove, only for fast development
private final String ROSSOGARGANO_EXCHANGE_PROFILEDB = "ROSSO_GARGANO_EXCHANGE";
private final String ROSSOGARGANO_EXCHANGE_USER = "DBA";
@Autowired
private EntityProcessor entityProcessor;
@@ -43,96 +46,177 @@ public class ExchangeDocumentImportService {
@Autowired
private ExchangeImportDataManagerService exchangeImportDataManagerService;
@Autowired
private ExchangePartiteMagazzinoImportService exchangePartiteMagazzinoImportService;
private final Logger logger = LogManager.getLogger();
public void importDocumentiLavorazione() throws Exception {
public void importTestateDocumentiLavorazione() throws Exception {
boolean useTempTable = true;
try (MultiDBTransactionManager exchangeDb = new MultiDBTransactionManager(ROSSOGARGANO_EXCHANGE_PROFILEDB)) {
exchangeImportSchemaManagerService.syncSchema(exchangeDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.DocumentiLavorazione);
final List<DtbDoct> exchangeImportedData = importDocumentiLavorazione(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusMonths(1),
UtilityLocalDate.getNow(),
true);
final List<DtbDoct> exchangeUpdatedData = importDocumentiLavorazione(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusMonths(1),
UtilityLocalDate.getNow(),
false);
try {
exchangeImportSchemaManagerService.syncSchema(exchangeDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.DocumentiLavorazione, useTempTable);
exchangePartiteMagazzinoImportService.importPartiteMagazzinoLavorazione();
List<EquatableEntityInterface> importedData = exchangeImportedData.stream()
.map(x -> (EquatableEntityInterface) x)
.collect(Collectors.toList());
final List<DtbDoct> exchangeImportedTestateData = importTestateDocumentiLavorazione(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusWeeks(1),
UtilityLocalDate.getNow(),
true, false);
List<EquatableEntityInterface> updatedData = exchangeUpdatedData.stream()
.map(x -> (EquatableEntityInterface) x)
.collect(Collectors.toList());
final List<DtbDoct> exchangeUpdatedTestateData = importTestateDocumentiLavorazione(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusWeeks(1),
UtilityLocalDate.getNow(),
false, useTempTable);
List<EquatableEntityInterface> allData = exchangeImportDataManagerService
.runSync(DtbDoct.class, importedData, updatedData);
final Exception[] firstExceptionToThrow = {null};
final List<DtbDocr> exchangeImportedRigheData = importRigheDocumentiLavorazione(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusWeeks(1),
UtilityLocalDate.getNow(),
true, false);
AtomicInteger importedCounter = new AtomicInteger();
List<RunnableThrowable> calls = new ArrayList<>();
final List<DtbDocr> exchangeUpdatedRigheData = importRigheDocumentiLavorazione(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusWeeks(1),
UtilityLocalDate.getNow(),
false, useTempTable);
for (EquatableEntityInterface dataToSave : allData) {
List<EquatableEntityInterface> allTestateData = exchangeImportDataManagerService
.runSync(DtbDoct.class, exchangeImportedTestateData, exchangeUpdatedTestateData);
//calls.add(() -> {
logger.debug("Importati {} di {}", importedCounter.incrementAndGet(), allData.size());
try {
entityProcessor.processEntity(dataToSave, multiDBTransactionManager);
List<EquatableEntityInterface> allRigheData = exchangeImportDataManagerService
.runSync(DtbDocr.class, exchangeImportedRigheData, exchangeUpdatedRigheData);
singleUpdateImported(exchangeDb.getPrimaryConnection(), (DtbDoct) dataToSave);
multiDBTransactionManager.commitAll();
} catch (Exception ex) {
if (firstExceptionToThrow[0] == null) firstExceptionToThrow[0] = ex;
logger.error("Errore durante l'importazione del documento", ex);
multiDBTransactionManager.rollbackAll();
//throw ex;
allTestateData.stream()
.map(x -> (DtbDoct) x)
.forEach(x ->
x.setDtbDocr(allRigheData.stream()
.map(y -> (DtbDocr) y)
.filter(y -> y.getDataDoc().equals(x.getDataDoc()) &&
y.getNumDoc().equals(x.getNumDoc()) &&
y.getSerDoc().equalsIgnoreCase(x.getSerDoc()) &&
y.getCodAnag().equalsIgnoreCase(x.getCodAnag()) &&
y.getCodDtip().equalsIgnoreCase(x.getCodDtip()))
.collect(Collectors.toList())));
allRigheData.stream()
.map(x -> (DtbDocr) x)
.filter(x -> allTestateData.stream()
.map(y -> (DtbDoct) y)
.noneMatch(y -> x.getCodDtip().equalsIgnoreCase(y.getCodDtip()) &&
x.getDataDoc().equals(y.getDataDoc()) &&
Objects.equals(x.getNumDoc(), y.getNumDoc()) &&
x.getCodAnag().equalsIgnoreCase(y.getCodAnag()) &&
x.getSerDoc().equalsIgnoreCase(y.getSerDoc())))
.forEach(x -> {
DtbDoct testata = new DtbDoct()
.setSerDoc(x.getSerDoc())
.setDataDoc(x.getDataDoc())
.setNumDoc(x.getNumDoc())
.setCodDtip(x.getCodDtip())
.setCodAnag(x.getCodAnag());
testata.setOperation(OperationType.UPDATE);
allTestateData.add(testata);
});
List<EquatableEntityInterface> allData = allTestateData;
final Exception[] firstExceptionToThrow = {null};
AtomicInteger importedCounter = new AtomicInteger();
List<RunnableThrowable> calls = new ArrayList<>();
for (EquatableEntityInterface dataToSave : allData) {
// calls.add(() -> {
logger.debug("Importati {} di {}", importedCounter.incrementAndGet(), allData.size());
try {
entityProcessor.processEntity(dataToSave, true, true, ROSSOGARGANO_EXCHANGE_USER, multiDBTransactionManager);
singleUpdateImported(exchangeDb.getPrimaryConnection(), (DtbDoct) dataToSave, useTempTable);
//multiDBTransactionManager.commitAll();
multiDBTransactionManager.commitAll();
exchangeDb.commitAll();
} catch (Exception ex) {
if (firstExceptionToThrow[0] == null) firstExceptionToThrow[0] = ex;
logger.error("Errore durante l'importazione del documento", ex);
//multiDBTransactionManager.rollbackAll();
//throw ex;
}
// });
}
// UtilityThread.executeParallel(calls);
//});
if (firstExceptionToThrow[0] != null) throw firstExceptionToThrow[0];
} finally {
if (useTempTable)
exchangeImportSchemaManagerService.deleteTempTables(exchangeDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.DocumentiLavorazione);
}
//UtilityThread.executeParallel(calls);
if (firstExceptionToThrow[0] != null) throw firstExceptionToThrow[0];
}
}
private List<DtbDoct> importDocumentiLavorazione(Connection connection,
LocalDate minDate, LocalDate maxDate, boolean retrieveAlreadyImported) throws Exception {
private List<DtbDoct> importTestateDocumentiLavorazione(Connection connection,
LocalDate minDate, LocalDate maxDate, boolean retrieveAlreadyImported, boolean useTempTable) throws Exception {
String dtbDoctLavOriginalName = "dtb_doct_lav";
String dtbDoctLavTableName = dtbDoctLavOriginalName + (useTempTable ? "_tmp" : "");
if (useTempTable) {
UtilityDB.executeStatement(connection,
"INSERT INTO " + dtbDoctLavTableName +
" SELECT * FROM " + dtbDoctLavOriginalName
);
}
final List<DtbDoct> dtbDoctLav = exchangeImportDataManagerService.retrieveDataFromExchange(connection, DtbDoct.class,
"dtb_doct_lav", "data_doc", minDate, maxDate, retrieveAlreadyImported);
final List<DtbDocr> dtbDocrLav = exchangeImportDataManagerService.retrieveDataFromExchange(connection, DtbDocr.class,
"dtb_docr_lav", "data_doc", minDate, maxDate, retrieveAlreadyImported);
dtbDoctLavTableName, Query.format("data_doc BETWEEN {} AND {}", minDate, maxDate), retrieveAlreadyImported);
dtbDoctLav
.forEach(x -> {
x.setDtbDocr(dtbDocrLav.stream()
.filter(y -> y.getDataDoc().equals(x.getDataDoc()) &&
y.getNumDoc().equals(x.getNumDoc()) &&
y.getSerDoc().equalsIgnoreCase(x.getSerDoc()) &&
y.getCodAnag().equalsIgnoreCase(x.getCodAnag()) &&
y.getCodDtip().equalsIgnoreCase(x.getCodDtip()))
.collect(Collectors.toList()));
x.setUserName(ROSSOGARGANO_EXCHANGE_USER);
x.setUsername(ROSSOGARGANO_EXCHANGE_USER);
});
return dtbDoctLav;
}
private void singleUpdateImported(Connection connection, DtbDoct importedDtbDoct) throws Exception {
private List<DtbDocr> importRigheDocumentiLavorazione(Connection connection,
LocalDate minDate, LocalDate maxDate, boolean retrieveAlreadyImported, boolean useTempTable) throws Exception {
String dtbDocrLavOriginalName = "dtb_docr_lav";
String dtbDocrLavTableName = dtbDocrLavOriginalName + (useTempTable ? "_tmp" : "");
if (useTempTable) {
UtilityDB.executeStatement(connection,
"INSERT INTO " + dtbDocrLavTableName +
" SELECT * FROM " + dtbDocrLavOriginalName
);
}
return exchangeImportDataManagerService.retrieveDataFromExchange(connection, DtbDocr.class,
dtbDocrLavTableName, Query.format("data_doc BETWEEN {} AND {}", minDate, maxDate), retrieveAlreadyImported);
}
private void singleUpdateImported(Connection connection, DtbDoct importedDtbDoct, boolean useTempTable) throws Exception {
final HashMap<String, Object> importedKey = new HashMap<String, Object>() {{
put("data_doc", importedDtbDoct.getDataDoc());
put("ser_doc", importedDtbDoct.getSerDoc());
@@ -141,23 +225,9 @@ public class ExchangeDocumentImportService {
put("cod_dtip", importedDtbDoct.getCodDtip());
}};
String whereCondKey = UtilityQuery.concatFieldsInWhereCond(importedKey);
UtilityDB.executeStatement(connection,
"DELETE FROM dtb_docr_lav_prev " +
"WHERE (" + whereCondKey + ")",
"DELETE FROM dtb_doct_lav_prev " +
"WHERE (" + whereCondKey + ")",
"INSERT INTO dtb_docr_lav_prev " +
"SELECT * FROM dtb_docr_lav " +
"WHERE (" + whereCondKey + ")",
"INSERT INTO dtb_doct_lav_prev " +
"SELECT * FROM dtb_doct_lav " +
"WHERE (" + whereCondKey + ")");
exchangeImportDataManagerService.updateImportedStatus(connection, "dtb_doct_lav", importedKey, useTempTable);
exchangeImportDataManagerService.updateImportedStatus(connection, "dtb_docr_lav", importedKey, useTempTable);
// connection.commit();
}
}

View File

@@ -5,15 +5,16 @@ import it.integry.ems_model.base.EntityBase;
import it.integry.ems_model.base.EntityPropertyHolder;
import it.integry.ems_model.base.EquatableEntityInterface;
import it.integry.ems_model.types.OperationType;
import it.integry.ems_model.utility.Query;
import it.integry.ems_model.utility.UtilityDB;
import it.integry.ems_model.utility.UtilityQuery;
import it.integry.ems_model.utility.UtilityString;
import org.springframework.stereotype.Service;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
@@ -23,15 +24,17 @@ public class ExchangeImportDataManagerService {
public <T extends EquatableEntityInterface> List<T> retrieveDataFromExchange(Connection connection,
Class<T> clazz,
String tableName, String dateSqlFieldName,
LocalDate minDate, LocalDate maxDate, boolean retrieveAlreadyImported) throws Exception {
Class<T> clazz,
String tableName, String whereCond, boolean retrieveAlreadyImported) throws Exception {
String query = "SELECT * FROM " + tableName + (retrieveAlreadyImported ? "_prev" : "") + " ";
if (!UtilityString.isNullOrEmpty(whereCond)) {
query += "WHERE " + whereCond;
}
final List<T> importedRawData =
UtilityDB.executeSimpleQueryDTO(connection,
Query.format("SELECT * FROM " + tableName + (retrieveAlreadyImported ? "_prev" : "") + " " +
"WHERE " + dateSqlFieldName + " BETWEEN {} and {}", minDate, maxDate),
clazz);
UtilityDB.executeSimpleQueryDTO(connection, query, clazz);
if (importedRawData == null || importedRawData.isEmpty())
return new ArrayList<>();
@@ -41,7 +44,9 @@ public class ExchangeImportDataManagerService {
}
public List<EquatableEntityInterface> runSync(Class<? extends EquatableEntityInterface> clazz, List<EquatableEntityInterface> importedItems, List<EquatableEntityInterface> newItems) throws IllegalAccessException {
public List<EquatableEntityInterface> runSync(Class<? extends EquatableEntityInterface> clazz,
List<? extends EquatableEntityInterface> importedItems,
List<? extends EquatableEntityInterface> newItems) throws IllegalAccessException {
//To be added
List<EquatableEntityInterface> dataToAdd = calcItemsToAdd(clazz, importedItems, newItems);
@@ -59,9 +64,11 @@ public class ExchangeImportDataManagerService {
return allData;
}
private List<EquatableEntityInterface> calcItemsToAdd(Class<? extends EquatableEntityInterface> clazz, List<EquatableEntityInterface> importedMtbColts, List<EquatableEntityInterface> updatedMtbColts) throws IllegalAccessException {
final List<EquatableEntityInterface> itemsToAdd = updatedMtbColts.stream()
.filter(x -> importedMtbColts.stream().noneMatch(y -> y.equalsKey(x)))
private List<EquatableEntityInterface> calcItemsToAdd(Class<? extends EquatableEntityInterface> clazz,
List<? extends EquatableEntityInterface> alreadyImportedData,
List<? extends EquatableEntityInterface> updatedData) throws IllegalAccessException {
final List<EquatableEntityInterface> itemsToAdd = updatedData.stream()
.filter(x -> alreadyImportedData.stream().noneMatch(y -> y.equalsKey(x)))
.peek(x -> x.setOperation(OperationType.INSERT))
.collect(Collectors.toList());
@@ -69,6 +76,7 @@ public class ExchangeImportDataManagerService {
final List<EntityHierarchy> entityChildren = EntityPropertyHolder.getEntityChildrenStatic(clazz, EntityHierarchy::isEquatable);
for (EntityHierarchy entityChild : entityChildren) {
Field parentField = entityChild.getParentField();
parentField.setAccessible(true);
for (EquatableEntityInterface item : itemsToAdd) {
Object child = parentField.get(item);
@@ -87,10 +95,12 @@ public class ExchangeImportDataManagerService {
return itemsToAdd;
}
private List<EquatableEntityInterface> calcItemsToDelete(Class<? extends EquatableEntityInterface> clazz, List<EquatableEntityInterface> importedMtbColts, List<EquatableEntityInterface> updatedMtbColts) throws IllegalAccessException {
private List<EquatableEntityInterface> calcItemsToDelete(Class<? extends EquatableEntityInterface> clazz,
List<? extends EquatableEntityInterface> importedMtbColts,
List<? extends EquatableEntityInterface> updatedMtbColts) throws IllegalAccessException {
final List<EquatableEntityInterface> itemsToDelete = importedMtbColts.stream()
.filter(x -> updatedMtbColts.stream().noneMatch(y -> y.equalsKey(x)))
.peek(x -> x.setOperation(OperationType.INSERT))
.peek(x -> x.setOperation(OperationType.DELETE))
.collect(Collectors.toList());
@@ -103,7 +113,7 @@ public class ExchangeImportDataManagerService {
if (child instanceof List) {
List<EntityBase> childList = (List<EntityBase>) child;
childList.stream()
childList
.forEach(y -> y.setOperation(OperationType.DELETE));
} else {
EntityBase childItem = (EntityBase) child;
@@ -115,7 +125,9 @@ public class ExchangeImportDataManagerService {
return itemsToDelete;
}
private List<EquatableEntityInterface> calcItemsToUpdate(Class<? extends EquatableEntityInterface> clazz, List<EquatableEntityInterface> importedMtbColts, List<EquatableEntityInterface> updatedMtbColts) throws IllegalAccessException {
private List<EquatableEntityInterface> calcItemsToUpdate(Class<? extends EquatableEntityInterface> clazz,
List<? extends EquatableEntityInterface> importedMtbColts,
List<? extends EquatableEntityInterface> updatedMtbColts) throws IllegalAccessException {
List<EquatableEntityInterface> itemsToUpdate = updatedMtbColts.stream()
.filter(x -> importedMtbColts.stream().anyMatch(y -> y.equalsKey(x)))
.collect(Collectors.toList());
@@ -163,7 +175,7 @@ public class ExchangeImportDataManagerService {
try {
List<EquatableEntityInterface> childs = (List<EquatableEntityInterface>) parentField.get(x);
if(!childs.isEmpty()) {
if (!childs.isEmpty()) {
anyChildToUpdate = true;
break;
}
@@ -180,4 +192,41 @@ public class ExchangeImportDataManagerService {
}
public void updateImportedStatus(Connection connection, String tableName, HashMap<String, Object> importedKey, boolean useTempTable) throws Exception {
String whereCondKey = UtilityQuery.concatFieldsInWhereCond(importedKey);
UtilityDB.executeStatement(connection,
"DELETE FROM " + tableName + "_prev " +
"WHERE (" + whereCondKey + ")",
"INSERT INTO " + tableName + "_prev " +
"SELECT * FROM " + tableName + (useTempTable ? "_tmp" : "") + " " +
"WHERE (" + whereCondKey + ")"
);
}
public void updateImportedStatus(Connection connection, String tableName, List<HashMap<String, Object>> importedKeys, boolean useTempTable) throws Exception {
List<String> queryToExecute = new ArrayList<>();
for (HashMap<String, Object> key : importedKeys) {
String whereCondKey = UtilityQuery.concatFieldsInWhereCond(key);
queryToExecute.add(
"DELETE FROM " + tableName + "_prev " +
"WHERE (" + whereCondKey + ")");
queryToExecute.add(
"INSERT INTO " + tableName + "_prev " +
"SELECT * FROM " + tableName + (useTempTable ? "_tmp" : "") + " " +
"WHERE (" + whereCondKey + ")"
);
}
String[] queryArray = new String[queryToExecute.size()];
queryToExecute.toArray(queryArray);
UtilityDB.executeStatement(connection, queryArray);
}
}

View File

@@ -21,16 +21,20 @@ public class ExchangeImportSchemaManagerService {
public enum SchemaType {
ColliLavorazione,
OrdiniLavorazione,
DocumentiLavorazione
DocumentiLavorazione,
Articoli,
PartiteMagazzinoLavorazione
}
private final HashMap<SchemaType, List<String>> schemaToTableBinding = new HashMap<SchemaType, List<String>>() {{
put(SchemaType.ColliLavorazione, Arrays.asList("mtb_colt_lav", "mtb_colr_lav"));
put(SchemaType.OrdiniLavorazione, Arrays.asList("dtb_ordt_lav", "dtb_ordr_lav"));
put(SchemaType.DocumentiLavorazione, Arrays.asList("dtb_doct_lav", "dtb_docr_lav"));
put(SchemaType.Articoli, Arrays.asList("mtb_aart", "mtb_grup", "mtb_sgrp", "mtb_sfam", "mtb_tipi"));
put(SchemaType.PartiteMagazzinoLavorazione, Arrays.asList("mtb_partita_mag_lav"));
}};
public void syncSchema(Connection connection, SchemaType schemaType) throws Exception {
public void syncSchema(Connection connection, SchemaType schemaType, boolean createTempTablesToo) throws Exception {
SQLServerDBSchemaManager sqlServerDBSchemaManager = new SQLServerDBSchemaManager(connection);
@@ -48,6 +52,12 @@ public class ExchangeImportSchemaManagerService {
for (DatabaseTableView databaseTableView : objectsToSync) {
if(createTempTablesToo) {
final DatabaseTableView tmpTable = databaseTableView.clone();
tmpTable.setTableName(tmpTable.getTableName() + "_tmp");
sqlServerDBSchemaManager.createTable(tmpTable);
}
databaseTableView.setTableName(databaseTableView.getTableName() + "_prev");
DatabaseTable prevTable = sqlServerDBSchemaManager.getTable(databaseTableView.getTableName());
@@ -89,5 +99,24 @@ public class ExchangeImportSchemaManagerService {
}
}
public void deleteTempTables(Connection connection, SchemaType schemaType) throws Exception {
SQLServerDBSchemaManager sqlServerDBSchemaManager = new SQLServerDBSchemaManager(connection);
List<DatabaseTable> objectsToDelete = new ArrayList<>();
if (!schemaToTableBinding.containsKey(schemaType))
throw new Exception("Si prega di definire gli oggetti del database da sincronizzare per " + schemaType);
List<String> objectNamesToSync = schemaToTableBinding.get(schemaType);
for (String objectName : objectNamesToSync) {
DatabaseTable tmpView = sqlServerDBSchemaManager.getTable(objectName + "_tmp");
if (tmpView != null) objectsToDelete.add(tmpView);
}
for (DatabaseTable objectToDelete : objectsToDelete) {
sqlServerDBSchemaManager.dropTable(objectToDelete);
}
}
}

View File

@@ -1,205 +0,0 @@
package it.integry.ems.system.exchange.service;
import it.integry.ems.expansion.RunnableThrowable;
import it.integry.ems.service.EntityProcessor;
import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager;
import it.integry.ems_model.base.EquatableEntityInterface;
import it.integry.ems_model.entity.DtbOrdSteps;
import it.integry.ems_model.entity.DtbOrdr;
import it.integry.ems_model.entity.DtbOrdt;
import it.integry.ems_model.types.OperationType;
import it.integry.ems_model.utility.UtilityDB;
import it.integry.ems_model.utility.UtilityLocalDate;
import it.integry.ems_model.utility.UtilityQuery;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@SuppressWarnings("rawtypes")
@Service
@Scope("request")
public class ExchangeOrderImportService {
//TODO: To be remove, only for fast development
private final String ROSSOGARGANO_EXCHANGE_PROFILEDB = "ROSSO_GARGANO_EXCHANGE";
private final Logger logger = LogManager.getLogger();
@Autowired
private EntityProcessor entityProcessor;
@Autowired
private MultiDBTransactionManager multiDBTransactionManager;
@Autowired
private ExchangeImportSchemaManagerService exchangeImportSchemaManagerService;
@Autowired
private ExchangeImportDataManagerService exchangeImportDataManagerService;
public void importOrdiniLavorazione() throws Exception {
try (MultiDBTransactionManager exchangeDb = new MultiDBTransactionManager(ROSSOGARGANO_EXCHANGE_PROFILEDB)) {
exchangeImportSchemaManagerService.syncSchema(exchangeDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.OrdiniLavorazione);
final List<DtbOrdt> exchangeImportedMtbColts = importOrdiniLavorazione(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusMonths(1),
UtilityLocalDate.getNow(),
true);
final List<DtbOrdt> exchangeUpdatedMtbColts = importOrdiniLavorazione(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusMonths(1),
UtilityLocalDate.getNow(),
false);
List<EquatableEntityInterface> importedData = exchangeImportedMtbColts.stream()
.map(x -> (EquatableEntityInterface) x)
.collect(Collectors.toList());
List<EquatableEntityInterface> updatedData = exchangeUpdatedMtbColts.stream()
.map(x -> (EquatableEntityInterface) x)
.collect(Collectors.toList());
List<EquatableEntityInterface> allData = exchangeImportDataManagerService
.runSync(DtbOrdt.class, importedData, updatedData);
allData.stream()
.map(x -> (DtbOrdt) x)
.filter(x -> x.getGestione().equalsIgnoreCase("L") &&
x.getOperation() == OperationType.INSERT)
.forEach(x -> {
DtbOrdSteps ordStep =
new DtbOrdSteps()
.setCodProdPri(x.getCodProd())
.setIdRiga(0)
.setCodJfas(x.getCodJfas())
.setQtaProd(x.getQtaProd());
ordStep.setOperation(OperationType.INSERT);
x.getDtbOrdSteps().add(ordStep);
});
List<List<EquatableEntityInterface>> splittedOrders =
Arrays.asList(
allData.stream().filter(x -> ((DtbOrdt) x).getGestione().equalsIgnoreCase("A")).collect(Collectors.toList()),
allData.stream().filter(x -> !((DtbOrdt) x).getGestione().equalsIgnoreCase("A")).collect(Collectors.toList())
);
final Exception[] firstExceptionToThrow = {null};
final AtomicInteger[] importedCounter = {new AtomicInteger()};
for (List<EquatableEntityInterface> listToProcess : splittedOrders) {
List<RunnableThrowable> calls = new ArrayList<>();
for (EquatableEntityInterface dtbOrdtToSave : listToProcess) {
//calls.add(() -> {
logger.debug("Importati {} di {}", importedCounter[0].incrementAndGet(), allData.size());
try {
entityProcessor.processEntity(dtbOrdtToSave, multiDBTransactionManager);
singleUpdateImported(exchangeDb.getPrimaryConnection(), (DtbOrdt) dtbOrdtToSave);
multiDBTransactionManager.commitAll();
} catch (Exception ex) {
if (firstExceptionToThrow[0] == null) firstExceptionToThrow[0] = ex;
logger.error("Errore durante l'importazione dell'ordine", ex);
multiDBTransactionManager.rollbackAll();
//throw ex;
}
//});
}
//UtilityThread.executeParallel(calls);
}
if (firstExceptionToThrow[0] != null) throw firstExceptionToThrow[0];
}
}
private List<DtbOrdt> importOrdiniLavorazione(Connection connection,
LocalDate minDate, LocalDate maxDate, boolean retrieveAlreadyImported) throws Exception {
List<DtbOrdt> dtbOrdtLav = exchangeImportDataManagerService.retrieveDataFromExchange(connection, DtbOrdt.class,
"dtb_ordt_lav", "data_ord", minDate, maxDate, retrieveAlreadyImported);
List<DtbOrdr> dtbOrdrLav = exchangeImportDataManagerService.retrieveDataFromExchange(connection, DtbOrdr.class,
"dtb_ordr_lav", "data_ord", minDate, maxDate, retrieveAlreadyImported);
dtbOrdtLav = dtbOrdtLav.stream()
// .filter(x -> x.getGestione().equalsIgnoreCase("A"))
.sorted((o1, o2) -> {
int gestNum1 = o1.getGestione().equalsIgnoreCase("A") ? 1 : 2; //Prima gli ordini di produzione A
int gestNum2 = o2.getGestione().equalsIgnoreCase("A") ? 1 : 2; //Dopo gli ordini di lavorazione L
return Integer.compare(gestNum1, gestNum2);
})
.collect(Collectors.toList());
dtbOrdrLav = dtbOrdrLav.stream()
.filter(x -> x.getGestione().equalsIgnoreCase("L"))
.collect(Collectors.toList());
List<DtbOrdr> finalDtbOrdrLav = dtbOrdrLav;
dtbOrdtLav
.forEach(x -> {
x.setGeneraOrdLavDaProd(false);
x.setDtbOrdr(finalDtbOrdrLav.stream()
.filter(y -> y.getDataOrd().equals(x.getDataOrd()) &&
y.getGestione().equalsIgnoreCase(x.getGestione()) &&
y.getNumOrd().equals(x.getNumOrd()))
.collect(Collectors.toList()));
});
return dtbOrdtLav;
}
private void singleUpdateImported(Connection connection, DtbOrdt importedDtbOrdt) throws Exception {
final HashMap<String, Object> importedKey = new HashMap<String, Object>() {{
put("data_ord", importedDtbOrdt.getDataOrd());
put("num_ord", importedDtbOrdt.getNumOrd());
put("gestione", importedDtbOrdt.getGestione());
}};
String whereCondKey = UtilityQuery.concatFieldsInWhereCond(importedKey);
UtilityDB.executeStatement(connection,
"DELETE FROM dtb_ordr_lav_prev " +
"WHERE (" + whereCondKey + ")",
"DELETE FROM dtb_ordt_lav_prev " +
"WHERE (" + whereCondKey + ")",
"INSERT INTO dtb_ordr_lav_prev " +
"SELECT * FROM dtb_ordr_lav " +
"WHERE (" + whereCondKey + ")",
"INSERT INTO dtb_ordt_lav_prev " +
"SELECT * FROM dtb_ordt_lav " +
"WHERE (" + whereCondKey + ")");
}
}

View File

@@ -0,0 +1,216 @@
package it.integry.ems.system.exchange.service;
import it.integry.ems.expansion.RunnableThrowable;
import it.integry.ems.service.EntityProcessor;
import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager;
import it.integry.ems_model.base.EquatableEntityInterface;
import it.integry.ems_model.entity.DtbOrdSteps;
import it.integry.ems_model.entity.DtbOrdr;
import it.integry.ems_model.entity.DtbOrdt;
import it.integry.ems_model.types.OperationType;
import it.integry.ems_model.utility.Query;
import it.integry.ems_model.utility.UtilityDB;
import it.integry.ems_model.utility.UtilityLocalDate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@SuppressWarnings("rawtypes")
@Service
@Scope("request")
public class ExchangeOrdiniImportService {
//TODO: To be remove, only for fast development
private final String ROSSOGARGANO_EXCHANGE_PROFILEDB = "ROSSO_GARGANO_EXCHANGE";
private final Logger logger = LogManager.getLogger();
@Autowired
private EntityProcessor entityProcessor;
@Autowired
private MultiDBTransactionManager multiDBTransactionManager;
@Autowired
private ExchangeImportSchemaManagerService exchangeImportSchemaManagerService;
@Autowired
private ExchangeImportDataManagerService exchangeImportDataManagerService;
@Autowired
private ExchangePartiteMagazzinoImportService exchangePartiteMagazzinoImportService;
public void importOrdiniLavorazione() throws Exception {
boolean useTempTable = true;
try (MultiDBTransactionManager exchangeDb = new MultiDBTransactionManager(ROSSOGARGANO_EXCHANGE_PROFILEDB)) {
try {
exchangeImportSchemaManagerService.syncSchema(exchangeDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.OrdiniLavorazione, useTempTable);
exchangePartiteMagazzinoImportService.importPartiteMagazzinoLavorazione();
final List<DtbOrdt> exchangeImportedMtbColts = importOrdiniLavorazione(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusWeeks(1),
UtilityLocalDate.getNow(),
true, false);
final List<DtbOrdt> exchangeUpdatedMtbColts = importOrdiniLavorazione(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusWeeks(1),
UtilityLocalDate.getNow(),
false, useTempTable);
List<EquatableEntityInterface> allData = exchangeImportDataManagerService
.runSync(DtbOrdt.class, exchangeImportedMtbColts, exchangeUpdatedMtbColts);
allData.stream()
.map(x -> (DtbOrdt) x)
.filter(x -> x.getGestione().equalsIgnoreCase("L") &&
(x.getOperation() == OperationType.INSERT_OR_UPDATE || x.getOperation() == OperationType.INSERT))
.forEach(x -> {
DtbOrdSteps ordStep =
new DtbOrdSteps()
.setIdRiga(0)
.setCodJfas(x.getCodJfas())
.setQtaProd(x.getQtaProd());
ordStep.setOperation(OperationType.INSERT_OR_UPDATE);
x.getDtbOrdSteps().add(ordStep);
});
List<List<EquatableEntityInterface>> splittedOrders =
Arrays.asList(
allData.stream().filter(x -> ((DtbOrdt) x).getGestione().equalsIgnoreCase("A")).collect(Collectors.toList()),
allData.stream().filter(x -> !((DtbOrdt) x).getGestione().equalsIgnoreCase("A")).collect(Collectors.toList())
);
final Exception[] firstExceptionToThrow = {null};
final AtomicInteger[] importedCounter = {new AtomicInteger()};
for (List<EquatableEntityInterface> listToProcess : splittedOrders) {
List<RunnableThrowable> calls = new ArrayList<>();
for (EquatableEntityInterface dtbOrdtToSave : listToProcess) {
//calls.add(() -> {
logger.debug("Importati {} di {}", importedCounter[0].incrementAndGet(), allData.size());
try {
entityProcessor.processEntity(dtbOrdtToSave, multiDBTransactionManager);
singleUpdateImported(exchangeDb.getPrimaryConnection(), (DtbOrdt) dtbOrdtToSave, useTempTable);
multiDBTransactionManager.commitAll();
exchangeDb.commitAll();
} catch (Exception ex) {
if (firstExceptionToThrow[0] == null) firstExceptionToThrow[0] = ex;
logger.error("Errore durante l'importazione dell'ordine", ex);
multiDBTransactionManager.rollbackAll();
//throw ex;
}
//});
}
//UtilityThread.executeParallel(calls);
}
if (firstExceptionToThrow[0] != null) throw firstExceptionToThrow[0];
} finally {
if (useTempTable)
exchangeImportSchemaManagerService.deleteTempTables(exchangeDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.OrdiniLavorazione);
}
}
}
private List<DtbOrdt> importOrdiniLavorazione(Connection connection,
LocalDate minDate, LocalDate maxDate, boolean retrieveAlreadyImported, boolean useTempTable) throws Exception {
String dtbOrdtOriginalTableName = "dtb_ordt_lav";
String dtbOrdtTableName = dtbOrdtOriginalTableName + (useTempTable ? "_tmp" : "");
String dtbOrdrOriginalTableName = "dtb_ordr_lav";
String dtbOrdrTableName = dtbOrdrOriginalTableName + (useTempTable ? "_tmp" : "");
if (useTempTable) {
UtilityDB.executeStatement(connection,
"INSERT INTO " + dtbOrdtTableName +
" SELECT * FROM " + dtbOrdtOriginalTableName,
"INSERT INTO " + dtbOrdrTableName +
" SELECT * FROM " + dtbOrdrOriginalTableName
);
}
List<DtbOrdt> dtbOrdtLav = exchangeImportDataManagerService.retrieveDataFromExchange(connection, DtbOrdt.class,
dtbOrdtTableName, Query.format("data_ord BETWEEN {} AND {}", minDate, maxDate), retrieveAlreadyImported);
List<DtbOrdr> dtbOrdrLav = exchangeImportDataManagerService.retrieveDataFromExchange(connection, DtbOrdr.class,
dtbOrdrTableName, Query.format("data_ord BETWEEN {} AND {}", minDate, maxDate), retrieveAlreadyImported);
dtbOrdtLav = dtbOrdtLav.stream()
// .filter(x -> x.getGestione().equalsIgnoreCase("A"))
.sorted((o1, o2) -> {
int gestNum1 = o1.getGestione().equalsIgnoreCase("A") ? 1 : 2; //Prima gli ordini di produzione A
int gestNum2 = o2.getGestione().equalsIgnoreCase("A") ? 1 : 2; //Dopo gli ordini di lavorazione L
return Integer.compare(gestNum1, gestNum2);
})
.collect(Collectors.toList());
dtbOrdrLav = dtbOrdrLav.stream()
.filter(x -> x.getGestione().equalsIgnoreCase("L"))
.collect(Collectors.toList());
List<DtbOrdr> finalDtbOrdrLav = dtbOrdrLav;
dtbOrdtLav
.forEach(x -> {
x.setGeneraOrdLavDaProd(false);
x.setDtbOrdr(finalDtbOrdrLav.stream()
.filter(y -> y.getDataOrd().equals(x.getDataOrd()) &&
y.getGestione().equalsIgnoreCase(x.getGestione()) &&
y.getNumOrd().equals(x.getNumOrd()))
.collect(Collectors.toList()));
});
return dtbOrdtLav;
}
private void singleUpdateImported(Connection connection, DtbOrdt importedDtbOrdt, boolean useTempTable) throws Exception {
final HashMap<String, Object> importedKey = new HashMap<String, Object>() {{
put("data_ord", importedDtbOrdt.getDataOrd());
put("num_ord", importedDtbOrdt.getNumOrd());
put("gestione", importedDtbOrdt.getGestione());
}};
exchangeImportDataManagerService.updateImportedStatus(connection, "dtb_ordt_lav", importedKey, useTempTable);
final List<HashMap<String, Object>> importedRowKeys = importedDtbOrdt.getDtbOrdr().stream()
.map(x -> new HashMap<String, Object>() {{
put("data_ord", x.getDataOrd());
put("num_ord", x.getNumOrd());
put("gestione", x.getGestione());
put("riga_ord", x.getRigaOrd());
}})
.collect(Collectors.toList());
exchangeImportDataManagerService.updateImportedStatus(connection, "dtb_ordr_lav", importedRowKeys, useTempTable);
}
}

View File

@@ -0,0 +1,144 @@
package it.integry.ems.system.exchange.service;
import it.integry.ems.expansion.RunnableThrowable;
import it.integry.ems.service.EntityProcessor;
import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager;
import it.integry.ems_model.base.EquatableEntityInterface;
import it.integry.ems_model.entity.MtbPartitaMag;
import it.integry.ems_model.types.OperationType;
import it.integry.ems_model.utility.Query;
import it.integry.ems_model.utility.UtilityDB;
import it.integry.ems_model.utility.UtilityLocalDate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@SuppressWarnings("rawtypes")
@Service
@Scope("request")
public class ExchangePartiteMagazzinoImportService {
//TODO: To be remove, only for fast development
private final String ROSSOGARGANO_EXCHANGE_PROFILEDB = "ROSSO_GARGANO_EXCHANGE";
private final String ROSSOGARGANO_EXCHANGE_USER = "DBA";
@Autowired
private EntityProcessor entityProcessor;
@Autowired
private MultiDBTransactionManager multiDBTransactionManager;
@Autowired
private ExchangeImportSchemaManagerService exchangeImportSchemaManagerService;
@Autowired
private ExchangeImportDataManagerService exchangeImportDataManagerService;
private final Logger logger = LogManager.getLogger();
public void importPartiteMagazzinoLavorazione() throws Exception {
boolean useTempTable = true;
try (MultiDBTransactionManager exchangeDb = new MultiDBTransactionManager(ROSSOGARGANO_EXCHANGE_PROFILEDB)) {
try {
exchangeImportSchemaManagerService.syncSchema(exchangeDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.PartiteMagazzinoLavorazione, useTempTable);
final List<MtbPartitaMag> exchangeImportedData = retrievePartite(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusMonths(1),
UtilityLocalDate.getNow(),
true, false);
final List<MtbPartitaMag> exchangeUpdatedData = retrievePartite(
exchangeDb.getPrimaryConnection(),
UtilityLocalDate.getNow().minusMonths(1),
UtilityLocalDate.getNow(),
false, useTempTable);
List<EquatableEntityInterface> allData = exchangeImportDataManagerService
.runSync(MtbPartitaMag.class, exchangeImportedData, exchangeUpdatedData);
allData.forEach(x -> x.setOperation(x.getOperation() == OperationType.INSERT ? OperationType.INSERT_OR_UPDATE : x.getOperation()));
final Exception[] firstExceptionToThrow = {null};
AtomicInteger importedCounter = new AtomicInteger();
List<RunnableThrowable> calls = new ArrayList<>();
for (EquatableEntityInterface dataToSave : allData) {
//if(dataToSave.getOperation() == OperationType.DELETE) continue;
// calls.add(() -> {
logger.debug("Importate {} partite di magazzino di {}", importedCounter.incrementAndGet(), allData.size());
try {
entityProcessor.processEntity(dataToSave, true, true, ROSSOGARGANO_EXCHANGE_USER, multiDBTransactionManager);
singleUpdateImported(exchangeDb.getPrimaryConnection(), (MtbPartitaMag) dataToSave, useTempTable);
//multiDBTransactionManager.commitAll();
multiDBTransactionManager.commitAll();
exchangeDb.commitAll();
} catch (Exception ex) {
if (firstExceptionToThrow[0] == null) firstExceptionToThrow[0] = ex;
logger.error("Errore durante l'importazione della partita di magazzino", ex);
//multiDBTransactionManager.rollbackAll();
//throw ex;
}
// });
}
// UtilityThread.executeParallel(calls);
if (firstExceptionToThrow[0] != null) throw firstExceptionToThrow[0];
} finally {
if (useTempTable)
exchangeImportSchemaManagerService.deleteTempTables(exchangeDb.getPrimaryConnection(), ExchangeImportSchemaManagerService.SchemaType.PartiteMagazzinoLavorazione);
}
}
}
private List<MtbPartitaMag> retrievePartite(Connection connection,
LocalDate minDate, LocalDate maxDate, boolean retrieveAlreadyImported, boolean useTempTable) throws Exception {
String mtbPartitaMagLavOriginalName = "mtb_partita_mag_lav";
String mtbPartitaMagLavTableName = mtbPartitaMagLavOriginalName + (useTempTable ? "_tmp" : "");
if (useTempTable) {
UtilityDB.executeStatement(connection,
"INSERT INTO " + mtbPartitaMagLavTableName +
" SELECT * FROM " + mtbPartitaMagLavOriginalName
);
}
return exchangeImportDataManagerService.retrieveDataFromExchange(connection, MtbPartitaMag.class,
mtbPartitaMagLavTableName, Query.format("data_ins BETWEEN {} AND {}", minDate, maxDate), retrieveAlreadyImported);
}
private void singleUpdateImported(Connection connection, MtbPartitaMag importedData, boolean useTempTable) throws Exception {
final HashMap<String, Object> importedKey = new HashMap<String, Object>() {{
put("cod_mart", importedData.getCodMart());
put("partita_mag", importedData.getPartitaMag());
}};
exchangeImportDataManagerService.updateImportedStatus(connection, "mtb_partita_mag_lav", importedKey, useTempTable);
}
}