Skip to content

Commit 0748486

Browse files
authored
Merge branch 'main' into marcsanmi/profile-id-selector
2 parents 9aa5288 + 1075882 commit 0748486

File tree

5 files changed

+191
-471
lines changed

5 files changed

+191
-471
lines changed

pkg/api/api.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,10 +191,10 @@ func (a *API) RegisterOverridesExporter(oe *exporter.OverridesExporter) {
191191
}
192192

193193
// RegisterDistributor registers the endpoints associated with the distributor.
194-
func (a *API) RegisterDistributor(d *distributor.Distributor, limits *validation.Overrides, multitenancyEnabled bool, cfg server.Config) {
194+
func (a *API) RegisterDistributor(d *distributor.Distributor, limits *validation.Overrides, cfg server.Config) {
195195
writePathOpts := a.registerOptionsWritePath(limits)
196196
pyroscopeHandler := pyroscope.NewPyroscopeIngestHandler(d, a.logger)
197-
otlpHandler := otlp.NewOTLPIngestHandler(cfg, d, a.logger, multitenancyEnabled)
197+
otlpHandler := otlp.NewOTLPIngestHandler(cfg, d, a.logger)
198198

199199
a.RegisterRoute("/ingest", pyroscopeHandler, writePathOpts...)
200200
a.RegisterRoute("/pyroscope/ingest", pyroscopeHandler, writePathOpts...)

pkg/ingester/otlp/ingest_handler.go

Lines changed: 11 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/go-kit/log/level"
1919
"github.com/google/uuid"
2020
"github.com/grafana/dskit/server"
21-
"github.com/grafana/dskit/user"
2221
pprofileotlp "go.opentelemetry.io/proto/otlp/collector/profiles/v1development"
2322
v1 "go.opentelemetry.io/proto/otlp/common/v1"
2423

@@ -33,10 +32,9 @@ import (
3332

3433
type ingestHandler struct {
3534
pprofileotlp.UnimplementedProfilesServiceServer
36-
svc PushService
37-
log log.Logger
38-
handler http.Handler
39-
multitenancyEnabled bool
35+
svc PushService
36+
log log.Logger
37+
handler http.Handler
4038
}
4139

4240
type Handler interface {
@@ -48,11 +46,10 @@ type PushService interface {
4846
PushBatch(ctx context.Context, req *distirbutormodel.PushRequest) error
4947
}
5048

51-
func NewOTLPIngestHandler(cfg server.Config, svc PushService, l log.Logger, multitenancyEnabled bool) Handler {
49+
func NewOTLPIngestHandler(cfg server.Config, svc PushService, l log.Logger) Handler {
5250
h := &ingestHandler{
53-
svc: svc,
54-
log: l,
55-
multitenancyEnabled: multitenancyEnabled,
51+
svc: svc,
52+
log: l,
5653
}
5754

5855
grpcServer := newGrpcServer(cfg)
@@ -109,52 +106,10 @@ func (h *ingestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
109106
h.handler.ServeHTTP(w, r)
110107
}
111108

112-
// extractTenantIDFromHTTPRequest extracts the tenant ID from HTTP request headers.
113-
// If multitenancy is disabled, it injects the default tenant ID.
114-
// Returns a context with the tenant ID injected.
115-
func (h *ingestHandler) extractTenantIDFromHTTPRequest(r *http.Request) (context.Context, error) {
116-
if !h.multitenancyEnabled {
117-
return user.InjectOrgID(r.Context(), tenant.DefaultTenantID), nil
118-
}
119-
120-
_, ctx, err := user.ExtractOrgIDFromHTTPRequest(r)
121-
if err != nil {
122-
return nil, err
123-
}
124-
return ctx, nil
125-
}
126-
127-
// extractTenantIDFromGRPCRequest extracts the tenant ID from a gRPC request context.
128-
// If multitenancy is disabled, it injects the default tenant ID.
129-
// Returns a context with the tenant ID injected.
130-
func (h *ingestHandler) extractTenantIDFromGRPCRequest(ctx context.Context) (context.Context, error) {
131-
// TODO: ideally should be merged with function above
132-
if !h.multitenancyEnabled {
133-
return user.InjectOrgID(ctx, tenant.DefaultTenantID), nil
134-
}
135-
136-
_, ctx, err := user.ExtractFromGRPCRequest(ctx)
137-
if err != nil {
138-
return nil, err
139-
}
140-
return ctx, nil
141-
}
142-
143109
func (h *ingestHandler) handleHTTPRequest(w http.ResponseWriter, r *http.Request) {
144110
defer r.Body.Close()
145111

146-
ctx, err := h.extractTenantIDFromHTTPRequest(r)
147-
if err != nil {
148-
level.Error(h.log).Log("msg", "failed to extract tenant ID from HTTP request", "err", err)
149-
http.Error(w, "Failed to extract tenant ID from HTTP request", http.StatusUnauthorized)
150-
return
151-
}
152-
153-
// Read the request body - we need to read it all for protobuf unmarshaling
154-
// Note: Protobuf wire format requires reading the entire message to determine field boundaries
155112
var body []byte
156-
157-
// Check if the body is gzip-encoded
158113
if strings.EqualFold(r.Header.Get("Content-Encoding"), "gzip") {
159114
gzipReader, gzipErr := gzip.NewReader(r.Body)
160115
if gzipErr != nil {
@@ -181,7 +136,6 @@ func (h *ingestHandler) handleHTTPRequest(w http.ResponseWriter, r *http.Request
181136
}
182137
}
183138

184-
// Unmarshal the protobuf request
185139
req := &pprofileotlp.ExportProfilesServiceRequest{}
186140

187141
if r.Header.Get("Content-Type") == "application/json" {
@@ -198,11 +152,9 @@ func (h *ingestHandler) handleHTTPRequest(w http.ResponseWriter, r *http.Request
198152
}
199153
}
200154

201-
// Process the request using the existing export method
202-
resp, err := h.export(ctx, req)
155+
resp, err := h.export(r.Context(), req)
203156
if err != nil {
204157
level.Error(h.log).Log("msg", "failed to process profiles", "err", err)
205-
// Convert gRPC status to HTTP status
206158
st, ok := status.FromError(err)
207159
if ok {
208160
switch st.Code() {
@@ -221,38 +173,29 @@ func (h *ingestHandler) handleHTTPRequest(w http.ResponseWriter, r *http.Request
221173
return
222174
}
223175

224-
// Marshal the response
225176
respBytes, err := proto.Marshal(resp)
226177
if err != nil {
227178
level.Error(h.log).Log("msg", "failed to marshal response", "err", err)
228179
http.Error(w, "Failed to marshal response", http.StatusInternalServerError)
229180
return
230181
}
231182

232-
// Write the response
233183
w.Header().Set("Content-Type", "application/x-protobuf")
234184
w.WriteHeader(http.StatusOK)
235185
if _, err := w.Write(respBytes); err != nil {
236186
level.Error(h.log).Log("msg", "failed to write response", "err", err)
237187
}
238188
}
239189

240-
// Export is the gRPC handler for the ProfilesService Export RPC.
241-
// Extracts tenant ID from gRPC request metadata before processing.
242190
func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfilesServiceRequest) (*pprofileotlp.ExportProfilesServiceResponse, error) {
243-
// Extract tenant ID from gRPC request
244-
ctx, err := h.extractTenantIDFromGRPCRequest(ctx)
245-
if err != nil {
246-
level.Error(h.log).Log("msg", "failed to extract tenant ID from gRPC request", "err", err)
247-
return &pprofileotlp.ExportProfilesServiceResponse{}, fmt.Errorf("failed to extract tenant ID from gRPC request: %w", err)
248-
}
249-
250191
return h.export(ctx, er)
251192
}
252193

253-
// export is the common implementation for processing OTLP profile export requests.
254-
// The context must already have the tenant ID injected before calling this method.
255194
func (h *ingestHandler) export(ctx context.Context, er *pprofileotlp.ExportProfilesServiceRequest) (*pprofileotlp.ExportProfilesServiceResponse, error) {
195+
_, err := tenant.ExtractTenantIDFromContext(ctx)
196+
if err != nil {
197+
return &pprofileotlp.ExportProfilesServiceResponse{}, status.Errorf(codes.Unauthenticated, "failed to extract tenant ID from context: %s", err.Error())
198+
}
256199

257200
dc := er.Dictionary
258201
if dc == nil {

0 commit comments

Comments
 (0)