When should sync.Cond be used instead of channel?
Golang provides easy ways for asynchronous processing. As you know, channel is one of the ways and it’s often used. However, channel might not be the right choice when multiple goroutines need to use a shared resource because channel can send data to a single channel receiver. Once the first data is consumed by a goroutine, additional data needs to be added so that other goroutines can work with the same data. It’s not a good design.
For example, we should use sync.Cond
in the following cases.
- Data sender must not be blocked
- Multiple goroutines want to use the same data
- Condition(s) need to be checked periodically
Let’s consider this case. We have a main switch. When the switch is turned on, goroutines start reading data from sensors. Each sensor provides a different API and thus the logic should be separated. So we created 3 gortoueins. Once the switch is turned off, it stops reading the sensor. The state of the switch must periodically be checked.
How to check whether the condition is fulfilled
If we need to know whether the specific condition is fulfilled to process something, it needs to be written for example in the following manner.
for {
mutex.Lock()
for !condition {
// somehow wait here
}
mutex.Unlock()
// Start process
}
We want to check the state after each process. The process is repeatedly executed while the condition is fulfilled. The condition must be checked in a critical section with a mutex because the value for the condition is updated on another goroutine. Let’s say this logic above is for reading a sensor.
When using sync.Cond
, the wait logic can be written in this way.
for {
mutex.Lock()
for !condition {
cond.Wait()
}
mutex.Unlock()
// Start process
}
The cond.Wait()
is awakened by cond.Signal()
or cond.Broadcast()
. The difference is the following.
cond.Signal()
: It awakes a singlecond.Wait()
cond.Broadcast()
: It awakes allcond.Wait()
If we use a channel, we can write it in this way.
for {
s.mutex.Lock()
for s.state == 0 {
s.mutex.Unlock()
<-s.signal
s.mutex.Lock()
}
s.mutex.Unlock()
// Start process
}
It becomes longer than using sync.Cond
but we can write the same logic as using cond.Signal()
. The main difference here is that it’s impossible to notify all the waiting goroutines because a channel can send data to only a single channel at the same time. We have to close the channel if we want to awaken all goroutines where the channel is used. However, the channel can’t be used any more once it’s closed. So it can’t be used for our use case.
Again, even if it takes a long time to finish the process and lots of events occur, the data is not either blocked or discarded when using sync.Cond
. The events are just stored in an array/slice for example without size limitation.
When using a channel, it has a size limitation. The process has to wait for the channel data for each execution. It means that the sender has to send data at the right timing for the receiver.
Implementation example with Channel
Let’s check the code. As mentioned, the value for the condition is updated by another goroutine and it has to be in a critical section with a mutex. Therefore, it’s better to define a struct that does it.
type syncWithChannel struct {
signal chan bool
mutex sync.Mutex
state int
}
func (s *syncWithChannel) Wait() {
s.mutex.Lock()
for s.state == 0 {
s.mutex.Unlock()
<-s.signal
s.mutex.Lock()
}
s.mutex.Unlock()
}
func (s *syncWithChannel) Update(value int) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.state = value
s.signal <- true
}
func withChannel() {
withChannel := &syncWithChannel{
signal: make(chan bool, 10),
}
for i := 0; i < 10; i++ {
go func(i int) {
fmt.Printf("Waiting %d\n", i)
withChannel.Wait()
fmt.Printf("Go %d\n", i)
}(i)
}
fmt.Println("--- Waiting for a second ---")
time.Sleep(time.Second)
for i := 0; i < 5; i++ {
time.Sleep(200 * time.Millisecond)
fmt.Printf("send channel %d\n", i)
withChannel.Update(0)
}
fmt.Println("--- Update state to 1 ---")
for i := 0; i < 5; i++ {
time.Sleep(500 * time.Millisecond)
fmt.Printf("send channel %d\n", i)
withChannel.Update(1)
}
time.Sleep(2 * time.Second)
fmt.Println("--- Close ---")
close(withChannel.signal)
time.Sleep(time.Second)
}
Assume that fmt.Printf("Go %d\n", i)
is a sensor read process. There’s a delay between each output in this example because channel can send data to a single channel at the same time.
Implementation example with sync.Cond
The functions are similar to the previous example. sync.Cond
has Signal()
and Broadcast()
, so I added two update functions.
type condTester struct {
cond *sync.Cond
state int
}
func (c *condTester) Wait() {
c.cond.L.Lock()
defer c.cond.L.Unlock()
for c.state == 0 {
c.cond.Wait()
}
}
func (c *condTester) SignalUpdate(value int) {
c.update(value)
c.cond.Signal()
}
func (c *condTester) BroadcastUpdate(value int) {
c.update(value)
c.cond.Broadcast()
}
func (c *condTester) update(value int) {
c.cond.L.Lock()
defer c.cond.L.Unlock()
c.state = value
}
func withCond() {
condTester := &condTester{}
condTester.cond = sync.NewCond(&sync.Mutex{})
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
fmt.Printf("Waiting %d\n", i)
condTester.Wait()
fmt.Printf("Go %d\n", i)
wg.Done()
}(i)
}
fmt.Println("--- Waiting for a second ---")
time.Sleep(time.Second)
for i := 0; i < 5; i++ {
time.Sleep(200 * time.Millisecond)
fmt.Printf("send signal %d\n", i)
condTester.SignalUpdate(0)
}
fmt.Println("--- Signal Update state to 1 ---")
for i := 0; i < 5; i++ {
time.Sleep(500 * time.Millisecond)
fmt.Printf("send signal %d\n", i)
condTester.SignalUpdate(1)
}
fmt.Println("--- Signal with 0 ---")
condTester.SignalUpdate(0)
time.Sleep(2 * time.Second)
fmt.Println("--- Broadcast ---")
condTester.BroadcastUpdate(1)
wg.Wait()
}
sync.WaitGroup
is used here to show all print results. The program ends without showing them all if sync.WaitGroup
is not used.
The main difference between channel and sync.Cond
is the speed of the last 5 goroutine processes. cond.Broadcast()
awakes all goroutines at once, 5 lines are shown at the same time. If we want to start multiple processes on different goroutines, cond.Broadcast()
is a powerful way.
Bad usage of sync.Cond
I saw a mistake on another website. So, it might be better to show a bad example too.
This is a bad implementation.
func badCondExample() {
var mutex sync.Mutex
cond := sync.NewCond(&mutex)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
fmt.Printf("Waiting %d\n", i)
mutex.Lock()
defer mutex.Unlock()
cond.Wait()
fmt.Printf("Start %d\n", i)
time.Sleep(time.Second)
fmt.Printf("End %d\n", i)
wg.Done()
}(i)
}
time.Sleep(2 * time.Second)
fmt.Println("--- Broadcast ---")
cond.Broadcast()
wg.Wait()
}
cond.Broadcast()
is used in this example to awaken all the goroutines at once. However, it doesn’t start all the processes. Instead, each process waits until the previous process finishes.
Did you find the root cause?
defer
keyword is often used right after mutex.Lock()
so that the mutex is unlocked. However, it’s the root cause here. mutex.Unlock()
is executed after wg.Done()
is called in this example because it’s the end of the function. While it’s locked, other goroutines can’t go forward. Therefore, it takes 10 seconds to finish all the processes.
The correct code looks like this.
mutex.Lock()
cond.Wait()
mutex.Unlock() // Unlock right after wait
To avoid such an error, this waiting logic should be in a separate function. Then, we can use defer keyword without considering such a case.
Comments