Skip to content

Commit a26cf47

Browse files
committed
add DeadlinePriority plugin in intra-flow dispatch policy
1 parent 6ed9419 commit a26cf47

File tree

2 files changed

+134
-0
lines changed

2 files changed

+134
-0
lines changed
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package deadlinepriority
18+
19+
import (
20+
"time"
21+
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
25+
)
26+
27+
// DeadlinePriorityPolicyName is the name of the deadline-based priority policy implementation.
28+
//
29+
// This policy implements a deadline-urgency scheduling strategy by selecting the request with the earliest absolute
30+
// deadline, computed as `EnqueueTime() + EffectiveTTL()`. Requests without a valid TTL (i.e., EffectiveTTL <= 0) are
31+
// treated as having no deadline and are scheduled after all time-bound requests, using FCFS as a tie-breaker for fairness.
32+
//
33+
// # Behavior and Queue Pairing
34+
//
35+
// The correctness and performance of this policy are tightly coupled to the capabilities of the underlying
36+
// `framework.SafeQueue`:
37+
// - When paired with a `CapabilityPriorityConfigurable` queue (e.g., a heap-based priority queue), the policy provides
38+
// strict deadline-ordered dispatch. The queue uses the policy's vended `ItemComparator` to maintain items in
39+
// urgency-sorted order, ensuring that `PeekHead()` always returns the most urgent request.
40+
// - This policy **MUST NOT** be used with a `CapabilityFIFO` queue (e.g., "ListQueue"), as such queues do not respect
41+
// custom comparators. In that case, `PeekHead()` would return the physically first enqueued item, completely
42+
// ignoring deadlines and violating the policy's semantics.
43+
//
44+
// To enforce correct behavior, this policy explicitly requires `CapabilityPriorityConfigurable` via its
45+
// `RequiredQueueCapabilities()` method. The system will reject any configuration that attempts to bind this policy to
46+
// an incompatible queue.
47+
const DeadlinePriorityPolicyName = "DeadlinePriority"
48+
49+
func init() {
50+
dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(DeadlinePriorityPolicyName),
51+
func() (framework.IntraFlowDispatchPolicy, error) {
52+
return newDeadlinePriorityPolicy(), nil
53+
})
54+
}
55+
56+
// DeadlinePriorityPolicy implements an intra-flow dispatch policy that prioritizes
57+
// requests based on their deadline urgency: the closer the absolute deadline, the higher the priority.
58+
// See the documentation for the exported `DeadlinePriorityPolicyName` constant for detailed behavioral guarantees.
59+
type DeadlinePriorityPolicy struct {
60+
comparator framework.ItemComparator
61+
}
62+
63+
var _ framework.IntraFlowDispatchPolicy = &DeadlinePriorityPolicy{}
64+
65+
func newDeadlinePriorityPolicy() framework.IntraFlowDispatchPolicy {
66+
return &DeadlinePriorityPolicy{
67+
comparator: &deadlinePriorityComparator{},
68+
}
69+
}
70+
71+
func (p *DeadlinePriorityPolicy) Name() string {
72+
return DeadlinePriorityPolicyName
73+
}
74+
75+
// RequiredQueueCapabilities returns an empty slice, indicating that this policy can operate with any queue.
76+
func (p *DeadlinePriorityPolicy) RequiredQueueCapabilities() []framework.QueueCapability {
77+
return []framework.QueueCapability{framework.CapabilityPriorityConfigurable}
78+
}
79+
80+
func (p *DeadlinePriorityPolicy) Comparator() framework.ItemComparator {
81+
return p.comparator
82+
}
83+
84+
// SelectItem selects the next item to dispatch by returning the head of the queue.
85+
// This implementation assumes the underlying queue is ordered according to the policy's comparator
86+
// (enforced by RequiredQueueCapabilities). Therefore, the most urgent request is always at the head.
87+
// Returns (nil, nil) if the queue is empty or nil.
88+
func (p *DeadlinePriorityPolicy) SelectItem(queue framework.FlowQueueAccessor) (selectedItem types.QueueItemAccessor, err error) {
89+
if queue == nil {
90+
return nil, nil
91+
}
92+
return queue.PeekHead(), nil
93+
}
94+
95+
var maxDeadlineTime = time.Unix(1<<63-60, 0)
96+
97+
// calculateDeadline computes the absolute deadline for a request.
98+
// The deadline is defined as the logical enqueue time plus the effective time-to-live (TTL).
99+
// If EffectiveTTL is zero or negative, the request is considered non-time-sensitive and assigned a
100+
// far-future deadline so it sorts after all SLO-bound requests.
101+
func calculateDeadline(item types.QueueItemAccessor) time.Time {
102+
ttl := item.EffectiveTTL()
103+
if ttl <= 0 {
104+
// No TTL, Treat as "never expire", but still respect enqueue time for fairness.
105+
// Return max time so it sorts last.
106+
return maxDeadlineTime
107+
}
108+
return item.EnqueueTime().Add(ttl)
109+
}
110+
111+
type deadlinePriorityComparator struct{}
112+
113+
func (d *deadlinePriorityComparator) Func() framework.ItemComparatorFunc {
114+
return func(a, b types.QueueItemAccessor) bool {
115+
deadlineA := calculateDeadline(a)
116+
deadlineB := calculateDeadline(b)
117+
118+
if !deadlineA.Equal(deadlineB) {
119+
return deadlineA.Before(deadlineB) // earlier deadline = higher priority
120+
}
121+
122+
// Same deadline: FCFS (earlier enqueue time = higher priority)
123+
return a.EnqueueTime().Before(b.EnqueueTime())
124+
}
125+
}
126+
127+
// ScoreType indicates this policy uses deadline-based scoring.
128+
func (d *deadlinePriorityComparator) ScoreType() string {
129+
return string(framework.DeadlineUrgencyPriorityScoreType)
130+
}

pkg/epp/flowcontrol/framework/policies.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ const (
2525
// EnqueueTimePriorityScoreType indicates that the priority is based on the item's enqueue time, with earlier times
2626
// being higher priority.
2727
EnqueueTimePriorityScoreType PriorityScoreType = "enqueue_time_ns_asc"
28+
29+
// DeadlineUrgencyPriorityScoreType indicates that the priority is based on the item's deadline urgency, with more
30+
// urgent deadlines being higher priority.
31+
DeadlineUrgencyPriorityScoreType PriorityScoreType = "deadline_urgency_desc"
2832
)
2933

3034
// ItemComparatorFunc defines the function signature for comparing two `types.QueueItemAccessor` instances to determine

0 commit comments

Comments
 (0)