-
Notifications
You must be signed in to change notification settings - Fork 48
WIP: Update network-mpi.c #133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
c70c5f2
0d7cb73
98d21e1
36de587
4231e96
4f21dc2
d84c56b
8ee4f47
39d24e3
0bc64a9
5191113
3bf12a3
426d8b9
3ff0b84
df0e564
370affc
1036df6
8c845f5
c2ed608
820c694
62037ae
cb76432
21d7151
2d65f87
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,13 +14,68 @@ struct act_q | |
| MPI_Request *req_list; | ||
| int *idx_list; | ||
| MPI_Status *status_list; | ||
| int *free_idx_list;//add, que of free indices | ||
|
|
||
|
|
||
| #if ROSS_MEMORY | ||
| char **buffers; | ||
| #endif | ||
|
|
||
| unsigned int cur; | ||
| unsigned int cur; | ||
| int front;//add, front of queue | ||
| int coda;//add, back of queue but back is already a variable somewhere | ||
| int sizeOfQ;//add, size of queue array | ||
| int numInQ;//add, number of elements in queue | ||
|
||
|
|
||
| // Deal with filling queue, then plateauing | ||
|
|
||
| }; | ||
|
|
||
| int deal_with_cur(struct act_q *q)// try this | ||
| { | ||
| if(q->cur < (q->sizeOfQ-1)) | ||
| { | ||
| q->cur++; | ||
| return 1; | ||
| } | ||
| else | ||
| { | ||
| return 1; | ||
| } | ||
| } | ||
|
|
||
|
|
||
| int fr_q_chq(struct act_q *q, int *frontOrCoda) //free index queue; check for modulating the front or back index of que | ||
| { | ||
| if(*frontOrCoda != q->sizeOfQ)//don't mess with queue | ||
| { | ||
| return 0;// return probably not necessary | ||
| } | ||
| else//mess with queue | ||
| { | ||
| *frontOrCoda = 0; | ||
| return 0; | ||
| } | ||
| } | ||
|
|
||
| void fr_q_aq(struct act_q *q, int ele) // free index queue; add to queue | ||
| { | ||
| q->free_idx_list[q->coda] = ele; | ||
| q->coda++; | ||
| q->numInQ++; | ||
| fr_q_chq(q,&q->coda);//wraps the queue array around | ||
|
|
||
| } | ||
|
|
||
| int fr_q_dq(struct act_q *q) // free index queue; dequeue | ||
| { | ||
| int rv =q->free_idx_list[q->front]; | ||
| q->front++; | ||
| q->numInQ--; | ||
| fr_q_chq(q,&q->front);// wraps the queue array around | ||
|
|
||
| return rv; | ||
| } | ||
| #define EVENT_TAG 1 | ||
|
|
||
| #if ROSS_MEMORY | ||
|
|
@@ -101,7 +156,19 @@ init_q(struct act_q *q, const char *name) | |
| q->event_list = (tw_event **) tw_calloc(TW_LOC, name, sizeof(*q->event_list), n); | ||
| q->req_list = (MPI_Request *) tw_calloc(TW_LOC, name, sizeof(*q->req_list), n); | ||
| q->idx_list = (int *) tw_calloc(TW_LOC, name, sizeof(*q->idx_list), n); | ||
| q->status_list = (MPI_Status *) tw_calloc(TW_LOC, name, sizeof(*q->status_list), n); | ||
| q->free_idx_list = (int *) tw_calloc(TW_LOC, name, sizeof(*q->idx_list), n); | ||
|
||
| q->status_list = (MPI_Status *) tw_calloc(TW_LOC, name, sizeof(*q->status_list), n+1);// queue, n+1 is meant to prevent a full queue | ||
| q->front = 0;// front of queue | ||
| q->coda = 0;// end of queue | ||
| q->sizeOfQ=n+1;// for wraparound | ||
| q->numInQ= 0;// number of elements in queue | ||
|
|
||
| int i = 0; | ||
| while(i<n) // initializes the queue | ||
| { | ||
| fr_q_aq( q , i) ; | ||
| i++; | ||
| } | ||
|
|
||
| #if ROSS_MEMORY | ||
| q->buffers = tw_calloc(TW_LOC, name, sizeof(*q->buffers), n); | ||
|
|
@@ -207,7 +274,7 @@ tw_net_minimum(tw_pe *me) | |
| e = e->next; | ||
| } | ||
|
|
||
| for (i = 0; i < posted_sends.cur; i++) { | ||
| for (i = 0; i < posted_sends.cur; i++) { //fix this line (?) | ||
| e = posted_sends.event_list[i]; | ||
| if (m > e->recv_ts) | ||
|
||
| m = e->recv_ts; | ||
|
|
@@ -228,7 +295,10 @@ test_q( | |
| char *tmp; | ||
| #endif | ||
|
|
||
| if (!q->cur) | ||
| // if ( !q->cur || q->numInQ == ((q->sizeOfQ)-1) ) //fixed this line (?) if queue is full, no elements are being processed | ||
| // return 0; | ||
|
|
||
| if( q->numInQ == ((q->sizeOfQ)-1) ) | ||
| return 0; | ||
|
|
||
| if (MPI_Testsome( | ||
|
|
@@ -254,6 +324,7 @@ test_q( | |
| n = q->idx_list[i]; | ||
| e = q->event_list[n]; | ||
| q->event_list[n] = NULL; | ||
| fr_q_aq(q,n);//add n onto queue | ||
|
|
||
| #if ROSS_MEMORY | ||
| finish(me, e, q->buffers[n]); | ||
|
|
@@ -263,7 +334,8 @@ test_q( | |
| } | ||
|
|
||
| /* Collapse the lists to remove any holes we left. */ | ||
| for (i = 0, n = 0; i < q->cur; i++) | ||
| /* | ||
| for (i = 0, n = 0; i < q->cur; i++)//fix these lines | ||
| { | ||
| if (q->event_list[i]) | ||
| { | ||
|
|
@@ -288,8 +360,8 @@ test_q( | |
| n++; | ||
| } // endif (q->event_list[i]) | ||
| } | ||
| q->cur -= ready; | ||
|
|
||
| q->cur -= ready;//fix this line | ||
| */ | ||
| return 1; | ||
| } | ||
|
|
||
|
|
@@ -303,14 +375,15 @@ recv_begin(tw_pe *me) | |
| int flag = 0; | ||
| int changed = 0; | ||
|
|
||
| while (posted_recvs.cur < read_buffer) | ||
| while (0 < posted_recvs.numInQ)//fix these lines | ||
| { | ||
| unsigned id = posted_recvs.cur; | ||
|
|
||
| int id = fr_q_dq(&posted_recvs); | ||
|
||
|
|
||
| if(!(e = tw_event_grab(me))) | ||
| { | ||
| if(tw_gvt_inprogress(me)) | ||
| tw_error(TW_LOC, "Out of events in GVT! Consider increasing --extramem"); | ||
| tw_error(TW_LOC, "out of events in GVT!"); | ||
|
||
| return changed; | ||
| } | ||
|
|
||
|
|
@@ -337,7 +410,8 @@ recv_begin(tw_pe *me) | |
| } | ||
|
|
||
| posted_recvs.event_list[id] = e; | ||
| posted_recvs.cur++; | ||
| deal_with_cur(&posted_recvs); | ||
| // fixed in fr_q_dq //posted_recvs.cur++; //fix this line | ||
| changed = 1; | ||
| } | ||
|
|
||
|
|
@@ -348,7 +422,6 @@ static void | |
| recv_finish(tw_pe *me, tw_event *e, char * buffer) | ||
| { | ||
| tw_pe *dest_pe; | ||
| tw_clock start; | ||
|
|
||
| #if ROSS_MEMORY | ||
| tw_memory *memory; | ||
|
|
@@ -460,9 +533,7 @@ recv_finish(tw_pe *me, tw_event *e, char * buffer) | |
| /* Fast case, we are sending to our own PE and | ||
| * there is no rollback caused by this send. | ||
| */ | ||
| start = tw_clock_read(); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line and lines 351 and 465 needs to be added back in as well (same reason as |
||
| tw_pq_enqueue(dest_pe->pq, e); | ||
| dest_pe->stats.s_pq += tw_clock_read() - start; | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -493,12 +564,13 @@ send_begin(tw_pe *me) | |
| { | ||
| int changed = 0; | ||
|
|
||
| while (posted_sends.cur < send_buffer) | ||
| while (0 < posted_sends.numInQ)//fixed these line (hopefully) | ||
| { | ||
| tw_event *e = tw_eventq_peek(&outq); | ||
| tw_event *e = tw_eventq_peek(&outq);//next event? | ||
| tw_node *dest_node = NULL; | ||
|
|
||
| unsigned id = posted_sends.cur; | ||
| int id = fr_q_dq(&posted_sends);// fixed, grabs from front of queue, moves front up one element | ||
|
||
| // posted_sends.cur; //fix this line | ||
|
|
||
| #if ROSS_MEMORY | ||
| tw_event *tmp_prev = NULL; | ||
|
|
@@ -609,7 +681,9 @@ send_begin(tw_pe *me) | |
| : TW_net_asend; | ||
|
|
||
| posted_sends.event_list[id] = e; | ||
| posted_sends.cur++; | ||
| deal_with_cur(&posted_sends); | ||
|
|
||
| // fixed in fr_q_dq //posted_sends.cur++;//fix this line | ||
| me->s_nwhite_sent++; | ||
|
|
||
| changed = 1; | ||
|
|
@@ -786,13 +860,31 @@ tw_net_statistics(tw_pe * me, tw_statistics * s) | |
|
|
||
| if(MPI_Reduce(&(s->s_net_events), | ||
| &me->stats.s_net_events, | ||
| 17, | ||
| 16, | ||
|
||
| MPI_UNSIGNED_LONG_LONG, | ||
| MPI_SUM, | ||
| (int)g_tw_masternode, | ||
| MPI_COMM_ROSS) != MPI_SUCCESS) | ||
| tw_error(TW_LOC, "Unable to reduce statistics!"); | ||
|
|
||
| if(MPI_Reduce(&s->s_total, | ||
| &me->stats.s_total, | ||
| 8, | ||
| MPI_UNSIGNED_LONG_LONG, | ||
| MPI_MAX, | ||
| (int)g_tw_masternode, | ||
| MPI_COMM_ROSS) != MPI_SUCCESS) | ||
| tw_error(TW_LOC, "Unable to reduce statistics!"); | ||
|
|
||
| if(MPI_Reduce(&s->s_pe_event_ties, | ||
| &me->stats.s_pe_event_ties, | ||
| 1, | ||
| MPI_UNSIGNED_LONG_LONG, | ||
| MPI_SUM, | ||
| (int)g_tw_masternode, | ||
| MPI_COMM_ROSS) != MPI_SUCCESS) | ||
| tw_error(TW_LOC, "Unable to reduce statistics!"); | ||
|
|
||
| if(MPI_Reduce(&s->s_min_detected_offset, | ||
| &me->stats.s_min_detected_offset, | ||
| 1, | ||
|
|
@@ -802,24 +894,69 @@ tw_net_statistics(tw_pe * me, tw_statistics * s) | |
| MPI_COMM_ROSS) != MPI_SUCCESS) | ||
| tw_error(TW_LOC, "Unable to reduce statistics!"); | ||
|
|
||
| if(MPI_Reduce(&(s->s_total), | ||
| &me->stats.s_total, | ||
| 16, | ||
| MPI_UNSIGNED_LONG_LONG, | ||
| MPI_MAX, | ||
| (int)g_tw_masternode, | ||
| MPI_COMM_ROSS) != MPI_SUCCESS) | ||
| if(MPI_Reduce(&s->s_avl, | ||
| &me->stats.s_avl, | ||
| 1, | ||
| MPI_UNSIGNED_LONG_LONG, | ||
| MPI_MAX, | ||
| (int)g_tw_masternode, | ||
| MPI_COMM_ROSS) != MPI_SUCCESS) | ||
| tw_error(TW_LOC, "Unable to reduce statistics!"); | ||
|
|
||
| if (MPI_Reduce(&s->s_buddy, | ||
| &me->stats.s_buddy, | ||
| 1, | ||
| MPI_UNSIGNED_LONG_LONG, | ||
| MPI_MAX, | ||
| (int)g_tw_masternode, | ||
| MPI_COMM_ROSS) != MPI_SUCCESS) | ||
| tw_error(TW_LOC, "Unable to reduce statistics!"); | ||
|
|
||
| if (MPI_Reduce(&s->s_lz4, | ||
| &me->stats.s_lz4, | ||
| 1, | ||
| MPI_UNSIGNED_LONG_LONG, | ||
| MPI_MAX, | ||
| (int)g_tw_masternode, | ||
| MPI_COMM_ROSS) != MPI_SUCCESS) | ||
| tw_error(TW_LOC, "Unable to reduce statistics!"); | ||
|
|
||
| if (MPI_Reduce(&s->s_events_past_end, | ||
| &me->stats.s_events_past_end, | ||
| 3, | ||
| 1, | ||
| MPI_UNSIGNED_LONG_LONG, | ||
| MPI_SUM, | ||
| (int)g_tw_masternode, | ||
| MPI_COMM_ROSS) != MPI_SUCCESS) | ||
| tw_error(TW_LOC, "Unable to reduce statistics!"); | ||
|
|
||
| if (MPI_Reduce(&g_st_stat_comp_ctr, | ||
| &me->stats.s_stat_comp, | ||
| 1, | ||
| MPI_UNSIGNED_LONG_LONG, | ||
| MPI_MAX, | ||
| (int)g_tw_masternode, | ||
| MPI_COMM_ROSS) != MPI_SUCCESS) | ||
| tw_error(TW_LOC, "Unable to reduce statistics!"); | ||
|
|
||
| if (MPI_Reduce(&g_st_stat_write_ctr, | ||
| &me->stats.s_stat_write, | ||
| 1, | ||
| MPI_UNSIGNED_LONG_LONG, | ||
| MPI_MAX, | ||
| (int)g_tw_masternode, | ||
| MPI_COMM_ROSS) != MPI_SUCCESS) | ||
| tw_error(TW_LOC, "Unable to reduce statistics!"); | ||
|
|
||
| if(MPI_Reduce(&(s->s_alp_nevent_processed), | ||
| &me->stats.s_alp_nevent_processed, | ||
| 2, | ||
| MPI_UNSIGNED_LONG_LONG, | ||
| MPI_SUM, | ||
| (int)g_tw_masternode, | ||
| MPI_COMM_ROSS) != MPI_SUCCESS) | ||
| tw_error(TW_LOC, "Unable to reduce statistics!"); | ||
|
|
||
| #ifdef USE_RIO | ||
| if (MPI_Reduce(&s->s_rio_load, | ||
| &me->stats.s_rio_load, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be consistent, we use snake_case, not camelCase.