IdDatabase.java
/*
* Copyright © 2023 Mark Raynsford <code@io7m.com> https://www.io7m.com
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
* SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR
* IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
package com.io7m.idstore.database.postgres.internal;
import com.io7m.idstore.database.api.IdDatabaseConfiguration;
import com.io7m.idstore.database.api.IdDatabaseConnectionType;
import com.io7m.idstore.database.api.IdDatabaseException;
import com.io7m.idstore.database.api.IdDatabaseRole;
import com.io7m.idstore.database.api.IdDatabaseTelemetry;
import com.io7m.idstore.database.api.IdDatabaseType;
import com.io7m.jmulticlose.core.CloseableCollectionType;
import com.zaxxer.hikari.HikariDataSource;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import org.jooq.conf.RenderNameCase;
import org.jooq.conf.Settings;
import java.sql.SQLException;
import java.time.Clock;
import java.time.OffsetDateTime;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import static com.io7m.idstore.error_codes.IdStandardErrorCodes.SQL_ERROR;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_SYSTEM;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DbSystemValues.POSTGRESQL;
import static java.lang.Math.max;
import static java.util.Objects.requireNonNullElse;
/**
* The default postgres server database implementation.
*/
public final class IdDatabase implements IdDatabaseType
{
private final Clock clock;
private final CloseableCollectionType<IdDatabaseException> resources;
private final ConcurrentLinkedQueue<Long> connectionTimes;
private final HikariDataSource dataSource;
private final IdDatabaseConfiguration configuration;
private final IdDatabaseTelemetry telemetry;
private final LongCounter transactionCommits;
private final LongCounter transactionRollbacks;
private final LongCounter transactions;
private final Settings settings;
private final Tracer tracer;
/**
* The default postgres server database implementation.
*
* @param inTelemetry A telemetry
* @param inConfiguration The configuration
* @param inClock The clock
* @param inDataSource A pooled data source
* @param inResources The resources to be closed
*/
public IdDatabase(
final IdDatabaseTelemetry inTelemetry,
final IdDatabaseConfiguration inConfiguration,
final Clock inClock,
final HikariDataSource inDataSource,
final CloseableCollectionType<IdDatabaseException> inResources)
{
this.telemetry =
Objects.requireNonNull(inTelemetry, "telemetry");
this.configuration =
Objects.requireNonNull(inConfiguration, "inConfiguration");
this.tracer =
inTelemetry.tracer();
this.resources =
Objects.requireNonNull(inResources, "resources");
this.clock =
Objects.requireNonNull(inClock, "clock");
this.dataSource =
Objects.requireNonNull(inDataSource, "dataSource");
this.settings =
new Settings().withRenderNameCase(RenderNameCase.LOWER);
final var dataSourceBean =
this.dataSource.getHikariPoolMXBean();
final var meter =
inTelemetry.meter();
this.transactions =
meter.counterBuilder("idstore_db_transactions")
.setDescription("The number of completed transactions.")
.build();
this.transactionCommits =
meter.counterBuilder("idstore_db_commits")
.setDescription("The number of database transaction commits.")
.build();
this.transactionRollbacks =
meter.counterBuilder("idstore_db_rollbacks")
.setDescription("The number of database transaction rollbacks.")
.build();
this.connectionTimes =
new ConcurrentLinkedQueue<>();
this.resources.add(
meter.gaugeBuilder("idstore_db_connection_time")
.setDescription("The amount of time a database connection is held.")
.ofLongs()
.buildWithCallback(measurement -> {
measurement.record(maxOf(this.connectionTimes));
})
);
this.resources.add(
meter.gaugeBuilder("idstore_db_connections_active")
.setDescription("Number of active database connections.")
.ofLongs()
.buildWithCallback(measurement -> {
measurement.record(
Integer.toUnsignedLong(dataSourceBean.getActiveConnections())
);
})
);
this.resources.add(
meter.gaugeBuilder("idstore_db_connections_idle")
.setDescription("Number of idle database connections.")
.ofLongs()
.buildWithCallback(measurement -> {
measurement.record(
Integer.toUnsignedLong(dataSourceBean.getIdleConnections())
);
})
);
this.resources.add(
meter.gaugeBuilder("idstore_db_connections_total")
.setDescription("Total number of database connections.")
.ofLongs()
.buildWithCallback(measurement -> {
measurement.record(
Integer.toUnsignedLong(dataSourceBean.getTotalConnections())
);
})
);
this.resources.add(
meter.gaugeBuilder("idstore_db_threads_waiting")
.setDescription("Number of threads waiting for connections.")
.ofLongs()
.buildWithCallback(measurement -> {
measurement.record(
Integer.toUnsignedLong(dataSourceBean.getThreadsAwaitingConnection())
);
})
);
}
private static long maxOf(
final ConcurrentLinkedQueue<Long> times)
{
var time = 0L;
while (!times.isEmpty()) {
time = max(time, times.poll().longValue());
}
return time;
}
LongCounter counterTransactions()
{
return this.transactions;
}
LongCounter counterTransactionCommits()
{
return this.transactionCommits;
}
LongCounter counterTransactionRollbacks()
{
return this.transactionRollbacks;
}
@Override
public void close()
throws IdDatabaseException
{
this.resources.close();
}
@Override
public IdDatabaseConfiguration configuration()
{
return this.configuration;
}
/**
* @return The OpenTelemetry tracer
*/
public Tracer tracer()
{
return this.tracer;
}
@Override
public IdDatabaseConnectionType openConnection(
final IdDatabaseRole role)
throws IdDatabaseException
{
final var span =
this.tracer
.spanBuilder("IdDatabaseConnection")
.setSpanKind(SpanKind.SERVER)
.setAttribute(DB_SYSTEM, POSTGRESQL)
.startSpan();
try {
span.addEvent("RequestConnection");
final var conn = this.dataSource.getConnection();
span.addEvent("ObtainedConnection");
final var timeNow = OffsetDateTime.now();
conn.setAutoCommit(false);
return new IdDatabaseConnection(this, conn, timeNow, role, span);
} catch (final SQLException e) {
span.recordException(e);
span.end();
throw new IdDatabaseException(
requireNonNullElse(e.getMessage(), e.getClass().getSimpleName()),
e,
SQL_ERROR,
Map.of(),
Optional.empty()
);
}
}
/**
* @return The jooq SQL settings
*/
Settings settings()
{
return this.settings;
}
/**
* @return The clock used for time-related queries
*/
Clock clock()
{
return this.clock;
}
@Override
public String description()
{
return "Server database service.";
}
@Override
public String toString()
{
return "[IdDatabase 0x%s]"
.formatted(Long.toUnsignedString(this.hashCode(), 16));
}
void setConnectionTimeNow(
final long nanos)
{
if (!this.telemetry.isNoOp()) {
this.connectionTimes.add(Long.valueOf(nanos));
}
}
}