IBM openblockchain学习(二)--chaincode源码分析
- 1. chaincode.go
- 1.1. MustGetLogger
- 1.2. Start(cc Chaincode)
- 1.3. getPeerAddress()
- 1.4. newPeerClientConnection()
- 1.5. chatWithPeer()
- 1.6. GetState
- 1.7. PutState
- 1.8. DelState
- 1.9. StateRangeQueryIterator
- 1.10. StateRangeQueryIterator
- 1.11. HasNext()
- 1.12. Next()
- 1.13. Close()
- 1.14. InvokeChaincode()
- 1.15. QueryChaincode
- 1.16. GetRows
- 2. chaincode.pb.go
- 3. shim包下handler.go
- 4. exectransaction.go
- 5. chaincode下handler.go
openblockchain是IBM开源的blockchain项目,具体安装流程之前已经介绍过,具体请看http://blog.csdn.net/pangjiuzala/article/details/50897819。
解压后会发现在obc-peer根目录下出现一个main.go文件,其中主要功能是生成obc-peer命令,核心代码集中在openchain中的。接下来,将首先从chaincode代码分析开始,包含了如图下图所示的几个核心文件。
chaincode.go
chaincode.go位于shim包中,
var chaincodeLogger = logging.MustGetLogger("chaincode") |
MustGetLogger
MustGetLogger位于logger.go文件中,具体代码如下
// GetLogger 创建和返回一个基于模块名称的Logger对象 |
定义chaincode接口,它是一个供chaincode开发者需要实现标准回调接口type Chaincode interface {
// run方法在每一笔交易初始化的时候被调用
Run(stub *ChaincodeStub, function string, args []string) ([]byte, error)
// Query函数以只读方式查询chaincode状态
Query(stub *ChaincodeStub, function string, args []string) ([]byte, error)
}
Start(cc Chaincode)
启动chaincode引导程序的入口节点
func Start(cc Chaincode) error { |
getPeerAddress()
获取peer地址
func getPeerAddress() string { |
newPeerClientConnection()
创建peer 客户端连接
func newPeerClientConnection() (*grpc.ClientConn, error) { |
chatWithPeer()
使用peer进行通信func chatWithPeer(chaincodeSupportClient pb.ChaincodeSupportClient, cc Chaincode) error {
// 通过peer验证创建流
stream, err := chaincodeSupportClient.Register(context.Background())
if err != nil {
return fmt.Errorf("Error chatting with leader at address=%s: %s", getPeerAddress(), err)
}
//创建传递给链码的链码存根
//stub := &ChaincodeStub{}
// Create the shim handler responsible for all control logic
handler = newChaincodeHandler(getPeerAddress(), stream, cc)
defer stream.CloseSend()
// Send the ChaincodeID during register.
chaincodeID := &pb.ChaincodeID{Name: viper.GetString("chaincode.id.name")}
payload, err := proto.Marshal(chaincodeID)
if err != nil {
return fmt.Errorf("Error marshalling chaincodeID during chaincode registration: %s", err)
}
// 流寄存器
chaincodeLogger.Debug("Registering.. sending %s", pb.ChaincodeMessage_REGISTER)
handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload})
waitc := make(chan struct{})
go func() {
defer close(waitc)
msgAvail := make(chan *pb.ChaincodeMessage)
var nsInfo *nextStateInfo
var in *pb.ChaincodeMessage
recv := true
for {
in = nil
err = nil
nsInfo = nil
if recv {
recv = false
go func() {
var in2 *pb.ChaincodeMessage
in2, err = stream.Recv()
msgAvail <- in2
}()
}
select {
case in = <-msgAvail:
if err == io.EOF {
chaincodeLogger.Debug("Received EOF, ending chaincode stream, %s", err)
return
} else if err != nil {
chaincodeLogger.Error(fmt.Sprintf("Received error from server: %s, ending chaincode stream", err))
return
} else if in == nil {
err = fmt.Errorf("Received nil message, ending chaincode stream")
chaincodeLogger.Debug("Received nil message, ending chaincode stream")
return
}
chaincodeLogger.Debug("[%s]Received message %s from shim", shortuuid(in.Uuid), in.Type.String())
recv = true
case nsInfo = <-handler.nextState:
in = nsInfo.msg
if in == nil {
panic("nil msg")
}
chaincodeLogger.Debug("[%s]Move state message %s", shortuuid(in.Uuid), in.Type.String())
}
// 调用 FSM.handleMessage(),fsm即状态机
err = handler.handleMessage(in)
if err != nil {
err = fmt.Errorf("Error handling message: %s", err)
return
}
if nsInfo != nil && nsInfo.sendToCC {
chaincodeLogger.Debug("[%s]send state message %s", shortuuid(in.Uuid), in.Type.String())
if err = handler.serialSend(in); err != nil {
err = fmt.Errorf("Error sending %s: %s", in.Type.String(), err)
return
}
}
}
}()
<-waitc
return err
}
GetState
GetState被调用后,将从总帐中获取chaincode状态记录func (stub *ChaincodeStub) GetState(key string) ([]byte, error) {
return handler.handleGetState(key, stub.UUID)
PutState
相对于GetState,PutState被调用后,将会把chaincode状态记录到总账中
func (stub *ChaincodeStub) PutState(key string, value []byte) error { |
DelState
从总账中删除chaincode状态记录func (stub *ChaincodeStub) DelState(key string) error {
return handler.handleDelState(key, stub.UUID)
}
StateRangeQueryIterator
StateRangeQueryIterator是一个迭代器,遍历一定范围内的以键值对形式记录的状态type StateRangeQueryIterator struct {
handler *Handler
uuid string
response *pb.RangeQueryStateResponse
currentLoc int
}
StateRangeQueryIterator
chaincode 调用RangeQueryState来查询一定范围内键的状态。假设startKey和endKey按词汇顺序排序,将返回一个迭代器,可用于遍历startKey和endKey之间的所有键,并且迭代器返回顺序是随机的。func (stub *ChaincodeStub) RangeQueryState(startKey, endKey string) (*StateRangeQueryIterator, error) {
response, err := handler.handleRangeQueryState(startKey, endKey, stub.UUID)
if err != nil {
return nil, err
}
return &StateRangeQueryIterator{handler, stub.UUID, response, 0}, nil
}
HasNext()
如果迭代器的查询范围包含附加键和值,hasnext将返回truefunc (iter *StateRangeQueryIterator) HasNext() bool {
if iter.currentLoc < len(iter.response.KeysAndValues) || iter.response.HasMore {
return true
}
return false
}
Next()
Next将返回下一个在迭代器查询范围内的键和值func (iter *StateRangeQueryIterator) Next() (string, []byte, error) {
if iter.currentLoc < len(iter.response.KeysAndValues) {
keyValue := iter.response.KeysAndValues[iter.currentLoc]
iter.currentLoc++
return keyValue.Key, keyValue.Value, nil
} else if !iter.response.HasMore {
return "", nil, errors.New("No such key")
} else {
response, err := iter.handler.handleRangeQueryStateNext(iter.response.ID, iter.uuid)
//返回迭代器的响应id和uuid
if err != nil {
return "", nil, err
}
iter.currentLoc = 0
iter.response = response
keyValue := iter.response.KeysAndValues[iter.currentLoc]
iter.currentLoc++
return keyValue.Key, keyValue.Value, nil
}
}
Close()
当读取过程完成从迭代器中释放资源后,调用Close函数关闭范围查询迭代器func (iter *StateRangeQueryIterator) Close() error {
_, err := iter.handler.handleRangeQueryStateClose(iter.response.ID, iter.uuid)
return err
}
InvokeChaincode()
通过一个chaincode调用中执行对另一个chaincode的调用func (stub *ChaincodeStub) InvokeChaincode(chaincodeName string, function string, args []string) ([]byte, error) {
return handler.handleInvokeChaincode(chaincodeName, function, args, stub.UUID)
}
QueryChaincode
当一个chaincode调用中执行对另一个chaincode的查询操作时调用
QueryChaincode
|
GetRows
接下来的方法是对数据库建表以及查询的一些功能函数,这里就不一一赘述,以GetRows方法为例介绍
GetRows返回基于部分key的行,例如 ,下给出下表
| A | B | C | D |
当A,C和D是key的时候,GetRow方法可以被|A,C|调用,来返回所有包含A,C以及其他例如D的值的行。当然,只包含A时也可以获取C以及D的值func (stub *ChaincodeStub) GetRows(tableName string, key []Column) (<-chan Row, error) {
keyString, err := buildKeyString(tableName, key)
if err != nil {
return nil, err
}
iter, err := stub.RangeQueryState(keyString+"1", keyString+":")
if err != nil {
return nil, fmt.Errorf("Error fetching rows: %s", err)
}
defer iter.Close()
rows := make(chan Row)
go func() {
for iter.HasNext() {
_, rowBytes, err := iter.Next()
if err != nil {
close(rows)
}
var row Row
err = proto.Unmarshal(rowBytes, &row)
//调用proto的Unmarshal函数,实现对数据的散集
if err != nil {
close(rows)
}
rows <- row
}
close(rows)
}()
return rows, nil
}
chaincode.pb.go
chaincode.pb.go是从chaincode.proto文件中产生的,它包含了一些顶端消息
- 列的定义
- 表信息
- 行信息
- 列信息
ColumnDefinition
type ColumnDefinition struct { |
Table
type Table struct { |
Column
type Column struct { |
Row
type Row struct { |
shim包下handler.go
Handler Struct
Handler实现shim包中的一面
type Handler struct {
sync.RWMutex
// RWMutex提供了四个方法:
// func (*RWMutex) Lock 写锁定
// func (*RWMutex) Unlock 写解锁
// func (*RWMutex) RLock 读锁定
// func (*RWMutex) RUnlock 读解锁
To string
ChatStream PeerChaincodeStream
FSM *fsm.FSM
cc Chaincode
// 多个查询(和一个事务)具有不同的UUID可以并行为执行chaincode
// responseChannel是其上响应由shim到chaincodeStub连通的通道。
responseChannel map[string]chan pb.ChaincodeMessage
// 跟踪哪些是的UUID交易,这是查询,以决定是否允许获取/把状态并调用chaincode。
isTransaction map[string]bool
//isTransaction的key为String类型,value为bool类型
nextState chan *nextStateInfo
}
PeerChaincodeStream接口定义了Peer和chaincode实例之间的流。
type PeerChaincodeStream interface {
Send(*pb.ChaincodeMessage) error
Recv() (*pb.ChaincodeMessage, error)
}
markIsTransaction
markIsTransaction函数标志着UUID作为交易或查询,
为true的时候代表交易,为false的时候代表查询func (handler *Handler) markIsTransaction(uuid string, isTrans bool) bool {
if handler.isTransaction == nil {
return false
}
handler.Lock()
defer handler.Unlock()
handler.isTransaction[uuid] = isTrans
return true
}
exectransaction.go
Execute
执行交易或查询func Execute(ctxt context.Context, chain *ChaincodeSupport, t *pb.Transaction) ([]byte, error) {
var err error
// 得到一个处理账本至标记TX的开始/结束
ledger, ledgerErr := ledger.GetLedger()
if ledgerErr != nil {
return nil, fmt.Errorf("Failed to get handle to ledger (%s)", ledgerErr)
}
if secHelper := chain.getSecHelper(); nil != secHelper {
var err error
t, err = secHelper.TransactionPreExecution(t)
// 注意,t被现在解密并且是原始输入t的深克隆
if nil != err {
return nil, err
}
}
if t.Type == pb.Transaction_CHAINCODE_NEW {
_, err := chain.DeployChaincode(ctxt, t)
if err != nil {
return nil, fmt.Errorf("Failed to deploy chaincode spec(%s)", err)
}
//启动并等待准备就绪
markTxBegin(ledger, t)
_, _, err = chain.LaunchChaincode(ctxt, t)
if err != nil {
markTxFinish(ledger, t, false)
return nil, fmt.Errorf("%s", err)
}
markTxFinish(ledger, t, true)
} else if t.Type == pb.Transaction_CHAINCODE_EXECUTE || t.Type == pb.Transaction_CHAINCODE_QUERY {
//将发动(如有必要,并等待就绪)
cID, cMsg, err := chain.LaunchChaincode(ctxt, t)
if err != nil {
return nil, fmt.Errorf("Failed to launch chaincode spec(%s)", err)
}
//这里应该生效,因为它上面的生效...
chaincode := cID.Name
if err != nil {
return nil, fmt.Errorf("Failed to stablish stream to container %s", chaincode)
}
// 当getTimeout调用被创建的事务块需要注释下一行,并取消注释
timeout := time.Duration(30000) * time.Millisecond
//timeout, err := getTimeout(cID)
if err != nil {
return nil, fmt.Errorf("Failed to retrieve chaincode spec(%s)", err)
}
var ccMsg *pb.ChaincodeMessage
if t.Type == pb.Transaction_CHAINCODE_EXECUTE {
ccMsg, err = createTransactionMessage(t.Uuid, cMsg)
if err != nil {
return nil, fmt.Errorf("Failed to transaction message(%s)", err)
}
} else {
ccMsg, err = createQueryMessage(t.Uuid, cMsg)
if err != nil {
return nil, fmt.Errorf("Failed to query message(%s)", err)
}
}
markTxBegin(ledger, t)
resp, err := chain.Execute(ctxt, chaincode, ccMsg, timeout, t)
if err != nil {
// 交易回滚
markTxFinish(ledger, t, false)
return nil, fmt.Errorf("Failed to execute transaction or query(%s)", err)
} else if resp == nil {
// 交易回滚
markTxFinish(ledger, t, false)
return nil, fmt.Errorf("Failed to receive a response for (%s)", t.Uuid)
} else {
if resp.Type == pb.ChaincodeMessage_COMPLETED || resp.Type == pb.ChaincodeMessage_QUERY_COMPLETED {
// Success
markTxFinish(ledger, t, true)
return resp.Payload, nil
} else if resp.Type == pb.ChaincodeMessage_ERROR || resp.Type == pb.ChaincodeMessage_QUERY_ERROR {
// Rollback transaction
markTxFinish(ledger, t, false)
return nil, fmt.Errorf("Transaction or query returned with failure: %s", string(resp.Payload))
}
markTxFinish(ledger, t, false)
return resp.Payload, fmt.Errorf("receive a response for (%s) but in invalid state(%d)", t.Uuid, resp.Type)
}
} else {
err = fmt.Errorf("Invalid transaction type %s", t.Type.String())
}
return nil, err
}
ExecuteTransactions
由一个执行数组中的一个上的交易将返回错误阵列之一的每个交易。如果执行成功,数组元素将是零。返回状态的哈希值func ExecuteTransactions(ctxt context.Context, cname ChainName, xacts []*pb.Transaction) ([]byte, []error) {
var chain = GetChain(cname)
if chain == nil {
// 我们不应该到调到这里来,但在其他方面一个很好的提醒,以更好地处理
panic(fmt.Sprintf("[ExecuteTransactions]Chain %s not found\n", cname))
}
errs := make([]error, len(xacts)+1)
for i, t := range xacts {
_, errs[i] = Execute(ctxt, chain, t)
}
ledger, hasherr := ledger.GetLedger()
var statehash []byte
if hasherr == nil {
statehash, hasherr = ledger.GetTempStateHash()
}
errs[len(errs)-1] = hasherr
return statehash, errs
}
chaincode下handler.go
//负责处理对Peer's 侧的chaincode流的管理 |
chaincode就分析到这里,其中有不正确的地方还望读者批评指正。接下来将分析另一个重要的部分Ledger,敬请期待哦