2026-01-29 组会

Circuit-Level Flow Control

Algorithm and Flow

Background: the client issues a download request, and the exit sends large amounts of data back to the client.

In circuit-level congestion control, each circuit is bound to a congestion_control_t struct that stores its congestion metrics:

/** RTT time data for congestion control. */
uint64_t ewma_rtt_usec; // smoothed current RTT estimate
uint64_t min_rtt_usec;  // minimum RTT observed so far
uint64_t max_rtt_usec;  // maximum RTT observed so far

/* Vegas BDP estimate */
uint64_t bdp;

/** Congestion window */
uint64_t cwnd;

/** Number of cells in-flight (sent but awaiting SENDME ack). */
uint64_t inflight;

Based on these metrics, the exit calls congestion_control_get_package_window to compute how many more cells it may send:

/**
 * Get a package window from either old sendme logic, or congestion control.
 *
 * A package window is how many cells you can still send.
 */
int
congestion_control_get_package_window(const circuit_t *circ,
                                      const crypt_path_t *cpath)
{
  int package_window;
  congestion_control_t *cc;

  tor_assert(circ);

  if (cpath) {
    package_window = cpath->package_window;
    cc = cpath->ccontrol;
  } else {
    package_window = circ->package_window;
    cc = circ->ccontrol;
  }

  if (!cc) {
    return package_window;
  } else {
    /* Inflight can be above cwnd if cwnd was just reduced */
    if (cc->inflight > cc->cwnd)
      return 0;
    /* In the extremely unlikely event that cwnd-inflight is larger than
     * INT32_MAX, just return that cap, so old code doesn't explode. */
    else if (cc->cwnd - cc->inflight > INT32_MAX)
      return INT32_MAX;
    else
      return (int)(cc->cwnd - cc->inflight);
  }
}

This function is called in two places. The first is where the exit computes the maximum number of cells it needs to package:

//relay.c:circuit_resume_edge_reading_helper

/* How many cells do we have space for? It will be the minimum of
* the number needed to exhaust the package window, and the minimum
* needed to fill the cell queue. */
max_to_package = congestion_control_get_package_window(circ, layer_hint);

The second is where the exit, just before sending data, checks whether the currently selected circuit is cleared to send:

//sendme.c:sendme_note_circuit_data_packaged

/* Return appropriate number designating how many cells can still be sent */
return congestion_control_get_package_window(circ, layer_hint);

//relay.c:relay_send_command_from_edge

/* If applicable, note the cell digest for the SENDME version 1 purpose if
 * we need to. This call needs to be after the circuit_package_relay_cell()
 * because the cell digest is set within that function. */
if (relay_command == RELAY_COMMAND_DATA) {
  sendme_record_cell_digest_on_circ(circ, cpath_layer);

  /* Handle the circuit-level SENDME package window. */
  if (sendme_note_circuit_data_packaged(circ, cpath_layer) < 0) {
    /* Package window has gone under 0. Protocol issue. */
    log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
           "Circuit package window is below 0. Closing circuit.");
    circuit_mark_for_close(circ, END_CIRC_REASON_TORPROTOCOL);
    return -1;
  }
}

In effect, when the exit decides whether it can send, it is really checking whether cc->cwnd > cc->inflight holds; as long as it does, circuit-level congestion control lets the data through and packaging/sending continues.

Problems

  1. Because Vegas itself is not aggressive (in steady state cwnd grows by only +1 per update), sending SENDMEs early leaves essentially no room to adjust the rate.
  2. On the surface, the rate could be tuned by delaying or withholding SENDMEs, but in practice neither is viable:
    1. Delaying can in theory shrink cwnd, but cwnd bottoms out around 186 or above; that floor is so high that no change in flow rate is visible.
    2. Withholding does visibly stall the whole download under the private-network Chutney test framework, but it is completely infeasible on the public network: when cc->cwnd < cc->inflight, Tor's current policy is to close the circuit outright rather than wait for it to recover, so again no rate change can be observed.
  3. In real public-network downloads, cwnd often reaches 300+, a range that inflight can almost never reach (it would require 10+ SENDMEs in transit at once). I believe circuit-level flow control does almost nothing in practice; it is a very, very "soft" mechanism.

