@@ -19,276 +19,121 @@ package bbr
1919
2020import (
2121 "context"
22- "fmt"
2322 "testing"
24- "time"
2523
26- configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2724 extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
2825 "github.com/google/go-cmp/cmp"
29- "google.golang.org/grpc"
30- "google.golang.org/grpc/credentials/insecure"
26+ "github.com/stretchr/testify/require"
3127 "google.golang.org/protobuf/testing/protocmp"
32-
33- runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/server"
34- logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
35- integrationutils "sigs.k8s.io/gateway-api-inference-extension/test/integration"
28+ "sigs.k8s.io/gateway-api-inference-extension/test/integration"
3629)
3730
38- var logger = logutil . NewTestLogger (). V ( logutil . VERBOSE )
39-
31+ // TestBodyBasedRouting validates the "Unary" (Non-Streaming) behavior of BBR.
32+ // This simulates scenarios where Envoy buffers the body before sending it to ext_proc.
4033func TestBodyBasedRouting (t * testing.T ) {
34+ t .Parallel ()
35+
4136 tests := []struct {
42- name string
43- req * extProcPb.ProcessingRequest
44- wantHeaders [] * configPb. HeaderValueOption
45- wantErr bool
37+ name string
38+ req * extProcPb.ProcessingRequest
39+ wantResponse * extProcPb. ProcessingResponse
40+ wantErr bool
4641 }{
4742 {
48- name : "success adding model parameter to header" ,
49- req : integrationutils .GenerateRequest (logger , "test" , "llama" , nil ),
50- wantHeaders : []* configPb.HeaderValueOption {
51- {
52- Header : & configPb.HeaderValue {
53- Key : "X-Gateway-Model-Name" ,
54- RawValue : []byte ("llama" ),
55- },
56- },
57- },
58- wantErr : false ,
43+ name : "success: extracts model and sets header" ,
44+ req : integration .ReqLLMUnary (logger , "test" , "llama" ),
45+ wantResponse : ExpectBBRUnaryResponse ("llama" ),
46+ wantErr : false ,
5947 },
6048 {
61- name : " no model parameter" ,
62- req : integrationutils . GenerateRequest (logger , "test1" , "" , nil ),
63- wantHeaders : [] * configPb. HeaderValueOption {},
64- wantErr : false ,
49+ name : "noop: no model parameter in body " ,
50+ req : integration . ReqLLMUnary (logger , "test1" , "" ),
51+ wantResponse : ExpectBBRUnaryResponse ( "" ), // Expect no headers.
52+ wantErr : false ,
6553 },
6654 }
6755
68- for _ , test := range tests {
69- t .Run (test .name , func (t * testing.T ) {
70- client , cleanup := setUpHermeticServer (false )
71- t .Cleanup (cleanup )
56+ for _ , tc := range tests {
57+ t .Run (tc .name , func (t * testing.T ) {
58+ t .Parallel ()
59+
60+ ctx := context .Background ()
61+ h := NewBBRHarness (t , ctx , false )
62+
63+ res , err := integration .SendRequest (t , h .Client , tc .req )
7264
73- want := & extProcPb.ProcessingResponse {}
74- if len (test .wantHeaders ) > 0 {
75- want .Response = & extProcPb.ProcessingResponse_RequestBody {
76- RequestBody : & extProcPb.BodyResponse {
77- Response : & extProcPb.CommonResponse {
78- HeaderMutation : & extProcPb.HeaderMutation {
79- SetHeaders : test .wantHeaders ,
80- },
81- ClearRouteCache : true ,
82- },
83- },
84- }
65+ if tc .wantErr {
66+ require .Error (t , err , "expected error during request processing" )
8567 } else {
86- want .Response = & extProcPb.ProcessingResponse_RequestBody {
87- RequestBody : & extProcPb.BodyResponse {},
88- }
68+ require .NoError (t , err , "unexpected error during request processing" )
8969 }
9070
91- res , err := integrationutils .SendRequest (t , client , test .req )
92- if err != nil && ! test .wantErr {
93- t .Errorf ("Unexpected error, got: %v, want error: %v" , err , test .wantErr )
94- }
95- if diff := cmp .Diff (want , res , protocmp .Transform ()); diff != "" {
96- t .Errorf ("Unexpected response, (-want +got): %v" , diff )
71+ if diff := cmp .Diff (tc .wantResponse , res , protocmp .Transform ()); diff != "" {
72+ t .Errorf ("Response mismatch (-want +got): %v" , diff )
9773 }
9874 })
9975 }
10076}
10177
78+ // TestFullDuplexStreamed_BodyBasedRouting validates the "Streaming" behavior of BBR.
79+ // This validates that BBR correctly buffers streamed chunks, inspects the body, and injects the header.
10280func TestFullDuplexStreamed_BodyBasedRouting (t * testing.T ) {
81+ t .Parallel ()
82+
10383 tests := []struct {
10484 name string
10585 reqs []* extProcPb.ProcessingRequest
10686 wantResponses []* extProcPb.ProcessingResponse
10787 wantErr bool
10888 }{
10989 {
110- name : "success adding model parameter to header " ,
111- reqs : integrationutils . GenerateStreamedRequestSet (logger , "test" , "foo" , "foo" , nil ),
90+ name : "success: adds model header from simple body " ,
91+ reqs : integration . ReqLLM (logger , "test" , "foo" , "bar" ),
11292 wantResponses : []* extProcPb.ProcessingResponse {
113- {
114- Response : & extProcPb.ProcessingResponse_RequestHeaders {
115- RequestHeaders : & extProcPb.HeadersResponse {
116- Response : & extProcPb.CommonResponse {
117- ClearRouteCache : true ,
118- HeaderMutation : & extProcPb.HeaderMutation {
119- SetHeaders : []* configPb.HeaderValueOption {
120- {
121- Header : & configPb.HeaderValue {
122- Key : "X-Gateway-Model-Name" ,
123- RawValue : []byte ("foo" ),
124- },
125- },
126- }},
127- },
128- },
129- },
130- },
131- {
132- Response : & extProcPb.ProcessingResponse_RequestBody {
133- RequestBody : & extProcPb.BodyResponse {
134- Response : & extProcPb.CommonResponse {
135- BodyMutation : & extProcPb.BodyMutation {
136- Mutation : & extProcPb.BodyMutation_StreamedResponse {
137- StreamedResponse : & extProcPb.StreamedBodyResponse {
138- Body : []byte ("{\" max_tokens\" :100,\" model\" :\" foo\" ,\" prompt\" :\" test\" ,\" temperature\" :0}" ),
139- EndOfStream : true ,
140- },
141- },
142- },
143- },
144- },
145- },
146- },
93+ ExpectBBRHeader ("foo" ),
94+ ExpectBBRBodyPassThrough ("test" , "foo" ),
14795 },
14896 },
14997 {
150- name : "success adding model parameter to header with multiple body chunks" ,
151- reqs : []* extProcPb.ProcessingRequest {
152- {
153- Request : & extProcPb.ProcessingRequest_RequestHeaders {
154- RequestHeaders : & extProcPb.HttpHeaders {
155- Headers : & configPb.HeaderMap {
156- Headers : []* configPb.HeaderValue {
157- {
158- Key : "hi" ,
159- Value : "mom" ,
160- },
161- },
162- },
163- },
164- },
165- },
166- {
167- Request : & extProcPb.ProcessingRequest_RequestBody {
168- RequestBody : & extProcPb.HttpBody {Body : []byte ("{\" max_tokens\" :100,\" model\" :\" sql-lo" ), EndOfStream : false },
169- },
170- },
171- {
172- Request : & extProcPb.ProcessingRequest_RequestBody {
173- RequestBody : & extProcPb.HttpBody {Body : []byte ("ra-sheddable\" ,\" prompt\" :\" test\" ,\" temperature\" :0}" ), EndOfStream : true },
174- },
175- },
176- },
98+ name : "success: buffers split chunks and extracts model" ,
99+ reqs : integration .ReqRaw (
100+ map [string ]string {"hi" : "mom" },
101+ `{"max_tokens":100,"model":"sql-lo` ,
102+ `ra-sheddable","prompt":"test","temperature":0}` ,
103+ ),
177104 wantResponses : []* extProcPb.ProcessingResponse {
178- {
179- Response : & extProcPb.ProcessingResponse_RequestHeaders {
180- RequestHeaders : & extProcPb.HeadersResponse {
181- Response : & extProcPb.CommonResponse {
182- ClearRouteCache : true ,
183- HeaderMutation : & extProcPb.HeaderMutation {
184- SetHeaders : []* configPb.HeaderValueOption {
185- {
186- Header : & configPb.HeaderValue {
187- Key : "X-Gateway-Model-Name" ,
188- RawValue : []byte ("sql-lora-sheddable" ),
189- },
190- },
191- }},
192- },
193- },
194- },
195- },
196- {
197- Response : & extProcPb.ProcessingResponse_RequestBody {
198- RequestBody : & extProcPb.BodyResponse {
199- Response : & extProcPb.CommonResponse {
200- BodyMutation : & extProcPb.BodyMutation {
201- Mutation : & extProcPb.BodyMutation_StreamedResponse {
202- StreamedResponse : & extProcPb.StreamedBodyResponse {
203- Body : []byte ("{\" max_tokens\" :100,\" model\" :\" sql-lora-sheddable\" ,\" prompt\" :\" test\" ,\" temperature\" :0}" ),
204- EndOfStream : true ,
205- },
206- },
207- },
208- },
209- },
210- },
211- },
105+ ExpectBBRHeader ("sql-lora-sheddable" ),
106+ ExpectBBRBodyPassThrough ("test" , "sql-lora-sheddable" ),
212107 },
213108 },
214109 {
215- name : "no model parameter " ,
216- reqs : integrationutils . GenerateStreamedRequestSet (logger , "test" , "" , "" , nil ),
110+ name : "noop: handles missing model field gracefully " ,
111+ reqs : integration . ReqLLM (logger , "test" , "" , "" ),
217112 wantResponses : []* extProcPb.ProcessingResponse {
218- {
219- Response : & extProcPb.ProcessingResponse_RequestHeaders {
220- RequestHeaders : & extProcPb.HeadersResponse {},
221- },
222- },
223- {
224- Response : & extProcPb.ProcessingResponse_RequestBody {
225- RequestBody : & extProcPb.BodyResponse {
226- Response : & extProcPb.CommonResponse {
227- BodyMutation : & extProcPb.BodyMutation {
228- Mutation : & extProcPb.BodyMutation_StreamedResponse {
229- StreamedResponse : & extProcPb.StreamedBodyResponse {
230- Body : []byte ("{\" max_tokens\" :100,\" prompt\" :\" test\" ,\" temperature\" :0}" ),
231- EndOfStream : true ,
232- },
233- },
234- },
235- },
236- },
237- },
238- },
113+ ExpectBBRNoOpHeader (),
114+ ExpectBBRBodyPassThrough ("test" , "" ),
239115 },
240116 },
241117 }
242118
243- for _ , test := range tests {
244- t .Run (test .name , func (t * testing.T ) {
245- client , cleanup := setUpHermeticServer (true )
246- t .Cleanup (cleanup )
119+ for _ , tc := range tests {
120+ t .Run (tc .name , func (t * testing.T ) {
121+ t .Parallel ()
122+
123+ ctx := context .Background ()
124+ h := NewBBRHarness (t , ctx , true )
125+
126+ responses , err := integration .StreamedRequest (t , h .Client , tc .reqs , len (tc .wantResponses ))
247127
248- responses , err := integrationutils .StreamedRequest (t , client , test .reqs , len (test .wantResponses ))
249- if err != nil && ! test .wantErr {
250- t .Errorf ("Unexpected error, got: %v, want error: %v" , err , test .wantErr )
128+ if tc .wantErr {
129+ require .Error (t , err , "expected stream error" )
130+ } else {
131+ require .NoError (t , err , "unexpected stream error" )
251132 }
252133
253- if diff := cmp .Diff (test .wantResponses , responses , protocmp .Transform ()); diff != "" {
254- t .Errorf ("Unexpected response, (-want +got): %v" , diff )
134+ if diff := cmp .Diff (tc .wantResponses , responses , protocmp .Transform ()); diff != "" {
135+ t .Errorf ("Response mismatch (-want +got): %v" , diff )
255136 }
256137 })
257138 }
258139}
259-
260- func setUpHermeticServer (streaming bool ) (client extProcPb.ExternalProcessor_ProcessClient , cleanup func ()) {
261- port := 9004
262-
263- serverCtx , stopServer := context .WithCancel (context .Background ())
264- serverRunner := runserver .NewDefaultExtProcServerRunner (port , false )
265- serverRunner .SecureServing = false
266- serverRunner .Streaming = streaming
267-
268- go func () {
269- if err := serverRunner .AsRunnable (logger .WithName ("ext-proc" )).Start (serverCtx ); err != nil {
270- logutil .Fatal (logger , err , "Failed to start ext-proc server" )
271- }
272- }()
273-
274- address := fmt .Sprintf ("localhost:%v" , port )
275- // Create a grpc connection
276- conn , err := grpc .NewClient (address , grpc .WithTransportCredentials (insecure .NewCredentials ()))
277- if err != nil {
278- logutil .Fatal (logger , err , "Failed to connect" , "address" , address )
279- }
280-
281- ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
282- client , err = extProcPb .NewExternalProcessorClient (conn ).Process (ctx )
283- if err != nil {
284- logutil .Fatal (logger , err , "Failed to create client" )
285- }
286- return client , func () {
287- cancel ()
288- conn .Close ()
289- stopServer ()
290-
291- // wait a little until the goroutines actually exit
292- time .Sleep (5 * time .Second )
293- }
294- }
0 commit comments