Cambiata gestione della pool di connessioni

This commit is contained in:
2025-01-21 18:44:21 +01:00
parent 33f1528f67
commit 49b69555ac
4 changed files with 203 additions and 88 deletions

View File

@@ -337,19 +337,12 @@ public class EmsController {
} catch (Exception e) {
logger.error(request.getRequestURI(), e);
response = new ServiceRestResponse(EsitoType.KO, e, multiDBTransactionManager.getPrimaryDatasource().getProfile(), entity);
} finally {
try {
multiDBTransactionManager.closeAll();
} catch (Exception e) {
logger.error(request.getRequestURI(), e);
response = new ServiceRestResponse(EsitoType.KO, e, multiDBTransactionManager.getPrimaryDatasource().getProfile(), entity);
}
}
if (entity.getOperation() == OperationType.SELECT_OBJECT) {
return obj;
} else {
return response;
}
if (entity.getOperation() == OperationType.SELECT_OBJECT) {
return obj;
} else {
return response;
}
}

View File

@@ -30,10 +30,8 @@ public class Migration_20230411163906 extends BaseMigration implements Migration
"WHERE mime_type IS NULL");
new Thread(() -> {
try {
String profileDb = advancedDataSource.getProfileName();
MultiDBTransactionManager mdb = new MultiDBTransactionManager(profileDb, false);
String profileDb = advancedDataSource.getProfileName();
try (MultiDBTransactionManager mdb = new MultiDBTransactionManager(profileDb, false)) {
for (HashMap<String, Object> stbFileAttached : stbFilesAttached) {
final MediaType mimeType = mimeTypesHandler.getContentType((String) stbFileAttached.get("file_name"));
@@ -41,7 +39,6 @@ public class Migration_20230411163906 extends BaseMigration implements Migration
Query.format("UPDATE stb_files_attached SET mime_type = {} WHERE id_attach = {}", mimeType.toString(), stbFileAttached.get("id_attach")));
}
mdb.commitAll();
mdb.closeAll();
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@@ -5,106 +5,230 @@ import it.integry.ems.settings.Model.AvailableConnectionsModel;
import it.integry.ems.settings.Model.SettingsModel;
import it.integry.ems.sync.MultiDBTransaction.exception.UnexpectedConnectionSwitchException;
import it.integry.ems.utility.UtilityDebug;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@Component
public class BasicConnectionPool {
private static final Logger logger = LogManager.getLogger(BasicConnectionPool.class);
@Autowired
private SettingsModel settingsModel;
private final Map<String, List<DataSource>> connectionPool = Collections.synchronizedMap(new HashMap<>());
private final Map<String, List<DataSource>> usedConnections = Collections.synchronizedMap(new HashMap<>());
private final Map<String, List<DataSource>> connectionPool = new ConcurrentHashMap<>();
private final Map<String, List<DataSource>> usedConnections = new ConcurrentHashMap<>();
private final Map<String, AtomicInteger> extraConnectionCounters = new ConcurrentHashMap<>();
private final Map<String, Set<String>> activeConnectionNames = new ConcurrentHashMap<>();
public synchronized void init() throws Exception {
private final ReentrantLock poolLock = new ReentrantLock();
int poolSize =
UtilityDebug.isIntegryServerMaster() ? 1 :
UtilityDebug.isIntegryServerDev() ? 2 :
UtilityDebug.isDebugExecution() ? 0 :
Runtime.getRuntime().availableProcessors();
public void init() throws Exception {
poolLock.lock();
try {
closeAllConnections();
int poolSize = calculatePoolSize();
initializeConnections(poolSize);
} finally {
poolLock.unlock();
}
}
if (!settingsModel.isPrimaryInstance()) poolSize = 0;
private void closeAllConnections() {
poolLock.lock();
try {
// Chiude tutte le connessioni esistenti
connectionPool.values().forEach(connections ->
connections.forEach(this::safeCloseConnection));
usedConnections.values().forEach(connections ->
connections.forEach(this::safeCloseConnection));
poolSize = Math.min(poolSize, 8);
// Pulisce le mappe
connectionPool.clear();
usedConnections.clear();
extraConnectionCounters.clear();
activeConnectionNames.clear();
} finally {
poolLock.unlock();
}
}
private void safeCloseConnection(DataSource ds) {
try {
if (ds != null) {
ds.forceClose();
}
} catch (Exception e) {
logger.error("Errore durante la chiusura della connessione", e);
}
}
private int calculatePoolSize() {
int poolSize = UtilityDebug.isIntegryServerMaster() ? 1 :
UtilityDebug.isIntegryServerDev() ? 2 :
UtilityDebug.isDebugExecution() ? 0 :
Runtime.getRuntime().availableProcessors();
if (!settingsModel.isPrimaryInstance()) {
poolSize = 0;
}
return Math.min(poolSize, 8);
}
private void initializeConnections(int poolSize) throws Exception {
final List<AvailableConnectionsModel> availableConnections = settingsModel.getAvailableConnections(true);
for (final AvailableConnectionsModel availableConnectionsModel : availableConnections) {
synchronized (connectionPool) {
if (!connectionPool.containsKey(availableConnectionsModel.getProfileName()))
connectionPool.put(availableConnectionsModel.getProfileName(), Collections.synchronizedList(new ArrayList<>()));
for (final AvailableConnectionsModel model : availableConnections) {
String dbName = model.getDbName();
connectionPool.putIfAbsent(dbName, Collections.synchronizedList(new ArrayList<>()));
usedConnections.putIfAbsent(dbName, Collections.synchronizedList(new ArrayList<>()));
activeConnectionNames.putIfAbsent(dbName, ConcurrentHashMap.newKeySet());
extraConnectionCounters.putIfAbsent(dbName, new AtomicInteger(0));
if (!usedConnections.containsKey(availableConnectionsModel.getProfileName()))
usedConnections.put(availableConnectionsModel.getProfileName(), Collections.synchronizedList(new ArrayList<>()));
for (int j = 0; j < poolSize; j++) {
DataSource ds = createNewDataSource(model.getProfileName(), j);
String connectionName = ds.getApplicationName();
int createdConnections = connectionPool.get(availableConnectionsModel.getProfileName()).size() +
usedConnections.get(availableConnectionsModel.getProfileName()).size();
for (int j = 0; j < poolSize - createdConnections; j++) {
DataSource ds = new DataSource();
ds.initialize(availableConnectionsModel.getProfileName(), "EMS Pool Connection #" + j, true);
connectionPool.get(availableConnectionsModel.getProfileName()).add(ds);
if (!activeConnectionNames.get(dbName).contains(connectionName)) {
connectionPool.get(dbName).add(ds);
activeConnectionNames.get(dbName).add(connectionName);
logger.trace("Created pool connection: {} for database: {}", connectionName, dbName);
}
}
}
}
public synchronized DataSource getConnection(String profileName) throws Exception {
DataSource ds = null;
synchronized (connectionPool) {
if (!connectionPool.containsKey(profileName) || connectionPool.get(profileName).isEmpty()) {
ds = createExtraConnection(profileName);
} else {
ds = connectionPool.get(profileName)
.remove(connectionPool.get(profileName).size() - 1);
if (ds.isClosed()) ds.initialize(profileName, ds.getApplicationName(), true);
usedConnections.get(profileName).add(ds);
}
}
private DataSource createNewDataSource(String profileName, int index) throws Exception {
DataSource ds = new DataSource();
ds.initialize(profileName, "EMS Pool Connection #" + index, true);
return ds;
}
public synchronized boolean releaseConnection(String profileName, DataSource ds) throws UnexpectedConnectionSwitchException, SQLException, IOException {
String currentCatalog = ds.getConnection().getCatalog();
final AvailableConnectionsModel availableConnectionsModel = settingsModel.getAvailableConnections().stream()
.filter(x -> x.getProfileName().equalsIgnoreCase(profileName))
.findFirst()
.orElse(null);
public DataSource getConnection(String profileName) throws Exception {
String dbName = settingsModel.getDbNameFromProfileDb(profileName);
poolLock.lock();
try {
if (!connectionPool.containsKey(dbName) || connectionPool.get(dbName).isEmpty()) {
return createExtraConnection(profileName);
}
if (!currentCatalog.equalsIgnoreCase(availableConnectionsModel.getDbName())) {
usedConnections.get(profileName).remove(ds);
ds.forceClose();
throw new UnexpectedConnectionSwitchException(availableConnectionsModel.getDbName(), currentCatalog);
List<DataSource> pool = connectionPool.get(dbName);
List<DataSource> used = usedConnections.get(dbName);
DataSource ds = pool.remove(0);
if (ds.isClosed()) {
String oldName = ds.getApplicationName();
activeConnectionNames.get(dbName).remove(oldName);
ds.initialize(profileName, oldName, true);
activeConnectionNames.get(dbName).add(oldName);
logger.warn("Reinitializing closed connection: {} for database: {}", oldName, dbName);
}
used.add(ds);
logger.trace("Assigned connection: {} for database: {}", ds.getApplicationName(), dbName);
return ds;
} finally {
poolLock.unlock();
}
}
public boolean releaseConnection(String profileName, DataSource ds)
throws UnexpectedConnectionSwitchException, SQLException, IOException {
if (ds == null) {
return false;
}
poolLock.lock();
try {
String currentCatalog = ds.getConnection().getCatalog();
AvailableConnectionsModel model = findConnectionModel(profileName);
if (ds.getApplicationName().equals("EMS Connection")) { //Extra connection
ds.forceClose();
return true;
} else {
synchronized (connectionPool) {
connectionPool.get(profileName).add(ds);
return usedConnections.get(profileName).remove(ds);
if (model == null) {
throw new IllegalArgumentException("Profile not found: " + profileName);
}
if (!currentCatalog.equalsIgnoreCase(model.getDbName())) {
handleCatalogMismatch(ds, model, currentCatalog);
return false;
}
return handleConnectionRelease(ds, model);
} finally {
poolLock.unlock();
}
}
private DataSource createExtraConnection(String profileName) throws Exception {
DataSource ds = new DataSource();
ds.initialize(profileName, "EMS Connection", true);
String dbName = settingsModel.getDbNameFromProfileDb(profileName);
poolLock.lock();
try {
// Calcola il numero totale di connessioni esistenti
int baseCount = connectionPool.get(dbName).size() + usedConnections.get(dbName).size();
int extraCount = extraConnectionCounters.get(dbName).incrementAndGet();
return ds;
String connectionName = "EMS Extra Connection #" + (baseCount + extraCount);
// Verifica se la connessione esiste già
if (activeConnectionNames.get(dbName).contains(connectionName)) {
logger.warn("Extra connection name collision detected: {}", connectionName);
extraConnectionCounters.get(dbName).decrementAndGet();
return createExtraConnection(profileName); // Riprova con un nuovo nome
}
DataSource ds = new DataSource();
ds.initialize(profileName, connectionName, true);
activeConnectionNames.get(dbName).add(connectionName);
logger.trace("Created extra connection: {} for database: {}", connectionName, dbName);
return ds;
} finally {
poolLock.unlock();
}
}
}
private AvailableConnectionsModel findConnectionModel(String profileName) {
return settingsModel.getAvailableConnections().stream()
.filter(x -> x.getProfileName().equalsIgnoreCase(profileName))
.findFirst()
.orElse(null);
}
private void handleCatalogMismatch(DataSource ds, AvailableConnectionsModel model, String currentCatalog)
throws UnexpectedConnectionSwitchException, SQLException, IOException {
String connectionName = ds.getApplicationName();
usedConnections.get(model.getDbName()).remove(ds);
activeConnectionNames.get(model.getDbName()).remove(connectionName);
ds.forceClose();
throw new UnexpectedConnectionSwitchException(model.getDbName(), currentCatalog);
}
private boolean handleConnectionRelease(DataSource ds, AvailableConnectionsModel model)
throws SQLException {
String connectionName = ds.getApplicationName();
String dbName = model.getDbName();
if (connectionName.startsWith("EMS Extra Connection #")) {
activeConnectionNames.get(dbName).remove(connectionName);
ds.forceClose();
logger.trace("Closed extra connection: {} for database: {}", connectionName, dbName);
return true;
}
connectionPool.get(dbName).add(ds);
boolean removed = usedConnections.get(dbName).remove(ds);
logger.trace("Released connection: {} for database: {}", connectionName, dbName);
return removed;
}
}

View File

@@ -233,19 +233,17 @@ public class MultiDBTransactionManager implements AutoCloseable {
}
public void closeAll() throws Exception {
if (dbDatasources != null && !dbDatasources.isEmpty())
if (dbDatasources != null && !dbDatasources.isEmpty()) {
for (AdvancedDataSource advancedDataSource : dbDatasources) {
if (!advancedDataSource.isClosed()) {
if (requestData != null && requestData.getRequestURI() != null) {
String methodName = " [" + requestData.getRequestURI() + "]";
if (shouldPrintLog(requestData.getRequestURI()))
logger.debug("Closing automatically: " + advancedDataSource.getDataSource().getProfile() + " (#" + advancedDataSource.getDataSource().getSessionID() + ")" + methodName);
} else if (enableLog) {
logger.debug("Closing manually: " + advancedDataSource.getDataSource().getProfile() + " (#" + advancedDataSource.getDataSource().getSessionID() + ")");
}
advancedDataSource.commit();
if (requestData != null && requestData.getRequestURI() != null) {
String methodName = " [" + requestData.getRequestURI() + "]";
if (shouldPrintLog(requestData.getRequestURI()))
logger.debug("Closing automatically: " + advancedDataSource.getDataSource().getProfile() + " (#" + advancedDataSource.getDataSource().getSessionID() + ")" + methodName);
} else if (enableLog) {
logger.debug("Closing manually: " + advancedDataSource.getDataSource().getProfile() + " (#" + advancedDataSource.getDataSource().getSessionID() + ")");
}
advancedDataSource.commit();
try {
connectionPool.releaseConnection(advancedDataSource.getProfileName(), advancedDataSource.getDataSource());
@@ -253,6 +251,9 @@ public class MultiDBTransactionManager implements AutoCloseable {
logger.error("UnexpectedConnectionSwitchException: " + advancedDataSource.getProfileName(), e);
}
}
dbDatasources.clear();
}
}