于2022年4月19日由Sukuna发布
这一次我们要实现TCP的发送方,这一次我把必要的注释写在代码里面了.
1.头文件:
//! \brief The "sender" side of a TCP implementation.
//!
//! Reads bytes from an outbound ByteStream, packs them into TCPSegments,
//! keeps a copy of every segment that has been sent until it is acknowledged,
//! and resends the oldest such segment when the retransmission timer expires.
class TCPSender {
  private:
    //! our initial sequence number, the number for our SYN.
    WrappingInt32 _isn;

    //! absolute sequence number of the first byte that has not been acknowledged yet
    uint64_t base{0};

    //! outbound queue of segments that the TCPSender wants sent
    std::queue<TCPSegment> _segments_out{};

    //! copies of the segments that were sent but are not yet fully acknowledged,
    //! oldest first; the front element is the one resent on timeout
    std::queue<TCPSegment> _segments_out_cached{};

    //! initial retransmission timeout (in milliseconds) for the connection
    unsigned int _initial_retransmission_timeout;

    //! outgoing stream of bytes that have not yet been sent
    ByteStream _stream;

    //! NOTE(review): appears to duplicate `_next_seqno` and is not used by
    //! fill_window(), ack_received() or tick() -- confirm before removing.
    uint64_t _next_seqnum{0};

    //! number of sequence numbers the sender may still put on the wire
    uint16_t _curr_window_size{1};

    //! true once a segment carrying FIN has been queued
    bool _isfin{false};

    //! milliseconds accumulated by tick() since the timer was last (re)started
    size_t _times{0};

    //! true while the retransmission timer is running
    bool _time_waiting{false};

    //! number of consecutive retransmissions (reset when an ACK is accepted)
    int _consecutive_remission{0};

    //! current retransmission timeout in milliseconds (doubled on back-off)
    size_t _time_out{0};

    //! set when the receiver advertised a zero-sized window; while set,
    //! tick() retransmits without backing off
    bool _window_zero{false};

    //! the (absolute) sequence number for the next byte to be sent
    uint64_t _next_seqno{0};

