-
Notifications
You must be signed in to change notification settings - Fork 4.1k
Expand file tree
/
Copy pathsemaphore.cpp
More file actions
173 lines (153 loc) · 6.33 KB
/
semaphore.cpp
File metadata and controls
173 lines (153 loc) · 6.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "butil/memory/scope_guard.h"
#include "bvar/collector.h"
#include "bthread/bthread.h"
#include "bthread/butex.h"
namespace bthread {
// Define in bthread/mutex.cpp
class ContentionProfiler;
extern ContentionProfiler* g_cp;
extern bvar::CollectorSpeedLimit g_cp_sl;
extern bool is_contention_site_valid(const bthread_contention_site_t& cs);
extern void make_contention_site_invalid(bthread_contention_site_t* cs);
extern void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns);
static inline int bthread_sem_trywait(bthread_sem_t* sema) {
auto whole = (butil::atomic<unsigned>*)sema->butex;
while (true) {
unsigned num = whole->load(butil::memory_order_relaxed);
if (num == 0) {
return EAGAIN;
}
if (whole->compare_exchange_weak(num, num - 1,
butil::memory_order_acquire,
butil::memory_order_relaxed)) {
return 0;
}
}
}
static int bthread_sem_wait_impl(bthread_sem_t* sem, const struct timespec* abstime) {
bool queue_lifo = false;
bool first_wait = true;
size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
// -1: don't sample.
// 0: default value.
// > 0: Start time of sampling.
int64_t start_ns = 0;
auto whole = (butil::atomic<unsigned>*)sem->butex;
while (true) {
unsigned num = whole->load(butil::memory_order_relaxed);
if (num > 0) {
if (whole->compare_exchange_weak(num, num - 1,
butil::memory_order_acquire,
butil::memory_order_relaxed)) {
if (start_ns > 0) {
const int64_t end_ns = butil::cpuwide_time_ns();
const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
bthread::submit_contention(csite, end_ns);
}
return 0;
}
}
// Don't sample when contention profiler is off.
if (NULL != bthread::g_cp && start_ns == 0 && sem->enable_csite &&
!bvar::is_sampling_range_valid(sampling_range)) {
// Ask Collector if this (contended) sem waiting should be sampled.
sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
start_ns = bvar::is_sampling_range_valid(sampling_range) ?
butil::cpuwide_time_ns() : -1;
} else {
start_ns = -1;
}
if (bthread::butex_wait(sem->butex, 0, abstime, queue_lifo) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
// A sema should ignore interruptions in general since
// user code is unlikely to check the return value.
if (ETIMEDOUT == errno && start_ns > 0) {
// Failed to lock due to ETIMEDOUT, submit the elapse directly.
const int64_t end_ns = butil::cpuwide_time_ns();
const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
bthread::submit_contention(csite, end_ns);
}
return errno;
}
// Ignore EWOULDBLOCK and EINTR.
if (first_wait && 0 == errno) {
first_wait = false;
}
if (!first_wait) {
// Normally, bthreads are queued in FIFO order. But competing with new
// arriving bthreads over sema, a woken up bthread has good chances of
// losing. Because new arriving bthreads are already running on CPU and
// there can be lots of them. In such case, for fairness, to avoid
// starvation, it is queued at the head of the waiter queue.
queue_lifo = true;
}
}
}
static inline int bthread_sem_post(bthread_sem_t* sem, size_t num) {
if (num > 0) {
unsigned n = ((butil::atomic<unsigned>*)sem->butex)
->fetch_add(num, butil::memory_order_relaxed);
const size_t sampling_range = NULL != bthread::g_cp && sem->enable_csite ?
bvar::is_collectable(&bthread::g_cp_sl) : bvar::INVALID_SAMPLING_RANGE;
const int64_t start_ns = bvar::is_sampling_range_valid(sampling_range) ?
butil::cpuwide_time_ns() : -1;
bthread::butex_wake_n(sem->butex, n);
if (start_ns > 0) {
const int64_t end_ns = butil::cpuwide_time_ns();
const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
bthread::submit_contention(csite, end_ns);
}
}
return 0;
}
} // namespace bthread
__BEGIN_DECLS
int bthread_sem_init(bthread_sem_t* sem, unsigned value) {
sem->butex = bthread::butex_create_checked<unsigned>();
if (!sem->butex) {
return ENOMEM;
}
*sem->butex = value;
sem->enable_csite = true;
return 0;
}
int bthread_sem_disable_csite(bthread_sem_t* sema) {
sema->enable_csite = false;
return 0;
}
int bthread_sem_destroy(bthread_sem_t* semaphore) {
bthread::butex_destroy(semaphore->butex);
return 0;
}
int bthread_sem_trywait(bthread_sem_t* sem) {
return bthread::bthread_sem_trywait(sem);
}
int bthread_sem_wait(bthread_sem_t* sem) {
return bthread::bthread_sem_wait_impl(sem, NULL);
}
int bthread_sem_timedwait(bthread_sem_t* sem, const struct timespec* abstime) {
return bthread::bthread_sem_wait_impl(sem, abstime);
}
int bthread_sem_post(bthread_sem_t* sem) {
return bthread::bthread_sem_post(sem, 1);
}
int bthread_sem_post_n(bthread_sem_t* sem, size_t n) {
return bthread::bthread_sem_post(sem, n);
}
__END_DECLS