mirror of
https://github.com/alibaba/higress.git
synced 2026-05-08 04:17:27 +08:00
feat: Rust WASM supports Redis database configuration option (#2704)
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
use higress_wasm_rust::cluster_wrapper::{DnsCluster, StaticIpCluster};
|
||||
use higress_wasm_rust::cluster_wrapper::{DnsCluster, FQDNCluster};
|
||||
use higress_wasm_rust::log::Log;
|
||||
use higress_wasm_rust::plugin_wrapper::{HttpContextWrapper, RootContextWrapper};
|
||||
use higress_wasm_rust::redis_wrapper::{RedisClient, RedisClientBuilder, RedisClientConfig};
|
||||
use higress_wasm_rust::redis_wrapper::{RedisClient, RedisClientConfig};
|
||||
use higress_wasm_rust::rule_matcher::{on_configure, RuleMatcher, SharedRuleMatcher};
|
||||
use http::Method;
|
||||
use multimap::MultiMap;
|
||||
@@ -23,12 +23,45 @@ proxy_wasm::main! {{
|
||||
|
||||
const PLUGIN_NAME: &str = "demo-wasm";
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
struct RedisConfig {
|
||||
service_name: String,
|
||||
#[serde(default)]
|
||||
service_port: Option<u16>,
|
||||
#[serde(default)]
|
||||
username: Option<String>,
|
||||
#[serde(default)]
|
||||
password: Option<String>,
|
||||
#[serde(default = "default_redis_timeout")]
|
||||
timeout: u64, // 超时(默认1000ms)
|
||||
#[serde(default)]
|
||||
database: Option<u32>,
|
||||
}
|
||||
|
||||
impl Default for RedisConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
service_name: String::new(),
|
||||
service_port: None,
|
||||
username: None,
|
||||
password: None,
|
||||
timeout: default_redis_timeout(),
|
||||
database: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 超时时间默认值
|
||||
fn default_redis_timeout() -> u64 {
|
||||
1000
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Deserialize, Clone)]
|
||||
struct DemoWasmConfig {
|
||||
// 配置文件结构体
|
||||
test: String,
|
||||
#[serde(default)]
|
||||
password: Option<String>,
|
||||
#[serde(rename = "redis")]
|
||||
redis_config: RedisConfig,
|
||||
}
|
||||
|
||||
fn format_body(body: Option<Vec<u8>>) -> String {
|
||||
@@ -74,50 +107,49 @@ impl HttpContextWrapper<DemoWasmConfig> for DemoWasm {
|
||||
// 请求header获取完成回调
|
||||
self.log
|
||||
.info(&format!("on_http_request_complete_headers {:?}", headers));
|
||||
if let Some(config) = &self.config {
|
||||
let _redis_client = RedisClientBuilder::new(
|
||||
&StaticIpCluster::new("redis", 80, ""),
|
||||
Duration::from_secs(5),
|
||||
)
|
||||
.password(config.password.as_ref())
|
||||
.build();
|
||||
|
||||
let redis_client = RedisClient::new(
|
||||
RedisClientConfig::new(
|
||||
&StaticIpCluster::new("redis", 80, ""),
|
||||
Duration::from_secs(5),
|
||||
)
|
||||
.password(config.password.as_ref()),
|
||||
);
|
||||
|
||||
// 验证配置是否存在
|
||||
let config = match &self.config {
|
||||
Some(cfg) => cfg,
|
||||
None => {
|
||||
self.log.error("Plugin configuration is missing");
|
||||
return HeaderAction::Continue;
|
||||
}
|
||||
};
|
||||
// 初始化redis client
|
||||
let redis_client = match self.initialize_redis_client(config) {
|
||||
Ok(client) => client,
|
||||
Err(err) => {
|
||||
self.log
|
||||
.error(&format!("Failed to initialize Redis client: {}", err));
|
||||
return HeaderAction::Continue;
|
||||
}
|
||||
};
|
||||
self.redis_client = Some(redis_client);
|
||||
// 调用redis
|
||||
if let Some(redis_client) = &self.redis_client {
|
||||
if let Some(self_rc) = self.weak.upgrade() {
|
||||
let init_res = redis_client.init();
|
||||
self.log.info(&format!("redis init {:?}", init_res));
|
||||
if init_res.is_ok() {
|
||||
let incr_res = redis_client.incr(
|
||||
"connect",
|
||||
Box::new(move |res, status, token_id| {
|
||||
self_rc.borrow().log().info(&format!(
|
||||
"redis incr finish value_res:{:?}, status: {}, token_id: {}",
|
||||
res, status, token_id
|
||||
));
|
||||
if let Some(this) = self_rc.borrow_mut().downcast_mut::<DemoWasm>() {
|
||||
if let Ok(Value::Int(value)) = res {
|
||||
this.cid = *value;
|
||||
}
|
||||
let incr_res = redis_client.incr(
|
||||
"connect",
|
||||
Box::new(move |res, status, token_id| {
|
||||
self_rc.borrow().log().info(&format!(
|
||||
"redis incr finish value_res:{:?}, status: {}, token_id: {}",
|
||||
res, status, token_id
|
||||
));
|
||||
if let Some(this) = self_rc.borrow_mut().downcast_mut::<DemoWasm>() {
|
||||
if let Ok(Value::Int(value)) = res {
|
||||
this.cid = *value;
|
||||
}
|
||||
self_rc.borrow().resume_http_request();
|
||||
}),
|
||||
);
|
||||
match incr_res {
|
||||
Ok(s) => {
|
||||
self.log.info(&format!("redis incr ok {}", s));
|
||||
return HeaderAction::StopAllIterationAndBuffer;
|
||||
}
|
||||
Err(e) => self.log.info(&format!("redis incr error {:?}", e)),
|
||||
};
|
||||
}
|
||||
self.redis_client = Some(redis_client);
|
||||
self_rc.borrow().resume_http_request();
|
||||
}),
|
||||
);
|
||||
match incr_res {
|
||||
Ok(s) => {
|
||||
self.log.info(&format!("redis incr ok {}", s));
|
||||
return HeaderAction::StopAllIterationAndBuffer;
|
||||
}
|
||||
Err(e) => self.log.info(&format!("redis incr error {:?}", e)),
|
||||
};
|
||||
} else {
|
||||
self.log.error("self_weak upgrade error");
|
||||
}
|
||||
@@ -233,6 +265,48 @@ impl HttpContextWrapper<DemoWasmConfig> for DemoWasm {
|
||||
DataAction::Continue
|
||||
}
|
||||
}
|
||||
|
||||
impl DemoWasm {
|
||||
fn initialize_redis_client(&self, config: &DemoWasmConfig) -> Result<RedisClient, String> {
|
||||
let redis_cfg = &config.redis_config;
|
||||
|
||||
if redis_cfg.service_name.is_empty() {
|
||||
return Err("Redis service_name is required but missing".to_string());
|
||||
}
|
||||
|
||||
let service_port = redis_cfg.service_port.unwrap_or_else(|| {
|
||||
if redis_cfg.service_name.ends_with(".static") {
|
||||
80
|
||||
} else {
|
||||
6379
|
||||
}
|
||||
});
|
||||
|
||||
let cluster = FQDNCluster::new(&redis_cfg.service_name, "", service_port);
|
||||
let timeout = Duration::from_millis(redis_cfg.timeout);
|
||||
let mut client_config = RedisClientConfig::new(&cluster, timeout);
|
||||
if let Some(username) = &redis_cfg.username {
|
||||
client_config.username(Some(username));
|
||||
}
|
||||
if let Some(password) = &redis_cfg.password {
|
||||
client_config.password(Some(password));
|
||||
}
|
||||
if let Some(database) = redis_cfg.database {
|
||||
client_config.database(Some(database));
|
||||
}
|
||||
let redis_client = RedisClient::new(&client_config);
|
||||
|
||||
// 初始化redis client
|
||||
match redis_client.init() {
|
||||
Ok(_) => {
|
||||
self.log.info("Redis client initialized successfully");
|
||||
Ok(redis_client)
|
||||
}
|
||||
Err(e) => Err(format!("Failed to initialize Redis client: {:?}", e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct DemoWasmRoot {
|
||||
log: Log,
|
||||
rule_matcher: SharedRuleMatcher<DemoWasmConfig>,
|
||||
|
||||
Reference in New Issue
Block a user