From 49b69555ace3b4d78fb167a3adbcde5d726e3673 Mon Sep 17 00:00:00 2001 From: GiuseppeS Date: Tue, 21 Jan 2025 18:44:21 +0100 Subject: [PATCH] Cambiata gestione della pool di connessioni --- .../integry/ems/controller/EmsController.java | 17 +- .../model/Migration_20230411163906.java | 7 +- .../BasicConnectionPool.java | 246 +++++++++++++----- .../MultiDBTransactionManager.java | 21 +- 4 files changed, 203 insertions(+), 88 deletions(-) diff --git a/ems-core/src/main/java/it/integry/ems/controller/EmsController.java b/ems-core/src/main/java/it/integry/ems/controller/EmsController.java index 16b1b92e2d..e7e6193d9b 100644 --- a/ems-core/src/main/java/it/integry/ems/controller/EmsController.java +++ b/ems-core/src/main/java/it/integry/ems/controller/EmsController.java @@ -337,19 +337,12 @@ public class EmsController { } catch (Exception e) { logger.error(request.getRequestURI(), e); response = new ServiceRestResponse(EsitoType.KO, e, multiDBTransactionManager.getPrimaryDatasource().getProfile(), entity); - } finally { - try { - multiDBTransactionManager.closeAll(); - } catch (Exception e) { - logger.error(request.getRequestURI(), e); - response = new ServiceRestResponse(EsitoType.KO, e, multiDBTransactionManager.getPrimaryDatasource().getProfile(), entity); - } + } - if (entity.getOperation() == OperationType.SELECT_OBJECT) { - return obj; - } else { - return response; - } + if (entity.getOperation() == OperationType.SELECT_OBJECT) { + return obj; + } else { + return response; } } diff --git a/ems-core/src/main/java/it/integry/ems/migration/model/Migration_20230411163906.java b/ems-core/src/main/java/it/integry/ems/migration/model/Migration_20230411163906.java index b1bca41532..b57bc641ac 100644 --- a/ems-core/src/main/java/it/integry/ems/migration/model/Migration_20230411163906.java +++ b/ems-core/src/main/java/it/integry/ems/migration/model/Migration_20230411163906.java @@ -30,10 +30,8 @@ public class Migration_20230411163906 extends BaseMigration implements Migration "WHERE mime_type IS NULL"); new Thread(() -> { - try { - String profileDb = advancedDataSource.getProfileName(); - MultiDBTransactionManager mdb = new MultiDBTransactionManager(profileDb, false); - + String profileDb = advancedDataSource.getProfileName(); + try (MultiDBTransactionManager mdb = new MultiDBTransactionManager(profileDb, false)) { for (HashMap stbFileAttached : stbFilesAttached) { final MediaType mimeType = mimeTypesHandler.getContentType((String) stbFileAttached.get("file_name")); @@ -41,7 +39,6 @@ public class Migration_20230411163906 extends BaseMigration implements Migration Query.format("UPDATE stb_files_attached SET mime_type = {} WHERE id_attach = {}", mimeType.toString(), stbFileAttached.get("id_attach"))); } mdb.commitAll(); - mdb.closeAll(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/ems-core/src/main/java/it/integry/ems/sync/MultiDBTransaction/BasicConnectionPool.java b/ems-core/src/main/java/it/integry/ems/sync/MultiDBTransaction/BasicConnectionPool.java index dceb558f04..681b54a0eb 100644 --- a/ems-core/src/main/java/it/integry/ems/sync/MultiDBTransaction/BasicConnectionPool.java +++ b/ems-core/src/main/java/it/integry/ems/sync/MultiDBTransaction/BasicConnectionPool.java @@ -5,106 +5,230 @@ import it.integry.ems.settings.Model.AvailableConnectionsModel; import it.integry.ems.settings.Model.SettingsModel; import it.integry.ems.sync.MultiDBTransaction.exception.UnexpectedConnectionSwitchException; import it.integry.ems.utility.UtilityDebug; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.sql.SQLException; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; @Component public class BasicConnectionPool { + private static final Logger logger = LogManager.getLogger(BasicConnectionPool.class); @Autowired private SettingsModel settingsModel; - private final Map> connectionPool = Collections.synchronizedMap(new HashMap<>()); - private final Map> usedConnections = Collections.synchronizedMap(new HashMap<>()); + private final Map> connectionPool = new ConcurrentHashMap<>(); + private final Map> usedConnections = new ConcurrentHashMap<>(); + private final Map extraConnectionCounters = new ConcurrentHashMap<>(); + private final Map> activeConnectionNames = new ConcurrentHashMap<>(); - public synchronized void init() throws Exception { + private final ReentrantLock poolLock = new ReentrantLock(); - int poolSize = - UtilityDebug.isIntegryServerMaster() ? 1 : - UtilityDebug.isIntegryServerDev() ? 2 : - UtilityDebug.isDebugExecution() ? 0 : - Runtime.getRuntime().availableProcessors(); + public void init() throws Exception { + poolLock.lock(); + try { + closeAllConnections(); + int poolSize = calculatePoolSize(); + initializeConnections(poolSize); + } finally { + poolLock.unlock(); + } + } - if (!settingsModel.isPrimaryInstance()) poolSize = 0; + private void closeAllConnections() { + poolLock.lock(); + try { + // Chiude tutte le connessioni esistenti + connectionPool.values().forEach(connections -> + connections.forEach(this::safeCloseConnection)); + usedConnections.values().forEach(connections -> + connections.forEach(this::safeCloseConnection)); - poolSize = Math.min(poolSize, 8); + // Pulisce le mappe + connectionPool.clear(); + usedConnections.clear(); + extraConnectionCounters.clear(); + activeConnectionNames.clear(); + } finally { + poolLock.unlock(); + } + } + private void safeCloseConnection(DataSource ds) { + try { + if (ds != null) { + ds.forceClose(); + } + } catch (Exception e) { + logger.error("Errore durante la chiusura della connessione", e); + } + } + + private int calculatePoolSize() { + int poolSize = UtilityDebug.isIntegryServerMaster() ? 1 : + UtilityDebug.isIntegryServerDev() ? 2 : + UtilityDebug.isDebugExecution() ? 0 : + Runtime.getRuntime().availableProcessors(); + + if (!settingsModel.isPrimaryInstance()) { + poolSize = 0; + } + + return Math.min(poolSize, 8); + } + + private void initializeConnections(int poolSize) throws Exception { final List availableConnections = settingsModel.getAvailableConnections(true); - for (final AvailableConnectionsModel availableConnectionsModel : availableConnections) { - synchronized (connectionPool) { - if (!connectionPool.containsKey(availableConnectionsModel.getProfileName())) - connectionPool.put(availableConnectionsModel.getProfileName(), Collections.synchronizedList(new ArrayList<>())); + for (final AvailableConnectionsModel model : availableConnections) { + String dbName = model.getDbName(); + connectionPool.putIfAbsent(dbName, Collections.synchronizedList(new ArrayList<>())); + usedConnections.putIfAbsent(dbName, Collections.synchronizedList(new ArrayList<>())); + activeConnectionNames.putIfAbsent(dbName, ConcurrentHashMap.newKeySet()); + extraConnectionCounters.putIfAbsent(dbName, new AtomicInteger(0)); - if (!usedConnections.containsKey(availableConnectionsModel.getProfileName())) - usedConnections.put(availableConnectionsModel.getProfileName(), Collections.synchronizedList(new ArrayList<>())); + for (int j = 0; j < poolSize; j++) { + DataSource ds = createNewDataSource(model.getProfileName(), j); + String connectionName = ds.getApplicationName(); - int createdConnections = connectionPool.get(availableConnectionsModel.getProfileName()).size() + - usedConnections.get(availableConnectionsModel.getProfileName()).size(); - - for (int j = 0; j < poolSize - createdConnections; j++) { - DataSource ds = new DataSource(); - ds.initialize(availableConnectionsModel.getProfileName(), "EMS Pool Connection #" + j, true); - connectionPool.get(availableConnectionsModel.getProfileName()).add(ds); + if (!activeConnectionNames.get(dbName).contains(connectionName)) { + connectionPool.get(dbName).add(ds); + activeConnectionNames.get(dbName).add(connectionName); + logger.trace("Created pool connection: {} for database: {}", connectionName, dbName); } } } } - - public synchronized DataSource getConnection(String profileName) throws Exception { - DataSource ds = null; - - synchronized (connectionPool) { - if (!connectionPool.containsKey(profileName) || connectionPool.get(profileName).isEmpty()) { - ds = createExtraConnection(profileName); - } else { - ds = connectionPool.get(profileName) - .remove(connectionPool.get(profileName).size() - 1); - - if (ds.isClosed()) ds.initialize(profileName, ds.getApplicationName(), true); - - usedConnections.get(profileName).add(ds); - } - } - + private DataSource createNewDataSource(String profileName, int index) throws Exception { + DataSource ds = new DataSource(); + ds.initialize(profileName, "EMS Pool Connection #" + index, true); return ds; } - public synchronized boolean releaseConnection(String profileName, DataSource ds) throws UnexpectedConnectionSwitchException, SQLException, IOException { - String currentCatalog = ds.getConnection().getCatalog(); - final AvailableConnectionsModel availableConnectionsModel = settingsModel.getAvailableConnections().stream() - .filter(x -> x.getProfileName().equalsIgnoreCase(profileName)) - .findFirst() - .orElse(null); + public DataSource getConnection(String profileName) throws Exception { + String dbName = settingsModel.getDbNameFromProfileDb(profileName); + poolLock.lock(); + try { + if (!connectionPool.containsKey(dbName) || connectionPool.get(dbName).isEmpty()) { + return createExtraConnection(profileName); + } - if (!currentCatalog.equalsIgnoreCase(availableConnectionsModel.getDbName())) { - usedConnections.get(profileName).remove(ds); - ds.forceClose(); - throw new UnexpectedConnectionSwitchException(availableConnectionsModel.getDbName(), currentCatalog); + List pool = connectionPool.get(dbName); + List used = usedConnections.get(dbName); + + DataSource ds = pool.remove(0); + + if (ds.isClosed()) { + String oldName = ds.getApplicationName(); + activeConnectionNames.get(dbName).remove(oldName); + + ds.initialize(profileName, oldName, true); + activeConnectionNames.get(dbName).add(oldName); + + logger.warn("Reinitializing closed connection: {} for database: {}", oldName, dbName); + } + + used.add(ds); + logger.trace("Assigned connection: {} for database: {}", ds.getApplicationName(), dbName); + return ds; + } finally { + poolLock.unlock(); + } + } + + public boolean releaseConnection(String profileName, DataSource ds) + throws UnexpectedConnectionSwitchException, SQLException, IOException { + if (ds == null) { + return false; } + poolLock.lock(); + try { + String currentCatalog = ds.getConnection().getCatalog(); + AvailableConnectionsModel model = findConnectionModel(profileName); - if (ds.getApplicationName().equals("EMS Connection")) { //Extra connection - ds.forceClose(); - return true; - } else { - synchronized (connectionPool) { - connectionPool.get(profileName).add(ds); - return usedConnections.get(profileName).remove(ds); + if (model == null) { + throw new IllegalArgumentException("Profile not found: " + profileName); } + + if (!currentCatalog.equalsIgnoreCase(model.getDbName())) { + handleCatalogMismatch(ds, model, currentCatalog); + return false; + } + + return handleConnectionRelease(ds, model); + } finally { + poolLock.unlock(); } } private DataSource createExtraConnection(String profileName) throws Exception { - DataSource ds = new DataSource(); - ds.initialize(profileName, "EMS Connection", true); + String dbName = settingsModel.getDbNameFromProfileDb(profileName); + poolLock.lock(); + try { + // Calcola il numero totale di connessioni esistenti + int baseCount = connectionPool.get(dbName).size() + usedConnections.get(dbName).size(); + int extraCount = extraConnectionCounters.get(dbName).incrementAndGet(); - return ds; + String connectionName = "EMS Extra Connection #" + (baseCount + extraCount); + + // Verifica se la connessione esiste giĆ  + if (activeConnectionNames.get(dbName).contains(connectionName)) { + logger.warn("Extra connection name collision detected: {}", connectionName); + extraConnectionCounters.get(dbName).decrementAndGet(); + return createExtraConnection(profileName); // Riprova con un nuovo nome + } + + DataSource ds = new DataSource(); + ds.initialize(profileName, connectionName, true); + activeConnectionNames.get(dbName).add(connectionName); + + logger.trace("Created extra connection: {} for database: {}", connectionName, dbName); + return ds; + } finally { + poolLock.unlock(); + } } -} + private AvailableConnectionsModel findConnectionModel(String profileName) { + return settingsModel.getAvailableConnections().stream() + .filter(x -> x.getProfileName().equalsIgnoreCase(profileName)) + .findFirst() + .orElse(null); + } + + private void handleCatalogMismatch(DataSource ds, AvailableConnectionsModel model, String currentCatalog) + throws UnexpectedConnectionSwitchException, SQLException, IOException { + String connectionName = ds.getApplicationName(); + usedConnections.get(model.getDbName()).remove(ds); + activeConnectionNames.get(model.getDbName()).remove(connectionName); + ds.forceClose(); + throw new UnexpectedConnectionSwitchException(model.getDbName(), currentCatalog); + } + + private boolean handleConnectionRelease(DataSource ds, AvailableConnectionsModel model) + throws SQLException { + String connectionName = ds.getApplicationName(); + String dbName = model.getDbName(); + + if (connectionName.startsWith("EMS Extra Connection #")) { + activeConnectionNames.get(dbName).remove(connectionName); + ds.forceClose(); + logger.trace("Closed extra connection: {} for database: {}", connectionName, dbName); + return true; + } + + connectionPool.get(dbName).add(ds); + boolean removed = usedConnections.get(dbName).remove(ds); + logger.trace("Released connection: {} for database: {}", connectionName, dbName); + return removed; + } +} \ No newline at end of file diff --git a/ems-core/src/main/java/it/integry/ems/sync/MultiDBTransaction/MultiDBTransactionManager.java b/ems-core/src/main/java/it/integry/ems/sync/MultiDBTransaction/MultiDBTransactionManager.java index 09ed364ceb..b1c4ebd3b9 100644 --- a/ems-core/src/main/java/it/integry/ems/sync/MultiDBTransaction/MultiDBTransactionManager.java +++ b/ems-core/src/main/java/it/integry/ems/sync/MultiDBTransaction/MultiDBTransactionManager.java @@ -233,19 +233,17 @@ public class MultiDBTransactionManager implements AutoCloseable { } public void closeAll() throws Exception { - if (dbDatasources != null && !dbDatasources.isEmpty()) + if (dbDatasources != null && !dbDatasources.isEmpty()) { for (AdvancedDataSource advancedDataSource : dbDatasources) { - if (!advancedDataSource.isClosed()) { - if (requestData != null && requestData.getRequestURI() != null) { - String methodName = " [" + requestData.getRequestURI() + "]"; - if (shouldPrintLog(requestData.getRequestURI())) - logger.debug("Closing automatically: " + advancedDataSource.getDataSource().getProfile() + " (#" + advancedDataSource.getDataSource().getSessionID() + ")" + methodName); - } else if (enableLog) { - logger.debug("Closing manually: " + advancedDataSource.getDataSource().getProfile() + " (#" + advancedDataSource.getDataSource().getSessionID() + ")"); - } - advancedDataSource.commit(); + if (requestData != null && requestData.getRequestURI() != null) { + String methodName = " [" + requestData.getRequestURI() + "]"; + if (shouldPrintLog(requestData.getRequestURI())) + logger.debug("Closing automatically: " + advancedDataSource.getDataSource().getProfile() + " (#" + advancedDataSource.getDataSource().getSessionID() + ")" + methodName); + } else if (enableLog) { + logger.debug("Closing manually: " + advancedDataSource.getDataSource().getProfile() + " (#" + advancedDataSource.getDataSource().getSessionID() + ")"); } + advancedDataSource.commit(); try { connectionPool.releaseConnection(advancedDataSource.getProfileName(), advancedDataSource.getDataSource()); @@ -253,6 +251,9 @@ public class MultiDBTransactionManager implements AutoCloseable { logger.error("UnexpectedConnectionSwitchException: " + advancedDataSource.getProfileName(), e); } } + + dbDatasources.clear(); + } }