文章目录
  1. 1. chaincode.go
    1. 1.1. MustGetLogger
    2. 1.2. Start(cc Chaincode)
    3. 1.3. getPeerAddress()
    4. 1.4. newPeerClientConnection()
    5. 1.5. chatWithPeer()
    6. 1.6. GetState
    7. 1.7. PutState
    8. 1.8. DelState
    9. 1.9. StateRangeQueryIterator
    10. 1.10. StateRangeQueryIterator
    11. 1.11. HasNext()
    12. 1.12. Next()
    13. 1.13. Close()
    14. 1.14. InvokeChaincode()
    15. 1.15. QueryChaincode
    16. 1.16. GetRows
  2. 2. chaincode.pb.go
    1. 2.1. ColumnDefinition
    2. 2.2. Table
    3. 2.3. Column
    4. 2.4. Row
  3. 3. shim包下handler.go
    1. 3.1. Handler Struct
    2. 3.2. markIsTransaction
  4. 4. exectransaction.go
    1. 4.1. Execute
    2. 4.2. ExecuteTransactions
  5. 5. chaincode下handler.go

openblockchain是IBM开源的blockchain项目,具体安装流程之前已经介绍过,具体请看http://blog.csdn.net/pangjiuzala/article/details/50897819。

解压后会发现在obc-peer根目录下出现一个main.go文件,其中主要功能是生成obc-peer命令,核心代码集中在openchain中的。接下来,将首先从chaincode代码分析开始,包含了如图下图所示的几个核心文件。



chaincode.go

chaincode.go位于shim包中,

var chaincodeLogger = logging.MustGetLogger("chaincode")
//调用go-logging中logging库的MustGetLogger函数对shim package进行记录,相当于日志文件
//其中传递的参数为当前go文件

MustGetLogger

MustGetLogger位于logger.go文件中,具体代码如下

// GetLogger 创建和返回一个基于模块名称的Logger对象
func GetLogger(module string) (*Logger, error) {
return &Logger{Module: module}, nil
}

// MustGetLogger 与GetLogger相似,但是当logger不能被创建时会出现
//错乱,它简化了安全初始化全局记录器

func MustGetLogger(module string) *Logger {
logger, err := GetLogger(module)
if err != nil {
panic("logger: " + module + ": " + err.Error())
}
return logger
}

定义chaincode接口,它是一个供chaincode开发者需要实现标准回调接口

type Chaincode interface {
// run方法在每一笔交易初始化的时候被调用
Run(stub *ChaincodeStub, function string, args []string) ([]byte, error)
// Query函数以只读方式查询chaincode状态
Query(stub *ChaincodeStub, function string, args []string) ([]byte, error)
}

Start(cc Chaincode)

启动chaincode引导程序的入口节点

func Start(cc Chaincode) error {
viper.SetEnvPrefix("OPENCHAIN")
viper.AutomaticEnv()
replacer := strings.NewReplacer(".", "_")
//替换前缀为openchain的文件中的.为_
viper.SetEnvKeyReplacer(replacer)

flag.StringVar(&peerAddress, "peer.address", "", "peer address")

flag.Parse()

chaincodeLogger.Debug("Peer address: %s", getPeerAddress())

// 使用同步检验建立与client的连接
clientConn, err := newPeerClientConnection()
if err != nil {
chaincodeLogger.Error(fmt.Sprintf("Error trying to connect to local peer: %s", err))
return fmt.Errorf("Error trying to connect to local peer: %s", err)
}

chaincodeLogger.Debug("os.Args returns: %s", os.Args)

chaincodeSupportClient := pb.NewChaincodeSupportClient(clientConn)

err = chatWithPeer(chaincodeSupportClient, cc)
//启动chiancodeSuppportClient

return err
}

getPeerAddress()

获取peer地址

func getPeerAddress() string {
if peerAddress != "" {
return peerAddress
}

if peerAddress = viper.GetString("peer.address"); peerAddress == "" {
//假如被docker容器包含,返回主机地址
peerAddress = "172.17.42.1:30303"
}

return peerAddress
}

newPeerClientConnection()

创建peer 客户端连接

