heatbeat

心跳处理的必要性:
服务端需要同时处理上千甚至上万的客户端的连接,所以每个连接资源都是很宝贵的,当客户端断开连接的时候服务端应该及时移除该连接。
正常情况下,客户端断开连接的时候,会和服务端进行四次挥手,服务端就会知道这个连接 已经不能用了优雅的退出监听消息。但是总会有意外,比如客户端忽然断网了,没电了,这个时候客户端肯定不可能按照流程和 服务端进行挥手,不知道消息的 服务端还傻傻的在哪儿等着,不知道客户端早就走了。
这个时候 心跳包就很完美的解决了此问题。客户端和服务端约定好每隔一段时间 就会发消息,如果服务端每过一段时间没有收到客户端 的心跳消息 就说明 客户端出事了,服务端就删除此连接,确保 资源最大化。
一般心跳包就是 符合该协议的 最小包。

一般的go语言框架都是一个连接单独开一个协程 去处理读取超时问题
当协程数量多到一定程度的时候,协程之间的调度也是一个很大的 消耗
一个协程 进行 心跳检测。



. 每当一个连接成功接收到消息的时候,就在该连接对象 上设置当前时间戳。来保存最近一次接收消息的时间
. 在框架启动的时候就开启一个协程 ,每隔一段时间就遍历 当前所有连接对象 ,如果当前时间 减去 连接对象里的最近接收时间 超过心跳时间,就说明 该连接 已经死了就执行删除



在多客户端同时访问服务器的工作模式下,首先要保证服务器的运行正常。因此,Server和Client建立通讯后,确保连接的及时断开就非常重要。否则,多个客户端长时间占用着连接不关闭,是非常可怕的服务器资源浪费。会使得服务器可服务的客户端数量大幅度减少。



因此,针对短链接和长连接,根据业务的需求,配套不同的处理机制。



短连接



一般建立完连接,就立刻传输数据。传输完数据,连接就关闭。服务端根据需要,设定连接的时长。超过时间长度,就算客户端超时。立刻关闭连接。


长连接



建立连接后,传输数据,然后要保持连接,然后再次传输数据。直到连接关闭。


socket读写可以通过 SetDeadline、SetReadDeadline、SetWriteDeadline设置阻塞的时间。



如果做短连接,直接在Server端的连接上设置SetReadDeadline。当你设置的时限到达,无论客户端是否还在继续传递消息,服务端都不会再接收。并且已经关闭连接。


根据业务需要,客户端可能需要长时间保持连接。但是服务端不能无限制的保持。这就需要一个机制,如果超过某个时间长度,服务端没有获得客户端的数据,就判定客户端已经不需要连接了(比如客户端挂掉了)。



做到这个,需要一个心跳机制。在限定的时间内,客户端给服务端发送一个指定的消息,以便服务端知道客户端还活着。



func sender(conn *net.TCPConn) {
for i := 0; i < 10; i++{
words := strconv.Itoa(i)+” Hello I’m MyHeartbeat Client.”
msg, err := conn.Write([]byte(words))
if err != nil {
Log(conn.RemoteAddr().String(), “Fatal error: “, err)
os.Exit(1)
}
Log(“服务端接收了”, msg)
time.Sleep(2 * time.Second)
}
for i := 0; i < 2 ; i++ {
time.Sleep(12 * time.Second)
}
for i := 0; i < 10; i++{
words := strconv.Itoa(i)+” Hi I’m MyHeartbeat Client.”
msg, err := conn.Write([]byte(words))
if err != nil {
Log(conn.RemoteAddr().String(), “Fatal error: “, err)
os.Exit(1)
}
Log(“服务端接收了”, msg)
time.Sleep(2 * time.Second)
}



}



这段客户端代码,实现了两个相同的信息发送频率给服务端。两个频率中间,我们让运行休息了12秒。然后,我们在服务端的对应机制是这样的。
func HeartBeating(conn net.Conn, bytes chan byte, timeout int) {
select {
case fk := <- bytes:
Log(conn.RemoteAddr().String(), “心跳:第”, string(fk), “times”)
conn.SetDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
break



    case <- time.After(5 * time.Second):
Log("conn dead now")
conn.Close()
} }


