aboutsummaryrefslogtreecommitdiffstats
path: root/crates/sh-util
diff options
context:
space:
mode:
Diffstat (limited to 'crates/sh-util')
-rw-r--r--crates/sh-util/Cargo.toml26
-rw-r--r--crates/sh-util/src/cache/cluster.rs56
-rw-r--r--crates/sh-util/src/cache/mod.rs176
-rw-r--r--crates/sh-util/src/cache/sentinel.rs66
-rw-r--r--crates/sh-util/src/lib.rs2
5 files changed, 326 insertions, 0 deletions
diff --git a/crates/sh-util/Cargo.toml b/crates/sh-util/Cargo.toml
new file mode 100644
index 0000000..12bf7a4
--- /dev/null
+++ b/crates/sh-util/Cargo.toml
@@ -0,0 +1,26 @@
+[package]
+name = "sh-util"
+version = "0.0.0"
+edition = "2024"
+license.workspace = true
+readme.workspace = true
+documentation.workspace = true
+homepage.workspace = true
+
+[dependencies]
+bb8 = { version = "0.9.1", optional = true }
+futures-util = { workspace = true, optional = true }
+redis = { workspace = true, optional = true }
+serde = { workspace = true, features = ["derive"] }
+
+[features]
+cache = [
+ "dep:redis",
+ "redis/cluster-async",
+ "redis/connection-manager",
+ "redis/tokio-comp",
+ "redis/sentinel",
+ "redis/bb8",
+ "dep:bb8",
+ "dep:futures-util",
+]
diff --git a/crates/sh-util/src/cache/cluster.rs b/crates/sh-util/src/cache/cluster.rs
new file mode 100644
index 0000000..de13629
--- /dev/null
+++ b/crates/sh-util/src/cache/cluster.rs
@@ -0,0 +1,56 @@
+use redis::{
+ ErrorKind, FromRedisValue, IntoConnectionInfo, RedisError,
+ cluster::{ClusterClient, ClusterClientBuilder},
+ cluster_routing::{MultipleNodeRoutingInfo, ResponsePolicy, RoutingInfo},
+};
+
+/// ConnectionManager that implements `bb8::ManageConnection` and supports
+/// asynchronous clustered connections via `redis_cluster_async::Connection`
+#[derive(Clone)]
+pub struct RedisClusterConnectionManager {
+ client: ClusterClient,
+}
+
+impl RedisClusterConnectionManager {
+ pub fn new<T: IntoConnectionInfo>(
+ info: T,
+ ) -> Result<RedisClusterConnectionManager, RedisError> {
+ Ok(RedisClusterConnectionManager {
+ client: ClusterClientBuilder::new(vec![info]).retries(0).build()?,
+ })
+ }
+}
+
+impl bb8::ManageConnection for RedisClusterConnectionManager {
+ type Connection = redis::cluster_async::ClusterConnection;
+ type Error = RedisError;
+
+ async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+ self.client.get_async_connection().await
+ }
+
+ async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
+ let pong = conn
+ .route_command(
+ redis::cmd("PING"),
+ RoutingInfo::MultiNode((
+ MultipleNodeRoutingInfo::AllMasters,
+ Some(ResponsePolicy::OneSucceeded),
+ )),
+ )
+ .await
+ .and_then(|v| Ok(String::from_redis_value(v)?))?;
+ match pong.as_str() {
+ "PONG" => Ok(()),
+ _ => Err((
+ ErrorKind::Server(redis::ServerErrorKind::ResponseError),
+ "ping request",
+ )
+ .into()),
+ }
+ }
+
+ fn has_broken(&self, _: &mut Self::Connection) -> bool {
+ false
+ }
+}
diff --git a/crates/sh-util/src/cache/mod.rs b/crates/sh-util/src/cache/mod.rs
new file mode 100644
index 0000000..67a5121
--- /dev/null
+++ b/crates/sh-util/src/cache/mod.rs
@@ -0,0 +1,176 @@
+mod cluster;
+mod sentinel;
+pub use sentinel::SentinelConfig;
+
+use std::{sync::Arc, time::Duration};
+
+use bb8::RunError;
+// use bb8_redis::RedisConnectionManager;
+use futures_util::lock::Mutex;
+use redis::{
+ AsyncConnectionConfig, ProtocolVersion, RedisConnectionInfo, RedisError, TlsMode,
+ aio::ConnectionManagerConfig, sentinel::SentinelNodeConnectionInfo,
+};
+
+pub use self::cluster::RedisClusterConnectionManager;
+
+pub const REDIS_CONN_TIMEOUT: Duration = Duration::from_secs(2);
+
+pub enum RedisVariant {
+ Clustered,
+ NonClustered,
+ Sentinel(sentinel::SentinelConfig),
+}
+
+#[derive(Clone)]
+pub enum RedisManager {
+ Clustered(redis::cluster_async::ClusterConnection),
+ NonClustered(redis::aio::ConnectionManager),
+ Sentinel(Arc<Mutex<redis::sentinel::SentinelClient>>),
+}
+
+impl RedisManager {
+ pub async fn new(dsn: &str, variant: RedisVariant) -> Self {
+ match variant {
+ RedisVariant::Clustered => {
+ let cli = redis::cluster::ClusterClient::builder(vec![dsn])
+ .retries(1)
+ .connection_timeout(REDIS_CONN_TIMEOUT)
+ .build()
+ .expect("Error initializing redis-unpooled cluster client");
+ let con = cli
+ .get_async_connection()
+ .await
+ .expect("Failed to get redis-cluster-unpooled connection");
+ RedisManager::Clustered(con)
+ }
+ RedisVariant::NonClustered => {
+ let cli =
+ redis::Client::open(dsn).expect("Error initializing redis unpooled client");
+ let con = redis::aio::ConnectionManager::new_with_config(
+ cli,
+ ConnectionManagerConfig::new()
+ .set_number_of_retries(1)
+ .set_connection_timeout(Some(REDIS_CONN_TIMEOUT)),
+ )
+ .await
+ .expect("Failed to get redis-unpooled connection manager");
+ RedisManager::NonClustered(con)
+ }
+ RedisVariant::Sentinel(cfg) => {
+ let tls_mode = if cfg.redis_tls_mode_secure {
+ TlsMode::Secure
+ } else {
+ TlsMode::Insecure
+ };
+ let protocol = if cfg.redis_use_resp3 {
+ ProtocolVersion::RESP3
+ } else {
+ ProtocolVersion::default()
+ };
+
+ let redis_connection_info = RedisConnectionInfo::default()
+ .set_db(cfg.redis_db.unwrap_or(0))
+ .set_protocol(protocol)
+ .set_username(cfg.redis_username.clone())
+ .set_password(cfg.redis_password.clone());
+ let sentinel = SentinelNodeConnectionInfo::default()
+ .set_redis_connection_info(redis_connection_info)
+ .set_tls_mode(tls_mode);
+
+ let cli = redis::sentinel::SentinelClient::build(
+ vec![dsn],
+ cfg.service_name.clone(),
+ Some(sentinel),
+ redis::sentinel::SentinelServerType::Master,
+ )
+ .expect("Failed to build sentinel client");
+
+ RedisManager::Sentinel(Arc::new(Mutex::new(cli)))
+ }
+ }
+ }
+
+ pub async fn get(&self) -> Result<RedisConnection, RunError<RedisError>> {
+ match self {
+ Self::Clustered(conn) => Ok(RedisConnection::Clustered(conn.clone())),
+ Self::NonClustered(conn) => Ok(RedisConnection::NonClustere(conn.clone())),
+ Self::Sentinel(conn) => {
+ let mut conn = conn.lock().await;
+ let con = conn
+ .get_async_connection_with_config(
+ &AsyncConnectionConfig::new()
+ .set_response_timeout(Some(REDIS_CONN_TIMEOUT)),
+ )
+ .await?;
+ Ok(RedisConnection::Sentinel(con))
+ }
+ }
+ }
+}
+
+pub enum RedisConnection {
+ Clustered(redis::cluster_async::ClusterConnection),
+ NonClustere(redis::aio::ConnectionManager),
+ Sentinel(redis::aio::MultiplexedConnection),
+}
+
+impl redis::aio::ConnectionLike for RedisConnection {
+ fn req_packed_command<'a>(
+ &'a mut self,
+ cmd: &'a redis::Cmd,
+ ) -> redis::RedisFuture<'a, redis::Value> {
+ match self {
+ RedisConnection::Clustered(conn) => conn.req_packed_command(cmd),
+ RedisConnection::NonClustere(conn) => conn.req_packed_command(cmd),
+ RedisConnection::Sentinel(conn) => conn.req_packed_command(cmd),
+ }
+ }
+
+ fn req_packed_commands<'a>(
+ &'a mut self,
+ cmd: &'a redis::Pipeline,
+ offset: usize,
+ count: usize,
+ ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
+ match self {
+ RedisConnection::Clustered(conn) => conn.req_packed_commands(cmd, offset, count),
+ RedisConnection::NonClustere(conn) => conn.req_packed_commands(cmd, offset, count),
+ RedisConnection::Sentinel(conn) => conn.req_packed_commands(cmd, offset, count),
+ }
+ }
+
+ fn get_db(&self) -> i64 {
+ match self {
+ RedisConnection::Clustered(conn) => conn.get_db(),
+ RedisConnection::NonClustere(conn) => conn.get_db(),
+ RedisConnection::Sentinel(conn) => conn.get_db(),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use redis::AsyncCommands;
+
+ use super::RedisManager;
+
+ // Ensure basic set/get works -- should test sharding as well:
+ #[tokio::test]
+ // run with `cargo test -- --ignored redis` only when redis is up and configured
+ #[ignore]
+ async fn test_set_read_random_keys() {
+ let mgr = RedisManager::new(
+ "redis://127.0.0.1:6379/0",
+ super::RedisVariant::NonClustered,
+ )
+ .await;
+ let mut conn = mgr.get().await.unwrap();
+
+ for (val, key) in "abcdefghijklmnopqrstuvwxyz".chars().enumerate() {
+ let key = key.to_string();
+ let _: () = conn.set(key.clone(), val).await.unwrap();
+ assert_eq!(conn.get::<_, usize>(&key).await.unwrap(), val);
+ }
+ }
+}
diff --git a/crates/sh-util/src/cache/sentinel.rs b/crates/sh-util/src/cache/sentinel.rs
new file mode 100644
index 0000000..e52b043
--- /dev/null
+++ b/crates/sh-util/src/cache/sentinel.rs
@@ -0,0 +1,66 @@
+use futures_util::lock::Mutex;
+use redis::{
+ ErrorKind, IntoConnectionInfo, RedisError,
+ sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType},
+};
+use serde::Deserialize;
+
+struct LockedSentinelClient(pub(crate) Mutex<SentinelClient>);
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct SentinelConfig {
+ pub service_name: String,
+ pub redis_tls_mode_secure: bool,
+ pub redis_db: Option<i64>,
+ pub redis_username: String,
+ pub redis_password: String,
+ pub redis_use_resp3: bool,
+}
+
+/// ConnectionManager that implements `bb8::ManageConnection` and supports
+/// asynchronous Sentinel connections via `redis::sentinel::SentinelClient`
+pub struct RedisSentinelConnectionManager {
+ client: LockedSentinelClient,
+}
+
+impl RedisSentinelConnectionManager {
+ pub fn new<T: IntoConnectionInfo>(
+ info: Vec<T>,
+ service_name: String,
+ node_connection_info: Option<SentinelNodeConnectionInfo>,
+ ) -> Result<RedisSentinelConnectionManager, RedisError> {
+ Ok(RedisSentinelConnectionManager {
+ client: LockedSentinelClient(Mutex::new(SentinelClient::build(
+ info,
+ service_name,
+ node_connection_info,
+ SentinelServerType::Master,
+ )?)),
+ })
+ }
+}
+
+impl bb8::ManageConnection for RedisSentinelConnectionManager {
+ type Connection = redis::aio::MultiplexedConnection;
+ type Error = RedisError;
+
+ async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+ self.client.0.lock().await.get_async_connection().await
+ }
+
+ async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
+ let pong: String = redis::cmd("PING").query_async(conn).await?;
+ match pong.as_str() {
+ "PONG" => Ok(()),
+ _ => Err((
+ ErrorKind::Server(redis::ServerErrorKind::ResponseError),
+ "ping request",
+ )
+ .into()),
+ }
+ }
+
+ fn has_broken(&self, _: &mut Self::Connection) -> bool {
+ false
+ }
+}
diff --git a/crates/sh-util/src/lib.rs b/crates/sh-util/src/lib.rs
new file mode 100644
index 0000000..5501a81
--- /dev/null
+++ b/crates/sh-util/src/lib.rs
@@ -0,0 +1,2 @@
+#[cfg(feature = "cache")]
+pub mod cache;