func newPeerClientConnection() (*grpc.ClientConn, error) {
//调用google.golang.org中grpc的ClinetConn方法
var opts []grpc.DialOption
if viper.GetBool("peer.tls.enabled") {
//viper来源于github.com/spf13/viper,一个应用程序的配置系统
var sn string
if viper.GetString("peer.tls.server-host-override") != "" {
sn = viper.GetString("peer.tls.server-host-override")
}
var creds credentials.TransportAuthenticator
//credenetials包实现了各种支持g RPC库调用的凭证
if viper.GetString("peer.tls.cert.file") != "" {
var err error
creds, err = credentials.NewClientTLSFromFile(viper.GetString("peer.tls.cert.file"), sn)
if err != nil {
grpclog.Fatalf("Failed to create TLS credentials %v", err)
}
} else {
creds = credentials.NewClientTLSFromCert(nil, sn)
//NewClientTLSFromCert从客户端输入的证书中构造了TLS
}
//append中grpc调用的方法在grpc根目录下下clientconn.go文件中定义方//法
opts = append(opts, grpc.WithTransportCredentials(creds))
}
///WithTransportCredentials返回配置了一个连接级别的安全凭据(例//如,TLS/ SSL)的拨号操作。
opts = append(opts, grpc.WithTimeout(1*time.Second))
//WithTimeout返回配置客户端拨号超时连接的拨号操作
opts = append(opts, grpc.WithBlock())
//WithBlock返回一个拨号选项,它将持续调用一个拨号块直到底层的连接已经建立。没有这一点,在后台发生将会立即返回拨打和服务器连接。
opts = append(opts, grpc.WithInsecure())
conn, err := grpc.Dial(getPeerAddress(), opts...)
if err != nil {
return nil, err
}
return conn, err
}

chatWithPeer()

使用peer进行通信

func chatWithPeer(chaincodeSupportClient pb.ChaincodeSupportClient, cc Chaincode) error {

// 通过peer验证创建流
stream, err := chaincodeSupportClient.Register(context.Background())
if err != nil {
return fmt.Errorf("Error chatting with leader at address=%s: %s", getPeerAddress(), err)
}

//创建传递给链码的链码存根
//stub := &ChaincodeStub{}

// Create the shim handler responsible for all control logic
handler = newChaincodeHandler(getPeerAddress(), stream, cc)

defer stream.CloseSend()
// Send the ChaincodeID during register.
chaincodeID := &pb.ChaincodeID{Name: viper.GetString("chaincode.id.name")}
payload, err := proto.Marshal(chaincodeID)
if err != nil {
return fmt.Errorf("Error marshalling chaincodeID during chaincode registration: %s", err)
}
// 流寄存器
chaincodeLogger.Debug("Registering.. sending %s", pb.ChaincodeMessage_REGISTER)
handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload})
waitc := make(chan struct{})
go func() {
defer close(waitc)
msgAvail := make(chan *pb.ChaincodeMessage)
var nsInfo *nextStateInfo
var in *pb.ChaincodeMessage
recv := true
for {
in = nil
err = nil
nsInfo = nil
if recv {
recv = false
go func() {
var in2 *pb.ChaincodeMessage
in2, err = stream.Recv()
msgAvail <- in2
}()
}
select {
case in = <-msgAvail:
if err == io.EOF {
chaincodeLogger.Debug("Received EOF, ending chaincode stream, %s", err)
return
} else if err != nil {
chaincodeLogger.Error(fmt.Sprintf("Received error from server: %s, ending chaincode stream", err))
return
} else if in == nil {
err = fmt.Errorf("Received nil message, ending chaincode stream")
chaincodeLogger.Debug("Received nil message, ending chaincode stream")
return
}
chaincodeLogger.Debug("[%s]Received message %s from shim", shortuuid(in.Uuid), in.Type.String())
recv = true
case nsInfo = <-handler.nextState:
in = nsInfo.msg
if in == nil {
panic("nil msg")
}
chaincodeLogger.Debug("[%s]Move state message %s", shortuuid(in.Uuid), in.Type.String())
}

// 调用 FSM.handleMessage(),fsm即状态机
err = handler.handleMessage(in)
if err != nil {
err = fmt.Errorf("Error handling message: %s", err)
return
}
if nsInfo != nil && nsInfo.sendToCC {
chaincodeLogger.Debug("[%s]send state message %s", shortuuid(in.Uuid), in.Type.String())
if err = handler.serialSend(in); err != nil {
err = fmt.Errorf("Error sending %s: %s", in.Type.String(), err)
return
}
}
}
}()
<-waitc
return err
}

GetState

GetState被调用后,将从总帐中获取chaincode状态记录

