/** * Get a package window from either old sendme logic, or congestion control. * * A package window is how many cells you can still send. */ int congestion_control_get_package_window(constcircuit_t *circ, constcrypt_path_t *cpath) { int package_window; congestion_control_t *cc;
tor_assert(circ);
if (cpath) { package_window = cpath->package_window; cc = cpath->ccontrol; } else { package_window = circ->package_window; cc = circ->ccontrol; }
if (!cc) { return package_window; } else { /* Inflight can be above cwnd if cwnd was just reduced */ if (cc->inflight > cc->cwnd) return0; /* In the extremely unlikely event that cwnd-inflight is larger than * INT32_MAX, just return that cap, so old code doesn't explode. */ elseif (cc->cwnd - cc->inflight > INT32_MAX) return INT32_MAX; else return (int)(cc->cwnd - cc->inflight); } }
这个函数在两个地方被调用。第一个地方是 Exit 端,用于计算最多需要打包的数量:
1 2 3 4 5 6
//relay.c:circuit_resume_edge_reading_helper
/* How many cells do we have space for? It will be the minimum of * the number needed to exhaust the package window, and the minimum * needed to fill the cell queue. */ max_to_package = congestion_control_get_package_window(circ, layer_hint);
/* Return appropriate number designating how many cells can still be sent */ return congestion_control_get_package_window(circ, layer_hint);
//relay.c:relay_send_command_from_edge /* If applicable, note the cell digest for the SENDME version 1 purpose if * we need to. This call needs to be after the circuit_package_relay_cell() * because the cell digest is set within that function. */ if (relay_command == RELAY_COMMAND_DATA) { sendme_record_cell_digest_on_circ(circ, cpath_layer);
/* Handle the circuit-level SENDME package window. */ if (sendme_note_circuit_data_packaged(circ, cpath_layer) < 0) { /* Package window has gone under 0. Protocol issue. */ log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, "Circuit package window is below 0. Closing circuit."); circuit_mark_for_close(circ, END_CIRC_REASON_TORPROTOCOL); return-1; } }
/* For unit tests only: please treat these exactly as the defines in the
 * code. */
/* Default conflux UX preferences: clients favor throughput, exits favor
 * latency. NOTE(review): STATIC presumably expands to `static` outside of
 * unit-test builds — confirm against the project's test macros. */
STATIC uint8_t DEFAULT_CLIENT_UX = CONFLUX_UX_HIGH_THROUGHPUT;
STATIC uint8_t DEFAULT_EXIT_UX = CONFLUX_UX_MIN_LATENCY;
/**
 * Return the conflux algorithm for a desired UX value.
 *
 * Maps the (trunnel-validated) UX preference to a scheduling algorithm;
 * unknown values trip a non-fatal assert and fall back to LOWRTT.
 */
