00001 00033 #include <itpp/protocol/tcp.h> 00034 #include <itpp/base/itfile.h> 00035 #include <limits> 00036 #include <cstdlib> 00037 #include <ctime> 00038 00040 00041 #ifdef _MSC_VER 00042 #pragma warning(disable:4355) 00043 #endif 00044 00045 namespace itpp { 00046 00047 // -------------------- Default parameters ---------------------------------- 00048 00049 // TCP sender and receiver 00050 00051 #define TCP_HEADERLENGTH 40 00052 00053 // TCP sender 00054 00055 #define TCP_VERSION kReno 00056 #define TCP_SMSS 1460 00057 #define TCP_INITIALCWNDREL 2 // related to MSS 00058 #define TCP_INITIALSSTHRESHREL 1 // related to MaxCWnd 00059 #define TCP_MAXCWNDREL 32 // related to MSS 00060 #define TCP_DUPACKS 3 00061 #define TCP_INITIALRTT 1 00062 const double TCP_STIMERGRAN = 0.2; 00063 const double TCP_SWSATIMERVALUE = 0.2; 00064 #define TCP_MAXBACKOFF 64 00065 const double TCP_MAXRTO = std::numeric_limits<double>::max(); 00066 #define TCP_IMMEDIATEBACKOFFRESET false 00067 #define TCP_TIMESTAMPS false 00068 #define TCP_KARN true 00069 #define TCP_NAGLE false 00070 #define TCP_GOBACKN true 00071 #define TCP_FLIGHTSIZERECOVERY false 00072 #define TCP_RENOCONSERVATION true 00073 #define TCP_CAREFULSSTHRESHREDUCTION true 00074 #define TCP_IGNOREDUPACKONTORECOVERY true 00075 #define TCP_CAREFULMULFASTRTXAVOIDANCE true 00076 #define TCP_RESTARTAFTERIDLE true 00077 00078 // TCP receiver 00079 00080 #define TCP_RMSS 1460 00081 const int TCP_BUFFERSIZE = std::numeric_limits<int>::max()/4; 00082 #define TCP_DELAYEDACK true 00083 const double TCP_ACKDELAYTIME = 0.2; 00084 #define TCP_SENDPERIODICACKS false 00085 #define TCP_STRICTPERIODICACKS false 00086 #define TCP_PERIODICACKINTERVAL 1 00087 #define TCP_ACKSCHEDULINGDELAY 0 00088 #define TCP_ACKBUFFERWRITE false 00089 #define TCP_ACKBUFFERREAD true 00090 const int TCP_MAXUSERBLOCKSIZE = std::numeric_limits<int>::max()/4; 00091 #define TCP_MINUSERBLOCKSIZE 1 00092 #define TCP_USERBLOCKPROCDELAY 0 00093 00094 // TCP generator 00095 00096 #define TCPGEN_BLOCKSIZE 1460 00097 00098 // TCP applications 00099 00100 #define TCPAPP_MAXNOOFACTIVEAPPS 500 00101 #define TCPAPP_DISTSTATARRAYSIZE 100 00102 #define TCPAPP_DISTSTATMAXGOODPUT 1000 00103 #define TCPAPP_DISTSTATMAXTRANSFERTIME 10000 00104 #define TCPAPP_CONDMEANSTATARRAYSIZE 100 00105 #define TCPAPP_CONDMEANSTATMAXREQSIZE 100000 00106 00107 00108 00109 inline int min(int opd1, int opd2) 00110 { 00111 return (opd1 < opd2)? opd1 : opd2; 00112 } 00113 00114 00115 inline int max(int opd1, int opd2) 00116 { 00117 return (opd1 > opd2)? opd1 : opd2; 00118 } 00119 00120 00121 // round is used to map a double value (e.g. RTO in TTCPSender) to the 00122 // next higher value of a certain granularity (e.g. timer granularity). 00123 inline double round (const double value, const double granularity) 00124 { 00125 return (std::ceil(value / granularity) * granularity); 00126 } 00127 00128 // -------------------- TCP_Segment ---------------------------------------- 00129 00130 TCP_Segment::TCP_Segment() : 00131 seq_begin(), 00132 seq_end() 00133 { 00134 } 00135 00136 TCP_Segment::TCP_Segment(const Sequence_Number &sn_begin, const Sequence_Number &sn_end) : 00137 seq_begin(sn_begin), 00138 seq_end(sn_end) 00139 { 00140 it_assert(seq_begin <= seq_end, "TCP_Segment::TCP_Segment, end byte " + to_str(seq_end.value()) + 00141 " < begin byte " + to_str(seq_begin.value())); 00142 } 00143 00144 00145 TCP_Segment::TCP_Segment(const TCP_Segment &segment) : 00146 seq_begin(segment.seq_begin), 00147 seq_end(segment.seq_end) 00148 { 00149 } 00150 00151 00152 TCP_Segment &TCP_Segment::operator=(const TCP_Segment &segment) 00153 { 00154 this->seq_begin = segment.seq_begin; 00155 this->seq_end = segment.seq_end; 00156 00157 return *this; 00158 } 00159 00160 00161 void TCP_Segment::combine(const TCP_Segment &segment) 00162 { 00163 it_assert(can_be_combined(segment), "TCP_Segment::CombineWith, segments cannot be combined"); 00164 00165 seq_begin = min(seq_begin, segment.seq_begin); 00166 seq_end = max(seq_end, segment.seq_end); 00167 } 00168 00169 00170 std::ostream & operator<<(std::ostream &os, const TCP_Segment &segment) 00171 { 00172 os << "(" << segment.seq_begin << "," << segment.seq_end << ")"; 00173 return os; 00174 } 00175 00176 00177 // -------------------- TCP_Packet ---------------------------------------- 00178 TCP_Packet::TCP_Packet() : 00179 fSegment(), 00180 fACK(), 00181 fWnd(0), 00182 fSessionId(0), 00183 fInfo(0) 00184 { 00185 } 00186 00187 00188 TCP_Packet::TCP_Packet(const TCP_Packet &packet) : 00189 fSegment(packet.fSegment), 00190 fACK(packet.fACK), 00191 fWnd(packet.fWnd), 00192 fSessionId(packet.fSessionId), 00193 fInfo(0) 00194 { 00195 std::cout << "TCP_Packet::TCP_Packet ############" << " "; 00196 00197 if (packet.fInfo != 0) { 00198 std::cout << "TCP_Packet::TCP_Packet rhs.fInfo ###########" << " "; 00199 fInfo = new TDebugInfo(*packet.fInfo); 00200 } 00201 } 00202 00203 00204 TCP_Packet::~TCP_Packet() 00205 { 00206 delete fInfo; 00207 } 00208 00209 00210 TCP_Packet & TCP_Packet::clone() const 00211 { 00212 return *new TCP_Packet(*this); 00213 } 00214 00215 00216 void TCP_Packet::set_info(unsigned ssThresh, unsigned recWnd, unsigned cWnd, 00217 double estRTT, Sequence_Number sndUna, 00218 Sequence_Number sndNxt, bool isRtx) 00219 { 00220 if (fInfo == 0) { 00221 fInfo = new TDebugInfo; 00222 } 00223 00224 fInfo->fSSThresh = ssThresh; 00225 fInfo->fRecWnd = recWnd; 00226 fInfo->fCWnd = cWnd; 00227 fInfo->fRTTEstimate = estRTT; 00228 fInfo->fSndUna = sndUna; 00229 fInfo->fSndNxt = sndNxt; 00230 fInfo->fRtxFlag = isRtx; 00231 } 00232 00233 00234 void TCP_Packet::print_header(std::ostream &out) const 00235 { 00236 std::cout << "Hello!\n"; 00237 00238 std::cout << "Ses = " << get_session_id() << " "; 00239 00240 std::cout << "Segment = " << get_segment() << " " 00241 << "ACK = " << get_ACK() << " " 00242 << "Wnd = " << get_wnd() << " "; 00243 00244 std::cout << "DestPort = " << fDestinationPort << " " 00245 << "SourcePort = " << fSourcePort << " "; 00246 00247 00248 if (fInfo != 0) { 00249 std::cout << "SndSSThresh = " << fInfo->fSSThresh << " "; 00250 std::cout << "RecWnd = " << fInfo->fRecWnd << " "; 00251 std::cout << "SndCWnd = " << fInfo->fCWnd << " "; 00252 std::cout << "RTTEstimate = " << fInfo->fRTTEstimate << " "; 00253 std::cout << "RtxFlag = " << fInfo->fRtxFlag; 00254 } 00255 else 00256 std::cout << "fInfo = " << fInfo << " "; 00257 00258 std::cout << std::endl; 00259 00260 } 00261 00262 00263 00264 std::ostream & operator<<(std::ostream & out, TCP_Packet & msg) 00265 { 00266 msg.print_header(out); 00267 return out; 00268 } 00269 00270 00271 // -------------------- TCP_Sender ---------------------------------------- 00272 TCP_Sender::TCP_Sender(int label) : 00273 fLabel(label), 00274 fTCPVersion(TCP_VERSION), 00275 fMSS(TCP_SMSS), 00276 fTCPIPHeaderLength(TCP_HEADERLENGTH), 00277 fInitialRTT(TCP_INITIALRTT), 00278 fInitialCWnd(0), // default initialization see below 00279 fInitialSSThresh(0), // default initialization see below 00280 fMaxCWnd(0), // default initialization see below 00281 fDupACKThreshold(TCP_DUPACKS), 00282 fTimerGranularity(TCP_STIMERGRAN), 00283 fMaxRTO(TCP_MAXRTO), 00284 fMaxBackoff(TCP_MAXBACKOFF), 00285 fImmediateBackoffReset(TCP_IMMEDIATEBACKOFFRESET), 00286 fKarn(TCP_KARN), 00287 fGoBackN(TCP_GOBACKN), 00288 fFlightSizeRecovery(TCP_FLIGHTSIZERECOVERY), 00289 fRenoConservation(TCP_RENOCONSERVATION), 00290 fCarefulSSThreshReduction(TCP_CAREFULSSTHRESHREDUCTION), 00291 fIgnoreDupACKOnTORecovery(TCP_IGNOREDUPACKONTORECOVERY), 00292 fCarefulMulFastRtxAvoidance(TCP_CAREFULMULFASTRTXAVOIDANCE), 00293 fNagle(TCP_NAGLE), 00294 fSWSATimerValue(TCP_SWSATIMERVALUE), 00295 fRestartAfterIdle(TCP_RESTARTAFTERIDLE), 00296 fDebug(false), 00297 fTrace(false), 00298 fSessionId(0), 00299 fRtxTimer(*this, &TCP_Sender::HandleRtxTimeout), 00300 fSWSATimer(*this, &TCP_Sender::HandleSWSATimeout)/*,*/ 00301 { 00302 00303 // default values and parameter check for MaxCWND, InitCWND, InitSSThresh 00304 if (fMaxCWnd == 0) { 00305 fMaxCWnd = (unsigned)(TCP_MAXCWNDREL * fMSS); 00306 } else if (fMaxCWnd < fMSS) { 00307 // throw (UL_CException("TCP_Sender::TCP_Sender", 00308 // "MaxCWnd must be >= MSS")); 00309 } 00310 00311 if (fInitialCWnd == 0) { 00312 fInitialCWnd = (unsigned)(TCP_INITIALCWNDREL * fMSS); 00313 } else if ((fInitialCWnd < fMSS) || (fInitialCWnd > fMaxCWnd)) { 00314 // throw (UL_CException("TCP_Sender::TCP_Sender", 00315 // "initial CWnd must be >= MSS and <= MaxCWnd")); 00316 } 00317 00318 if ((fInitialSSThresh == 0) && (fMaxCWnd >= 2 * fMSS)) { 00319 fInitialSSThresh = (unsigned)(TCP_INITIALSSTHRESHREL * fMaxCWnd); 00320 } else if ((fInitialSSThresh < 2*fMSS) || (fInitialCWnd > fMaxCWnd)) { 00321 // throw (UL_CException("TCP_Sender::TCP_Sender", 00322 // "initial CWnd must be >= 2*MSS and <= MaxCWnd")); 00323 } 00324 00325 setup(); 00326 00327 InitStatistics(); 00328 00329 00330 tcp_send.set_name("TCP Send"); 00331 tcp_receive_ack.forward(this, &TCP_Sender::ReceiveMessageFromNet); 00332 tcp_receive_ack.set_name("TCP ACK"); 00333 tcp_socket_write.forward(this, &TCP_Sender::HandleUserMessageIndication); 00334 tcp_socket_write.set_name("SocketWrite"); 00335 tcp_release.forward(this, &TCP_Sender::release); 00336 tcp_release.set_name("Release"); 00337 00338 } 00339 00340 00341 TCP_Sender::~TCP_Sender () 00342 { 00343 } 00344 00345 void TCP_Sender::set_debug(const bool enable_debug) 00346 { 00347 fDebug = enable_debug; 00348 tcp_send.set_debug(enable_debug); 00349 } 00350 00351 void TCP_Sender::set_debug(bool enable_debug, bool enable_signal_debug) 00352 { 00353 fDebug = enable_debug; 00354 tcp_send.set_debug(enable_signal_debug); 00355 } 00356 00357 void TCP_Sender::set_trace(const bool enable_trace) 00358 { 00359 fTrace = enable_trace; 00360 } 00361 00362 void TCP_Sender::set_label(int label) 00363 { 00364 fLabel = label; 00365 } 00366 00367 void TCP_Sender::setup() 00368 { 00369 fSndUna = 0; 00370 fSndNxt = 0; 00371 fSndMax = 0; 00372 fMaxRecWnd = 0; 00373 fRecWnd = fMaxCWnd; 00374 fUserNxt = 0; 00375 fCWnd = fInitialCWnd; 00376 fSSThresh = fInitialSSThresh; 00377 fRecoveryDupACK = 0; 00378 fRecoveryTO = 0; 00379 fDupACKCnt = 0; 00380 00381 // timers 00382 fBackoff = 1; 00383 fPendingBackoffReset = false; 00384 fLastSendTime = Event_Queue::now(); 00385 00386 // RTT measurement 00387 fTimUna = 0; 00388 fSRTT = 0; 00389 fRTTVar = 0; 00390 fRTTEstimate = fInitialRTT; 00391 fRTTMPending = false; 00392 fRTTMByte = 0; 00393 00394 CWnd_val.set_size(1000); 00395 CWnd_val.zeros(); 00396 CWnd_time.set_size(1000); 00397 CWnd_time.zeros(); 00398 CWnd_val(0) = fInitialCWnd; 00399 CWnd_time(0) = 0; 00400 CWnd_index=1; 00401 00402 SSThresh_val.set_size(1000); 00403 SSThresh_val.zeros(); 00404 SSThresh_time.set_size(1000); 00405 SSThresh_time.zeros(); 00406 SSThresh_val(0) = fInitialSSThresh; 00407 SSThresh_time(0) = 0; 00408 SSThresh_index=1; 00409 00410 sent_seq_num_val.set_size(1000); 00411 sent_seq_num_val.zeros(); 00412 sent_seq_num_time.set_size(1000); 00413 sent_seq_num_time.zeros(); 00414 sent_seq_num_val(0) = 0; 00415 sent_seq_num_time(0) = 0; 00416 sent_seq_num_index=1; 00417 00418 sender_recv_ack_seq_num_val.set_size(1000); 00419 sender_recv_ack_seq_num_val.zeros(); 00420 sender_recv_ack_seq_num_time.set_size(1000); 00421 sender_recv_ack_seq_num_time.zeros(); 00422 sender_recv_ack_seq_num_val(0) = 0; 00423 sender_recv_ack_seq_num_time(0) = 0; 00424 sender_recv_ack_seq_num_index=1; 00425 00426 RTTEstimate_val.set_size(1000); 00427 RTTEstimate_val.zeros(); 00428 RTTEstimate_time.set_size(1000); 00429 RTTEstimate_time.zeros(); 00430 RTTEstimate_val(0) = fInitialRTT; 00431 RTTEstimate_time(0) = 0; 00432 RTTEstimate_index=1; 00433 00434 RTTsample_val.set_size(1000); 00435 RTTsample_val.zeros(); 00436 RTTsample_time.set_size(1000); 00437 RTTsample_time.zeros(); 00438 RTTsample_val(0) = 0; 00439 RTTsample_time(0) = 0; 00440 RTTsample_index=1; 00441 00442 } 00443 00444 std::string TCP_Sender::GenerateFilename() 00445 { 00446 time_t rawtime; 00447 struct tm *timeinfo; 00448 timeinfo = localtime(&rawtime); 00449 std::ostringstream filename_stream; 00450 filename_stream << "trace_tcp_sender_u" << fLabel 00451 << "_" << 1900+timeinfo->tm_year 00452 << "_" << timeinfo->tm_mon 00453 << "_" << timeinfo->tm_mday 00454 << "__" << timeinfo->tm_hour 00455 << "_" << timeinfo->tm_min 00456 << "_" << timeinfo->tm_sec 00457 << "_.it"; 00458 return filename_stream.str(); 00459 } 00460 00461 00462 void TCP_Sender::release(std::string file) 00463 { 00464 std::string filename; 00465 fSessionId++; 00466 00467 fRtxTimer.Reset(); 00468 fSWSATimer.Reset(); 00469 00470 if (fTrace) { 00471 if (file == "") 00472 filename = GenerateFilename(); 00473 else 00474 filename = file; 00475 00476 save_trace(filename); 00477 } 00478 } 00479 00480 00481 void TCP_Sender::InitStatistics() 00482 { 00483 fNumberOfTimeouts = 0; 00484 fNumberOfIdleTimeouts = 0; 00485 fNumberOfFastRetransmits = 0; 00486 fNumberOfRTTMeasurements = 0; 00487 fNumberOfReceivedACKs = 0; 00488 } 00489 00490 00491 void TCP_Sender::StopTransientPhase() 00492 { 00493 InitStatistics(); 00494 } 00495 00496 00497 void TCP_Sender::HandleUserMessageIndication(itpp::Packet *user_data_p) 00498 { 00499 if (fDebug) { 00500 std::cout << "TCP_Sender::HandleUserMessageIndication" 00501 << " byte_size=" << user_data_p->bit_size()/8 00502 << " ptr=" << user_data_p 00503 << " time=" << Event_Queue::now() << std::endl; 00504 } 00505 00506 SocketWriteQueue.push(user_data_p); 00507 00508 SendNewData(); // will call GetMessage (via GetNextSegmentSize) 00509 // if new data can be sent 00510 } 00511 00512 00513 void TCP_Sender::ReceiveMessageFromNet(itpp::Packet *msg) 00514 { 00515 TCP_Packet & packet = (TCP_Packet &)*msg; 00516 00517 if (fDebug) { 00518 std::cout << "TCP_Sender::ReceiveMessageFromNet" 00519 << " byte_size=" << msg->bit_size()/8 00520 << " ptr=" << msg 00521 << " time=" << Event_Queue::now() << std::endl; 00522 } 00523 00524 if((packet.get_session_id() == fSessionId) && // ACK of current session 00525 (packet.get_ACK() >= fSndUna)) { // ACK is OK 00526 HandleACK(packet); 00527 } 00528 00529 delete &packet; 00530 } 00531 00532 00533 void TCP_Sender::HandleACK(TCP_Packet &msg) 00534 { 00535 it_assert(msg.get_ACK() <= fSndMax, "TCP_Sender::HandleACK, received ACK > SndMax at "); 00536 00537 fNumberOfReceivedACKs++; 00538 00539 if (fTrace) { 00540 TraceACKedSeqNo(msg.get_ACK()); 00541 } 00542 00543 if (fDebug) { 00544 std::cout << "sender " << fLabel << ": " 00545 << "receive ACK: " 00546 << " t = " << Event_Queue::now() << ", " 00547 << msg << std::endl; 00548 } 00549 00550 // update receiver advertised window size 00551 fRecWnd = msg.get_wnd(); 00552 fMaxRecWnd = max(fRecWnd, fMaxRecWnd); 00553 00554 if (msg.get_ACK() == fSndUna) { // duplicate ACK 00555 00556 bool ignoreDupACK = (fSndMax == fSndUna); // no outstanding data 00557 00558 if (fIgnoreDupACKOnTORecovery) { 00559 // don't count dupacks during TO recovery! 00560 if (fCarefulMulFastRtxAvoidance) { // see RFC 2582, Section 5 00561 // like in Solaris 00562 ignoreDupACK = ignoreDupACK || (fSndUna <= fRecoveryTO); 00563 } else { 00564 // like in ns 00565 ignoreDupACK = ignoreDupACK || (fSndUna < fRecoveryTO); 00566 } 00567 } 00568 00569 if (!ignoreDupACK) { 00570 fDupACKCnt++; // count the number of duplicate ACKs 00571 00572 if (fDupACKCnt == fDupACKThreshold) { 00573 // dupack threshold is reached 00574 fNumberOfFastRetransmits++; 00575 00576 fRecoveryDupACK = fSndMax; 00577 00578 ReduceSSThresh(); // halve ssthresh (in most cases) 00579 00580 if ((fTCPVersion == kReno) || (fTCPVersion == kNewReno)) { 00581 fCWnd = fSSThresh; 00582 } else if (fTCPVersion == kTahoe) { 00583 fCWnd = fMSS; 00584 } 00585 00586 if (fTCPVersion == kReno || fTCPVersion == kNewReno) { 00587 // conservation of packets: 00588 if (fRenoConservation) { 00589 fCWnd += fDupACKThreshold * fMSS; 00590 } 00591 } else if (fTCPVersion == kTahoe) { 00592 if (fGoBackN) { 00593 fSndNxt = fSndUna; // Go-Back-N (like in ns) 00594 } 00595 } 00596 00597 UnaRetransmit(); // initiate retransmission 00598 } else if (fDupACKCnt > fDupACKThreshold) { 00599 if (fTCPVersion == kReno || fTCPVersion == kNewReno) { 00600 // conservation of packets 00601 // CWnd may exceed MaxCWnd during fast recovery, 00602 // however, the result of SendWindow() is always <= MaxCwnd 00603 if (fRenoConservation) { 00604 fCWnd += fMSS; 00605 } 00606 } 00607 } 00608 } 00609 } else { // new ACK 00610 Sequence_Number oldSndUna = fSndUna; // required for NewReno partial ACK 00611 fSndUna = msg.get_ACK(); 00612 fSndNxt = max(fSndNxt, fSndUna); // required in case of "Go-Back-N" 00613 00614 // reset retransmission timer 00615 00616 if ((fSndUna > fTimUna) && fRtxTimer.IsPending()) { 00617 // seq. no. for which rtx timer is running has been received 00618 fRtxTimer.Reset(); 00619 } 00620 00621 // backoff reset 00622 00623 if (fImmediateBackoffReset) { 00624 fBackoff = 1; 00625 } else { 00626 if (fPendingBackoffReset) { 00627 fBackoff = 1; 00628 fPendingBackoffReset = false; 00629 } else if (fBackoff > 1) { 00630 // reset backoff counter only on next new ACK (this is probably 00631 // the way to operate intended by Karn) 00632 fPendingBackoffReset = true; 00633 } 00634 } 00635 00636 // RTT measurement 00637 00638 if ((fSndUna > fRTTMByte) && fRTTMPending) { 00639 UpdateRTTVariables(Event_Queue::now() - fRTTMStartTime); 00640 fRTTMPending = false; 00641 } 00642 00643 // update CWnd and reset dupack counter 00644 00645 if (fDupACKCnt >= fDupACKThreshold) { 00646 // we are in fast recovery 00647 if (fTCPVersion == kNewReno && fSndUna < fRecoveryDupACK) { 00648 // New Reno partial ACK handling 00649 if (fRenoConservation) { 00650 fCWnd = max(fMSS, fCWnd - (fSndUna - oldSndUna) + fMSS); 00651 } 00652 UnaRetransmit(); // start retransmit immediately 00653 } else { 00654 FinishFastRecovery(); 00655 } 00656 } else { 00657 // no fast recovery 00658 fDupACKCnt = 0; 00659 if (fCWnd < fSSThresh) { 00660 // slow start phase 00661 fCWnd = min (fCWnd + fMSS, fMaxCWnd); 00662 } else { 00663 // congestion avoidance phase 00664 fCWnd += max (fMSS * fMSS / fCWnd, 1); // RFC 2581 00665 fCWnd = min (fCWnd, fMaxCWnd); 00666 } 00667 } 00668 } // new ACK 00669 00670 SendNewData(); // try to send new data (even in the case that a retransmit 00671 // had to be performed) 00672 00673 if (fTrace) { 00674 TraceCWnd(); 00675 } 00676 } 00677 00678 00679 void TCP_Sender::SendNewData(bool skipSWSA) 00680 { 00681 unsigned nextSegmentSize; 00682 00683 it_assert(fSndUna <= fSndNxt, "TCP_Sender::SendNewData, SndUna > SndNxt in sender " + to_str(fLabel) + "!"); 00684 00685 if (fRestartAfterIdle) { 00686 IdleCheck(); 00687 } 00688 00689 bool sillyWindowAvoidanceFailed = false; 00690 00691 while (!sillyWindowAvoidanceFailed && 00692 ((nextSegmentSize = GetNextSegmentSize(fSndNxt)) > 0)) 00693 { 00694 // there is new data to send and window is large enough 00695 00696 // SWSA and Nagle (RFC 1122): assume PUSH to be set 00697 unsigned queuedUnsent = fUserNxt - fSndNxt; 00698 unsigned usableWindow = max(0, (fSndUna + SendWindow()) - fSndNxt); 00699 00700 if (((unsigned)min(queuedUnsent, usableWindow) >= fMSS) || 00701 ((!fNagle || (fSndUna == fSndNxt)) && 00702 ((queuedUnsent <= usableWindow) || // Silly W. A. 00703 ((unsigned)min(queuedUnsent, usableWindow) >= fMaxRecWnd / 2) 00704 ) 00705 ) || 00706 skipSWSA 00707 ) { 00708 // Silly Window Syndrome Avoidance (SWSA) and Nagle passed 00709 00710 TCP_Segment nextSegment(fSndNxt, fSndNxt + nextSegmentSize); 00711 TCP_Packet & msg = * new TCP_Packet (); 00712 00713 msg.set_segment(nextSegment); 00714 msg.set_session_id(fSessionId); 00715 msg.set_destination_port(fLabel); // The dest and src port are set to the same 00716 msg.set_source_port(fLabel); // number for simplicity. 00717 msg.set_bit_size(8 * (nextSegmentSize + fTCPIPHeaderLength)); 00718 00719 if (fDebug) { 00720 std::cout << "TCP_Sender::SendNewData," 00721 << " nextSegmentSize=" << nextSegmentSize 00722 << " fTCPIPHeaderLength=" << fTCPIPHeaderLength 00723 << " byte_size=" << msg.bit_size()/8 00724 << " ptr=" << &msg 00725 << " time=" << Event_Queue::now() << std::endl; 00726 } 00727 00728 // no RTT measurement for retransmitted segments 00729 // changes on Dec. 13. 2002 (Ga, Bo, Scharf) 00730 00731 if (!fRTTMPending && fSndNxt >= fSndMax) { // ##Bo## 00732 fRTTMStartTime = Event_Queue::now(); 00733 fRTTMByte = nextSegment.begin(); 00734 fRTTMPending = true; 00735 } 00736 00737 fSndNxt += nextSegmentSize; 00738 fSndMax = max(fSndNxt, fSndMax); 00739 00740 // reset SWSA timer if necessary 00741 if (skipSWSA) { 00742 skipSWSA = false; 00743 } else if (fSWSATimer.IsPending()) { 00744 fSWSATimer.Reset(); 00745 } 00746 00747 // set rtx timer if necessary 00748 if (!fRtxTimer.IsPending()) { 00749 SetRtxTimer(); 00750 } 00751 00752 00753 if (fDebug) { 00754 msg.set_info(fSSThresh, fRecWnd, fCWnd, fRTTEstimate, 00755 fSndUna, fSndNxt, false); 00756 std::cout << "sender " << fLabel 00757 << ": send new data: " 00758 << " t = " << Event_Queue::now() << ", " 00759 << msg << std::endl; 00760 } 00761 00762 SendMsg(msg); 00763 } else { 00764 sillyWindowAvoidanceFailed = true; 00765 // set SWSA timer 00766 if (!fSWSATimer.IsPending()) { 00767 fSWSATimer.Set(fSWSATimerValue); 00768 } 00769 } 00770 } 00771 00772 // set timers in case that no new data could have been sent 00773 if (!fRtxTimer.IsPending()) { 00774 if (fSndMax > fSndUna) { // there is outstanding data 00775 if (!fImmediateBackoffReset && fPendingBackoffReset) { 00776 // backoff is reset if no new data could have been sent since last 00777 // (successfull) retransmission; this is useful in case of 00778 // Reno recovery and multiple losses to avoid that in 00779 // the (unavoidable) series of timeouts the timer value 00780 // increases exponentially as this is not the intention 00781 // of the delayed backoff reset in Karn's algorithm 00782 fBackoff = 1; 00783 fPendingBackoffReset = false; 00784 } 00785 SetRtxTimer(); 00786 } 00787 } 00788 } 00789 00790 00791 void TCP_Sender::UnaRetransmit() 00792 { 00793 // resend after timeout or fast retransmit 00794 unsigned nextSegmentSize = GetNextSegmentSize(fSndUna); 00795 00796 if (nextSegmentSize > 0) { 00797 TCP_Segment nextSegment(fSndUna, fSndUna + nextSegmentSize); 00798 TCP_Packet & msg = *new TCP_Packet(); 00799 msg.set_segment(nextSegment); 00800 msg.set_session_id(fSessionId); 00801 msg.set_destination_port(fLabel); // The dest and src port are set to the same 00802 msg.set_source_port(fLabel); // number for simplicity. 00803 msg.set_bit_size(8 * (nextSegmentSize + fTCPIPHeaderLength)); 00804 00805 fSndNxt = max(fSndNxt, fSndUna + nextSegmentSize); 00806 fSndMax = max(fSndNxt, fSndMax); 00807 00808 // The RTT measurement is cancelled if the RTTM byte has a sequence 00809 // number higher or equal than the first retransmitted byte as 00810 // the ACK for the RTTM byte will be delayed by the rtx for at least 00811 // one round 00812 if (fKarn && (nextSegment.begin() <= fRTTMByte) && fRTTMPending) { 00813 fRTTMPending = false; 00814 } 00815 00816 SetRtxTimer(); 00817 00818 if (fDebug) { 00819 msg.set_info(fSSThresh, fRecWnd, fCWnd, fRTTEstimate, 00820 fSndUna, fSndNxt, true); 00821 std::cout << "sender " << fLabel; 00822 if (fDupACKCnt >= fDupACKThreshold) { 00823 std::cout << ": fast rtx: "; 00824 } else { 00825 std::cout << ": TO rtx: "; 00826 } 00827 std::cout << " t = " << Event_Queue::now() << ", " 00828 << msg << std::endl; 00829 } 00830 00831 SendMsg(msg); 00832 } else { 00833 // throw(UL_CException("TCP_Sender::UnaRetransmit", "no bytes to send")); 00834 } 00835 } 00836 00837 00838 void TCP_Sender::FinishFastRecovery() 00839 { 00840 if (fTCPVersion == kTahoe) { 00841 fDupACKCnt = 0; 00842 } else if (fTCPVersion == kReno) { 00843 // Reno fast recovery 00844 fDupACKCnt = 0; 00845 if (fFlightSizeRecovery) { 00846 fCWnd = min(fSndMax - fSndUna + fMSS, fSSThresh); 00847 } else { 00848 fCWnd = fSSThresh; 00849 } 00850 } else if (fTCPVersion == kNewReno) { 00851 // New Reno fast recovery 00852 // "Set CWnd to ... min (ssthresh, FlightSize + MSS) 00853 // ... or ssthresh" (RFC 2582) 00854 if (fFlightSizeRecovery) { 00855 fCWnd = min(fSndMax - fSndUna + fMSS, fSSThresh); 00856 } else { 00857 fCWnd = fSSThresh; 00858 } 00859 fDupACKCnt = 0; 00860 } 00861 } 00862 00863 00864 void TCP_Sender::ReduceSSThresh() 00865 { 00866 if (fCarefulSSThreshReduction) { 00867 // If Reno conservation is enabled the amount of 00868 // outstanding data ("flight size") might be rather large 00869 // and even larger than twice the old ssthresh value; 00870 // so this corresponds more to the ns behaviour where always cwnd is 00871 // taken instead of flight size. 00872 fSSThresh = max(2 * fMSS, 00873 min(min(fCWnd, fSndMax - fSndUna), fRecWnd) / 2); 00874 } else { 00875 // use filght size / 2 as recommended in RFC 2581 00876 fSSThresh = max(2 * fMSS, min(fSndMax - fSndUna, fRecWnd) / 2); 00877 } 00878 00879 it_assert(fSSThresh <= fMaxCWnd, "TCP_Sender::HandleACK, internal error: SndSSThresh is > MaxCWnd"); 00880 00881 if (fTrace) { 00882 TraceSSThresh(); 00883 } 00884 } 00885 00886 00887 void TCP_Sender::SendMsg(TCP_Packet &msg) 00888 { 00889 if (fTrace) { 00890 TraceSentSeqNo(msg.get_segment().end()); 00891 } 00892 00893 if (fRestartAfterIdle) { 00894 fLastSendTime = Event_Queue::now(); // needed for idle detection 00895 } 00896 00897 tcp_send(&msg); 00898 } 00899 00900 00901 void TCP_Sender::IdleCheck() 00902 { 00903 // idle detection according to Jacobson, SIGCOMM, 1988: 00904 // sender is currently idle and nothing has been send since RTO 00905 00906 if (fSndMax == fSndUna && Event_Queue::now() - fLastSendTime > CalcRTOValue()) { 00907 fCWnd = fInitialCWnd; // see RFC2581 00908 00909 fNumberOfIdleTimeouts++; 00910 00911 if (fTrace) { 00912 TraceCWnd(); 00913 } 00914 00915 if (fDebug) { 00916 std::cout << "sender " << fLabel 00917 << ": idle timeout: " 00918 << "t = " << Event_Queue::now() 00919 << ", SndNxt = " << fSndNxt 00920 << ", SndUna = " << fSndUna 00921 << ", Backoff = " << fBackoff 00922 << std::endl; 00923 } 00924 } 00925 } 00926 00927 00928 void TCP_Sender::HandleRtxTimeout(Ttype time) 00929 { 00930 fNumberOfTimeouts++; 00931 00932 // update backoff 00933 fBackoff = min(fMaxBackoff, fBackoff * 2); 00934 if (!fImmediateBackoffReset) { 00935 fPendingBackoffReset = false; 00936 } 00937 00938 if (fDupACKCnt >= fDupACKThreshold) { 00939 FinishFastRecovery(); // reset dup ACK cnt and CWnd 00940 } else if (fDupACKCnt > 0) { 00941 fDupACKCnt = 0; // don't allow dupack action during TO recovery 00942 } 00943 00944 // update CWnd and SSThresh 00945 ReduceSSThresh(); // halve ssthresh (in most cases) 00946 fCWnd = fMSS; // not initial CWnd, see RFC 2581 00947 00948 it_assert(fSSThresh <= fMaxCWnd, "TCP_Sender::HandleRtxTimeout, internal error: SndSSThresh is > MaxCWnd"); 00949 00950 fRecoveryTO = fSndMax; 00951 00952 if (fGoBackN) { 00953 // go back N is mainly relevant in the case of multiple losses 00954 // which would lead to a series of timeouts without resetting sndnxt 00955 fSndNxt = fSndUna; 00956 } 00957 00958 if (fDebug) { 00959 std::cout << "sender " << fLabel 00960 << ": rtx timeout: " 00961 << "t = " << Event_Queue::now() 00962 << ", SndNxt = " << fSndNxt 00963 << ", SndUna = " << fSndUna 00964 << std::endl; 00965 } 00966 00967 if (fTrace) { 00968 TraceCWnd(); 00969 } 00970 00971 UnaRetransmit(); // initiate retransmission 00972 } 00973 00974 00975 void TCP_Sender::HandleSWSATimeout(Ttype) 00976 { 00977 SendNewData(true); 00978 } 00979 00980 00981 unsigned TCP_Sender::GetNextSegmentSize(const Sequence_Number & begin) 00982 { 00983 // try to get new user messages if available and necessary 00984 while ((fUserNxt < begin + fMSS) && (!SocketWriteQueue.empty())) { 00985 itpp::Packet *packet_p = SocketWriteQueue.front(); 00986 SocketWriteQueue.pop(); 00987 fUserNxt += (unsigned) packet_p->bit_size()/8; 00988 delete packet_p; 00989 } 00990 00991 Sequence_Number end = min(min(fUserNxt, begin + fMSS), 00992 fSndUna + SendWindow()); 00993 00994 if (fDebug) { 00995 std::cout << "TCP_Sender::GetNextSegmentSize," 00996 << " fUserNxt=" << fUserNxt 00997 << " begin_seq_num=" << begin 00998 << " fMSS=" << fMSS 00999 << " fSndUna=" << fSndUna 01000 << " SendWindow()=" << SendWindow() 01001 << " end_seq_num=" << end 01002 << " time=" << Event_Queue::now() << std::endl; 01003 } 01004 01005 return max(0, end - begin); 01006 } 01007 01008 01009 unsigned TCP_Sender::SendWindow() const 01010 { 01011 return min(fRecWnd, min (fMaxCWnd, fCWnd)); 01012 } 01013 01014 01015 double TCP_Sender::CalcRTOValue() const 01016 { 01017 static const double factor = 1 + 1e-8; 01018 // to avoid "simultaneous" TO/receive ACK events in case of const. RTT 01019 01020 double rto = fBackoff * fRTTEstimate * factor; 01021 01022 if (rto > fMaxRTO) { 01023 rto = fMaxRTO; 01024 } 01025 01026 return rto; 01027 } 01028 01029 01030 void TCP_Sender::SetRtxTimer() 01031 { 01032 double rto = CalcRTOValue(); 01033 fRtxTimer.Set(rto); 01034 fTimUna = fSndUna; 01035 if (fDebug) { 01036 std::cout << "sender " << fLabel 01037 << ": set rtx timer: " 01038 << "t = " << Event_Queue::now() 01039 << ", RTO = " << rto 01040 << ", Backoff = " << fBackoff 01041 << ", TimUna = " << fTimUna 01042 << std::endl; 01043 } 01044 } 01045 01046 01047 void TCP_Sender::UpdateRTTVariables(double sampleRTT) 01048 { 01049 if (fSRTT == 0) { 01050 fSRTT = sampleRTT; 01051 fRTTVar = sampleRTT / 2; 01052 } else { 01053 // see, e.g., Comer for the values used as weights 01054 fSRTT = 0.875 * fSRTT + 0.125 * sampleRTT; 01055 fRTTVar = 0.75 * fRTTVar + 0.25 * fabs(sampleRTT - fSRTT); 01056 } 01057 01058 fRTTEstimate = round(fSRTT + 4 * fRTTVar, fTimerGranularity); 01059 01060 if (fTrace) { 01061 TraceRTTVariables(sampleRTT); 01062 } 01063 01064 fNumberOfRTTMeasurements++; 01065 } 01066 01067 01068 void TCP_Sender::TraceRTTVariables(double sampleRTT) 01069 { 01070 if (fDebug) { 01071 std::cout << "sender " << fLabel 01072 << ": RTT update: " 01073 << "t = " << Event_Queue::now() 01074 << ", sample = " << sampleRTT 01075 << ", SRTT = " << fSRTT 01076 << ", RTTVar = " << fRTTVar 01077 << ", RTTEstimate = " << fRTTEstimate 01078 << std::endl; 01079 } 01080 01081 if (RTTsample_index >= RTTsample_time.size()) { 01082 RTTsample_time.set_size(2*RTTsample_time.size(),true); 01083 RTTsample_val.set_size(2*RTTsample_val.size(),true); 01084 } 01085 RTTsample_val(RTTsample_index) = sampleRTT; 01086 RTTsample_time(RTTsample_index) = Event_Queue::now(); 01087 RTTsample_index++; 01088 01089 if (RTTEstimate_index >= RTTEstimate_time.size()) { 01090 RTTEstimate_time.set_size(2*RTTEstimate_time.size(),true); 01091 RTTEstimate_val.set_size(2*RTTEstimate_val.size(),true); 01092 } 01093 RTTEstimate_val(RTTEstimate_index) = fRTTEstimate; 01094 RTTEstimate_time(RTTEstimate_index) = Event_Queue::now(); 01095 RTTEstimate_index++; 01096 } 01097 01098 01099 void TCP_Sender::TraceCWnd() 01100 { 01101 if (fDebug) { 01102 std::cout << "sender " << fLabel 01103 << " t = " << Event_Queue::now() 01104 << " cwnd = " << fCWnd << std::endl; 01105 } 01106 if (CWnd_index >= CWnd_time.size()) { 01107 CWnd_time.set_size(2*CWnd_time.size(),true); 01108 CWnd_val.set_size(2*CWnd_val.size(),true); 01109 } 01110 CWnd_val(CWnd_index) = fCWnd; 01111 CWnd_time(CWnd_index) = Event_Queue::now(); 01112 CWnd_index++; 01113 01114 } 01115 01116 void TCP_Sender::TraceSSThresh() 01117 { 01118 if (fDebug) { 01119 std::cout << "sender " << fLabel 01120 << " t = " << Event_Queue::now() 01121 << " cwnd = " << fSSThresh << std::endl; 01122 } 01123 if (SSThresh_index >= SSThresh_time.size()) { 01124 SSThresh_time.set_size(2*SSThresh_time.size(),true); 01125 SSThresh_val.set_size(2*SSThresh_val.size(),true); 01126 } 01127 SSThresh_val(SSThresh_index) = fSSThresh; 01128 SSThresh_time(SSThresh_index) = Event_Queue::now(); 01129 SSThresh_index++; 01130 01131 } 01132 01133 void TCP_Sender::TraceSentSeqNo(const Sequence_Number sn) 01134 { 01136 if (fDebug) { 01137 std::cout << "sender " << fLabel 01138 << " t = " << Event_Queue::now() 01139 << " sent = " << sn 01140 << std::endl; 01141 } 01142 if (sent_seq_num_index >= sent_seq_num_time.size()) { 01143 sent_seq_num_time.set_size(2*sent_seq_num_time.size(),true); 01144 sent_seq_num_val.set_size(2*sent_seq_num_val.size(),true); 01145 } 01146 sent_seq_num_val(sent_seq_num_index) = sn.value(); 01147 sent_seq_num_time(sent_seq_num_index) = Event_Queue::now(); 01148 sent_seq_num_index++; 01149 } 01150 01151 01152 void TCP_Sender::TraceACKedSeqNo(const Sequence_Number sn) 01153 { 01154 if (fDebug) { 01155 std::cout << "sender " << fLabel 01156 << " t = " << Event_Queue::now() 01157 << " ACK = " << sn 01158 << std::endl; 01159 } 01160 01161 if (sender_recv_ack_seq_num_index >= sender_recv_ack_seq_num_time.size()) { 01162 sender_recv_ack_seq_num_time.set_size(2*sender_recv_ack_seq_num_time.size(),true); 01163 sender_recv_ack_seq_num_val.set_size(2*sender_recv_ack_seq_num_val.size(),true); 01164 } 01165 sender_recv_ack_seq_num_val(sender_recv_ack_seq_num_index) = sn.value(); 01166 sender_recv_ack_seq_num_time(sender_recv_ack_seq_num_index) = Event_Queue::now(); 01167 sender_recv_ack_seq_num_index++; 01168 } 01169 01170 01171 void TCP_Sender::save_trace(std::string filename) { 01172 01173 CWnd_val.set_size(CWnd_index, true); 01174 CWnd_time.set_size(CWnd_index,true); 01175 01176 SSThresh_val.set_size(SSThresh_index, true); 01177 SSThresh_time.set_size(SSThresh_index,true); 01178 01179 sent_seq_num_val.set_size(sent_seq_num_index, true); 01180 sent_seq_num_time.set_size(sent_seq_num_index,true); 01181 01182 sender_recv_ack_seq_num_val.set_size(sender_recv_ack_seq_num_index, true); 01183 sender_recv_ack_seq_num_time.set_size(sender_recv_ack_seq_num_index,true); 01184 01185 RTTEstimate_val.set_size(RTTEstimate_index, true); 01186 RTTEstimate_time.set_size(RTTEstimate_index,true); 01187 01188 RTTsample_val.set_size(RTTsample_index, true); 01189 RTTsample_time.set_size(RTTsample_index,true); 01190 01191 if (fDebug) { 01192 std::cout << "CWnd_val" << CWnd_val << std::endl; 01193 std::cout << "CWnd_time" << CWnd_time << std::endl; 01194 std::cout << "CWnd_index" << CWnd_index << std::endl; 01195 01196 std::cout << "SSThresh_val" << SSThresh_val << std::endl; 01197 std::cout << "SSThresh_time" << SSThresh_time << std::endl; 01198 std::cout << "SSThresh_index" << SSThresh_index << std::endl; 01199 01200 std::cout << "sent_seq_num_val" << sent_seq_num_val << std::endl; 01201 std::cout << "sent_seq_num_time" << sent_seq_num_time << std::endl; 01202 std::cout << "sent_seq_num_index" << sent_seq_num_index << std::endl; 01203 01204 std::cout << "sender_recv_ack_seq_num_val" << sender_recv_ack_seq_num_val << std::endl; 01205 std::cout << "sender_recv_ack_seq_num_time" << sender_recv_ack_seq_num_time << std::endl; 01206 std::cout << "sender_recv_ack_seq_num_index" << sender_recv_ack_seq_num_index << std::endl; 01207 01208 std::cout << "RTTEstimate_val" << RTTEstimate_val << std::endl; 01209 std::cout << "RTTEstimate_time" << RTTEstimate_time << std::endl; 01210 std::cout << "RTTEstimate_index" << RTTEstimate_index << std::endl; 01211 01212 std::cout << "RTTsample_val" << RTTsample_val << std::endl; 01213 std::cout << "RTTsample_time" << RTTsample_time << std::endl; 01214 std::cout << "RTTsample_index" << RTTsample_index << std::endl; 01215 01216 std::cout << "TCP_Sender::saving to file: " << filename << std::endl; 01217 } 01218 01219 it_file ff2; 01220 ff2.open(filename); 01221 01222 ff2 << Name("CWnd_val") << CWnd_val; 01223 ff2 << Name("CWnd_time") << CWnd_time; 01224 ff2 << Name("CWnd_index") << CWnd_index; 01225 01226 ff2 << Name("SSThresh_val") << SSThresh_val; 01227 ff2 << Name("SSThresh_time") << SSThresh_time; 01228 ff2 << Name("SSThresh_index") << SSThresh_index; 01229 01230 ff2 << Name("sent_seq_num_val") << sent_seq_num_val; 01231 ff2 << Name("sent_seq_num_time") << sent_seq_num_time; 01232 ff2 << Name("sent_seq_num_index") << sent_seq_num_index; 01233 01234 ff2 << Name("sender_recv_ack_seq_num_val") << sender_recv_ack_seq_num_val; 01235 ff2 << Name("sender_recv_ack_seq_num_time") << sender_recv_ack_seq_num_time; 01236 ff2 << Name("sender_recv_ack_seq_num_index") << sender_recv_ack_seq_num_index; 01237 01238 ff2 << Name("RTTEstimate_val") << RTTEstimate_val; 01239 ff2 << Name("RTTEstimate_time") << RTTEstimate_time; 01240 ff2 << Name("RTTEstimate_index") << RTTEstimate_index; 01241 01242 ff2 << Name("RTTsample_val") << RTTsample_val; 01243 ff2 << Name("RTTsample_time") << RTTsample_time; 01244 ff2 << Name("RTTsample_index") << RTTsample_index; 01245 01246 ff2.flush(); 01247 ff2.close(); 01248 } 01249 01250 01251 void TCP_Sender::print_item(std::ostream & out, const std::string & keyword) 01252 { 01253 if (keyword == "Label") { 01254 std::cout << fLabel; 01255 } else if (keyword == "CWnd") { 01256 std::cout << fCWnd; 01257 } else if (keyword == "SSThresh") { 01258 std::cout << fSSThresh; 01259 } else if (keyword == "SRTT") { 01260 std::cout << fSRTT; 01261 } else if (keyword == "RTTvar") { 01262 std::cout << fRTTVar; 01263 } else if (keyword == "Backoff") { 01264 std::cout << fBackoff; 01265 } else if (keyword == "RTO") { 01266 std::cout << CalcRTOValue(); 01267 } else if (keyword == "NoOfFastRets") { 01268 std::cout << fNumberOfFastRetransmits; 01269 } else if (keyword == "NoOfRetTOs") { 01270 std::cout << fNumberOfTimeouts; 01271 } else if (keyword == "NoOfIdleTOs") { 01272 std::cout << fNumberOfIdleTimeouts; 01273 } else if (keyword == "NoOfRTTMs") { 01274 std::cout << fNumberOfRTTMeasurements; 01275 } else if (keyword == "NoOfRecACKs") { 01276 std::cout << fNumberOfReceivedACKs; 01277 } else { 01278 } 01279 } 01280 01281 01282 // -------------------- TCP_Receiver_Buffer ---------------------------------------- 01283 TCP_Receiver_Buffer::TCP_Receiver_Buffer() : 01284 fFirstByte() 01285 { 01286 } 01287 01288 01289 TCP_Receiver_Buffer::TCP_Receiver_Buffer(const TCP_Receiver_Buffer & rhs) : 01290 fFirstByte(rhs.fFirstByte), 01291 fBufList(rhs.fBufList) 01292 { 01293 } 01294 01295 01296 void TCP_Receiver_Buffer::reset() 01297 { 01298 fBufList.clear(); 01299 fFirstByte = 0; 01300 } 01301 01302 01303 TCP_Receiver_Buffer::~TCP_Receiver_Buffer() 01304 { 01305 } 01306 01307 01308 void TCP_Receiver_Buffer::write(TCP_Segment newBlock) 01309 { 01310 // error cases 01311 it_assert(newBlock.begin() <= newBlock.end(), "TCP_Receiver_Buffer::Write, no valid segment"); 01312 01313 // cut blocks beginning before fFirstByte 01314 if (newBlock.begin() < fFirstByte) { 01315 if (newBlock.end() > fFirstByte) { 01316 newBlock.set_begin(fFirstByte); 01317 } else { 01318 return; //// TODO: Is this strange? 01319 } 01320 } 01321 01322 if (newBlock.length() == 0) { // empty block, nothing to do 01323 return; 01324 } 01325 01326 if (fBufList.empty() || (newBlock.begin() > fBufList.back().end())) { 01327 // new block is behind last block in buffer 01328 fBufList.push_back(newBlock); 01329 } else { 01330 // skip list entries if beginning of newBlock > end of current one 01331 // (search for correct list position) 01332 std::list<TCP_Segment>::iterator iter; 01333 iter = fBufList.begin(); 01334 while (newBlock.begin() > iter->end()) { 01335 iter++; 01336 it_assert(iter != fBufList.end(), "TCP_Receiver_Buffer::Write, internal error"); 01337 } 01338 01339 TCP_Segment & exBlock = *iter; 01340 01341 if (exBlock.can_be_combined(newBlock)) { 01342 // overlapping or contiguous blocks -> combine 01343 exBlock.combine(newBlock); 01344 01345 // check following blocks 01346 iter++; 01347 while ((iter != fBufList.end()) && 01348 exBlock.can_be_combined(*iter)) { 01349 exBlock.combine(*iter); 01350 iter = fBufList.erase(iter); 01351 } 01352 } else { 01353 // no overlap, newBlock lies between two existing list entries 01354 // new list entry has to be created 01355 01356 fBufList.insert(iter, newBlock); 01357 } 01358 } 01359 01360 it_assert(!fBufList.empty() && fBufList.front().begin() >= fFirstByte, "TCP_Receiver_Buffer::Write, internal error"); 01361 01362 } 01363 01364 01365 // The amount of data read from the buffer is given as parameter. It has 01366 // to be less than or equal to the size of the first block stored. This 01367 // mean the caller of Read should first check how much data is available 01368 // by calling FirstBlockSize. 01369 void TCP_Receiver_Buffer::read(unsigned noOfBytes) 01370 { 01371 it_assert(first_block_size() > 0, "TCP_Receiver_Buffer::Read, No block to read"); 01372 it_assert(noOfBytes <= first_block_size(), "TCP_Receiver_Buffer::Read, submitted block size not valid"); 01373 01374 01375 if (noOfBytes < first_block_size()) { 01376 fBufList.front().set_begin(fBufList.front().begin() + noOfBytes); 01377 } else { // first block will be read completely 01378 fBufList.pop_front(); 01379 } 01380 fFirstByte += noOfBytes; 01381 01382 it_assert(fBufList.empty() || fBufList.front().begin() >= fFirstByte, "TCP_Receiver_Buffer::Read, internal error"); 01383 } 01384 01385 01386 // FirstBlockSize returns the size of the first block stored in the 01387 // buffer or 0 if the buffer is empty 01388 unsigned TCP_Receiver_Buffer::first_block_size() const 01389 { 01390 if (!fBufList.empty() && (fBufList.front().begin() == fFirstByte)) { 01391 return fBufList.front().length(); 01392 } else { 01393 return 0; 01394 } 01395 } 01396 01397 01398 std::ostream & TCP_Receiver_Buffer::info(std::ostream &os, int detail) const 01399 { 01400 os << "receiver buffer information" << std::endl 01401 << "number of blocks: " << fBufList.size() << std::endl 01402 << "first byte stored: " << fFirstByte << std::endl 01403 << "last byte stored +1: " << last_byte() << std::endl 01404 << "next byte expected: " << next_expected() << std::endl; 01405 01406 if (detail>0) { 01407 os << "segments in receiver buffer:" << std::endl; 01408 01409 typedef std::list<TCP_Segment>::const_iterator LI; 01410 for (LI i = fBufList.begin(); i != fBufList.end(); ++i) { 01411 const TCP_Segment & block = *i; 01412 os << ". segment: " << block << std::endl; 01413 } 01414 01415 } 01416 01417 return os; 01418 } 01419 01420 01421 // -------------------- TCP_Receiver ---------------------------------------- 01422 TCP_Receiver::TCP_Receiver(int label) : 01423 fReceiverBuffer(), 01424 fLabel(label), 01425 fTCPIPHeaderLength(TCP_HEADERLENGTH), 01426 fMSS(TCP_RMSS), 01427 fBufferSize(TCP_BUFFERSIZE), 01428 fDelayedACK(TCP_DELAYEDACK), 01429 fACKDelayTime(TCP_ACKDELAYTIME), 01430 fSendPeriodicACKs(TCP_SENDPERIODICACKS), 01431 fStrictPeriodicACKs(TCP_STRICTPERIODICACKS), 01432 fPeriodicACKInterval(TCP_PERIODICACKINTERVAL), 01433 fACKSchedulingDelay(TCP_ACKSCHEDULINGDELAY), 01434 fACKOnBufferWrite(TCP_ACKBUFFERWRITE), 01435 fACKOnBufferRead(TCP_ACKBUFFERREAD), 01436 fMaxUserBlockSize(TCP_MAXUSERBLOCKSIZE), 01437 fMinUserBlockSize(TCP_MINUSERBLOCKSIZE), 01438 fUserBlockProcDelay(TCP_USERBLOCKPROCDELAY), 01439 fTrace(false), 01440 fDebug(false), 01441 fSessionId(0), 01442 fDelayedACKTimer(*this, &TCP_Receiver::DelayedACKHandler), 01443 fPeriodicACKTimer(*this, &TCP_Receiver::PeriodicACKHandler), 01444 fACKSchedulingTimer(*this, &TCP_Receiver::SendACKMessage), 01445 fWaitingACKMsg(0), 01446 fUserBlockProcTimer(*this, &TCP_Receiver::HandleEndOfProcessing) 01447 { 01448 fUserMessage = NULL; 01449 01450 01451 if (!fACKOnBufferRead && !fACKOnBufferWrite) { 01452 // throw(UL_CException("TCP_Receiver::TCP_Receiver", 01453 // "ACKs must be sent on buffer read or write or both")); 01454 } 01455 01456 setup(); 01457 01458 tcp_receive.forward(this, &TCP_Receiver::ReceiveMessageFromNet); 01459 tcp_receive.set_name("TCP Receive"); 01460 tcp_send_ack.set_name("TCP send ACK"); 01461 tcp_new_data.set_name("TCP New Data"); 01462 tcp_release.forward(this, &TCP_Receiver::release); 01463 tcp_release.set_name("TCP Release"); 01464 01465 } 01466 01467 01468 TCP_Receiver::~TCP_Receiver () 01469 { 01470 delete fWaitingACKMsg; 01471 delete fUserMessage; 01472 } 01473 01474 01475 void TCP_Receiver::set_debug(const bool enable_debug) 01476 { 01477 fDebug = enable_debug; 01478 tcp_send_ack.set_debug(enable_debug); 01479 tcp_new_data.set_debug(); 01480 } 01481 01482 void TCP_Receiver::set_debug(bool enable_debug, bool enable_signal_debug) 01483 { 01484 fDebug = enable_debug; 01485 tcp_send_ack.set_debug(enable_signal_debug); 01486 tcp_new_data.set_debug(); 01487 } 01488 01489 void TCP_Receiver::set_trace(const bool enable_trace) 01490 { 01491 fTrace = enable_trace; 01492 } 01493 01494 01495 01496 void TCP_Receiver::setup() 01497 { 01498 fAdvRcvWnd = 0; 01499 fAdvRcvNxt = 0; 01500 01501 if (fSendPeriodicACKs) { 01502 fPeriodicACKTimer.Set(fPeriodicACKInterval); 01503 } 01504 01505 fReceiverBuffer.reset(); 01506 01507 received_seq_num_val.set_size(1000); 01508 received_seq_num_val.zeros(); 01509 received_seq_num_time.set_size(1000); 01510 received_seq_num_time.zeros(); 01511 received_seq_num_val(0) = 0; 01512 received_seq_num_time(0) = 0; 01513 received_seq_num_index=1; 01514 } 01515 01516 std::string TCP_Receiver::GenerateFilename() 01517 { 01518 time_t rawtime; 01519 struct tm *timeinfo; 01520 timeinfo = localtime(&rawtime); 01521 std::ostringstream filename_stream; 01522 filename_stream << "trace_tcp_receiver_u" << fLabel 01523 << "_" << 1900+timeinfo->tm_year 01524 << "_" << timeinfo->tm_mon 01525 << "_" << timeinfo->tm_mday 01526 << "__" << timeinfo->tm_hour 01527 << "_" << timeinfo->tm_min 01528 << "_" << timeinfo->tm_sec 01529 << "_.it"; 01530 return filename_stream.str(); 01531 } 01532 01533 void TCP_Receiver::release(std::string file) 01534 { 01535 std::string filename; 01536 fSessionId++; 01537 01538 if (fWaitingACKMsg != 0) { 01539 delete fWaitingACKMsg; 01540 fWaitingACKMsg = 0; 01541 } 01542 if (fUserMessage != 0) { 01543 delete fUserMessage; 01544 fUserMessage = 0; 01545 } 01546 01547 fUserBlockProcTimer.Reset(); 01548 fDelayedACKTimer.Reset(); 01549 fPeriodicACKTimer.Reset(); 01550 fACKSchedulingTimer.Reset(); 01551 01552 if (fTrace) { 01553 if (file == "") 01554 filename = GenerateFilename(); 01555 else 01556 filename = file; 01557 01558 save_trace(filename); 01559 } 01560 } 01561 01562 01563 void TCP_Receiver::ReceiveMessageFromNet(itpp::Packet *msg) 01564 { 01565 TCP_Packet & packet = (TCP_Packet &) *msg; 01566 if (packet.get_destination_port() == fLabel) { 01567 if (packet.get_session_id() == fSessionId) { 01568 ReceiveDataPacket(packet); 01569 } 01570 else { 01571 it_warning("Received a TCP packet with wrong SessionId"); 01572 std::cout << "TCP_Receiver::ReceiveMessageFromNet, " 01573 << "fLabel= " << fLabel 01574 << "fSessionId= " << fSessionId << std::endl; 01575 std::cout << "packet=" << packet 01576 << ", next exp. = " << fReceiverBuffer.next_expected() 01577 << std::endl; 01578 exit(0); 01579 } 01580 } 01581 else { 01582 it_warning("Received a TCP packet with label"); 01583 exit(0); 01584 } 01585 } 01586 01587 01588 void TCP_Receiver::ReceiveDataPacket(TCP_Packet &msg) 01589 { 01590 TCP_Segment segment = msg.get_segment(); 01591 01592 bool isOutOfOrder = (segment.begin() > fReceiverBuffer.next_expected()) || 01593 (segment.end() <= fReceiverBuffer.next_expected()); 01594 01595 if (fDebug) { 01596 std::cout << "TCP_Receiver::ReceiveDataPacket receiver: " << fLabel << ": " 01597 << "receive msg: " 01598 << "t = " << Event_Queue::now() 01599 << ", next exp. = " << fReceiverBuffer.next_expected() 01600 << ", " << msg << std::endl; 01601 } 01602 01603 if (fTrace) { 01604 TraceReceivedSeqNo(segment.end()); 01605 } 01606 01607 it_assert(segment.end() <= fReceiverBuffer.first_byte() + fBufferSize, "TCP_Receiver::ReceiveTCPPacket, packet exceeds window at "); 01608 it_assert(segment.begin() < segment.end(), "TCP_Receiver::ReceiveTCPPacket, silly packet received at "); 01609 01610 fReceiverBuffer.write(segment); 01611 01612 if (isOutOfOrder) { 01613 SendACK(true); // create dupack conditionless 01614 } else { 01615 if (fACKOnBufferWrite) { 01616 SendACK(false); 01617 } 01618 IndicateUserMessage(); 01619 } 01620 01621 delete &msg; 01622 } 01623 01624 01625 void TCP_Receiver::IndicateUserMessage() 01626 { 01627 if (fUserMessage == 0) { 01628 // receive a block 01629 unsigned noOfBytes = min(fReceiverBuffer.first_block_size(), 01630 fMaxUserBlockSize); 01631 01632 if (fDebug) { 01633 std::cout << "TCP_Receiver::IndicateUserMessage " 01634 << "t = " << Event_Queue::now() 01635 << " noOfBytes = " << noOfBytes 01636 << " firstBlock = " << fReceiverBuffer.first_block_size() 01637 << std::endl; 01638 } 01639 01640 if (noOfBytes >= fMinUserBlockSize) { 01641 fUserMessage = new Packet(); 01642 fUserMessage->set_bit_size(8*noOfBytes); 01643 fUserBlockProcTimer.Set(fUserBlockProcDelay); 01644 } 01645 } 01646 } 01647 01648 01649 bool TCP_Receiver::is_user_message_available() 01650 { 01651 if (fUserMessage != 0) { 01652 return true; 01653 } 01654 01655 unsigned noOfBytes = min(fReceiverBuffer.first_block_size(), 01656 fMaxUserBlockSize); 01657 01658 if (noOfBytes >= fMinUserBlockSize) { 01659 fUserMessage = new Packet(); 01660 fUserMessage->set_bit_size(8*noOfBytes); 01661 return true; 01662 } else { 01663 return false; 01664 } 01665 } 01666 01667 01668 itpp::Packet & TCP_Receiver::get_user_message() 01669 { 01670 it_assert(fUserMessage != 0, "TCP_Receiver::GetUserMessage, no message available"); 01671 if (fDebug) { 01672 std::cout << "TCP_Receiver::GetUserMessage " 01673 << "receiver: " << fLabel << ": " 01674 << "read from buffer: " 01675 << "t = " << Event_Queue::now() 01676 << ", user msg length = " << (fUserMessage->bit_size()/8) 01677 << ", first byte = " << fReceiverBuffer.first_byte() 01678 << ", first block size = " << fReceiverBuffer.first_block_size() 01679 << std::endl; 01680 } 01681 01682 fReceiverBuffer.read(fUserMessage->bit_size()/8); 01683 if (fACKOnBufferRead) { 01684 SendACK(false); // send acknowledgement 01685 } 01686 01687 itpp::Packet & msg = *fUserMessage; 01688 fUserMessage = 0; 01689 01690 if (fReceiverBuffer.first_block_size() > 0) { 01691 IndicateUserMessage(); 01692 } 01693 01694 return msg; 01695 } 01696 01697 01698 01699 void TCP_Receiver::HandleEndOfProcessing(Ttype) 01700 { 01701 it_assert(fUserMessage != 0, "TCP_Receiver::HandleEndOfProcessing, no message available"); 01702 01703 01704 tcp_new_data(fLabel); 01705 } 01706 01707 01708 void TCP_Receiver::DelayedACKHandler(Ttype) 01709 { 01710 if (fDebug) { 01711 std::cout << "TCP_Receiver::DelayedACKHandler " 01712 << "receiver " << fLabel 01713 << ": delACK TO: " 01714 << "t = " << Event_Queue::now() << std::endl; 01715 } 01716 01717 SendACK(true); 01718 } 01719 01720 01721 void TCP_Receiver::PeriodicACKHandler(Ttype) 01722 { 01723 if (fDebug) { 01724 std::cout << "TCP_Receiver::PeriodicACKHandler" 01725 << "receiver " << fLabel 01726 << ": periodicACK TO: " 01727 << "t = " << Event_Queue::now() << std::endl; 01728 } 01729 01730 SendACK(true); 01731 } 01732 01733 01734 void TCP_Receiver::SendACK(bool sendConditionless) 01735 { 01736 // sendConditionless is set 01737 // ... if packet was received out of order or 01738 // ... if delayed ACK timer has expired 01739 01740 // Bei eingeschaltetem "delayed ACK" wird ein ACK nur 01741 // gesendet, wenn das Fenster um 2MSS oder 35% der 01742 // maximalen Fenstergroesse verschoben worden ist 01743 // ... oder nach delayed ACK Timeout 01744 // ... oder wenn es das ACK fur ein Out of Order Segment ist 01745 // ... oder (in der Realitat), wenn ich auch was zu senden habe. 01746 01747 if (sendConditionless || !fDelayedACK || 01748 (fReceiverBuffer.next_expected() - fAdvRcvNxt >= (int)(2 * fMSS)) || 01749 (fReceiverBuffer.next_expected() - fAdvRcvNxt >= 01750 (int)(0.35 * fBufferSize))) { 01751 // Remark: RFC2581 recommends to acknowledge every second 01752 // packet conditionless (without setting this as a requirement) 01753 // in order to avoid excessive ack delays when the receiver MSS 01754 // is larger than the sender MSS. In this uni-directional 01755 // implementation, the receiver's MSS is not actively 01756 // used for sending but only for deciding when acknowledgments 01757 // have to be returned. Thus, the best solution to account for 01758 // RFC2581 is to set the receiver's MSS always equal to the 01759 // sender's MSS. 01760 01761 // Receiver Silly Window Syndrome Avoidance: 01762 01763 if (fAdvRcvNxt + fAdvRcvWnd + min(fBufferSize / 2, fMSS) 01764 <= fReceiverBuffer.first_byte() + fBufferSize) { 01765 // Die rechte Grenze des Empfangerfensters wird nur anders angezeigt 01766 // als beim letzten ACK, wenn sie sich seither um mindestens 01767 // min (BufferSize/ 2, MSS) geandert hat. 01768 fAdvRcvWnd = fBufferSize - fReceiverBuffer.first_block_size(); 01769 } else { 01770 fAdvRcvWnd = fAdvRcvNxt + fAdvRcvWnd - fReceiverBuffer.next_expected(); 01771 } 01772 01773 fAdvRcvNxt = fReceiverBuffer.next_expected(); 01774 01775 if (fSendPeriodicACKs && 01776 (!fStrictPeriodicACKs || !fPeriodicACKTimer.IsPending())) { 01777 fPeriodicACKTimer.Set(fPeriodicACKInterval); 01778 } 01779 01780 if (fDelayedACK && fDelayedACKTimer.IsPending()) { 01781 fDelayedACKTimer.Reset(); 01782 } 01783 01784 ScheduleACKMessage(); 01785 } else { 01786 if (!fDelayedACKTimer.IsPending()) { 01787 fDelayedACKTimer.Set(fACKDelayTime); 01788 if (fDebug) { 01789 std::cout << "TCP_Receiver::SendACK" 01790 << "receiver " << fLabel 01791 << ": set delACK timer: " 01792 << "t = " << Event_Queue::now() << std::endl; 01793 } 01794 } 01795 } 01796 } 01797 01798 01799 void TCP_Receiver::ScheduleACKMessage() 01800 { 01801 if (fWaitingACKMsg == 0) { 01802 fWaitingACKMsg = new TCP_Packet; 01803 } 01804 01805 fWaitingACKMsg->set_ACK(fAdvRcvNxt); 01806 fWaitingACKMsg->set_wnd(fAdvRcvWnd); 01807 fWaitingACKMsg->set_session_id(fSessionId); 01808 fWaitingACKMsg->set_destination_port(fLabel); 01809 fWaitingACKMsg->set_source_port(fLabel); 01810 fWaitingACKMsg->set_bit_size(8*fTCPIPHeaderLength); 01811 01812 if (fACKSchedulingDelay > 0) { 01813 if (!fACKSchedulingTimer.IsPending()) { 01814 fACKSchedulingTimer.Set(fACKSchedulingDelay); 01815 } 01816 } else { 01817 SendACKMessage(Event_Queue::now()); 01818 } 01819 } 01820 01821 01822 void TCP_Receiver::SendACKMessage(Ttype) 01823 { 01824 it_assert(fWaitingACKMsg != 0, "TCP_Receiver::SendACKMessage, no ACK message waiting"); 01825 01826 if (fDebug) { 01827 std::cout << "TCP_Receiver::SendACKMessage Ack sent" 01828 << "receiver " << fLabel 01829 << ": send ACK: " 01830 << "t = " << Event_Queue::now() 01831 << ", " << (*fWaitingACKMsg) 01832 << " byte_size=" << fWaitingACKMsg->bit_size()/8 01833 << " ptr=" << fWaitingACKMsg << std::endl; 01834 } 01835 01836 tcp_send_ack(fWaitingACKMsg); 01837 01838 fWaitingACKMsg = 0; 01839 } 01840 01841 01842 void TCP_Receiver::TraceReceivedSeqNo(const Sequence_Number &sn) 01843 { 01844 if (fDebug) { 01845 std::cout << "TCP_Receiver::TraceReceivedSeqNo " 01846 << "receiver " << fLabel 01847 << " t = " << Event_Queue::now() 01848 << " sn = " << sn << std::endl; 01849 } 01850 if (received_seq_num_index >= received_seq_num_time.size()) { 01851 received_seq_num_time.set_size(2*received_seq_num_time.size(),true); 01852 received_seq_num_val.set_size(2*received_seq_num_val.size(),true); 01853 } 01854 received_seq_num_val(received_seq_num_index) = sn.value(); 01855 received_seq_num_time(received_seq_num_index) = Event_Queue::now(); 01856 received_seq_num_index++; 01857 } 01858 01859 01860 void TCP_Receiver::save_trace(std::string filename) { 01861 01862 received_seq_num_val.set_size(received_seq_num_index, true); 01863 received_seq_num_time.set_size(received_seq_num_index,true); 01864 01865 if (fDebug) { 01866 std::cout << "received_seq_num_val" << received_seq_num_val << std::endl; 01867 std::cout << "received_seq_num_time" << received_seq_num_time << std::endl; 01868 std::cout << "received_seq_num_index" << received_seq_num_index << std::endl; 01869 std::cout << "TCP_Receiver::saving to file: " << filename << std::endl; 01870 } 01871 01872 it_file ff2; 01873 ff2.open(filename); 01874 01875 ff2 << Name("received_seq_num_val") << received_seq_num_val; 01876 ff2 << Name("received_seq_num_time") << received_seq_num_time; 01877 ff2 << Name("received_seq_num_index") << received_seq_num_index; 01878 01879 ff2.flush(); 01880 ff2.close(); 01881 } 01882 01883 01884 } //namespace itpp 01885 01886 #ifdef _MSC_VER 01887 #pragma warning(default:4355) 01888 #endif 01889
Generated on Sun Dec 9 17:38:49 2007 for IT++ by Doxygen 1.5.4