
client-go/workqueue: Drain work queue on shutdown #101928

Merged

Conversation

alexanderConstantinescu
Member

What type of PR is this?

/kind api-change

What this PR does / why we need it:

This PR implements a drained shutdown of the workqueue, in line with what the doc blocks for `ShutDown` in both delaying_queue.go and queue.go already describe. That drained shutdown is not what happens today (i.e. the doc blocks were wrong). The underlying functional reason for this PR is that controllers are better off finishing their `syncHandler` before shutting down completely: some controllers depend on (or at least benefit from) being able to finish the current sync, process the result, and update whatever data store they rely on before terminating.
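To make the semantics concrete, here is a stripped-down, self-contained sketch of the pattern this PR aims for. This is a toy `miniQueue`, not the actual client-go implementation: `Get`/`Done` track in-flight items, and the drained shutdown waits until both the queue and the processing set are empty.

```go
package main

import (
	"fmt"
	"sync"
)

// miniQueue is a toy sketch of the client-go workqueue pattern:
// Get hands out items, Done marks them finished, and
// ShutDownWithDrain blocks until nothing is left in flight.
type miniQueue struct {
	cond         *sync.Cond
	queue        []string
	processing   map[string]bool
	shuttingDown bool
}

func newMiniQueue() *miniQueue {
	return &miniQueue{
		cond:       sync.NewCond(&sync.Mutex{}),
		processing: map[string]bool{},
	}
}

func (q *miniQueue) Add(item string) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if q.shuttingDown {
		return // ignore new items once shutting down
	}
	q.queue = append(q.queue, item)
	q.cond.Signal()
}

// Get blocks until an item is available, or returns true once the
// queue is shutting down with nothing left to hand out.
func (q *miniQueue) Get() (string, bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		return "", true // shutting down and drained
	}
	item := q.queue[0]
	q.queue = q.queue[1:]
	q.processing[item] = true
	return item, false
}

func (q *miniQueue) Done(item string) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	delete(q.processing, item)
	q.cond.Broadcast() // wake Gets and any waiting ShutDownWithDrain
}

// ShutDownWithDrain refuses new items, wakes all waiters, and then
// waits until every queued and in-flight item has been handled.
func (q *miniQueue) ShutDownWithDrain() {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	q.shuttingDown = true
	q.cond.Broadcast()
	for len(q.processing) > 0 || len(q.queue) > 0 {
		q.cond.Wait()
	}
}

func main() {
	q := newMiniQueue()
	var processed []string
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // a single worker draining the queue
		defer wg.Done()
		for {
			item, shuttingDown := q.Get()
			if shuttingDown {
				return
			}
			processed = append(processed, item)
			q.Done(item)
		}
	}()
	q.Add("a")
	q.Add("b")
	q.ShutDownWithDrain()
	wg.Wait()
	fmt.Println(len(processed)) // 2
}
```

Note that `ShutDownWithDrain` only returns because every `Done` broadcasts; that interaction is exactly what the review discussion below digs into.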

This PR assumes that whatever is put on the queue will also terminate at some point (I am not sure if this is a valid assumption), i.e. that no `syncHandler` will execute indefinitely. It therefore does not implement a timeout when verifying whether the queue is still processing.

Moreover, this introduces an API change to client-go's workqueue library. I am therefore not sure if this requires additional work beyond this PR?

Assigning to the people who, it seems, have most frequently worked on the workqueue.

/assign @deads2k @lavalamp

Which issue(s) this PR fixes:

Fixes #

Special notes for your reviewer:

Does this PR introduce a user-facing change?

Added a `Processing` condition to the workqueue API.
Changed `ShutDown` in the workqueue API to wait until the work queue finishes processing all in-flight items.

Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.:


@k8s-ci-robot k8s-ci-robot added release-note Denotes a PR that will be considered when it comes time to generate release notes. kind/api-change Categorizes issue or PR as related to adding, removing, or otherwise changing an API size/M Denotes a PR that changes 30-99 lines, ignoring generated files. cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. do-not-merge/needs-sig Indicates an issue or PR lacks a `sig/foo` label and requires one. needs-triage Indicates an issue or PR lacks a `triage/foo` label and requires one. labels May 12, 2021
@k8s-ci-robot
Contributor

Hi @alexanderConstantinescu. Thanks for your PR.

I'm waiting for a kubernetes member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@k8s-ci-robot k8s-ci-robot added needs-priority Indicates a PR lacks a `priority/foo` label and requires one. needs-ok-to-test Indicates a PR that requires an org member to verify it is safe to test. labels May 12, 2021
@k8s-ci-robot k8s-ci-robot added sig/api-machinery Categorizes an issue or PR as relevant to SIG API Machinery. and removed do-not-merge/needs-sig Indicates an issue or PR lacks a `sig/foo` label and requires one. labels May 12, 2021
@fejta-bot

This PR may require API review.

If so, when the changes are ready, complete the pre-review checklist and request an API review.

Status of requested reviews is tracked in the API Review project.

@alexanderConstantinescu
Member Author

Thinking about this further, I am not sure this is considered a bug: that depends on what Kubernetes defines as its contract to the user. If the doc block for public API functions is considered a contract, then this could qualify as a bug, since the implementation does not uphold its publicly stated contract.

@caesarxuchao
Member

/triage accepted
/cc @yliaog

@k8s-ci-robot k8s-ci-robot requested a review from yliaog May 13, 2021 20:07
@k8s-ci-robot k8s-ci-robot added triage/accepted Indicates an issue or PR is ready to be actively worked on. and removed needs-triage Indicates an issue or PR lacks a `triage/foo` label and requires one. labels May 13, 2021
@alexanderConstantinescu
Member Author

/remove-kind api-change

since the new version of this patch does not expose `Processing` through the interface and keeps the API the same.

@k8s-ci-robot k8s-ci-robot added do-not-merge/needs-kind Indicates a PR lacks a `kind/foo` label and requires one. and removed kind/api-change Categorizes issue or PR as related to adding, removing, or otherwise changing an API labels May 13, 2021
@k8s-ci-robot k8s-ci-robot added the sig/apps Categorizes an issue or PR as relevant to SIG Apps. label Jun 15, 2021
@alexanderConstantinescu alexanderConstantinescu force-pushed the drain-workqueue branch 2 times, most recently from e0f73fa to 051204d Compare June 15, 2021 22:44
@alexanderConstantinescu
Member Author

/test pull-kubernetes-unit

@@ -178,19 +185,64 @@ func (q *Type) Done(item interface{}) {
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
} else if q.processing.len() == 0 {
q.cond.Signal()
Contributor

Trying to reason through this. In typical usage we have multiple goroutines doing `.Get`, which has a `.wait` today. The `.Signal` from an `Add`, together with the one here, generally ensures that when an item is added to the queue there is a matching `.Signal` for every `.wait`. This is leveraged in `.Get` so that a signal with an empty queue indicates that there is no more work to do.

This new signal appears to be used to indicate that there is no processing. Now the `.Get` calls will continue returning items until the queue is empty, and then they `.wait`.

If we have two workers doing `Get`s, one call to `ShutDown`, and item/A, I think we can get a situation like:

  1. Get/1 receives item/A
  2. Get/2 enters and waits
  3. Shutdown is called and enters a wait
  4. we now have two waiters
  5. Done for item/A is called and signals
  6. Get/2 is signaled from Done and exits as shutting down
  7. Get/1 enters and exits as shutting down
  8. Shutdown is never signaled

Is that a flow that can happen in this scenario?

Do you actually need the if-block here to test for shutdown, the number in processing, the number in queue, and the number in dirty, and then broadcast?

If I'm correct, I'd like to see a test trying to force this race and fix it. If I'm incorrect, but the story was plausible to either @alexanderConstantinescu or @lavalamp I think we also need this test. If I'm incorrect, but the story was not plausible, please help me see the flaw.

Member Author

the story was plausible to either @alexanderConstantinescu or @lavalamp I think we also need this test.

The story is plausible in my opinion. I will write a test for it and verify. If this is true, I suspect doing

else if q.processing.len() == 0 {
        q.cond.Broadcast()
}

should fix it, as that would lead to:

  1. Done for item/A is called and signals
  2. Get/2 and Shutdown is signaled from Done and exits as shutting down
  3. Get/1 enters and exits as shutting down
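The proposed fix hinges on the difference between `sync.Cond.Signal`, which wakes exactly one waiter, and `Broadcast`, which wakes them all; with both a parked `Get` and a parked `ShutDown` on the same condition variable, a single `Signal` from `Done` can wake the wrong one. A self-contained toy illustration of the `Broadcast` behavior (not the workqueue code itself):

```go
package main

import (
	"fmt"
	"sync"
)

// wakeAll parks up to n goroutines on a single sync.Cond and then
// wakes them with one Broadcast; every waiter observes the state
// change and returns.
func wakeAll(n int) int {
	mu := sync.Mutex{}
	cond := sync.NewCond(&mu)
	ready := false

	var wg sync.WaitGroup
	woken := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			for !ready { // standard predicate loop around Wait
				cond.Wait()
			}
			woken++
			mu.Unlock()
		}()
	}

	mu.Lock()
	ready = true
	// Signal() here would wake only a single waiter; Broadcast wakes
	// all n, which is what ShutDown relies on when several Gets (and
	// ShutDown itself) are parked on the same condition variable.
	cond.Broadcast()
	mu.Unlock()

	wg.Wait()
	return woken
}

func main() {
	fmt.Println(wakeAll(3)) // 3
}
```

With `Signal` in place of `Broadcast`, the two remaining waiters would block forever, which is exactly the starved-`Shutdown` flow sketched above.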

Member Author

After thinking about this some more and writing a small test to verify: I've realized that the scenario presented is not completely correct. The actual chain of events is:

  1. Get/1 receives item/A
  2. Get/2 enters and waits
  3. Shutdown is called, broadcasts shuttingDown and enters a wait
  4. Get/2 is signaled from shuttingDown and exits as shutting down
  5. we now have one waiter
  6. Done for item/A is called and signals
  7. Shutdown is signaled from Done and exits

Given that `ShutDown` always broadcasts to all `Get`s that they should finish waiting (so either process one of the remaining items on the queue, or just exit if there are none), and also broadcasts to all `Add`s to stop accepting items, we should be fine.

// ShutDown will cause q to ignore all new items added to it. As soon as the
// worker goroutines have drained the existing items in the queue, they will be
// instructed to exit.
func (q *Type) ShutDown() {
q.shutdown()
for q.isProcessing() && q.shouldDrain() {
Contributor

What prevents us from being in a state where we are not processing, but still have items in our queue?

Member Author

Nothing. One example of this is what I mentioned in the description to this PR:

This PR assumes that whatever is put on the queue will also terminate at one point (I am not sure if this is a valid assumption), i.e: that no syncHandler will indefinitely execute

And if there's a caller (a controller) for which that statement does not hold, then we block on `ShutDown` until the second Ctrl+C.

As for the logic in queue.go: there is no condition I can think of that would lead us to not be processing while still having items in the queue.

staging/src/k8s.io/client-go/util/workqueue/queue_test.go (outdated review thread, resolved)
q.ShutDownWithDrain()
}()

// Add as many items as possible until the queue is marked "ShuttingDown"
Member

State the purpose of this? Is it to exercise a race condition? If so, it might be good to run this test many times.

Member Author

The purpose of this was to ensure a sequence of events where `ShutDownWithDrain` is called before we start marking items as done, thus "simulating" that we drain the queue / wait for all items to finish processing. I realized that this was a bit contrived given my initial goal, and I have now rewritten it, removing `deltaItems`, which should make the purpose clearer.

staging/src/k8s.io/client-go/util/workqueue/queue_test.go (outdated review thread, resolved)
q.shutdown()
}

// ShutDownWithDrain will cause q to ignore all new items added to it. As soon
Member

It's important to clarify here: is "draining" about processing every item currently in the queue (including dirty items that will be re-added when a worker exits), or is it about getting every worker goroutine to exit?

Member Author

OK, "draining" is about processing every item currently in the queue. I have updated the doc block with (what I hope is) better wording for this.

Member

Great -- in that case, you need to exercise the case where an item is added while it's being processed, I'm pretty sure such items get left in the queue right now.

Member Author

Isn't this what TestAddWhileProcessing does? I've updated https://github.com/kubernetes/kubernetes/blob/525a2032a4ed5eab44bcbb5f356ebf151c47fa46/staging/src/k8s.io/client-go/util/workqueue/queue_test.go#L113 to use ShutDownWithDrain and the test passes. Either that test doesn't test what you want me to test, or there's no problem?

Member

Did you verify the queue was empty when ShutDownWithDrain exited?

I'd prefer adding a very short test that exercised the scenario deterministically.

Member Author

Did you verify the queue was empty when ShutDownWithDrain exited?

Yes

I'd prefer adding a very short test that exercised the scenario deterministically.

I've modified both `TestAddWhileProcessing` and `TestBasic` to exercise both shutdown mechanisms, and added a verification of the queue length for both tests and both shutdown paths. I did this to `TestBasic` too, for completeness' sake. Let me know if you think this is sufficient/valid.

@alexanderConstantinescu alexanderConstantinescu force-pushed the drain-workqueue branch 3 times, most recently from 7c9f934 to dcec235 Compare July 10, 2021 12:53
@lavalamp
Member

OK I wrote a test for the scenario I was worried about and it passes, so great. Can you add this test? Perhaps it is already covered but I don't see it.

func TestQueueDrainageUsingShutDownWithDrainWithDirtyItem(t *testing.T) {
        q := workqueue.New()

        q.Add("foo")
        gotten, _ := q.Get()
        q.Add("foo")

        finishedWG := sync.WaitGroup{}
        finishedWG.Add(1)
        go func() {
                defer finishedWG.Done()
                q.ShutDownWithDrain()
        }()

        // Ensure that ShutDownWithDrain has started and is blocked.
        shuttingDown := false
        for !shuttingDown {
                _, shuttingDown = q.Get()
        }

        // Finish "working".
        q.Done(gotten)

        // `shuttingDown` becomes false because Done caused an item to go back into
        // the queue.
        again, shuttingDown := q.Get()
        if shuttingDown {
                t.Fatalf("should not have been done")
        }
        q.Done(again)

        // Now we are really done.
        _, shuttingDown = q.Get()
        if !shuttingDown {
                t.Fatalf("should have been done")
        }

        finishedWG.Wait()
}

...unfortunately I also ran with -count to look for flakes, and if you pass something like 40 it usually hangs. So there is some rare lock-up.

@lavalamp
Member

Stack trace from a frozen run:

panic: test timed out after 10m0s
                                                                                                         
goroutine 161 [running]:                                                                                                                                                                                           
testing.(*M).startAlarm.func1()                                                                          
        /usr/local/google/home/dbsmith/.gvm/gos/go1.16/src/testing/testing.go:1701 +0xe5                                                                                                                           
created by time.goFunc
        /usr/local/google/home/dbsmith/.gvm/gos/go1.16/src/time/sleep.go:180 +0x45
                                                                                                         
goroutine 1 [chan receive]:                                                                                                                                                                                        
testing.(*T).Run(0xc000001c80, 0x6a3c45, 0x16, 0x6b2d40, 0x490f01)
        /usr/local/google/home/dbsmith/.gvm/gos/go1.16/src/testing/testing.go:1240 +0x2da                                                                                                                          
testing.runTests.func1(0xc000102900)
        /usr/local/google/home/dbsmith/.gvm/gos/go1.16/src/testing/testing.go:1512 +0x78
testing.tRunner(0xc000102900, 0xc000151d68)                                                              
        /usr/local/google/home/dbsmith/.gvm/gos/go1.16/src/testing/testing.go:1194 +0xef                                                                                                                           
testing.runTests(0xc0001380c0, 0x84e780, 0x14, 0x14, 0xc03dc8a4d8509dbf, 0x8bb2d5261e, 0x853820, 0x203000)
        /usr/local/google/home/dbsmith/.gvm/gos/go1.16/src/testing/testing.go:1510 +0x2fe                                                                                                                          
testing.(*M).Run(0xc0001c2000, 0x0)
        /usr/local/google/home/dbsmith/.gvm/gos/go1.16/src/testing/testing.go:1418 +0x1eb
k8s.io/client-go/util/workqueue.TestMain(0xc0001c2000)                   
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/main_test.go:28 +0x93       
main.main()                                                                                              
        _testmain.go:89 +0x165                                                                                                                                                                                     

goroutine 19 [chan receive]:
k8s.io/klog/v2.(*loggingT).flushDaemon(0x853980)                                                         
        /usr/local/google/home/dbsmith/.gvm/pkgsets/go1.16/global/pkg/mod/k8s.io/klog/v2@v2.9.0/klog.go:1169 +0x8b     
created by k8s.io/klog/v2.init.0                                                                         
        /usr/local/google/home/dbsmith/.gvm/pkgsets/go1.16/global/pkg/mod/k8s.io/klog/v2@v2.9.0/klog.go:420 +0xdf     

goroutine 10 [sync.Cond.Wait]:
sync.runtime_notifyListWait(0xc0000122d0, 0xc000000002)                  
        /usr/local/google/home/dbsmith/.gvm/gos/go1.16/src/runtime/sema.go:513 +0xf8                                                                                                                               
sync.(*Cond).Wait(0xc0000122c0)                                                                          
        /usr/local/google/home/dbsmith/.gvm/gos/go1.16/src/sync/cond.go:56 +0x99                                                                                                                                   
k8s.io/client-go/util/workqueue.(*Type).wait(0xc000030a20)
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/queue.go:224 +0x7a
k8s.io/client-go/util/workqueue.(*Type).ShutDownWithDrain(0xc000030a20)
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/queue.go:209 +0x50
k8s.io/client-go/util/workqueue_test.TestAddWhileProcessing(0xc000001c80)
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/queue_test.go:143 +0x1ac
testing.tRunner(0xc000001c80, 0x6b2d40)
        /usr/local/google/home/dbsmith/.gvm/gos/go1.16/src/testing/testing.go:1194 +0xef
created by testing.(*T).Run
        /usr/local/google/home/dbsmith/.gvm/gos/go1.16/src/testing/testing.go:1239 +0x2b3

goroutine 25 [select]:
k8s.io/client-go/util/workqueue.(*delayingType).waitingLoop(0xc00012a480)
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go:231 +0x3df
created by k8s.io/client-go/util/workqueue.newDelayingQueue
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go:68 +0x185

goroutine 31 [select]:
k8s.io/client-go/util/workqueue.(*delayingType).waitingLoop(0xc00012ae40)
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go:231 +0x3df
created by k8s.io/client-go/util/workqueue.newDelayingQueue
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go:68 +0x185

goroutine 9 [chan receive]:
k8s.io/client-go/util/workqueue.(*Type).updateUnfinishedWorkLoop(0xc0000303c0)
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/queue.go:256 +0xac
created by k8s.io/client-go/util/workqueue.newQueue
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/queue.go:63 +0x154

goroutine 36 [select]:
k8s.io/client-go/util/workqueue.(*delayingType).waitingLoop(0xc00020e4e0)
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go:231 +0x3df
created by k8s.io/client-go/util/workqueue.newDelayingQueue
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go:68 +0x185

goroutine 52 [select]:
k8s.io/client-go/util/workqueue.(*delayingType).waitingLoop(0xc00012b6e0)
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go:231 +0x3df
created by k8s.io/client-go/util/workqueue.newDelayingQueue
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go:68 +0x185

goroutine 99 [select]:
k8s.io/client-go/util/workqueue.(*delayingType).waitingLoop(0xc00020e8a0)
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go:231 +0x3df
created by k8s.io/client-go/util/workqueue.newDelayingQueue
        /usr/local/google/home/dbsmith/code/k4/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go:68 +0x185
FAIL    k8s.io/client-go/util/workqueue 600.032s

@alexanderConstantinescu
Member Author

alexanderConstantinescu commented Aug 20, 2021

...unfortunately I also ran with -count to look for flakes and if you pass something like 40 it usually hangs. So there is some rare lock up.

Indeed. It's now been fixed.

Background: I modified the test case `TestAddWhileProcessing` to cover both shutdown mechanisms, and it was sometimes failing for `ShutDownWithDrain`. It was failing because there was a slight window in https://github.com/kubernetes/kubernetes/blob/dcec2352c0e0b56854e37d375cc3fac5718e18c5/staging/src/k8s.io/client-go/util/workqueue/queue.go#L208-L210 which allowed the following to occur:

  1. The last processing item is still in the processing set, hence q.isProcessing returns true
  2. The worker goroutine finishes the item and calls Done on it
  3. q.wait starts waiting for some item to finish and have Done called on it, but since the queue is now empty, that never happens

A very subtle lock-up, as mentioned, since the window allowing this to happen is, needless to say, small.

I've fixed this by now making sure that we do not wait if the queue is empty.

The tests seem good now:

$ go test -timeout 180s k8s.io/client-go/util/workqueue -count 100 -race 
ok      k8s.io/client-go/util/workqueue 107.167s
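The lock-up class described above is the classic missed wakeup: checking a predicate, releasing the lock, and only then waiting leaves a window in which the final `Done` can fire unheard. The standard remedy, and the shape of the fix here, is to re-check the predicate under the same lock on every iteration of a `cond.Wait` loop. A simplified sketch (not the actual client-go implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// drainer tracks in-flight items and lets a shutdown wait for zero.
type drainer struct {
	cond       *sync.Cond
	processing int
}

func newDrainer() *drainer {
	return &drainer{cond: sync.NewCond(&sync.Mutex{})}
}

func (d *drainer) start() {
	d.cond.L.Lock()
	d.processing++
	d.cond.L.Unlock()
}

func (d *drainer) done() {
	d.cond.L.Lock()
	d.processing--
	d.cond.L.Unlock()
	d.cond.Broadcast() // wake anyone waiting for the drain
}

// waitForDrain re-checks the predicate while holding the lock, so the
// final done() cannot slip in between the check and the Wait: Wait
// atomically releases the lock and parks, and re-acquires it on wake.
func (d *drainer) waitForDrain() {
	d.cond.L.Lock()
	defer d.cond.L.Unlock()
	for d.processing > 0 {
		d.cond.Wait()
	}
}

func main() {
	d := newDrainer()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		d.start()
		wg.Add(1)
		go func() {
			defer wg.Done()
			d.done()
		}()
	}
	d.waitForDrain()
	wg.Wait()
	fmt.Println(d.processing) // 0
}
```

Checking `isProcessing` outside the lock and then entering `wait` separately, as in the pre-fix window, breaks that atomicity and allows the deadlock.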

…n shutting down

Signed-off-by: Alexander Constantinescu <aconstan@redhat.com>
@pacoxu
Member

pacoxu commented Sep 3, 2021

/kind feature

@k8s-ci-robot k8s-ci-robot added kind/feature Categorizes issue or PR as related to a new feature. and removed do-not-merge/needs-kind Indicates a PR lacks a `kind/foo` label and requires one. labels Sep 3, 2021
@lavalamp
Member

/approve
/lgtm

OK, I believe the tests are good now; I think we caught all the problems.

@k8s-ci-robot k8s-ci-robot added the lgtm "Looks good to me", indicates that a PR is ready to be merged. label Sep 10, 2021
@k8s-ci-robot
Contributor

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: alexanderConstantinescu, lavalamp

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@k8s-ci-robot k8s-ci-robot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Sep 10, 2021
@k8s-ci-robot k8s-ci-robot merged commit 8ac9526 into kubernetes:master Sep 11, 2021
@k8s-ci-robot k8s-ci-robot added this to the v1.23 milestone Sep 11, 2021