一、概述
線性流水線與非線性流水線是CPU中指令處理流水線的一種分類標(biāo)準(zhǔn)。線性流水線很好理解,就是一條路走到黑的流水線;非線性流水線則不同,它可能存在前饋與反饋,每個部件可能使用一次或多次,它就沒法像線性流水線那么一個一個部件按部就班的走。因此出現(xiàn)了一個問題,如果我第一個任務(wù)第二次使用部件A,第二個任務(wù)恰好第一次也使用部件A,這會怎么樣?
出現(xiàn)矛盾了,流水線卡住了。這不好,因此需要流水線調(diào)度算法來安排好每一個任務(wù),在讓它們不沖突的同時,最大可能提高流水線的效率。
二、分析
1、非線性流水線的描述
線性流水線能夠用流水線連接圖唯一表示,非線性流水線則不行,它的描述需要流水線連接圖和預(yù)約表的協(xié)同作用。一個流水線連接圖可能有好幾種執(zhí)行順序,也就對應(yīng)好幾個可能的預(yù)約表,一個預(yù)約表也可能對應(yīng)好幾張流水線連接圖。只有同時看這兩者,才能確定最終的處理順序。下圖就是一張預(yù)約表。
這是一個有四個組件,七個流水段的流水線。橫軸代表時間,縱軸代表組件。×則代表在某一時刻當(dāng)前使用的組件。因此我們可以得知,每個框中只能有一個×,否則會出現(xiàn)沖突。
2、調(diào)度算法的推導(dǎo)
首先,我們要提出三個定義:
啟動距離、禁止向量、沖突向量。
啟動距離指的是第一個任務(wù)進(jìn)入流水線后,第二個任務(wù)進(jìn)入時,不發(fā)生沖突的時間。顯然,啟動距離又長又短,我們的目的就是找出平均啟動距離最短的一個任務(wù)載入序列。
禁止向量指的是預(yù)約表中每一行任意兩個×之間距離的集合。例如上圖中,禁止向量為(2,4,6)。
沖突向量的指的是如下一個向量:(Cm,Cm-1......C2C1),其中C為0或1;m為禁止向量中的最大值。對于Ci,如果禁止向量在i中,則Ci=1,否則Ci=0。
上圖預(yù)約表對應(yīng)的沖突向量指的是(101010)。
我們可以這樣理解沖突向量:對于一個任務(wù)x,其耗時為nk,n為流水段數(shù)量,k為每一流水段耗費(fèi)的時間。假設(shè)各流水段好費(fèi)時間相同。那么,在x進(jìn)入后的第k,2k...nk時刻,均可以加載下一個任務(wù)y。然而可能出現(xiàn)沖突。可以通過計(jì)算沖突向量來規(guī)避這種沖突:
若沖突向量的Cm=1,則x進(jìn)入后的第mk時刻不能加載y,否則會沖突。如圖,C1=0,C2=1。如果在x進(jìn)入后2時刻,即橫軸為3的時候加載y,那么在橫軸為5的時候,x與y爭用S3部件,出現(xiàn)沖突。也就是說,y可以在C1、C3、C5也就是橫軸為2、4、6的時候載入。
現(xiàn)在我們假設(shè)y在C3載入,也就是在橫軸為4處載入。對于y來說,在它沒結(jié)束之前,有哪些時刻不能載入任務(wù)呢?很容易想的一點(diǎn)是:在C2、C4、C6不能載入。因?yàn)閥是和x一樣的任務(wù),因此x中不能載入的時刻,y也不能載入。但這夠了么?還不夠,x還會對y之后的任務(wù)產(chǎn)生影響,如何表達(dá)這種影響呢?用x的沖突向量來表達(dá)。
接著拿y在C3載入舉例,這時,從x的沖突向量可以知道,橫軸為3、5、7的時候不能載入,從y的沖突向量可以知道,橫軸為6、8、10的時候無法載入。做一下并集,在3、5、6、7、8、10的時候不能載入。由于y在4的時候載入,因此僅需要注意5、6、7、8、10即可。由5、6、7、8、10可以得出y的沖突向量101111。這是我們手動算出來的。如何通過數(shù)學(xué)方法計(jì)算呢?
通過右移與按位取并操作進(jìn)行。
如上圖,右移3代表選擇x的非沖突時刻C3,右移3得到的000101代表對于y來說,x的影響,101010是y自身的影響。二者疊加可得所有不能載入的時刻。
再如上圖,初始任務(wù)x的沖突向量為1010100,第二個任務(wù)y若選擇在2時間后載入,那么x對其的影響可以寫為0010101;兩個影響疊加為1010101,與最初的影響相同;第二個任務(wù)y若選擇在1時間后載入,那么x對其的影響為0101010,兩個影響合并為1111110,這是y的沖突向量;第三個任務(wù)z若選擇在y后1時間載入,那么y對其的影響為0111111,兩個影響合并為1111111。
也可以將1111111看成是xyz三個任務(wù)共同影響的結(jié)果,因?yàn)?111111是x的沖突向量右移兩位并上y右移一位并上z得到的。
如此,我們需要得到所有沖突向量右移所有的0的位置得到的所有的結(jié)果,然后按位取并,繼續(xù)右移繼續(xù)取并,直到結(jié)果與之前的相同。這樣我們就會得到一張圖。
上圖是101010所得到的的圖。這其中的所有循環(huán),就是我們要找的非沖突的載入序列。也就是我們算法最后需要的結(jié)果。
三、代碼實(shí)現(xiàn)
由1、2可以得知,算法的整體思路可以按如下幾步:
第一,輸入預(yù)約表;
第二,得到初始沖突向量;
第三,生成狀態(tài)圖;
第四,找到所有循環(huán)。
其中,第三和第四步可以轉(zhuǎn)化為如下問題:生成一張有向圖,記錄其中所有循環(huán);但是我的算法可以按另一種形式描述:生成一棵樹,當(dāng)根節(jié)點(diǎn)對應(yīng)的葉子節(jié)點(diǎn)與之前的節(jié)點(diǎn)相同是,記錄之前的節(jié)點(diǎn)到根節(jié)點(diǎn)的路徑。
以下是我的代碼。
1、輸入函數(shù)
為便于輸入,選擇從外部文件讀取預(yù)約表,函數(shù)如下:
int readTable(int a,int b)
{
FILE*fp = NULL;//需要注意
fp = fopen(F_PATH, "r");
if (NULL == fp) return -1;//要返回錯誤代碼
for (int i = 0; i
< b; j++)
{
int tmp;
fscanf(fp,"%d", &tmp);
ResTable[i][j] = tmp;
}
fclose(fp);
fp = NULL;//需要指向空,否則會指向原打開文件地址
return 0;
}
使用fopen讀入文件流,用fp儲存文件流,然后生成預(yù)約表對應(yīng)的二維數(shù)組即可。
2、得到初始沖突向量
為得到?jīng)_突向量,首先要得到禁止向量。由于禁止向量中沒有重復(fù)元素,選擇使用set保存。然后遍歷各行,計(jì)算出預(yù)約表中不為零的元素對應(yīng)的列坐標(biāo)的差的絕對值,保存在dis中。
然后遍歷dis,使用string生成沖突向量,dis中的值作為string中元素下標(biāo),這些下標(biāo)對應(yīng)的值為1,其余為0。從而生成初始沖突向量。
void Create_Initial_Conflict_Vector(int a,int b)
{
set
dis;//不為零之間的距離
for (int i = 0; i
tmp;
for (int j = 0; j < b; j++)
{
if (ResTable[i][j] != 0)
{
tmp.push_back(j);
}
for (int m = 0; m < tmp.size(); m++)
{
for (int n = m + 1; n < tmp.size(); n++)
{
dis.insert(abs(tmp[m] - tmp[n]));
}
}
}
}
//printSet(dis);
for (int i = 1; i < b; i++)
{
if (dis.find(i) != dis.end())
Initial_Conflict_Vector = '1' + Initial_Conflict_Vector;
else
Initial_Conflict_Vector = '0' + Initial_Conflict_Vector;
}
}
3、生成狀態(tài)圖并找出所有循環(huán)
重頭戲,算法的精髓就在這里。我選擇使用遞歸生成狀態(tài)圖并保存循環(huán)。
首先需要寫兩個工具函數(shù),string右移以及string按位取并。如下:
string StringRightMove(string s, int k)
{
while (k != 0)
{
s = '0' + s;
s.pop_back();
k--;
}
return s;
}
string StringAnd(string s1, string s2)
{
for (int i = 0; i < s1.size(); i++)
{
if (s1[i] == '0'&&s2[i] == '0')
s1[i] = '0';
else
s1[i] = '1';
}
return s1;
}
然后開始寫遞歸函數(shù)。代碼如下:
void Find_real_Circle(string ConVector, unordered_set
CVTable, vector
Now_Start_Cycle, int len)
{
if (CVTable.find(ConVector) != CVTable.end())
{
StartCycle.push_back(Now_Start_Cycle);
unordered_set
::iterator it = CVTable.begin();
int numhead = 0;
while (*it != ConVector)
{
it++;
numhead++;
}
vector
RealCycle(Now_Start_Cycle.begin() + numhead, Now_Start_Cycle.end());
real_StartCycle.push_back(RealCycle);
//CVTable.erase(ConVector);
return;
}
else
{
CVTable.insert(ConVector);
int num_1 = 0;
for (int i = len - 1; i >= 0; i--)
{
if (ConVector[i] == '0')
{
string tmpVector, tmpResult;
tmpVector = StringRightMove(ConVector, len - i);
tmpResult = StringAnd(tmpVector, Initial_Conflict_Vector);
Now_Start_Cycle.push_back(len - i);
Find_real_Circle(tmpResult, CVTable, Now_Start_Cycle, len);
Now_Start_Cycle.pop_back();
}
else
num_1++;
}
Now_Start_Cycle.push_back(len + 1);
//VectorPrint(Now_Start_Cycle);
real_StartCycle.push_back(Now_Start_Cycle);
StartCycle.push_back(Now_Start_Cycle);
return;
}
}
參數(shù)如下:
string ConVector:上一任務(wù)的沖突向量
unordered_set
?vector
int len:沖突向量長度
首先,要設(shè)計(jì)遞歸出口:什么時候退出遞歸?當(dāng)然是右移取并得到的結(jié)果發(fā)現(xiàn)之前已經(jīng)得到過,這時候退出遞歸,即找到了一個循環(huán)。如何發(fā)現(xiàn)本次得到的結(jié)果之前已經(jīng)得到過?用set可以,但是考慮到存在如下的循環(huán):
42222222,循環(huán)體為2,但是要先經(jīng)過4才能開始循環(huán)。因此只用set只能“發(fā)現(xiàn)循環(huán)”,而不能“發(fā)現(xiàn)循環(huán)體”。這不好,unordered_set更好:一旦我在unordered_set中發(fā)現(xiàn)結(jié)果,由于unordered_set相比于set保留了壓入時的順序,通過定位到結(jié)果的位置,那么結(jié)果到unordered_set結(jié)尾便是循環(huán)體。
使用全局變量vector
然后設(shè)計(jì)遞歸體。對于遞歸體,我們要對ConVector的所有為0的位都照顧到,即所有為0的位都要右移然后取并得到新的沖突向量。另外,進(jìn)入遞歸體說明還沒找到循環(huán),那么當(dāng)前的沖突向量要保存在CVTable中,當(dāng)前右移的值要保存在Now_Start_Cycle中。在保存并形成新的沖突向量后,繼續(xù)調(diào)用函數(shù)開始遞歸。
在所有的0都右移之后,要將len+1保存在Now_Start_Cycle中,因?yàn)樗休d入的任務(wù)在len+1之后都必定可以開始下一個任務(wù)而不產(chǎn)生沖突。
這樣就得到了所有循環(huán)。
4、輸出結(jié)果
這就是按部就班的寫就可以了。沒什么好說的。
for (int i = 0; i < StartCycle.size(); i++)
{
cout << "初始循環(huán)為:";
for (int j = 0; j < StartCycle[i].size(); j++)
{
cout << StartCycle[i][j] << ' ';
}
cout << "循環(huán)體為:";
for (int j = 0; j < real_StartCycle[i].size(); j++)
{
cout << real_StartCycle[i][j] << ' ';
}
cout << '\n';
}
int try_num = 0;
while (try_num<1)
{
cout << "請選擇循環(huán)序號:\n";
int Cycle_x;
cin >> Cycle_x;
int Display_table[10][200] = { 0 };
int CycleNum[10] = { 0 };
int len1, len2;
len1 = StartCycle[Cycle_x].size();
len2 = real_StartCycle[Cycle_x].size();
CycleNum[0] = StartCycle[Cycle_x][0];
for (int i = 1; i < len1; i++)
CycleNum[i] = CycleNum[i - 1] + StartCycle[Cycle_x][i];
for (int i = len1; i
< b; j++)
{
int p = 1;
if (ResTable[i][j] != 0)
{
Display_table[i][j] = p;
for (int k = 0; k < len1 + len2 * 2; k++)
{
Display_table[i][j + CycleNum[k]] = p + k + 1;
}
}
}
printTable(a, CycleNum[len1 + len2 * 2 - 1], Display_table);
try_num++;
}
5、效果
6、流水線生成圖片
由于使用C++生成圖片太過困難,而利用python的matplotlib包生成圖片又很簡單。因此我不得不寫了另外一個python版本。
python版本使用了numpy和matplotlib兩個常用庫。其算法思想與C++如出一轍,唯一要注意的就是在迭代的時候的深拷貝與淺拷貝的問題,迭代中許多變量要使用深拷貝,否則會出錯,這里很難通過debug得出。
效果如下:
效果比只有0和1還是好不少的。
四、總結(jié)
在實(shí)現(xiàn)方面,調(diào)度算法的核心就在于查找循環(huán),查找循環(huán)的核心就是遞歸算法。遞歸算法寫好,整個問題便迎刃而解。
在理論方面,理解沖突向量是如何生成的是關(guān)鍵,而這需要理解沖突向量的含義,理解其含義,配合預(yù)約表中多個任務(wù)走一遍流程,就可以知道算法的原理了。
PS:代碼如下:
C++版:
#include
#include
#include
#include
#include
python版:
import matplotlib.pyplot as plt
import pylab
import pandas as pd
import copy
StartCycle=list()
RealStartCycle=list()
InitialConflictVector=str()
def Create_Initial_Conflict_Vector(a:int ,b:int ,Table:list)->str:
dis=set()
for i in range(a):
tmp=[]
for j in range(b):
if (Table[i][j]==1):
tmp.append(j)
for m in range(0,len(tmp)):
for n in range(m+1,len(tmp)):
dis.add(abs(tmp[m]-tmp[n]))
result=str()
for i in range(1,b):
if i in dis:
result='1'+result
else:
result='0'+result
return result
def String_Right_Move(s:str, k:int)->str:
while (k != 0):
s = '0' + s
s = s[:-1]
k = k-1
return s
def String_And(s1:str,s2:str):
s=str()
for i in range(len(s1)):
if s1[i] == '0'and s2[i] == '0':
s = s + '0';
else:
s = s + '1';
return s;
def Find_Real_Circle(ConVector:str, CVTable:set, CVVector:list, NowStartCycle:list, lenth:int)->None:
if ConVector in CVTable:
StartCycle.append(copy.deepcopy(NowStartCycle))
numhead=0
while(ConVector!=CVVector[numhead]):
numhead=numhead+1
RealStartCycle.append(copy.deepcopy(NowStartCycle[numhead:len(NowStartCycle)]))
CVTable.remove(ConVector)
CVVector.pop()
return
else:
CVTable.add(copy.deepcopy(ConVector))
CVVector.append(copy.deepcopy(ConVector))
for i in range((lenth-1),-1,-1):
if (ConVector[i]=='0'):
TmpVector = String_Right_Move(ConVector,lenth-i)
TmpResult = String_And(TmpVector,ConVector)
NowStartCycle.append(lenth-i)
Find_Real_Circle(TmpResult, copy.deepcopy(CVTable), copy.deepcopy(CVVector), NowStartCycle, lenth)
NowStartCycle.pop()
NowStartCycle.append(lenth+1)
RealStartCycle.append(copy.deepcopy(NowStartCycle))
StartCycle.append(copy.deepcopy(NowStartCycle))
NowStartCycle.pop()
return
def Create_Cycle_Using_List(Cycle1:list,Cycle2:list,kth:int)->list:
CycleUsingList=list()
CycleUsingList.append(Cycle1[kth][0])
for i in range(1,len(Cycle1[kth])):
CycleUsingList.append(CycleUsingList[i-1]+Cycle1[kth][i])
for i in range(len(Cycle2[kth])):
CycleUsingList.append(CycleUsingList[len(Cycle1[kth])+i-1]+Cycle2[kth][i])
for i in range(len(Cycle2[kth])):
CycleUsingList.append(CycleUsingList[len(Cycle1[kth])+len(Cycle2[kth])+i-1]+Cycle2[kth][i])
for i in range(len(Cycle2[kth])):
CycleUsingList.append(CycleUsingList[len(Cycle1[kth])+len(Cycle2[kth])*2+i-1]+Cycle2[kth][i])
return CycleUsingList
def Create_Matrix(InitialMatrix:list,CycleList:list,a:int)->list:
Resultlist=[[0 for col in range(CycleList[len(CycleList)-1]+len(InitialMatrix[0]))] for row in range(a)]
TmpList=[0]*len(CycleList)
for i in range(a):
for j in range(len(InitialMatrix[0])):
if InitialMatrix[i][j]==1:
Resultlist[i][j]=1
for k in range(1,len(CycleList)):
Resultlist[i][j+CycleList[k]]=1+k
return Resultlist
#預(yù)約表
ResTable_DataFrame=pd.read_csv(r'D:\LeetCode\Nonlinear Pipelining Python\file.csv',header=None)
ResTable_list=ResTable_DataFrame.values.tolist()
ResTable=tuple(ResTable_list)
a=int(input("請輸入流水線的功能部件數(shù)量:"))
b=int(input("請輸入流水段數(shù)量:"))
InitialConflictVector=Create_Initial_Conflict_Vector(a,b,ResTable)
s1="初始沖突向量為: %s"%(InitialConflictVector)
print(s1)
RealCVTable=set()
RealCVVector=list()
RealNowStartCycle=list()
Find_Real_Circle(InitialConflictVector,RealCVTable,RealCVVector,RealNowStartCycle,b-1)
for i in range(len(StartCycle)):
s2 = "進(jìn)入循環(huán)的序列為: %s 循環(huán)體為: %s" %(StartCycle[i],RealStartCycle[i])
print(s2)
for i in range(3):
n=int(input("請選擇查看第幾個循環(huán):"))
CycleUsingList=Create_Cycle_Using_List(StartCycle,RealStartCycle,n)
CycleUsingMatrix=Create_Matrix(ResTable_list,CycleUsingList,a)
plt.imshow(CycleUsingMatrix, interpolation='nearest')
pylab.show()
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
