
Tor的Vegas机制源码解析

sendme_pending_timestamps 每个待确认数据包发送时的时间戳

inflight 已发送但未收到SENDME的cell数

cwnd当前允许飞行的最大cell数

cwnd_min拥塞窗口下限,至少为sendme_inc

cwnd_inc稳态阶段每次允许增长的cwnd单位

cwnd_inc_rate 稳态阶段每个cwnd周期(收到一整个cwnd对应的SENDME)内更新cwnd的次数;更新间隔约为 cwnd/(cwnd_inc_rate·sendme_inc) 个SENDME

next_cc_event剩余多少个SENDME ack后允许执行一次拥塞检测

next_cwnd_event剩余多少个SENDME ack后判断cwnd是否满

in_slow_start是否处于慢启动阶段

cwnd_full 最近是否发生过窗口满的情况

blocked_chan当前电路是否因为本地拥堵被block

sendme_inc每个SENDME确认多少个cells

vegas_params

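  • alpha 空闲阈值:稳态下queue_use低于alpha且窗口已满时,cwnd增加一个CWND_INC
  • beta 中度拥塞阈值:稳态下queue_use高于beta(或信道阻塞)时,cwnd减少一个CWND_INC
  • gamma 慢启动退出阈值:慢启动中queue_use达到gamma时,cwnd设为bdp+gamma并退出慢启动
  • delta 重度拥塞阈值:稳态下queue_use高于delta时,cwnd直接降到bdp+delta−CWND_INC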
  • ss_cwnd_max慢启动阶段最大cwnd
/* Fragment of the Vegas SENDME-processing routine: the enclosing function
 * and the declaration of queue_use are outside this excerpt. */
/* Update ack counter until next congestion signal event is allowed */
if (cc->next_cc_event)
cc->next_cc_event--;

/* Update ack counter until a full cwnd is processed */
if (cc->next_cwnd_event)
cc->next_cwnd_event--;

/* Compute BDP and RTT. If we did not update, don't run the alg */
/* Even when the algorithm is skipped, the cells acked by this SENDME are
 * still removed from inflight before returning. */
if (!congestion_control_update_circuit_estimates(cc, circ)) {
cc->inflight = cc->inflight - cc->sendme_inc;
return 0;
}

/* The queue use is the amount in which our cwnd is above BDP;
* if it is below, then 0 queue use. */
/* Comparing first keeps the subtraction below from wrapping around when
 * the values are unsigned. */
if (vegas_bdp(cc) > cc->cwnd)
queue_use = 0; // This should not happen anymore..
else
queue_use = cc->cwnd - vegas_bdp(cc);

/* Update the full state */
/* cwnd_full is sticky: it is only cleared when cwnd_became_nonfull()
 * reports so, not merely because cwnd_became_full() is false. */
if (cwnd_became_full(cc))
cc->cwnd_full = 1;
else if (cwnd_became_nonfull(cc))
cc->cwnd_full = 0;

收到一个SENDME后,两个“剩余”计数器(next_cc_event和next_cwnd_event)如果不为0,就各减1。

然后进入到congestion_control_update_circuit_estimates函数计算rtt和bdp。

如果bdp没有更新,那将不进行后续的调整算法,直接把inflight减去当前确认掉的,然后返回。

如果bdp大于cwnd,queue_use为0;否则queue_use = cwnd - bdp。queue_use是排队量,即cwnd超出BDP的部分,可理解为估计在网络中排队的cell数,而不是“剩余可用”量。然后更新cwnd_full状态值(判断窗口是否已满时,在inflight和cwnd之间留有一个缓冲量gap)。

/**
 * Process the estimate side of a SENDME ack.
 *
 * A fresh RTT sample is taken first (it pops the oldest pending send
 * timestamp), and that sample is then handed to the BDP update, which
 * depends on it.
 *
 * Returns true when the BDP estimate was updated, false otherwise.
 */
bool
congestion_control_update_circuit_estimates(congestion_control_t *cc,
                                            const circuit_t *circ)
{
  /* Ordering matters: the BDP computation consumes this RTT sample. */
  const uint64_t rtt_sample_usec =
    congestion_control_update_circuit_rtt(cc, monotime_absolute_usec());

  return congestion_control_update_circuit_bdp(cc, circ, rtt_sample_usec);
}

/**
 * Take an RTT sample for the SENDME that just arrived.
 *
 * The oldest entry of cc->sendme_pending_timestamps is the send time of the
 * cell that caused the peer to emit this SENDME; its distance to now_usec is
 * the RTT sample. A plausible sample is folded into the EWMA RTT and used to
 * refresh the max and min RTT trackers.
 *
 * Returns the RTT sample in usecs, or 0 when the sample was rejected as a
 * clock stall or jump (in which case no estimate is touched).
 */
STATIC uint64_t
congestion_control_update_circuit_rtt(congestion_control_t *cc,
                                      uint64_t now_usec)
{
  tor_assert(cc);

  const uint64_t sent_usec = dequeue_timestamp(cc->sendme_pending_timestamps);
  const uint64_t sample_usec = now_usec - sent_usec;

  /* Leave every estimate untouched if the sample looks fishy. */
  if (time_delta_stalled_or_jumped(cc, cc->ewma_rtt_usec, sample_usec)) {
    num_clock_stalls++; /* Accounting */
    return 0;
  }

  const uint64_t ewma_weight = n_ewma_count(cc);
  cc->ewma_rtt_usec = n_count_ewma(sample_usec, cc->ewma_rtt_usec,
                                   ewma_weight);

  if (sample_usec > cc->max_rtt_usec)
    cc->max_rtt_usec = sample_usec;

  if (cc->min_rtt_usec == 0) {
    /* First usable sample: seed the minimum from the EWMA. */
    cc->min_rtt_usec = cc->ewma_rtt_usec;
    return sample_usec;
  }

  if (!cc->in_slow_start && cc->cwnd == cc->cwnd_min) {
    /* cwnd is pinned at its floor outside slow start. An abnormally low
     * min RTT could be wedging us there, so nudge the minimum upward. */
    const uint64_t raised_min_usec =
      percent_max_mix(cc->ewma_rtt_usec, cc->min_rtt_usec, rtt_reset_pct);

    static ratelim_t rtt_notice_limit = RATELIM_INIT(300);
    log_fn_ratelim(&rtt_notice_limit, LOG_NOTICE, LD_CIRC,
                   "Resetting circ RTT from %"PRIu64" to %"PRIu64" due to low cwnd",
                   cc->min_rtt_usec/1000, raised_min_usec/1000);

    cc->min_rtt_usec = raised_min_usec;
    num_rtt_reset++; /* Accounting */
  } else if (cc->ewma_rtt_usec < cc->min_rtt_usec) {
    /* Tracking the EWMA instead of the raw sample averages out
     * interference from other connections. */
    cc->min_rtt_usec = cc->ewma_rtt_usec;
  }

  return sample_usec;
}

/**
 * Called when we get a SENDME. Updates the bandwidth-delay-product (BDP)
 * estimate of a circuit.
 *
 * Only the congestion-window based estimate is computed here:
 *   bdp = cwnd * min_rtt / ewma_rtt
 * If there is no EWMA RTT yet (the monotonic clock has been stalled or
 * unreliable for the whole circuit lifetime so far), the BDP is instead set
 * directly from cwnd, reduced by the channel queue length while the channel
 * is blocked, and never below cwnd_min.
 *
 * Side effects: writes cc->bdp and cc->blocked_chan. On the normal (EWMA
 * available) path, a transition between blocked and unblocked also sets
 * cc->next_cc_event to 0 so a congestion event is evaluated immediately.
 *
 * - circ is the circuit whose channel queue is inspected (n_chan for origin
 *   circuits, p_chan for exits and onion services).
 * - curr_rtt_usec is the current circuit RTT in usecs. It may be 0 if no
 *   RTT could be measured.
 *
 * Returns true if we were able to update BDP, false otherwise.
 */
static bool
congestion_control_update_circuit_bdp(congestion_control_t *cc,
                                      const circuit_t *circ,
                                      uint64_t curr_rtt_usec)
{
  int chan_q = 0;
  unsigned int blocked_on_chan = 0;

  tor_assert(cc);

  if (CIRCUIT_IS_ORIGIN(circ)) {
    /* origin circs use n_chan */
    chan_q = circ->n_chan_cells.n;
    blocked_on_chan = circ->circuit_blocked_on_n_chan;
  } else {
    /* Both onion services and exits use or_circuit and p_chan */
    chan_q = CONST_TO_OR_CIRCUIT(circ)->p_chan_cells.n;
    blocked_on_chan = circ->circuit_blocked_on_p_chan;
  }

  /* If we have no EWMA RTT, it is because monotime has been stalled
   * or messed up the entire time so far. Set our BDP estimates directly
   * to current cwnd */
  if (!cc->ewma_rtt_usec) {
    uint64_t cwnd = cc->cwnd;

    tor_assert_nonfatal(cc->cwnd <= cwnd_max);

    /* If the channel is blocked, keep subtracting off the chan_q
     * until we hit the min cwnd. Only this local estimate is reduced;
     * cc->cwnd itself is left untouched. */
    if (blocked_on_chan) {
      /* Cast is fine because we're less than int32 */
      if (chan_q >= (int64_t)cwnd) {
        log_notice(LD_CIRC,
                   "Clock stall with large chanq: %d %"PRIu64, chan_q, cwnd);
        cwnd = cc->cwnd_min;
      } else {
        cwnd = MAX(cwnd - chan_q, cc->cwnd_min);
      }
      cc->blocked_chan = 1;
    } else {
      cc->blocked_chan = 0;
    }

    cc->bdp = cwnd;

    static ratelim_t dec_notice_limit = RATELIM_INIT(300);
    log_fn_ratelim(&dec_notice_limit, LOG_NOTICE, LD_CIRC,
           "Our clock has been stalled for the entire lifetime of a circuit. "
           "Performance may be sub-optimal.");

    /* Without any RTT, the estimate only counts as updated when a blocked
     * channel supplied a congestion signal. */
    return blocked_on_chan;
  }

  /* Congestion window based BDP will respond to changes in RTT only, and is
   * relative to cwnd growth. It is useful for correcting for BDP
   * overestimation, but if BDP is higher than the current cwnd, it will
   * underestimate it.
   *
   * We multiply here first to avoid precision issues from min_RTT being
   * close to ewma RTT. Since all fields are u64, there is plenty of
   * room here to multiply first.
   */
  cc->bdp = cc->cwnd*cc->min_rtt_usec/cc->ewma_rtt_usec;

  /* The orconn is blocked: treat this as a congestion signal */
  if (blocked_on_chan) {
    log_info(LD_CIRC, "CC: Streams blocked on circ channel. Chanq: %d",
             chan_q);

    /* A blocked channel is an immediate congestion signal, but it still
     * happens only once per cwnd */
    if (!cc->blocked_chan) {
      cc->next_cc_event = 0;
      cc->blocked_chan = 1;
    }
  } else {
    /* If we were previously blocked, emit a new congestion event
     * now that we are unblocked, to re-evaluate cwnd */
    if (cc->blocked_chan) {
      cc->blocked_chan = 0;
      cc->next_cc_event = 0;
      log_info(LD_CIRC, "CC: Streams un-blocked on circ channel. Chanq: %d",
               chan_q);
    }
  }

  /* Only log on rounds where a congestion event is about to be evaluated. */
  if (cc->next_cc_event == 0) {
    if (CIRCUIT_IS_ORIGIN(circ)) {
      log_info(LD_CIRC,
               "CC: Circuit %d "
               "SENDME RTT: %"PRIu64", %"PRIu64", %"PRIu64", %"PRIu64", "
               "BDP estimate: %"PRIu64,
               CONST_TO_ORIGIN_CIRCUIT(circ)->global_identifier,
               cc->min_rtt_usec/1000,
               curr_rtt_usec/1000,
               cc->ewma_rtt_usec/1000,
               cc->max_rtt_usec/1000,
               cc->bdp);
    } else {
      log_info(LD_CIRC,
               "CC: Circuit %"PRIu64":%d "
               "SENDME RTT: %"PRIu64", %"PRIu64", %"PRIu64", %"PRIu64", "
               "%"PRIu64,
               CONST_TO_OR_CIRCUIT(circ)->p_chan->global_identifier,
               CONST_TO_OR_CIRCUIT(circ)->p_circ_id,
               cc->min_rtt_usec/1000,
               curr_rtt_usec/1000,
               cc->ewma_rtt_usec/1000,
               cc->max_rtt_usec/1000,
               cc->bdp);
    }
  }

  /* We updated BDP this round if either we had a blocked channel, or
   * the curr_rtt_usec was not 0. */
  bool ret = (blocked_on_chan || curr_rtt_usec != 0);
  if (ret) {
    tor_trace(TR_SUBSYS(cc), TR_EV(bdp_update), circ, cc, curr_rtt_usec);
  }
  return ret;
}

先取出最早的发送时间戳,与当前时间相减,得到这个SENDME对应的rtt。然后检查时钟是否停滞或跳变(time_delta_stalled_or_jumped)。如果可疑,就不更新任何RTT估计,直接返回0。

congestion_control_update_circuit_rtt函数:

根据Tor提案324(Proposal 324,RTT-based Congestion Control)中的N-EWMA公式计算ewma_rtt;如果当前rtt更大,同时更新max_rtt。

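min_rtt的更新分三种情况:
- 如果当前还没有min_rtt,用ewma_rtt(而不是当前rtt)作为min_rtt。
- 如果已有min_rtt,但cwnd已经降到cwnd_min且不处于慢启动阶段,就用percent_max_mix按rtt_reset_pct把ewma_rtt与min_rtt加权混合,以此抬高min_rtt,摆脱因min_rtt异常偏低导致cwnd卡在下限的状态。
- 否则,如果ewma_rtt小于min_rtt,就用ewma_rtt更新min_rtt。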

最后返回当前SENDME的rtt。

congestion_control_update_circuit_bdp函数:

成功更新bdp返回1,不能成功更新返回0。

对于origin电路,n_chan_cells表示n信道上排队等待传输的cell;对于出口和洋葱服务电路(or_circuit),则使用p_chan_cells(p信道)。

如果当前没有ewma_rtt,说明单调时钟从电路建立以来一直停滞或异常,无法测量RTT,此时直接用当前cwnd估计BDP:
- 如果信道当前处于阻塞状态(blocked_on_chan),且排队的cell数不小于cwnd,就打log并把估计值设为cwnd_min;否则从cwnd中减去排队数,下限为cwnd_min。
- 注意这里修改的只是局部变量,cc->cwnd本身不变。
- 随后更新blocked_chan标记,把bdp设为该估计值,并返回blocked_on_chan。

如果有ewma_rtt,按照Proposal 324中基于cwnd的公式更新bdp:bdp = cwnd × min_rtt / ewma_rtt(先乘后除,以避免精度损失)。

如果现在阻塞了但之前没有阻塞,就把next_cc_event置0以立即触发一次拥塞检测,并把blocked_chan置为1。

如果当前没有阻塞但之前是阻塞的,就把blocked_chan置为0,同样把next_cc_event置0,以立即重新评估cwnd。

只要信道阻塞,或者本次rtt测量成功(curr_rtt_usec不为0),就返回true,继续执行后续的Vegas算法。

/* Slow-start branch of the Vegas cwnd update (fragment: the enclosing
 * function and queue_use are defined outside this excerpt). */
if (cc->in_slow_start) {
/* Keep growing only while the estimated queue is below gamma and the
 * local channel is not blocked. */
if (queue_use < cc->vegas_params.gamma && !cc->blocked_chan) {
/* If the congestion window is not fully in use, skip any
* increment of cwnd in slow start */
if (cc->cwnd_full) {
/* Get the "Limited Slow Start" increment */
uint64_t inc = rfc3742_ss_inc(cc);
cc->cwnd += inc;

// Check if inc is less than what we would do in steady-state
// avoidance. Note that this is likely never to happen
// in practice, but we keep this block and the metrics to make
// sure.
if (inc*SENDME_PER_CWND(cc) <= CWND_INC(cc)*cc->cwnd_inc_rate) {
congestion_control_vegas_exit_slow_start(circ, cc);

cc_stats_vegas_below_ss_inc_floor++;

/* We exited slow start without being blocked */
cc_stats_vegas_ss_csig_blocked_ma =
stats_update_running_avg(cc_stats_vegas_ss_csig_blocked_ma,
0);
}
}
} else {
/* Congestion signal (queue_use >= gamma, or a blocked channel):
 * fall back to BDP + gamma and leave slow start. */
uint64_t old_cwnd = cc->cwnd;

/* Congestion signal: Set cwnd to gamma threshold */
cc->cwnd = vegas_bdp(cc) + cc->vegas_params.gamma;

/* Compute the percentage we experience a blocked csig vs RTT sig */
/* Each sample is 100 (blocked channel) or 0 (RTT signal), so the running
 * average reads as a percentage. */
if (cc->blocked_chan) {
cc_stats_vegas_ss_csig_blocked_ma =
stats_update_running_avg(cc_stats_vegas_ss_csig_blocked_ma,
100);
} else {
uint64_t cwnd_diff = (old_cwnd > cc->cwnd ? old_cwnd - cc->cwnd : 0);

cc_stats_vegas_ss_csig_blocked_ma =
stats_update_running_avg(cc_stats_vegas_ss_csig_blocked_ma,
0);

/* Account the amount we reduced the cwnd by for the gamma cutoff */
cc_stats_vegas_gamma_drop_ma =
stats_update_running_avg(cc_stats_vegas_gamma_drop_ma,
cwnd_diff);
}

congestion_control_vegas_exit_slow_start(circ, cc);
}

/* Hard cap on slow-start growth; reaching it also ends slow start. */
if (cc->cwnd >= cc->vegas_params.ss_cwnd_max) {
cc->cwnd = cc->vegas_params.ss_cwnd_max;
congestion_control_vegas_exit_slow_start(circ, cc);
cc_stats_vegas_above_ss_cwnd_max++;
}

/* Running average of queue_use observed during slow start. */
cc_stats_vegas_ss_queue_ma =
stats_update_running_avg(cc_stats_vegas_ss_queue_ma,
queue_use);
}

慢启动阶段:

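如果queue_use(排队量)小于gamma且信道没有阻塞:只有在cwnd已满时,才按RFC 3742(Limited Slow Start)计算增量并增大cwnd。随后检查该增量是否已不大于稳态阶段的增长量(inc × SENDME_PER_CWND ≤ CWND_INC × cwnd_inc_rate),若是则退出慢启动。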

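否则(queue_use达到gamma或信道阻塞),将cwnd设为bdp + gamma,然后更新cc_stats_vegas_ss_csig_blocked_ma。这个移动平均记录拥塞信号的来源:
- 每次取值100(信道阻塞)或0(RTT增大),因此越接近100,说明信道阻塞的占比越高;越接近0,说明主要由RTT增大引发。
- 对于RTT增大引发的情况,还会把窗口下降的幅度记入cc_stats_vegas_gamma_drop_ma。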

检测到拥塞之后立即退出慢启动。

慢启动阶段的cwnd有上限ss_cwnd_max,达到或超过时会把cwnd截断为该值并退出慢启动。最后更新慢启动阶段queue_use的移动平均。

else if (cc->next_cc_event == 0) {
/* Steady-state branch (fragment): cwnd is adjusted only when the
 * congestion-event countdown has reached zero. */
if (queue_use > cc->vegas_params.delta) {
uint64_t old_cwnd = cc->cwnd;
uint64_t cwnd_diff;

/* If we are above the delta threshold, drop cwnd down to the
* delta threshold. */
/* NOTE(review): if cwnd is unsigned and vegas_bdp(cc) + delta is smaller
 * than CWND_INC(cc), this wraps around, and the later MAX() with cwnd_min
 * would not undo it -- confirm the parameter bounds rule this out. */
cc->cwnd = vegas_bdp(cc) + cc->vegas_params.delta - CWND_INC(cc);

/* Account the amount we reduced the cwnd by for the delta cutoff */
cwnd_diff = (old_cwnd > cc->cwnd ? old_cwnd - cc->cwnd : 0);
cc_stats_vegas_delta_drop_ma =
stats_update_running_avg(cc_stats_vegas_delta_drop_ma,
cwnd_diff);

cc_stats_vegas_above_delta++;

/* Percentage metrics: Add 100% delta, 0 for other two */
cc_stats_vegas_csig_alpha_ma =
stats_update_running_avg(cc_stats_vegas_csig_alpha_ma,
0);
cc_stats_vegas_csig_beta_ma =
stats_update_running_avg(cc_stats_vegas_csig_beta_ma,
0);
cc_stats_vegas_csig_delta_ma =
stats_update_running_avg(cc_stats_vegas_csig_delta_ma,
100);
} else if (queue_use > cc->vegas_params.beta || cc->blocked_chan) {
/* Moderate congestion or a blocked local channel: back off by one
 * increment. NOTE(review): wraps if cwnd < CWND_INC(cc) and cwnd is
 * unsigned -- confirm that cwnd_min >= CWND_INC(cc). */
cc->cwnd -= CWND_INC(cc);

/* Compute the percentage we experience a blocked csig vs RTT sig */
if (cc->blocked_chan) {
cc_stats_vegas_csig_blocked_ma =
stats_update_running_avg(cc_stats_vegas_csig_blocked_ma,
100);
} else {
cc_stats_vegas_csig_blocked_ma =
stats_update_running_avg(cc_stats_vegas_csig_blocked_ma,
0);
}

/* Percentage counters: Add 100% beta, 0 for other two */
cc_stats_vegas_csig_alpha_ma =
stats_update_running_avg(cc_stats_vegas_csig_alpha_ma,
0);
cc_stats_vegas_csig_beta_ma =
stats_update_running_avg(cc_stats_vegas_csig_beta_ma,
100);
cc_stats_vegas_csig_delta_ma =
stats_update_running_avg(cc_stats_vegas_csig_delta_ma,
0);
} else if (cc->cwnd_full &&
queue_use < cc->vegas_params.alpha) {
/* Window is fully used but the queue estimate is small: grow. */
cc->cwnd += CWND_INC(cc);

/* Percentage counters: Add 100% alpha, 0 for other two */
cc_stats_vegas_csig_alpha_ma =
stats_update_running_avg(cc_stats_vegas_csig_alpha_ma,
100);
cc_stats_vegas_csig_beta_ma =
stats_update_running_avg(cc_stats_vegas_csig_beta_ma,
0);
cc_stats_vegas_csig_delta_ma =
stats_update_running_avg(cc_stats_vegas_csig_delta_ma,
0);
} else {
/* Percentage counters: No signal this round. Add 0% to all */
cc_stats_vegas_csig_alpha_ma =
stats_update_running_avg(cc_stats_vegas_csig_alpha_ma,
0);
cc_stats_vegas_csig_beta_ma =
stats_update_running_avg(cc_stats_vegas_csig_beta_ma,
0);
cc_stats_vegas_csig_delta_ma =
stats_update_running_avg(cc_stats_vegas_csig_delta_ma,
0);
}

/* cwnd can never fall below cwnd_min */
cc->cwnd = MAX(cc->cwnd, cc->cwnd_min);

congestion_control_vegas_log(circ, cc);

/* Update metrics */
cc_stats_vegas_queue_ma =
stats_update_running_avg(cc_stats_vegas_queue_ma,
queue_use);
cc_stats_vegas_bdp_ma =
stats_update_running_avg(cc_stats_vegas_bdp_ma,
vegas_bdp(cc));

/* Log if we're above the ss_cap */
if (cc->cwnd >= cc->vegas_params.ss_cwnd_max) {
log_info(LD_CIRC,
"CC: TOR_VEGAS above ss_max in steady state for circ %d: %"PRIu64,
circ->purpose, cc->cwnd);
}
}

稳态阶段:只有当next_cc_event为0(到了允许进行拥塞检测的时机)时,才会调整cwnd:

如果严重拥塞(queue_use > delta),把cwnd直接降到 bdp + delta − CWND_INC,统计下降幅度,然后更新信号统计(delta记100,alpha和beta记0)。

如果中度拥塞(queue_use > beta)或者信道阻塞,cwnd减小一个CWND_INC。接着根据信道是否阻塞,更新拥塞原因的统计(阻塞记100,否则记0),然后更新信号统计(beta记100)。

如果cwnd已满且queue_use < alpha(网络尚未被充分利用),cwnd增加一个CWND_INC,然后更新信号统计(alpha记100)。

如果以上条件都不满足,cwnd不变,三个信号统计都记0。

接下来保证cwnd不小于cwnd_min,然后打log。

然后更新queue_use和bdp的移动平均统计。如果稳态下cwnd仍不小于ss_cwnd_max,会打一条info级别的日志。

  /* Tail of the Vegas SENDME handler (fragment; the enclosing function is
   * not shown). */
  /* Reset event counters */
  /* Each countdown is only re-armed once it has reached zero. */
if (cc->next_cwnd_event == 0) {
cc->next_cwnd_event = SENDME_PER_CWND(cc);
}
/* CWND_UPDATE_RATE() may return 0 for a high cwnd_inc_rate; next_cc_event
 * then stays 0 and a congestion check is allowed on every ack. */
if (cc->next_cc_event == 0) {
cc->next_cc_event = CWND_UPDATE_RATE(cc);
}

/* Decide if enough time has passed to reset the cwnd utilization */
if (cwnd_full_reset(cc))
cc->cwnd_full = 0;

/* Update inflight with ack */
/* This SENDME acknowledges sendme_inc cells. */
cc->inflight = cc->inflight - cc->sendme_inc;

return 0;

/**
 * How many SENDME acks to wait between cwnd updates.
 *
 * In slow start every ack triggers an update. Afterwards the literature's
 * default of one update per cwnd worth of acks is scaled by the
 * 'cc_cwnd_inc_rate' consensus parameter: a larger rate means more frequent
 * updates.
 *
 * A result of 0 (possible with a high cwnd_inc_rate) makes the calling code
 * update on every sendme ack.
 */
static inline uint64_t CWND_UPDATE_RATE(const struct congestion_control_t *cc)
{
  if (cc->in_slow_start)
    return 1;

  /* Cells acknowledged per update period. Adding half of it before the
   * integer division rounds to the nearest whole number of acks. */
  const uint64_t cells_per_update = cc->cwnd_inc_rate*cc->sendme_inc;
  return (cc->cwnd + cells_per_update/2) / cells_per_update;
}

/**
 * Gives us the number of SENDMEs in a CWND, rounded to the nearest integer.
 *
 * Each SENDME acknowledges sendme_inc cells, so this is cwnd / sendme_inc
 * with rounding instead of truncation.
 */
static inline uint64_t SENDME_PER_CWND(const struct congestion_control_t *cc)
{
  /* We add sendme_inc/2 before dividing to round to the nearest integer
   * number of SENDMEs. */
  return ((cc->cwnd + cc->sendme_inc/2)/cc->sendme_inc);
}

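最后做三件事:
- 对已减到0的计数器重新赋值:next_cwnd_event重置为SENDME_PER_CWND,next_cc_event重置为CWND_UPDATE_RATE。
- 若cwnd_full_reset判断时间已到,则清除cwnd_full。
- inflight减去本次SENDME确认的sendme_inc个cell。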