2026-02-12 组会

2.12 组会

1. 研究问题与设计目标

1.1 研究问题

Tor 的 stream-level 流控里,client 会基于“自身接收侧排队/排空能力”给对端一个速率信号(XON/XOFF)。

但原始实现中,XON 的 kbps_ewma 来自 client 的测量值(ewma_drain_rate),实验者很难在不破坏协议语义的情况下构造“可控”的速率建议序列。

因此我们的问题是:

1.2 设计目标

  • 可控:支持固定速率、上限、比例缩放、方波(high/low)、强制 XOFF
  • 热更新:无需重启,通过配置文件更新控制参数
  • 低侵入:修改集中在 client 的 flow control 发送路径
  • 可复现:现象稳定,对应清晰的触发条件

2. 背景机制:Tor 的 stream-level 流控

2.1 stream-level(而不是circuit-level)

Tor 的数据传输最终落在 TCP socket 上。对应用而言,关键瓶颈是 client 侧 socket 写出能力与缓冲排队。

  • outbuf 堆积:表示应用侧写不动,或网络回压导致排队
  • outbuf 排空:表示 client 能把数据持续写入 socket

因此在 stream 层做反馈,有两个直接优点:

  1. 反馈与“真实接收能力”绑定(排队/排空)
  2. 相对来说约束比较Hard,可能没办法调很高但是一定可以很低

2.2 XOFF 与 XON 的语义

  • XOFF:硬回压信号。接收方明确表示“现在别再推数据了”。
  • XON(kbps_ewma):软速率建议。接收方告诉发送方“我能稳定排空的速率大约是 X kbps”。

2.3 为什么 Exit 收到 XON 会对应调整速率(机制解释)

在发送端(exit 方向),“发送速率”受两类因素共同约束:

  1. 流控窗口/在途量(inflight)约束(Circuit 级别):发送端通常不会无限制地把数据推入网络,会受控于某种 inflight 上限或 pacing 预算。
  2. 接收端反馈(XON/XOFF)(Stream 级别):XOFF 会直接抑制发送;XON 提供一个速率目标,使发送端把 pacing 目标向该速率收敛。

直观类比:

  • XOFF 类似“红灯”,直接停。
  • XON(kbps) 类似“限速牌”,告诉你目标速度是多少。

因此,当 exit 收到新的 XON 建议值时:

  • 其 pacing 目标会更新
  • bucket/预算的补充速率会随之变化
  • 最终表现为:

实际吞吐随 advertised_kbps(你广播的 kbps_ewma)发生可观测的响应。


3. 实现:用“advertised_kbps”替换“纯测量 kbps”

3.1 Block A:头部与实验标记(L33–L38)

引入头文件:

1
2
3
4
#include <stdio.h>
#include <sys/stat.h>
#include <string.h>
#include <ctype.h>

3.2 Block B:Client-only FlowCtl knobs(L54–L273)

这是本次工作的“控制面”。核心贡献是:

  1. 配置系统:从文件读取 mode 与参数
  2. 热更新:mtime 检测 + reload
  3. 模式集合:off/fixed/cap/scale/square/xoff
  4. 速率计算函数flowctl_compute_advertised_kbps(measured)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
/* ===================== Client-only FlowCtl knobs ===================== */
/* A tiny runtime knob system (file-based) to rewrite XON kbps on CLIENT.
* Exit side does not change; it already consumes XON kbps and adjusts bucket.
*/

typedef enum {
FLOWCTL_MODE_OFF = 0,
FLOWCTL_MODE_FIXED,
FLOWCTL_MODE_CAP,
FLOWCTL_MODE_SCALE,
FLOWCTL_MODE_SQUARE,
FLOWCTL_MODE_XOFF,
} flowctl_mode_t;

typedef struct flowctl_cfg_t {
flowctl_mode_t mode;

/* fixed */
uint32_t target_kbps;

/* cap */
uint32_t cap_kbps;

/* scale: permille, 1000 = 1.0x */
uint32_t scale_permille;

/* square wave */
uint32_t high_kbps;
uint32_t low_kbps;
uint32_t period_ms;

/* logging */
int log_on;
} flowctl_cfg_t;

static flowctl_cfg_t flowctl_cfg = {
.mode = FLOWCTL_MODE_OFF,
.target_kbps = 0,
.cap_kbps = 0,
.scale_permille = 1000,
.high_kbps = 0,
.low_kbps = 0,
.period_ms = 2000,
.log_on = 1,
};

static uint64_t flowctl_epoch = 1; /* bump on cfg reload */
static time_t flowctl_cfg_mtime = 0;

static inline const char *
flowctl_cfg_path(void)
{
const char *p = getenv("TOR_FLOWCTL_FILE");
return (p && p[0]) ? p : "/tmp/tor-flowctl.conf";
}

