52 #include "debug/DistEthernet.hh"
53 #include "debug/DistEthernetPkt.hh"
71 if (start_tick < nextAt) {
73 inform(
"Next dist synchronisation tick is changed to %lu.\n", nextAt);
77 panic(
"Dist synchronisation interval must be greater than zero");
79 if (repeat_tick < nextRepeat) {
80 nextRepeat = repeat_tick;
81 inform(
"Dist synchronisation interval is changed to %lu.\n",
89 std::unique_lock<std::mutex> sync_lock(
lock);
106 nextAt = std::numeric_limits<Tick>::max();
107 nextRepeat = std::numeric_limits<Tick>::max();
120 nextAt = std::numeric_limits<Tick>::max();
121 nextRepeat = std::numeric_limits<Tick>::max();
128 std::unique_lock<std::mutex> sync_lock(
lock);
131 assert(waitNum == 0);
135 header.
msgType = MsgType::cmdSyncReq;
141 needCkpt = ReqType::pending;
144 needExit = ReqType::pending;
146 needStopSync = ReqType::pending;
149 auto lf = [
this]{
return waitNum == 0; };
150 cv.wait(sync_lock, lf);
152 assert(isAbort || !same_tick || (nextAt ==
curTick()));
160 std::unique_lock<std::mutex> sync_lock(
lock);
164 auto lf = [
this]{
return waitNum == 0; };
165 cv.wait(sync_lock, lf);
167 assert(waitNum == 0);
170 assert(!same_tick || (nextAt ==
curTick()));
173 header.
msgType = MsgType::cmdSyncAck;
176 if (doCkpt || numCkptReq == numNodes) {
178 header.
needCkpt = ReqType::immediate;
183 if (doExit || numExitReq == numNodes) {
185 header.
needExit = ReqType::immediate;
189 if (doStopSync || numStopSyncReq == numNodes) {
207 std::unique_lock<std::mutex> sync_lock(
lock);
212 if (send_tick > nextAt)
214 if (nextRepeat > sync_repeat)
215 nextRepeat = sync_repeat;
217 if (need_ckpt == ReqType::collective)
219 else if (need_ckpt == ReqType::immediate)
221 if (need_exit == ReqType::collective)
223 else if (need_exit == ReqType::immediate)
225 if (need_stop_sync == ReqType::collective)
227 else if (need_stop_sync == ReqType::immediate)
248 std::unique_lock<std::mutex> sync_lock(
lock);
253 nextAt = max_send_tick;
254 nextRepeat = next_repeat;
272 std::lock_guard<std::mutex> sync_lock(
lock);
275 warn(
"Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
283 std::lock_guard<std::mutex> sync_lock(
lock);
286 warn(
"Exit requested multiple times (req:%d)\n", static_cast<int>(req));
308 int need_exit =
static_cast<int>(needExit);
317 needExit =
static_cast<ReqType>(need_exit);
344 panic(
"DistIface::SyncEvent::start() aborted\n");
353 assert(!scheduled());
362 inform(
"Dist sync scheduled at %lu and repeats %lu\n", when(),
380 "Distributed sync is hit while draining");
421 warn_once(
"Tried to wake up thread in dist-gem5, but it "
422 "was already awake!\n");
440 recvDone = recv_done;
441 linkDelay = link_delay;
449 Tick recv_tick = send_tick + send_delay + linkDelay;
451 assert(recv_tick >= prev_recv_tick + send_delay);
452 panic_if(prev_recv_tick + send_delay > recv_tick,
453 "Receive window is smaller than send delay");
455 "Simulators out of sync - missed packet receive by %llu ticks"
456 "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu "
458 curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
477 while (!descQueue.empty()) {
478 Desc d = descQueue.front();
488 if (recvDone->scheduled()) {
489 assert(!descQueue.empty());
490 eventManager->reschedule(recvDone,
curTick());
492 assert(descQueue.empty() && v.empty());
504 Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
506 DPRINTF(DistEthernetPkt,
"DistIface::recvScheduler::pushPacket "
507 "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
508 send_tick, send_delay, linkDelay, recv_tick);
510 assert(send_tick > master->syncEvent->when() -
511 master->syncEvent->repeat);
513 assert(send_tick + send_delay + linkDelay > master->syncEvent->when());
522 descQueue.emplace(new_packet, send_tick, send_delay);
523 if (descQueue.size() == 1) {
524 assert(!recvDone->scheduled());
525 eventManager->schedule(recvDone, recv_tick);
527 assert(recvDone->scheduled());
528 panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
529 "Out of order packet received (recv_tick: %lu top(): %lu\n",
530 recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
544 if (descQueue.size() > 0) {
545 Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
546 descQueue.front().sendDelay,
548 eventManager->schedule(recvDone, recv_tick);
559 packet->serialize(
"rxPacket", cp);
567 packet = std::make_shared<EthPacketData>();
568 packet->unserialize(
"rxPacket", cp);
576 std::queue<Desc> tmp_queue(descQueue);
577 unsigned n_desc_queue = descQueue.size();
578 assert(tmp_queue.size() == descQueue.size());
580 for (
int i = 0;
i < n_desc_queue;
i++) {
581 tmp_queue.front().serializeSection(cp,
csprintf(
"rxDesc_%d",
i));
584 assert(tmp_queue.empty());
590 assert(descQueue.size() == 0);
591 assert(!recvDone->scheduled());
592 assert(!ckptRestore);
596 unsigned n_desc_queue;
598 for (
int i = 0;
i < n_desc_queue;
i++) {
601 descQueue.push(recv_desc);
612 bool is_switch,
int num_nodes) :
613 syncStart(sync_start), syncRepeat(sync_repeat),
614 recvThread(nullptr), recvScheduler(em), syncStartOnPseudoOp(use_pseudo_op),
615 rank(dist_rank),
size(dist_size)
617 DPRINTF(DistEthernet,
"DistIface() ctor rank:%d\n",dist_rank);
620 assert(
sync ==
nullptr);
668 "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
669 pkt->length, send_delay);
725 const_cast<Event *>(recv_done),
733 DPRINTF(DistEthernet,
"DistIFace::drain() called\n");
742 DPRINTF(DistEthernet,
"DistIFace::drainResume() called\n");
770 unsigned rank_orig, dist_iface_id_orig;
774 panic_if(
rank != rank_orig,
"Rank mismatch at resume (rank=%d, orig=%d)",
777 "at resume (distIfaceId=%d, orig=%d)",
distIfaceId,
803 assert(
sync !=
nullptr);
808 assert(
master !=
nullptr);
816 DPRINTF(DistEthernet,
"DistIface::startup() started\n");
820 DPRINTF(DistEthernet,
"DistIface::startup() done\n");
827 DPRINTF(DistEthernet,
"DistIface::readyToCkpt() called, delay:%lu "
828 "period:%lu\n", delay, period);
831 inform(
"m5 checkpoint called with zero delay => triggering collaborative "
835 inform(
"m5 checkpoint called with non-zero delay => triggering immediate "
836 "checkpoint (at the next sync)\n");
840 inform(
"Non-zero period for m5_ckpt is ignored in "
841 "distributed gem5 runs\n");
850 std::lock_guard<std::mutex> sync_lock(
lock);
865 inform(
"Request toggling syncronization off\n");
878 inform(
"Request toggling syncronization on\n");
897 DPRINTF(DistEthernet,
"DistIface::readyToExit() called, delay:%lu\n",
905 inform(
"m5 exit called with zero delay => triggering collaborative "
909 inform(
"m5 exit called with non-zero delay => triggering immediate "
910 "exit (at the next sync)\n");
925 warn(
"Dist-rank parameter is queried in single gem5 simulation.");
938 warn(
"Dist-size parameter is queried in single gem5 simulation.");
bool run(bool same_tick) override
Core method to perform a full dist sync.
void pushPacket(EthPacketPtr new_packet, Tick send_tick, Tick send_delay)
Push a newly arrived packet into the desc queue.
virtual System * getSystemPtr()=0
virtual void sendCmd(const Header &header)=0
Send out a control command to the remote end.
void start()
Schedule the first periodic sync event.
void spawnRecvThread(const Event *recv_done, Tick link_delay)
spawn the receiver thread.
virtual void requestExit(ReqType req)=0
DrainState
Object drain/handover states.
virtual void sendPacket(const Header &header, const EthPacketPtr &packet)=0
Send out a data packet to the remote end.
DrainState drain() override
Notify an object that it needs to drain its state.
static unsigned recvThreadsNum
Number of receiver threads (in this gem5 process)
void unserializeSection(CheckpointIn &cp, const char *name)
Unserialize an a child object.
virtual bool recvHeader(Header &header)=0
Receive a header (i.e.
void abort()
Abort processing an on-going sync event (in case of an error, e.g.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
bool run(bool same_tick) override
Core method to perform a full dist sync.
void serialize(CheckpointOut &cp) const override
Serialize an object.
bool syncStartOnPseudoOp
Use pseudoOp to start synchronization.
panic_if(!root,"Invalid expression\n")
EthPacketPtr popPacket()
Fetch the next packet that is to be received by the simulated network link.
static bool isSwitch
Is this node a switch?
void resumeRecvTicks()
Adjust receive ticks for pending packets when restoring from a checkpoint.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
void drainResume() override
Resume execution after a successful drain.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
void serialize(CheckpointOut &cp) const override
Serialize an object.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
std::thread * recvThread
Receiver thread pointer.
static void toggleSync(ThreadContext *tc)
Trigger the master to start/stop synchronization.
ThreadContext is the external interface to all thread state for anything outside of the CPU...
void serialize(CheckpointOut &cp) const override
Serialize an object.
virtual bool progress(Tick send_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync)=0
Callback when the receiver thread gets a sync ack message.
void quiesce()
Quiesce thread context.
virtual void initTransport()=0
Init hook for the underlaying transport.
Received packet descriptor.
static DistIface * master
The very first DistIface object created becomes the master.
#define UNSERIALIZE_SCALAR(scalar)
Tick curTick()
The current simulated tick.
void packetOut(EthPacketPtr pkt, Tick send_delay)
Send out an Ethernet packet.
std::string csprintf(const char *format, const Args &...args)
This class implements global sync operations among gem5 peer processes.
static uint64_t sizeParam()
Getter for the dist size param.
void init(const Event *e, Tick link_delay)
uint64_t Tick
Tick count type.
Tick calcReceiveTick(Tick send_tick, Tick send_delay, Tick prev_recv_tick)
Calculate the tick to schedule the next receive done event.
Tick nextRepeat
The repeat value for the next periodic sync.
Tick syncStart
Tick to schedule the first dist sync event.
void init(Event *recv_done, Tick link_delay)
Initialize network link parameters.
static bool readyToCkpt(Tick delay, Tick period)
Initiate taking a checkpoint.
void serializeSection(CheckpointOut &cp, const char *name) const
Serialize an object into a new section.
EventQueue * curEventQueue()
static SyncEvent * syncEvent
The singleton SyncEvent object to schedule periodic dist sync.
std::shared_ptr< EthPacketData > EthPacketPtr
DistIface(unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, bool use_pseudo_op, bool is_switch, int num_nodes)
ctor
static bool readyToExit(Tick delay)
Initiate the exit from the simulation.
const FlagsType none
Nothing extra to print.
ThreadContext * getThreadContext(ContextID tid)
bool doStopSync
Flag is set if sync is to stop upon sync completion.
void requestExit(ReqType req) override
virtual void activate()=0
Set the status to Active.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
The global event to schedule periodic dist sync.
Draining buffers pending serialization/handover.
unsigned size
The number of gem5 processes comprising this dist simulation.
void recvThreadFunc(Event *recv_done, Tick link_delay)
The function executed by a receiver thread.
#define SERIALIZE_SCALAR(scalar)
RecvScheduler recvScheduler
Meta information about data packets received.
void process() override
This is a global event so process() will only be called by exactly one simulation thread...
void exitSimLoop(const std::string &message, int exit_code, Tick when, Tick repeat, bool serialize)
Schedule an event to exit the simulation loop (returning to Python) at the end of the current cycle (...
ReqType needStopSync
Sync stop requested.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
void requestStopSync(ReqType req) override
Tick syncRepeat
Frequency of dist sync events in ticks.
static System * sys
System pointer used to wakeup sleeping threads when stopping sync.
std::ostream CheckpointOut
static Sync * sync
The singleton Sync object to perform dist synchronisation.
void init(Tick start, Tick repeat)
Initialize periodic sync params.
unsigned distIfaceId
Unique id for the dist link.
static unsigned distIfaceNum
Number of DistIface objects (i.e.
virtual void recvPacket(const Header &header, EthPacketPtr &packet)=0
Receive a packet from the remote end.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
virtual void requestStopSync(ReqType req)=0
void requestCkpt(ReqType req) override
void serialize(CheckpointOut &cp) const override
Serialize an object.
void serialize(CheckpointOut &cp) const override
Serialize an object.
virtual Status status() const =0
std::mutex lock
The lock to protect access to the Sync object.
void lock()
Provide an interface for locking/unlocking the event queue.
void quiesceTick(Tick resume)
Quiesce, suspend, and schedule activate at resume.
virtual void requestCkpt(ReqType req)=0
Temporarily release the event queue service lock.
The interface class to talk to peer gem5 processes.
static uint64_t rankParam()
Getter for the dist rank param.
SyncSwitch(int num_nodes)
unsigned rank
The rank of this process among the gem5 peers.