Conflux Mechanism

Algorithm and Flow

When choosing a Conflux scheduling algorithm, the default depends on whether the endpoint is an exit or a client:

//conflux_pool.c

/* For unit tests only: please treat these exactly as the defines in the
 * code. */
STATIC uint8_t DEFAULT_CLIENT_UX = CONFLUX_UX_HIGH_THROUGHPUT;
STATIC uint8_t DEFAULT_EXIT_UX = CONFLUX_UX_MIN_LATENCY;

/**
 * Return the conflux algorithm for a desired UX value.
 */
static uint8_t
conflux_choose_algorithm(uint8_t desired_ux)
{
  switch (desired_ux) {
    case CONFLUX_UX_NO_OPINION:
      return CONFLUX_ALG_LOWRTT;
    case CONFLUX_UX_MIN_LATENCY:
      return CONFLUX_ALG_MINRTT;
    case CONFLUX_UX_HIGH_THROUGHPUT:
      return CONFLUX_ALG_LOWRTT;
    /* For now, we have no low mem algs, so use minRTT since it should
     * switch less and thus use less mem */
    /* TODO-329-TUNING: Pick better algs here*/
    case CONFLUX_UX_LOW_MEM_THROUGHPUT:
    case CONFLUX_UX_LOW_MEM_LATENCY:
      return CONFLUX_ALG_MINRTT;
    default:
      /* Trunnel should protect us from this */
      tor_assert_nonfatal_unreached();
      return CONFLUX_ALG_LOWRTT;
  }
}

As the defaults show, only two algorithms are used in practice, minrtt and lowrtt:

/**
 * Return the circuit with the minimum RTT. Do not use any
 * other circuit.
 *
 * This algorithm will minimize RTT always, and will not provide
 * any throughput benefit. We expect it to be useful for VoIP/UDP
 * use cases. Because it only uses one circuit on a leg at a time,
 * it can have more than one circuit per guard (ie: to find
 * lower-latency middles for the path).
 */
static const circuit_t *
conflux_decide_circ_minrtt(const conflux_t *cfx)
{
  uint64_t min_rtt = UINT64_MAX;
  const circuit_t *circ = NULL;

  /* Can't get here without any legs. */
  tor_assert(CONFLUX_NUM_LEGS(cfx));

  CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {

    /* Ignore circuits with no RTT measurement */
    if (leg->circ_rtts_usec && leg->circ_rtts_usec < min_rtt) {
      circ = leg->circ;
      min_rtt = leg->circ_rtts_usec;
    }
  } CONFLUX_FOR_EACH_LEG_END(leg);

  /* If the minRTT circuit can't send, dont send on any circuit. */
  if (!circ || !circuit_ready_to_send(circ)) {
    return NULL;
  }
  return circ;
}

/**
 * Favor the circuit with the lowest RTT that still has space in the
 * congestion window.
 *
 * This algorithm will maximize total throughput at the expense of
 * bloating out-of-order queues.
 */
static const circuit_t *
conflux_decide_circ_lowrtt(const conflux_t *cfx)
{
  uint64_t low_rtt = UINT64_MAX;
  const circuit_t *circ = NULL;

  /* Can't get here without any legs. */
  tor_assert(CONFLUX_NUM_LEGS(cfx));

  CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
    /* If the package window is full, skip it */
    if (!circuit_ready_to_send(leg->circ)) {
      continue;
    }

    /* Ignore circuits with no RTT */
    if (leg->circ_rtts_usec && leg->circ_rtts_usec < low_rtt) {
      low_rtt = leg->circ_rtts_usec;
      circ = leg->circ;
    }
  } CONFLUX_FOR_EACH_LEG_END(leg);

  /* At this point, if we found a circuit, we've already validated that its
   * congestion window has room. */
  return circ;
}

