Skip to content

Commit 78ffe61

Browse files
fix infinite loop in profile picker and switch predictor based routing to on by default with a header to disable (#1929)
* fix infinite loop in profile picker when using latency routing with predictor based scheduling off * add fix in ProcessResults * Fix type for lint * Fix prefix cache not being ordered properly in profile picker and set predictor scheduling to true instead of false when no flag is present * Change predictor based scheduling header to one that disables it, and make it on by default when deploying with latency based routing * Move slo profile handler into slo routing package * Add slo aware handler
1 parent 56e6721 commit 78ffe61

File tree

6 files changed

+44
-81
lines changed

6 files changed

+44
-81
lines changed

cmd/epp/runner/runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ func (r *Runner) registerInTreePlugins() {
435435
plugins.Register(scorer.LoraAffinityScorerType, scorer.LoraAffinityScorerFactory)
436436
// Latency predictor plugins
437437
plugins.Register(slo_aware_router.SLOAwareRouterPluginType, slo_aware_router.SLOAwareRouterFactory)
438-
plugins.Register(profile.SLOAwareProfileHandlerType, profile.SLOAwareProfileHandlerFactory)
438+
plugins.Register(slo_aware_router.SLOAwareProfileHandlerType, slo_aware_router.SLOAwareProfileHandlerFactory)
439439
// register filter for test purpose only (used in conformance tests)
440440
plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
441441
// register response received plugin for test purpose only (used in conformance tests)

pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/headers.go

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,24 +46,8 @@ func parseFloatHeader(request schedulingtypes.LLMRequest, headerName string) (fl
4646
return parsedFloat, nil
4747
}
4848

49-
// parseFloatHeader retrieves a header by name, parses it as a bool,
50-
// and returns the value or an error if the header is missing or invalid.
51-
func parseBoolHeader(request schedulingtypes.LLMRequest, headerName string) (bool, error) {
52-
// 1. Get header value from the map
53-
headerValue, ok := request.Headers[headerName]
54-
if !ok {
55-
return false, nil // Header not found, return 0 and false
56-
}
57-
58-
// 2. Parse the header value to a bool
59-
parsedBool, err := strconv.ParseBool(headerValue)
60-
if err != nil {
61-
return false, errutil.Error{
62-
Code: errutil.BadRequest,
63-
Msg: headerName + " must be a bool",
64-
}
65-
}
66-
67-
// 3. Return the successfully parsed value
68-
return parsedBool, nil
49+
// hasHeader checks if a header key exists in the request headers map.
50+
func hasHeader(request schedulingtypes.LLMRequest, headerName string) bool {
51+
_, ok := request.Headers[headerName]
52+
return ok
6953
}

pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/scorer_helpers.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,7 @@ func (s *SLOAwareRouter) parseSLOHeaders(ctx context.Context, request *schedulin
4141
if err != nil {
4242
logger.V(logutil.DEBUG).Error(errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("%v must be a float: %v", tpotSLOHeaderKey, err)}, "SLOAwareRouter: Error parsing TPOT SLO from header")
4343
}
44-
sloCtx.predictorBasedScheduling, err = parseBoolHeader(*request, "x-prediction-based-scheduling")
45-
if err != nil {
46-
logger.V(logutil.DEBUG).Error(errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("x-prediction-based-scheduling must be a bool: %v", err)}, "SLOAwareRouter: Error parsing PredictorBasedScheduling from header")
47-
}
44+
sloCtx.predictorBasedScheduling = !hasHeader(*request, "x-prediction-based-scheduling-off")
4845
}
4946

5047
func (s *SLOAwareRouter) classifyPodsByHeadroom(allPreds []podPredictionResult) (posHeadroomPods, negHeadroomPods []podPredictionResult) {

pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/scorer_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23-
"strconv"
2423
"testing"
2524

2625
"github.com/stretchr/testify/assert"
@@ -127,7 +126,9 @@ func createTestLLMRequest(reqID string, ttftSLO, tpotSLO float64, predictionBase
127126
if tpotSLO > 0 {
128127
headers["x-avg-tpot-slo"] = fmt.Sprintf("%f", tpotSLO)
129128
}
130-
headers["x-prediction-based-scheduling"] = strconv.FormatBool(predictionBased)
129+
if !predictionBased {
130+
headers["x-prediction-based-scheduling-off"] = "true"
131+
}
131132

132133
return &schedulingtypes.LLMRequest{
133134
Headers: headers,

pkg/epp/scheduling/framework/plugins/profile/slo_aware_profile_handler.go renamed to pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/slo_aware_profile_handler.go

Lines changed: 32 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,17 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package profile
17+
package slo_aware_router
1818

1919
import (
2020
"context"
2121
"encoding/json"
2222
"errors"
2323
"fmt"
24-
"strconv"
25-
26-
"sigs.k8s.io/controller-runtime/pkg/log"
2724

2825
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2926
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
3027
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
31-
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3228
)
3329

3430
const (
@@ -38,7 +34,7 @@ const (
3834
LatencyRoutingProfileName = "predicted-latency-routing"
3935

4036
// Header key whose presence (regardless of value) turns predictor based scheduling off for the request
41-
PreictionBasedSchedulingHeaderKey = "x-prediction-based-scheduling"
37+
PreictionBasedSchedulingHeaderKey = "x-prediction-based-scheduling-off"
4238
)
4339

4440
// compile-time type assertion
@@ -79,38 +75,36 @@ func (h *SLOAwareProfileHandler) WithName(name string) *SLOAwareProfileHandler {
7975
func (h *SLOAwareProfileHandler) Pick(ctx context.Context, _ *types.CycleState, request *types.LLMRequest, profiles map[string]*framework.SchedulerProfile,
8076
profileResults map[string]*types.ProfileRunResult) map[string]*framework.SchedulerProfile {
8177

82-
logger := log.FromContext(ctx)
78+
predictorBasedScheduling := !isHeaderPresent(*request, PreictionBasedSchedulingHeaderKey)
8379

84-
predictorBasedScheduling, err := parseBoolHeader(*request, PreictionBasedSchedulingHeaderKey)
85-
if err != nil {
86-
logger.V(logutil.DEBUG).Error(err, "error parsing predictorBasedScheduling from header failed to choose scheduling profile: x-prediction-based-scheduling must be a bool")
87-
return nil
80+
_, prefixExecuted := profileResults[PrefixProfileName]
81+
// if prefix profile was not executed yet, first let the scheduler run it
82+
if !prefixExecuted {
83+
return map[string]*framework.SchedulerProfile{
84+
PrefixProfileName: profiles[PrefixProfileName],
85+
}
8886
}
8987

9088
if predictorBasedScheduling {
91-
_, prefixExecuted := profileResults[PrefixProfileName]
9289
_, routingExecuted := profileResults[LatencyRoutingProfileName]
93-
if prefixExecuted && routingExecuted { // both routing profiles have been executed already in previous call
94-
return map[string]*framework.SchedulerProfile{}
95-
}
96-
97-
// if prefix profile was not executed yet, first let the scheduler run it
98-
if !prefixExecuted {
90+
// routing profile has not been executed yet
91+
if !routingExecuted {
9992
return map[string]*framework.SchedulerProfile{
100-
PrefixProfileName: profiles[PrefixProfileName],
93+
LatencyRoutingProfileName: profiles[LatencyRoutingProfileName],
10194
}
10295
}
103-
104-
// otherwise, return only the SLO profile to be executed next
105-
return map[string]*framework.SchedulerProfile{
106-
LatencyRoutingProfileName: profiles[LatencyRoutingProfileName],
96+
} else {
97+
_, defaultExecuted := profileResults[NoLatencyRoutingProfileName]
98+
// predictorBasedScheduling is off, and NoLatencyRoutingProfileName profile has not been executed yet
99+
if !defaultExecuted {
100+
return map[string]*framework.SchedulerProfile{
101+
NoLatencyRoutingProfileName: profiles[NoLatencyRoutingProfileName],
102+
}
107103
}
108104
}
109105

110-
// If predictor based scheduling is not requested, proceed with only default profile
111-
return map[string]*framework.SchedulerProfile{
112-
NoLatencyRoutingProfileName: profiles[NoLatencyRoutingProfileName],
113-
}
106+
// all previous profiles have been executed, nothing more to run
107+
return map[string]*framework.SchedulerProfile{}
114108
}
115109

116110
// ProcessResults handles the outcome of the profile runs after all profiles ran.
@@ -119,16 +113,12 @@ func (h *SLOAwareProfileHandler) Pick(ctx context.Context, _ *types.CycleState,
119113
// When a profile run fails, its result in the profileResults map is nil.
120114
func (h *SLOAwareProfileHandler) ProcessResults(ctx context.Context, _ *types.CycleState, request *types.LLMRequest, profileResults map[string]*types.ProfileRunResult) (*types.SchedulingResult, error) {
121115

122-
if len(profileResults) < 2 {
123-
return nil, errors.New("SLOAwareProfileHandler requires at least two profiles to operate")
124-
}
125-
126-
predictorBasedScheduling, err := parseBoolHeader(*request, PreictionBasedSchedulingHeaderKey)
127-
if err != nil {
128-
return nil, fmt.Errorf("error parsing predictorBasedScheduling from header failed to choose scheduling profile: x-prediction-based-scheduling must be a bool: %v", err)
129-
}
116+
predictorBasedScheduling := !isHeaderPresent(*request, PreictionBasedSchedulingHeaderKey)
130117

131118
if predictorBasedScheduling { // TODO grab header directly from request.Headers instead of request field
119+
if len(profileResults) < 2 {
120+
return nil, errors.New("SLOAwareProfileHandler requires at least two profiles to operate when predictorBasedScheduling is true")
121+
}
132122
if profileResults[LatencyRoutingProfileName] == nil { // there was an error while running the SLO profile
133123
return nil, fmt.Errorf("failed to run scheduler profile '%s'", LatencyRoutingProfileName)
134124
}
@@ -137,6 +127,9 @@ func (h *SLOAwareProfileHandler) ProcessResults(ctx context.Context, _ *types.Cy
137127
PrimaryProfileName: LatencyRoutingProfileName,
138128
}, nil
139129
}
130+
if len(profileResults) < 1 {
131+
return nil, errors.New("SLOAwareProfileHandler requires at least one profiles to operate when predictorBasedScheduling is false")
132+
}
140133

141134
if profileResults[NoLatencyRoutingProfileName] == nil { // there was an error while running the default profile
142135
return nil, fmt.Errorf("failed to run scheduler profile '%s'", NoLatencyRoutingProfileName)
@@ -148,21 +141,9 @@ func (h *SLOAwareProfileHandler) ProcessResults(ctx context.Context, _ *types.Cy
148141
}, nil
149142
}
150143

151-
// parseFloatHeader retrieves a header by name, parses it as a bool,
152-
// and returns the value or an error if the header is missing or invalid.
153-
func parseBoolHeader(request types.LLMRequest, headerName string) (bool, error) {
144+
// isHeaderPresent checks if a header key exists in the request headers map.
145+
func isHeaderPresent(request types.LLMRequest, headerName string) bool {
154146
// 1. Get header value from the map
155-
headerValue, ok := request.Headers[headerName]
156-
if !ok {
157-
return false, nil // Header not found, return 0 and false
158-
}
159-
160-
// 2. Parse the header value to a bool
161-
parsedBool, err := strconv.ParseBool(headerValue)
162-
if err != nil {
163-
return false, fmt.Errorf("must be a bool: %v", headerName)
164-
}
165-
166-
// 3. Return the successfully parsed value
167-
return parsedBool, nil
147+
_, ok := request.Headers[headerName]
148+
return ok
168149
}

site-src/guides/latency-based-predictor.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Latency-based routing is a feature of the Inference Gateway that enables intelli
88

99
The latency-based routing feature is implemented as a plugin for the Endpoint Picker (EPP). When a request is received, the plugin performs the following steps:
1010

11-
1. **SLO Extraction**: The plugin extracts the TTFT and TPOT SLOs from the request headers (`x-slo-ttft-ms` and `x-slo-tpot-ms`). It also checks for the `x-prediction-based-scheduling` header to determine if latency-based routing should be used for this request.
11+
1. **SLO Extraction**: The plugin extracts the TTFT and TPOT SLOs from the request headers (`x-slo-ttft-ms` and `x-slo-tpot-ms`). It also checks whether the `x-prediction-based-scheduling-off` header is present; if it is, latency-based routing is not used for this request.
1212

1313
2. **Latency Prediction**: The plugin uses a latency predictor, deployed as a set of sidecar containers to the EPP, to predict the TTFT and TPOT for the request on each of the available model servers. The prediction is based on the current state of the server, including its KV cache utilization, and the number of running and waiting requests.
1414

@@ -22,7 +22,7 @@ The latency-based routing feature is implemented as a plugin for the Endpoint Pi
2222

2323
Latency-based routing is controlled by the following headers in your inference requests:
2424

25-
- `x-prediction-based-scheduling`: Set to `true` to enable latency-based routing for the request, setting this to false or omiting the header will use non-SLO routing, but will still use the latency data to train the predictor.
25+
- `x-prediction-based-scheduling-off`: Include this header to disable predictive routing for that specific request. If omitted, predictive routing is enabled by default.
2626
- `x-slo-ttft-ms`: The Time to First Token SLO in milliseconds.
2727
- `x-slo-tpot-ms`: The Time Per Output Token SLO in milliseconds (this is vLLM's equivalent of ITL; it is **not** NTPOT).
2828

@@ -78,7 +78,7 @@ If you have a standard setup via using the [Getting Started Guide](getting-start
7878
```txt
7979
export GW_IP=$(kubectl get gateway/inference-gateway -o jsonpath='{.status.addresses[0].value}'):80
8080
81-
curl -v $GW_IP/v1/completions -H 'Content-Type: application/json' -H 'x-slo-ttft-ms: 100' -H 'x-slo-tpot-ms: 100' -H 'x-prediction-based-scheduling: true' -d '{
81+
curl -v $GW_IP/v1/completions -H 'Content-Type: application/json' -H 'x-slo-ttft-ms: 100' -H 'x-slo-tpot-ms: 100' -d '{
8282
"model": "meta-llama/Llama-3.1-8B-Instruct",
8383
"prompt": "Write as if you were a critic: San Francisco where the ",
8484
"max_tokens": 100,

0 commit comments

Comments
 (0)