Fix su sincronizzazioni
This commit is contained in:
@@ -13,7 +13,6 @@ import it.integry.ems.task.TaskExecutorService;
|
||||
import it.integry.ems.user.service.UserCacheService;
|
||||
import it.integry.ems_model.entity.Azienda;
|
||||
import it.integry.ems_model.service.SetupGest;
|
||||
import it.integry.ems_model.utility.UtilityDB;
|
||||
import it.integry.ems_model.utility.UtilityString;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@@ -90,10 +89,7 @@ public class EmsCoreDBLoader {
|
||||
futureTasks.add(() -> {
|
||||
try {
|
||||
Connection connection = connectionPool.getConnection(modelAtomicReference.get());
|
||||
String sql = "SELECT db_distributore FROM azienda";
|
||||
String dbDistributore = UtilityDB.executeSimpleQueryOnlyFirstRowFirstColumn(connection, sql);
|
||||
|
||||
multiDBTransactionManager.addConnection(connection, modelAtomicReference.get().getDbName().equalsIgnoreCase(dbDistributore));
|
||||
multiDBTransactionManager.addConnection(connection);
|
||||
|
||||
return null;
|
||||
} catch (Exception ex) {
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
package it.integry.ems.exception;
|
||||
|
||||
public class DistributoreDatabaseNotPresentException extends Exception {
|
||||
|
||||
public DistributoreDatabaseNotPresentException() {
|
||||
super("Distributore database not present yet");
|
||||
}
|
||||
}
|
||||
@@ -255,8 +255,10 @@ public class EntityProcessor {
|
||||
entity.manageWithParentConnection(primaryDB);
|
||||
} else {
|
||||
|
||||
String profileDbDistributore = syncManager.getDistributoreProfileDb(currentProfileDb);
|
||||
// String profileDbDistributore = syncManager.getDistributoreProfileDb(currentProfileDb);
|
||||
String profileDbDistributore = multiDBTransactionManager.getDistributoreProfileName();
|
||||
List<StbSubscription> syncDetails = syncManager.getSyncDetailIfPresent(currentDB.getProfileName(), profileDbDistributore, entity.getTableName());
|
||||
|
||||
boolean isSyncActive = syncDetails != null && !syncDetails.isEmpty();
|
||||
boolean isCurrentlyOnDistributore = UtilityString.isNullOrEmpty(profileDbDistributore) || profileDbDistributore.equalsIgnoreCase(currentProfileDb);
|
||||
|
||||
@@ -264,11 +266,11 @@ public class EntityProcessor {
|
||||
logger.info("SINCRONIZZAZIONE su entity " + entity.getTableName() + " ONLINE: " + isSyncActive);
|
||||
|
||||
if (!isCurrentlyOnDistributore) {
|
||||
if (!multiDBTransactionManager.containsDB(profileDbDistributore)) {
|
||||
multiDBTransactionManager.addConnection(profileDbDistributore, true);
|
||||
}
|
||||
// if (!multiDBTransactionManager.containsDB(profileDbDistributore)) {
|
||||
// multiDBTransactionManager.addConnection(profileDbDistributore, true);
|
||||
// }
|
||||
|
||||
currentDB = multiDBTransactionManager.getDatabaseConnection(profileDbDistributore);
|
||||
currentDB = multiDBTransactionManager.addAndGetDistributoreConnection();
|
||||
}
|
||||
|
||||
StbSubscription syncDetail = syncDetails.stream()
|
||||
|
||||
@@ -1,91 +0,0 @@
|
||||
package it.integry.ems.sync.MultiDBTransaction;
|
||||
|
||||
import it.integry.ems.datasource.DataSource;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Savepoint;
|
||||
|
||||
|
||||
public class AdvancedDataSource {
|
||||
|
||||
private String profileName;
|
||||
|
||||
private DataSource dataSource;
|
||||
|
||||
private boolean isDistributore;
|
||||
private boolean isInternalDb;
|
||||
|
||||
private Savepoint lastSavePoint;
|
||||
|
||||
public AdvancedDataSource(String profileName, DataSource dataSource, boolean isDistributore, boolean isInternalDb) {
|
||||
this.profileName = profileName;
|
||||
this.dataSource = dataSource;
|
||||
this.isDistributore = isDistributore;
|
||||
this.isInternalDb = isInternalDb;
|
||||
}
|
||||
|
||||
public void makeSavePoint() throws SQLException, IOException {
|
||||
if (!dataSource.isClosed()) {
|
||||
lastSavePoint = dataSource.getConnection().setSavepoint();
|
||||
}
|
||||
}
|
||||
|
||||
public void rollbackNow() throws SQLException, IOException {
|
||||
if (!dataSource.isClosed()) {
|
||||
if (lastSavePoint != null) {
|
||||
dataSource.getConnection().rollback(lastSavePoint);
|
||||
} else {
|
||||
dataSource.getConnection().rollback();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void commit() throws SQLException, IOException {
|
||||
if (!dataSource.isClosed()) {
|
||||
dataSource.getConnection().commit();
|
||||
}
|
||||
}
|
||||
|
||||
void close() throws SQLException, IOException {
|
||||
if (!dataSource.isClosed()) {
|
||||
dataSource.getConnection().close();
|
||||
}
|
||||
}
|
||||
|
||||
boolean isClosed() throws SQLException {
|
||||
return dataSource.isClosed();
|
||||
}
|
||||
|
||||
public Connection getConnection() throws IOException, SQLException {
|
||||
return dataSource.getConnection();
|
||||
}
|
||||
|
||||
public DataSource getDataSource() {
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
public void setDataSource(DataSource dataSource) {
|
||||
this.dataSource = dataSource;
|
||||
}
|
||||
|
||||
public boolean getIsDistributore() {
|
||||
return isDistributore;
|
||||
}
|
||||
|
||||
public void setDistributore(boolean distributore) {
|
||||
isDistributore = distributore;
|
||||
}
|
||||
|
||||
public String getProfileName() {
|
||||
return profileName;
|
||||
}
|
||||
|
||||
public void setProfileName(String profileName) {
|
||||
this.profileName = profileName;
|
||||
}
|
||||
|
||||
public boolean isInternalDb() {
|
||||
return isInternalDb;
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.sql.SQLException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -90,7 +91,7 @@ public class BasicConnectionPool {
|
||||
return ds;
|
||||
}
|
||||
|
||||
public Connection getConnection(AvailableConnectionsModel connectionsModel) throws Exception {
|
||||
public Connection getConnection(AvailableConnectionsModel connectionsModel) throws SQLException {
|
||||
if (connectionsModel == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ public class Connection implements java.sql.Connection {
|
||||
private long sessionId;
|
||||
|
||||
private String profileName;
|
||||
private boolean isDistributore;
|
||||
private boolean isInternalDb;
|
||||
|
||||
|
||||
@@ -347,16 +346,6 @@ public class Connection implements java.sql.Connection {
|
||||
this.profileName = profileName;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isDistributore() {
|
||||
return isDistributore;
|
||||
}
|
||||
|
||||
public Connection setDistributore(boolean distributore) {
|
||||
isDistributore = distributore;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isInternalDb() {
|
||||
return isInternalDb;
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package it.integry.ems.sync.MultiDBTransaction;
|
||||
|
||||
import it.integry.common.var.EmsDBConst;
|
||||
import it.integry.ems._context.ApplicationContextProvider;
|
||||
import it.integry.ems.exception.DistributoreDatabaseNotPresentException;
|
||||
import it.integry.ems.exception.PrimaryDatabaseNotPresentException;
|
||||
import it.integry.ems.javabeans.RequestDataDTO;
|
||||
import it.integry.ems.properties.EmsProperties;
|
||||
@@ -42,6 +44,9 @@ public class MultiDBTransactionManager implements AutoCloseable {
|
||||
@Autowired
|
||||
private RequestDataDTO requestData;
|
||||
|
||||
@Autowired
|
||||
private EmsDBConst emsDBConst;
|
||||
|
||||
|
||||
private boolean enableLog = true;
|
||||
|
||||
@@ -95,17 +100,13 @@ public class MultiDBTransactionManager implements AutoCloseable {
|
||||
this.addConnection(availableConnectionsModel.getProfileName());
|
||||
}
|
||||
|
||||
public void addConnection(String profileName) throws Exception {
|
||||
this.addConnection(profileName, false);
|
||||
}
|
||||
|
||||
public void addConnection(String profileName, boolean isDistributore) throws Exception {
|
||||
public void addConnection(String profileName) throws SQLException {
|
||||
AvailableConnectionsModel availableConnectionsModel = settingsModel.findConnectionModel(profileName);
|
||||
Connection connection = connectionPool.getConnection(availableConnectionsModel);
|
||||
this.addConnection(connection, isDistributore);
|
||||
this.addConnection(connection);
|
||||
}
|
||||
|
||||
public void addConnection(Connection connection, boolean isDistributore) {
|
||||
public void addConnection(Connection connection) {
|
||||
if (allConnections != null) {
|
||||
if (requestData != null && requestData.getRequestURI() != null) {
|
||||
String methodName = " [" + requestData.getRequestURI() + "]";
|
||||
@@ -118,30 +119,20 @@ public class MultiDBTransactionManager implements AutoCloseable {
|
||||
connectionListLock.lock();
|
||||
try {
|
||||
allConnections.add(connection);
|
||||
|
||||
if (isDistributore) {
|
||||
allConnections = allConnections.stream()
|
||||
.sorted((o1, o2) -> {
|
||||
if (o1.isDistributore() && !o2.isDistributore()) return -1;
|
||||
else if (!o1.isDistributore() && o2.isDistributore()) return 1;
|
||||
else return 0;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
} finally {
|
||||
connectionListLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean containsDB(final String dbName) {
|
||||
public boolean containsDB(final String profileDb) {
|
||||
if (allConnections != null) {
|
||||
return allConnections.stream()
|
||||
.anyMatch(advancedDataSource -> advancedDataSource.getProfileName().equalsIgnoreCase(dbName));
|
||||
.anyMatch(advancedDataSource -> advancedDataSource.getProfileName().equalsIgnoreCase(profileDb));
|
||||
} else return false;
|
||||
}
|
||||
|
||||
public Connection getDatabaseConnection(final String profileName) throws Exception {
|
||||
public Connection getDatabaseConnection(final String profileName) {
|
||||
List<Connection> resultList = allConnections.stream()
|
||||
.filter(advancedDataSource -> advancedDataSource.getProfileName().equalsIgnoreCase(profileName))
|
||||
.collect(Collectors.toList());
|
||||
@@ -149,7 +140,7 @@ public class MultiDBTransactionManager implements AutoCloseable {
|
||||
if (!resultList.isEmpty()) {
|
||||
return resultList.get(0);
|
||||
} else {
|
||||
throw new Exception("Database not present in connections list");
|
||||
throw new RuntimeException("Database not present in connections list");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,17 +153,25 @@ public class MultiDBTransactionManager implements AutoCloseable {
|
||||
}
|
||||
}
|
||||
|
||||
public String getDistributoreProfileName() throws PrimaryDatabaseNotPresentException {
|
||||
for (Connection connection : allConnections) {
|
||||
if (connection.isDistributore()) {
|
||||
return connection.getProfileName();
|
||||
}
|
||||
}
|
||||
return getPrimaryConnection().getProfileName();
|
||||
public String getDistributoreProfileName() throws DistributoreDatabaseNotPresentException, PrimaryDatabaseNotPresentException {
|
||||
final String dbDistributoreDbName = emsDBConst.getConsts(getPrimaryConnection().getDbName())
|
||||
.getAzienda()
|
||||
.getDbDistributore();
|
||||
|
||||
if(dbDistributoreDbName == null)
|
||||
throw new DistributoreDatabaseNotPresentException();
|
||||
|
||||
return dbDistributoreDbName;
|
||||
|
||||
}
|
||||
|
||||
public Connection getDistributoreConnection() throws Exception {
|
||||
return connectionPool.getConnection(settingsModel.findConnectionModel(getDistributoreProfileName()));
|
||||
public Connection addAndGetDistributoreConnection() throws DistributoreDatabaseNotPresentException, SQLException, PrimaryDatabaseNotPresentException {
|
||||
final String distributoreProfileName = getDistributoreProfileName();
|
||||
|
||||
if(!containsDB(distributoreProfileName))
|
||||
addConnection(distributoreProfileName);
|
||||
|
||||
return getDatabaseConnection(distributoreProfileName);
|
||||
}
|
||||
|
||||
|
||||
@@ -295,7 +294,7 @@ public class MultiDBTransactionManager implements AutoCloseable {
|
||||
primaryConnection = connectionPool.getConnection(connectionModel);
|
||||
|
||||
this.allConnections = new ArrayList<>();
|
||||
this.addConnection(primaryConnection, false);
|
||||
this.addConnection(primaryConnection);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -1,251 +1,250 @@
|
||||
package it.integry.ems.sync;
|
||||
|
||||
import it.integry.annotations.PostContextConstruct;
|
||||
import it.integry.common.var.EmsDBConst;
|
||||
import it.integry.ems.dynamic_cache.DynamicCacheService;
|
||||
import it.integry.ems.exception.SyncException;
|
||||
import it.integry.ems.json.ResponseJSONObjectMapper;
|
||||
import it.integry.ems.service.MailService;
|
||||
import it.integry.ems.settings.Model.AvailableConnectionsModel;
|
||||
import it.integry.ems.settings.Model.SettingsModel;
|
||||
import it.integry.ems.sync.MultiDBTransaction.Connection;
|
||||
import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager;
|
||||
import it.integry.ems_model.base.EntityBase;
|
||||
import it.integry.ems_model.base.EntityInterface;
|
||||
import it.integry.ems_model.entity.StbSubscription;
|
||||
import it.integry.ems_model.exception.EntityException;
|
||||
import it.integry.ems_model.service.SetupGest;
|
||||
import it.integry.ems_model.types.OperationType;
|
||||
import it.integry.ems_model.utility.UtilityString;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.josql.Query;
|
||||
import org.josql.QueryResults;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
public class SyncManager {
|
||||
|
||||
private final Logger logger = LogManager.getLogger();
|
||||
|
||||
@Autowired
|
||||
private ResponseJSONObjectMapper objectMapper;
|
||||
@Autowired
|
||||
private SetupGest setupGest;
|
||||
@Autowired
|
||||
private MailService mailService;
|
||||
|
||||
@Autowired
|
||||
private SettingsModel settingsModel;
|
||||
|
||||
@Autowired
|
||||
private DynamicCacheService dynamicCacheService;
|
||||
|
||||
@Autowired
|
||||
private EmsDBConst emsDBConst;
|
||||
|
||||
|
||||
@PostContextConstruct
|
||||
public void init() {
|
||||
List<String> dbNames = settingsModel.getAvailableConnections().stream()
|
||||
.filter(AvailableConnectionsModel::getInternalDb)
|
||||
.map(AvailableConnectionsModel::getDbName)
|
||||
.distinct()
|
||||
.collect(Collectors.toList());
|
||||
|
||||
dynamicCacheService.addEntity(DynamicCacheService.Keys.ENTITY_SYNC_KEY, 5, StbSubscription.class, dbNames);
|
||||
}
|
||||
|
||||
public void executeSynchronization(EntityInterface entitySource, EntityInterface entityCloned, MultiDBTransactionManager multiDBTransactionManager) throws Exception {
|
||||
StringBuffer[] lastProfileUsed = new StringBuffer[]{new StringBuffer("")};
|
||||
|
||||
try {
|
||||
logger.debug("Avvio sincronizzazione per " + entitySource.getTableName() + " su " + multiDBTransactionManager.getDistributoreProfileName());
|
||||
if (multiDBTransactionManager.getPrimaryConnection().getProfileName().equalsIgnoreCase(multiDBTransactionManager.getDistributoreProfileName())) {
|
||||
try {
|
||||
entitySource.manageWithParentConnection(multiDBTransactionManager.getPrimaryConnection());
|
||||
proceedSyncronization(entitySource, entityCloned, lastProfileUsed, multiDBTransactionManager);
|
||||
multiDBTransactionManager.getPrimaryConnection().commit();
|
||||
} catch (Exception e) {
|
||||
multiDBTransactionManager.getPrimaryConnection().rollback();
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
try (Connection distributoreConnection = multiDBTransactionManager.getDistributoreConnection()) {
|
||||
try {
|
||||
entityCloned.dataCompleting(distributoreConnection);
|
||||
entityCloned.manageWithParentConnection(distributoreConnection);
|
||||
|
||||
proceedSyncronization(entitySource, entityCloned, lastProfileUsed, multiDBTransactionManager);
|
||||
|
||||
distributoreConnection.commit();
|
||||
} catch (Exception e) {
|
||||
distributoreConnection.rollback();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
String subscriptor = null;
|
||||
if (e instanceof SyncException) subscriptor = ((SyncException) e).getProfileDb();
|
||||
else {
|
||||
if (UtilityString.isNullOrEmpty(lastProfileUsed[0].toString())) subscriptor = "";
|
||||
else subscriptor = lastProfileUsed[0].toString();
|
||||
}
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
objectMapper.writeValue(baos, entitySource);
|
||||
baos.close();
|
||||
|
||||
notifyFailSyncronization(e, multiDBTransactionManager.getDistributoreProfileName(), subscriptor, "", baos.toByteArray(), multiDBTransactionManager.getPrimaryConnection());
|
||||
|
||||
|
||||
if (!(e instanceof SyncException)) {
|
||||
String profileDb;
|
||||
if (UtilityString.isNullOrEmpty(lastProfileUsed[0].toString()))
|
||||
profileDb = multiDBTransactionManager.getPrimaryConnection().getProfileName();
|
||||
else profileDb = lastProfileUsed[0].toString();
|
||||
|
||||
e = new SyncException(e, profileDb);
|
||||
}
|
||||
|
||||
throw e;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void notifyFailSyncronization(Exception e, String publicator, String subscriptor, String transactionId, byte[] entityByteArray, Connection connection) throws Exception {
|
||||
|
||||
String htmlMailBody;
|
||||
|
||||
if (e instanceof EntityException && ((EntityException) e).getSqlErrorCode() != null && ((EntityException) e).getSqlErrorCode() == 2627) {
|
||||
//Se l'errore è di chiave duplicata non inviamo nessuna email
|
||||
} else if (e instanceof SQLException && "23000".equals(((SQLException) e).getSQLState())) {
|
||||
//Se l'errore è di chiave duplicata non inviamo nessuna email
|
||||
} else {
|
||||
String toEmailDbg = setupGest.getSetup(connection, "WS_REST", "SYNC_ONLINE", "EMAIL_FOR_LOG");
|
||||
|
||||
if (UtilityString.isNullOrEmpty(toEmailDbg))
|
||||
return;
|
||||
|
||||
String tipoSync;
|
||||
|
||||
if (UtilityString.isNullOrEmpty(transactionId)) tipoSync = "online";
|
||||
else tipoSync = "offline (#" + transactionId + ")";
|
||||
|
||||
mailService.sendErrorMail(connection, toEmailDbg, "Sincronizzazione", "Sincronizzazione " + tipoSync,
|
||||
null, null, null, Collections.singletonList(e));
|
||||
}
|
||||
}
|
||||
|
||||
private void proceedSyncronization(EntityInterface entitySource, EntityInterface entityCloned, StringBuffer[] lastProfileUsed, MultiDBTransactionManager multiDBTransactionManager) throws Exception {
|
||||
|
||||
String profileDbDistributore = multiDBTransactionManager.getDistributoreProfileName();
|
||||
List<StbSubscription> subscriptions = dynamicCacheService.getEntity(DynamicCacheService.Keys.ENTITY_SYNC_KEY, profileDbDistributore);
|
||||
|
||||
if (subscriptions == null)
|
||||
return;
|
||||
|
||||
subscriptions = subscriptions.stream()
|
||||
.filter(x -> x.isFlagAttivo() &&
|
||||
x.getTableNameField().toLowerCase().equalsIgnoreCase(entitySource.getTableName().toLowerCase()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (subscriptions.isEmpty())
|
||||
return;
|
||||
|
||||
subscriptions.forEach(x ->
|
||||
x.setRecalcColumnsField(x.getUserNameField().equalsIgnoreCase(profileDbDistributore) ? null : UtilityString.nullIfBlank(x.getRecalcColumnsField())));
|
||||
|
||||
for (StbSubscription subscription : subscriptions) {
|
||||
String whereCond = subscription.getWhereCondSql();
|
||||
String database = subscription.getUserNameField();
|
||||
String recalcColumns = subscription.getRecalcColumnsField();
|
||||
lastProfileUsed[0] = new StringBuffer(database);
|
||||
|
||||
EntityInterface entityToSave = null;
|
||||
if (!database.equalsIgnoreCase(multiDBTransactionManager.getPrimaryConnection().getProfileName())) {
|
||||
entityToSave = entityCloned.deepClone();
|
||||
entityToSave.setUsername(profileDbDistributore);
|
||||
} else {
|
||||
entityToSave = entitySource.deepClone();
|
||||
}
|
||||
|
||||
if (!UtilityString.isNullOrEmpty(whereCond)) {
|
||||
List<EntityBase> myObjs = new ArrayList<>();
|
||||
myObjs.add((EntityBase) entityToSave);
|
||||
|
||||
String selectSql = "SELECT * FROM " + entityToSave.getClass().getCanonicalName() + " WHERE " + whereCond;
|
||||
|
||||
// Create a new Query.
|
||||
Query q = new Query();
|
||||
q.parse(selectSql);
|
||||
|
||||
QueryResults qr = q.execute(myObjs);
|
||||
List res = qr.getResults();
|
||||
|
||||
if (res == null || res.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (!multiDBTransactionManager.containsDB(database)) {
|
||||
multiDBTransactionManager.addConnection(database);
|
||||
}
|
||||
|
||||
Connection syncConnection = multiDBTransactionManager.getDatabaseConnection(database);
|
||||
|
||||
//IMPORTANTE
|
||||
//inizializza un nuovo arraylist
|
||||
entityToSave.setRecalcColumns(new ArrayList<>());
|
||||
|
||||
//gestione nel caso non siano sincronizzati
|
||||
if (entityToSave.getOperation() == OperationType.UPDATE /*|| cloned.getOperation() == OperationType.NO_OP*/)
|
||||
entityToSave.setOperation(OperationType.INSERT_OR_UPDATE);
|
||||
|
||||
if (!database.equalsIgnoreCase(multiDBTransactionManager.getPrimaryConnection().getProfileName())) {
|
||||
if (recalcColumns != null) {
|
||||
entityToSave.setCompletingManager(entityToSave.getCompletingManager());
|
||||
entityToSave.setRecalcColumns(Arrays.asList(recalcColumns.split(",")));
|
||||
entityToSave.dataCompleting(syncConnection);
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("SYNC entity " + entityToSave.getTableName() + " su " + syncConnection.getCatalog());
|
||||
entityToSave.manageWithParentConnection(syncConnection);
|
||||
}
|
||||
}
|
||||
|
||||
public String getDistributoreProfileDb(String profileDb) {
|
||||
return emsDBConst.getConstsByProfile(profileDb)
|
||||
.getAzienda().getDbDistributore();
|
||||
}
|
||||
|
||||
public List<StbSubscription> getSyncDetailIfPresent(String profileDb, String profileDbDistributore, String tableName) {
|
||||
String dbNameDistributore = settingsModel.getDbNameFromProfileDb(UtilityString.isNull(profileDbDistributore, profileDb));
|
||||
|
||||
List<StbSubscription> subscriptions = dynamicCacheService.getEntity(DynamicCacheService.Keys.ENTITY_SYNC_KEY, dbNameDistributore);
|
||||
|
||||
if (subscriptions == null) return null;
|
||||
|
||||
boolean isCurrentlyOnDistributore = profileDb.equalsIgnoreCase(profileDbDistributore);
|
||||
|
||||
List<StbSubscription> subscriptionDetail = subscriptions.stream()
|
||||
.filter(x -> x.isFlagAttivo() &&
|
||||
x.getTableNameField().toLowerCase().equalsIgnoreCase(tableName.toLowerCase())
|
||||
&& (isCurrentlyOnDistributore || x.getUserNameField().equalsIgnoreCase(profileDb))
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
return subscriptionDetail;
|
||||
}
|
||||
|
||||
}
|
||||
package it.integry.ems.sync;
|
||||
|
||||
import it.integry.annotations.PostContextConstruct;
|
||||
import it.integry.common.var.EmsDBConst;
|
||||
import it.integry.ems.dynamic_cache.DynamicCacheService;
|
||||
import it.integry.ems.exception.SyncException;
|
||||
import it.integry.ems.json.ResponseJSONObjectMapper;
|
||||
import it.integry.ems.service.MailService;
|
||||
import it.integry.ems.settings.Model.AvailableConnectionsModel;
|
||||
import it.integry.ems.settings.Model.SettingsModel;
|
||||
import it.integry.ems.sync.MultiDBTransaction.Connection;
|
||||
import it.integry.ems.sync.MultiDBTransaction.MultiDBTransactionManager;
|
||||
import it.integry.ems_model.base.EntityBase;
|
||||
import it.integry.ems_model.base.EntityInterface;
|
||||
import it.integry.ems_model.entity.StbSubscription;
|
||||
import it.integry.ems_model.exception.EntityException;
|
||||
import it.integry.ems_model.service.SetupGest;
|
||||
import it.integry.ems_model.types.OperationType;
|
||||
import it.integry.ems_model.utility.UtilityString;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.josql.Query;
|
||||
import org.josql.QueryResults;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
public class SyncManager {
|
||||
|
||||
private final Logger logger = LogManager.getLogger();
|
||||
|
||||
@Autowired
|
||||
private ResponseJSONObjectMapper objectMapper;
|
||||
@Autowired
|
||||
private SetupGest setupGest;
|
||||
@Autowired
|
||||
private MailService mailService;
|
||||
|
||||
@Autowired
|
||||
private SettingsModel settingsModel;
|
||||
|
||||
@Autowired
|
||||
private DynamicCacheService dynamicCacheService;
|
||||
|
||||
@Autowired
|
||||
private EmsDBConst emsDBConst;
|
||||
|
||||
|
||||
@PostContextConstruct
|
||||
public void init() {
|
||||
List<String> dbNames = settingsModel.getAvailableConnections().stream()
|
||||
.filter(AvailableConnectionsModel::getInternalDb)
|
||||
.map(AvailableConnectionsModel::getDbName)
|
||||
.distinct()
|
||||
.collect(Collectors.toList());
|
||||
|
||||
dynamicCacheService.addEntity(DynamicCacheService.Keys.ENTITY_SYNC_KEY, 5, StbSubscription.class, dbNames);
|
||||
}
|
||||
|
||||
public void executeSynchronization(EntityInterface entitySource, EntityInterface entityCloned, MultiDBTransactionManager multiDBTransactionManager) throws Exception {
|
||||
StringBuffer[] lastProfileUsed = new StringBuffer[]{new StringBuffer("")};
|
||||
|
||||
try {
|
||||
logger.debug("Avvio sincronizzazione per " + entitySource.getTableName() + " su " + multiDBTransactionManager.getDistributoreProfileName());
|
||||
|
||||
Connection connection;
|
||||
EntityInterface entityToSave;
|
||||
|
||||
if (multiDBTransactionManager.getPrimaryConnection().getProfileName().equalsIgnoreCase(multiDBTransactionManager.getDistributoreProfileName())) {
|
||||
connection = multiDBTransactionManager.getPrimaryConnection();
|
||||
entityToSave = entitySource;
|
||||
} else {
|
||||
connection = multiDBTransactionManager.addAndGetDistributoreConnection();
|
||||
entityToSave = entityCloned;
|
||||
|
||||
entityToSave.dataCompleting(connection);
|
||||
}
|
||||
|
||||
entityToSave.manageWithParentConnection(connection);
|
||||
proceedSyncronization(entitySource, entityCloned, lastProfileUsed, multiDBTransactionManager);
|
||||
|
||||
multiDBTransactionManager.commitAll();
|
||||
|
||||
} catch (Exception e) {
|
||||
multiDBTransactionManager.rollbackAll();
|
||||
|
||||
String subscriptor = null;
|
||||
if (e instanceof SyncException) subscriptor = ((SyncException) e).getProfileDb();
|
||||
else {
|
||||
if (UtilityString.isNullOrEmpty(lastProfileUsed[0].toString())) subscriptor = "";
|
||||
else subscriptor = lastProfileUsed[0].toString();
|
||||
}
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
objectMapper.writeValue(baos, entitySource);
|
||||
baos.close();
|
||||
|
||||
notifyFailSyncronization(e, multiDBTransactionManager.getDistributoreProfileName(), subscriptor, "", baos.toByteArray(), multiDBTransactionManager.getPrimaryConnection());
|
||||
|
||||
|
||||
if (!(e instanceof SyncException)) {
|
||||
String profileDb;
|
||||
if (UtilityString.isNullOrEmpty(lastProfileUsed[0].toString()))
|
||||
profileDb = multiDBTransactionManager.getPrimaryConnection().getProfileName();
|
||||
else profileDb = lastProfileUsed[0].toString();
|
||||
|
||||
e = new SyncException(e, profileDb);
|
||||
}
|
||||
|
||||
throw e;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void notifyFailSyncronization(Exception e, String publicator, String subscriptor, String transactionId, byte[] entityByteArray, Connection connection) throws Exception {
|
||||
|
||||
String htmlMailBody;
|
||||
|
||||
if (e instanceof EntityException && ((EntityException) e).getSqlErrorCode() != null && ((EntityException) e).getSqlErrorCode() == 2627) {
|
||||
//Se l'errore è di chiave duplicata non inviamo nessuna email
|
||||
} else if (e instanceof SQLException && "23000".equals(((SQLException) e).getSQLState())) {
|
||||
//Se l'errore è di chiave duplicata non inviamo nessuna email
|
||||
} else {
|
||||
String toEmailDbg = setupGest.getSetup(connection, "WS_REST", "SYNC_ONLINE", "EMAIL_FOR_LOG");
|
||||
|
||||
if (UtilityString.isNullOrEmpty(toEmailDbg))
|
||||
return;
|
||||
|
||||
String tipoSync;
|
||||
|
||||
if (UtilityString.isNullOrEmpty(transactionId)) tipoSync = "online";
|
||||
else tipoSync = "offline (#" + transactionId + ")";
|
||||
|
||||
mailService.sendErrorMail(connection, toEmailDbg, "Sincronizzazione", "Sincronizzazione " + tipoSync,
|
||||
null, null, null, Collections.singletonList(e));
|
||||
}
|
||||
}
|
||||
|
||||
private void proceedSyncronization(EntityInterface entitySource, EntityInterface entityCloned, StringBuffer[] lastProfileUsed, MultiDBTransactionManager multiDBTransactionManager) throws Exception {
|
||||
|
||||
String profileDbDistributore = multiDBTransactionManager.getDistributoreProfileName();
|
||||
List<StbSubscription> subscriptions = dynamicCacheService.getEntity(DynamicCacheService.Keys.ENTITY_SYNC_KEY, profileDbDistributore);
|
||||
|
||||
if (subscriptions == null)
|
||||
return;
|
||||
|
||||
subscriptions = subscriptions.stream()
|
||||
.filter(x -> x.isFlagAttivo() &&
|
||||
x.getTableNameField().toLowerCase().equalsIgnoreCase(entitySource.getTableName().toLowerCase()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (subscriptions.isEmpty())
|
||||
return;
|
||||
|
||||
subscriptions.forEach(x ->
|
||||
x.setRecalcColumnsField(x.getUserNameField().equalsIgnoreCase(profileDbDistributore) ? null : UtilityString.nullIfBlank(x.getRecalcColumnsField())));
|
||||
|
||||
for (StbSubscription subscription : subscriptions) {
|
||||
String whereCond = subscription.getWhereCondSql();
|
||||
String database = subscription.getUserNameField();
|
||||
String recalcColumns = subscription.getRecalcColumnsField();
|
||||
lastProfileUsed[0] = new StringBuffer(database);
|
||||
|
||||
EntityInterface entityToSave = null;
|
||||
if (!database.equalsIgnoreCase(multiDBTransactionManager.getPrimaryConnection().getProfileName())) {
|
||||
entityToSave = entityCloned.deepClone();
|
||||
entityToSave.setUsername(profileDbDistributore);
|
||||
} else {
|
||||
entityToSave = entitySource.deepClone();
|
||||
}
|
||||
|
||||
if (!UtilityString.isNullOrEmpty(whereCond)) {
|
||||
List<EntityBase> myObjs = new ArrayList<>();
|
||||
myObjs.add((EntityBase) entityToSave);
|
||||
|
||||
String selectSql = "SELECT * FROM " + entityToSave.getClass().getCanonicalName() + " WHERE " + whereCond;
|
||||
|
||||
// Create a new Query.
|
||||
Query q = new Query();
|
||||
q.parse(selectSql);
|
||||
|
||||
QueryResults qr = q.execute(myObjs);
|
||||
List res = qr.getResults();
|
||||
|
||||
if (res == null || res.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (!multiDBTransactionManager.containsDB(database)) {
|
||||
multiDBTransactionManager.addConnection(database);
|
||||
}
|
||||
|
||||
Connection syncConnection = multiDBTransactionManager.getDatabaseConnection(database);
|
||||
|
||||
//IMPORTANTE
|
||||
//inizializza un nuovo arraylist
|
||||
entityToSave.setRecalcColumns(new ArrayList<>());
|
||||
|
||||
//gestione nel caso non siano sincronizzati
|
||||
if (entityToSave.getOperation() == OperationType.UPDATE /*|| cloned.getOperation() == OperationType.NO_OP*/)
|
||||
entityToSave.setOperation(OperationType.INSERT_OR_UPDATE);
|
||||
|
||||
if (!database.equalsIgnoreCase(multiDBTransactionManager.getPrimaryConnection().getProfileName())) {
|
||||
if (recalcColumns != null) {
|
||||
entityToSave.setCompletingManager(entityToSave.getCompletingManager());
|
||||
entityToSave.setRecalcColumns(Arrays.asList(recalcColumns.split(",")));
|
||||
entityToSave.dataCompleting(syncConnection);
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("SYNC entity " + entityToSave.getTableName() + " su " + syncConnection.getCatalog());
|
||||
entityToSave.manageWithParentConnection(syncConnection);
|
||||
}
|
||||
}
|
||||
|
||||
public String getDistributoreProfileDb(String profileDb) {
|
||||
return emsDBConst.getConstsByProfile(profileDb)
|
||||
.getAzienda()
|
||||
.getDbDistributore();
|
||||
}
|
||||
|
||||
public List<StbSubscription> getSyncDetailIfPresent(String profileDb, String profileDbDistributore, String tableName) {
|
||||
String dbNameDistributore = settingsModel.getDbNameFromProfileDb(UtilityString.isNull(profileDbDistributore, profileDb));
|
||||
|
||||
List<StbSubscription> subscriptions = dynamicCacheService.getEntity(DynamicCacheService.Keys.ENTITY_SYNC_KEY, dbNameDistributore);
|
||||
|
||||
if (subscriptions == null) return null;
|
||||
|
||||
boolean isCurrentlyOnDistributore = profileDb.equalsIgnoreCase(profileDbDistributore);
|
||||
|
||||
List<StbSubscription> subscriptionDetail = subscriptions.stream()
|
||||
.filter(x -> x.isFlagAttivo() &&
|
||||
x.getTableNameField().toLowerCase().equalsIgnoreCase(tableName.toLowerCase())
|
||||
&& (isCurrentlyOnDistributore || x.getUserNameField().equalsIgnoreCase(profileDb))
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
return subscriptionDetail;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user