Possible Approaches

  1. Since this choice is made on the client side, the client can be pinned to lowrtt and then perform leg switching.

Stream-Level Flow Control

Algorithm and Flow

Stream-level flow control mainly addresses the case where upstream sends data so fast that the downstream outbuf (the client's outgoing buffer toward the downstream socket) fills up.

Each stream maintains a connection_t struct:

//connection.c

/** Description of a connection to another host or process, and associated
* data.
*
* A connection is named based on what it's connected to -- an "OR
* connection" has a Tor node on the other end, an "exit
* connection" has a website or other server on the other end, and an
* "AP connection" has an application proxy (and thus a user) on the
* other end.
*
* Every connection has a type and a state. Connections never change
* their type, but can go through many state changes in their lifetime.
*
* Every connection has two associated input and output buffers.
* Listeners don't use them. For non-listener connections, incoming
* data is appended to conn->inbuf, and outgoing data is taken from
* conn->outbuf. Connections differ primarily in the functions called
* to fill and drain these buffers.
*/
struct connection_t {

When the client receives a cell and writes it into the outbuf, it calls connection_get_outbuf_len to see how much data is now backlogged there:

//connection.c

/**
 * Called from sendme_stream_data_received(), when data arrives
 * from a circuit to our edge's outbuf, to decide if we need to send
 * an XOFF.
 *
 * Returns the amount of cells remaining until the buffer is full, at
 * which point it sends an XOFF, and returns 0.
 *
 * Returns less than 0 if we have queued more than a congestion window
 * worth of data and need to close the circuit.
 */
int
flow_control_decide_xoff(edge_connection_t *stream)
{
  size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));

size_t
connection_get_outbuf_len(const connection_t *conn)
{
  return conn->outbuf ? buf_datalen(conn->outbuf) : 0;
}

It then checks the stream type to pick the appropriate buffer_limit_xoff:

//congestion_control_flow.c:flow_control_decide_xoff

/* Onion services and clients are typically localhost edges, so they
 * need different buffering limits than exits do */
if (TO_CONN(stream)->type == CONN_TYPE_AP || stream->hs_ident != NULL) {
  buffer_limit_xoff = xoff_client;
} else {
  buffer_limit_xoff = xoff_exit;
}

Both thresholds are in fact taken from consensus parameters; each defaults to 500 cells:

//congestion_control_flow.c

/**
 * Update global congestion control related consensus parameter values, every
 * consensus update.
 *
 * More details for each of the parameters can be found in proposal 324,
 * section 6.5 including tuning notes.
 */
void
flow_control_new_consensus_params(const networkstatus_t *ns)
{
#define CC_XOFF_CLIENT_DFLT 500
#define CC_XOFF_CLIENT_MIN 1
#define CC_XOFF_CLIENT_MAX 10000
  xoff_client = networkstatus_get_param(ns, "cc_xoff_client",
                                        CC_XOFF_CLIENT_DFLT,
                                        CC_XOFF_CLIENT_MIN,
                                        CC_XOFF_CLIENT_MAX)*RELAY_PAYLOAD_SIZE_MIN;

#define CC_XOFF_EXIT_DFLT 500
#define CC_XOFF_EXIT_MIN 1
#define CC_XOFF_EXIT_MAX 10000
  xoff_exit = networkstatus_get_param(ns, "cc_xoff_exit",
                                      CC_XOFF_EXIT_DFLT,
                                      CC_XOFF_EXIT_MIN,
                                      CC_XOFF_EXIT_MAX)*RELAY_PAYLOAD_SIZE_MIN;
If the outbuf exceeds the threshold and has not dropped back under it after a 5 ms grace period, an XOFF is sent:

//congestion_control_flow.c:flow_control_decide_xoff

if (total_buffered > buffer_limit_xoff) {
  if (!stream->xoff_sent) {
    uint64_t now = monotime_absolute_usec();

    if (stream->xoff_grace_period_start_usec == 0) {
      /* If unset, we haven't begun the XOFF grace period. We need to start.
       */
      log_debug(LD_EDGE,
                "Exceeded XOFF limit; Beginning grace period: "
                "total-buffered=%" TOR_PRIuSZ " xoff-limit=%d",
                total_buffered, buffer_limit_xoff);

      stream->xoff_grace_period_start_usec = now;
    } else if (now > stream->xoff_grace_period_start_usec +
                     XOFF_GRACE_PERIOD_USEC) {
      /* If we've exceeded our XOFF grace period, we need to send an XOFF. */
      log_info(LD_EDGE,
               "Sending XOFF: total-buffered=%" TOR_PRIuSZ
               " xoff-limit=%d grace-period-dur=%" PRIu64 "usec",
               total_buffered, buffer_limit_xoff,
               now - stream->xoff_grace_period_start_usec);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream);

      cc_stats_flow_xoff_outbuf_ma =
        stats_update_running_avg(cc_stats_flow_xoff_outbuf_ma,
                                 total_buffered);

      circuit_send_stream_xoff(stream);

      /* Clear the drain rate. It is considered wrong if we
       * got all the way to XOFF */
      stream->ewma_drain_rate = 0;

      /* Unset our grace period. */
      stream->xoff_grace_period_start_usec = 0;
    } else {
      /* Else we're in the XOFF grace period, so don't do anything. */
    }
  }
} else {
  /* The outbuf length is less than the XOFF limit, so unset our grace
   * period. */
  stream->xoff_grace_period_start_usec = 0;
}

At that point circuit_send_stream_xoff is called to send the XOFF signal:

//congestion_control_flow.c

/**
 * Send an XOFF for this stream, and note that we sent one
 */
static void
circuit_send_stream_xoff(edge_connection_t *stream)
{
  xoff_cell_t xoff;
  uint8_t payload[CELL_PAYLOAD_SIZE];
  ssize_t xoff_size;

  memset(&xoff, 0, sizeof(xoff));
  memset(payload, 0, sizeof(payload));

  xoff_cell_set_version(&xoff, 0);

  if ((xoff_size = xoff_cell_encode(payload, CELL_PAYLOAD_SIZE, &xoff)) < 0) {
    log_warn(LD_BUG, "Failed to encode xon cell");
    return;
  }

  if (connection_edge_send_command(stream, RELAY_COMMAND_XOFF,
                                   (char*)payload, (size_t)xoff_size) == 0) {
    stream->xoff_sent = true;
    cc_stats_flow_num_xoff_sent++;

    /* If this is an entry conn, notify control port */
    if (TO_CONN(stream)->type == CONN_TYPE_AP) {
      control_event_stream_status(TO_ENTRY_CONN(TO_CONN(stream)),
                                  STREAM_EVENT_XOFF_SENT,
                                  0);
    }
  }
}

When the exit receives the XOFF, it pauses sending and waits for an XON.

As data drains from the outbuf onto the socket, flow_control_decide_xon is called to decide whether to send an XON. It first bounds-checks and scales drained_bytes, then accumulates the newly written bytes into it:

//congestion_control_flow.c:flow_control_decide_xon

/**
 * Called whenever we drain an edge connection outbuf by writing on
 * its socket, to decide if it is time to send an xon.
 *
 * The n_written parameter tells us how many bytes we have written
 * this time, which is used to compute the advisory drain rate fields.
 */
void
flow_control_decide_xon(edge_connection_t *stream, size_t n_written)
{
  size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));

  /* Bounds check the number of drained bytes, and scale */
  if (stream->drained_bytes >= UINT32_MAX - n_written) {
    /* Cut the bytes in half, and move the start time up halfway to now
     * (if we have one). */
    stream->drained_bytes /= 2;

    if (stream->drain_start_usec) {
      uint64_t now = monotime_absolute_usec();

      stream->drain_start_usec = now - (now-stream->drain_start_usec)/2;
    }
  }

  /* Accumulate drained bytes since last rate computation */
  stream->drained_bytes += n_written;

