gem5
dist_iface.cc
1 /*
2  * Copyright (c) 2015-2016 ARM Limited
3  * All rights reserved
4  *
5  * The license below extends only to copyright in the software and shall
6  * not be construed as granting a license to any other intellectual
7  * property including but not limited to intellectual property relating
8  * to a hardware implementation of the functionality of the software
9  * licensed hereunder. You may use the software subject to the license
10  * terms below provided that you ensure that this notice is replicated
11  * unmodified and in its entirety in all distributions of the software,
12  * modified or unmodified, in source code or in binary form.
13  *
14  * Redistribution and use in source and binary forms, with or without
15  * modification, are permitted provided that the following conditions are
16  * met: redistributions of source code must retain the above copyright
17  * notice, this list of conditions and the following disclaimer;
18  * redistributions in binary form must reproduce the above copyright
19  * notice, this list of conditions and the following disclaimer in the
20  * documentation and/or other materials provided with the distribution;
21  * neither the name of the copyright holders nor the names of its
22  * contributors may be used to endorse or promote products derived from
23  * this software without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  *
37  * Authors: Gabor Dozsa
38  */
39 
40 /* @file
41  * The interface class for dist-gem5 simulations.
42  */
43 
44 #include "dev/net/dist_iface.hh"
45 
46 #include <queue>
47 #include <thread>
48 
49 #include "base/random.hh"
50 #include "base/trace.hh"
51 #include "cpu/thread_context.hh"
52 #include "debug/DistEthernet.hh"
53 #include "debug/DistEthernetPkt.hh"
54 #include "dev/net/etherpkt.hh"
55 #include "sim/sim_exit.hh"
56 #include "sim/sim_object.hh"
57 #include "sim/system.hh"
58 
59 using namespace std;
61 System *DistIface::sys = nullptr;
63 unsigned DistIface::distIfaceNum = 0;
64 unsigned DistIface::recvThreadsNum = 0;
65 DistIface *DistIface::master = nullptr;
66 bool DistIface::isSwitch = false;
67 
68 void
69 DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
70 {
71  if (start_tick < nextAt) {
72  nextAt = start_tick;
73  inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
74  }
75 
76  if (repeat_tick == 0)
77  panic("Dist synchronisation interval must be greater than zero");
78 
79  if (repeat_tick < nextRepeat) {
80  nextRepeat = repeat_tick;
81  inform("Dist synchronisation interval is changed to %lu.\n",
82  nextRepeat);
83  }
84 }
85 
86 void
87 DistIface::Sync::abort()
88 {
89  std::unique_lock<std::mutex> sync_lock(lock);
90  waitNum = 0;
91  isAbort = true;
92  sync_lock.unlock();
93  cv.notify_one();
94 }
95 
96 DistIface::SyncSwitch::SyncSwitch(int num_nodes)
97 {
98  numNodes = num_nodes;
99  waitNum = num_nodes;
100  numExitReq = 0;
101  numCkptReq = 0;
102  numStopSyncReq = 0;
103  doExit = false;
104  doCkpt = false;
105  doStopSync = false;
106  nextAt = std::numeric_limits<Tick>::max();
107  nextRepeat = std::numeric_limits<Tick>::max();
108  isAbort = false;
109 }
110 
111 DistIface::SyncNode::SyncNode()
112 {
113  waitNum = 0;
114  needExit = ReqType::none;
115  needCkpt = ReqType::none;
116  needStopSync = ReqType::none;
117  doExit = false;
118  doCkpt = false;
119  doStopSync = false;
120  nextAt = std::numeric_limits<Tick>::max();
121  nextRepeat = std::numeric_limits<Tick>::max();
122  isAbort = false;
123 }
124 
125 bool
126 DistIface::SyncNode::run(bool same_tick)
127 {
128  std::unique_lock<std::mutex> sync_lock(lock);
129  Header header;
130 
131  assert(waitNum == 0);
132  assert(!isAbort);
133  waitNum = DistIface::recvThreadsNum;
134  // initiate the global synchronisation
135  header.msgType = MsgType::cmdSyncReq;
136  header.sendTick = curTick();
137  header.syncRepeat = nextRepeat;
138  header.needCkpt = needCkpt;
139  header.needStopSync = needStopSync;
140  if (needCkpt != ReqType::none)
141  needCkpt = ReqType::pending;
142  header.needExit = needExit;
143  if (needExit != ReqType::none)
144  needExit = ReqType::pending;
145  if (needStopSync != ReqType::none)
146  needStopSync = ReqType::pending;
147  DistIface::master->sendCmd(header);
148  // now wait until all receiver threads complete the synchronisation
149  auto lf = [this]{ return waitNum == 0; };
150  cv.wait(sync_lock, lf);
151  // global synchronisation is done.
152  assert(isAbort || !same_tick || (nextAt == curTick()));
153  return !isAbort;
154 }
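// The barrier works as follows: a node's simulation thread sends a
// cmdSyncReq to the switch and blocks until its receiver thread has
// processed the matching cmdSyncAck (SyncNode::progress() drives waitNum to
// zero). The switch-side run() below waits until a cmdSyncReq has arrived
// from every node, aggregates the ckpt/exit/stop-sync requests, and
// broadcasts a single cmdSyncAck that releases all nodes.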
155 
156 
157 bool
158 DistIface::SyncSwitch::run(bool same_tick)
159 {
160  std::unique_lock<std::mutex> sync_lock(lock);
161  Header header;
162  // Wait for the sync requests from the nodes
163  if (waitNum > 0) {
164  auto lf = [this]{ return waitNum == 0; };
165  cv.wait(sync_lock, lf);
166  }
167  assert(waitNum == 0);
168  if (isAbort) // sync aborted
169  return false;
170  assert(!same_tick || (nextAt == curTick()));
171  waitNum = numNodes;
172  // Complete the global synchronisation
173  header.msgType = MsgType::cmdSyncAck;
174  header.sendTick = nextAt;
175  header.syncRepeat = nextRepeat;
176  if (doCkpt || numCkptReq == numNodes) {
177  doCkpt = true;
178  header.needCkpt = ReqType::immediate;
179  numCkptReq = 0;
180  } else {
181  header.needCkpt = ReqType::none;
182  }
183  if (doExit || numExitReq == numNodes) {
184  doExit = true;
185  header.needExit = ReqType::immediate;
186  } else {
187  header.needExit = ReqType::none;
188  }
189  if (doStopSync || numStopSyncReq == numNodes) {
190  doStopSync = true;
191  numStopSyncReq = 0;
192  header.needStopSync = ReqType::immediate;
193  } else {
194  header.needStopSync = ReqType::none;
195  }
196  DistIface::master->sendCmd(header);
197  return true;
198 }
199 
200 bool
201 DistIface::SyncSwitch::progress(Tick send_tick,
202  Tick sync_repeat,
203  ReqType need_ckpt,
204  ReqType need_exit,
205  ReqType need_stop_sync)
206 {
207  std::unique_lock<std::mutex> sync_lock(lock);
208  if (isAbort) // sync aborted
209  return false;
210  assert(waitNum > 0);
211 
212  if (send_tick > nextAt)
213  nextAt = send_tick;
214  if (nextRepeat > sync_repeat)
215  nextRepeat = sync_repeat;
216 
217  if (need_ckpt == ReqType::collective)
218  numCkptReq++;
219  else if (need_ckpt == ReqType::immediate)
220  doCkpt = true;
221  if (need_exit == ReqType::collective)
222  numExitReq++;
223  else if (need_exit == ReqType::immediate)
224  doExit = true;
225  if (need_stop_sync == ReqType::collective)
226  numStopSyncReq++;
227  else if (need_stop_sync == ReqType::immediate)
228  doStopSync = true;
229 
230  waitNum--;
231  // Notify the simulation thread if the on-going sync is complete
232  if (waitNum == 0) {
233  sync_lock.unlock();
234  cv.notify_one();
235  }
236  // The receive thread must stay alive in the switch until the node
237  // closes the connection. Thus, we always return true here.
238  return true;
239 }
240 
241 bool
242 DistIface::SyncNode::progress(Tick max_send_tick,
243  Tick next_repeat,
244  ReqType do_ckpt,
245  ReqType do_exit,
246  ReqType do_stop_sync)
247 {
248  std::unique_lock<std::mutex> sync_lock(lock);
249  if (isAbort) // sync aborted
250  return false;
251  assert(waitNum > 0);
252 
253  nextAt = max_send_tick;
254  nextRepeat = next_repeat;
255  doCkpt = (do_ckpt != ReqType::none);
256  doExit = (do_exit != ReqType::none);
257  doStopSync = (do_stop_sync != ReqType::none);
258 
259  waitNum--;
260  // Notify the simulation thread if the on-going sync is complete
261  if (waitNum == 0) {
262  sync_lock.unlock();
263  cv.notify_one();
264  }
265  // The receive thread must finish when simulation is about to exit
266  return !doExit;
267 }
268 
269 void
270 DistIface::SyncNode::requestCkpt(ReqType req)
271 {
272  std::lock_guard<std::mutex> sync_lock(lock);
273  assert(req != ReqType::none);
274  if (needCkpt != ReqType::none)
275  warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
276  if (needCkpt == ReqType::none || req == ReqType::immediate)
277  needCkpt = req;
278 }
279 
280 void
281 DistIface::SyncNode::requestExit(ReqType req)
282 {
283  std::lock_guard<std::mutex> sync_lock(lock);
284  assert(req != ReqType::none);
285  if (needExit != ReqType::none)
286  warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
287  if (needExit == ReqType::none || req == ReqType::immediate)
288  needExit = req;
289 }
290 
291 void
292 DistIface::Sync::drainComplete()
293 {
294  if (doCkpt) {
295  // The first DistIface object called this right before writing the
296  // checkpoint. We need to drain the underlying physical network here.
297  // Note that other gem5 peers may enter this barrier at different
298  // ticks due to draining.
299  run(false);
300  // Only the "first" DistIface object has to perform the sync
301  doCkpt = false;
302  }
303 }
304 
305 void
306 DistIface::SyncNode::serialize(CheckpointOut &cp) const
307 {
308  int need_exit = static_cast<int>(needExit);
309  SERIALIZE_SCALAR(need_exit);
310 }
311 
312 void
313 DistIface::SyncNode::unserialize(CheckpointIn &cp)
314 {
315  int need_exit;
316  UNSERIALIZE_SCALAR(need_exit);
317  needExit = static_cast<ReqType>(need_exit);
318 }
319 
320 void
321 DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
322 {
323  SERIALIZE_SCALAR(numExitReq);
324 }
325 
326 void
327 DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
328 {
329  UNSERIALIZE_SCALAR(numExitReq);
330 }
331 
332 void
333 DistIface::SyncEvent::start()
334 {
335  // Note that this may be called either from startup() or drainResume()
336 
337  // At this point, all DistIface objects have already called Sync::init() so
338  // we have a local minimum of the start tick and repeat for the periodic
339  // sync.
340  repeat = DistIface::sync->nextRepeat;
341  // Do a global barrier to agree on a common repeat value (the smallest
342  // one from all participating nodes).
343  if (!DistIface::sync->run(false))
344  panic("DistIface::SyncEvent::start() aborted\n");
345 
346  assert(!DistIface::sync->doCkpt);
347  assert(!DistIface::sync->doExit);
348  assert(!DistIface::sync->doStopSync);
349  assert(DistIface::sync->nextAt >= curTick());
350  assert(DistIface::sync->nextRepeat <= repeat);
351 
352  if (curTick() == 0)
353  assert(!scheduled());
354 
355  // Use the maximum of the current tick for all participating nodes or a
356  // user provided starting tick.
357  if (scheduled())
358  reschedule(DistIface::sync->nextAt);
359  else
360  schedule(DistIface::sync->nextAt);
361 
362  inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
363  DistIface::sync->nextRepeat);
364 }
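// Example of the repeat negotiation in start(): if one node is configured
// with a sync repeat of 10000 ticks and another with 5000 ticks, the global
// barrier above makes every peer (including the switch) adopt 5000, the
// smallest value, so all processes hit their periodic sync at the same ticks.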
365 
366 void
367 DistIface::SyncEvent::process()
368 {
369  // We may not start a global periodic sync while draining before taking a
370  // checkpoint. This is due to the possibility that peer gem5 processes
371  // may not hit the same periodic sync before they complete draining and
372  // that would make this periodic sync clash with sync called from
373  // DistIface::serialize() by other gem5 processes.
374  // We would need a 'distributed drain' solution to eliminate this
375  // restriction.
376  // Note that if draining was not triggered by checkpointing then we are
377  // fine since no extra global sync will happen (i.e. all peer gem5 will
378  // hit this periodic sync eventually).
379  panic_if(_draining && DistIface::sync->doCkpt,
380  "Distributed sync is hit while draining");
381  /*
382  * Note that this is a global event so this process method will be called
383  * by only exactly one thread.
384  */
385  /*
386  * We hold the eventq lock at this point but the receiver thread may
387  * need the lock to schedule new recv events while waiting for the
388  * dist sync to complete.
389  * Note that the other simulation threads also release their eventq
390  * locks while waiting for us due to the global event semantics.
391  */
392  {
393  EventQueue::ScopedRelease sr(curEventQueue());
394  // we do a global sync here that is supposed to happen at the same
395  // tick in all gem5 peers
396  if (!DistIface::sync->run(true))
397  return; // global sync aborted
398  // global sync completed
399  }
400  if (DistIface::sync->doCkpt)
401  exitSimLoop("checkpoint");
402  if (DistIface::sync->doExit) {
403  exitSimLoop("exit request from gem5 peers");
404  return;
405  }
406  if (DistIface::sync->doStopSync) {
407  DistIface::sync->doStopSync = false;
408  inform("synchronization disabled at %lu\n", curTick());
409 
410  // The switch node needs to wait for the next sync immediately.
411  if (DistIface::isSwitch) {
412  start();
413  } else {
414  // Wake up thread contexts on non-switch nodes.
415  for (int i = 0; i < DistIface::master->sys->numContexts(); i++) {
416  ThreadContext *tc =
417  DistIface::master->sys->getThreadContext(i);
418  if (tc->status() == ThreadContext::Suspended)
419  tc->activate();
420  else
421  warn_once("Tried to wake up thread in dist-gem5, but it "
422  "was already awake!\n");
423  }
424  }
425  return;
426  }
427  // schedule the next periodic sync
428  repeat = DistIface::sync->nextRepeat;
429  schedule(curTick() + repeat);
430 }
431 
432 void
433 DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
434 {
435  // This is called from the receiver thread when it starts running. The new
436  // receiver thread shares the event queue with the simulation thread
437  // (associated with the simulated Ethernet link).
438  curEventQueue(eventManager->eventQueue());
439 
440  recvDone = recv_done;
441  linkDelay = link_delay;
442 }
443 
444 Tick
445 DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
446  Tick send_delay,
447  Tick prev_recv_tick)
448 {
449  Tick recv_tick = send_tick + send_delay + linkDelay;
450  // sanity check (we need at least a send-delay-long window)
451  assert(recv_tick >= prev_recv_tick + send_delay);
452  panic_if(prev_recv_tick + send_delay > recv_tick,
453  "Receive window is smaller than send delay");
454  panic_if(recv_tick <= curTick(),
455  "Simulators out of sync - missed packet receive by %llu ticks "
456  "(prev_recv_tick: %lu send_tick: %lu send_delay: %lu "
457  "linkDelay: %lu )",
458  curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
459  linkDelay);
460 
461  return recv_tick;
462 }
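// Worked example for calcReceiveTick(): with send_tick = 1000, send_delay =
// 100 and linkDelay = 50, the receive event lands at tick 1150. The checks
// above require that tick to be at least send_delay past the previous
// receive tick and strictly in the future of curTick(); otherwise the peers
// have drifted apart and the simulation panics.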
463 
464 void
465 DistIface::RecvScheduler::resumeRecvTicks()
466 {
467  // Schedule pending packets asap in case link speed/delay changed when
468  // restoring from the checkpoint.
469  // This may be done during unserialize except that curTick() is unknown
470  // so we call this during drainResume().
471  // If we are not restoring from a checkpoint then the link latency cannot
472  // have changed, so we simply return.
473  if (!ckptRestore)
474  return;
475 
476  std::vector<Desc> v;
477  while (!descQueue.empty()) {
478  Desc d = descQueue.front();
479  descQueue.pop();
480  d.sendTick = curTick();
481  d.sendDelay = d.packet->simLength; // assume 1 tick/byte max link speed
482  v.push_back(d);
483  }
484 
485  for (auto &d : v)
486  descQueue.push(d);
487 
488  if (recvDone->scheduled()) {
489  assert(!descQueue.empty());
490  eventManager->reschedule(recvDone, curTick());
491  } else {
492  assert(descQueue.empty() && v.empty());
493  }
494  ckptRestore = false;
495 }
496 
497 void
498 DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
499  Tick send_tick,
500  Tick send_delay)
501 {
502  // Note : this is called from the receiver thread
503  curEventQueue()->lock();
504  Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
505 
506  DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
507  "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
508  send_tick, send_delay, linkDelay, recv_tick);
509  // Every packet must be sent and arrive in the same quantum
510  assert(send_tick > master->syncEvent->when() -
511  master->syncEvent->repeat);
512  // No packet may be scheduled for receive in the arrival quantum
513  assert(send_tick + send_delay + linkDelay > master->syncEvent->when());
514 
515  // Now we are about to schedule a recvDone event for the new data packet.
516  // We use the same recvDone object for all incoming data packets. Packet
517  // descriptors are saved in the ordered queue. The currently scheduled
518  // packet is always on the top of the queue.
519  // NOTE: we use the event queue lock to protect the receive desc queue,
520  // too, which is accessed both by the receiver thread and the simulation
521  // thread.
522  descQueue.emplace(new_packet, send_tick, send_delay);
523  if (descQueue.size() == 1) {
524  assert(!recvDone->scheduled());
525  eventManager->schedule(recvDone, recv_tick);
526  } else {
527  assert(recvDone->scheduled());
528  panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
529  "Out of order packet received (recv_tick: %lu top(): %lu\n",
530  recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
531  }
532  curEventQueue()->unlock();
533 }
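// The two assertions in pushPacket() encode the quantum rule of dist-gem5:
// the packet must have been sent after the start of the current sync quantum
// (send_tick > when() - repeat) and may only become visible after the next
// sync point (send_tick + send_delay + linkDelay > when()), so no peer can
// observe a packet "from the future" between two global syncs.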
534 
535 EthPacketPtr
536 DistIface::RecvScheduler::popPacket()
537 {
538  // Note : this is called from the simulation thread when a receive done
539  // event is being processed for the link. We assume that the thread holds
540  // the event queue lock when this is called!
541  EthPacketPtr next_packet = descQueue.front().packet;
542  descQueue.pop();
543 
544  if (descQueue.size() > 0) {
545  Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
546  descQueue.front().sendDelay,
547  curTick());
548  eventManager->schedule(recvDone, recv_tick);
549  }
550  prevRecvTick = curTick();
551  return next_packet;
552 }
553 
554 void
555 DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
556 {
557  SERIALIZE_SCALAR(sendTick);
558  SERIALIZE_SCALAR(sendDelay);
559  packet->serialize("rxPacket", cp);
560 }
561 
562 void
563 DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
564 {
565  UNSERIALIZE_SCALAR(sendTick);
566  UNSERIALIZE_SCALAR(sendDelay);
567  packet = std::make_shared<EthPacketData>();
568  packet->unserialize("rxPacket", cp);
569 }
570 
571 void
572 DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
573 {
574  SERIALIZE_SCALAR(prevRecvTick);
575  // serialize the receive desc queue
576  std::queue<Desc> tmp_queue(descQueue);
577  unsigned n_desc_queue = descQueue.size();
578  assert(tmp_queue.size() == descQueue.size());
579  SERIALIZE_SCALAR(n_desc_queue);
580  for (int i = 0; i < n_desc_queue; i++) {
581  tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
582  tmp_queue.pop();
583  }
584  assert(tmp_queue.empty());
585 }
586 
587 void
588 DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
589 {
590  assert(descQueue.size() == 0);
591  assert(!recvDone->scheduled());
592  assert(!ckptRestore);
593 
594  UNSERIALIZE_SCALAR(prevRecvTick);
595  // unserialize the receive desc queue
596  unsigned n_desc_queue;
597  UNSERIALIZE_SCALAR(n_desc_queue);
598  for (int i = 0; i < n_desc_queue; i++) {
599  Desc recv_desc;
600  recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
601  descQueue.push(recv_desc);
602  }
603  ckptRestore = true;
604 }
605 
606 DistIface::DistIface(unsigned dist_rank,
607  unsigned dist_size,
608  Tick sync_start,
609  Tick sync_repeat,
610  EventManager *em,
611  bool use_pseudo_op,
612  bool is_switch, int num_nodes) :
613  syncStart(sync_start), syncRepeat(sync_repeat),
614  recvThread(nullptr), recvScheduler(em), syncStartOnPseudoOp(use_pseudo_op),
615  rank(dist_rank), size(dist_size)
616 {
617  DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank);
618  isMaster = false;
619  if (master == nullptr) {
620  assert(sync == nullptr);
621  assert(syncEvent == nullptr);
622  isSwitch = is_switch;
623  if (is_switch)
624  sync = new SyncSwitch(num_nodes);
625  else
626  sync = new SyncNode();
627  syncEvent = new SyncEvent();
628  master = this;
629  isMaster = true;
630  }
631  distIfaceId = distIfaceNum;
632  distIfaceNum++;
633 }
634 
635 DistIface::~DistIface()
636 {
637  assert(recvThread);
638  recvThread->join();
639  delete recvThread;
640  if (distIfaceNum-- == 0) {
641  assert(syncEvent);
642  delete syncEvent;
643  assert(sync);
644  delete sync;
645  }
646  if (this == master)
647  master = nullptr;
648 }
649 
650 void
651 DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
652 {
653  Header header;
654 
655  // Prepare a dist header packet for the Ethernet packet we want to
656  // send out.
657  header.msgType = MsgType::dataDescriptor;
658  header.sendTick = curTick();
659  header.sendDelay = send_delay;
660 
661  header.dataPacketLength = pkt->length;
662  header.simLength = pkt->simLength;
663 
664  // Send out the packet and the meta info.
665  sendPacket(header, pkt);
666 
667  DPRINTF(DistEthernetPkt,
668  "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
669  pkt->length, send_delay);
670 }
671 
672 void
673 DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
674 {
675  EthPacketPtr new_packet;
676  DistHeaderPkt::Header header;
677 
678  // Initialize receive scheduler parameters
679  recvScheduler.init(recv_done, link_delay);
680 
681  // Main loop to wait for and process any incoming message.
682  for (;;) {
683  // recvHeader() blocks until the next dist header packet comes in.
684  if (!recvHeader(header)) {
685  // We lost connection to the peer gem5 processes most likely
686  // because one of them called m5 exit. So we stop here.
687  // Grab the eventq lock to stop the simulation thread
688  curEventQueue()->lock();
689  exitSimLoop("connection to gem5 peer got closed");
690  curEventQueue()->unlock();
691  // The simulation thread may be blocked in processing an on-going
692  // global synchronisation. Abort the sync to give the simulation
693  // thread a chance to make progress and process the exit event.
694  sync->abort();
695  // Finish receiver thread
696  break;
697  }
698 
699  // We got a valid dist header packet, let's process it
700  if (header.msgType == MsgType::dataDescriptor) {
701  recvPacket(header, new_packet);
702  recvScheduler.pushPacket(new_packet,
703  header.sendTick,
704  header.sendDelay);
705  } else {
706  // everything else must be synchronisation related command
707  if (!sync->progress(header.sendTick,
708  header.syncRepeat,
709  header.needCkpt,
710  header.needExit,
711  header.needStopSync))
712  // Finish receiver thread if simulation is about to exit
713  break;
714  }
715  }
716 }
717 
718 void
719 DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
720 {
721  assert(recvThread == nullptr);
722 
723  recvThread = new std::thread(&DistIface::recvThreadFunc,
724  this,
725  const_cast<Event *>(recv_done),
726  link_delay);
727  recvThreadsNum++;
728 }
729 
730 DrainState
731 DistIface::drain()
732 {
733  DPRINTF(DistEthernet,"DistIFace::drain() called\n");
734  // This can be called multiple times in the same drain cycle.
735  if (this == master)
736  syncEvent->draining(true);
737  return DrainState::Drained;
738 }
739 
740 void
741 DistIface::drainResume() {
742  DPRINTF(DistEthernet,"DistIFace::drainResume() called\n");
743  if (this == master)
744  syncEvent->draining(false);
745  recvScheduler.resumeRecvTicks();
746 }
747 
748 void
749 DistIface::serialize(CheckpointOut &cp) const
750 {
751  // Drain the dist interface before the checkpoint is taken. We cannot call
752  // this as part of the normal drain cycle because this dist sync has to be
753  // called exactly once after the system is fully drained.
754  sync->drainComplete();
755 
756  unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;
757 
758  SERIALIZE_SCALAR(rank_orig);
759  SERIALIZE_SCALAR(dist_iface_id_orig);
760 
761  recvScheduler.serializeSection(cp, "recvScheduler");
762  if (this == master) {
763  sync->serializeSection(cp, "Sync");
764  }
765 }
766 
767 void
768 DistIface::unserialize(CheckpointIn &cp)
769 {
770  unsigned rank_orig, dist_iface_id_orig;
771  UNSERIALIZE_SCALAR(rank_orig);
772  UNSERIALIZE_SCALAR(dist_iface_id_orig);
773 
774  panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
775  rank, rank_orig);
776  panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
777  "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
778  dist_iface_id_orig);
779 
780  recvScheduler.unserializeSection(cp, "recvScheduler");
781  if (this == master) {
782  sync->unserializeSection(cp, "Sync");
783  }
784 }
785 
786 void
787 DistIface::init(const Event *done_event, Tick link_delay)
788 {
789  // Init hook for the underlying message transport to set up/finalize
790  // communication channels
791  initTransport();
792 
793  // Spawn a new receiver thread that will process messages
794  // coming in from peer gem5 processes.
795  // The receive thread will also schedule a (receive) doneEvent
796  // for each incoming data packet.
797  spawnRecvThread(done_event, link_delay);
798 
799 
800  // Adjust the periodic sync start and interval. Different DistIface
801  // might have different requirements. The singleton sync object
802  // will select the minimum values for both params.
803  assert(sync != nullptr);
804  sync->init(syncStart, syncRepeat);
805 
806  // Initialize the seed for random generator to avoid the same sequence
807  // in all gem5 peer processes
808  assert(master != nullptr);
809  if (this == master)
810  random_mt.init(5489 * (rank+1) + 257);
811 }
812 
813 void
814 DistIface::startup()
815 {
816  DPRINTF(DistEthernet, "DistIface::startup() started\n");
817  // Schedule the periodic sync now unless it is deferred to a pseudo-op (switch nodes always start syncing at startup).
818  if (this == master && (!syncStartOnPseudoOp || isSwitch))
819  syncEvent->start();
820  DPRINTF(DistEthernet, "DistIface::startup() done\n");
821 }
822 
823 bool
824 DistIface::readyToCkpt(Tick delay, Tick period)
825 {
826  bool ret = true;
827  DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
828  "period:%lu\n", delay, period);
829  if (master) {
830  if (delay == 0) {
831  inform("m5 checkpoint called with zero delay => triggering collaborative "
832  "checkpoint\n");
833  sync->requestCkpt(ReqType::collective);
834  } else {
835  inform("m5 checkpoint called with non-zero delay => triggering immediate "
836  "checkpoint (at the next sync)\n");
837  sync->requestCkpt(ReqType::immediate);
838  }
839  if (period != 0)
840  inform("Non-zero period for m5_ckpt is ignored in "
841  "distributed gem5 runs\n");
842  ret = false;
843  }
844  return ret;
845 }
846 
847 void
848 DistIface::SyncNode::requestStopSync(ReqType req)
849 {
850  std::lock_guard<std::mutex> sync_lock(lock);
851  needStopSync = req;
852 }
853 
854 void
855 DistIface::toggleSync(ThreadContext *tc)
856 {
857  // Unfortunately we have to populate the system pointer member this way.
858  master->sys = tc->getSystemPtr();
859 
860  // The invariant for both syncing and "unsyncing" is that all threads will
861  // stop executing instructions until the desired sync state has been reached
862  // for all nodes. This is the easiest way to prevent deadlock (in the case
863  // of "unsyncing") and causality errors (in the case of syncing).
864  if (master->syncEvent->scheduled()) {
865  inform("Request toggling synchronization off\n");
866  sync->requestStopSync(ReqType::collective);
867 
868  // At this point, we have no clue when everyone will reach the sync
869  // stop point. Suspend execution of all local thread contexts.
870  // Dist-gem5 will reactivate all thread contexts when everyone has
871  // reached the sync stop point.
872  for (int i = 0; i < master->sys->numContexts(); i++) {
873  ThreadContext *tc = master->sys->getThreadContext(i);
874  if (tc->status() == ThreadContext::Active)
875  tc->quiesce();
876  }
877  } else {
878  inform("Request toggling synchronization on\n");
879  master->syncEvent->start();
880 
881  // We need to suspend all CPUs until the sync point is reached by all
882  // nodes to prevent causality errors. We can also schedule CPU
883  // activation here, since we know exactly when the next sync will
884  // occur.
885  for (int i = 0; i < master->sys->numContexts(); i++) {
886  ThreadContext *tc = master->sys->getThreadContext(i);
887  if (tc->status() == ThreadContext::Active)
888  tc->quiesceTick(master->syncEvent->when() + 1);
889  }
890  }
891 }
892 
893 bool
894 DistIface::readyToExit(Tick delay)
895 {
896  bool ret = true;
897  DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
898  delay);
899  if (master) {
900  // To successfully coordinate an exit, all nodes must be synchronising
901  if (!master->syncEvent->scheduled())
902  master->syncEvent->start();
903 
904  if (delay == 0) {
905  inform("m5 exit called with zero delay => triggering collaborative "
906  "exit\n");
907  sync->requestExit(ReqType::collective);
908  } else {
909  inform("m5 exit called with non-zero delay => triggering immediate "
910  "exit (at the next sync)\n");
911  sync->requestExit(ReqType::immediate);
912  }
913  ret = false;
914  }
915  return ret;
916 }
917 
918 uint64_t
919 DistIface::rankParam()
920 {
921  uint64_t val;
922  if (master) {
923  val = master->rank;
924  } else {
925  warn("Dist-rank parameter is queried in single gem5 simulation.");
926  val = 0;
927  }
928  return val;
929 }
930 
931 uint64_t
932 DistIface::sizeParam()
933 {
934  uint64_t val;
935  if (master) {
936  val = master->size;
937  } else {
938  warn("Dist-size parameter is queried in single gem5 simulation.");
939  val = 1;
940  }
941  return val;
942 }
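The static entry points above (readyToCkpt(), readyToExit(), rankParam(), sizeParam()) are the hooks that gem5 pseudo-op handlers call into. The sketch below is illustrative only: the handler name m5ExitHandler is hypothetical and not part of dist_iface.cc; it simply shows the intended contract, namely that a return value of false means the distributed sync machinery owns the exit and the caller should not act locally.

// Hypothetical caller sketch (assumption, not part of dist_iface.cc).
void
m5ExitHandler(Tick delay)
{
    // In a dist-gem5 run readyToExit() returns false and coordinates a
    // collaborative exit at the next global sync; in a single-process run it
    // returns true and the caller exits locally right away.
    if (DistIface::readyToExit(delay))
        exitSimLoop("m5_exit instruction encountered", 0,
                    curTick() + delay, 0, false);
}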