static inline void
flowctl_strstrip_inplace(char *s)
{
if (!s) return;
/* lstrip */
while (*s && isspace((unsigned char)*s)) {
memmove(s, s+1, strlen(s));
}
/* rstrip */
size_t n = strlen(s);
while (n > 0 && isspace((unsigned char)s[n-1])) {
s[n-1] = '\0';
n--;
}
}

static flowctl_mode_t
flowctl_parse_mode(const char *v)
{
if (!v) return FLOWCTL_MODE_OFF;
if (!strcasecmp(v, "off")) return FLOWCTL_MODE_OFF;
if (!strcasecmp(v, "fixed")) return FLOWCTL_MODE_FIXED;
if (!strcasecmp(v, "cap")) return FLOWCTL_MODE_CAP;
if (!strcasecmp(v, "scale")) return FLOWCTL_MODE_SCALE;
if (!strcasecmp(v, "square")) return FLOWCTL_MODE_SQUARE;
if (!strcasecmp(v, "xoff")) return FLOWCTL_MODE_XOFF;
return FLOWCTL_MODE_OFF;
}

/* returns 1 if reloaded and epoch bumped */
static int
flowctl_maybe_reload_cfg(void)
{
const char *path = flowctl_cfg_path();
struct stat st;

if (stat(path, &st) != 0) {
/* no file => keep current cfg */
return 0;
}
if (st.st_mtime == flowctl_cfg_mtime) {
return 0;
}

FILE *fp = fopen(path, "r");
if (!fp) {
return 0;
}

flowctl_cfg_t newcfg = flowctl_cfg; /* start from current */
char line[256];

while (fgets(line, sizeof(line), fp)) {
flowctl_strstrip_inplace(line);
if (line[0] == '\0' || line[0] == '#')
continue;

char *eq = strchr(line, '=');
if (!eq) continue;
*eq = '\0';
char *k = line;
char *v = eq + 1;
flowctl_strstrip_inplace(k);
flowctl_strstrip_inplace(v);

if (!strcasecmp(k, "mode")) {
newcfg.mode = flowctl_parse_mode(v);
} else if (!strcasecmp(k, "target_kbps")) {
newcfg.target_kbps = (uint32_t)strtoul(v, NULL, 10);
} else if (!strcasecmp(k, "cap_kbps")) {
newcfg.cap_kbps = (uint32_t)strtoul(v, NULL, 10);
} else if (!strcasecmp(k, "scale_permille")) {
newcfg.scale_permille = (uint32_t)strtoul(v, NULL, 10);
} else if (!strcasecmp(k, "high_kbps")) {
newcfg.high_kbps = (uint32_t)strtoul(v, NULL, 10);
} else if (!strcasecmp(k, "low_kbps")) {
newcfg.low_kbps = (uint32_t)strtoul(v, NULL, 10);
} else if (!strcasecmp(k, "period_ms")) {
newcfg.period_ms = (uint32_t)strtoul(v, NULL, 10);
if (newcfg.period_ms == 0) newcfg.period_ms = 2000;
} else if (!strcasecmp(k, "log")) {
newcfg.log_on = (int)strtol(v, NULL, 10);
}
}

fclose(fp);

flowctl_cfg = newcfg;
flowctl_cfg_mtime = st.st_mtime;
flowctl_epoch++;

if (flowctl_cfg.log_on) {
log_notice(LD_EDGE,
"[flowctl] reload: mode=%d target=%u cap=%u scale=%u high=%u low=%u period_ms=%u epoch=%" PRIu64 " file=%s",
(int)flowctl_cfg.mode,
flowctl_cfg.target_kbps,
flowctl_cfg.cap_kbps,
flowctl_cfg.scale_permille,
flowctl_cfg.high_kbps,
flowctl_cfg.low_kbps,
flowctl_cfg.period_ms,
(uint64_t)flowctl_epoch,
path);
}

return 1;
}