If the stream has spent some time in a queued state and enough data has recently drained, compute_drain_rate is called to update the drain rate, and drained_bytes is then reset:

//congestion_control_flow.c:flow_control_decide_xon

if (stream->drain_start_usec) {
  /* If we have spent enough time in a queued state, update our drain
   * rate. */
  if (stream->drained_bytes > xon_rate_bytes) {
    /* No previous drained bytes means it is the first time we are computing
     * it so use the value we just drained onto the socket as a baseline. It
     * won't be accurate but it will be a start towards the right value.
     *
     * We have to do this in order to have a drain rate else we could be
     * sending a drain rate of 0 in an XON which would be undesirable and
     * basically like sending an XOFF. */
    if (stream->prev_drained_bytes == 0) {
      stream->prev_drained_bytes = stream->drained_bytes;
    }
    uint32_t drain_rate = compute_drain_rate(stream);
    /* Once the drain rate has been computed, note how many bytes we just
     * drained so it can be used at the next calculation. We do this here
     * because it gets reset once the rate is changed. */
    stream->prev_drained_bytes = stream->drained_bytes;

    if (drain_rate) {
      stream->ewma_drain_rate =
        (uint32_t)n_count_ewma(drain_rate,
                               stream->ewma_drain_rate,
                               xon_ewma_cnt);
      log_debug(LD_EDGE, "Updating drain rate: %d %d %"TOR_PRIuSZ,
                drain_rate,
                stream->ewma_drain_rate,
                total_buffered);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update),
                stream, drain_rate);
      /* Reset recent byte counts. This prevents us from sending advisory
       * XONs more frequent than every xon_rate_bytes. */
      stream->drained_bytes = 0;
      stream->drain_start_usec = 0;
    }
  }
}

