fabric源码分析之七链码源码分析
一、容器和虚拟机在fabric中,有两类链码,一类是系统链码,一类是用户链码。而链码都需要安装和实例化才能使用,在这当中,它们虽然原理相似,但是实现的方式还是有所不同。在系统链码中,首先要Register,然后再Deploy才能使用;而用户链码则首先要Install,然后再instantiate就可以被外部接口使用了。因此,对容器的启动也可分成这两部分来进行解析,从宏观上把握入口,然后分类进行..
一、容器和虚拟机
在fabric中,有两类链码,一类是系统链码,一类是用户链码。而链码都需要安装和实例化才能使用,在这当中,它们虽然原理相似,但是实现的方式还是有所不同。在系统链码中,首先要Register,然后再Deploy才能使用;而用户链码则首先要Install,然后再instantiate就可以被外部接口使用了。
因此,对容器的启动也可分成这两部分来进行解析,从宏观上把握入口,然后分类进行源码的解析。
二、整体的入口
在前面的分析中可以知道在Launch函数中,是启动容器的入口。那么就从Launch这个函数开始看(core/chaincode/chaincode_support.go):
func (cs *ChaincodeSupport) Launch(chainID, chaincodeName, chaincodeVersion string, qe ledger.QueryExecutor) (*Handler, error) {
cname := chaincodeName + ":" + chaincodeVersion
if h := cs.HandlerRegistry.Handler(cname); h != nil {
return h, nil
}
//此处到得容器相关的信息,包括生产容器的具体类型是系统链码容器还是用户链码容器
//在后面会说明,系统链码启动的容器是:inprocVM---inproContainer,用户链码启动的容器是DockerVM---DockerContainer
ccci, err := cs.Lifecycle.ChaincodeContainerInfo(chaincodeName, qe)
if err != nil {
// TODO: There has to be a better way to do this...
if cs.UserRunsCC {
chaincodeLogger.Error(
"You are attempting to perform an action other than Deploy on Chaincode that is not ready and you are in developer mode. Did you forget to Deploy your chaincode?",
)
}
return nil, errors.Wrapf(err, "[channel %s] failed to get chaincode container info for %s", chainID, cname)
}
//启动Runtime中的Launch
if err := cs.Launcher.Launch(ccci); err != nil {
return nil, errors.Wrapf(err, "[channel %s] could not launch chaincode %s", chainID, cname)
}
h := cs.HandlerRegistry.Handler(cname)
if h == nil {
return nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", chainID, cname)
}
return h, nil
}
//runtime_launcher.go
func (r *RuntimeLauncher) Launch(ccci *ccprovider.ChaincodeContainerInfo) error {
var startFailCh chan error
var timeoutCh <-chan time.Time
startTime := time.Now()
cname := ccci.Name + ":" + ccci.Version
launchState, alreadyStarted := r.Registry.Launching(cname)
if !alreadyStarted {
startFailCh = make(chan error, 1)
timeoutCh = time.NewTimer(r.StartupTimeout).C
codePackage, err := r.getCodePackage(ccci)
if err != nil {
return err
}
go func() {
//启动Process
if err := r.Runtime.Start(ccci, codePackage); err != nil {
startFailCh <- errors.WithMessage(err, "error starting container")
return
}
exitCode, err := r.Runtime.Wait(ccci)
if err != nil {
launchState.Notify(errors.Wrap(err, "failed to wait on container exit"))
}
launchState.Notify(errors.Errorf("container exited with %d", exitCode))
}()
}
......
return err
}
// Start launches chaincode in a runtime environment.
func (c *ContainerRuntime) Start(ccci *ccprovider.ChaincodeContainerInfo, codePackage []byte) error {
cname := ccci.Name + ":" + ccci.Version
lc, err := c.LaunchConfig(cname, ccci.Type)
if err != nil {
return err
}
chaincodeLogger.Debugf("start container: %s", cname)
chaincodeLogger.Debugf("start container with args: %s", strings.Join(lc.Args, " "))
chaincodeLogger.Debugf("start container with env:\n\t%s", strings.Join(lc.Envs, "\n\t"))
scr := container.StartContainerReq{
Builder: &container.PlatformBuilder{
Type: ccci.Type,
Name: ccci.Name,
Version: ccci.Version,
Path: ccci.Path,
CodePackage: codePackage,
PlatformRegistry: c.PlatformRegistry,
},
Args: lc.Args,
Env: lc.Envs,
FilesToUpload: lc.Files,
CCID: ccintf.CCID{
Name: ccci.Name,
Version: ccci.Version,
},
}
//启动Process--注意传入的容器类型
if err := c.Processor.Process(ccci.ContainerType, scr); err != nil {
return errors.WithMessage(err, "error starting container")
}
return nil
}
func (vmc *VMController) Process(vmtype string, req VMCReq) error {
v := vmc.newVM(vmtype)
ccid := req.GetCCID()
id := ccid.GetName()
vmc.lockContainer(id)
defer vmc.unlockContainer(id)
//启动虚拟机
return req.Do(v)
}
//到这里容器的实例化就进入到了接口的具体确定阶段,根据不同的类型来确定是SCC或ACC
func (vmc *VMController) newVM(typ string) VM {
v, ok := vmc.vmProviders[typ]
if !ok {
vmLogger.Panicf("Programming error: unsupported VM type: %s", typ)
}
return v.NewVM()
}
三、系统容器虚拟机的启动
1、容器的启动
在上面的Process中最后一行代码中req.Do(v),启动了相关的虚拟机容器。看一下这个接口的定义:
type VMCReq interface {
Do(v VM) error
GetCCID() ccintf.CCID
}
//StartContainerReq - properties for starting a container.
type StartContainerReq struct {
ccintf.CCID
Builder Builder
Args []string
Env []string
FilesToUpload map[string][]byte
}
//StopContainerReq - properties for stopping a container.
type StopContainerReq struct {
ccintf.CCID
Timeout uint
//by default we will kill the container after stopping
Dontkill bool
//by default we will remove the container after killing
Dontremove bool
}
func (w WaitContainerReq) Do(v VM) error {
exited := w.Exited
go func() {
exitCode, err := v.Wait(w.CCID)
exited(exitCode, err)
}()
return nil
}
其它mock部分就不列出来了,供测试使用的,有兴趣可以看看源码。再看一下实例的具体生成成,沿着上面的NewVM来看:
// NewVM creates an inproc VM instance
func (r *Registry) NewVM() container.VM {
return NewInprocVM(r)
}
// NewInprocVM creates a new InprocVM
func NewInprocVM(r *Registry) *InprocVM {
return &InprocVM{
registry: r,
}
}
这里只分析启动部分,其它和这个基本差不多:
func (si StartContainerReq) Do(v VM) error {
return v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)
}
//Start starts a previously registered system codechain
func (vm *InprocVM) Start(ccid ccintf.CCID, args []string, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
path := ccid.GetName()
ipctemplate := vm.registry.getType(path)
if ipctemplate == nil {
return fmt.Errorf(fmt.Sprintf("%s not registered", path))
}
instName := vm.GetVMName(ccid)
ipc, err := vm.getInstance(ipctemplate, instName, args, env)
if err != nil {
return fmt.Errorf(fmt.Sprintf("could not create instance for %s", instName))
}
if ipc.running {
return fmt.Errorf(fmt.Sprintf("chaincode running %s", path))
}
ipc.running = true
go func() {
defer func() {
if r := recover(); r != nil {
inprocLogger.Criticalf("caught panic from chaincode %s", instName)
}
}()
ipc.launchInProc(instName, args, env)
}()
return nil
}
这里面会判断是否生成了链码的进程,否则:
func (ipc *inprocContainer) launchInProc(id string, args []string, env []string) error {
if ipc.ChaincodeSupport == nil {
inprocLogger.Panicf("Chaincode support is nil, most likely you forgot to set it immediately after calling inproccontroller.NewRegsitry()")
}
//建立一个Send和一个Recv通道
peerRcvCCSend := make(chan *pb.ChaincodeMessage)
ccRcvPeerSend := make(chan *pb.ChaincodeMessage)
var err error
//链码侧通道Handler
ccchan := make(chan struct{}, 1)
//Peer侧通道Handler
ccsupportchan := make(chan struct{}, 1)
shimStartInProc := _shimStartInProc // shadow to avoid race in test
//链码侧相关
go func() {
defer close(ccchan)
inprocLogger.Debugf("chaincode started for %s", id)
if args == nil {
args = ipc.args
}
if env == nil {
env = ipc.env
}
err := shimStartInProc(env, args, ipc.chaincode, ccRcvPeerSend, peerRcvCCSend)
if err != nil {
err = fmt.Errorf("chaincode-support ended with err: %s", err)
_inprocLoggerErrorf("%s", err)
}
inprocLogger.Debugf("chaincode ended for %s with err: %s", id, err)
}()
// shadow function to avoid data race
inprocLoggerErrorf := _inprocLoggerErrorf
//Peer侧相关
go func() {
defer close(ccsupportchan)
inprocStream := newInProcStream(peerRcvCCSend, ccRcvPeerSend)
inprocLogger.Debugf("chaincode-support started for %s", id)
//消息处理
err := ipc.ChaincodeSupport.HandleChaincodeStream(inprocStream)
if err != nil {
err = fmt.Errorf("chaincode ended with err: %s", err)
inprocLoggerErrorf("%s", err)
}
inprocLogger.Debugf("chaincode-support ended for %s with err: %s", id, err)
}()
select {
case <-ccchan:
close(peerRcvCCSend)
inprocLogger.Debugf("chaincode %s quit", id)
case <-ccsupportchan:
close(ccRcvPeerSend)
inprocLogger.Debugf("chaincode support %s quit", id)
case <-ipc.stopChan:
close(ccRcvPeerSend)
close(peerRcvCCSend)
inprocLogger.Debugf("chaincode %s stopped", id)
}
return err
}
// StartInProc is an entry point for system chaincodes bootstrap. It is not an
// API for chaincodes.
func StartInProc(env []string, args []string, cc Chaincode, recv <-chan *pb.ChaincodeMessage, send chan<- *pb.ChaincodeMessage) error {
chaincodeLogger.Debugf("in proc %v", args)
var chaincodename string
for _, v := range env {
if strings.Index(v, "CORE_CHAINCODE_ID_NAME=") == 0 {
p := strings.SplitAfter(v, "CORE_CHAINCODE_ID_NAME=")
chaincodename = p[1]
break
}
}
if chaincodename == "" {
return errors.New("error chaincode id not provided")
}
stream := newInProcStream(recv, send)
chaincodeLogger.Debugf("starting chat with peer using name=%s", chaincodename)
//看看这是谁,Handler消息处理就在这个函数里,前面分析过,这里就不再赘述
err := chatWithPeer(chaincodename, stream, cc)
return err
}
系统链码直接启动了内存的虚拟机。只有用户链码才会启动Docker,在内部运行虚拟机。所以这二者才分在类的生成中从同一接口继承但分成了两个类。特别需要注意的是要看看上面对Peer侧和链码侧的消息处理的生成过程,这个非常重要。代码里有直接注释,感兴趣可以把代码跟到底看看到底是如何生成的。
2、消息的传递
看到了chatWithPeer,就想到了handleMessage,这个在前面有详细分析,如果有什么不明白可以看看“链码源码分析”。下面只列出代码:
// handleMessage message handles loop for shim side of chaincode/peer stream.
func (handler *Handler) handleMessage(msg *pb.ChaincodeMessage, errc chan error) error {
if msg.Type == pb.ChaincodeMessage_KEEPALIVE {
chaincodeLogger.Debug("Sending KEEPALIVE response")
handler.serialSendAsync(msg, nil) // ignore errors, maybe next KEEPALIVE will work
return nil
}
chaincodeLogger.Debugf("[%s] Handling ChaincodeMessage of type: %s(state:%s)", shorttxid(msg.Txid), msg.Type, handler.state)
var err error
switch handler.state {
case ready:
err = handler.handleReady(msg, errc)
case established:
err = handler.handleEstablished(msg, errc)
case created:
err = handler.handleCreated(msg, errc)
default:
err = errors.Errorf("[%s] Chaincode handler cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type, len(msg.Payload), handler.state)
}
if err != nil {
payload := []byte(err.Error())
errorMsg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
handler.serialSend(errorMsg)
return err
}
return nil
}
四、用户容器虚拟机的启动
1、虚拟机的启动
// NewVM creates a new DockerVM instance
func (p *Provider) NewVM() container.VM {
return NewDockerVM(p.PeerID, p.NetworkID, p.BuildMetrics)
}
// NewDockerVM returns a new DockerVM instance
func NewDockerVM(peerID, networkID string, buildMetrics *BuildMetrics) *DockerVM {
return &DockerVM{
PeerID: peerID,
NetworkID: networkID,
getClientFnc: getDockerClient,
BuildMetrics: buildMetrics,
}
}
// Start starts a container using a previously created docker image
func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
imageName, err := vm.GetVMNameForDocker(ccid)
if err != nil {
return err
}
attachStdout := viper.GetBool("vm.docker.attachStdout")
containerName := vm.GetVMName(ccid)
logger := dockerLogger.With("imageName", imageName, "containerName", containerName)
//通过VM获得客户端
client, err := vm.getClientFnc()
if err != nil {
logger.Debugf("failed to get docker client", "error", err)
return err
}
//停止容器和虚拟机
vm.stopInternal(client, containerName, 0, false, false)
// 此处创建容器
err = vm.createContainer(client, imageName, containerName, args, env, attachStdout)
if err == docker.ErrNoSuchImage {
//如果没有镜像,则使用builder来创建相关容器
reader, err := builder.Build()
if err != nil {
return errors.Wrapf(err, "failed to generate Dockerfile to build %s", containerName)
}
//部署镜像
err = vm.deployImage(client, ccid, reader)
if err != nil {
return err
}
//创建镜像后,再创建容器
err = vm.createContainer(client, imageName, containerName, args, env, attachStdout)
if err != nil {
logger.Errorf("failed to create container: %s", err)
return err
}
} else if err != nil {
logger.Errorf("create container failed: %s", err)
return err
}
// stream stdout and stderr to chaincode logger
if attachStdout {
containerLogger := flogging.MustGetLogger("peer.chaincode." + containerName)
streamOutput(dockerLogger, client, containerName, containerLogger)
}
// upload specified files to the container before starting it
// this can be used for configurations such as TLS key and certs
//处理容器需要的证书相关的文件
if len(filesToUpload) != 0 {
// the docker upload API takes a tar file, so we need to first
// consolidate the file entries to a tar
payload := bytes.NewBuffer(nil)
gw := gzip.NewWriter(payload)
tw := tar.NewWriter(gw)
for path, fileToUpload := range filesToUpload {
cutil.WriteBytesToPackage(path, fileToUpload, tw)
}
// Write the tar file out
if err := tw.Close(); err != nil {
return fmt.Errorf("Error writing files to upload to Docker instance into a temporary tar blob: %s", err)
}
gw.Close()
//上传数据
err := client.UploadToContainer(containerName, docker.UploadToContainerOptions{
InputStream: bytes.NewReader(payload.Bytes()),
Path: "/",
NoOverwriteDirNonDir: false,
})
if err != nil {
return fmt.Errorf("Error uploading files to the container instance %s: %s", containerName, err)
}
}
// start container with HostConfig was deprecated since v1.10 and removed in v1.2
err = client.StartContainer(containerName, nil)
if err != nil {
dockerLogger.Errorf("start-could not start container: %s", err)
return err
}
dockerLogger.Debugf("Started container %s", containerName)
return nil
}
看一下创建容器的代码:
func (vm *DockerVM) createContainer(client dockerClient, imageID, containerID string, args, env []string, attachStdout bool) error {
logger := dockerLogger.With("imageID", imageID, "containerID", containerID)
logger.Debugw("create container")
_, err := client.CreateContainer(docker.CreateContainerOptions{
Name: containerID,
Config: &docker.Config{
Cmd: args,
Image: imageID,
Env: env,
AttachStdout: attachStdout,
AttachStderr: attachStdout,
},
HostConfig: getDockerHostConfig(),
})
if err != nil {
return err
}
logger.Debugw("created container")
return nil
}
// See https://goo.gl/tyzwVM for more details.
func (c *Client) CreateContainer(opts CreateContainerOptions) (*Container, error) {
path := "/containers/create?" + queryString(opts)
resp, err := c.do(
"POST",
path,
doOptions{
data: struct {
*Config
HostConfig *HostConfig `json:"HostConfig,omitempty" yaml:"HostConfig,omitempty" toml:"HostConfig,omitempty"`
NetworkingConfig *NetworkingConfig `json:"NetworkingConfig,omitempty" yaml:"NetworkingConfig,omitempty" toml:"NetworkingConfig,omitempty"`
}{
opts.Config,
opts.HostConfig,
opts.NetworkingConfig,
},
context: opts.Context,
},
)
if e, ok := err.(*Error); ok {
if e.Status == http.StatusNotFound {
return nil, ErrNoSuchImage
}
if e.Status == http.StatusConflict {
return nil, ErrContainerAlreadyExists
}
// Workaround for 17.09 bug returning 400 instead of 409.
// See https://github.com/moby/moby/issues/35021
if e.Status == http.StatusBadRequest && strings.Contains(e.Message, "Conflict.") {
return nil, ErrContainerAlreadyExists
}
}
if err != nil {
return nil, err
}
defer resp.Body.Close()
var container Container
if err := json.NewDecoder(resp.Body).Decode(&container); err != nil {
return nil, err
}
container.Name = opts.Name
return &container, nil
}
func (c *Client) startContainer(id string, hostConfig *HostConfig, opts doOptions) error {
path := "/containers/" + id + "/start"
if c.serverAPIVersion == nil {
c.checkAPIVersion()
}
if c.serverAPIVersion != nil && c.serverAPIVersion.LessThan(apiVersion124) {
opts.data = hostConfig
opts.forceJSON = true
}
resp, err := c.do("POST", path, opts)
if err != nil {
if e, ok := err.(*Error); ok && e.Status == http.StatusNotFound {
return &NoSuchContainer{ID: id, Err: err}
}
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotModified {
return &ContainerAlreadyRunning{ID: id}
}
return nil
}
2、消息的传递
在chaincode.go的shim包启动时(用户链码启动时)的Start函数中调用了userChaincodeStreamGetter–>chaincodeSupportClient.Register,而chaindcode_support.go中的Register实现了
(protos/peer/chaincode_shim.pb.go)
// ChaincodeSupportServer is the server API for ChaincodeSupport service.
type ChaincodeSupportServer interface {
Register(ChaincodeSupport_RegisterServer) error
}
即Register调用HandleChaincodeStream,再调用ProcessStream,在其中的默认选项中调用handleMessage,又回到了Peer侧。
而在前面的链码源码分析中已经分析过,一个用户的链码启动是以在链码的main函数中调用 shim.Start()为开始的,此函数数最终会调用chatWithPeer函数,其中的默认项为调用handleMessage,开始链码铡的消息循环。这样二者再按照前面提到的通过GRPC互相发送消息,就可以展开一个用户链码和Peer侧的通信了.
五、总结
通过分析两类链码容器和执行情况,基本上就明白了链码源码执行的环境,这正是对以前的“链码源码分析”的进一步完善。结合这两篇文章基本上就明白了,链码在Fabric上执行的看哪个流程和方式。掌握了链码执行的原理和运行的过程,就可以针对实际情况对其做为相应的优化和修改,从为我所用到我想我用。
推荐一下阿里朋友的PerfMa社区的课程,都是干货:
更多推荐
所有评论(0)