func (stub *ChaincodeStub) GetState(key string) ([]byte, error) {
return handler.handleGetState(key, stub.UUID)

PutState

相对于GetState,PutState被调用后,将会把chaincode状态记录到总账中

func (stub *ChaincodeStub) PutState(key string, value []byte) error {
return handler.handlePutState(key, value, stub.UUID)
}

DelState

从总账中删除chaincode状态记录

func (stub *ChaincodeStub) DelState(key string) error {
return handler.handleDelState(key, stub.UUID)
}

StateRangeQueryIterator

StateRangeQueryIterator是一个迭代器,遍历一定范围内的以键值对形式记录的状态

type StateRangeQueryIterator struct {
handler *Handler
uuid string
response *pb.RangeQueryStateResponse
currentLoc int
}

StateRangeQueryIterator

chaincode 调用RangeQueryState来查询一定范围内键的状态。假设startKey和endKey按词汇顺序排序,将返回一个迭代器,可用于遍历startKey和endKey之间的所有键,并且迭代器返回顺序是随机的。

func (stub *ChaincodeStub) RangeQueryState(startKey, endKey string) (*StateRangeQueryIterator, error) {
response, err := handler.handleRangeQueryState(startKey, endKey, stub.UUID)
if err != nil {
return nil, err
}
return &StateRangeQueryIterator{handler, stub.UUID, response, 0}, nil
}

HasNext()

如果迭代器的查询范围包含附加键和值,hasnext将返回true

func (iter *StateRangeQueryIterator) HasNext() bool {
if iter.currentLoc < len(iter.response.KeysAndValues) || iter.response.HasMore {
return true
}
return false
}

Next()

Next将返回下一个在迭代器查询范围内的键和值

func (iter *StateRangeQueryIterator) Next() (string, []byte, error) {
if iter.currentLoc < len(iter.response.KeysAndValues) {
keyValue := iter.response.KeysAndValues[iter.currentLoc]
iter.currentLoc++
return keyValue.Key, keyValue.Value, nil

} else if !iter.response.HasMore {
return "", nil, errors.New("No such key")
} else {
response, err := iter.handler.handleRangeQueryStateNext(iter.response.ID, iter.uuid)
//返回迭代器的响应id和uuid

if err != nil {
return "", nil, err
}

iter.currentLoc = 0
iter.response = response
keyValue := iter.response.KeysAndValues[iter.currentLoc]
iter.currentLoc++
return keyValue.Key, keyValue.Value, nil


}
}

Close()

当读取过程完成从迭代器中释放资源后,调用Close函数关闭范围查询迭代器

func (iter *StateRangeQueryIterator) Close() error {
_, err := iter.handler.handleRangeQueryStateClose(iter.response.ID, iter.uuid)
return err
}

InvokeChaincode()

通过一个chaincode调用中执行对另一个chaincode的调用

func (stub *ChaincodeStub) InvokeChaincode(chaincodeName string, function string, args []string) ([]byte, error) {
return handler.handleInvokeChaincode(chaincodeName, function, args, stub.UUID)
}

QueryChaincode

当一个chaincode调用中执行对另一个chaincode的查询操作时调用
QueryChaincode


func (stub *ChaincodeStub) QueryChaincode(chaincodeName string, function string, args []string) ([]byte, error) {
return handler.handleQueryChaincode(chaincodeName, function, args, stub.UUID)
}

GetRows

接下来的方法是对数据库建表以及查询的一些功能函数,这里就不一一赘述,以GetRows方法为例介绍

GetRows返回基于部分key的行,例如 ,下给出下表
| A | B | C | D |
当A,C和D是key的时候,GetRow方法可以被|A,C|调用,来返回所有包含A,C以及其他例如D的值的行。当然,只包含A时也可以获取C以及D的值

func (stub *ChaincodeStub) GetRows(tableName string, key []Column) (<-chan Row, error) {

keyString, err := buildKeyString(tableName, key)
if err != nil {
return nil, err
}

iter, err := stub.RangeQueryState(keyString+"1", keyString+":")
if err != nil {
return nil, fmt.Errorf("Error fetching rows: %s", err)
}
defer iter.Close()

rows := make(chan Row)

go func() {
for iter.HasNext() {
_, rowBytes, err := iter.Next()
if err != nil {
close(rows)
}

var row Row
err = proto.Unmarshal(rowBytes, &row)
//调用proto的Unmarshal函数,实现对数据的散集
if err != nil {
close(rows)
}

rows <- row

}
close(rows)
}()

return rows, nil

}

chaincode.pb.go

chaincode.pb.go是从chaincode.proto文件中产生的,它包含了一些顶端消息

  • 列的定义
  • 表信息
  • 行信息
  • 列信息

ColumnDefinition

type ColumnDefinition struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
Type ColumnDefinition_Type `protobuf:"varint,2,opt,name=type,enum=shim.ColumnDefinition_Type" json:"type,omitempty"`
Key bool `protobuf:"varint,3,opt,name=key" json:"key,omitempty"`
}

