diff --git a/plugins/wasm-rust/Makefile b/plugins/wasm-rust/Makefile index 25587e969..5bc1809a3 100644 --- a/plugins/wasm-rust/Makefile +++ b/plugins/wasm-rust/Makefile @@ -6,12 +6,6 @@ IMAGE_TAG = $(if $(strip $(PLUGIN_VERSION)),${PLUGIN_VERSION},${BUILD_TIME}-${CO IMG ?= ${REGISTRY}${PLUGIN_NAME}:${IMAGE_TAG} .DEFAULT: -lint-base: - cargo fmt --all --check - cargo clippy --workspace --all-features --all-targets -lint: - cargo fmt --all --check --manifest-path extensions/${PLUGIN_NAME}/Cargo.toml - cargo clippy --workspace --all-features --all-targets --manifest-path extensions/${PLUGIN_NAME}/Cargo.toml build: DOCKER_BUILDKIT=1 docker build \ --build-arg PLUGIN_NAME=${PLUGIN_NAME} \ @@ -20,3 +14,10 @@ build: . @echo "" @echo "output wasm file: extensions/${PLUGIN_NAME}/plugin.wasm" + +lint-base: + cargo fmt --all --check + cargo clippy --workspace --all-features --all-targets +lint: + cargo fmt --all --check --manifest-path extensions/${PLUGIN_NAME}/Cargo.toml + cargo clippy --workspace --all-features --all-targets --manifest-path extensions/${PLUGIN_NAME}/Cargo.toml diff --git a/plugins/wasm-rust/example/sse-timing/Cargo.lock b/plugins/wasm-rust/example/sse-timing/Cargo.lock new file mode 100644 index 000000000..9123a0e01 --- /dev/null +++ b/plugins/wasm-rust/example/sse-timing/Cargo.lock @@ -0,0 +1,270 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + +[[package]] +name = "bytes" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "downcast-rs" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] + +[[package]] +name = "higress-wasm-rust" +version = "0.1.0" +dependencies = [ + "downcast-rs", + "http", + "lazy_static", + "multimap", + "proxy-wasm", + "serde", + "serde_json", + "uuid", +] + +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "itoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + +[[package]] +name = "libc" +version = "0.2.161" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" + +[[package]] +name = "log" +version = "0.4.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +dependencies = [ + "serde", +] + +[[package]] +name = "once_cell" +version = "1.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" + +[[package]] +name = "proc-macro2" +version = "1.0.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c3a7fc5db1e57d5a779a352c8cdb57b29aa4c40cc69c3a68a7fedc815fbf2f9" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "proxy-wasm" +version = "0.2.2" +source = "git+https://github.com/higress-group/proxy-wasm-rust-sdk?branch=main#6735737fad486c8a7cc324241f58df4a160e7887" +dependencies = [ + "downcast-rs", + "hashbrown", + "log", +] + +[[package]] +name = "quote" +version = "1.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "ryu" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + +[[package]] +name = "serde" +version = "1.0.210" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.210" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.132" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "sse-timing" +version = "0.1.0" +dependencies = [ + "higress-wasm-rust", + "proxy-wasm", + "serde", + "serde_json", +] + +[[package]] +name = "syn" +version = "2.0.82" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83540f837a8afc019423a8edb95b52a8effe46957ee402287f4292fae35be021" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" + +[[package]] +name = "uuid" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +dependencies = [ + "getrandom", +] + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/plugins/wasm-rust/example/sse-timing/Cargo.toml b/plugins/wasm-rust/example/sse-timing/Cargo.toml new file mode 100644 index 000000000..44c4c5dfd --- /dev/null +++ b/plugins/wasm-rust/example/sse-timing/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "sse-timing" +version = "0.1.0" +edition = "2021" +publish = false + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["cdylib"] + +[dependencies] +higress-wasm-rust = { path = "../../", version = "0.1.0" } +proxy-wasm = { git="https://github.com/higress-group/proxy-wasm-rust-sdk", branch="main", version="0.2.2" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" diff --git a/plugins/wasm-rust/example/sse-timing/Makefile b/plugins/wasm-rust/example/sse-timing/Makefile new file mode 100644 index 000000000..22ec19d79 --- /dev/null +++ b/plugins/wasm-rust/example/sse-timing/Makefile @@ -0,0 +1,10 @@ +BUILD_OPTS="--release" + +.DEFAULT: +build: + cargo build --target wasm32-wasi ${BUILD_OPTS} + find target -name "*.wasm" -d 3 -exec cp "{}" plugin.wasm \; + +clean: + cargo clean + rm -f plugin.wasm diff --git a/plugins/wasm-rust/example/sse-timing/README.md b/plugins/wasm-rust/example/sse-timing/README.md new file mode 100644 index 000000000..b7d8d4bfd --- /dev/null +++ b/plugins/wasm-rust/example/sse-timing/README.md @@ -0,0 +1,26 @@ +## Proxy-Wasm plugin example: SSE Timing + +Proxy-Wasm plugin that traces Server-Side Event(SSE) duration from request start. + +### Building + +```sh +$ make +``` + +### Using in Envoy + +This example can be run with [`docker compose`](https://docs.docker.com/compose/install/) +and has a matching Envoy configuration. + +```sh +$ docker compose up +``` + +#### Access granted. + +Send HTTP request to `localhost:10000/`: + +```sh +$ curl localhost:10000/ +``` diff --git a/plugins/wasm-rust/example/sse-timing/docker-compose.yaml b/plugins/wasm-rust/example/sse-timing/docker-compose.yaml new file mode 100644 index 000000000..78549a2ac --- /dev/null +++ b/plugins/wasm-rust/example/sse-timing/docker-compose.yaml @@ -0,0 +1,35 @@ +# Copyright (c) 2023 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +services: + envoy: + image: higress-registry.cn-hangzhou.cr.aliyuncs.com/higress/all-in-one:latest + entrypoint: /usr/local/bin/envoy + command: -c /etc/envoy/envoy.yaml --component-log-level wasm:debug + depends_on: + - sse-server + hostname: envoy + ports: + - "10000:10000" + volumes: + - ./envoy.yaml:/etc/envoy/envoy.yaml + - ./target/wasm32-wasi/release:/etc/envoy/proxy-wasm-plugins + networks: + - envoymesh + sse-server: + build: sse-server + networks: + - envoymesh +networks: + envoymesh: {} \ No newline at end of file diff --git a/plugins/wasm-rust/example/sse-timing/envoy.yaml b/plugins/wasm-rust/example/sse-timing/envoy.yaml new file mode 100644 index 000000000..6281aad0d --- /dev/null +++ b/plugins/wasm-rust/example/sse-timing/envoy.yaml @@ -0,0 +1,76 @@ +# Copyright (c) 2023 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +static_resources: + listeners: + - name: listener_0 + address: + socket_address: + protocol: TCP + address: 0.0.0.0 + port_value: 10000 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http + route_config: + name: local_route + virtual_hosts: + - name: local_service + domains: ["*"] + routes: + - match: + prefix: "/" + route: + cluster: sse-server + http_filters: + - name: envoy.filters.http.wasm + typed_config: + "@type": type.googleapis.com/udpa.type.v1.TypedStruct + type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm + value: + config: + name: "http_body" + configuration: + "@type": type.googleapis.com/google.protobuf.StringValue + value: |- + { + "name": "sse_timing", + "_rules_": [] + } + vm_config: + runtime: "envoy.wasm.runtime.v8" + code: + local: + filename: "/etc/envoy/proxy-wasm-plugins/sse_timing.wasm" + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + clusters: + - name: sse-server + connect_timeout: 30s + type: LOGICAL_DNS +# dns_lookup_family: V4_ONLY + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: sse-server + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: sse-server + port_value: 8080 \ No newline at end of file diff --git a/plugins/wasm-rust/example/sse-timing/src/lib.rs b/plugins/wasm-rust/example/sse-timing/src/lib.rs new file mode 100644 index 000000000..ab1fa4f6c --- /dev/null +++ b/plugins/wasm-rust/example/sse-timing/src/lib.rs @@ -0,0 +1,198 @@ +// Copyright (c) 2023 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use higress_wasm_rust::event_stream::EventStream; +use higress_wasm_rust::log::Log; +use higress_wasm_rust::rule_matcher::{on_configure, RuleMatcher, SharedRuleMatcher}; +use proxy_wasm::traits::{Context, HttpContext, RootContext}; +use proxy_wasm::types::{ContextType, DataAction, HeaderAction, LogLevel}; +use serde::Deserialize; +use std::cell::RefCell; +use std::ops::DerefMut; +use std::rc::Rc; +use std::str::from_utf8; +use std::time::{Duration, SystemTime}; + +proxy_wasm::main! {{ + proxy_wasm::set_log_level(LogLevel::Trace); + proxy_wasm::set_root_context(|_|Box::new(SseTimingRoot::new())); +}} + +struct SseTimingRoot { + log: Rc, + rule_matcher: SharedRuleMatcher, +} + +struct SseTiming { + log: Rc, + rule_matcher: SharedRuleMatcher, + vendor: String, + is_event_stream: bool, + event_stream: EventStream, + start_time: SystemTime, +} + +#[derive(Default, Clone, Debug, Deserialize)] +struct SseTimingConfig { + vendor: Option, +} + +impl SseTimingRoot { + fn new() -> Self { + SseTimingRoot { + log: Rc::new(Log::new("sse_timing".to_string())), + rule_matcher: Rc::new(RefCell::new(RuleMatcher::default())), + } + } +} + +impl Context for SseTimingRoot {} + +impl RootContext for SseTimingRoot { + fn on_configure(&mut self, _plugin_configuration_size: usize) -> bool { + on_configure( + self, + _plugin_configuration_size, + self.rule_matcher.borrow_mut().deref_mut(), + &self.log, + ) + } + + fn create_http_context(&self, _context_id: u32) -> Option> { + Some(Box::new(SseTiming { + log: self.log.clone(), + rule_matcher: self.rule_matcher.clone(), + vendor: "higress".into(), + is_event_stream: false, + event_stream: EventStream::new(), + start_time: self.get_current_time(), + })) + } + + fn get_type(&self) -> Option { + Some(ContextType::HttpContext) + } +} + +impl Context for SseTiming {} + +impl HttpContext for SseTiming { + fn on_http_request_headers( + &mut self, + _num_headers: usize, + _end_of_stream: bool, + ) -> HeaderAction { + self.start_time = self.get_current_time(); + + let binding = self.rule_matcher.borrow(); + let config = match binding.get_match_config() { + None => { + return HeaderAction::Continue; + } + Some(config) => config.1, + }; + match config.vendor.clone() { + None => {} + Some(vendor) => self.vendor = vendor, + } + HeaderAction::Continue + } + + fn on_http_response_headers( + &mut self, + _num_headers: usize, + _end_of_stream: bool, + ) -> HeaderAction { + match self.get_http_response_header("Content-Type") { + None => self + .log + .warn("upstream response is not set Content-Type, skipped"), + Some(content_type) => { + if content_type.starts_with("text/event-stream") { + self.is_event_stream = true + } else { + self.log.warn(format!("upstream response Content-Type is not text/event-stream, but {}, skipped", content_type).as_str()) + } + } + } + HeaderAction::Continue + } + + fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> DataAction { + if !self.is_event_stream { + return DataAction::Continue; + } + + let body = self + .get_http_response_body(0, body_size) + .unwrap_or_default(); + self.event_stream.update(body); + self.process_event_stream(end_of_stream) + } +} + +impl SseTiming { + fn process_event_stream(&mut self, end_of_stream: bool) -> DataAction { + let mut modified_events = Vec::new(); + + loop { + match self.event_stream.next() { + None => break, + Some(raw_event) => { + if !raw_event.is_empty() { + // according to spec, event-stream must be utf-8 encoding + let event = from_utf8(raw_event.as_slice()).unwrap(); + let processed_event = self.process_event(event.to_string()); + modified_events.push(processed_event); + } + } + } + } + + if end_of_stream { + match self.event_stream.flush() { + None => {} + Some(raw_event) => { + if !raw_event.is_empty() { + // according to spec, event-stream must be utf-8 encoding + let event = from_utf8(raw_event.as_slice()).unwrap(); + let modified_event = self.process_event(event.into()); + modified_events.push(modified_event); + } + } + } + } + + if !modified_events.is_empty() { + let modified_body = modified_events.concat(); + self.set_http_response_body(0, modified_body.len(), modified_body.as_bytes()); + DataAction::Continue + } else { + DataAction::StopIterationNoBuffer + } + } + + fn process_event(&self, event: String) -> String { + let duration = self + .get_current_time() + .duration_since(self.start_time) + .unwrap_or(Duration::ZERO); + format!( + ": server-timing: {};dur={}\n{}\n\n", + self.vendor, + duration.as_millis(), + event + ) + } +} diff --git a/plugins/wasm-rust/example/sse-timing/sse-server/Dockerfile b/plugins/wasm-rust/example/sse-timing/sse-server/Dockerfile new file mode 100644 index 000000000..7d251e48f --- /dev/null +++ b/plugins/wasm-rust/example/sse-timing/sse-server/Dockerfile @@ -0,0 +1,5 @@ +FROM golang:latest AS builder +WORKDIR /workspace +COPY . . +RUN GOOS=linux GOARCH=amd64 go build -o main . +CMD ./main \ No newline at end of file diff --git a/plugins/wasm-rust/example/sse-timing/sse-server/go.mod b/plugins/wasm-rust/example/sse-timing/sse-server/go.mod new file mode 100644 index 000000000..63e8515e6 --- /dev/null +++ b/plugins/wasm-rust/example/sse-timing/sse-server/go.mod @@ -0,0 +1,3 @@ +module sse + +go 1.22 diff --git a/plugins/wasm-rust/example/sse-timing/sse-server/main.go b/plugins/wasm-rust/example/sse-timing/sse-server/main.go new file mode 100644 index 000000000..bb4a3089f --- /dev/null +++ b/plugins/wasm-rust/example/sse-timing/sse-server/main.go @@ -0,0 +1,42 @@ +package main + +import ( + "log" + "net/http" + "time" +) + +var events = []string{ + ": this is a test stream\n\n", + + "data: some text\n", + "data: another message\n", + "data: with two lines\n\n", + + "event: userconnect\n", + "data: {\"username\": \"bobby\", \"time\": \"02:33:48\"}\n\n", + + "event: usermessage\n", + "data: {\"username\": \"bobby\", \"time\": \"02:34:11\", \"text\": \"Hi everyone.\"}\n\n", + + "event: userdisconnect\n", + "data: {\"username\": \"bobby\", \"time\": \"02:34:23\"}\n\n", + + "event: usermessage\n", + "data: {\"username\": \"sean\", \"time\": \"02:34:36\", \"text\": \"Bye, bobby.\"}\n\n", +} + +func main() { + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + log.Println("receive request") + w.Header().Set("Content-Type", "text/event-stream") + for _, e := range events { + _, _ = w.Write([]byte(e)) + time.Sleep(1 * time.Second) + w.(http.Flusher).Flush() + } + }) + if err := http.ListenAndServe("0.0.0.0:8080", nil); err != nil { + panic(err) + } +} diff --git a/plugins/wasm-rust/src/event_stream.rs b/plugins/wasm-rust/src/event_stream.rs new file mode 100644 index 000000000..f28846317 --- /dev/null +++ b/plugins/wasm-rust/src/event_stream.rs @@ -0,0 +1,197 @@ +// Copyright (c) 2024 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Parsing MIME type text/event-stream according to https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream +/// +/// The event stream format is as described by the stream production of the following ABNF +/// +/// | rule | expression | +/// |--------|---------------------------| +/// |stream |= [ bom ] *event | +/// |event |= *( comment / field ) eol | +/// |comment |= colon *any-char eol | +/// |field |= 1*name-char [ colon [ space ] *any-char ] eol | +/// |eol |= ( cr lf / cr / lf ) | +/// +/// According to spec, we must judge EOL twice before we can identify a complete event. +/// However, in the rules of event and field, there is an ambiguous grammar in the judgment of eol, +/// and it will bring ambiguity (whether the field ends). In order to eliminate this ambiguity, +/// we believe that CRLF as CR+LF belongs to event and field respectively. +pub struct EventStream { + buffer: Vec, + processed_offset: usize, +} + +impl EventStream { + pub fn new() -> Self { + EventStream { + buffer: Vec::new(), + processed_offset: 0, + } + } + + /// Update the event stream by adding new data to the buffer and resetting processed offset if needed. + pub fn update(&mut self, data: Vec) { + if self.processed_offset > 0 { + self.buffer.drain(0..self.processed_offset); + self.processed_offset = 0; + } + + self.buffer.extend(data); + } + + /// Get the next event from the event stream. Return the event data if available, otherwise return None. + /// Next will consume all the data in the current buffer. However, if there is a valid event at the end of the buffer, + /// it will return the event directly even if the data after the next `update` could be considered part of the same event + /// (especially in cases where CRLF hits an ambiguous grammar). + /// When this happens, the next call to next may return an empty event. + /// + /// ``` + /// let mut parser = EventStream::new(); + /// parser.update(...); + /// loop { + /// match parser.next() { + /// None => {} + /// Some(event) => { + /// if !event.is_empty() { + /// ... + /// } + /// } + /// } + /// } + /// ``` + pub fn next(&mut self) -> Option> { + let mut i = self.processed_offset; + + while i < self.buffer.len() { + if let Some(size) = self.is_2eol(i) { + let event = self.buffer[self.processed_offset..i].to_vec(); + self.processed_offset = i + size; + return Some(event); + } + + i += 1; + } + + None + } + + /// Flush the event stream and return any remaining unprocessed event data. Return None if there is none. + pub fn flush(&mut self) -> Option> { + if self.processed_offset < self.buffer.len() { + let remaining_event = self.buffer[self.processed_offset..].to_vec(); + self.processed_offset = self.buffer.len(); + Some(remaining_event) + } else { + None + } + } + + fn is_eol(&self, i: usize) -> Option { + if i + 1 < self.buffer.len() && self.buffer[i] == b'\r' && self.buffer[i + 1] == b'\n' { + Some(2) + } else if self.buffer[i] == b'\r' || self.buffer[i] == b'\n' { + Some(1) + } else { + None + } + } + + fn is_2eol(&self, i: usize) -> Option { + let size1 = match self.is_eol(i) { + None => return None, + Some(size1) => size1, + }; + if i + size1 < self.buffer.len() { + match self.is_eol(i + size1) { + None => { + if size1 == 2 { + Some(2) + } else { + None + } + } + Some(size2) => Some(size1 + size2), + } + } else if size1 == 2 { + Some(2) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_crlf_events() { + let mut parser = EventStream::new(); + parser.update(b"event1\n\nevent2\n\n".to_vec()); + + assert_eq!(parser.next(), Some(b"event1".to_vec())); + assert_eq!(parser.next(), Some(b"event2".to_vec())); + } + + #[test] + fn test_lf_events() { + let mut parser = EventStream::new(); + parser.update(b"event3\n\r\nevent4\r\n".to_vec()); + + assert_eq!(parser.next(), Some(b"event3".to_vec())); + assert_eq!(parser.next(), Some(b"event4".to_vec())); + } + + #[test] + fn test_partial_event() { + let mut parser = EventStream::new(); + parser.update(b"partial_event1".to_vec()); + + assert_eq!(parser.next(), None); + + parser.update(b"\n\n".to_vec()); + assert_eq!(parser.next(), Some(b"partial_event1".to_vec())); + } + + #[test] + fn test_mixed_eol_events() { + let mut parser = EventStream::new(); + parser.update(b"event5\r\nevent6\r\n\r\nevent7\r\n".to_vec()); + + assert_eq!(parser.next(), Some(b"event5".to_vec())); + assert_eq!(parser.next(), Some(b"event6".to_vec())); + assert_eq!(parser.next(), Some(b"event7".to_vec())); + } + + #[test] + fn test_mixed2_eol_events() { + let mut parser = EventStream::new(); + parser.update(b"event5\r\nevent6\r\n".to_vec()); + assert_eq!(parser.next(), Some(b"event5".to_vec())); + assert_eq!(parser.next(), Some(b"event6".to_vec())); + parser.update(b"\r\nevent7\r\n".to_vec()); + assert_eq!(parser.next(), Some(b"".to_vec())); + assert_eq!(parser.next(), Some(b"event7".to_vec())); + } + + #[test] + fn test_no_event() { + let mut parser = EventStream::new(); + parser.update(b"no_eol_in_this_string".to_vec()); + + assert_eq!(parser.next(), None); + assert_eq!(parser.flush(), Some(b"no_eol_in_this_string".to_vec())); + } +} diff --git a/plugins/wasm-rust/src/lib.rs b/plugins/wasm-rust/src/lib.rs index 3296ff648..f6a92be89 100644 --- a/plugins/wasm-rust/src/lib.rs +++ b/plugins/wasm-rust/src/lib.rs @@ -14,6 +14,7 @@ pub mod cluster_wrapper; pub mod error; +pub mod event_stream; mod internal; pub mod log; pub mod plugin_wrapper;