feat: implements text/event-stream(SSE) MIME parser (#1416)

Co-authored-by: 007gzs <007gzs@gmail.com>
This commit is contained in:
纪卓志
2024-10-24 16:58:45 +08:00
committed by GitHub
parent cdd71155a9
commit e7561c30e5
13 changed files with 885 additions and 6 deletions

View File

@@ -6,12 +6,6 @@ IMAGE_TAG = $(if $(strip $(PLUGIN_VERSION)),${PLUGIN_VERSION},${BUILD_TIME}-${CO
IMG ?= ${REGISTRY}${PLUGIN_NAME}:${IMAGE_TAG}
.DEFAULT:
lint-base:
cargo fmt --all --check
cargo clippy --workspace --all-features --all-targets
lint:
cargo fmt --all --check --manifest-path extensions/${PLUGIN_NAME}/Cargo.toml
cargo clippy --workspace --all-features --all-targets --manifest-path extensions/${PLUGIN_NAME}/Cargo.toml
build:
DOCKER_BUILDKIT=1 docker build \
--build-arg PLUGIN_NAME=${PLUGIN_NAME} \
@@ -20,3 +14,10 @@ build:
.
@echo ""
@echo "output wasm file: extensions/${PLUGIN_NAME}/plugin.wasm"
lint-base:
cargo fmt --all --check
cargo clippy --workspace --all-features --all-targets
lint:
cargo fmt --all --check --manifest-path extensions/${PLUGIN_NAME}/Cargo.toml
cargo clippy --workspace --all-features --all-targets --manifest-path extensions/${PLUGIN_NAME}/Cargo.toml

View File

@@ -0,0 +1,270 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "ahash"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "allocator-api2"
version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f"
[[package]]
name = "bytes"
version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "downcast-rs"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2"
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "getrandom"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash",
"allocator-api2",
]
[[package]]
name = "higress-wasm-rust"
version = "0.1.0"
dependencies = [
"downcast-rs",
"http",
"lazy_static",
"multimap",
"proxy-wasm",
"serde",
"serde_json",
"uuid",
]
[[package]]
name = "http"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "itoa"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "lazy_static"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "libc"
version = "0.2.161"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1"
[[package]]
name = "log"
version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "memchr"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "multimap"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
dependencies = [
"serde",
]
[[package]]
name = "once_cell"
version = "1.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775"
[[package]]
name = "proc-macro2"
version = "1.0.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c3a7fc5db1e57d5a779a352c8cdb57b29aa4c40cc69c3a68a7fedc815fbf2f9"
dependencies = [
"unicode-ident",
]
[[package]]
name = "proxy-wasm"
version = "0.2.2"
source = "git+https://github.com/higress-group/proxy-wasm-rust-sdk?branch=main#6735737fad486c8a7cc324241f58df4a160e7887"
dependencies = [
"downcast-rs",
"hashbrown",
"log",
]
[[package]]
name = "quote"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af"
dependencies = [
"proc-macro2",
]
[[package]]
name = "ryu"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
[[package]]
name = "serde"
version = "1.0.210"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.210"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.132"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03"
dependencies = [
"itoa",
"memchr",
"ryu",
"serde",
]
[[package]]
name = "sse-timing"
version = "0.1.0"
dependencies = [
"higress-wasm-rust",
"proxy-wasm",
"serde",
"serde_json",
]
[[package]]
name = "syn"
version = "2.0.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83540f837a8afc019423a8edb95b52a8effe46957ee402287f4292fae35be021"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "unicode-ident"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe"
[[package]]
name = "uuid"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
dependencies = [
"getrandom",
]
[[package]]
name = "version_check"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "zerocopy"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

View File

@@ -0,0 +1,15 @@
[package]
name = "sse-timing"
version = "0.1.0"
edition = "2021"
publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
crate-type = ["cdylib"]
[dependencies]
higress-wasm-rust = { path = "../../", version = "0.1.0" }
proxy-wasm = { git="https://github.com/higress-group/proxy-wasm-rust-sdk", branch="main", version="0.2.2" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View File

@@ -0,0 +1,10 @@
BUILD_OPTS="--release"
.DEFAULT:
build:
cargo build --target wasm32-wasi ${BUILD_OPTS}
find target -name "*.wasm" -d 3 -exec cp "{}" plugin.wasm \;
clean:
cargo clean
rm -f plugin.wasm

View File

@@ -0,0 +1,26 @@
## Proxy-Wasm plugin example: SSE Timing
Proxy-Wasm plugin that traces Server-Side Event(SSE) duration from request start.
### Building
```sh
$ make
```
### Using in Envoy
This example can be run with [`docker compose`](https://docs.docker.com/compose/install/)
and has a matching Envoy configuration.
```sh
$ docker compose up
```
#### Access granted.
Send HTTP request to `localhost:10000/`:
```sh
$ curl localhost:10000/
```

View File

@@ -0,0 +1,35 @@
# Copyright (c) 2023 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
services:
envoy:
image: higress-registry.cn-hangzhou.cr.aliyuncs.com/higress/all-in-one:latest
entrypoint: /usr/local/bin/envoy
command: -c /etc/envoy/envoy.yaml --component-log-level wasm:debug
depends_on:
- sse-server
hostname: envoy
ports:
- "10000:10000"
volumes:
- ./envoy.yaml:/etc/envoy/envoy.yaml
- ./target/wasm32-wasi/release:/etc/envoy/proxy-wasm-plugins
networks:
- envoymesh
sse-server:
build: sse-server
networks:
- envoymesh
networks:
envoymesh: {}

View File

@@ -0,0 +1,76 @@
# Copyright (c) 2023 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
static_resources:
listeners:
- name: listener_0
address:
socket_address:
protocol: TCP
address: 0.0.0.0
port_value: 10000
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match:
prefix: "/"
route:
cluster: sse-server
http_filters:
- name: envoy.filters.http.wasm
typed_config:
"@type": type.googleapis.com/udpa.type.v1.TypedStruct
type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm
value:
config:
name: "http_body"
configuration:
"@type": type.googleapis.com/google.protobuf.StringValue
value: |-
{
"name": "sse_timing",
"_rules_": []
}
vm_config:
runtime: "envoy.wasm.runtime.v8"
code:
local:
filename: "/etc/envoy/proxy-wasm-plugins/sse_timing.wasm"
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
clusters:
- name: sse-server
connect_timeout: 30s
type: LOGICAL_DNS
# dns_lookup_family: V4_ONLY
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: sse-server
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: sse-server
port_value: 8080

View File

@@ -0,0 +1,198 @@
// Copyright (c) 2023 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use higress_wasm_rust::event_stream::EventStream;
use higress_wasm_rust::log::Log;
use higress_wasm_rust::rule_matcher::{on_configure, RuleMatcher, SharedRuleMatcher};
use proxy_wasm::traits::{Context, HttpContext, RootContext};
use proxy_wasm::types::{ContextType, DataAction, HeaderAction, LogLevel};
use serde::Deserialize;
use std::cell::RefCell;
use std::ops::DerefMut;
use std::rc::Rc;
use std::str::from_utf8;
use std::time::{Duration, SystemTime};
proxy_wasm::main! {{
proxy_wasm::set_log_level(LogLevel::Trace);
proxy_wasm::set_root_context(|_|Box::new(SseTimingRoot::new()));
}}
struct SseTimingRoot {
log: Rc<Log>,
rule_matcher: SharedRuleMatcher<SseTimingConfig>,
}
struct SseTiming {
log: Rc<Log>,
rule_matcher: SharedRuleMatcher<SseTimingConfig>,
vendor: String,
is_event_stream: bool,
event_stream: EventStream,
start_time: SystemTime,
}
#[derive(Default, Clone, Debug, Deserialize)]
struct SseTimingConfig {
vendor: Option<String>,
}
impl SseTimingRoot {
fn new() -> Self {
SseTimingRoot {
log: Rc::new(Log::new("sse_timing".to_string())),
rule_matcher: Rc::new(RefCell::new(RuleMatcher::default())),
}
}
}
impl Context for SseTimingRoot {}
impl RootContext for SseTimingRoot {
fn on_configure(&mut self, _plugin_configuration_size: usize) -> bool {
on_configure(
self,
_plugin_configuration_size,
self.rule_matcher.borrow_mut().deref_mut(),
&self.log,
)
}
fn create_http_context(&self, _context_id: u32) -> Option<Box<dyn HttpContext>> {
Some(Box::new(SseTiming {
log: self.log.clone(),
rule_matcher: self.rule_matcher.clone(),
vendor: "higress".into(),
is_event_stream: false,
event_stream: EventStream::new(),
start_time: self.get_current_time(),
}))
}
fn get_type(&self) -> Option<ContextType> {
Some(ContextType::HttpContext)
}
}
impl Context for SseTiming {}
impl HttpContext for SseTiming {
fn on_http_request_headers(
&mut self,
_num_headers: usize,
_end_of_stream: bool,
) -> HeaderAction {
self.start_time = self.get_current_time();
let binding = self.rule_matcher.borrow();
let config = match binding.get_match_config() {
None => {
return HeaderAction::Continue;
}
Some(config) => config.1,
};
match config.vendor.clone() {
None => {}
Some(vendor) => self.vendor = vendor,
}
HeaderAction::Continue
}
fn on_http_response_headers(
&mut self,
_num_headers: usize,
_end_of_stream: bool,
) -> HeaderAction {
match self.get_http_response_header("Content-Type") {
None => self
.log
.warn("upstream response is not set Content-Type, skipped"),
Some(content_type) => {
if content_type.starts_with("text/event-stream") {
self.is_event_stream = true
} else {
self.log.warn(format!("upstream response Content-Type is not text/event-stream, but {}, skipped", content_type).as_str())
}
}
}
HeaderAction::Continue
}
fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> DataAction {
if !self.is_event_stream {
return DataAction::Continue;
}
let body = self
.get_http_response_body(0, body_size)
.unwrap_or_default();
self.event_stream.update(body);
self.process_event_stream(end_of_stream)
}
}
impl SseTiming {
fn process_event_stream(&mut self, end_of_stream: bool) -> DataAction {
let mut modified_events = Vec::new();
loop {
match self.event_stream.next() {
None => break,
Some(raw_event) => {
if !raw_event.is_empty() {
// according to spec, event-stream must be utf-8 encoding
let event = from_utf8(raw_event.as_slice()).unwrap();
let processed_event = self.process_event(event.to_string());
modified_events.push(processed_event);
}
}
}
}
if end_of_stream {
match self.event_stream.flush() {
None => {}
Some(raw_event) => {
if !raw_event.is_empty() {
// according to spec, event-stream must be utf-8 encoding
let event = from_utf8(raw_event.as_slice()).unwrap();
let modified_event = self.process_event(event.into());
modified_events.push(modified_event);
}
}
}
}
if !modified_events.is_empty() {
let modified_body = modified_events.concat();
self.set_http_response_body(0, modified_body.len(), modified_body.as_bytes());
DataAction::Continue
} else {
DataAction::StopIterationNoBuffer
}
}
fn process_event(&self, event: String) -> String {
let duration = self
.get_current_time()
.duration_since(self.start_time)
.unwrap_or(Duration::ZERO);
format!(
": server-timing: {};dur={}\n{}\n\n",
self.vendor,
duration.as_millis(),
event
)
}
}

View File

@@ -0,0 +1,5 @@
FROM golang:latest AS builder
WORKDIR /workspace
COPY . .
RUN GOOS=linux GOARCH=amd64 go build -o main .
CMD ./main

View File

@@ -0,0 +1,3 @@
module sse
go 1.22

View File

@@ -0,0 +1,42 @@
package main
import (
"log"
"net/http"
"time"
)
var events = []string{
": this is a test stream\n\n",
"data: some text\n",
"data: another message\n",
"data: with two lines\n\n",
"event: userconnect\n",
"data: {\"username\": \"bobby\", \"time\": \"02:33:48\"}\n\n",
"event: usermessage\n",
"data: {\"username\": \"bobby\", \"time\": \"02:34:11\", \"text\": \"Hi everyone.\"}\n\n",
"event: userdisconnect\n",
"data: {\"username\": \"bobby\", \"time\": \"02:34:23\"}\n\n",
"event: usermessage\n",
"data: {\"username\": \"sean\", \"time\": \"02:34:36\", \"text\": \"Bye, bobby.\"}\n\n",
}
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
log.Println("receive request")
w.Header().Set("Content-Type", "text/event-stream")
for _, e := range events {
_, _ = w.Write([]byte(e))
time.Sleep(1 * time.Second)
w.(http.Flusher).Flush()
}
})
if err := http.ListenAndServe("0.0.0.0:8080", nil); err != nil {
panic(err)
}
}

View File

@@ -0,0 +1,197 @@
// Copyright (c) 2024 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Parsing MIME type text/event-stream according to https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
///
/// The event stream format is as described by the stream production of the following ABNF
///
/// | rule | expression |
/// |--------|---------------------------|
/// |stream |= [ bom ] *event |
/// |event |= *( comment / field ) eol |
/// |comment |= colon *any-char eol |
/// |field |= 1*name-char [ colon [ space ] *any-char ] eol |
/// |eol |= ( cr lf / cr / lf ) |
///
/// According to spec, we must judge EOL twice before we can identify a complete event.
/// However, in the rules of event and field, there is an ambiguous grammar in the judgment of eol,
/// and it will bring ambiguity (whether the field ends). In order to eliminate this ambiguity,
/// we believe that CRLF as CR+LF belongs to event and field respectively.
pub struct EventStream {
buffer: Vec<u8>,
processed_offset: usize,
}
impl EventStream {
pub fn new() -> Self {
EventStream {
buffer: Vec::new(),
processed_offset: 0,
}
}
/// Update the event stream by adding new data to the buffer and resetting processed offset if needed.
pub fn update(&mut self, data: Vec<u8>) {
if self.processed_offset > 0 {
self.buffer.drain(0..self.processed_offset);
self.processed_offset = 0;
}
self.buffer.extend(data);
}
/// Get the next event from the event stream. Return the event data if available, otherwise return None.
/// Next will consume all the data in the current buffer. However, if there is a valid event at the end of the buffer,
/// it will return the event directly even if the data after the next `update` could be considered part of the same event
/// (especially in cases where CRLF hits an ambiguous grammar).
/// When this happens, the next call to next may return an empty event.
///
/// ```
/// let mut parser = EventStream::new();
/// parser.update(...);
/// loop {
/// match parser.next() {
/// None => {}
/// Some(event) => {
/// if !event.is_empty() {
/// ...
/// }
/// }
/// }
/// }
/// ```
pub fn next(&mut self) -> Option<Vec<u8>> {
let mut i = self.processed_offset;
while i < self.buffer.len() {
if let Some(size) = self.is_2eol(i) {
let event = self.buffer[self.processed_offset..i].to_vec();
self.processed_offset = i + size;
return Some(event);
}
i += 1;
}
None
}
/// Flush the event stream and return any remaining unprocessed event data. Return None if there is none.
pub fn flush(&mut self) -> Option<Vec<u8>> {
if self.processed_offset < self.buffer.len() {
let remaining_event = self.buffer[self.processed_offset..].to_vec();
self.processed_offset = self.buffer.len();
Some(remaining_event)
} else {
None
}
}
fn is_eol(&self, i: usize) -> Option<usize> {
if i + 1 < self.buffer.len() && self.buffer[i] == b'\r' && self.buffer[i + 1] == b'\n' {
Some(2)
} else if self.buffer[i] == b'\r' || self.buffer[i] == b'\n' {
Some(1)
} else {
None
}
}
fn is_2eol(&self, i: usize) -> Option<usize> {
let size1 = match self.is_eol(i) {
None => return None,
Some(size1) => size1,
};
if i + size1 < self.buffer.len() {
match self.is_eol(i + size1) {
None => {
if size1 == 2 {
Some(2)
} else {
None
}
}
Some(size2) => Some(size1 + size2),
}
} else if size1 == 2 {
Some(2)
} else {
None
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_crlf_events() {
let mut parser = EventStream::new();
parser.update(b"event1\n\nevent2\n\n".to_vec());
assert_eq!(parser.next(), Some(b"event1".to_vec()));
assert_eq!(parser.next(), Some(b"event2".to_vec()));
}
#[test]
fn test_lf_events() {
let mut parser = EventStream::new();
parser.update(b"event3\n\r\nevent4\r\n".to_vec());
assert_eq!(parser.next(), Some(b"event3".to_vec()));
assert_eq!(parser.next(), Some(b"event4".to_vec()));
}
#[test]
fn test_partial_event() {
let mut parser = EventStream::new();
parser.update(b"partial_event1".to_vec());
assert_eq!(parser.next(), None);
parser.update(b"\n\n".to_vec());
assert_eq!(parser.next(), Some(b"partial_event1".to_vec()));
}
#[test]
fn test_mixed_eol_events() {
let mut parser = EventStream::new();
parser.update(b"event5\r\nevent6\r\n\r\nevent7\r\n".to_vec());
assert_eq!(parser.next(), Some(b"event5".to_vec()));
assert_eq!(parser.next(), Some(b"event6".to_vec()));
assert_eq!(parser.next(), Some(b"event7".to_vec()));
}
#[test]
fn test_mixed2_eol_events() {
let mut parser = EventStream::new();
parser.update(b"event5\r\nevent6\r\n".to_vec());
assert_eq!(parser.next(), Some(b"event5".to_vec()));
assert_eq!(parser.next(), Some(b"event6".to_vec()));
parser.update(b"\r\nevent7\r\n".to_vec());
assert_eq!(parser.next(), Some(b"".to_vec()));
assert_eq!(parser.next(), Some(b"event7".to_vec()));
}
#[test]
fn test_no_event() {
let mut parser = EventStream::new();
parser.update(b"no_eol_in_this_string".to_vec());
assert_eq!(parser.next(), None);
assert_eq!(parser.flush(), Some(b"no_eol_in_this_string".to_vec()));
}
}

View File

@@ -14,6 +14,7 @@
pub mod cluster_wrapper;
pub mod error;
pub mod event_stream;
mod internal;
pub mod log;
pub mod plugin_wrapper;