Free APF seats for watches handled by an aggregated apiserver. #105511
/ok-to-test
/triage accepted
@@ -123,6 +124,8 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
         return
     }
+
+    utilflowcontrol.RequestDelegated(req.Context())
Why does it make more sense to do this here rather than after the cost has been paid to get the new transport ready, just before the proxy connection is made around line 190?
If we want to account for the cost of a request, that seems more in keeping with accounting for as much of the cost as is reasonable.
@deads2k could you please explain what you mean by the cost of getting the new transport ready?
Other than the placement of the call, this lgtm
459daf9 to 96b0965
e0bd6d9 to 56db85d
@@ -175,6 +176,12 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
     handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
     handler.InterceptRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects)
     handler.RequireSameHostRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ValidateProxyRedirects)
+
+    if upgrade {
+        // Don't block the delegation signal on upgrade direct dialing.
I think we should unblock the non-upgrade requests as well.
do we need it, since we have the following?
return dial(req, h.Transport)
Upgrades dial the backend without calling RoundTrip. The RoundTripper is passed here, but only to use a custom Dialer (e.g. if configured here
kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go
Lines 280 to 291 in 0cd75e8
if r.egressSelector != nil {
    networkContext := egressselector.Cluster.AsNetworkContext()
    var egressDialer utilnet.DialFunc
    egressDialer, err := r.egressSelector.Lookup(networkContext)
    if err != nil {
        klog.Warning(err.Error())
    } else {
        newInfo.restConfig.Dial = egressDialer
    }
} else if r.proxyTransport != nil && r.proxyTransport.DialContext != nil {
    newInfo.restConfig.Dial = r.proxyTransport.DialContext
}
) when it is an *http.Transport, or when it can be unwrapped to *http.Transport via WrappedRoundTripper(); see
kubernetes/staging/src/k8s.io/apimachinery/pkg/util/net/http.go
Lines 215 to 239 in 0cd75e8
func DialerFor(transport http.RoundTripper) (DialFunc, error) {
    if transport == nil {
        return nil, nil
    }
    switch transport := transport.(type) {
    case *http.Transport:
        // transport.DialContext takes precedence over transport.Dial
        if transport.DialContext != nil {
            return transport.DialContext, nil
        }
        // adapt transport.Dial to the DialWithContext signature
        if transport.Dial != nil {
            return func(ctx context.Context, net, addr string) (net.Conn, error) {
                return transport.Dial(net, addr)
            }, nil
        }
        // otherwise return nil
        return nil, nil
    case RoundTripperWrapper:
        return DialerFor(transport.WrappedRoundTripper())
    default:
        return nil, fmt.Errorf("unknown transport type: %T", transport)
    }
}
This special handling for upgrade avoids the case where a dial for upgrade takes a long time. From my understanding, all other requests dial via (*http.Transport).RoundTrip(...).
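To illustrate why a custom RoundTripper in this path needs WrappedRoundTripper(), here is a minimal, self-contained sketch of the unwrapping logic. It is not the apimachinery code: dialerFor, unwrapper, opaqueRT, unwrappableRT and findsCustomDialer are hypothetical names made up for this example, and only the standard library is used. The idea is that a wrapper which cannot be unwrapped hides the custom dialer, while one that exposes its inner transport lets the dialer be found.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
)

// dialFunc mirrors the shape of a context-aware dial function.
type dialFunc func(ctx context.Context, network, addr string) (net.Conn, error)

// unwrapper is a local stand-in (assumption) for a RoundTripperWrapper-style interface.
type unwrapper interface {
	http.RoundTripper
	WrappedRoundTripper() http.RoundTripper
}

// dialerFor walks through wrappers until it reaches an *http.Transport
// and returns that transport's DialContext, if any.
func dialerFor(rt http.RoundTripper) (dialFunc, error) {
	switch t := rt.(type) {
	case nil:
		return nil, nil
	case *http.Transport:
		return t.DialContext, nil
	case unwrapper:
		return dialerFor(t.WrappedRoundTripper())
	default:
		return nil, fmt.Errorf("unknown transport type: %T", rt)
	}
}

// opaqueRT wraps a transport but offers no way to reach it.
type opaqueRT struct{ rt http.RoundTripper }

func (o opaqueRT) RoundTrip(r *http.Request) (*http.Response, error) {
	return o.rt.RoundTrip(r)
}

// unwrappableRT adds WrappedRoundTripper so the inner transport is reachable.
type unwrappableRT struct{ opaqueRT }

func (u unwrappableRT) WrappedRoundTripper() http.RoundTripper { return u.rt }

// findsCustomDialer reports whether dialerFor can recover the custom dialer
// through the chosen kind of wrapper.
func findsCustomDialer(unwrappable bool) bool {
	base := &http.Transport{
		// Stub dialer: it is never invoked in this sketch.
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			return nil, errors.New("dialing is disabled in this sketch")
		},
	}
	var rt http.RoundTripper = opaqueRT{rt: base}
	if unwrappable {
		rt = unwrappableRT{opaqueRT{rt: base}}
	}
	dial, err := dialerFor(rt)
	return err == nil && dial != nil
}

func main() {
	fmt.Println("opaque wrapper finds dialer:", findsCustomDialer(false))
	fmt.Println("unwrappable wrapper finds dialer:", findsCustomDialer(true))
}
```

With the opaque wrapper the lookup ends in the "unknown transport type" branch, which is the situation a wrapper on the upgrade path would need to avoid.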
}

func (fcrt *flowcontrolRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
    utilflowcontrol.RequestDelegated(r.Context())
ah, okay this is the code that will unblock non-upgrade requests
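For readers following along, the wrapping pattern discussed here can be sketched roughly as below. This is an illustration under assumptions, not the code in this PR: signalingRoundTripper, roundTripperFunc, callOrder and the onDelegate hook are hypothetical names, the hook stands in for the delegation signal, and the backend is a fake that never touches the network.

```go
package main

import (
	"fmt"
	"net/http"
)

// roundTripperFunc adapts a plain function to http.RoundTripper.
type roundTripperFunc func(*http.Request) (*http.Response, error)

func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// signalingRoundTripper calls onDelegate immediately before handing the
// request to the wrapped transport.
type signalingRoundTripper struct {
	rt         http.RoundTripper
	onDelegate func(*http.Request) // hypothetical hook standing in for the delegation signal
}

func (s *signalingRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
	s.onDelegate(r)
	return s.rt.RoundTrip(r)
}

// WrappedRoundTripper exposes the inner transport to code that unwraps round trippers.
func (s *signalingRoundTripper) WrappedRoundTripper() http.RoundTripper { return s.rt }

// callOrder records the order in which the hook and the fake backend run.
func callOrder() []string {
	var order []string
	backend := roundTripperFunc(func(r *http.Request) (*http.Response, error) {
		order = append(order, "backend")
		return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody, Request: r}, nil
	})
	rt := &signalingRoundTripper{
		rt:         backend,
		onDelegate: func(*http.Request) { order = append(order, "signal") },
	}
	req, err := http.NewRequest(http.MethodGet, "http://aggregated.example/apis", nil)
	if err != nil {
		panic(err)
	}
	resp, err := rt.RoundTrip(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	return order
}

func main() {
	fmt.Println(callOrder())
}
```

The hook runs only when RoundTrip is actually called, so in this arrangement paths that bypass RoundTrip (such as the upgrade dial mentioned above) would still need a separate call.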
@@ -271,3 +278,16 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationv1api.APIServ
     }
     r.handlingInfo.Store(newInfo)
 }
+
+type flowcontrolRoundTripper struct {
please add a comment explaining the purpose of this custom RT
please make sure we don't need the CancelRequest method.
for some reason, you had to provide the WrappedRoundTripper method
case utilnet.RoundTripperWrapper:
Will do. I responded in another comment (https://github.com/kubernetes/kubernetes/pull/105511/files#r729021620) with the reason for implementing RoundTripperWrapper.
// boundaries, so we generously fire it as soon as we know
// that the request won't be serviced locally. Safe to call
// for non-watch requests.
WatchInitialized(ctx)
what happens with a request calling this function on a cluster with disabled p&f?
will this method infer that the request was a watch request? or do we need to call it only for watch requests?
It's always safe to call because it's a no-op unless the context contains a signal value. Same with the feature gate disabled -- without the p&f filter, the signal value won't be added to the context.
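A rough sketch of why such a call can be a no-op: the signal lives in the request context, so if nothing put it there, there is nothing to fire. The names below (signalKey, withInitSignal, watchInitialized, countSignals) are hypothetical and only mimic the behaviour described in this comment; the real key and helper in the apiserver may look different.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// signalKey is a hypothetical private context key used only in this sketch.
type signalKey struct{}

// withInitSignal returns a context carrying a one-shot signal function,
// standing in for what a filter earlier in the handler chain might attach.
func withInitSignal(ctx context.Context, fire func()) context.Context {
	var once sync.Once
	return context.WithValue(ctx, signalKey{}, func() { once.Do(fire) })
}

// watchInitialized fires the signal if the context carries one and does
// nothing otherwise.
func watchInitialized(ctx context.Context) {
	if fire, ok := ctx.Value(signalKey{}).(func()); ok {
		fire()
	}
}

// countSignals calls watchInitialized the given number of times and reports
// how often the underlying signal actually fired.
func countSignals(withSignal bool, calls int) int {
	fired := 0
	ctx := context.Background()
	if withSignal {
		ctx = withInitSignal(ctx, func() { fired++ })
	}
	for i := 0; i < calls; i++ {
		watchInitialized(ctx)
	}
	return fired
}

func main() {
	fmt.Println("without signal in context:", countSignals(false, 3))
	fmt.Println("with signal in context:", countSignals(true, 3))
}
```

In this sketch the one-shot guard also makes repeated calls harmless; whether the real signal is guarded the same way is an assumption here.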
@@ -160,7 +161,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
         return
     }

-    proxyRoundTripper := handlingInfo.proxyRoundTripper
+    var proxyRoundTripper http.RoundTripper = &flowcontrolRoundTripper{rt: handlingInfo.proxyRoundTripper}
I think I preferred the previous, simpler version, because we want to cherry-pick this.
How about simply calling RequestDelegated() before calling ServeHTTP in the first commit, cherry-picking that one, and then improving further at head?
yes, that's why I asked @deads2k to share more details about the cost of setting up a transport, here #105511 (comment)
SGTM. I'd also feel more confident about this second approach with a sanity check from @deads2k. I'll go ahead and split this PR now.
56db85d to 1873915
Just pushed the simpler option (#105511 (comment)). @wojtek-t
This LGTM, but I would prefer to have an explicit ACK from @deads2k on it.
/approve
/hold
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: benluddy, wojtek-t
this lgtm
/lgtm
/retest
…5511-upstream-release-1.22 Automated cherry pick of #105511: Free APF seats for watches handled by an aggregated
What type of PR is this?
/kind bug
What this PR does / why we need it:
Fires the APF initialization signal before proxying a request to an aggregated apiserver.
Which issue(s) this PR fixes:
Fixes #105409.
Special notes for your reviewer:
Does this PR introduce a user-facing change?
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.: