标签 并发 下的文章

仅用大约 65 行代码,开发一个用于生成随机数、支持并发的 TCP 服务端。

TCP 和 UDP 服务端随处可见,它们基于 TCP/IP 协议栈,通过网络为客户端提供服务。在这篇文章中,我将介绍如何使用 Go 语言 开发一个用于返回随机数、支持并发的 TCP 服务端。对于每一个来自 TCP 客户端的连接,它都会启动一个新的 goroutine(轻量级线程)来处理相应的请求。

你可以在 GitHub 上找到本项目的源码:concTcp.go

处理 TCP 连接

这个程序的主要逻辑在 handleConnection() 函数中,具体实现如下:

func handleConnection(c net.Conn) {
        fmt.Printf("Serving %s\n", c.RemoteAddr().String())
        for {
                netData, err := bufio.NewReader(c).ReadString('\n')
                if err != nil {
                        fmt.Println(err)
                        return
                }

                temp := strings.TrimSpace(string(netData))
                if temp == "STOP" {
                        break
                }

                result := strconv.Itoa(random()) + "\n"
                c.Write([]byte(string(result)))
        }
        c.Close()
}

如果 TCP 客户端发送了一个 “STOP” 字符串,为它提供服务的 goroutine 就会终止;否则,TCP 服务端就会返回一个随机数给它。只要客户端不主动终止,服务端就会一直提供服务,这是由 for 循环保证的。具体来说,for 循环中的代码使用了 bufio.NewReader(c).ReadString('\n') 来逐行读取客户端发来的数据,并使用 c.Write([]byte(string(result))) 来返回数据(生成的随机数)。你可以在 Go 的 net 标准包 文档 中了解更多。

支持并发

main() 函数的实现部分,每当 TCP 服务端收到 TCP 客户端的连接请求,它都会启动一个新的 goroutine 来为这个请求提供服务。

func main() {
        arguments := os.Args
        if len(arguments) == 1 {
                fmt.Println("Please provide a port number!")
                return
        }

        PORT := ":" + arguments[1]
        l, err := net.Listen("tcp4", PORT)
        if err != nil {
                fmt.Println(err)
                return
        }
        defer l.Close()
        rand.Seed(time.Now().Unix())

        for {
                c, err := l.Accept()
                if err != nil {
                        fmt.Println(err)
                        return
                }
                go handleConnection(c)
        }
}

首先,main() 确保程序至少有一个命令行参数。注意,现有代码并没有检查这个参数是否为有效的 TCP 端口号。不过,如果它是一个无效的 TCP 端口号,net.Listen() 就会调用失败,并返回一个错误信息,类似下面这样:

$ go run concTCP.go 12a
listen tcp4: lookup tcp4/12a: nodename nor servname provided, or not known
$ go run concTCP.go -10
listen tcp4: address -10: invalid port

net.Listen() 函数用于告诉 Go 接受网络连接,因而承担了服务端的角色。它的返回值类型是 net.Conn,后者实现了 io.Readerio.Writer 接口。此外,main() 函数中还调用了 rand.Seed() 函数,用于初始化随机数生成器。最后,for 循环允许程序一直使用 Accept() 函数来接受 TCP 客户端的连接请求,并以 goroutine 的方式来运行 handleConnection(c) 函数,处理客户端的后续请求。

net.Listen() 的第一个参数

net.Listen() 函数的第一个参数定义了使用的网络类型,而第二个参数定义了服务端监听的地址和端口号。第一个参数的有效值为 tcptcp4tcp6udpudp4udp6ipip4ip6Unix(Unix 套接字)、UnixgramUnixpacket,其中:tcp4udp4ip4 只接受 IPv4 地址,而 tcp6udp6ip6 只接受 IPv6 地址。

服务端并发测试

concTCP.go 需要一个命令行参数,来指定监听的端口号。当它开始服务 TCP 客户端时,你会得到类似下面的输出:

$ go run concTCP.go 8001
Serving 127.0.0.1:62554
Serving 127.0.0.1:62556

netstat 的输出可以确认 congTCP.go 正在为多个 TCP 客户端提供服务,并且仍在继续监听建立连接的请求:

$ netstat -anp TCP | grep 8001
tcp4       0      0  127.0.0.1.8001         127.0.0.1.62556        ESTABLISHED
tcp4       0      0  127.0.0.1.62556        127.0.0.1.8001         ESTABLISHED
tcp4       0      0  127.0.0.1.8001         127.0.0.1.62554        ESTABLISHED
tcp4       0      0  127.0.0.1.62554        127.0.0.1.8001         ESTABLISHED
tcp4       0      0  *.8001                 *.*                    LISTEN

在上面输出中,最后一行显示了有一个进程正在监听 8001 端口,这意味着你可以继续连接 TCP 的 8001 端口。第一行和第二行显示了有一个已建立的 TCP 网络连接,它占用了 8001 和 62556 端口。相似地,第三行和第四行显示了有另一个已建立的 TCP 连接,它占用了 8001 和 62554 端口。

下面这张图片显示了 concTCP.go 在服务多个 TCP 客户端时的输出:

concTCP.go TCP 服务端测试

类似地,下面这张图片显示了两个 TCP 客户端的输出(使用了 nc 工具):

是用 nc 工具作为 concTCP.go 的 TCP 客户端

你可以在 维基百科 上找到更多关于 nc(即 netcat)的信息。

总结

现在,你学会了如何用大约 65 行 Go 代码来开发一个生成随机数、支持并发的 TCP 服务端,这真是太棒了!如果你想要让你的 TCP 服务端执行别的任务,只需要修改 handleConnection() 函数即可。


via: https://opensource.com/article/18/5/building-concurrent-tcp-server-go

作者:Mihalis Tsoukalos 选题:lkxed 译者:lkxed 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

通过学习如何定位并发处理的陷阱来避免未来处理这些问题时的困境。

在复杂的分布式系统进行任务处理时,你通常会需要进行并发的操作。在 Mode.net 公司,我们每天都要和实时、快速和灵活的软件打交道。而没有一个高度并发的系统,就不可能构建一个毫秒级的动态地路由数据包的全球专用网络。这个动态路由是基于网络状态的,尽管这个过程需要考虑众多因素,但我们的重点是链路指标。在我们的环境中,链路指标可以是任何跟网络链接的状态和当前属性(如链接延迟)有关的任何内容。

并发探测链接监控

我们的动态路由算法 H.A.L.O. 逐跳自适应链路状态最佳路由 Hop-by-Hop Adaptive Link-State Optimal Routing )部分依赖于链路指标来计算路由表。这些指标由位于每个 PoP( 存活节点 Point of Presence )上的独立组件收集。PoP 是表示我们的网络中单个路由实体的机器,通过链路连接并分布在我们的网络拓扑中的各个位置。某个组件使用网络数据包探测周围的机器,周围的机器回复数据包给前者。从接收到的探测包中可以获得链路延迟。由于每个 PoP 都有不止一个临近节点,所以这种探测任务实质上是并发的:我们需要实时测量每个临近连接点的延迟。我们不能串行地处理;为了计算这个指标,必须尽快处理每个探测。

 title=

序列号和重置:一个重新排列场景

我们的探测组件互相发送和接收数据包,并依靠序列号进行数据包处理。这旨在避免处理重复的包或顺序被打乱的包。我们的第一个实现依靠特殊的序列号 0 来重置序列号。这个数字仅在组件初始化时使用。主要的问题是我们考虑了递增的序列号总是从 0 开始。在该组件重启后,包的顺序可能会重新排列,某个包的序列号可能会轻易地被替换成重置之前使用过的值。这意味着,后继的包都会被忽略掉,直到排到重置之前用到的序列值。

UDP 握手和有限状态机

这里的问题是该组件重启前后的序列号是否一致。有几种方法可以解决这个问题,经过讨论,我们选择了实现一个带有清晰状态定义的三步握手协议。这个握手过程在初始化时通过链接建立会话。这样可以确保节点通过同一个会话进行通信且使用了适当的序列号。

为了正确实现这个过程,我们必须定义一个有清晰状态和过渡的有限状态机。这样我们就可以正确管理握手过程中的所有极端情况。

 title=

会话 ID 由握手的初始化程序生成。一个完整的交换顺序如下:

  1. 发送者发送一个 SYN(ID) 数据包。
  2. 接收者存储接收到的 ID 并发送一个 SYN-ACK(ID)
  3. 发送者接收到 SYN-ACK(ID) 并发送一个 ACK(ID)。它还发送一个从序列号 0 开始的数据包。
  4. 接收者检查最后接收到的 ID,如果 ID 匹配,则接受 ACK(ID)。它还开始接受序列号为 0 的数据包。

处理状态超时

基本上,每种状态下你都需要处理最多三种类型的事件:链接事件、数据包事件和超时事件。这些事件会并发地出现,因此你必须正确处理并发。

  • 链接事件包括网络连接或网络断开的变化,相应的初始化一个链接会话或断开一个已建立的会话。
  • 数据包事件是控制数据包(SYN/SYN-ACK/ACK)或只是探测响应。
  • 超时事件在当前会话状态的预定超时时间到期后触发。

这里面临的最主要的问题是如何处理并发的超时到期和其他事件。这里很容易陷入死锁和资源竞争的陷阱。

第一种方法

本项目使用的语言是 Golang。它确实提供了原生的同步机制,如自带的通道和锁,并且能够使用轻量级线程来进行并发处理。

 title=

gopher 们聚众狂欢

首先,你可以设计两个分别表示我们的会话和超时处理程序的结构体。

type Session struct {  
  State SessionState  
  Id SessionId  
  RemoteIp string  
}

type TimeoutHandler struct {  
  callback func(Session)  
  session Session  
  duration int  
  timer *timer.Timer  
}

Session 标识连接会话,内有表示会话 ID、临近的连接点的 IP 和当前会话状态的字段。

TimeoutHandler 包含回调函数、对应的会话、持续时间和指向调度计时器的指针。

每一个临近连接点的会话都包含一个保存调度 TimeoutHandler 的全局映射。

SessionTimeout map[Session]*TimeoutHandler

下面方法注册和取消超时:

// schedules the timeout callback function.  
func (timeout* TimeoutHandler) Register() {  
  timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time.Second, func() {  
    timeout.callback(timeout.session)  
  })  
}

func (timeout* TimeoutHandler) Cancel() {  
  if timeout.timer == nil {  
    return  
  }  
  timeout.timer.Stop()  
}

你可以使用类似下面的方法来创建和存储超时:

func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {  
  if sessionTimeout[session] == nil {  
    sessionTimeout[session] := new(TimeoutHandler)  
  }  
   
  timeout = sessionTimeout[session]  
  timeout.session = session  
  timeout.callback = callback  
  timeout.duration = duration  
  return timeout  
}

超时处理程序创建后,会在经过了设置的 duration 时间(秒)后执行回调函数。然而,有些事件会使你重新调度一个超时处理程序(与 SYN 状态时的处理一样,每 3 秒一次)。

为此,你可以让回调函数重新调度一次超时:

func synCallback(session Session) {  
  sendSynPacket(session)

  // reschedules the same callback.  
  newTimeout := NewTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)  
  newTimeout.Register()

  sessionTimeout[state] = newTimeout  
}

这次回调在新的超时处理程序中重新调度自己,并更新全局映射 sessionTimeout

数据竞争和引用

你的解决方案已经有了。可以通过检查计时器到期后超时回调是否执行来进行一个简单的测试。为此,注册一个超时,休眠 duration 秒,然后检查是否执行了回调的处理。执行这个测试后,最好取消预定的超时时间(因为它会重新调度),这样才不会在下次测试时产生副作用。

令人惊讶的是,这个简单的测试发现了这个解决方案中的一个问题。使用 cancel 方法来取消超时并没有正确处理。以下顺序的事件会导致数据资源竞争:

  1. 你有一个已调度的超时处理程序。
  2. 线程 1:

    1. 你接收到一个控制数据包,现在你要取消已注册的超时并切换到下一个会话状态(如发送 SYN 后接收到一个 SYN-ACK
    2. 你调用了 timeout.Cancel(),这个函数调用了 timer.Stop()。(请注意,Golang 计时器的停止不会终止一个已过期的计时器。)
  3. 线程 2:

    1. 在取消调用之前,计时器已过期,回调即将执行。
    2. 执行回调,它调度一次新的超时并更新全局映射。
  4. 线程 1:

    1. 切换到新的会话状态并注册新的超时,更新全局映射。

两个线程并发地更新超时映射。最终结果是你无法取消注册的超时,然后你也会丢失对线程 2 重新调度的超时的引用。这导致处理程序在一段时间内持续执行和重新调度,出现非预期行为。

锁也解决不了问题

使用锁也不能完全解决问题。如果你在处理所有事件和执行回调之前加锁,它仍然不能阻止一个过期的回调运行:

func (timeout* TimeoutHandler) Register() {  
  timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time._Second_, func() {  
    stateLock.Lock()  
    defer stateLock.Unlock()

    timeout.callback(timeout.session)  
  })  
}

现在的区别就是全局映射的更新是同步的,但是这还是不能阻止在你调用 timeout.Cancel() 后回调的执行 —— 这种情况出现在调度计时器过期了但是还没有拿到锁的时候。你还是会丢失一个已注册的超时的引用。

使用取消通道

你可以使用取消通道,而不必依赖不能阻止到期的计时器执行的 golang 函数 timer.Stop()

这是一个略有不同的方法。现在你可以不用再通过回调进行递归地重新调度;而是注册一个死循环,这个循环接收到取消信号或超时事件时终止。

新的 Register() 产生一个新的 go 线程,这个线程在超时后执行你的回调,并在前一个超时执行后调度新的超时。返回给调用方一个取消通道,用来控制循环的终止。

func (timeout *TimeoutHandler) Register() chan struct{} {  
  cancelChan := make(chan struct{})  
   
  go func () {  
    select {  
    case _ = <- cancelChan:  
      return  
    case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):  
      func () {  
        stateLock.Lock()  
        defer stateLock.Unlock()

        timeout.callback(timeout.session)  
      } ()  
    }  
  } ()

  return cancelChan  
}

func (timeout* TimeoutHandler) Cancel() {  
  if timeout.cancelChan == nil {  
    return  
  }  
  timeout.cancelChan <- struct{}{}  
}

这个方法给你注册的所有超时提供了取消通道。一个取消调用向通道发送一个空结构体并触发取消操作。然而,这并不能解决前面的问题;可能在你通过通道取消之前以及超时线程拿到锁之前,超时时间就已经到了。

这里的解决方案是,在拿到锁之后,检查一下超时范围内的取消通道。

  case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):  
    func () {  
      stateLock.Lock()  
      defer stateLock.Unlock()  
     
      select {  
      case _ = <- handler.cancelChan:  
        return  
      default:  
        timeout.callback(timeout.session)  
      }  
    } ()  
  }

最终,这可以确保在拿到锁之后执行回调,不会触发取消操作。

小心死锁

这个解决方案看起来有效;但是还是有个隐患:死锁

请阅读上面的代码,试着自己找到它。考虑下描述的所有函数的并发调用。

这里的问题在取消通道本身。我们创建的是无缓冲通道,即发送的是阻塞调用。当你在一个超时处理程序中调用取消函数时,只有在该处理程序被取消后才能继续处理。问题出现在,当你有多个调用请求到同一个取消通道时,这时一个取消请求只被处理一次。当多个事件同时取消同一个超时处理程序时,如连接断开或控制包事件,很容易出现这种情况。这会导致死锁,可能会使应用程序停机。

 title=

有人在听吗?

(已获得 Trevor Forrey 授权。)

这里的解决方案是创建通道时指定缓存大小至少为 1,这样向通道发送数据就不会阻塞,也显式地使发送变成非阻塞的,避免了并发调用。这样可以确保取消操作只发送一次,并且不会阻塞后续的取消调用。

func (timeout* TimeoutHandler) Cancel() {  
  if timeout.cancelChan == nil {  
    return  
  }  
   
  select {  
  case timeout.cancelChan <- struct{}{}:  
  default:  
    // can’t send on the channel, someone has already requested the cancellation.  
  }  
}

总结

在实践中你学到了并发操作时出现的常见错误。由于其不确定性,即使进行大量的测试,也不容易发现这些问题。下面是我们在最初的实现中遇到的三个主要问题:

在非同步的情况下更新共享数据

这似乎是个很明显的问题,但如果并发更新发生在不同的位置,就很难发现。结果就是数据竞争,由于一个更新会覆盖另一个,因此对同一数据的多次更新中会有某些更新丢失。在我们的案例中,我们是在同时更新同一个共享映射里的调度超时引用。(有趣的是,如果 Go 检测到在同一个映射对象上的并发读写,会抛出致命错误 — 你可以尝试下运行 Go 的数据竞争检测器)。这最终会导致丢失超时引用,且无法取消给定的超时。当有必要时,永远不要忘记使用锁。

 title=

不要忘记同步 gopher 们的工作

缺少条件检查

在不能仅依赖锁的独占性的情况下,就需要进行条件检查。我们遇到的场景稍微有点不一样,但是核心思想跟条件变量是一样的。假设有个一个生产者和多个消费者使用一个共享队列的经典场景,生产者可以将一个元素添加到队列并唤醒所有消费者。这个唤醒调用意味着队列中的数据是可访问的,并且由于队列是共享的,消费者必须通过锁来进行同步访问。每个消费者都可能拿到锁;然而,你仍然需要检查队列中是否有元素。因为在你拿到锁的瞬间并不知道队列的状态,所以还是需要进行条件检查。

在我们的例子中,超时处理程序收到了计时器到期时发出的“唤醒”调用,但是它仍需要检查是否已向其发送了取消信号,然后才能继续执行回调。

 title=

如果你要唤醒多个 gopher,可能就需要进行条件检查

死锁

当一个线程被卡住,无限期地等待一个唤醒信号,但是这个信号永远不会到达时,就会发生这种情况。死锁可以通过让你的整个程序停机来彻底杀死你的应用。

在我们的案例中,这种情况的发生是由于多次发送请求到一个非缓冲且阻塞的通道。这意味着向通道发送数据只有在从这个通道接收完数据后才能返回。我们的超时线程循环迅速从取消通道接收信号;然而,在接收到第一个信号后,它将跳出循环,并且再也不会从这个通道读取数据。其他的调用会一直被卡住。为避免这种情况,你需要仔细检查代码,谨慎处理阻塞调用,并确保不会发生线程饥饿。我们例子中的解决方法是使取消调用成为非阻塞调用 — 我们不需要阻塞调用。


via: https://opensource.com/article/19/12/go-common-pitfalls

作者:Eduardo Ferreira 选题:lujun9972 译者:lxbwolf 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

Pony,一种“Rust 遇上 Erlang”的语言,让开发快捷、安全、高效、高并发的程序更简单。

Wallaroo Labs,我是工程副总裁,我们正在构建一个用 Pony 编程语言编写的 高性能分布式流处理器。大多数人没有听说过 Pony,但它一直是 Wallaroo 的最佳选择,它也可能成为你的下一个项目的最佳选择。

“一门编程语言只是另一种工具。与语法无关,与表达性无关,与范式或模型无关,仅与解决难题有关。” —Sylvan Clebsch,Pony 的创建者

我是 Pony 项目的贡献者,但在这里我要谈谈为什么 Pony 对于像 Wallaroo 这样的应用是个好选择,并分享我使用 Pony 的方式。如果你对我们为什么使用 Pony 来编写 Wallaroo 甚感兴趣,我们有一篇关于它的 博文

Pony 是什么?

你可以把 Pony 想象成某种“Rust 遇上 Erlang”的东西。Pony 有着最引人注目的特性,它们是:

  • 类型安全
  • 存储安全
  • 异常安全
  • 无数据竞争
  • 无死锁

此外,它可以被编译为高效的本地代码,它是在开放的情况下开发的,在两句版 BSD 许可证下发布。

以上说的功能不少,但在这里我将重点关注那些对我们公司来说采用 Pony 至关重要的功能。

为什么使用 Pony?

使用大多数我们现有的工具编写快速、安全、高效、高并发的程序并非易事。“快速、高效、高并发”是可实现的目标,但加入“安全”之后,就困难了许多。对于 Wallaroo,我们希望同时实现四个目标,而 Pony 让实现它们更加简单。

高并发

Pony 让并发变得简单。部分是通过提供一个固执的并发方式实现的。在 Pony 语言中,所有的并发都是通过 Actor 模型 进行的。

Actor 模型以在 Erlang 和 Akka 中的实现最为著名。Actor 模型出现于上世纪 70 年代,细节因实现方式而异。不变的是,所有计算都由通过异步消息进行通信的 actor 来执行。

你可以用这种方式来看待 Actor 模型:面向对象中的对象是状态 + 同步方法,而 actor 是状态 + 异步方法。

当一个 actor 收到一个消息时,它执行相应的方法。该方法可以在只有该 actor 可访问的状态下运行。Actor 模型允许我们以并发安全的方式使用可变状态。每个 actor 都是单线程的。一个 actor 中的两个方法绝不会并发运行。这意味着,在给定的 actor 中,数据更新不会引起数据竞争或通常与线程和可变状态相关的其他问题。

快速高效

Pony actor 通过一个高效的工作窃取调度程序来调度。每个可用的 CPU 都有一个单独 Pony 调度程序。这种每个核心一个线程的并发模型是 Pony 尝试与 CPU 协同工作以尽可能高效运行的一部分。Pony 运行时尝试尽可能利用 CPU 缓存。代码越少干扰缓存,运行得越好。Pony 意在帮你的代码与 CPU 缓存友好相处。

Pony 的运行时还会有每个 actor 的堆,因此在垃圾收集期间,没有 “停止一切” 的垃圾收集步骤。这意味着你的程序总是至少能做一点工作。因此 Pony 程序最终具有非常一致的性能和可预测的延迟。

安全

Pony 类型系统引入了一个新概念:引用能力,它使得数据安全成为类型系统的一部分。Pony 语言中每种变量的类型都包含了有关如何在 actor 之间分享数据的信息。Pony 编译器用这些信息来确认,在编译时,你的代码是无数据竞争和无死锁的。

如果这听起来有点像 Rust,那是因为本来就是这样的。Pony 的引用功能和 Rust 的借用检查器都提供数据安全性;它们只是以不同的方式来接近这个目标,并有不同的权衡。

Pony 适合你吗?

决定是否要在一个非业余爱好的项目上使用一门新的编程语言是困难的。与其他方法想比,你必须权衡工具的适当性和不成熟度。那么,Pony 和你搭不搭呢?

如果你有一个困难的并发问题需要解决,那么 Pony 可能是一个好选择。解决并发应用问题是 Pony 之所以存在的理由。如果你能用一个单线程的 Python 脚本就完成所需操作,那你大概不需要它。如果你有一个困难的并发问题,你应该考虑 Pony 及其强大的无数据竞争、并发感知类型系统。

你将获得一个这样的编译器,它将阻止你引入许多与并发相关的错误,并在运行时为你提供出色的性能特征。

开始使用 Pony

如果你准备好开始使用 Pony,你需要先在 Pony 的网站上访问 学习部分。在这里你会找到安装 Pony 编译器的步骤和学习这门语言的资源。

如果你愿意为你正在使用的这个语言做出贡献,我们会在 GitHub 上为你提供一些 初学者友好的问题

同时,我迫不及待地想在 我们的 IRC 频道Pony 邮件列表 上与你交谈。

要了解更多有关 Pony 的消息,请参阅 Sean Allen 2018 年 7 月 16 日至 19 日在俄勒冈州波特兰举行的 第 20 届 OSCON 会议 上的演讲: Pony,我如何学会停止担心并拥抱未经证实的技术


via: https://opensource.com/article/18/5/pony

作者:Sean T Allen 选题:lujun9972 译者:beamrolling 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

Go 是一个内置支持并发编程的语言。借助使用 go 关键字去创建 协程 goroutine (轻量级线程)和在 Go 中提供的 使用 信道其它的并发 同步方法,使得并发编程变得很容易、很灵活和很有趣。

另一方面,Go 并不会阻止一些因 Go 程序员粗心大意或者缺乏经验而造成的并发编程错误。在本文的下面部分将展示一些在 Go 编程中常见的并发编程错误,以帮助 Go 程序员们避免再犯类似的错误。

需要同步的时候没有同步

代码行或许 不是按出现的顺序运行的

在下面的程序中有两个错误。

  • 第一,在 main 协程中读取 b 和在新的 协程 中写入 b 可能导致数据争用。
  • 第二,条件 b == true 并不能保证在 main 协程 中的 a != nil。在新的协程中编译器和 CPU 可能会通过 重排序指令 进行优化,因此,在运行时 b 赋值可能发生在 a 赋值之前,在 main 协程 中当 a 被修改后,它将会让部分 a 一直保持为 nil
package main

import (
    "time"
    "runtime"
)

func main() {
    var a []int // nil
    var b bool  // false

    // a new goroutine
    go func () {
        a = make([]int, 3)
        b = true // write b
    }()

    for !b { // read b
        time.Sleep(time.Second)
        runtime.Gosched()
    }
    a[0], a[1], a[2] = 0, 1, 2 // might panic
}

上面的程序或者在一台计算机上运行的很好,但是在另一台上可能会引发异常。或者它可能运行了 N 次都很好,但是可能在第 (N+1) 次引发了异常。

我们将使用 sync 标准包中提供的信道或者同步方法去确保内存中的顺序。例如,

package main

func main() {
    var a []int = nil
    c := make(chan struct{})

    // a new goroutine
    go func () {
        a = make([]int, 3)
        c <- struct{}{}
    }()

    <-c
    a[0], a[1], a[2] = 0, 1, 2
}

使用 time.Sleep 调用去做同步

我们先来看一个简单的例子。

package main

import (
    "fmt"
    "time"
)

func main() {
    var x = 123

    go func() {
        x = 789 // write x
    }()

    time.Sleep(time.Second)
    fmt.Println(x) // read x
}

我们预期程序将打印出 789。如果我们运行它,通常情况下,它确定打印的是 789。但是,这个程序使用的同步方式好吗?No!原因是 Go 运行时并不保证 x 的写入一定会发生在 x 的读取之前。在某些条件下,比如在同一个操作系统上,大部分 CPU 资源被其它运行的程序所占用的情况下,写入 x 可能就会发生在读取 x 之后。这就是为什么我们在正式的项目中,从来不使用 time.Sleep 调用去实现同步的原因。

我们来看一下另外一个示例。

package main

import (
    "fmt"
    "time"
)

var x = 0

func main() {
    var num = 123
    var p = &num

    c := make(chan int)

    go func() {
        c <- *p + x
    }()

    time.Sleep(time.Second)
    num = 789
    fmt.Println(<-c)
}

你认为程序的预期输出是什么?123 还是 789?事实上它的输出与编译器有关。对于标准的 Go 编译器 1.10 来说,这个程序很有可能输出是 123。但是在理论上,它可能输出的是 789,或者其它的随机数。

现在,我们来改变 c <- *p + xc <- *p,然后再次运行这个程序。你将会发现输出变成了 789 (使用标准的 Go 编译器 1.10)。这再次说明它的输出是与编译器相关的。

是的,在上面的程序中存在数据争用。表达式 *p 可能会被先计算、后计算、或者在处理赋值语句 num = 789 时计算。time.Sleep 调用并不能保证 *p 发生在赋值语句处理之前进行。

对于这个特定的示例,我们将在新的协程创建之前,将值保存到一个临时值中,然后在新的协程中使用临时值去消除数据争用。

...
    tmp := *p + x
    go func() {
        c <- tmp
    }()
...

使协程挂起

挂起协程是指让协程一直处于阻塞状态。导致协程被挂起的原因很多。比如,

  • 一个协程尝试从一个 nil 信道中或者从一个没有其它协程给它发送值的信道中检索数据。
  • 一个协程尝试去发送一个值到 nil 信道,或者发送到一个没有其它的协程接收值的信道中。
  • 一个协程被它自己死锁。
  • 一组协程彼此死锁。
  • 当运行一个没有 default 分支的 select 代码块时,一个协程被阻塞,以及在 select 代码块中 case 关键字后的所有信道操作保持阻塞状态。

除了有时我们为了避免程序退出,特意让一个程序中的 main 协程保持挂起之外,大多数其它的协程挂起都是意外情况。Go 运行时很难判断一个协程到底是处于挂起状态还是临时阻塞。因此,Go 运行时并不会去释放一个挂起的协程所占用的资源。

谁先响应谁获胜 的信道使用案例中,如果使用的 future 信道容量不够大,当尝试向 Future 信道发送结果时,一些响应较慢的信道将被挂起。比如,如果调用下面的函数,将有 4 个协程处于永远阻塞状态。

func request() int {
    c := make(chan int)
    for i := 0; i < 5; i++ {
        i := i
        go func() {
            c <- i // 4 goroutines will hang here.
        }()
    }
    return <-c
}

为避免这 4 个协程一直处于挂起状态, c 信道的容量必须至少是 4

实现谁先响应谁获胜的第二种方法 的信道使用案例中,如果将 future 信道用做非缓冲信道,那么有可能这个信息将永远也不会有响应而挂起。例如,如果在一个协程中调用下面的函数,协程可能会挂起。原因是,如果接收操作 <-c 准备就绪之前,五个发送操作全部尝试发送,那么所有的尝试发送的操作将全部失败,因此那个调用者协程将永远也不会接收到值。

func request() int {
    c := make(chan int)
    for i := 0; i < 5; i++ {
        i := i
        go func() {
            select {
            case c <- i:
            default:
            }
        }()
    }
    return <-c
}

将信道 c 变成缓冲信道将保证五个发送操作中的至少一个操作会发送成功,这样,上面函数中的那个调用者协程将不会被挂起。

sync 标准包中拷贝类型值

在实践中,sync 标准包中的类型值不会被拷贝。我们应该只拷贝这个值的指针。

下面是一个错误的并发编程示例。在这个示例中,当调用 Counter.Value 方法时,将拷贝一个 Counter 接收值。作为接收值的一个字段,Counter 接收值的各个 Mutex 字段也会被拷贝。拷贝不是同步发生的,因此,拷贝的 Mutex 值可能会出错。即便是没有错误,拷贝的 Counter 接收值的访问保护也是没有意义的。

import "sync"

type Counter struct {
    sync.Mutex
    n int64
}

// This method is okay.
func (c *Counter) Increase(d int64) (r int64) {
    c.Lock()
    c.n += d
    r = c.n
    c.Unlock()
    return
}

// The method is bad. When it is called, a Counter
// receiver value will be copied.
func (c Counter) Value() (r int64) {
    c.Lock()
    r = c.n
    c.Unlock()
    return
}

我们只需要改变 Value 接收类型方法为指针类型 *Counter,就可以避免拷贝 Mutex 值。

在官方的 Go SDK 中提供的 go vet 命令将会报告潜在的错误值拷贝。

在错误的地方调用 sync.WaitGroup 的方法

每个 sync.WaitGroup 值维护一个内部计数器,这个计数器的初始值为 0。如果一个 WaitGroup 计数器的值是 0,调用 WaitGroup 值的 Wait 方法就不会被阻塞,否则,在计数器值为 0 之前,这个调用会一直被阻塞。

为了让 WaitGroup 值的使用有意义,当一个 WaitGroup 计数器值为 0 时,必须在相应的 WaitGroup 值的 Wait 方法调用之前,去调用 WaitGroup 值的 Add 方法。

例如,下面的程序中,在不正确位置调用了 Add 方法,这将使最后打印出的数字不总是 100。事实上,这个程序最后打印的数字可能是在 [0, 100) 范围内的一个随意数字。原因就是 Add 方法的调用并不保证一定会发生在 Wait 方法调用之前。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var wg sync.WaitGroup
    var x int32 = 0
    for i := 0; i < 100; i++ {
        go func() {
            wg.Add(1)
            atomic.AddInt32(&x, 1)
            wg.Done()
        }()
    }

    fmt.Println("To wait ...")
    wg.Wait()
    fmt.Println(atomic.LoadInt32(&x))
}

为让程序的表现符合预期,在 for 循环中,我们将把 Add 方法的调用移动到创建的新协程的范围之外,修改后的代码如下。

...
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            atomic.AddInt32(&x, 1)
            wg.Done()
        }()
    }
...

不正确使用 futures 信道

信道使用案例 的文章中,我们知道一些函数将返回 futures 信道。假设 fafb 就是这样的两个函数,那么下面的调用就使用了不正确的 future 参数。

doSomethingWithFutureArguments(<-fa(), <-fb())

在上面的代码行中,两个信道接收操作是顺序进行的,而不是并发的。我们做如下修改使它变成并发操作。

ca, cb := fa(), fb()
doSomethingWithFutureArguments(<-c1, <-c2)

没有等协程的最后的活动的发送结束就关闭信道

Go 程序员经常犯的一个错误是,还有一些其它的协程可能会发送值到以前的信道时,这个信道就已经被关闭了。当这样的发送(发送到一个已经关闭的信道)真实发生时,将引发一个异常。

这种错误在一些以往的著名 Go 项目中也有发生,比如在 Kubernetes 项目中的 这个 bug这个 bug

如何安全和优雅地关闭信道,请阅读 这篇文章

在值上做 64 位原子操作时没有保证值地址 64 位对齐

到目前为止(Go 1.10),在标准的 Go 编译器中,在一个 64 位原子操作中涉及到的值的地址要求必须是 64 位对齐的。如果没有对齐则导致当前的协程异常。对于标准的 Go 编译器来说,这种失败仅发生在 32 位的架构上。请阅读 内存布局 去了解如何在一个 32 位操作系统上保证 64 位对齐。

没有注意到大量的资源被 time.After 函数调用占用

time 标准包中的 After 函数返回 一个延迟通知的信道。这个函数在某些情况下用起来很便捷,但是,每次调用它将创建一个 time.Timer 类型的新值。这个新创建的 Timer 值在通过传递参数到 After 函数指定期间保持激活状态,如果在这个期间过多的调用了该函数,可能会有太多的 Timer 值保持激活,这将占用大量的内存和计算资源。

例如,如果调用了下列的 longRunning 函数,将在一分钟内产生大量的消息,然后在某些周期内将有大量的 Timer 值保持激活,即便是大量的这些 Timer 值已经没用了也是如此。

import (
    "fmt"
    "time"
)

// The function will return if a message arrival interval
// is larger than one minute.
func longRunning(messages <-chan string) {
    for {
        select {
        case <-time.After(time.Minute):
            return
        case msg := <-messages:
            fmt.Println(msg)
        }
    }
}

为避免在上述代码中创建过多的 Timer 值,我们将使用一个单一的 Timer 值去完成同样的任务。

func longRunning(messages <-chan string) {
    timer := time.NewTimer(time.Minute)
    defer timer.Stop()

    for {
        select {
        case <-timer.C:
            return
        case msg := <-messages:
            fmt.Println(msg)
            if !timer.Stop() {
                <-timer.C
            }
        }

        // The above "if" block can also be put here.

        timer.Reset(time.Minute)
    }
}

不正确地使用 time.Timer

在最后,我们将展示一个符合语言使用习惯的 time.Timer 值的使用示例。需要注意的一个细节是,那个 Reset 方法总是在停止或者 time.Timer 值释放时被使用。

select 块的第一个 case 分支的结束部分,time.Timer 值被释放,因此,我们不需要去停止它。但是必须在第二个分支中停止定时器。如果在第二个分支中 if 代码块缺失,它可能至少在 Reset 方法调用时,会(通过 Go 运行时)发送到 timer.C 信道,并且那个 longRunning 函数可能会早于预期返回,对于 Reset 方法来说,它可能仅仅是重置内部定时器为 0,它将不会清理(耗尽)那个发送到 timer.C 信道的值。

例如,下面的程序很有可能在一秒内而不是十秒时退出。并且更重要的是,这个程序并不是 DRF 的(LCTT 译注:data race free,多线程程序的一种同步程度)。

package main

import (
    "fmt"
    "time"
)

func main() {
    start := time.Now()
    timer := time.NewTimer(time.Second/2)
    select {
    case <-timer.C:
    default:
        time.Sleep(time.Second) // go here
    }
    timer.Reset(time.Second * 10)
    <-timer.C
    fmt.Println(time.Since(start)) // 1.000188181s
}

time.Timer 的值不再被其它任何一个东西使用时,它的值可能被停留在一种非停止状态,但是,建议在结束时停止它。

在多个协程中如果不按建议使用 time.Timer 值并发,可能会有 bug 隐患。

我们不应该依赖一个 Reset 方法调用的返回值。Reset 方法返回值的存在仅仅是为了兼容性目的。


via: https://go101.org/article/concurrent-common-mistakes.html

作者:<go101.org> 译者:qhwdw 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

这是我写的并发网络服务器系列文章的第五部分。在前四部分中我们讨论了并发服务器的结构,这篇文章我们将去研究一个在生产系统中大量使用的服务器的案例—— Redis

Redis logo

Redis 是一个非常有魅力的项目,我关注它很久了。它最让我着迷的一点就是它的 C 源代码非常清晰。它也是一个高性能、大并发的内存数据库服务器的非常好的例子,它是研究网络并发服务器的一个非常好的案例,因此,我们不能错过这个好机会。

我们来看看前四部分讨论的概念在真实世界中的应用程序。

本系列的所有文章有:

事件处理库

Redis 最初发布于 2009 年,它最牛逼的一件事情大概就是它的速度 —— 它能够处理大量的并发客户端连接。需要特别指出的是,它是用一个单线程来完成的,而且还不对保存在内存中的数据使用任何复杂的锁或者同步机制。

Redis 之所以如此牛逼是因为,它在给定的系统上使用了其可用的最快的事件循环,并将它们封装成由它实现的事件循环库(在 Linux 上是 epoll,在 BSD 上是 kqueue,等等)。这个库的名字叫做 ae。ae 使得编写一个快速服务器变得很容易,只要在它内部没有阻塞即可,而 Redis 则保证 注1 了这一点。

在这里,我们的兴趣点主要是它对文件事件的支持 —— 当文件描述符(如网络套接字)有一些有趣的未决事情时将调用注册的回调函数。与 libuv 类似,ae 支持多路事件循环(参阅本系列的第三节第四节)和不应该感到意外的 aeCreateFileEvent 信号:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
                      aeFileProc *proc, void *clientData);

它在 fd 上使用一个给定的事件循环,为新的文件事件注册一个回调(proc)函数。当使用的是 epoll 时,它将调用 epoll_ctl 在文件描述符上添加一个事件(可能是 EPOLLINEPOLLOUT、也或许两者都有,取决于 mask 参数)。ae 的 aeProcessEvents 功能是 “运行事件循环和发送回调函数”,它在底层调用了 epoll_wait

处理客户端请求

我们通过跟踪 Redis 服务器代码来看一下,ae 如何为客户端事件注册回调函数的。initServer 启动时,通过注册一个回调函数来读取正在监听的套接字上的事件,通过使用回调函数 acceptTcpHandler 来调用 aeCreateFileEvent。当新的连接可用时,这个回调函数被调用。它调用 accept 注2 ,接下来是 acceptCommonHandler,它转而去调用 createClient 以初始化新客户端连接所需要的数据结构。

createClient 的工作是去监听来自客户端的入站数据。它将套接字设置为非阻塞模式(一个异步事件循环中的关键因素)并使用 aeCreateFileEvent 去注册另外一个文件事件回调函数以读取事件 —— readQueryFromClient。每当客户端发送数据,这个函数将被事件循环调用。

readQueryFromClient 就让我们期望的那样 —— 解析客户端命令和动作,并通过查询和/或操作数据来回复。因为客户端套接字是非阻塞的,所以这个函数必须能够处理 EAGAIN,以及部分数据;从客户端中读取的数据是累积在客户端专用的缓冲区中,而完整的查询可能被分割在回调函数的多个调用当中。

将数据发送回客户端

在前面的内容中,我说到了 readQueryFromClient 结束了发送给客户端的回复。这在逻辑上是正确的,因为 readQueryFromClient 准备要发送回复,但它不真正去做实质的发送 —— 因为这里并不能保证客户端套接字已经准备好写入/发送数据。我们必须为此使用事件循环机制。

Redis 是这样做的,它注册一个 beforeSleep 函数,每次事件循环即将进入休眠时,调用它去等待套接字变得可以读取/写入。beforeSleep 做的其中一件事情就是调用 handleClientsWithPendingWrites。它的作用是通过调用 writeToClient 去尝试立即发送所有可用的回复;如果一些套接字不可用时,那么套接字可用时,它将注册一个事件循环去调用 sendReplyToClient。这可以被看作为一种优化 —— 如果套接字可用于立即发送数据(一般是 TCP 套接字),这时并不需要注册事件 ——直接发送数据。因为套接字是非阻塞的,它从不会去阻塞循环。

为什么 Redis 要实现它自己的事件库?

第四节 中我们讨论了使用 libuv 来构建一个异步并发服务器。需要注意的是,Redis 并没有使用 libuv,或者任何类似的事件库,而是它去实现自己的事件库 —— ae,用 ae 来封装 epoll、kqueue 和 select。事实上,Antirez(Redis 的创建者)恰好在 2011 年的一篇文章 中回答了这个问题。他的回答的要点是:ae 只有大约 770 行他理解的非常透彻的代码;而 libuv 代码量非常巨大,也没有提供 Redis 所需的额外功能。

现在,ae 的代码大约增长到 1300 多行,比起 libuv 的 26000 行(这是在没有 Windows、测试、示例、文档的情况下的数据)来说那是小巫见大巫了。libuv 是一个非常综合的库,这使它更复杂,并且很难去适应其它项目的特殊需求;另一方面,ae 是专门为 Redis 设计的,与 Redis 共同演进,只包含 Redis 所需要的东西。

这是我 前些年在一篇文章中 提到的软件项目依赖关系的另一个很好的示例:

依赖的优势与在软件项目上花费的工作量成反比。

在某种程度上,Antirez 在他的文章中也提到了这一点。他提到,提供大量附加价值(在我的文章中的“基础” 依赖)的依赖比像 libuv 这样的依赖更有意义(它的例子是 jemalloc 和 Lua),对于 Redis 特定需求,其功能的实现相当容易。

Redis 中的多线程

在 Redis 的绝大多数历史中,它都是一个不折不扣的单线程的东西。一些人觉得这太不可思议了,有这种想法完全可以理解。Redis 本质上是受网络束缚的 —— 只要数据库大小合理,对于任何给定的客户端请求,其大部分延时都是浪费在网络等待上,而不是在 Redis 的数据结构上。

然而,现在事情已经不再那么简单了。Redis 现在有几个新功能都用到了线程:

  1. “惰性” 内存释放
  2. 在后台线程中使用 fsync 调用写一个 持久化日志
  3. 运行需要执行一个长周期运行的操作的用户定义模块。

对于前两个特性,Redis 使用它自己的一个简单的 bio(它是 “Background I/O" 的首字母缩写)库。这个库是根据 Redis 的需要进行了硬编码,它不能用到其它的地方 —— 它运行预设数量的线程,每个 Redis 后台作业类型需要一个线程。

而对于第三个特性,Redis 模块 可以定义新的 Redis 命令,并且遵循与普通 Redis 命令相同的标准,包括不阻塞主线程。如果在模块中自定义的一个 Redis 命令,希望去执行一个长周期运行的操作,这将创建一个线程在后台去运行它。在 Redis 源码树中的 src/modules/helloblock.c 提供了这样的一个示例。

有了这些特性,Redis 使用线程将一个事件循环结合起来,在一般的案例中,Redis 具有了更快的速度和弹性,这有点类似于在本系统文章中 第四节 讨论的工作队列。

  • 注1: Redis 的一个核心部分是:它是一个 内存中 数据库;因此,查询从不会运行太长的时间。当然了,这将会带来各种各样的其它问题。在使用分区的情况下,服务器可能最终路由一个请求到另一个实例上;在这种情况下,将使用异步 I/O 来避免阻塞其它客户端。
  • 注2: 使用 anetAcceptanet 是 Redis 对 TCP 套接字代码的封装。

via: https://eli.thegreenplace.net/2017/concurrent-servers-part-5-redis-case-study/

作者:Eli Bendersky 译者:qhwdw 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

这是并发网络服务器系列文章的第四部分。在这一部分中,我们将使用 libuv 再次重写我们的服务器,并且也会讨论关于使用一个线程池在回调中去处理耗时任务。最终,我们去看一下底层的 libuv,花一点时间去学习如何用异步 API 对文件系统阻塞操作进行封装。

本系列的所有文章:

使用 libuv 抽象出事件驱动循环

第三节 中,我们看到了基于 selectepoll 的服务器的相似之处,并且,我说过,在它们之间抽象出细微的差别是件很有吸引力的事。许多库已经做到了这些,所以在这一部分中我将去选一个并使用它。我选的这个库是 libuv,它最初设计用于 Node.js 底层的可移植平台层,并且,后来发现在其它的项目中也有使用。libuv 是用 C 写的,因此,它具有很高的可移植性,非常适用嵌入到像 JavaScript 和 Python 这样的高级语言中。

虽然 libuv 为了抽象出底层平台细节已经变成了一个相当大的框架,但它仍然是以 事件循环 思想为中心的。在我们第三部分的事件驱动服务器中,事件循环是显式定义在 main 函数中的;当使用 libuv 时,该循环通常隐藏在库自身中,而用户代码仅需要注册事件句柄(作为一个回调函数)和运行这个循环。此外,libuv 会在给定的平台上使用更快的事件循环实现,对于 Linux 它是 epoll,等等。

libuv loop

libuv 支持多路事件循环,因此事件循环在库中是非常重要的;它有一个句柄 —— uv_loop_t,以及创建/杀死/启动/停止循环的函数。也就是说,在这篇文章中,我将仅需要使用 “默认的” 循环,libuv 可通过 uv_default_loop() 提供它;多路循环大多用于多线程事件驱动的服务器,这是一个更高级别的话题,我将留在这一系列文章的以后部分。

使用 libuv 的并发服务器

为了对 libuv 有一个更深的印象,让我们跳转到我们的可靠协议的服务器,它通过我们的这个系列已经有了一个强大的重新实现。这个服务器的结构与第三部分中的基于 selectepoll 的服务器有一些相似之处,因为,它也依赖回调。完整的 示例代码在这里;我们开始设置这个服务器的套接字绑定到一个本地端口:

int portnum = 9090;
if (argc >= 2) {
  portnum = atoi(argv[1]);
}
printf("Serving on port %d\n", portnum);

int rc;
uv_tcp_t server_stream;
if ((rc = uv_tcp_init(uv_default_loop(), &server_stream)) < 0) {
  die("uv_tcp_init failed: %s", uv_strerror(rc));
}

struct sockaddr_in server_address;
if ((rc = uv_ip4_addr("0.0.0.0", portnum, &server_address)) < 0) {
  die("uv_ip4_addr failed: %s", uv_strerror(rc));
}

if ((rc = uv_tcp_bind(&server_stream, (const struct sockaddr*)&server_address, 0)) < 0) {
  die("uv_tcp_bind failed: %s", uv_strerror(rc));
}

除了它被封装进 libuv API 中之外,你看到的是一个相当标准的套接字。在它的返回中,我们取得了一个可工作于任何 libuv 支持的平台上的可移植接口。

这些代码也展示了很认真负责的错误处理;多数的 libuv 函数返回一个整数状态,返回一个负数意味着出现了一个错误。在我们的服务器中,我们把这些错误看做致命问题进行处理,但也可以设想一个更优雅的错误恢复。

现在,那个套接字已经绑定,是时候去监听它了。这里我们运行首个回调注册:

// Listen on the socket for new peers to connect. When a new peer connects,
// the on_peer_connected callback will be invoked.
if ((rc = uv_listen((uv_stream_t*)&server_stream, N_BACKLOG, on_peer_connected)) < 0) {
  die("uv_listen failed: %s", uv_strerror(rc));
}

uv_listen 注册一个事件回调,当新的对端连接到这个套接字时将会调用事件循环。我们的回调在这里被称为 on_peer_connected,我们一会儿将去查看它。

最终,main 运行这个 libuv 循环,直到它被停止(uv_run 仅在循环被停止或者发生错误时返回)。

// Run the libuv event loop.
uv_run(uv_default_loop(), UV_RUN_DEFAULT);

// If uv_run returned, close the default loop before exiting.
return uv_loop_close(uv_default_loop());

注意,在运行事件循环之前,只有一个回调是通过 main 注册的;我们稍后将看到怎么去添加更多的回调。在事件循环的整个运行过程中,添加和删除回调并不是一个问题 —— 事实上,大多数服务器就是这么写的。

这是一个 on_peer_connected,它处理到服务器的新的客户端连接:

void on_peer_connected(uv_stream_t* server_stream, int status) {
  if (status < 0) {
    fprintf(stderr, "Peer connection error: %s\n", uv_strerror(status));
    return;
  }

  // client will represent this peer; it's allocated on the heap and only
  // released when the client disconnects. The client holds a pointer to
  // peer_state_t in its data field; this peer state tracks the protocol state
  // with this client throughout interaction.
  uv_tcp_t* client = (uv_tcp_t*)xmalloc(sizeof(*client));
  int rc;
  if ((rc = uv_tcp_init(uv_default_loop(), client)) < 0) {
    die("uv_tcp_init failed: %s", uv_strerror(rc));
  }
  client->data = NULL;

  if (uv_accept(server_stream, (uv_stream_t*)client) == 0) {
    struct sockaddr_storage peername;
    int namelen = sizeof(peername);
    if ((rc = uv_tcp_getpeername(client, (struct sockaddr*)&peername,
                                 &namelen)) < 0) {
      die("uv_tcp_getpeername failed: %s", uv_strerror(rc));
    }
    report_peer_connected((const struct sockaddr_in*)&peername, namelen);

    // Initialize the peer state for a new client: we start by sending the peer
    // the initial '*' ack.
    peer_state_t* peerstate = (peer_state_t*)xmalloc(sizeof(*peerstate));
    peerstate->state = INITIAL_ACK;
    peerstate->sendbuf[0] = '*';
    peerstate->sendbuf_end = 1;
    peerstate->client = client;
    client->data = peerstate;

    // Enqueue the write request to send the ack; when it's done,
    // on_wrote_init_ack will be called. The peer state is passed to the write
    // request via the data pointer; the write request does not own this peer
    // state - it's owned by the client handle.
    uv_buf_t writebuf = uv_buf_init(peerstate->sendbuf, peerstate->sendbuf_end);
    uv_write_t* req = (uv_write_t*)xmalloc(sizeof(*req));
    req->data = peerstate;
    if ((rc = uv_write(req, (uv_stream_t*)client, &writebuf, 1,
                       on_wrote_init_ack)) < 0) {
      die("uv_write failed: %s", uv_strerror(rc));
    }
  } else {
    uv_close((uv_handle_t*)client, on_client_closed);
  }
}

这些代码都有很好的注释,但是,这里有一些重要的 libuv 语法我想去强调一下:

  • 传入自定义数据到回调中:因为 C 语言还没有闭包,这可能是个挑战,libuv 在它的所有的处理类型中有一个 void* data 字段;这些字段可以被用于传递用户数据。例如,注意 client->data 是如何指向到一个 peer_state_t 结构上,以便于 uv_writeuv_read_start 注册的回调可以知道它们正在处理的是哪个客户端的数据。
  • 内存管理:在带有垃圾回收的语言中进行事件驱动编程是非常容易的,因为,回调通常运行在一个与它们注册的地方完全不同的栈帧中,使得基于栈的内存管理很困难。它总是需要传递堆分配的数据到 libuv 回调中(当所有回调运行时,除了 main,其它的都运行在栈上),并且,为了避免泄漏,许多情况下都要求这些数据去安全释放(free())。这些都是些需要实践的内容 注1

这个服务器上对端的状态如下:

typedef struct {
  ProcessingState state;
  char sendbuf[SENDBUF_SIZE];
  int sendbuf_end;
  uv_tcp_t* client;
} peer_state_t;

它与第三部分中的状态非常类似;我们不再需要 sendptr,因为,在调用 “done writing” 回调之前,uv_write 将确保发送它提供的整个缓冲。我们也为其它的回调使用保持了一个到客户端的指针。这里是 on_wrote_init_ack

void on_wrote_init_ack(uv_write_t* req, int status) {
  if (status) {
    die("Write error: %s\n", uv_strerror(status));
  }
  peer_state_t* peerstate = (peer_state_t*)req->data;
  // Flip the peer state to WAIT_FOR_MSG, and start listening for incoming data
  // from this peer.
  peerstate->state = WAIT_FOR_MSG;
  peerstate->sendbuf_end = 0;

  int rc;
  if ((rc = uv_read_start((uv_stream_t*)peerstate->client, on_alloc_buffer,
                          on_peer_read)) < 0) {
    die("uv_read_start failed: %s", uv_strerror(rc));
  }

  // Note: the write request doesn't own the peer state, hence we only free the
  // request itself, not the state.
  free(req);
}

然后,我们确信知道了这个初始的 '*' 已经被发送到对端,我们通过调用 uv_read_start 去监听从这个对端来的入站数据,它注册一个将被事件循环调用的回调(on_peer_read),不论什么时候,事件循环都在套接字上接收来自客户端的调用:

void on_peer_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf) {
  if (nread < 0) {
    if (nread != uv_eof) {
      fprintf(stderr, "read error: %s\n", uv_strerror(nread));
    }
    uv_close((uv_handle_t*)client, on_client_closed);
  } else if (nread == 0) {
    // from the documentation of uv_read_cb: nread might be 0, which does not
    // indicate an error or eof. this is equivalent to eagain or ewouldblock
    // under read(2).
  } else {
    // nread > 0
    assert(buf->len >= nread);

    peer_state_t* peerstate = (peer_state_t*)client->data;
    if (peerstate->state == initial_ack) {
      // if the initial ack hasn't been sent for some reason, ignore whatever
      // the client sends in.
      free(buf->base);
      return;
    }

    // run the protocol state machine.
    for (int i = 0; i < nread; ++i) {
      switch (peerstate->state) {
      case initial_ack:
        assert(0 && "can't reach here");
        break;
      case wait_for_msg:
        if (buf->base[i] == '^') {
          peerstate->state = in_msg;
        }
        break;
      case in_msg:
        if (buf->base[i] == '$') {
          peerstate->state = wait_for_msg;
        } else {
          assert(peerstate->sendbuf_end < sendbuf_size);
          peerstate->sendbuf[peerstate->sendbuf_end++] = buf->base[i] + 1;
        }
        break;
      }
    }

    if (peerstate->sendbuf_end > 0) {
      // we have data to send. the write buffer will point to the buffer stored
      // in the peer state for this client.
      uv_buf_t writebuf =
          uv_buf_init(peerstate->sendbuf, peerstate->sendbuf_end);
      uv_write_t* writereq = (uv_write_t*)xmalloc(sizeof(*writereq));
      writereq->data = peerstate;
      int rc;
      if ((rc = uv_write(writereq, (uv_stream_t*)client, &writebuf, 1,
                         on_wrote_buf)) < 0) {
        die("uv_write failed: %s", uv_strerror(rc));
      }
    }
  }
  free(buf->base);
}

这个服务器的运行时行为非常类似于第三部分的事件驱动服务器:所有的客户端都在一个单个的线程中并发处理。并且类似的,一些特定的行为必须在服务器代码中维护:服务器的逻辑实现为一个集成的回调,并且长周期运行是禁止的,因为它会阻塞事件循环。这一点也很类似。让我们进一步探索这个问题。

在事件驱动循环中的长周期运行的操作

单线程的事件驱动代码使它先天就容易受到一些常见问题的影响:长周期运行的代码会阻塞整个循环。参见如下的程序:

void on_timer(uv_timer_t* timer) {
  uint64_t timestamp = uv_hrtime();
  printf("on_timer [%" PRIu64 " ms]\n", (timestamp / 1000000) % 100000);

  // "Work"
  if (random() % 5 == 0) {
    printf("Sleeping...\n");
    sleep(3);
  }
}

int main(int argc, const char** argv) {
  uv_timer_t timer;
  uv_timer_init(uv_default_loop(), &timer);
  uv_timer_start(&timer, on_timer, 0, 1000);
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}

它用一个单个注册的回调运行一个 libuv 事件循环:on_timer,它被每秒钟循环调用一次。回调报告一个时间戳,并且,偶尔通过睡眠 3 秒去模拟一个长周期运行。这是运行示例:

$ ./uv-timer-sleep-demo
on_timer [4840 ms]
on_timer [5842 ms]
on_timer [6843 ms]
on_timer [7844 ms]
Sleeping...
on_timer [11845 ms]
on_timer [12846 ms]
Sleeping...
on_timer [16847 ms]
on_timer [17849 ms]
on_timer [18850 ms]
...

on_timer 忠实地每秒执行一次,直到随机出现的睡眠为止。在那个时间点,on_timer 不再被调用,直到睡眠时间结束;事实上,没有其它的回调 会在这个时间帧中被调用。这个睡眠调用阻塞了当前线程,它正是被调用的线程,并且也是事件循环使用的线程。当这个线程被阻塞后,事件循环也被阻塞。

这个示例演示了在事件驱动的调用中为什么回调不能被阻塞是多少的重要。并且,同样适用于 Node.js 服务器、客户端侧的 Javascript、大多数的 GUI 编程框架、以及许多其它的异步编程模型。

但是,有时候运行耗时的任务是不可避免的。并不是所有任务都有一个异步 API;例如,我们可能使用一些仅有同步 API 的库去处理,或者,正在执行一个可能的长周期计算。我们如何用事件驱动编程去结合这些代码?线程可以帮到你!

“转换” 阻塞调用为异步调用的线程

一个线程池可以用于转换阻塞调用为异步调用,通过与事件循环并行运行,并且当任务完成时去由它去公布事件。以阻塞函数 do_work() 为例,这里介绍了它是怎么运行的:

  1. 不在一个回调中直接调用 do_work() ,而是将它打包进一个 “任务”,让线程池去运行这个任务。当任务完成时,我们也为循环去调用它注册一个回调;我们称它为 on_work_done()
  2. 在这个时间点,我们的回调就可以返回了,而事件循环保持运行;在同一时间点,线程池中的有一个线程运行这个任务。
  3. 一旦任务运行完成,通知主线程(指正在运行事件循环的线程),并且事件循环调用 on_work_done()

让我们看一下,使用 libuv 的工作调度 API,是怎么去解决我们前面的计时器/睡眠示例中展示的问题的:

void on_after_work(uv_work_t* req, int status) {
  free(req);
}

void on_work(uv_work_t* req) {
  // "Work"
  if (random() % 5 == 0) {
    printf("Sleeping...\n");
    sleep(3);
  }
}

void on_timer(uv_timer_t* timer) {
  uint64_t timestamp = uv_hrtime();
  printf("on_timer [%" PRIu64 " ms]\n", (timestamp / 1000000) % 100000);

  uv_work_t* work_req = (uv_work_t*)malloc(sizeof(*work_req));
  uv_queue_work(uv_default_loop(), work_req, on_work, on_after_work);
}

int main(int argc, const char** argv) {
  uv_timer_t timer;
  uv_timer_init(uv_default_loop(), &timer);
  uv_timer_start(&timer, on_timer, 0, 1000);
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}

通过一个 work_req 注2 类型的句柄,我们进入一个任务队列,代替在 on_timer 上直接调用 sleep,这个函数在任务中(on_work)运行,并且,一旦任务完成(on_after_work),这个函数被调用一次。on_work 是指 “work”(阻塞中的/耗时的操作)进行的地方。注意在这两个回调传递到 uv_queue_work 时的一个关键区别:on_work 运行在线程池中,而 on_after_work 运行在事件循环中的主线程上 —— 就好像是其它的回调一样。

让我们看一下这种方式的运行:

$ ./uv-timer-work-demo
on_timer [89571 ms]
on_timer [90572 ms]
on_timer [91573 ms]
on_timer [92575 ms]
Sleeping...
on_timer [93576 ms]
on_timer [94577 ms]
Sleeping...
on_timer [95577 ms]
on_timer [96578 ms]
on_timer [97578 ms]
...

即便在 sleep 函数被调用时,定时器也每秒钟滴答一下,睡眠现在运行在一个单独的线程中,并且不会阻塞事件循环。

一个用于练习的素数测试服务器

因为通过睡眠去模拟工作并不是件让人兴奋的事,我有一个事先准备好的更综合的一个示例 —— 一个基于套接字接受来自客户端的数字的服务器,检查这个数字是否是素数,然后去返回一个 “prime" 或者 “composite”。完整的 服务器代码在这里 —— 我不在这里粘贴了,因为它太长了,更希望读者在一些自己的练习中去体会它。

这个服务器使用了一个原生的素数测试算法,因此,对于大的素数可能花很长时间才返回一个回答。在我的机器中,对于 2305843009213693951,它花了 ~5 秒钟去计算,但是,你的方法可能不同。

练习 1:服务器有一个设置(通过一个名为 MODE 的环境变量)要么在套接字回调(意味着在主线程上)中运行素数测试,要么在 libuv 工作队列中。当多个客户端同时连接时,使用这个设置来观察服务器的行为。当它计算一个大的任务时,在阻塞模式中,服务器将不回复其它客户端,而在非阻塞模式中,它会回复。

练习 2:libuv 有一个缺省大小的线程池,并且线程池的大小可以通过环境变量配置。你可以通过使用多个客户端去实验找出它的缺省值是多少?找到线程池缺省值后,使用不同的设置去看一下,在重负载下怎么去影响服务器的响应能力。

在非阻塞文件系统中使用工作队列

对于只是呆板的演示和 CPU 密集型的计算来说,将可能的阻塞操作委托给一个线程池并不是明智的;libuv 在它的文件系统 API 中本身就大量使用了这种能力。通过这种方式,libuv 使用一个异步 API,以一个轻便的方式显示出它强大的文件系统的处理能力。

让我们使用 uv_fs_read(),例如,这个函数从一个文件中(表示为一个 uv_fs_t 句柄)读取一个文件到一个缓冲中 注3,并且当读取完成后调用一个回调。换句话说, uv_fs_read() 总是立即返回,即使是文件在一个类似 NFS 的系统上,而数据到达缓冲区可能需要一些时间。换句话说,这个 API 与这种方式中其它的 libuv API 是异步的。这是怎么工作的呢?

在这一点上,我们看一下 libuv 的底层;内部实际上非常简单,并且它是一个很好的练习。作为一个可移植的库,libuv 对于 Windows 和 Unix 系统在它的许多函数上有不同的实现。我们去看一下在 libuv 源树中的 src/unix/fs.c

这是 uv_fs_read 的代码:

int uv_fs_read(uv_loop_t* loop, uv_fs_t* req,
               uv_file file,
               const uv_buf_t bufs[],
               unsigned int nbufs,
               int64_t off,
               uv_fs_cb cb) {
  if (bufs == NULL || nbufs == 0)
    return -EINVAL;

  INIT(READ);
  req->file = file;

  req->nbufs = nbufs;
  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(*bufs));

  if (req->bufs == NULL) {
    if (cb != NULL)
      uv__req_unregister(loop, req);
    return -ENOMEM;
  }

  memcpy(req->bufs, bufs, nbufs * sizeof(*bufs));

  req->off = off;
  POST;
}

第一次看可能觉得很困难,因为它延缓真实的工作到 INITPOST 宏中,以及为 POST 设置了一些本地变量。这样做可以避免了文件中的许多重复代码。

这是 INIT 宏:

#define INIT(subtype)                                                         \
  do {                                                                        \
    req->type = UV_FS;                                                        \
    if (cb != NULL)                                                           \
      uv__req_init(loop, req, UV_FS);                                         \
    req->fs_type = UV_FS_ ## subtype;                                         \
    req->result = 0;                                                          \
    req->ptr = NULL;                                                          \
    req->loop = loop;                                                         \
    req->path = NULL;                                                         \
    req->new_path = NULL;                                                     \
    req->cb = cb;                                                             \
  }                                                                           \
  while (0)

它设置了请求,并且更重要的是,设置 req->fs_type 域为真实的 FS 请求类型。因为 uv_fs_read 调用 INIT(READ),它意味着 req->fs_type 被分配一个常数 UV_FS_READ

这是 POST 宏:

#define POST                                                                  \
  do {                                                                        \
    if (cb != NULL) {                                                         \
      uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done);        \
      return 0;                                                               \
    }                                                                         \
    else {                                                                    \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }                                                                         \
  }                                                                           \
  while (0)

它做什么取决于回调是否为 NULL。在 libuv 文件系统 API 中,一个 NULL 回调意味着我们真实地希望去执行一个 同步 操作。在这种情况下,POST 直接调用 uv__fs_work(我们需要了解一下这个函数的功能),而对于一个非 NULL 回调,它把 uv__fs_work 作为一个工作项提交到工作队列(指的是线程池),然后,注册 uv__fs_done 作为回调;该函数执行一些登记并调用用户提供的回调。

如果我们去看 uv__fs_work 的代码,我们将看到它使用很多宏按照需求将工作分发到实际的文件系统调用。在我们的案例中,对于 UV_FS_READ 这个调用将被 uv__fs_read 生成,它(最终)使用普通的 POSIX API 去读取。这个函数可以在一个 阻塞 方式中很安全地实现。因为,它通过异步 API 调用时被置于一个线程池中。

在 Node.js 中,fs.readFile 函数是映射到 uv_fs_read 上。因此,可以在一个非阻塞模式中读取文件,甚至是当底层文件系统 API 是阻塞方式时。


  • 注1: 为确保服务器不泄露内存,我在一个启用泄露检查的 Valgrind 中运行它。因为服务器经常是被设计为永久运行,这是一个挑战;为克服这个问题,我在服务器上添加了一个 “kill 开关” —— 一个从客户端接收的特定序列,以使它可以停止事件循环并退出。这个代码在 theon_wrote_buf 句柄中。
  • 注2: 在这里我们不过多地使用 work_req;讨论的素数测试服务器接下来将展示怎么被用于去传递上下文信息到回调中。
  • 注3: uv_fs_read() 提供了一个类似于 preadv Linux 系统调用的通用 API:它使用多缓冲区用于排序,并且支持一个到文件中的偏移。基于我们讨论的目的可以忽略这些特性。

via: https://eli.thegreenplace.net/2017/concurrent-servers-part-4-libuv/

作者:Eli Bendersky 译者:qhwdw 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出