Table

type Table struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
//protobuf以bytes为单位,opt未操作
//json中传递的是表名,可以忽略空表情况
ColumnDefinitions []*ColumnDefinition `protobuf:"bytes,2,rep,name=columnDefinitions" json:"columnDefinitions,omitempty"`
}

Column

type Column struct {
// Types that are valid to be assigned to Value:
// *Column_String_
// *Column_Int32
// *Column_Int64
// *Column_Uint32
// *Column_Uint64
// *Column_Bytes
// *Column_Bool
Value isColumn_Value `protobuf_oneof:"value"`
}

Row

type Row struct {
Columns []*Column `protobuf:"bytes,1,rep,name=columns" json:"columns,omitempty"`
}

shim包下handler.go

Handler Struct

Handler实现shim包中的一面


type Handler struct {
sync.RWMutex
// RWMutex提供了四个方法:

// func (*RWMutex) Lock 写锁定

// func (*RWMutex) Unlock 写解锁

// func (*RWMutex) RLock 读锁定

// func (*RWMutex) RUnlock 读解锁
To string
ChatStream PeerChaincodeStream
FSM *fsm.FSM
cc Chaincode
// 多个查询(和一个事务)具有不同的UUID可以并行为执行chaincode
// responseChannel是其上响应由shim到chaincodeStub连通的通道。
responseChannel map[string]chan pb.ChaincodeMessage
// 跟踪哪些是的UUID交易,这是查询,以决定是否允许获取/把状态并调用chaincode。
isTransaction map[string]bool
//isTransaction的key为String类型,value为bool类型
nextState chan *nextStateInfo
}

PeerChaincodeStream接口定义了Peer和chaincode实例之间的流。


type PeerChaincodeStream interface {
Send(*pb.ChaincodeMessage) error
Recv() (*pb.ChaincodeMessage, error)
}

markIsTransaction

markIsTransaction函数标志着UUID作为交易或查询,
为true的时候代表交易,为false的时候代表查询

func (handler *Handler) markIsTransaction(uuid string, isTrans bool) bool {
if handler.isTransaction == nil {
return false
}
handler.Lock()
defer handler.Unlock()
handler.isTransaction[uuid] = isTrans
return true
}

exectransaction.go

Execute

执行交易或查询

func Execute(ctxt context.Context, chain *ChaincodeSupport, t *pb.Transaction) ([]byte, error) {
var err error

// 得到一个处理账本至标记TX的开始/结束
ledger, ledgerErr := ledger.GetLedger()
if ledgerErr != nil {
return nil, fmt.Errorf("Failed to get handle to ledger (%s)", ledgerErr)
}

if secHelper := chain.getSecHelper(); nil != secHelper {
var err error
t, err = secHelper.TransactionPreExecution(t)
// 注意,t被现在解密并且是原始输入t的深克隆
if nil != err {
return nil, err
}
}

if t.Type == pb.Transaction_CHAINCODE_NEW {
_, err := chain.DeployChaincode(ctxt, t)
if err != nil {
return nil, fmt.Errorf("Failed to deploy chaincode spec(%s)", err)
}

//启动并等待准备就绪
markTxBegin(ledger, t)
_, _, err = chain.LaunchChaincode(ctxt, t)
if err != nil {
markTxFinish(ledger, t, false)
return nil, fmt.Errorf("%s", err)
}
markTxFinish(ledger, t, true)
} else if t.Type == pb.Transaction_CHAINCODE_EXECUTE || t.Type == pb.Transaction_CHAINCODE_QUERY {
//将发动(如有必要,并等待就绪)
cID, cMsg, err := chain.LaunchChaincode(ctxt, t)
if err != nil {
return nil, fmt.Errorf("Failed to launch chaincode spec(%s)", err)
}

//这里应该生效,因为它上面的生效...
chaincode := cID.Name

if err != nil {
return nil, fmt.Errorf("Failed to stablish stream to container %s", chaincode)
}

// 当getTimeout调用被创建的事务块需要注释下一行,并取消注释
timeout := time.Duration(30000) * time.Millisecond
//timeout, err := getTimeout(cID)

if err != nil {
return nil, fmt.Errorf("Failed to retrieve chaincode spec(%s)", err)
}

var ccMsg *pb.ChaincodeMessage
if t.Type == pb.Transaction_CHAINCODE_EXECUTE {
ccMsg, err = createTransactionMessage(t.Uuid, cMsg)
if err != nil {
return nil, fmt.Errorf("Failed to transaction message(%s)", err)
}
} else {
ccMsg, err = createQueryMessage(t.Uuid, cMsg)
if err != nil {
return nil, fmt.Errorf("Failed to query message(%s)", err)
}
}

markTxBegin(ledger, t)
resp, err := chain.Execute(ctxt, chaincode, ccMsg, timeout, t)
if err != nil {
// 交易回滚
markTxFinish(ledger, t, false)
return nil, fmt.Errorf("Failed to execute transaction or query(%s)", err)
} else if resp == nil {
// 交易回滚
markTxFinish(ledger, t, false)
return nil, fmt.Errorf("Failed to receive a response for (%s)", t.Uuid)
} else {
if resp.Type == pb.ChaincodeMessage_COMPLETED || resp.Type == pb.ChaincodeMessage_QUERY_COMPLETED {
// Success
markTxFinish(ledger, t, true)
return resp.Payload, nil
} else if resp.Type == pb.ChaincodeMessage_ERROR || resp.Type == pb.ChaincodeMessage_QUERY_ERROR {
// Rollback transaction
markTxFinish(ledger, t, false)
return nil, fmt.Errorf("Transaction or query returned with failure: %s", string(resp.Payload))
}
markTxFinish(ledger, t, false)
return resp.Payload, fmt.Errorf("receive a response for (%s) but in invalid state(%d)", t.Uuid, resp.Type)
}

} else {
err = fmt.Errorf("Invalid transaction type %s", t.Type.String())
}
return nil, err
}