drain_rate is computed directly, in KB/sec:

//congestion_control_flow.c

/**
 * Compute the recent drain rate (write rate) for this edge
 * connection and return it, in KB/sec (1000 bytes/sec).
 *
 * Returns 0 if the monotime clock is busted.
 */
static inline uint32_t
compute_drain_rate(const edge_connection_t *stream)
{
  if (BUG(!is_monotime_clock_reliable())) {
    log_warn(LD_BUG, "Computing drain rate with stalled monotime clock");
    return 0;
  }

  uint64_t delta = monotime_absolute_usec() - stream->drain_start_usec;

  if (delta == 0) {
    log_warn(LD_BUG, "Computing stream drain rate with zero time delta");
    return 0;
  }

  /* Overflow checks */
  if (stream->prev_drained_bytes > INT32_MAX/1000 || /* Intermediate */
      stream->prev_drained_bytes/delta > INT32_MAX/1000) { /* full value */
    return INT32_MAX;
  }

  /* kb/sec = bytes/usec * 1000 usec/msec * 1000 msec/sec * kb/1000bytes */
  return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta);
}

If no XOFF is currently outstanding, or an XOFF was sent but the outbuf has fully drained, sending an XON (or a rate update) is considered; the default change threshold is 25%:

//congestion_control_flow.c:flow_control_decide_xon

/* If we don't have an XOFF outstanding, consider updating an
 * old rate */
if (!stream->xoff_sent) {
  if (stream_drain_rate_changed(stream)) {
    /* If we are still buffering and the rate changed, update
     * advisory XON */
    log_info(LD_EDGE, "Sending rate-change XON: %d %d %"TOR_PRIuSZ,
             stream->ewma_rate_last_sent,
             stream->ewma_drain_rate,
             total_buffered);
    tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream);

    cc_stats_flow_xon_outbuf_ma =
      stats_update_running_avg(cc_stats_flow_xon_outbuf_ma,
                               total_buffered);

    circuit_send_stream_xon(stream);
  }
} else if (total_buffered == 0) {
  log_info(LD_EDGE, "Sending XON: %d %d %"TOR_PRIuSZ,
           stream->ewma_rate_last_sent,
           stream->ewma_drain_rate,
           total_buffered);
  tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream);
  circuit_send_stream_xon(stream);
}