每次接收到心跳数据就 SetDeadline 延长一个时间段 timeout。如果没有接到心跳数据,5秒后连接关闭。



心跳机制
 client每隔几分钟发送一个固定信息给服务端,服务端收到后回复一个固定信息如果服务端几分钟内没有收到客户端信息则视客户端断开。发包方可以是客户也可以是服务端..
 心跳包之所以叫心跳包是因为:它像心跳一样每隔固定时间发一次,以此来告诉服务器,这个客户端还活着。事实上这是为了保持长连接,至于这个包的内容,是没有什么特别规定的,不过一般都是很小的包,或者只包含包头的一个空包。心跳包主要也就是用于长连接的保活和断线处理。一般的应用下,判定时间在30-40秒比较不错。如果实在要求高,那就在6-9秒。



package main
import (
“encoding/json”
“errors”
“flag”
“fmt”
“io/ioutil”
“log”
“net”
“os”
“strings”
“time”
)
// 镜像结构
type Image struct {
Created uint64
Id string
ParentId string
RepoTags []string
Size uint64
VirtualSize uint64
}
// 容器结构
type Container struct {
Id string json:"Id"
Names []string json:"Names"
Image string json:"Image"
ImageID string json:"ImageID"
Command string json:"Command"
Created uint64 json:"Created"
State string json:"State"
Status string json:"Status"
Ports []Port json:"Ports"
Labels map[string]string json:"Labels"
HostConfig map[string]string json:"HostConfig"
NetworkSettings map[string]interface{} json:"NetworkSettings"
Mounts []Mount json:"Mounts"
}
// docker 端口映射
type Port struct {
IP string json:"IP"
PrivatePort int json:"PrivatePort"
PublicPort int json:"PublicPort"
Type string json:"Type"
}
// docker 挂载
type Mount struct {
Type string json:"Type"
Source string json:"Source"
Destination string json:"Destination"
Mode string json:"Mode"
RW bool json:"RW"
Propatation string json:"Propagation"
}
// 连接列表
var SockAddr = “/var/run//docker.sock” //这可不是随便写的,是docker官网文档的套接字默认值,当然守护进程通讯方式还有tcp,fd等方式,各自都有适用场景。。。
var imagesSock = “GET /images/json HTTP/1.0\r\n\r\n” //docker对外的镜像api操作
var containerSock = “GET /containers/json?all=true HTTP/1.0\r\n\r\n” //docker对外的容器查看api
var startContainerSock = “POST /containers/%s/start HTTP/1.0\r\n\r\n” //docker对外的容器启动api
// 白名单
var whiteList []string
func main() {
// 读取命令行参数
// 白名单列表
list := flag.String(“list”, “”, “docker white list to restart, eg: token,explorer”)
// 轮询的时间间隔,单位秒
times := flag.Int64(“time”, 10, “time interval to set read docker containers [second], default is 10 second”)
flag.Parse()
// 解析list => whiteList
whiteList = strings.Split(list, “,”) //将我们命令行中list参数的容器列表解析到代码中
log.SetOutput(os.Stdout)
log.Println(“start docker watching…”)
log.Printf(“Your whiteList: %v\n”, *list)
log.Printf(“Your shedule times: %ds\n”, *times)
//接下来的这个for循环就是每隔一定时间监控docker容器是否正常运行,不正常就重新启动它
for {
// 轮询docker
err := listenDocker()
if err != nil {
log.Println(err.Error())
}
time.Sleep(time.Duration(
times)time.Second)
}
}
func listenDocker() error {
// 获取容器列表,拿到所有的容器信息
containers, err := readContainer()
if err != nil {
return errors.New(“read container error: “ + err.Error())
}
// 先遍历白名单快,次数少
for _, name := range whiteList {
Name:
for _, container := range containers {
for _, cname := range container.Names {
// 如果匹配到白名单
if cname[1:] == name {
// 关心一下容器状态
log.Printf(“id=%s, name=%s, state=%s”, container.Id[:12], container.Names, container.Status)
if strings.Contains(container.Status, “Exited”) {
// 如果出现异常退出的容器,启动它
log.Printf(“find container: [%s] has exited, ready to start it. “, name)
e := startContainer(container.Id)
if e != nil {
log.Println(“start container error: “, e.Error())
}
break Name
}
}
}
}
}
return nil
}
// 获取 unix sock 连接
func connectDocker() (
net.UnixConn, error) {
addr := net.UnixAddr{SockAddr, “unix”} // SockAddr 这个变量的值被设定为docker的/var/run/docker 套接字路径值,也就是说此处就是拨通与docker的daemon通讯建立的关键处,其他处的代码就是些正常的逻辑处理了
return net.DialUnix(“unix”, nil, &addr)
}
// 启动容器
func startContainer(id string) error {
conn, err := connectDocker()
if err != nil {
return errors.New(“connect error: “ + err.Error())
}
start := fmt.Sprintf(startContainerSock, id)
fmt.Println(start)
cmd := []byte(start)
code, err := conn.Write(cmd)
if err != nil {
return err
}
log.Println(“start container response code: “, code)
// 启动容器等待20秒,防止数据重发
time.Sleep(20time.Second)
return nil
}
// 获取容器列表
func readContainer() ([]Container, error) {
conn, err := connectDocker() //建立一个unix连接,这其实是一个关键点,需要你了解unix 套接字 建立连接
if err != nil {
return nil, errors.New(“connect error: “ + err.Error())
}
_, err = conn.Write([]byte(containerSock))
if err != nil {
return nil, err
}
result, err := ioutil.ReadAll(conn)
if err != nil {
return nil, err
}
body := getBody(result)
var containers []Container
err = json.Unmarshal(body, &containers)
if err != nil {
return nil, err
}
log.Println(“len of containers: “, len(containers))
if len(containers) == 0 {
return nil, errors.New(“no containers”)
}
return containers, nil
}
// 获取镜像列表
func readImage(conn *net.UnixConn) ([]Image, error) {
_, err := conn.Write([]byte(imagesSock))
if err != nil {
return nil, err
}
result, err := ioutil.ReadAll(conn)
if err != nil {
return nil, err
}
body := getBody(result[:])
var images []Image
err = json.Unmarshal(body, &images)
if err != nil {
return nil, err
}
return images, nil
}
// 从返回的 http 响应中提取 body
func getBody(result []byte) (body []byte) {
for i:=0; i<=len(result)-4; i++ {
if result[i] == 13 && result[i+1] == 10 && result[i+2] == 13 && result[i+3] == 10 {
body = result[i+4:]
break
}
}
return
}
/

error log :
1、write unix @->/var/run/docker.sock: write: broken pipe
建立的tcp连接不能复用,每次操作都建立连接
*/
使用方法



1.编译



go build -o main main.go



2.linux下直接当可执行文件执行便可



./main -list=”容器名称1,容器名称2…”



思路分析:



原来docker这个软件对外是提供了一些列api用来管理容器的增删该查的 官方api文档 ,既然提供了api了那么任何语言都能实现对其的管理控制及动态部署了。



但其实这里面真要弄明白还是有很多话要讲了



docker这个服务已经已进程的形式运行在linux的系统中了,为什么我们输入docker有关的命令能够与之交互,这好像是一个习以为常的行为,貌似理应如此,但是要知道我们是在与一个正在运行的进程发生通讯,若仍不以为然,请接以下问:



1.进程间都是如何通讯的? 进程通讯间方式



在明白了进程之间的通讯方式之后,我明白了docker的这个daemon通讯原理,瞬间就打通了之前对k8管理docker的疑惑(老实讲只知道kubernetes很强大,却没想明白它是如何能动态增容我的容器配置,负载等等等),套接字(socket) /var/run/docker 这个我们使用起来不会接触到,理解起来却必须打通的关键点请务必了解它。



Category golang