static inline uint32_t
flowctl_compute_advertised_kbps(uint32_t measured_kbps)
{
uint32_t adv = measured_kbps;

switch (flowctl_cfg.mode) {
case FLOWCTL_MODE_OFF:
return measured_kbps;

case FLOWCTL_MODE_FIXED:
adv = flowctl_cfg.target_kbps;
break;

case FLOWCTL_MODE_CAP:
if (flowctl_cfg.cap_kbps > 0)
adv = MIN(measured_kbps, flowctl_cfg.cap_kbps);
break;

case FLOWCTL_MODE_SCALE: {
uint64_t t = (uint64_t)measured_kbps * (uint64_t)flowctl_cfg.scale_permille;
adv = (uint32_t)MIN((uint64_t)INT32_MAX, (t + 500) / 1000); /* rounded */
break;
}

case FLOWCTL_MODE_SQUARE: {
uint64_t now_us = monotime_absolute_usec();
uint64_t phase = (flowctl_cfg.period_ms ? (uint64_t)flowctl_cfg.period_ms : 2000) * 1000;
uint64_t slot = (phase ? (now_us / phase) : 0);
adv = (slot % 2 == 0) ? flowctl_cfg.high_kbps : flowctl_cfg.low_kbps;
break;
}

case FLOWCTL_MODE_XOFF:
/* handled by caller */
adv = measured_kbps;
break;

default:
adv = measured_kbps;
break;
}

/* Never advertise 0 here. In this file, rate==0 may be treated as "no limit".
* If you want "pause", use XOFF mode.
*/
if (adv == 0) adv = 1;
if (adv > (uint32_t)INT32_MAX) adv = (uint32_t)INT32_MAX;

return adv;
}

/* ===================== End Client-only FlowCtl knobs ===================== */

3.3 Block C:XON 发送路径改写(L448–L470)

这一块是“数据面”入口:在 circuit_send_stream_xon()(或等价函数)里做三件事:

  1. 发送前 reload:保证配置热更新能即时生效
  2. AP(client) 限定:只对 client 侧生效,避免影响 relay/exit
  3. payload 改写:把 xon_cell_set_kbps_ewma() 的输入由 measured 改为 advertised

没有改变 XON 作为反馈信号的协议语义,只是把反馈值从“纯测量”替换为“实验可控的 advertised_kbps”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* reload knobs (cheap: mtime check) */
(void)flowctl_maybe_reload_cfg();

/* If we are the CLIENT (AP conn), we may rewrite what we advertise. */
uint32_t measured_kbps = stream->ewma_drain_rate;
uint32_t advertised_kbps = measured_kbps;

if (TO_CONN(stream)->type == CONN_TYPE_AP) {
if (flowctl_cfg.mode == FLOWCTL_MODE_XOFF) {
if (flowctl_cfg.log_on) {
log_notice(LD_EDGE, "[flowctl] send XOFF (mode=xoff) stream=%p epoch=%" PRIu64,
(void*)stream, (uint64_t)flowctl_epoch);
}
circuit_send_stream_xoff(stream);
return;
}

advertised_kbps = flowctl_compute_advertised_kbps(measured_kbps);
}

3.4 Block D:last_sent 语义统一(L485–L488)

你把 stream->ewma_rate_last_sent 的语义改成:

  • 记录 上一次发送的 advertised_kbps
1
stream->ewma_rate_last_sent = advertised_kbps;

3.5 Block E:xoff / 日志 / epoch 等增强(L497–L507)

  • mode=xoff 时直接发送 XOFF
  • 额外日志(measured vs advertised)
  • epoch 记录(标记本次发送对应哪一版配置)
1
2
3
4
5
6
7
8
9
stream->flowctl_epoch_last_sent = flowctl_epoch;

if (flowctl_cfg.log_on && TO_CONN(stream)->type == CONN_TYPE_AP) {
log_notice(LD_EDGE,
"[flowctl] sent XON stream=%p measured_kbps=%u adv_kbps=%u outbuf=%" TOR_PRIuSZ " epoch=%" PRIu64,
(void*)stream, measured_kbps, advertised_kbps,
connection_get_outbuf_len(TO_CONN(stream)),
(uint64_t)flowctl_epoch);
}

3.6 Block F:rate-change 判定逻辑支持方波(L823–L850)

  • 对 client 且 mode!=off:
    • 计算 adv_now
    • square 模式:只要 adv_now != last_sent 就触发
    • 其他模式:用 change_pct 比较 adv_nowlast_sent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/* Client-only: if knobs changed, force an advisory update quickly. */
if (TO_CONN(stream)->type == CONN_TYPE_AP &&
flowctl_cfg.mode != FLOWCTL_MODE_OFF) {

uint32_t adv_now = flowctl_compute_advertised_kbps(stream->ewma_drain_rate);

/* 如果从未发过,就别触发,避免启动抖动 */
if (!stream->ewma_rate_last_sent)
return false;

/* square 的核心:相位变了就必须发一次 */
if (flowctl_cfg.mode == FLOWCTL_MODE_SQUARE) {
return adv_now != stream->ewma_rate_last_sent;
}

/* 其它模式:保留原来的 change_pct 阈值逻辑(用 measured 或 adv 都行,但要一致) */
if (adv_now >
(100+(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100)
return true;

if (adv_now <
(100-(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100)
return true;

return false;
}

4. 实验现象

下载时按照预期方波随周期从100k波动到3000k下一点。

电路切换现象

两条电路 connection-level

复现错误找原因