When the exit receives an XON, it calls circuit_process_stream_xon to clear an outstanding XOFF and/or adjust its rate limit:

/**
 * Process a stream XON, and if it validates, clear the xoff
 * flag and resume reading on this edge connection.
 *
 * Also, use provided rate information to rate limit
 * reading on this edge (or packagaing from it onto
 * the circuit), to avoid XON/XOFF chatter.
 *
 * Returns true if the XON validates, false otherwise.
 */
bool
circuit_process_stream_xon(edge_connection_t *conn,
                           const crypt_path_t *layer_hint,
                           const relay_msg_t *msg)
{
  xon_cell_t *xon;
  bool retval = true;

  if (BUG(!conn)) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XON on invalid stream?");
    return false;
  }

  /* Make sure this XON came from the right hop */
  if (!edge_uses_cpath(conn, layer_hint)) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XON from wrong hop.");
    return false;
  }

  if (!edge_uses_flow_control(conn)) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XON for non-congestion control circuit");
    return false;
  }

  if (xon_cell_parse(&xon, msg->body, msg->length) < 0) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Received malformed XON cell.");
    return false;
  }

  /* If we are near the max, scale everything down */
  if (conn->num_xon_recv == XON_COUNT_SCALE_AT) {
    log_info(LD_EDGE, "Scaling down for XON count: %d %d %d",
             conn->total_bytes_xmit,
             conn->num_xoff_recv,
             conn->num_xon_recv);
    conn->total_bytes_xmit /= 2;
    conn->num_xoff_recv /= 2;
    conn->num_xon_recv /= 2;
  }

  conn->num_xon_recv++;

  /* Client-side check to make sure that XON is not sent too early,
   * for dropmark attacks. The main sidechannel risk is early cells,
   * but we also check to see that we did not get more XONs than make
   * sense for the number of bytes we sent.
   */
  if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
    uint32_t limit = 0;

    if (conn->hs_ident)
      limit = MIN(xoff_client, xon_rate_bytes);
    else
      limit = MIN(xoff_exit, xon_rate_bytes);

    if (conn->total_bytes_xmit < limit*conn->num_xon_recv) {
      log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
             "Got extra XON for bytes sent. Got %d, expected max %d",
             conn->num_xon_recv, conn->total_bytes_xmit/limit);

      /* We still process this, because the only dropmark defenses
       * in C tor are via the vanguards addon's use of the read valid
       * cells. So just signal that we think this is not valid protocol
       * data and proceed. */
      retval = false;
    }
  }

  log_info(LD_EDGE, "Got XON: %d", xon->kbps_ewma);

  /* Adjust the token bucket of this edge connection with the drain rate in
   * the XON. Rate is in bytes from kilobit (kpbs). */
  uint64_t rate = ((uint64_t) xon_cell_get_kbps_ewma(xon) * 1000);
  if (rate == 0 || INT32_MAX < rate) {
    /* No rate. */
    rate = INT32_MAX;
  }
  token_bucket_rw_adjust(&conn->bucket, (uint32_t) rate, (uint32_t) rate);

  if (conn->xoff_received) {
    /* Clear the fact that we got an XOFF, so that this edge can
     * start and stop reading normally */
    conn->xoff_received = false;
    connection_start_reading(TO_CONN(conn));
  }

  /* If this is an entry conn, notify control port */
  if (TO_CONN(conn)->type == CONN_TYPE_AP) {
    control_event_stream_status(TO_ENTRY_CONN(TO_CONN(conn)),
                                STREAM_EVENT_XON_RECV,
                                0);
  }

  xon_cell_free(xon);

  return retval;
}

Approaches

  1. Since this whole algorithm runs on the client side, and the exit only reacts to the two signals it receives, it is convenient to tune.
  2. The advisory rate carried in the XON can be modified to adjust the stream-level rate.