ExecuteTransactions

由一个执行数组中的一个上的交易将返回错误阵列之一的每个交易。如果执行成功,数组元素将是零。返回状态的哈希值

func ExecuteTransactions(ctxt context.Context, cname ChainName, xacts []*pb.Transaction) ([]byte, []error) {
var chain = GetChain(cname)
if chain == nil {
// 我们不应该到调到这里来,但在其他方面一个很好的提醒,以更好地处理
panic(fmt.Sprintf("[ExecuteTransactions]Chain %s not found\n", cname))
}
errs := make([]error, len(xacts)+1)
for i, t := range xacts {
_, errs[i] = Execute(ctxt, chain, t)
}
ledger, hasherr := ledger.GetLedger()
var statehash []byte
if hasherr == nil {
statehash, hasherr = ledger.GetTempStateHash()
}
errs[len(errs)-1] = hasherr
return statehash, errs
}

chaincode下handler.go

//负责处理对Peer's 侧的chaincode流的管理

type Handler struct {
sync.RWMutex
ChatStream PeerChaincodeStream
FSM *fsm.FSM
ChaincodeID *pb.ChaincodeID

//此处理管理解密部署TX的副本,没有编码
deployTXSecContext *pb.Transaction

chaincodeSupport *ChaincodeSupport
registered bool
readyNotify chan bool
//是对TX UUID的要么调用或查询TX(解密)的映射。每个TX将被添加之前执行并完成时,执行删除操作
txCtxs map[string]*transactionContext

uuidMap map[string]bool

//跟踪哪些是UUID查询的;虽然shim保持包含这个,但它不能被信任
isTransaction map[string]bool

//用来发送并确保后的状态转换完成,
nextState chan *nextStateInfo
}

chaincode就分析到这里,其中有不正确的地方还望读者批评指正。接下来将分析另一个重要的部分Ledger,敬请期待哦

文章目录
  1. 1. chaincode.go
    1. 1.1. MustGetLogger
    2. 1.2. Start(cc Chaincode)
    3. 1.3. getPeerAddress()
    4. 1.4. newPeerClientConnection()
    5. 1.5. chatWithPeer()
    6. 1.6. GetState
    7. 1.7. PutState
    8. 1.8. DelState
    9. 1.9. StateRangeQueryIterator
    10. 1.10. StateRangeQueryIterator
    11. 1.11. HasNext()
    12. 1.12. Next()
    13. 1.13. Close()
    14. 1.14. InvokeChaincode()
    15. 1.15. QueryChaincode
    16. 1.16. GetRows
  2. 2. chaincode.pb.go
    1. 2.1. ColumnDefinition
    2. 2.2. Table
    3. 2.3. Column
    4. 2.4. Row
  3. 3. shim包下handler.go
    1. 3.1. Handler Struct
    2. 3.2. markIsTransaction
  4. 4. exectransaction.go
    1. 4.1. Execute
    2. 4.2. ExecuteTransactions
  5. 5. chaincode下handler.go