static uint8_t
conflux_choose_algorithm(uint8_t desired_ux)
{
  switch (desired_ux) {
    case CONFLUX_UX_NO_OPINION:
      return CONFLUX_ALG_LOWRTT;
    case CONFLUX_UX_MIN_LATENCY:
      return CONFLUX_ALG_MINRTT;
    case CONFLUX_UX_HIGH_THROUGHPUT:
      return CONFLUX_ALG_LOWRTT;
    /* For now, we have no low mem algs, so use minRTT since it should
     * switch less and thus use less mem */
    /* TODO-329-TUNING: Pick better algs here */
    case CONFLUX_UX_LOW_MEM_THROUGHPUT:
    case CONFLUX_UX_LOW_MEM_LATENCY:
      return CONFLUX_ALG_MINRTT;
    default:
      /* Trunnel should protect us from this */
      tor_assert_nonfatal_unreached();
      return CONFLUX_ALG_LOWRTT;
  }
}
/** * Return the circuit with the minimum RTT. Do not use any * other circuit. * * This algorithm will minimize RTT always, and will not provide * any throughput benefit. We expect it to be useful for VoIP/UDP * use cases. Because it only uses one circuit on a leg at a time, * it can have more than one circuit per guard (ie: to find * lower-latency middles for the path). */ staticconstcircuit_t * conflux_decide_circ_minrtt(constconflux_t *cfx) { uint64_t min_rtt = UINT64_MAX; constcircuit_t *circ = NULL;
/* Can't get here without any legs. */ tor_assert(CONFLUX_NUM_LEGS(cfx));
CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
/* Ignore circuits with no RTT measurement */ if (leg->circ_rtts_usec && leg->circ_rtts_usec < min_rtt) { circ = leg->circ; min_rtt = leg->circ_rtts_usec; } } CONFLUX_FOR_EACH_LEG_END(leg);
/* If the minRTT circuit can't send, dont send on any circuit. */ if (!circ || !circuit_ready_to_send(circ)) { returnNULL; } return circ; }
/** * Favor the circuit with the lowest RTT that still has space in the * congestion window. * * This algorithm will maximize total throughput at the expense of * bloating out-of-order queues. */ staticconstcircuit_t * conflux_decide_circ_lowrtt(constconflux_t *cfx) { uint64_t low_rtt = UINT64_MAX; constcircuit_t *circ = NULL;
/* Can't get here without any legs. */ tor_assert(CONFLUX_NUM_LEGS(cfx));
CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) { /* If the package window is full, skip it */ if (!circuit_ready_to_send(leg->circ)) { continue; }
/* Ignore circuits with no RTT */ if (leg->circ_rtts_usec && leg->circ_rtts_usec < low_rtt) { low_rtt = leg->circ_rtts_usec; circ = leg->circ; } } CONFLUX_FOR_EACH_LEG_END(leg);
/* At this point, if we found a circuit, we've already validated that its * congestion window has room. */ return circ; }
/** Description of a connection to another host or process, and associated * data. * * A connection is named based on what it's connected to -- an "OR * connection" has a Tor node on the other end, an "exit * connection" has a website or other server on the other end, and an * "AP connection" has an application proxy (and thus a user) on the * other end. * * Every connection has a type and a state. Connections never change * their type, but can go through many state changes in their lifetime. * * Every connection has two associated input and output buffers. * Listeners don't use them. For non-listener connections, incoming * data is appended to conn->inbuf, and outgoing data is taken from * conn->outbuf. Connections differ primarily in the functions called * to fill and drain these buffers. */ structconnection_t {
/** * Called from sendme_stream_data_received(), when data arrives * from a circuit to our edge's outbuf, to decide if we need to send * an XOFF. * * Returns the amount of cells remaining until the buffer is full, at * which point it sends an XOFF, and returns 0. * * Returns less than 0 if we have queued more than a congestion window * worth of data and need to close the circuit. */ int flow_control_decide_xoff(edge_connection_t *stream) { size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
/* Onion services and clients are typically localhost edges, so they * need different buffering limits than exits do */ if (TO_CONN(stream)->type == CONN_TYPE_AP || stream->hs_ident != NULL) { buffer_limit_xoff = xoff_client; } else { buffer_limit_xoff = xoff_exit; }
/** * Update global congestion control related consensus parameter values, every * consensus update. * * More details for each of the parameters can be found in proposal 324, * section 6.5 including tuning notes. */ void flow_control_new_consensus_params(constnetworkstatus_t *ns) { #define CC_XOFF_CLIENT_DFLT 500 #define CC_XOFF_CLIENT_MIN 1 #define CC_XOFF_CLIENT_MAX 10000 xoff_client = networkstatus_get_param(ns, "cc_xoff_client", CC_XOFF_CLIENT_DFLT, CC_XOFF_CLIENT_MIN, CC_XOFF_CLIENT_MAX)*RELAY_PAYLOAD_SIZE_MIN;
if (total_buffered > buffer_limit_xoff) { if (!stream->xoff_sent) { uint64_t now = monotime_absolute_usec();
if (stream->xoff_grace_period_start_usec == 0) { /* If unset, we haven't begun the XOFF grace period. We need to start. */ log_debug(LD_EDGE, "Exceeded XOFF limit; Beginning grace period: " "total-buffered=%" TOR_PRIuSZ " xoff-limit=%d", total_buffered, buffer_limit_xoff);
stream->xoff_grace_period_start_usec = now; } elseif (now > stream->xoff_grace_period_start_usec + XOFF_GRACE_PERIOD_USEC) { /* If we've exceeded our XOFF grace period, we need to send an XOFF. */ log_info(LD_EDGE, "Sending XOFF: total-buffered=%" TOR_PRIuSZ " xoff-limit=%d grace-period-dur=%" PRIu64 "usec", total_buffered, buffer_limit_xoff, now - stream->xoff_grace_period_start_usec); tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream);
/** * Send an XOFF for this stream, and note that we sent one */ staticvoid circuit_send_stream_xoff(edge_connection_t *stream) { xoff_cell_t xoff; uint8_t payload[CELL_PAYLOAD_SIZE]; ssize_t xoff_size;
/* If this is an entry conn, notify control port */ if (TO_CONN(stream)->type == CONN_TYPE_AP) { control_event_stream_status(TO_ENTRY_CONN(TO_CONN(stream)), STREAM_EVENT_XOFF_SENT, 0); } } }
/** * Called whenever we drain an edge connection outbuf by writing on * its socket, to decide if it is time to send an xon. * * The n_written parameter tells us how many bytes we have written * this time, which is used to compute the advisory drain rate fields. */ void flow_control_decide_xon(edge_connection_t *stream, size_t n_written) { size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
/* Bounds check the number of drained bytes, and scale */ if (stream->drained_bytes >= UINT32_MAX - n_written) { /* Cut the bytes in half, and move the start time up halfway to now * (if we have one). */ stream->drained_bytes /= 2;
if (stream->drain_start_usec) { uint64_t now = monotime_absolute_usec();
stream->drain_start_usec = now - (now-stream->drain_start_usec)/2; } } /* Accumulate drained bytes since last rate computation */ stream->drained_bytes += n_written;
if (stream->drain_start_usec) { /* If we have spent enough time in a queued state, update our drain * rate. */ if (stream->drained_bytes > xon_rate_bytes) { /* No previous drained bytes means it is the first time we are computing * it so use the value we just drained onto the socket as a baseline. It * won't be accurate but it will be a start towards the right value. * * We have to do this in order to have a drain rate else we could be * sending a drain rate of 0 in an XON which would be undesirable and * basically like sending an XOFF. */ if (stream->prev_drained_bytes == 0) { stream->prev_drained_bytes = stream->drained_bytes; } uint32_t drain_rate = compute_drain_rate(stream); /* Once the drain rate has been computed, note how many bytes we just * drained so it can be used at the next calculation. We do this here * because it gets reset once the rate is changed. */ stream->prev_drained_bytes = stream->drained_bytes;
if (drain_rate) { stream->ewma_drain_rate = (uint32_t)n_count_ewma(drain_rate, stream->ewma_drain_rate, xon_ewma_cnt); log_debug(LD_EDGE, "Updating drain rate: %d %d %"TOR_PRIuSZ, drain_rate, stream->ewma_drain_rate, total_buffered); tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update), stream, drain_rate); /* Reset recent byte counts. This prevents us from sending advisory * XONs more frequent than every xon_rate_bytes. */ stream->drained_bytes = 0; stream->drain_start_usec = 0; } } }
/** * Compute the recent drain rate (write rate) for this edge * connection and return it, in KB/sec (1000 bytes/sec). * * Returns 0 if the monotime clock is busted. */ staticinlineuint32_t compute_drain_rate(constedge_connection_t *stream) { if (BUG(!is_monotime_clock_reliable())) { log_warn(LD_BUG, "Computing drain rate with stalled monotime clock"); return0; }
/* If we don't have an XOFF outstanding, consider updating an * old rate */ if (!stream->xoff_sent) { if (stream_drain_rate_changed(stream)) { /* If we are still buffering and the rate changed, update * advisory XON */ log_info(LD_EDGE, "Sending rate-change XON: %d %d %"TOR_PRIuSZ, stream->ewma_rate_last_sent, stream->ewma_drain_rate, total_buffered); tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream);
/** * Process a stream XON, and if it validates, clear the xoff * flag and resume reading on this edge connection. * * Also, use provided rate information to rate limit * reading on this edge (or packagaing from it onto * the circuit), to avoid XON/XOFF chatter. * * Returns true if the XON validates, false otherwise. */ bool circuit_process_stream_xon(edge_connection_t *conn, constcrypt_path_t *layer_hint, constrelay_msg_t *msg) { xon_cell_t *xon; bool retval = true;
if (BUG(!conn)) { log_fn(LOG_PROTOCOL_WARN, LD_EDGE, "Got XON on invalid stream?"); returnfalse; }
/* Make sure this XON came from the right hop */ if (!edge_uses_cpath(conn, layer_hint)) { log_fn(LOG_PROTOCOL_WARN, LD_EDGE, "Got XON from wrong hop."); returnfalse; }
if (!edge_uses_flow_control(conn)) { log_fn(LOG_PROTOCOL_WARN, LD_EDGE, "Got XON for non-congestion control circuit"); returnfalse; }
if (xon_cell_parse(&xon, msg->body, msg->length) < 0) { log_fn(LOG_PROTOCOL_WARN, LD_EDGE, "Received malformed XON cell."); returnfalse; }
/* If we are near the max, scale everything down */ if (conn->num_xon_recv == XON_COUNT_SCALE_AT) { log_info(LD_EDGE, "Scaling down for XON count: %d %d %d", conn->total_bytes_xmit, conn->num_xoff_recv, conn->num_xon_recv); conn->total_bytes_xmit /= 2; conn->num_xoff_recv /= 2; conn->num_xon_recv /= 2; }
conn->num_xon_recv++;
/* Client-side check to make sure that XON is not sent too early, * for dropmark attacks. The main sidechannel risk is early cells, * but we also check to see that we did not get more XONs than make * sense for the number of bytes we sent. */ if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) { uint32_t limit = 0;
if (conn->total_bytes_xmit < limit*conn->num_xon_recv) { log_fn(LOG_PROTOCOL_WARN, LD_EDGE, "Got extra XON for bytes sent. Got %d, expected max %d", conn->num_xon_recv, conn->total_bytes_xmit/limit);
/* We still process this, because the only dropmark defenses * in C tor are via the vanguards addon's use of the read valid * cells. So just signal that we think this is not valid protocol * data and proceed. */ retval = false; } }
log_info(LD_EDGE, "Got XON: %d", xon->kbps_ewma);
/* Adjust the token bucket of this edge connection with the drain rate in * the XON. Rate is in bytes from kilobit (kpbs). */ uint64_t rate = ((uint64_t) xon_cell_get_kbps_ewma(xon) * 1000); if (rate == 0 || INT32_MAX < rate) { /* No rate. */ rate = INT32_MAX; } token_bucket_rw_adjust(&conn->bucket, (uint32_t) rate, (uint32_t) rate);
if (conn->xoff_received) { /* Clear the fact that we got an XOFF, so that this edge can * start and stop reading normally */ conn->xoff_received = false; connection_start_reading(TO_CONN(conn)); }
/* If this is an entry conn, notify control port */ if (TO_CONN(conn)->type == CONN_TYPE_AP) { control_event_stream_status(TO_ENTRY_CONN(TO_CONN(conn)), STREAM_EVENT_XON_RECV, 0); }