golang并发控制WaitGroup
封装 waitgroup
func (c *Crawler) Stop() {
// 1. 匿名函数封装 wg 函数
asyncWaitStop := func(stop func()) {
c.wg.Add(1)
go func() {
stop()
c.wg.Done()
}()
}
// 2. 多次执行 函数调用
asyncWaitStop(c.prospectorsReloader.Stop)
// Stop prospectors in parallel
asyncWaitStop(p.Stop)
// 3. 等待执行结束
c.WaitForCompletion()
}
func (c *Crawler) WaitForCompletion() {
c.wg.Wait()
}
https://github.com/golang/sync/blob/master/errgroup/errgroup.go
var g errgroup.Group
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// Launch a goroutine to fetch the URL.
url := url // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
// Fetch the URL.
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
// Wait for all HTTP fetches to complete.
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
}
// Group allows to start a group of goroutines and wait for their completion.
type Group struct {
wg sync.WaitGroup
}
func (g *Group) Wait() {
g.wg.Wait()
}
// StartWithChannel starts f in a new goroutine in the group.
// stopCh is passed to f as an argument. f should stop when stopCh is available.
func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) {
g.Start(func() {
f(stopCh)
})
}
// StartWithContext starts f in a new goroutine in the group.
// ctx is passed to f as an argument. f should stop when ctx.Done() is available.
func (g *Group) StartWithContext(ctx context.Context, f func(context.Context)) {
g.Start(func() {
f(ctx)
})
}
// Start starts f in a new goroutine in the group.
func (g *Group) Start(f func()) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
f()
}()
}
从telegraf改造谈golang多协程精确控制
前言
telegraf是infuxdb公司开源出来的一个基于插件机制的收集metrics的项目。整个架构和elastic公司的日志收集系统极其类似,具备良好的扩展性。与现在流行的各种exporter+promethues监控方案相比:
大致具备良好的可扩展性。很容易增加自己的处理逻辑,在input,output,process,filter等环境定制自己专属的插件。
统一了各种exporter,减少了部署各种exporter的工作量和维护成本。
目前telegraf改造工作基本上是两大部分:
增加了一些telegraf不支持的插件,比如虚拟化(kvm,vmware等),数据库(oracle),k8s和openstack等input插件。
telegraf是基于配置文件的,所以会有两个问题,很难做分布式和无停机动态调度input任务。所以我们的工作就是将获取配置接口化,所有的配置文件来源于统一配置中心。然后就是改造无停机动态调度input。
在改造改造无停机动态调度input就涉及到golang多协程精确控制的问题。
一些golang常用并发手段
sync包下WaitGroup
具体事例:
var wg sync.WaitGroup
wg.Add(len(a.Config.Outputs))
for _, o := range a.Config.Outputs {
go func(output *models.RunningOutput) {
defer wg.Done()
err := output.Write()
if err != nil {
log.Printf("E! Error writing to output [%s]: %s\n",
output.Name, err.Error())
}
}(o)
}
wg.Wait()
WaitGroup内部维护了一个counter,当counter数值为0时,表明添加的任务都已经完成。
总共有三个方法:func (wg *WaitGroup) Add(delta int)
添加任务,delta参数表示添加任务的数量。func (wg *WaitGroup) Done()
任务执行完成,调用Done方法,一般使用姿势都是defer wg.Done(),此时counter中会减一。func (wg *WaitGroup) Wait()
通过使用sync.WaitGroup,可以阻塞主线程,直到相应数量的子线程结束。
chan struct{},控制协程退出
启动协程的时候,传递一个shutdown chan struct{},需要关闭该协程的时候,直接close(shutdown)。struct{}在golang中是一个消耗接近0的对象。
具体事例:
// gatherer runs the inputs that have been configured with their own
// reporting interval.
func (a *Agent) gatherer(
shutdown chan struct{},
kill chan struct{},
input *models.RunningInput,
interval time.Duration,
metricC chan telegraf.Metric,
) {
defer panicRecover(input)
GatherTime := selfstat.RegisterTiming("gather",
"gather_time_ns",
map[string]string{"input": input.Config.Name},
)
acc := NewAccumulator(input, metricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown)
start := time.Now()
gatherWithTimeout(shutdown, kill, input, acc, interval)
elapsed := time.Since(start)
GatherTime.Incr(elapsed.Nanoseconds())
select {
case <-shutdown:
return
case <-kill:
return
case <-ticker.C:
continue
}
}
}
借助chan 实现指定数量的协程或动态调整协程数量
当然这里必须是每个协程是幂等,也就是所有协程做的是同样的工作。
首先创建 一个 pool:= make(chan chan struct{}, maxWorkers),maxWorkers为目标协程数量。
然后启动协程:
for i := 0; i < s.workers; i++ {
go func() {
wQuit := make(chan struct{})
s.pool <- wQuit
s.sFlowWorker(wQuit)
}()
}
关闭协程:
func (s *SFlow) sFlowWorker(wQuit chan struct{}) {
LOOP:
for {
select {
case <-wQuit:
break LOOP
case msg, ok = <-sFlowUDPCh:
if !ok {
break LOOP
}
}
// 此处执行任务操作
}
动态调整:
for n = 0; n < 10; n++ {
if len(s.pool) > s.workers {
wQuit := <-s.pool
close(wQuit)
}
}
多协程精确控制
在改造telegraf过程中,要想动态调整input,每个input都是唯一的,分属不同类型插件。就必须实现精准控制指定的协程的启停。
这个时候实现思路就是:实现一个kills map[string]chan struct{},k为每个任务的唯一ID。添加任务时候,传递一个chan struct{},这个时候关闭指定ID的chan struct{},就能控制指定的协程。
// DelInput add input
func (a *Agent) DelInput(inputs []*models.RunningInput) error {
a.storeMutex.Lock()
defer a.storeMutex.Unlock()
for _, v := range inputs {
if _, ok := a.kills[v.Config.ID]; !ok {
return fmt.Errorf("input: %s,未找到,无法删除", v.Config.ID)
}
}
for _, input := range inputs {
if kill, ok := a.kills[input.Config.ID]; ok {
delete(a.kills, input.Config.ID)
close(kill)
}
}
return nil
}
添加任务:
// AddInput add input
func (a *Agent) AddInput(shutdown chan struct{}, inputs []*models.RunningInput) error {
a.storeMutex.Lock()
defer a.storeMutex.Unlock()
for _, v := range inputs {
if _, ok := a.kills[v.Config.ID]; ok {
return fmt.Errorf("input: %s,已经存在无法新增", v.Config.ID)
}
}
for _, input := range inputs {
interval := a.Config.Agent.Interval.Duration
// overwrite global interval if this plugin has it's own.
if input.Config.Interval != 0 {
interval = input.Config.Interval
}
if input.Config.ID == "" {
continue
}
a.wg.Add(1)
kill := make(chan struct{})
a.kills[input.Config.ID] = kill
go func(in *models.RunningInput, interv time.Duration) {
defer a.wg.Done()
a.gatherer(shutdown, kill, in, interv, a.metricC)
}(input, interval)
}
return nil
}
总结
简单介绍了一下telegraf项目。后续的优化和改造工作还在继续。主要是分布式telegraf的调度算法。毕竟集中化所有exporter以后,telegraf的负载能力受单机能力限制,而且也不符合高可用的使用目标。