  public:
    //! Initialize a TCPSender
    TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
              const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
              const std::optional<WrappingInt32> fixed_isn = {});

    //! \name "Input" interface for the writer
    //!@{
    ByteStream &stream_in() { return _stream; }
    const ByteStream &stream_in() const { return _stream; }
    //!@}

    //! \name Methods that can cause the TCPSender to send a segment
    //!@{

    //! \brief A new acknowledgment was received
    void ack_received(const WrappingInt32 ackno, const uint16_t window_size);

    //! \brief Generate an empty-payload segment (useful for creating empty ACK segments)
    void send_empty_segment();

    //! \brief create and send segments to fill as much of the window as possible
    void fill_window();

    //! \brief Notifies the TCPSender of the passage of time
    void tick(const size_t ms_since_last_tick);
    //!@}

    //! \name Accessors
    //!@{

    //! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged?
    //! \note count is in "sequence space," i.e. SYN and FIN each count for one byte
    //! (see TCPSegment::length_in_sequence_space())
    size_t bytes_in_flight() const;

    //! \brief Number of consecutive retransmissions that have occurred in a row
    unsigned int consecutive_retransmissions() const;

    //! \brief TCPSegments that the TCPSender has enqueued for transmission.
    //! \note These must be dequeued and sent by the TCPConnection,
    //! which will need to fill in the fields that are set by the TCPReceiver
    //! (ackno and window size) before sending.
    std::queue<TCPSegment> &segments_out() { return _segments_out; }
    //!@}

    //! \name What is the next sequence number? (used for testing)
    //!@{

    //! \brief absolute seqno for the next byte to be sent
    uint64_t next_seqno_absolute() const { return _next_seqno; }

    //! \brief relative seqno for the next byte to be sent
    WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
    //!@}
};
2.发送数据函数:
void TCPSender::fill_window() {
// windows is full or the programe is finished.
if(_curr_window_size==0||_isfin){
return;
}
//haven't send any bytes.
if(_next_seqno==0){
TCPSegment seg;
// the TCP transmission start from _isn.
seg.header().seqno = _isn;
seg.header().syn = true;
// the TCP first connection just send 1 bytes;
_next_seqno = 1;
_curr_window_size--;
_segments_out.push(seg);
_segments_out_cached.push(seg);
}
//the end of the file
else if(_stream.eof()){
//set the finish flag to true;
_isfin = true;
TCPSegment seg;
seg.header().syn=false;
seg.header().fin=true;
//convert the absolute TCP number to TCP number.
seg.header().seqno = wrap(_next_seqno,_isn);
//the fin packet only send a byte.
_next_seqno++;
_curr_window_size--;
_segments_out.push(seg);
_segments_out_cached.push(seg);
}
//normal file
else{
//make sure the windows is not full and there's any data to convert.
while(!_stream.buffer_empty()&&_curr_window_size>0){
//decide the length of the TCP Segment.
//make sure the length of TCP segment is below the silde windows size and data length.
uint64_t lens_byte=std::min(_stream.buffer_size(),uint64_t (_curr_window_size));
lens_byte=std::min(lens_byte,TCPConfig::MAX_PAYLOAD_SIZE);
TCPSegment seg;
seg.header().seqno = wrap(_next_seqno,_isn);
seg.header().syn = false;
//get the lens_byte data to the payload.
seg.payload()=_stream.read(lens_byte);
// increase the next seq_no;
_next_seqno += lens_byte;
_curr_window_size -= lens_byte;
// get the end of the file.
if(_stream.eof()&&_curr_window_size>0){
_isfin = true;
seg.header().fin=true;
//the fin packet only send a byte.
_next_seqno++;
_curr_window_size--;
}
_segments_out.push(seg);
_segments_out_cached.push(seg);
if(_isfin){
break;
}
}
}
//start ticking...
if(!_time_waiting){
_time_out = _initial_retransmission_timeout;
_time_waiting = true;
_times = 0;
}
}
3.接收ACK:
//! \brief Process an acknowledgment from the remote receiver.
//!
//! Steps:
//!  1. Ignore the ACK entirely if it acknowledges something that was never sent.
//!  2. Drop every cached segment that is now fully acknowledged, advancing
//!     `base`. If at least one segment was dropped, reset the retransmission
//!     timeout, the elapsed time and the consecutive-retransmission counter.
//!  3. Stop the timer when nothing is outstanding any more.
//!  4. Recompute the usable window as the advertised window minus the bytes
//!     still in flight (a zero window is treated as one so a probe can be
//!     sent) and try to send more data.
//!
//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
    // Absolute sequence number of the ACK, unwrapped near the first unacked byte.
    const uint64_t acknos = unwrap(ackno, _isn, base);

    // Acknowledges data we have not sent (this also covers an ACK that arrives
    // before the SYN went out): ignore it before touching any state.
    if (acknos > _next_seqno) {
        return;
    }

    // Pop every segment whose last sequence number lies below the ackno. The
    // empty() check guards pop(): duplicate ACKs (for example a repeated ACK
    // of the FIN) arrive when nothing is cached any more.
    bool acked_new_data = false;
    while (!_segments_out_cached.empty()) {
        const uint64_t seg_begin = unwrap(_segments_out_cached.front().header().seqno, _isn, base);
        const uint64_t seg_len = _segments_out_cached.front().length_in_sequence_space();
        if (seg_begin + seg_len > acknos) {
            break;  // oldest segment is not (fully) acknowledged yet
        }
        // base is the first sequence number that is still unacknowledged.
        base += seg_len;
        _segments_out_cached.pop();
        acked_new_data = true;
    }

    if (acked_new_data) {
        _time_out = _initial_retransmission_timeout;
        _times = 0;
        _consecutive_remission = 0;
    }

    // Remember the latest advertisement; tick() does not back off while the
    // receiver's window is zero.
    _window_zero = (window_size == 0);

    const uint64_t in_flight = _next_seqno - base;
    if (in_flight == 0) {
        // Everything is acknowledged: the retransmission timer can stop.
        _time_waiting = false;
    } else if (in_flight >= window_size) {
        // Flow control: outstanding data already fills the receiver's window.
        _curr_window_size = 0;
        return;
    }

    if (window_size == 0) {
        // Zero window with nothing in flight: pretend the window is one so a
        // single probe can be sent.
        _curr_window_size = 1;
    } else {
        // Only the part of the window not occupied by in-flight data is usable
        // (in_flight < window_size is guaranteed by the check above).
        _curr_window_size = static_cast<uint16_t>(window_size - in_flight);
        // Any accepted ACK that advertises a non-zero window clears the counter.
        _consecutive_remission = 0;
    }

    fill_window();
}
4. 构造函数
//! \param[in] capacity the capacity of the outgoing byte stream
//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
, base(0)
, _initial_retransmission_timeout{retx_timeout}
, _stream(capacity)
, _curr_window_size(1)
, _isfin(false)
, _times(0)
, _time_waiting(false)
, _consecutive_remission(0)
, _time_out(0)
, _window_zero(false)
{
}
5.超时处理:
//! \brief Account for elapsed time and retransmit the oldest outstanding segment on timeout.
//!
//! The elapsed-time counter always advances. A retransmission happens only
//! when the timer is running, has reached the current timeout, and at least
//! one segment is still cached. Unless the receiver advertised a zero window,
//! the retransmission counter is incremented and the timeout is doubled.
//!
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
    // Presumably a no-op helper that marks the parameter as used -- confirm
    // against its definition before removing.
    DUMMY_CODE(ms_since_last_tick);

    _times += ms_since_last_tick;

    const bool timer_expired = _time_waiting && _times >= _time_out;
    if (!timer_expired || _segments_out_cached.empty()) {
        return;
    }

    // Resend the oldest segment that has not been acknowledged.
    _segments_out.push(_segments_out_cached.front());

    // Exponential back-off applies only when the receiver's window is open.
    if (!_window_zero) {
        ++_consecutive_remission;
        _time_out *= 2;
    }

    // Restart the elapsed-time measurement for the next timeout period.
    _times = 0;
}