client-go/workqueue: Drain work queue on shutdown #101928
Hi @alexanderConstantinescu. Thanks for your PR. I'm waiting for a kubernetes member to verify that this patch is reasonable to test. If it is, they should reply with `/ok-to-test`. Once the patch is verified, the new status will be reflected by the `ok-to-test` label. I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
This PR may require API review. If so, when the changes are ready, complete the pre-review checklist and request an API review. Status of requested reviews is tracked in the API Review project.
Thinking about this further, I am not sure if this is considered a bug: that would depend on what Kubernetes defines as its contract to the user. If the doc block for public API functions is considered a contract, then this could qualify as a bug, since the implementation does not uphold its publicly stated contract.
/triage accepted
(force-pushed from 48f9c81 to 347d5be)
/remove-kind api-change, since the new version of this patch no longer exposes an API change.
(force-pushed from 347d5be to 22f2cf3)
(force-pushed from f9c6927 to 75e701f)
(force-pushed from e0f73fa to 051204d)
/test pull-kubernetes-unit
```
@@ -178,19 +185,64 @@ func (q *Type) Done(item interface{}) {
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	} else if q.processing.len() == 0 {
		q.cond.Signal()
```
Trying to reason through this. In typical usage we have multiple goroutines doing `.Get`, which has a `.Wait` today. The `.Signal` from an `Add`, and the one here, generally ensure that when an item is added to the queue we have a matching `.Signal` for every `.Wait`. This is leveraged in `.Get` so that a signal with an empty queue indicates that there is no more work to do.

This new signal appears to be used to indicate that there is no processing. Now the `.Get` calls will continue returning items until the queue is empty, and then they `.Wait`.
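For reference, a simplified sketch of `Get`'s wait loop, close to the upstream `queue.go` (metrics omitted), shows how the signal/wait pairing is used:

```go
// Simplified sketch of Get, close to upstream queue.go; metrics omitted.
func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	// Each goroutine parked here consumes one Signal (or a Broadcast).
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		// We were signaled with an empty queue: no more work to do.
		return nil, true
	}
	item, q.queue = q.queue[0], q.queue[1:]
	q.processing.insert(item)
	q.dirty.delete(item)
	return item, false
}
```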
If we have two workers doing Gets, one call to Shutdown, and item/A, I think we can get a situation like:

- Get/1 receives item/A
- Get/2 enters and waits
- Shutdown is called and enters a wait
- we now have two waiters
- Done for item/A is called and signals
- Get/2 is signaled from Done and exits as shutting down
- Get/1 enters and exits as shutting down
- Shutdown is never signaled

Is that a flow that can happen in this scenario?

Do you actually need the if-block in here to test for shutdown, the number in processing, the number in the queue, and the number in dirty, and then broadcast?

If I'm correct, I'd like to see a test trying to force this race, and a fix. If I'm incorrect, but the story was plausible to either @alexanderConstantinescu or @lavalamp, I think we also need this test. If I'm incorrect, but the story was not plausible, please help me see the flaw.
> the story was plausible to either @alexanderConstantinescu or @lavalamp I think we also need this test.

The story is plausible in my opinion. I will write a test for it and verify. If this is true, I suspect doing

```go
} else if q.processing.len() == 0 {
	q.cond.Broadcast()
}
```

should fix it, as that would lead to:

- Done for item/A is called and signals
- Get/2 and Shutdown are signaled from Done and exit as shutting down
- Get/1 enters and exits as shutting down
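Put together, the tail of `Done` would then look roughly like this (a sketch combining the hunk above with the proposed fix; metrics omitted):

```go
// Sketch of Done with the proposed Broadcast applied; metrics omitted.
func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	q.processing.delete(item)
	if q.dirty.has(item) {
		// The item was re-added while in flight: queue it again.
		q.queue = append(q.queue, item)
		q.cond.Signal()
	} else if q.processing.len() == 0 {
		// Wake every waiter, not just one: both parked Gets and a
		// draining shutdown may be waiting on this condition.
		q.cond.Broadcast()
	}
}
```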
After having thought about this some more and written a small test to verify: I've realized that the scenario presented is not completely correct. The following is the actual chain of events:

- Get/1 receives item/A
- Get/2 enters and waits
- Shutdown is called, broadcasts `shuttingDown`, and enters a wait
- Get/2 is signaled from `shuttingDown` and exits as shutting down
- we now have one waiter
- Done for item/A is called and signals
- Shutdown is signaled from Done and exits

Given that `ShutDown` always broadcasts to all `Get`s that they should finish waiting (so, either process one of the remaining items on the queue or just exit if there are none), and also broadcasts to all `Add`s to stop accepting items: we should be fine.
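For context, the broadcast referred to here lives in the shutdown path, which upstream implements along these lines:

```go
// Simplified sketch of the shutdown path: the Broadcast wakes every parked
// Get and Add, so none of them can be left waiting once we shut down.
func (q *Type) shutdown() {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	q.shuttingDown = true
	q.cond.Broadcast()
}
```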
```go
// ShutDown will cause q to ignore all new items added to it. As soon as the
// worker goroutines have drained the existing items in the queue, they will be
// instructed to exit.
func (q *Type) ShutDown() {
	q.shutdown()
	for q.isProcessing() && q.shouldDrain() {
```
What prevents us from being in a state where we are not processing, but still have items in our queue?
Nothing. One example of this is what I mentioned in the description of this PR:

> This PR assumes that whatever is put on the queue will also terminate at one point (I am not sure if this is a valid assumption), i.e: that no `syncHandler` will indefinitely execute

And if there's a caller (controller) for which that statement does not hold, then we block on `ShutDown` until the second ctrl+c.

As for the logic in `queue.go`: there is no condition I could think of that would lead us to not be processing while still having items in our queue.
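For illustration, the two helpers in the drain loop above could look roughly like this (a sketch; the `drain` field is assumed, and clearing it on a second shutdown request is what would make the "second ctrl+c" unblock the drain):

```go
// isProcessing reports whether any item is still being worked on.
func (q *Type) isProcessing() bool {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	return q.processing.len() != 0
}

// shouldDrain reports whether a drained shutdown is still requested; a
// subsequent plain shutdown request could clear q.drain to abort the drain.
func (q *Type) shouldDrain() bool {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	return q.drain
}
```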
(force-pushed from 051204d to 0bf5337)
```go
	q.ShutDownWithDrain()
}()

// Add as many items as possible until the queue is marked "ShuttingDown"
```
State the purpose of this? Is it to exercise a race condition? If so, it might be good to run this test many times.
The purpose of this was to ensure a sequence of events where `ShutDownWithDrain` is called before we start marking items as done, thus "simulating" that we drain the queue / wait for all items to finish processing. I realized that the way of doing this was a bit contrived given my initial goal, and I have now re-written it, removing `deltaItems`, which should make the purpose clearer.
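For illustration, the ordering the rework aims for could be sketched like this (names and structure assumed, not the PR's actual test code; it would live in a `_test.go` file importing `sync`, `testing`, and `k8s.io/client-go/util/workqueue`):

```go
func TestShutDownWithDrainWaitsForDone(t *testing.T) {
	q := workqueue.New()
	q.Add("foo")
	item, _ := q.Get() // "foo" is now in flight

	drainDone := sync.WaitGroup{}
	drainDone.Add(1)
	go func() {
		defer drainDone.Done()
		q.ShutDownWithDrain() // blocks while "foo" is still processing
	}()

	// Marking the item done is what unblocks the drain. (A real test would
	// add synchronization to guarantee the drain has started first.)
	q.Done(item)
	drainDone.Wait()
}
```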
```go
	q.shutdown()
}

// ShutDownWithDrain will cause q to ignore all new items added to it. As soon
```
It's important to clarify here: is "draining" about processing every item currently in the queue (including dirty items that will be re-added when a worker exits), or is it about getting every worker goroutine to exit?
OK, "draining" is about processing every item currently in the queue. I have update the doc bloc with (what I hope is) a better wording for this.
Great -- in that case, you need to exercise the case where an item is added while it's being processed; I'm pretty sure such items get left in the queue right now.
Isn't this what `TestAddWhileProcessing` does? I've updated https://github.com/kubernetes/kubernetes/blob/525a2032a4ed5eab44bcbb5f356ebf151c47fa46/staging/src/k8s.io/client-go/util/workqueue/queue_test.go#L113 to use `ShutDownWithDrain` and the test passes. Either that test doesn't test what you want me to test, or there's no problem?
Did you verify the queue was empty when ShutDownWithDrain exited?

I'd prefer adding a very short test that exercises the scenario deterministically.
> Did you verify the queue was empty when ShutDownWithDrain exited?

Yes.

> I'd prefer adding a very short test that exercises the scenario deterministically.

I've modified `TestAddWhileProcessing` to exercise both shutdown mechanisms and added a verification of the queue length for both shutdown paths. I did this to `TestBasic` too, for completeness' sake. Let me know if you think this is sufficient / valid.
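For example, a minimal version of such a verification could look like this (a sketch, not the PR's actual test code):

```go
func TestDrainLeavesQueueEmpty(t *testing.T) {
	q := workqueue.New()
	q.Add("foo")
	item, _ := q.Get()
	q.Done(item) // finish the in-flight work
	q.ShutDownWithDrain()
	// After a drained shutdown, nothing should be left in the queue.
	if q.Len() != 0 {
		t.Errorf("expected empty queue after drain, got %d item(s)", q.Len())
	}
}
```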
(force-pushed from 7c9f934 to dcec235)
OK, I wrote a test for the scenario I was worried about and it passes, so great. Can you add this test? Perhaps it is already covered, but I don't see it.

```go
func TestQueueDrainageUsingShutDownWithDrainWithDirtyItem(t *testing.T) {
	q := workqueue.New()

	q.Add("foo")
	gotten, _ := q.Get()
	q.Add("foo")

	finishedWG := sync.WaitGroup{}
	finishedWG.Add(1)
	go func() {
		defer finishedWG.Done()
		q.ShutDownWithDrain()
	}()

	// Ensure that ShutDownWithDrain has started and is blocked.
	shuttingDown := false
	for !shuttingDown {
		_, shuttingDown = q.Get()
	}

	// Finish "working".
	q.Done(gotten)

	// `shuttingDown` becomes false because Done caused an item to go back into
	// the queue.
	again, shuttingDown := q.Get()
	if shuttingDown {
		t.Fatalf("should not have been done")
	}
	q.Done(again)

	// Now we are really done.
	_, shuttingDown = q.Get()
	if !shuttingDown {
		t.Fatalf("should have been done")
	}

	finishedWG.Wait()
}
```

...unfortunately I also ran with `-count` to look for flakes, and if you pass something like 40 it usually hangs. So there is some rare lockup.
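(For reference, a flake hunt like the one described uses Go's standard test flags; the package path here is assumed from the repo layout:)

```
go test -run TestQueueDrainageUsingShutDownWithDrainWithDirtyItem -count=40 \
  ./staging/src/k8s.io/client-go/util/workqueue/
```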
Stack trace from a frozen run:
(force-pushed from dcec235 to 34f1362)
Indeed. It's now been fixed. Background: I modified the test case:

A very subtle lockup, as mentioned, since the window allowing this to happen is, needless to say, small. I've fixed this by making sure that we do not wait if the queue is empty. The tests look good now.
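In code terms, the fix amounts to something like this (a sketch; the helper name is taken from the earlier diff):

```go
// waitForProcessing waits for a Done signal, but refuses to park if nothing
// is being processed: in that case the signal we would wait for has already
// fired (or never will), and waiting would hang the drain forever.
func (q *Type) waitForProcessing() {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if q.processing.len() == 0 {
		return
	}
	q.cond.Wait()
}
```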
…n shutting down

Signed-off-by: Alexander Constantinescu <aconstan@redhat.com>
(force-pushed from 34f1362 to 5b740f4)
/kind feature
/approve

OK, I believe the tests are good now; I think we caught all the problems.
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: alexanderConstantinescu, lavalamp

The full list of commands accepted by this bot can be found here. The pull request process is described here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing `/approve` in a comment. Approvers can cancel approval by writing `/approve cancel` in a comment.
What type of PR is this?
/kind api-change
What this PR does / why we need it:
This PR implements a drained shutdown of the workqueue, in line with what the doc blocks mention for `ShutDown` in both `delaying_queue.go` and `queue.go`. This drained shutdown is not what happens today (i.e.: the doc blocks were wrong). The underlying functional reason for this PR is that it would be better if controllers finished executing their `syncHandler` before shutting down completely; some controllers might depend on (or are at least better off with) being able to terminate that sync term, processing the result and updating whatever data store they depend on, before terminating completely.

This PR assumes that whatever is put on the queue will also terminate at one point (I am not sure if this is a valid assumption), i.e.: that no `syncHandler` will execute indefinitely. It thus does not implement a timeout when verifying whether the queue is still processing.

Moreover, this introduces an API change to client-go's workqueue library. I am thus not sure if this requires additional work besides this PR?
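For callers that cannot guarantee every `syncHandler` terminates, a timeout can be layered on top without changing the library; a hypothetical sketch (the helper and its name are not part of this PR, and it assumes `ShutDownWithDrain` ends up on the queue's `Interface`):

```go
package shutdownutil

import (
	"time"

	"k8s.io/client-go/util/workqueue"
)

// shutDownWithTimeout drains q, but gives up after d and falls back to a
// regular (non-draining) shutdown. Hypothetical helper, not part of the PR.
func shutDownWithTimeout(q workqueue.Interface, d time.Duration) {
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		q.ShutDownWithDrain()
	}()
	select {
	case <-drained:
		// Every in-flight item was marked Done; clean exit.
	case <-time.After(d):
		// Stop waiting; a plain ShutDown aborts the drain.
		q.ShutDown()
	}
}
```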
Assigning to the people who, it seems, have most frequently come in contact with the workqueue.
/assign @deads2k @lavalamp
Which issue(s) this PR fixes:
Fixes #
Special notes for your reviewer:
Does this PR introduce a user-facing change?
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.: