首页 > 文章列表 > Go并发编程的用例与场景详解

Go并发编程的用例与场景详解

Go并发 编程场景
174 2024-05-26

并发编程在 Go 中通过 goroutine 实现,允许同时执行多个任务,提高效率。其用例包括:并行处理事件处理I/O 密集型操作HTTP 服务任务调度

Go并发编程的用例与场景详解

Go 并发编程的用例与场景详解

简介
并发编程是一种编程范式,它允许我们同时执行多个任务。在 Go 语言中,并发编程通过 goroutine 实现,goroutine 是轻量级线程。本篇文章将探讨 Go 中并发编程的用例和场景,并提供实际示例。

用例和场景

1. 并行处理

  • 将大型任务分解为较小的子任务并并行处理它们,以提高效率。
  • 示例:使用 Goroutines paralleize 图片处理任务。

2. 事件处理

  • 监听传入事件并使用 goroutine 并行处理每个事件。
  • 示例:使用 Goroutines 处理来自 WebSocket 连接的传入消息。

3. I/O 密集型操作

  • 对于 I/O 密集型操作,例如文件读取或网络调用,使用 Goroutines 可以提高性能。
  • 示例:使用 Goroutines 从多个文件中并行读取数据。

4. HTTP 服务

  • 在 HTTP 服务中,使用 Goroutines 处理传入请求可以提高并发性。
  • 示例:用 Goroutines 处理来自 Web 服务器的传入 HTTP 请求。

5. 任务调度

  • 使用 Goroutines 管理和调度需要在特定时间或周期性执行的任务。
  • 示例:使用 Goroutine 实现 Cron 定时器来调度作业。

实战示例

示例 1:并发图片处理

package main

import (
    "fmt"
    "image"
    "image/color"
    "image/draw"
    "runtime"
)

func main() {
    width, height := 1000, 1000
    images := []image.Image{}

    // 并行创建 100 个图像
    for i := 0; i < 100; i++ {
        img := image.NewRGBA(image.Rect(0, 0, width, height))
        draw.Draw(img, img.Bounds(), &image.Uniform{color.RGBA{0, 0, 0, 255}}, image.ZP, draw.Src)
        images = append(images, img)
    }

    // 计算创建图像所花费的时间
    numCPUs := runtime.NumCPU()
    start := time.Now()
    for i := 0; i < 100; i++ {
        go createImage(images[i])
    }

    // 等待所有 Goroutine 完成
    time.Sleep(10 * time.Second)
    elapsed := time.Since(start)
    fmt.Printf("Creating %d images using %d CPUs took %sn", len(images), numCPUs, elapsed)
}

func createImage(img image.Image) {
    // 模拟耗时的图像处理操作
    time.Sleep(500 * time.Millisecond)
}

示例 2:处理 WebSocket 消息

package main

import (
    "errors"
    "fmt"
    "net/http"
    "sync/atomic"

    "github.com/gorilla/websocket"
)

type client struct {
    conn *websocket.Conn
    name string
}

var (
    upgrader = websocket.Upgrader{}
    messages = make(chan string)
)

var connectedClients uint64

func main() {
    http.HandleFunc("/websocket", serveWebSocket)

    // 启动 Goroutine 来处理传入消息
    go handleMessage()

    if err := http.ListenAndServe(":8080", nil); err != nil {
        fmt.Println(err)
    }
}

func serveWebSocket(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        fmt.Println(err)
        return
    }

    atomic.AddUint64(&connectedClients, 1)

    go handleConnection(conn)
}

func handleConnection(conn *websocket.Conn) {
    defer func() {
        conn.Close()
        atomic.AddUint64(&connectedClients, -1)
    }()

    // 监听来自客户端的消息
    for {
        _, message, err := conn.ReadMessage()
        if err != nil {
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
                fmt.Println(err)
            }
            return
        }

        messages <- message
    }
}

func handleMessage() {
    for message := range messages {
        // 处理消息逻辑
        fmt.Println("Received message:", message)

        // 例如,将消息广播给所有已连接的客户端
        for clients.Range(func(_, v interface{}) bool {
            client := v.(client)
            if err := client.conn.WriteMessage(websocket.TextMessage, []byte(message)); err != nil {
                if errors.Is(err, websocket.ErrCloseSent) {
                    clients.Delete(client.name)
                    fmt.Printf("Client %s disconnectedn", client.name)
                }
            }
            return true
        }) { }
    }
}