MIT 6.830 Database Systems -- Lab Two
- Getting the project
- Lab Two
- Implementation hints
- Exercise 1 -- Filter and Join
- Exercise 2 -- Aggregates
- Exercise 3 -- HeapFile Mutability
- Exercise 4 -- Insertion & deletion
- Exercise 5 -- Page eviction
- Exercise 6 -- Query walkthrough
- Exercise 7 -- Query parsing
Getting the project
The original project is built with Ant. I have switched it to a Maven build, so you can simply pull my modified project:
- https://gitee.com/DaHuYuXiXi/simple-db-hw-2021
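After that it is an ordinary Maven project: configure it and run. The implementation of each lab lives under the lab/ branches.
Lab Two
Lab 2 must be developed on top of the code submitted for lab 1; otherwise the exercises cannot be completed. The lab also provides extra test files that are not present in the original source.
Implementation hints
Before writing any code, it is strongly recommended to read this whole document to get an overall picture of SimpleDB's design; it helps a great deal when writing the code.
It is best to implement the code exercise by exercise. Each exercise states which classes to implement and which unit tests to pass, so simply follow along.
The rough outline of this lab is:
- Implement the Filter and Join operators and verify them with the related unit tests; reading the classes' Javadoc helps. The project already provides implementations of Project and OrderBy, and reading their code helps in understanding how the other operators work
- Implement IntegerAggregator and StringAggregator, which compute an aggregate over one particular column of the tuples, optionally grouped by another column; IntegerAggregator supports sum, min, max, count and average, while StringAggregator supports only count
- Implement the Aggregate operator; like the other operators, Aggregate implements the OpIterator interface. Note that each call to next() on an Aggregate outputs the aggregate value of an entire group, and that the Aggregate constructor takes the columns used for aggregation and grouping
- Implement tuple insertion, tuple deletion and the page eviction policy in BufferPool; transactions are not a concern yet
- Implement the Insert and Delete operators; like all operators they implement OpIterator, take the tuples to insert or delete, and output the number of tuples affected; they call the appropriate BufferPool methods to modify pages on disk
Note that SimpleDB does not implement consistency or integrity checks, so duplicate records can be inserted, and there is no way to enforce primary-key or foreign-key constraints.
Building on this section's implementation, we then use the SQL parser shipped with the project to run SQL queries.
Finally, you may notice that the operators in this lab extend the Operator class rather than implementing the OpIterator interface directly. Because implementing next/hasNext is repetitive and tedious, Operator implements that logic generically and leaves a single simpler method to the subclass: fetchNext (the lab handout and the Javadoc still call it readNext). Feel free to use this style or to implement OpIterator directly. To implement OpIterator directly, remove extends Operator and replace it with implements OpIterator.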
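Exercise 1 -- Filter and Join
Filter and Join:
This section implements operators that are more interesting than a full table scan:
- Filter: returns only the tuples that satisfy the Predicate given at construction time, filtering out every tuple that does not match
- Join: combines the tuples of its two children according to the JoinPredicate given at construction time; a simple nested-loops join is sufficient
Implement the methods in the following classes: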
- src/java/simpledb/execution/Predicate.java
- src/java/simpledb/execution/JoinPredicate.java
- src/java/simpledb/execution/Filter.java
- src/java/simpledb/execution/Join.java
Predicate and JoinPredicate handle ordinary predicates and join predicates respectively:
The core source of the Predicate class is:
/**
 * Predicate compares tuples to a specified Field value.
 * Compares one particular field of a tuple --> select * from t where t.a=1;
 */
public class Predicate implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Index of the field to compare */
    private final int field;

    /** Comparison operator */
    private final Op op;

    /** Operand to compare against */
    private final Field operand;

    /** Constants used for return codes in Field.compare */
    public enum Op implements Serializable {
        EQUALS, GREATER_THAN, LESS_THAN, LESS_THAN_OR_EQ, GREATER_THAN_OR_EQ, LIKE, NOT_EQUALS;

        /**
         * Interface to access operations by integer value for command-line
         * convenience.
         */
        public static Op getOp(int i) {
            return values()[i];
        }

        public String toString() {
            if (this == EQUALS)
                return "=";
            if (this == GREATER_THAN)
                return ">";
            if (this == LESS_THAN)
                return "<";
            if (this == LESS_THAN_OR_EQ)
                return "<=";
            if (this == GREATER_THAN_OR_EQ)
                return ">=";
            if (this == LIKE)
                return "LIKE";
            if (this == NOT_EQUALS)
                return "<>";
            throw new IllegalStateException("impossible to reach here");
        }
    }

    public Predicate(int field, Op op, Field operand) {
        this.field = field;
        this.op = op;
        this.operand = operand;
    }

    /**
     * Compares the field number of t specified in the constructor to the
     * operand field specified in the constructor using the operator specific in
     * the constructor. The comparison can be made through Field's compare
     * method.
     */
    public boolean filter(Tuple t) {
        if (t == null) {
            return false;
        }
        Field f = t.getField(this.field);
        return f.compare(this.op, this.operand);
    }
    . . .
}
The core source of the JoinPredicate class is:
/**
 * JoinPredicate compares fields of two tuples using a predicate. JoinPredicate
 * is most likely used by the Join operator.
 * Compares one field from each of two tuples for a JOIN condition
 * --> select * from t1 join t2 on t1.id=t2.id;
 */
public class JoinPredicate implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Index of the field in the first tuple */
    private final int field1;

    /** Comparison operator */
    private final Predicate.Op op;

    /** Index of the field in the second tuple */
    private final int field2;

    public JoinPredicate(int field1, Predicate.Op op, int field2) {
        this.field1 = field1;
        this.op = op;
        this.field2 = field2;
    }

    /**
     * Apply the predicate to the two specified tuples. The comparison can be
     * made through Field's compare method.
     */
    public boolean filter(Tuple t1, Tuple t2) {
        if (t1 == null) {
            return false;
        }
        if (t2 == null) {
            return false;
        }
        Field first = t1.getField(field1);
        Field second = t2.getField(field2);
        return first.compare(this.op, second);
    }
    ...
}
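OpIterator means "operator iterator". In SimpleDB it is an iterator that can perform some operation on the elements while traversing them; which operation is performed is decided by the subclass.
In other words, an operator iterator does work of its own while it iterates. The abstract class Operator turns part of that flow into a template: every subclass that needs to act during iteration only has to implement the core method fetchNext and do its work each time the next tuple is fetched.
This does not mean that fetchNext is the only method a subclass implements; fetchNext is the core method a subclass must provide, and the remaining methods are auxiliary.
The core source of the Operator class is: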
/**
 * Abstract class for implementing operators. It handles close, next and
 * hasNext. Subclasses only need to implement open and readNext.
 */
public abstract class Operator implements OpIterator {

    public boolean hasNext() throws DbException, TransactionAbortedException {
        if (!this.open)
            throw new IllegalStateException("Operator not yet open");
        if (next == null)
            next = fetchNext();
        return next != null;
    }

    public Tuple next() throws DbException, TransactionAbortedException,
            NoSuchElementException {
        if (next == null) {
            next = fetchNext();
            if (next == null)
                throw new NoSuchElementException();
        }
        Tuple result = next;
        next = null;
        return result;
    }

    protected abstract Tuple fetchNext() throws DbException,
            TransactionAbortedException;

    /**
     * Closes this iterator. If overridden by a subclass, they should call
     * super.close() in order for Operator's internal state to be consistent.
     */
    public void close() {
        // Ensures that a future call to next() will fail
        next = null;
        this.open = false;
    }

    private Tuple next = null;
    private boolean open = false;

    public void open() throws DbException, TransactionAbortedException {
        this.open = true;
    }

    /**
     * @return return the children DbIterators of this operator. If there is
     *         only one child, return an array of only one element. For join
     *         operators, the order of the children is not important. But they
     *         should be consistent among multiple calls.
     */
    public abstract OpIterator[] getChildren();

    /**
     * Set the children(child) of this operator. If the operator has only one
     * child, children[0] should be used. If the operator is a join, children[0]
     * and children[1] should be used.
     */
    public abstract void setChildren(OpIterator[] children);

    /**
     * @return return the TupleDesc of the output tuples of this operator
     */
    public abstract TupleDesc getTupleDesc();
    ...
}
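- Iterator calling convention: call hasNext to check whether another element exists and, if so, call next to fetch it; open must be called before hasNext.
- An OpIterator has two responsibilities. The original one is to supply data for iteration; the additional one is to perform an operation while the underlying iterator is being traversed.
- Operator uses the decorator pattern to wrap the traversal behaviour of the underlying iterator and adds the behaviour of operating on the data during traversal.
- The decorator pattern needs a decorated object, which is set here through setChildren. Unlike the textbook decorator pattern, however, different operators wrap different numbers of decorated objects:
- select * from t where t.age=18 --> this uses a single-value comparison filter, so only one table's iterator is involved
- select * from t1 join t2 on t1.age=t2.age --> this uses a two-table JOIN, so the iterators of two tables are involved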
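The calling convention and the template-method split described in the list above can be tried out in isolation. The following is a minimal sketch, not SimpleDB's code: LookaheadOperator, RangeSource, EvenFilter and LookaheadDemo are hypothetical names, and plain Integer values stand in for tuples. It assumes, as Operator does, that a null return from fetchNext means "no more elements".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for Operator: caches one look-ahead element. */
abstract class LookaheadOperator {
    private Integer next = null;
    private boolean open = false;

    public void open() { open = true; }

    public void close() { next = null; open = false; }

    public boolean hasNext() {
        if (!open) throw new IllegalStateException("operator not yet open");
        if (next == null) next = fetchNext();
        return next != null;
    }

    public Integer next() {
        if (!hasNext()) throw new NoSuchElementException();
        Integer result = next;
        next = null; // consume the cached element
        return result;
    }

    /** Returns the next element, or null when the stream is exhausted. */
    protected abstract Integer fetchNext();
}

/** Data source yielding 0 .. limit-1; plays the role of a table scan. */
final class RangeSource extends LookaheadOperator {
    private final int limit;
    private int cursor = 0;

    RangeSource(int limit) { this.limit = limit; }

    @Override protected Integer fetchNext() { return cursor < limit ? cursor++ : null; }
}

/** Decorator that lets only the even values of its child through. */
final class EvenFilter extends LookaheadOperator {
    private final LookaheadOperator child;

    EvenFilter(LookaheadOperator child) { this.child = child; }

    @Override public void open() { super.open(); child.open(); }

    @Override public void close() { child.close(); super.close(); }

    @Override protected Integer fetchNext() {
        while (child.hasNext()) {
            Integer v = child.next();
            if (v % 2 == 0) return v;
        }
        return null;
    }
}

final class LookaheadDemo {
    /** Follows the convention: open, then hasNext/next until exhausted, then close. */
    static List<Integer> drain(LookaheadOperator op) {
        List<Integer> out = new ArrayList<>();
        op.open();
        while (op.hasNext()) out.add(op.next());
        op.close();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(new EvenFilter(new RangeSource(7)))); // prints [0, 2, 4, 6]
    }
}
```

The subclasses never touch the look-ahead cache; they only say how to produce one more element, which is the point of the template.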
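SimpleDB's iterator design as a whole follows the decorator pattern, as shown in the figure below:
- Every Operator implementation is a decorator, while SeqScan is a plain iterator implementation, that is, the decorated object
For how the decorator pattern is used in practice, the overall architecture of Java IO is a good reference.
Filter performs single-value comparison; the flow is as follows:
The figure shows only one layer of decoration. With several layers, the child is itself a decorator, so stacked decorators can implement a filter such as: select * from t where t.age=18 and t.name="dhy".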
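To make the stacking concrete, here is a small sketch built on java.util.Iterator rather than on SimpleDB's classes. Person, FilterIterator and StackedFilterDemo are hypothetical names introduced only for this illustration, and the sketch assumes the source never yields null elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical row type with the two columns used in the query. */
final class Person {
    final int age;
    final String name;

    Person(int age, String name) {
        this.age = age;
        this.name = name;
    }
}

/** Decorator: wraps any iterator and passes through only matching elements. */
final class FilterIterator<T> implements Iterator<T> {
    private final Iterator<T> child;
    private final Predicate<T> predicate;
    private T lookahead; // next matching element, or null if not yet found

    FilterIterator(Iterator<T> child, Predicate<T> predicate) {
        this.child = child;
        this.predicate = predicate;
    }

    @Override public boolean hasNext() {
        while (lookahead == null && child.hasNext()) {
            T candidate = child.next();
            if (predicate.test(candidate)) lookahead = candidate;
        }
        return lookahead != null;
    }

    @Override public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T result = lookahead;
        lookahead = null;
        return result;
    }
}

final class StackedFilterDemo {
    /** select * from t where t.age=18 and t.name="dhy", as two stacked filters. */
    static List<Person> select(List<Person> table) {
        Iterator<Person> scan = table.iterator(); // the decorated object
        Iterator<Person> byAge = new FilterIterator<Person>(scan, p -> p.age == 18);
        Iterator<Person> byName = new FilterIterator<Person>(byAge, p -> p.name.equals("dhy"));
        List<Person> out = new ArrayList<>();
        while (byName.hasNext()) out.add(byName.next());
        return out;
    }

    public static void main(String[] args) {
        List<Person> table = Arrays.asList(
                new Person(18, "dhy"), new Person(18, "bob"),
                new Person(20, "dhy"), new Person(18, "dhy"));
        System.out.println(select(table).size()); // prints 2
    }
}
```

The outer filter neither knows nor cares that its child is another filter, which is what allows an AND of conditions to be expressed purely by nesting.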
The core source of Filter is:
/**
 * Filter is an operator that implements a relational select.
 */
public class Filter extends Operator {

    private final Predicate predicate;
    private OpIterator child;

    /**
     * Constructor accepts a predicate to apply and a child operator to read
     * tuples to filter from.
     */
    public Filter(Predicate p, OpIterator child) {
        this.predicate = p;
        this.child = child;
    }

    public void open() throws DbException, NoSuchElementException,
            TransactionAbortedException {
        super.open();
        this.child.open();
    }

    public void close() {
        this.child.close();
        super.close();
    }

    public void rewind() throws DbException, TransactionAbortedException {
        this.child.rewind();
    }

    /**
     * AbstractDbIterator.readNext implementation. Iterates over tuples from the
     * child operator, applying the predicate to them and returning those that
     * pass the predicate (i.e. for which the Predicate.filter() returns true.)
     */
    protected Tuple fetchNext() throws NoSuchElementException,
            TransactionAbortedException, DbException {
        while (this.child.hasNext()) {
            Tuple tuple = this.child.next();
            if (this.predicate.filter(tuple)) {
                return tuple;
            }
        }
        return null;
    }

    @Override
    public OpIterator[] getChildren() {
        return new OpIterator[] {this.child};
    }

    @Override
    public void setChildren(OpIterator[] children) {
        this.child = children[0];
    }
    ...
}
Join evaluates the join condition; the flow is as follows:
The core source of Join is:
/**
 * The Join operator implements the relational join operation.
 */
public class Join extends Operator {

    private static final long serialVersionUID = 1L;

    /** Join condition */
    private final JoinPredicate predicate;

    /** The tables taking part in the join */
    private OpIterator[] children;

    /** Outer (driving) table row currently being matched */
    private Tuple tuple1;

    /**
     * Constructor. Accepts two children to join and the predicate to join them
     * on
     *
     * @param p      The predicate to use to join the children
     * @param child1 Iterator for the left(outer) relation to join
     * @param child2 Iterator for the right(inner) relation to join
     */
    public Join(JoinPredicate p, OpIterator child1, OpIterator child2) {
        this.predicate = p;
        this.children = new OpIterator[]{child1, child2};
        this.tuple1 = null;
    }

    /**
     * Returns the row schema of the result of joining the two tables
     */
    public TupleDesc getTupleDesc() {
        return TupleDesc.merge(this.children[0].getTupleDesc(), this.children[1].getTupleDesc());
    }

    public void open() throws DbException, NoSuchElementException,
            TransactionAbortedException {
        for (OpIterator child : this.children) {
            child.open();
        }
        super.open();
    }

    public void close() {
        for (OpIterator child : this.children) {
            child.close();
        }
        super.close();
    }

    public void rewind() throws DbException, TransactionAbortedException {
        for (OpIterator child : this.children) {
            child.rewind();
        }
        // drop the outer row held from before the rewind
        this.tuple1 = null;
    }

    /**
     * Returns the next tuple generated by the join, or null if there are no
     * more tuples. Logically, this is the next tuple in r1 cross r2 that
     * satisfies the join predicate. There are many possible implementations;
     * the simplest is a nested loops join.
     * <p>
     * Note that the tuples returned from this particular implementation of Join
     * are simply the concatenation of joining tuples from the left and right
     * relation. Therefore, if an equality predicate is used there will be two
     * copies of the join attribute in the results. (Removing such duplicate
     * columns can be done with an additional projection operator if needed.)
     * <p>
     * For example, if one tuple is {1,2,3} and the other tuple is {1,5,6},
     * joined on equality of the first column, then this returns {1,2,3,1,5,6}.
     *
     * @return The next matching tuple.
     * @see JoinPredicate#filter
     */
    protected Tuple fetchNext() throws TransactionAbortedException, DbException {
        // Nested loop: children[0] is the driving (outer) table, children[1] the driven (inner) table
        while (this.children[0].hasNext() || tuple1 != null) {
            // fetch one row from the outer table
            if (this.children[0].hasNext() && tuple1 == null) {
                tuple1 = this.children[0].next();
            }
            // fetch rows from the inner table one at a time
            while (this.children[1].hasNext()) {
                Tuple tuple2 = this.children[1].next();
                // JoinPredicate decides whether the join condition holds
                if (this.predicate.filter(tuple1, tuple2)) {
                    // schema obtained by merging the outer and inner schemas
                    TupleDesc tupleDesc = getTupleDesc();
                    // holds the merged row
                    Tuple res = new Tuple(tupleDesc);
                    int i = 0;
                    // copy all fields of the current outer row into res
                    Iterator<Field> fields1 = tuple1.fields();
                    while (fields1.hasNext() && i < tupleDesc.numFields()) {
                        res.setField(i++, fields1.next());
                    }
                    // copy all fields of the current inner row into res
                    Iterator<Field> fields2 = tuple2.fields();
                    while (fields2.hasNext() && i < tupleDesc.numFields()) {
                        res.setField(i++, fields2.next());
                    }
                    // inner table exhausted: rewind it and reset tuple1 as well
                    if (!this.children[1].hasNext()) {
                        this.children[1].rewind();
                        tuple1 = null;
                    }
                    // return the matched record
                    return res;
                }
            }
            // the current outer row has no (further) match in the inner table: rewind the inner iterator
            this.children[1].rewind();
            tuple1 = null;
        }
        // no matching record
        return null;
    }

    @Override
    public OpIterator[] getChildren() {
        return this.children;
    }

    @Override
    public void setChildren(OpIterator[] children) {
        this.children = children;
    }
}
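About the role of the tuple1 field:
- After fetching one row from the outer (driving) table, we have to scan the inner (driven) table, find every row that satisfies the join condition, concatenate the fields of the two rows, and return the result
- Each call to fetchNext returns a single matching row, so we must remember which outer row is currently being matched. Only when some call to fetchNext finds that the current outer row has been compared against every inner row do we fetch the next row from the outer table.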
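The need to keep the outer row between calls is easier to see in a stripped-down version. The sketch below is an illustration under simplifying assumptions, not the SimpleDB Join: rows are int arrays, the join is an equality join on one column from each side, and NestedLoopJoinSketch with its currentOuter field (playing the role of tuple1) is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class NestedLoopJoinSketch {
    private final List<int[]> inner;
    private final int outerField;
    private final int innerField;
    private final Iterator<int[]> outerIt;
    private Iterator<int[]> innerIt;
    private int[] currentOuter; // outer row still being matched (tuple1 in Join)

    NestedLoopJoinSketch(List<int[]> outer, List<int[]> inner, int outerField, int innerField) {
        this.inner = inner;
        this.outerField = outerField;
        this.innerField = innerField;
        this.outerIt = outer.iterator();
        this.innerIt = inner.iterator();
    }

    /** Returns the next joined row, or null when the join is exhausted. */
    int[] fetchNext() {
        while (currentOuter != null || outerIt.hasNext()) {
            if (currentOuter == null) currentOuter = outerIt.next();
            while (innerIt.hasNext()) {
                int[] right = innerIt.next();
                if (currentOuter[outerField] == right[innerField]) {
                    int[] joined = Arrays.copyOf(currentOuter, currentOuter.length + right.length);
                    System.arraycopy(right, 0, joined, currentOuter.length, right.length);
                    // currentOuter and innerIt are kept, so the next call resumes here
                    return joined;
                }
            }
            innerIt = inner.iterator(); // "rewind" the inner side
            currentOuter = null;        // move on to the next outer row
        }
        return null;
    }

    static List<int[]> joinAll(List<int[]> outer, List<int[]> inner, int outerField, int innerField) {
        NestedLoopJoinSketch join = new NestedLoopJoinSketch(outer, inner, outerField, innerField);
        List<int[]> out = new ArrayList<>();
        for (int[] row = join.fetchNext(); row != null; row = join.fetchNext()) out.add(row);
        return out;
    }

    public static void main(String[] args) {
        List<int[]> outer = Arrays.asList(new int[]{1, 2, 3}, new int[]{4, 5, 6});
        List<int[]> inner = Arrays.asList(new int[]{1, 5, 6}, new int[]{1, 7, 8}, new int[]{9, 9, 9});
        for (int[] row : joinAll(outer, inner, 0, 0)) System.out.println(Arrays.toString(row));
    }
}
```

If currentOuter were a local variable, the second match for the outer row {1, 2, 3} would be lost, because the second call would start over with the next outer row.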
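Exercise 2 -- Aggregates
Aggregates:
This section implements five aggregates: count, sum, avg, min and max, with support for grouping. Only aggregation over a single field, grouped by a single field, needs to be supported.
To compute aggregates we use the Aggregator interface, which merges a new tuple into the existing aggregate result. Which aggregate is computed is specified when the Aggregate is constructed. Client code therefore calls Aggregator.mergeTupleIntoGroup() for every tuple produced by the child operator; once all tuples have been merged, the client can retrieve the aggregate result.
- If a grouping field is specified, the result has the form (groupValue, aggregateValue);
- If no grouping field is specified, the result has the form (aggregateValue)
In this lab we do not need to worry about the number of groups exceeding available memory.
Implement the methods in the following classes: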
- src/java/simpledb/execution/IntegerAggregator.java
- src/java/simpledb/execution/StringAggregator.java
- src/java/simpledb/execution/Aggregate.java
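An Aggregator takes in Tuples and updates its internal computation as they arrive. After n tuples have been passed in, we can ask the aggregator for an iterator and read the aggregate result so far:
The above describes aggregation without grouping. With grouping, the process is as shown in the figure below: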
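The grouped case can be sketched without any SimpleDB types. The following is an illustration under stated assumptions, not the lab's Aggregator: GroupedAggregatorSketch and its Op enum are hypothetical, group keys are strings, and the per-group state is kept as {min, max, sum, count} so that AVG can be derived from sum and count with integer division.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class GroupedAggregatorSketch {
    /** Hypothetical operator set mirroring the five aggregates of this exercise. */
    enum Op { MIN, MAX, SUM, AVG, COUNT }

    private final Op op;
    // group key -> running state {min, max, sum, count}
    private final Map<String, int[]> state = new LinkedHashMap<>();

    GroupedAggregatorSketch(Op op) { this.op = op; }

    /** Merges one (group, value) pair into the running state of its group. */
    void merge(String group, int value) {
        int[] s = state.get(group);
        if (s == null) {
            // first value of the group initialises min, max and sum; count starts at 1
            state.put(group, new int[]{value, value, value, 1});
            return;
        }
        s[0] = Math.min(s[0], value);
        s[1] = Math.max(s[1], value);
        s[2] += value;
        s[3] += 1;
    }

    /** Produces (groupValue, aggregateValue) pairs in first-seen group order. */
    Map<String, Integer> results() {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (Map.Entry<String, int[]> e : state.entrySet()) {
            int[] s = e.getValue();
            int value;
            switch (op) {
                case MIN:   value = s[0]; break;
                case MAX:   value = s[1]; break;
                case SUM:   value = s[2]; break;
                case COUNT: value = s[3]; break;
                default:    value = s[2] / s[3]; break; // AVG, integer division
            }
            out.put(e.getKey(), value);
        }
        return out;
    }

    public static void main(String[] args) {
        GroupedAggregatorSketch avg = new GroupedAggregatorSketch(Op.AVG);
        avg.merge("a", 1);
        avg.merge("a", 4);
        avg.merge("b", 10);
        System.out.println(avg.results()); // prints {a=2, b=10}
    }
}
```

Keeping sum and count rather than a running average avoids compounding the rounding error of integer division on every merge.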
The Aggregator interface is defined as:
/**
 * The common interface for any class that can compute an aggregate over a
 * list of Tuples.
 */
public interface Aggregator extends Serializable {

    int NO_GROUPING = -1;

    /**
     * SUM_COUNT and SC_AVG will
     * only be used in lab7, you are not required
     * to implement them until then.
     */
    enum Op implements Serializable {
        MIN, MAX, SUM, AVG, COUNT,
        /**
         * SUM_COUNT: compute sum and count simultaneously, will be
         * needed to compute distributed avg in lab7.
         */
        SUM_COUNT,
        /**
         * SC_AVG: compute the avg of a set of SUM_COUNT tuples,
         * will be used to compute distributed avg in lab7.
         */
        SC_AVG;
        ...
    }

    /**
     * Merge a new tuple into the aggregate for a distinct group value;
     * creates a new group aggregate result if the group value has not yet
     * been encountered.
     *
     * @param tup the Tuple containing an aggregate field and a group-by field
     */
    void mergeTupleIntoGroup(Tuple tup);

    /**
     * Create a OpIterator over group aggregate results.
     *
     * @see TupleIterator for a possible helper
     */
    OpIterator iterator();
}
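Different field types support different aggregates. Strings, for example, support only COUNT and not SUM or AVG. For types that are not interchangeable we therefore provide separate aggregator implementations:
- First the simpler one, StringAggregator, which supports only the COUNT aggregate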
/**
 * Knows how to compute some aggregate over a set of StringFields.
 */
public class StringAggregator implements Aggregator {

    private static final IntField NO_GROUP = new IntField(-1);

    /** Used for grouping */
    private int gbfield;
    private Type gbfieldtype;

    /** Used for aggregation */
    private int afield;
    private Op what;

    /**
     * Holds the results. A grouped aggregate returns several key/value pairs,
     * one aggregate result per distinct value of the grouping field.
     * A non-grouped aggregate returns a single result; to handle both cases
     * uniformly it is stored under the NO_GROUP marker key.
     */
    private Map<Field, Tuple> tupleMap;
    private TupleDesc desc;

    /**
     * Aggregate constructor
     *
     * @param gbfield     the 0-based index of the group-by field in the tuple, or NO_GROUPING if there is no grouping
     * @param gbfieldtype the type of the group by field (e.g., Type.INT_TYPE), or null if there is no grouping
     * @param afield      the 0-based index of the aggregate field in the tuple
     * @param what        aggregation operator to use -- only supports COUNT
     * @throws IllegalArgumentException if what != COUNT
     */
    public StringAggregator(int gbfield, Type gbfieldtype, int afield, Op what) {
        // strings support only the COUNT aggregate
        if (!what.equals(Op.COUNT)) {
            throw new IllegalArgumentException();
        }
        this.gbfield = gbfield;
        this.gbfieldtype = gbfieldtype;
        this.afield = afield;
        this.what = what;
        this.tupleMap = new ConcurrentHashMap<>();
        // the non-grouped result is stored under a placeholder key for uniform handling
        if (gbfield == NO_GROUPING) {
            this.desc = new TupleDesc(new Type[]{Type.INT_TYPE}, new String[]{"aggregateValue"});
            Tuple tuple = new Tuple(desc);
            tuple.setField(0, new IntField(0));
            this.tupleMap.put(NO_GROUP, tuple);
        } else {
            // the grouped result schema has two fields: the grouping field and the aggregate result
            this.desc = new TupleDesc(new Type[]{gbfieldtype, Type.INT_TYPE}, new String[]{"groupValue", "aggregateValue"});
        }
    }

    /**
     * Merge a new tuple into the aggregate, grouping as indicated in the constructor
     *
     * @param tup the Tuple containing an aggregate field and a group-by field
     */
    public void mergeTupleIntoGroup(Tuple tup) {
        // only COUNT is supported
        if (this.gbfield == NO_GROUPING) {
            Tuple tuple = tupleMap.get(NO_GROUP);
            IntField field = (IntField) tuple.getField(0);
            tuple.setField(0, new IntField(field.getValue() + 1));
            tupleMap.put(NO_GROUP, tuple);
        } else {
            Field field = tup.getField(gbfield);
            if (!tupleMap.containsKey(field)) {
                Tuple tuple = new Tuple(this.desc);
                tuple.setField(0, field);
                tuple.setField(1, new IntField(1));
                tupleMap.put(field, tuple);
            } else {
                Tuple tuple = tupleMap.get(field);
                IntField intField = (IntField) tuple.getField(1);
                tuple.setField(1, new IntField(intField.getValue() + 1));
                tupleMap.put(field, tuple);
            }
        }
    }

    /**
     * Create a OpIterator over group aggregate results.
     */
    public OpIterator iterator() {
        return new StringIterator(this);
    }

    public class StringIterator implements OpIterator {
        private StringAggregator aggregator;
        private Iterator<Tuple> iterator;

        public StringIterator(StringAggregator aggregator) {
            this.aggregator = aggregator;
            this.iterator = null;
        }

        @Override
        public void open() throws DbException, TransactionAbortedException {
            this.iterator = aggregator.tupleMap.values().iterator();
        }

        @Override
        public boolean hasNext() throws DbException, TransactionAbortedException {
            return iterator.hasNext();
        }

        @Override
        public Tuple next() throws DbException, TransactionAbortedException, NoSuchElementException {
            return iterator.next();
        }

        @Override
        public void rewind() throws DbException, TransactionAbortedException {
            iterator = aggregator.tupleMap.values().iterator();
        }

        @Override
        public TupleDesc getTupleDesc() {
            return aggregator.desc;
        }

        @Override
        public void close() {
            iterator = null;
        }
    }
    ...
}
- Next the somewhat more complex one, IntegerAggregator, which supports the MIN, MAX, SUM, AVG and COUNT aggregates of the Op enum
/**
 * Knows how to compute some aggregate over a set of IntFields.
 * <p/>
 * Aggregates over an int field; the aggregate result must itself be an integer
 */
public class IntegerAggregator implements Aggregator {

    private static final long serialVersionUID = 1L;
    private static final Field NO_GROUP = new IntField(-1);

    /** Used for grouping */
    private int gbfield;
    private Type gbfieldType;

    /** Used for aggregation */
    private int afield;
    private Op what;

    /** Holds the results */
    private TupleDesc tupleDesc;
    private Map<Field, Tuple> aggregate;

    /** Intermediate AVG state for the non-grouped case */
    private int counts;
    private int summary;

    /** Intermediate AVG state for the grouped case */
    private Map<Field, Integer> countsMap;
    private Map<Field, Integer> sumMap;

    /**
     * Aggregate constructor
     *
     * @param gbfield     the 0-based index of the group-by field in the tuple, or
     *                    NO_GROUPING if there is no grouping
     * @param gbfieldtype the type of the group by field (e.g., Type.INT_TYPE), or null
     *                    if there is no grouping
     * @param afield      the 0-based index of the aggregate field in the tuple
     * @param what        the aggregation operator
     */
    public IntegerAggregator(int gbfield, Type gbfieldtype, int afield, Op what) {
        // grouping field
        this.gbfield = gbfield;
        // type of the grouping field
        this.gbfieldType = gbfieldtype;
        // index of the field to aggregate in the input tuple
        this.afield = afield;
        // which aggregate to compute
        this.what = what;
        // holds the aggregate results
        this.aggregate = new ConcurrentHashMap<>();
        // non-grouped aggregate
        if (gbfield == NO_GROUPING) {
            this.tupleDesc = new TupleDesc(new Type[]{Type.INT_TYPE}, new String[]{"aggregateValue"});
            Tuple tuple = new Tuple(tupleDesc);
            // placeholder key
            this.aggregate.put(NO_GROUP, tuple);
        } else {
            // grouped aggregate: each result row consists of the grouping value and its aggregate value
            this.tupleDesc = new TupleDesc(new Type[]{gbfieldtype, Type.INT_TYPE}, new String[]{"groupValue", "aggregateValue"});
        }
        // for AVG, initialise the count and sum state that holds the intermediate result
        if (gbfield == NO_GROUPING && what.equals(Op.AVG)) {
            this.counts = 0;
            this.summary = 0;
        } else if (gbfield != NO_GROUPING && what.equals(Op.AVG)) {
            this.countsMap = new ConcurrentHashMap<>();
            this.sumMap = new ConcurrentHashMap<>();
        }
    }

    /**
     * Merge a new tuple into the aggregate, grouping as indicated in the
     * constructor
     * <p>
     * Adds one row to the integer aggregator and updates the (grouped) result
     *
     * @param tup the Tuple containing an aggregate field and a group-by field
     */
    public void mergeTupleIntoGroup(Tuple tup) {
        // take the value of the aggregate field from the incoming row
        IntField operationField = (IntField) tup.getField(afield);
        if (operationField == null) {
            return;
        }
        // non-grouped aggregate:
        if (gbfield == NO_GROUPING) {
            // the tuple that carries the aggregate result
            Tuple tuple = aggregate.get(NO_GROUP);
            IntField field = (IntField) tuple.getField(0);
            // this is the first row being aggregated
            if (field == null) {
                if (what.equals(Op.COUNT)) {
                    // COUNT starts at 1
                    tuple.setField(0, new IntField(1));
                } else if (what.equals(Op.AVG)) {
                    // AVG: count the rows taking part in the aggregate
                    counts++;
                    // accumulate the values
                    summary = operationField.getValue();
                    // with a single row, the average is that row's value
                    tuple.setField(0, operationField);
                } else {
                    // MIN, MAX and SUM over a single row all equal that row's value,
                    // so they can be handled together
                    tuple.setField(0, operationField);
                }
                return;
            }
            // not the first row: branch on the aggregate type
            switch (what) {
                // select MIN(age) from t;
                case MIN:
                    // compare the incoming value with the result so far and keep the smaller
                    if (operationField.compare(Predicate.Op.LESS_THAN, field)) {
                        tuple.setField(0, operationField);
                        aggregate.put(NO_GROUP, tuple);
                    }
                    return;
                // select MAX(age) from t;
                case MAX:
                    // compare the incoming value with the result so far and keep the larger
                    if (operationField.compare(Predicate.Op.GREATER_THAN, field)) {
                        tuple.setField(0, operationField);
                        aggregate.put(NO_GROUP, tuple);
                    }
                    return;
                // select COUNT(age) from t;
                case COUNT:
                    // count + 1
                    IntField count = new IntField(field.getValue() + 1);
                    tuple.setField(0, count);
                    aggregate.put(NO_GROUP, tuple);
                    return;
                // select SUM(age) from t;
                case SUM:
                    // running sum
                    IntField sum = new IntField(field.getValue() + operationField.getValue());
                    tuple.setField(0, sum);
                    aggregate.put(NO_GROUP, tuple);
                    return;
                // select AVG(age) from t;
                case AVG:
                    // every merged row updates both the row count and the running sum
                    counts++;
                    summary += operationField.getValue();
                    IntField avg = new IntField(summary / counts);
                    tuple.setField(0, avg);
                    aggregate.put(NO_GROUP, tuple);
                    return;
                default:
                    return;
            }
        } else {
            // grouped aggregate:
            // fetch the grouping field --> group by age
            Field groupField = tup.getField(gbfield);
            // if the results do not contain this value yet, it is a group seen for the first time
            // e.g. group by age --> <age=18,count=20>; if this row has age=20, it starts a new group
            if (!aggregate.containsKey(groupField)) {
                Tuple value = new Tuple(this.tupleDesc);
                value.setField(0, groupField);
                if (what.equals(Op.COUNT)) {
                    value.setField(1, new IntField(1));
                } else if (what.equals(Op.AVG)) {
                    countsMap.put(groupField, countsMap.getOrDefault(groupField, 0) + 1);
                    sumMap.put(groupField, sumMap.getOrDefault(groupField, 0) + operationField.getValue());
                    value.setField(1, operationField);
                } else {
                    // MIN, MAX and SUM over a single row all equal that row's value,
                    // so they can be handled together
                    value.setField(1, operationField);
                }
                aggregate.put(groupField, value);
                return;
            }
            // the group value has been seen before
            Tuple tuple = aggregate.get(groupField);
            // aggregate result so far
            IntField field = (IntField) tuple.getField(1);
            switch (what) {
                case MIN:
                    if (operationField.compare(Predicate.Op.LESS_THAN, field)) {
                        tuple.setField(1, operationField);
                        aggregate.put(groupField, tuple);
                    }
                    return;
                case MAX:
                    if (operationField.compare(Predicate.Op.GREATER_THAN, field)) {
                        tuple.setField(1, operationField);
                        aggregate.put(groupField, tuple);
                    }
                    return;
                case COUNT:
                    IntField count = new IntField(field.getValue() + 1);
                    tuple.setField(1, count);
                    aggregate.put(groupField, tuple);
                    return;
                case SUM:
                    IntField sum = new IntField(field.getValue() + operationField.getValue());
                    tuple.setField(1, sum);
                    aggregate.put(groupField, tuple);
                    return;
                case AVG:
                    countsMap.put(groupField, countsMap.getOrDefault(groupField, 0) + 1);
                    sumMap.put(groupField, sumMap.getOrDefault(groupField, 0) + operationField.getValue());
                    IntField avg = new IntField(sumMap.get(groupField) / countsMap.get(groupField));
                    tuple.setField(1, avg);
                    aggregate.put(groupField, tuple);
                    return;
                default:
                    return;
            }
        }
    }

    public TupleDesc getTupleDesc() {
        return tupleDesc;
    }

    /**
     * Create a OpIterator over group aggregate results.
     *
     * @return a OpIterator whose tuples are the pair (groupVal, aggregateVal)
     *         if using group, or a single (aggregateVal) if no grouping. The
     *         aggregateVal is determined by the type of aggregate specified in
     *         the constructor.
     */
    public OpIterator iterator() {
        return new IntOpIterator(this);
    }

    public class IntOpIterator implements OpIterator {
        private Iterator<Tuple> iterator;
        private IntegerAggregator aggregator;

        public IntOpIterator(IntegerAggregator aggregator) {
            this.aggregator = aggregator;
            this.iterator = null;
        }

        @Override
        public void open() throws DbException, TransactionAbortedException {
            this.iterator = aggregator.aggregate.values().iterator();
        }

        @Override
        public boolean hasNext() throws DbException, TransactionAbortedException {
            return iterator.hasNext();
        }

        @Override
        public Tuple next() throws DbException, TransactionAbortedException, NoSuchElementException {
            return iterator.next();
        }

        @Override
        public void rewind() throws DbException, TransactionAbortedException {
            iterator = aggregator.aggregate.values().iterator();
        }

        @Override
        public TupleDesc getTupleDesc() {
            return aggregator.tupleDesc;
        }

        @Override
        public void close() {
            iterator = null;
        }
    }
}
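After finishing the Filter and Join work of exercise one, the code should pass the PredicateTest, JoinPredicateTest, FilterTest and JoinTest unit tests, as well as the FilterTest and JoinTest system tests. For the aggregators of this exercise, the lab handout lists the IntegerAggregatorTest, StringAggregatorTest and AggregateTest unit tests and the AggregateTest system test.
Exercise 3 -- HeapFile Mutability
This section implements the methods that modify table files. We start with individual pages and files and implement two operations: adding tuples and removing tuples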
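- Removing tuples: to remove a tuple we implement the deleteTuple method. Tuples carry RecordIds that tell us which page they are stored on, so locating the tuple's page and correctly updating the page header is straightforward
- Adding tuples: the insertTuple method of HeapFile adds a tuple to the database file. To add a new tuple to a HeapFile we have to find a page with an empty slot; if no such page exists, we create a new page and append it to the file on disk. We must make sure the tuple's RecordId is updated correctly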
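The insert and delete flow just described can be sketched with in-memory pages. This is a simplified illustration, not HeapFile itself: HeapFileSketch is a hypothetical class, a page is a String array with an assumed capacity of two slots, and a RecordId is represented as an int pair {pageNo, slotNo}.

```java
import java.util.ArrayList;
import java.util.List;

final class HeapFileSketch {
    static final int SLOTS_PER_PAGE = 2; // assumed tiny capacity to force page creation
    private final List<String[]> pages = new ArrayList<>();

    /** Inserts the tuple and returns its record id as {pageNo, slotNo}. */
    int[] insert(String tuple) {
        // 1. look for an existing page that still has an empty slot
        for (int pageNo = 0; pageNo < pages.size(); pageNo++) {
            String[] page = pages.get(pageNo);
            for (int slot = 0; slot < page.length; slot++) {
                if (page[slot] == null) {
                    page[slot] = tuple;
                    return new int[]{pageNo, slot};
                }
            }
        }
        // 2. every page is full: append a fresh page and use its first slot
        String[] fresh = new String[SLOTS_PER_PAGE];
        fresh[0] = tuple;
        pages.add(fresh);
        return new int[]{pages.size() - 1, 0};
    }

    /** Deletes the tuple addressed by the record id. */
    void delete(int[] recordId) {
        String[] page = pages.get(recordId[0]);
        if (page[recordId[1]] == null) {
            throw new IllegalStateException("slot is already empty");
        }
        page[recordId[1]] = null;
    }

    int numPages() { return pages.size(); }

    public static void main(String[] args) {
        HeapFileSketch file = new HeapFileSketch();
        file.insert("a");
        file.insert("b");
        int[] rid = file.insert("c"); // page 0 is full, so a second page is created
        System.out.println(rid[0] + "," + rid[1] + " pages=" + file.numPages()); // prints 1,0 pages=2
    }
}
```

Because the record id is assigned at insert time, a later delete can go straight to the right page and slot without scanning.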
Implement the methods in the following classes:
- src/java/simpledb/storage/HeapPage.java
- src/java/simpledb/storage/HeapFile.java (Note that you do not necessarily need to implement writePage at this point).
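To implement HeapPage, insertTuple and deleteTuple have to modify the bitmap that represents the header. The getNumEmptySlots() and isSlotUsed() methods implemented in lab 1 are useful here. markSlotUsed is provided as an abstraction (a helper, not an abstract method) for setting or clearing the status of a slot in the page header.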
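A header bitmap of this kind can be sketched as follows. SlotBitmapSketch is a hypothetical class written for illustration, and it assumes that slot i is stored in bit i % 8 of byte i / 8, with the least significant bit of each byte belonging to the earliest slot; check this layout against your own lab 1 isSlotUsed before reusing the arithmetic.

```java
final class SlotBitmapSketch {
    private final byte[] header;
    private final int numSlots;

    SlotBitmapSketch(int numSlots) {
        this.numSlots = numSlots;
        this.header = new byte[(numSlots + 7) / 8]; // one bit per slot, rounded up to whole bytes
    }

    /** True if the bit for slot i is set. */
    boolean isSlotUsed(int i) {
        return ((header[i / 8] >> (i % 8)) & 1) == 1;
    }

    /** Sets or clears the bit for slot i. */
    void markSlotUsed(int i, boolean used) {
        int mask = 1 << (i % 8);
        if (used) {
            header[i / 8] |= mask;
        } else {
            header[i / 8] &= ~mask;
        }
    }

    /** Counts the slots whose bit is clear. */
    int getNumEmptySlots() {
        int empty = 0;
        for (int i = 0; i < numSlots; i++) {
            if (!isSlotUsed(i)) empty++;
        }
        return empty;
    }

    public static void main(String[] args) {
        SlotBitmapSketch bitmap = new SlotBitmapSketch(10);
        bitmap.markSlotUsed(0, true);
        bitmap.markSlotUsed(9, true);
        System.out.println(bitmap.getNumEmptySlots()); // prints 8
    }
}
```

An insert then amounts to finding the first slot with a clear bit, setting it and storing the tuple, while a delete clears the bit and drops the tuple.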
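Note that HeapFile's insertTuple and deleteTuple must access pages through BufferPool.getPage; otherwise the transaction implementation in the next lab will not work properly
HeapPage is the smallest unit of data reads and writes. It is responsible for the page's data layout and for reading and writing the data. Its fields are: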
public class HeapPage implements Page {
    final HeapPageId pid;
    final TupleDesc td;
    final byte[] header;
    final Tuple[] tuples;
    final int numSlots;
    byte[] oldData;
    private final Byte oldDataLock = (byte) 0;
    // the two fields added in this lab
    private boolean dirty;
    private TransactionId tid;
    ...
The methods we implement in HeapPage in this section are tuple insertion and deletion, plus marking a page dirty and checking whether it is dirty:
/**
 * Adds the specified tuple to the page; the tuple should be updated to reflect
 * that it is now stored on this page.
 *
 * @param t The tuple to add.
 * @throws DbException if the page is full (no empty slots) or tupledesc
 *                     is mismatch.
 */
public void insertTuple(Tuple t) throws DbException {
    TupleDesc tupleDesc = t.getTupleDesc();
    if (getNumEmptySlots() == 0 || !tupleDesc.equals(this.td)) {
        throw new DbException("this page is full or tupledesc is mismatch");
    }
    for (int i = 0; i < numSlots; i++) {
        if (!isSlotUsed(i)) {
            markSlotUsed(i, true);
            t.setRecordId(new RecordId(this.pid, i));
            tuples[i] = t;
            break;
        }
    }
}

/**
 * Delete the specified tuple from the page; the corresponding header bit should be updated to reflect
 * that it is no longer stored on any page.
 *
 * @param t The tuple to delete
 * @throws DbException if this tuple is not on this page, or tuple slot is
 *                     already empty.
 */
public void deleteTuple(Tuple t) throws DbException {
    RecordId recordId = t.getRecordId();
    int slotId = recordId.getTupleNumber();
    // compare page ids with equals(): two distinct PageId objects can refer to the same page
    if (!recordId.getPageId().equals(this.pid) || !isSlotUsed(slotId)) {
        throw new DbException("tuple is not in this page");
    }
    // clear the header bit of the tuple's slot
    markSlotUsed(slotId, false);
    // drop the tuple stored in that slot
    tuples[slotId] = null;
}

/**
 * Marks this page as dirty/not dirty and record that transaction
 * that did the dirtying
 */
public void markDirty(boolean dirty, TransactionId tid) {
    this.dirty = dirty;
    this.tid = tid;
}

/**
 * Returns the tid of the transaction that last dirtied this page, or null if the page is not dirty
 */
public TransactionId isDirty() {
    return dirty ? tid : null;
}
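See the source for the remaining helper methods.
A HeapFile can be seen as the object that represents a table: a table consists of a set of HeapPages, which are stored in the table's DbFile. Here we mainly implement tuple insertion and deletion: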
// see DbFile.java for javadocs
public List<Page> insertTuple(TransactionId tid, Tuple t)
        throws DbException, IOException, TransactionAbortedException {
    List<Page> modified = new ArrayList<>();
    for (int i = 0; i < numPages(); i++) {
        HeapPage page = (HeapPage) bufferPool.getPage(tid, new HeapPageId(this.getId(), i), Permissions.READ_WRITE);
        if (page.getNumEmptySlots() == 0) {
            continue;
        }
        page.insertTuple(t);
        modified.add(page);
        return modified;
    }
    // every page is full: create a new page and append it to the file
    try (BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream(file, true))) {
        byte[] emptyPageData = HeapPage.createEmptyPageData();
        // append the empty page to the end of the file
        outputStream.write(emptyPageData);
    }
    // load the new page into the cache; numPages() - 1 is used because numPages()
    // already reflects the size after the append
    HeapPage page = (HeapPage) bufferPool.getPage(tid, new HeapPageId(getId(), numPages() - 1), Permissions.READ_WRITE);
    page.insertTuple(t);
    modified.add(page);
    return modified;
}

// see DbFile.java for javadocs
public ArrayList<Page> deleteTuple(TransactionId tid, Tuple t) throws DbException,
        TransactionAbortedException {
    HeapPage page = (HeapPage) bufferPool.getPage(tid, t.getRecordId().getPageId(), Permissions.READ_WRITE);
    page.deleteTuple(t);
    ArrayList<Page> modified = new ArrayList<>();
    modified.add(page);
    return modified;
}
Implement the following methods in the BufferPool class:
- insertTuple()
- deleteTuple()
These methods should call the appropriate methods of the HeapFile that belongs to the table being modified
public void insertTuple(TransactionId tid, int tableId, Tuple t)
        throws DbException, IOException, TransactionAbortedException {
    DbFile dbFile = Database.getCatalog().getDatabaseFile(tableId);
    updateBufferPool(dbFile.insertTuple(tid, t), tid);
}

public void deleteTuple(TransactionId tid, Tuple t)
        throws DbException, IOException, TransactionAbortedException {
    DbFile dbFile = Database.getCatalog().getDatabaseFile(t.getRecordId().getPageId().getTableId());
    updateBufferPool(dbFile.deleteTuple(tid, t), tid);
}

private void updateBufferPool(List<Page> pages, TransactionId tid) throws DbException {
    for (Page page : pages) {
        page.markDirty(true, tid);
    }
}
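After completing the exercise, the code should pass the HeapPageWriteTest, HeapFileWriteTest and BufferPoolWriteTest unit tests
Exercise 4 -- Insertion & deletion
Now that the mechanism for adding tuples to and removing tuples from a HeapFile is in place, the next step is to implement the Insert and Delete operators
To run insert and delete queries, we use Insert and Delete to modify the pages on disk; these operators return the number of affected tuples
- Insert: reads tuples from its child operator and adds them to the table identified by the tableid given in the constructor; it is implemented by calling BufferPool.insertTuple()
- Delete: reads tuples from its child operator and removes them from the table they belong to; it is implemented by calling BufferPool.deleteTuple()
Implement the methods in the following classes: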
- src/java/simpledb/execution/Insert.java
- src/java/simpledb/execution/Delete.java
Insert and Delete also follow the decorator pattern, so we will not go over it again:
- The Insert operator
/**
 * Inserts tuples read from the child operator into the tableId specified in the
 * constructor
 */
public class Insert extends Operator {

    private static final long serialVersionUID = 1L;

    private final TransactionId tid;
    private OpIterator child;
    private final int tableId;
    private final TupleDesc tupleDesc;
    private Tuple insertTuple;

    /**
     * Constructor.
     *
     * @param t       The transaction running the insert.
     * @param child   The child operator from which to read tuples to be inserted.
     * @param tableId The table in which to insert tuples.
     * @throws DbException if TupleDesc of child differs from table into which we are to
     *                     insert.
     */
    public Insert(TransactionId t, OpIterator child, int tableId)
            throws DbException {
        this.tid = t;
        this.child = child;
        this.tableId = tableId;
        this.tupleDesc = new TupleDesc(new Type[]{Type.INT_TYPE}, new String[]{"insertNums"});
        this.insertTuple = null;
    }

    public TupleDesc getTupleDesc() {
        return this.tupleDesc;
    }

    public void open() throws DbException, TransactionAbortedException {
        super.open();
        child.open();
    }

    public void close() {
        super.close();
        child.close();
    }

    public void rewind() throws DbException, TransactionAbortedException {
        child.rewind();
    }

    /**
     * Inserts tuples read from child into the tableId specified by the
     * constructor. It returns a one field tuple containing the number of
     * inserted records. Inserts should be passed through BufferPool. An
     * instances of BufferPool is available via Database.getBufferPool(). Note
     * that insert DOES NOT need check to see if a particular tuple is a
     * duplicate before inserting it.
     *
     * @return A 1-field tuple containing the number of inserted records, or
     *         null if called more than once.
     * @see Database#getBufferPool
     * @see BufferPool#insertTuple
     */
    protected Tuple fetchNext() throws TransactionAbortedException, DbException {
        if (insertTuple != null) {
            return null;
        }
        BufferPool bufferPool = Database.getBufferPool();
        int insertTuples = 0;
        while (child.hasNext()) {
            try {
                bufferPool.insertTuple(tid, tableId, child.next());
                insertTuples++;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        // the result is the number of inserted tuples
        insertTuple = new Tuple(this.tupleDesc);
        insertTuple.setField(0, new IntField(insertTuples));
        return insertTuple;
    }

    @Override
    public OpIterator[] getChildren() {
        return new OpIterator[]{child};
    }

    @Override
    public void setChildren(OpIterator[] children) {
        this.child = children[0];
    }
}
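The decorator pattern comes down to two points, followed by how Insert applies them:
- The decorator extends the same abstract parent class or implements the same interface as the object it decorates, so the decorated result can still be held through a reference of the base type.
- Inside, the decorator calls the decorated object's methods to obtain the original data, then computes a result from it, attaches extra information to it, or passes it through unchanged while only recording information.
- fetchNext is the method the Insert decorator has to implement: it calls next on the decorated child until all tuples are consumed, inserts each one, counts the insertions, and finally returns that count.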
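The two points above can be shown without any SimpleDB classes. The following is a minimal sketch, assuming rows are plain int[] arrays and the "table" is a List; CountingSink and CountingSinkDemo are hypothetical names made up for this illustration. Unlike Insert.fetchNext, which returns null on a second call, the sketch signals exhaustion through hasNext.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical demo class; none of these names exist in SimpleDB.
class CountingSinkDemo {
    public static void main(String[] args) {
        List<int[]> table = new ArrayList<>();
        Iterator<int[]> source = Arrays.asList(
                new int[]{1, 10}, new int[]{2, 20}, new int[]{3, 30}).iterator();

        // The decorated object is still held through the base type.
        Iterator<int[]> insert = new CountingSink(source, table);
        System.out.println(insert.next()[0]); // number of rows moved into table
        System.out.println(insert.hasNext()); // false: the count is produced once
    }
}

/** Decorator: implements the same Iterator<int[]> interface as the source it wraps. */
class CountingSink implements Iterator<int[]> {
    private final Iterator<int[]> child;
    private final List<int[]> target;
    private boolean done = false;

    CountingSink(Iterator<int[]> child, List<int[]> target) {
        this.child = child;
        this.target = target;
    }

    @Override
    public boolean hasNext() {
        return !done;
    }

    @Override
    public int[] next() {
        if (done) {
            throw new NoSuchElementException();
        }
        // Drain the wrapped iterator, copy each row, and count the rows.
        int count = 0;
        while (child.hasNext()) {
            target.add(child.next());
            count++;
        }
        done = true;
        // A single one-column row carrying the count, like Insert's result tuple.
        return new int[]{count};
    }
}
```

Because the sink is itself an Iterator<int[]>, another decorator could wrap it in turn, which is how operators stack in a query plan.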
- Delete operator
/**
 * The delete operator. Delete reads tuples from its child operator and removes
 * them from the table they belong to.
 */
public class Delete extends Operator {

    private static final long serialVersionUID = 1L;

    private final TransactionId tid;
    private OpIterator child;
    private final TupleDesc tupleDesc;
    private Tuple deleteTuple;

    /**
     * Constructor specifying the transaction that this delete belongs to as
     * well as the child to read from.
     *
     * @param t     The transaction this delete runs in
     * @param child The child operator from which to read tuples for deletion
     */
    public Delete(TransactionId t, OpIterator child) {
        this.tid = t;
        this.child = child;
        this.tupleDesc = new TupleDesc(new Type[]{Type.INT_TYPE}, new String[]{"deleteNums"});
        this.deleteTuple = null;
    }

    public TupleDesc getTupleDesc() {
        return this.tupleDesc;
    }

    public void open() throws DbException, TransactionAbortedException {
        super.open();
        child.open();
    }

    public void close() {
        super.close();
        child.close();
    }

    public void rewind() throws DbException, TransactionAbortedException {
        child.rewind();
    }

    /**
     * Deletes tuples as they are read from the child operator. Deletes are
     * processed via the buffer pool (which can be accessed via the
     * Database.getBufferPool() method).
     *
     * @return A 1-field tuple containing the number of deleted records.
     * @see Database#getBufferPool
     * @see BufferPool#deleteTuple
     */
    protected Tuple fetchNext() throws TransactionAbortedException, DbException {
        if (deleteTuple != null) {
            return null;
        }
        BufferPool bufferPool = Database.getBufferPool();
        int deleteNums = 0;
        while (child.hasNext()) {
            try {
                bufferPool.deleteTuple(tid, child.next());
                deleteNums++;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        deleteTuple = new Tuple(tupleDesc);
        deleteTuple.setField(0, new IntField(deleteNums));
        return deleteTuple;
    }

    @Override
    public OpIterator[] getChildren() {
        return new OpIterator[]{child};
    }

    @Override
    public void setChildren(OpIterator[] children) {
        this.child = children[0];
    }
}
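After finishing, the code should pass the InsertTest unit test, as well as the InsertTest and DeleteTest system tests.
Exercise 5 – Page eviction
In Lab 1 we did not enforce the limit on the number of cached pages set by the numPages argument of the BufferPool constructor. In this exercise we implement an eviction policy.
When loading another page would leave the buffer pool with more than numPages pages, we first have to evict one of the pages currently cached. Which eviction policy to use is up to us.
BufferPool contains a flushAllPages method. It exists only for testing and should never be called from real code.
flushAllPages should call flushPage on every page in the BufferPool, and flushPage should write a dirty page to disk and mark it as not dirty while leaving it in the BufferPool.
The only method that removes a page from the buffer pool should be evictPage, and it should call flushPage on any dirty page it evicts.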
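The division of labour between flushPage, flushAllPages, and evictPage is easier to see in a toy model. The following is a minimal sketch, assuming pages are just integers, "disk" is a log of page numbers that were written, and eviction is first in, first out; ToyPool and all of its methods are hypothetical and are not the SimpleDB BufferPool API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical demo; ToyPool is not part of SimpleDB.
class ToyPoolDemo {
    public static void main(String[] args) {
        ToyPool pool = new ToyPool(2);
        pool.load(1);
        pool.markDirty(1);
        pool.load(2);
        pool.load(3);                         // pool is full: page 1 is flushed, then evicted
        System.out.println(pool.writes());    // [1]
        pool.markDirty(3);
        pool.flushAllPages();                 // writes page 3 but keeps it cached
        System.out.println(pool.writes());    // [1, 3]
        System.out.println(pool.isCached(3)); // true
    }
}

class ToyPool {
    private final int capacity;
    // page number -> dirty flag; the key set is the set of cached pages
    private final Map<Integer, Boolean> dirtyFlags = new LinkedHashMap<>();
    private final Queue<Integer> arrivalOrder = new ArrayDeque<>();
    // stands in for the disk: records every page number that was written
    private final List<Integer> writes = new ArrayList<>();

    ToyPool(int capacity) {
        this.capacity = capacity;
    }

    void load(int pageNo) {
        if (dirtyFlags.containsKey(pageNo)) {
            return;
        }
        // Evict before the pool would grow past its capacity.
        if (dirtyFlags.size() >= capacity) {
            evictPage();
        }
        dirtyFlags.put(pageNo, false);
        arrivalOrder.add(pageNo);
    }

    void markDirty(int pageNo) {
        dirtyFlags.replace(pageNo, true); // no effect if the page is not cached
    }

    // Writes a dirty page and marks it clean; the page stays cached.
    void flushPage(int pageNo) {
        if (Boolean.TRUE.equals(dirtyFlags.get(pageNo))) {
            writes.add(pageNo);
            dirtyFlags.put(pageNo, false);
        }
    }

    void flushAllPages() {
        for (Integer pageNo : new ArrayList<>(dirtyFlags.keySet())) {
            flushPage(pageNo);
        }
    }

    // The only place where a page leaves the pool.
    private void evictPage() {
        Integer victim = arrivalOrder.poll();
        if (victim == null) {
            throw new IllegalStateException("nothing to evict");
        }
        flushPage(victim);
        dirtyFlags.remove(victim);
    }

    boolean isCached(int pageNo) {
        return dirtyFlags.containsKey(pageNo);
    }

    List<Integer> writes() {
        return writes;
    }
}
```

Flushing and evicting are kept separate on purpose: flushing never shrinks the pool, and evicting always flushes first.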
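If you have studied operating systems, the common cache replacement policies will be familiar: first in, first out (FIFO), least recently used (LRU), and least frequently used (LFU). Any of them will do. I defined an abstract interface with the required methods and implemented the FIFO and LRU policies against it; see the code for details.
Implement the page eviction policy in BufferPool: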
- src/java/simpledb/storage/BufferPool.java
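We also need to implement discardPage, which removes a page from the buffer pool without flushing it to disk. This lab does not use it, but later labs depend on it.
Page eviction is implemented with the strategy pattern. LRU can be built on a hash map combined with a linked list (see the LRUEvict class in the Lab2 source code). Only the FIFO strategy is shown here: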
// EvictStrategy.java
public interface EvictStrategy {

    /**
     * Update the underlying data structure so the policy knows about this page.
     *
     * @param pageId the page that was just added to the buffer pool
     */
    void addPage(PageId pageId);

    /**
     * Return the PageId of the page that should be evicted next.
     *
     * @return PageId
     */
    PageId getEvictPageId();
}

// FIFOEvict.java
import java.util.ArrayDeque;
import java.util.Queue;

public class FIFOEvict implements EvictStrategy {

    /**
     * Queue of page ids in arrival order
     */
    private final Queue<PageId> queue;

    public FIFOEvict(int numPages) {
        this.queue = new ArrayDeque<>(numPages);
    }

    @Override
    public void addPage(PageId pageId) {
        // append to the tail of the queue
        boolean offer = queue.offer(pageId);
        if (offer) {
            System.out.println("PageId: " + pageId + " added to the queue");
        } else {
            System.out.println("PageId: " + pageId + " could not be added to the queue");
        }
    }

    @Override
    public PageId getEvictPageId() {
        // take from the head of the queue
        return queue.poll();
    }
}
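For comparison with the FIFO strategy, here is a minimal LRU sketch. It assumes java.util.LinkedHashMap in access-order mode as the "hash map plus linked list"; it is not the author's LRUEvict class, and LruTracker, touch, and evict are hypothetical names (touch plays the role of addPage, evict the role of getEvictPageId). The key type is generic so the example runs without SimpleDB's PageId.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;

// Hypothetical demo; LruTracker is not part of SimpleDB.
class LruTrackerDemo {
    public static void main(String[] args) {
        LruTracker<String> lru = new LruTracker<>();
        lru.touch("p1");
        lru.touch("p2");
        lru.touch("p3");
        lru.touch("p1");                 // p1 becomes the most recently used
        System.out.println(lru.evict()); // p2
        System.out.println(lru.evict()); // p3
        System.out.println(lru.evict()); // p1
    }
}

class LruTracker<K> {
    // accessOrder = true: iteration runs from least to most recently accessed
    private final LinkedHashMap<K, Boolean> order = new LinkedHashMap<>(16, 0.75f, true);

    /** Record that a key was used; an existing key moves to the "most recent" end. */
    void touch(K key) {
        order.put(key, Boolean.TRUE);
    }

    /** Remove and return the least recently used key, or null if nothing is tracked. */
    K evict() {
        Iterator<K> it = order.keySet().iterator();
        if (!it.hasNext()) {
            return null;
        }
        K victim = it.next();
        it.remove();
        return victim;
    }
}
```

For LRU to be meaningful, the tracker has to be told about every access, including cache hits, not only about pages that were just loaded.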
With the eviction strategy interface and its implementation in place, complete the flushPage and evictPage related methods in BufferPool:
// Excerpt from BufferPool.java; the numPages and pageCache fields are declared elsewhere in the class.
private final EvictStrategy evict;

public BufferPool(int numPages) {
    this.numPages = numPages;
    this.pageCache = new ConcurrentHashMap<>();
    this.evict = new FIFOEvict(numPages);
}

/**
 * Flush all dirty pages to disk.
 * NB: Be careful using this routine -- it writes dirty data to disk so will
 * break simpledb if running in NO STEAL mode.
 */
public synchronized void flushAllPages() throws IOException {
    pageCache.forEach((pageId, page) -> {
        try {
            flushPage(pageId);
        } catch (IOException e) {
            e.printStackTrace();
        }
    });
}

/**
 * Remove the specific page id from the buffer pool.
 * Needed by the recovery manager to ensure that the
 * buffer pool doesn't keep a rolled back page in its
 * cache.
 * <p>
 * Also used by B+ tree files to ensure that deleted pages
 * are removed from the cache so they can be reused safely
 */
public synchronized void discardPage(PageId pid) {
    pageCache.remove(pid);
}

/**
 * Flushes a certain page to disk
 *
 * @param pid an ID indicating the page to flush
 */
private synchronized void flushPage(PageId pid) throws IOException {
    Page flush = pageCache.get(pid);
    // nothing to do if the page is not cached or is already clean
    if (flush == null || flush.isDirty() == null) {
        return;
    }
    // look up the DbFile by tableId and write the page into it
    int tableId = pid.getTableId();
    DbFile dbFile = Database.getCatalog().getDatabaseFile(tableId);
    dbFile.writePage(flush);
    // the copy on disk is current now, so mark the page as not dirty
    flush.markDirty(false, null);
}

/**
 * Discards a page from the buffer pool.
 * Flushes the page to disk to ensure dirty pages are updated on disk.
 */
private synchronized void evictPage() throws DbException {
    PageId evictPageId = evict.getEvictPageId();
    try {
        flushPage(evictPageId);
    } catch (IOException e) {
        e.printStackTrace();
    }
    pageCache.remove(evictPageId);
}

public Page getPage(TransactionId tid, PageId pid, Permissions perm)
        throws TransactionAbortedException, DbException {
    if (!pageCache.containsKey(pid)) {
        // evict first when the cache is full, so it never holds more than numPages pages
        if (pageCache.size() >= numPages) {
            evictPage();
        }
        DbFile dbFile = Database.getCatalog().getDatabaseFile(pid.getTableId());
        Page page = dbFile.readPage(pid);
        pageCache.put(pid, page);
        evict.addPage(pid);
    }
    return pageCache.get(pid);
}
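After finishing this exercise, the code should pass the EvictionTest system test.
That completes the implementation work for this lab. What remains are further tests of what we have built.
Exercise 6 – Query walkthrough
Using the operators implemented so far, run a join query like the following SQL statement: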
SELECT *
FROM some_data_file1,some_data_file2
WHERE some_data_file1.field1 = some_data_file2.field1
  AND some_data_file1.id > 1
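Before wiring up the SimpleDB operators, it can help to see what this query computes on plain arrays. The following is a minimal sketch of a filter followed by a nested-loop join, assuming id is column 0 and field1 is column 1 of each row. The rows are made up for illustration and are not the contents of the .dat files; JoinSketch and filterThenJoin are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo; it mimics the query on in-memory rows, not on SimpleDB tables.
class JoinSketch {
    public static void main(String[] args) {
        // Made-up rows in the layout (id, field1, field2).
        List<int[]> left = Arrays.asList(
                new int[]{1, 10, 100}, new int[]{2, 20, 200}, new int[]{3, 30, 300});
        List<int[]> right = Arrays.asList(
                new int[]{6, 10, 0}, new int[]{7, 20, 1}, new int[]{8, 30, 2});

        for (int[] row : filterThenJoin(left, right)) {
            System.out.println(Arrays.toString(row));
        }
        // prints:
        // [2, 20, 200, 7, 20, 1]
        // [3, 30, 300, 8, 30, 2]
    }

    static List<int[]> filterThenJoin(List<int[]> left, List<int[]> right) {
        List<int[]> out = new ArrayList<>();
        for (int[] l : left) {
            if (l[0] <= 1) {
                continue; // WHERE left.id > 1
            }
            for (int[] r : right) {
                if (l[1] == r[1]) { // left.field1 = right.field1
                    // The joined row is the left row followed by the right row.
                    int[] joined = Arrays.copyOf(l, l.length + r.length);
                    System.arraycopy(r, 0, joined, l.length, r.length);
                    out.add(joined);
                }
            }
        }
        return out;
    }
}
```

The first left row would match on field1 but is dropped by the id filter, which is why the filter sits below the join in the operator tree.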
Following the approach from Lab 1, create two database files, some_data_file1.dat and some_data_file2.dat, then run the test below. It should print two result rows, 2,2,3,3,2,4 and 3,3,4,5,3,7:
import java.io.File;

import simpledb.common.*;
import simpledb.execution.*;
import simpledb.storage.*;
import simpledb.transaction.TransactionId;

public class JoinTest {

    /**
     * select * from t1,t2 where t1.f0 > 1 and t1.f1 = t2.f1 ;
     */
    public static void main(String[] args) {
        // construct a 3-column table schema
        Type[] types = new Type[]{Type.INT_TYPE, Type.INT_TYPE, Type.INT_TYPE};
        String[] names = new String[]{"f0", "f1", "f2"};
        TupleDesc td = new TupleDesc(types, names);

        // create the tables, associate them with the data files
        // and tell the catalog about the schema the tables.
        HeapFile table1 = new HeapFile(new File("some_data_file1.dat"), td);
        Database.getCatalog().addTable(table1, "t1");
        HeapFile table2 = new HeapFile(new File("some_data_file2.dat"), td);
        Database.getCatalog().addTable(table2, "t2");

        // construct the query: we use two SeqScans, which spoonfeed
        // tuples via iterators into join
        TransactionId tid = new TransactionId();
        SeqScan ss1 = new SeqScan(tid, table1.getId(), "t1");
        SeqScan ss2 = new SeqScan(tid, table2.getId(), "t2");

        // create a filter for the where condition
        Filter sf1 = new Filter(
                new Predicate(0, Predicate.Op.GREATER_THAN, new IntField(1)), ss1);
        JoinPredicate p = new JoinPredicate(1, Predicate.Op.EQUALS, 1);
        Join j = new Join(p, sf1, ss2);

        // and run it
        try {
            j.open();
            while (j.hasNext()) {
                Tuple tup = j.next();
                System.out.println(tup);
            }
            j.close();
            Database.getBufferPool().transactionComplete(tid);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Exercise 7 - Query parsing
In this exercise we use the SQL parser that ships with SimpleDB to run queries written as SQL statements.
First we need a database table and a database catalog. The table file data.txt contains:
1,10
2,20
3,30
4,40
5,50
5,50
Convert it to a binary file with the following command:
java -jar dist/simpledb.jar convert data.txt 2 "int,int"
Next, create the catalog file catalog.txt:
data (f1 int, f2 int)
This file tells SimpleDB that the database contains a single table, data, whose schema is two int columns.
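To make the catalog line format concrete, here is a minimal sketch of how a line such as `data (f1 int, f2 int)` breaks down into a table name and a list of typed columns. It is only an illustration of the format shown above, not SimpleDB's own catalog loader; CatalogLine and CatalogLineDemo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo; this is not the loader SimpleDB uses.
class CatalogLineDemo {
    public static void main(String[] args) {
        CatalogLine line = new CatalogLine("data (f1 int, f2 int)");
        System.out.println(line.table + " " + line.names + " " + line.types);
        // prints: data [f1, f2] [int, int]
    }
}

class CatalogLine {
    final String table;
    final List<String> names = new ArrayList<>();
    final List<String> types = new ArrayList<>();

    CatalogLine(String line) {
        int open = line.indexOf('(');
        int close = line.lastIndexOf(')');
        if (open < 0 || close < open) {
            throw new IllegalArgumentException("expected: name (col type, ...)");
        }
        // Everything before the parenthesis is the table name.
        table = line.substring(0, open).trim();
        // Each comma-separated entry is "<column name> <column type>".
        for (String column : line.substring(open + 1, close).split(",")) {
            String[] parts = column.trim().split("\\s+");
            if (parts.length < 2) {
                throw new IllegalArgumentException("expected '<name> <type>' but got: " + column);
            }
            names.add(parts[0]);
            types.add(parts[1]);
        }
    }
}
```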
Finally, run the following command:
java -jar dist/simpledb.jar parser catalog.txt
You should see the following output:
Added table : data with schema INT_TYPE(f1), INT_TYPE(f2)
Computing table stats.
Done.
SimpleDB>
Then type a SQL statement to run a query:
SimpleDB> select d.f1, d.f2 from data d;
Started a new transaction tid = 0
Added scan of table d
Added select list field d.f1
Added select list field d.f2
The query plan is:
  π(d.f1,d.f2),card:0
  |
scan(data d)

d.f1  d.f2
------------------
1     10
2     20
3     30
4     40
5     50
5     50

 6 rows.
Transaction 0 committed.
----------------
0.10 seconds
If no errors are reported, the relevant parts of your implementation are working.