一般场景下取消goroutine的方法
// Package-level state shared between main and the worker goroutine.
var (
	// wg is waited on by main; worker calls wg.Done when it returns.
	wg sync.WaitGroup
	// exit is the flag for the commented-out polling variant of this
	// example; the channel-based variant does not read it.
	exit bool
)
// worker prints "work" about once per second until a value can be received
// from exitChan (or exitChan is closed), then returns.
//
// The pause between iterations is a select over both the ticker and exitChan,
// so a stop request is honoured immediately instead of only after a full
// one-second sleep has elapsed. wg.Done is deferred so the WaitGroup is
// released on every exit path.
//
// A simpler variant polls a shared bool flag (`if exit { break }`) instead of
// a channel; that needs extra synchronisation to be race-free, which is why
// the channel form is used here.
func worker(exitChan chan struct{}) {
	defer wg.Done()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		fmt.Printf("work\n")
		select {
		case <-exitChan:
			return
		case <-ticker.C:
			// Next iteration.
		}
	}
}
// main starts one worker, lets it run for three seconds, then asks it to stop
// by sending on the exit channel and waits for it to finish.
func main() {
	// Capacity 1 lets the send below complete even when the worker is not
	// receiving at that moment.
	stop := make(chan struct{}, 1)

	wg.Add(1)
	go worker(stop)

	time.Sleep(3 * time.Second)
	stop <- struct{}{}
	// Flag-based alternative to the send above: exit = true
	wg.Wait()
}
// wg is waited on by main and released by worker when it returns.
var (
	wg sync.WaitGroup
)
// worker prints "work" about once per second until ctx is cancelled, then
// returns.
//
// The pause between iterations is a select over both the ticker and
// ctx.Done(), so cancellation takes effect immediately instead of only after
// a full one-second sleep has elapsed. wg.Done is deferred so the WaitGroup
// is released on every exit path.
func worker(ctx context.Context) {
	defer wg.Done()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		fmt.Printf("work\n")
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Next iteration.
		}
	}
}
// main starts one worker with a cancellable context, lets it run for three
// seconds, cancels the context and waits for the worker to return.
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	wg.Add(1)
	go worker(ctx)

	time.Sleep(3 * time.Second)
	cancel() // cancel the goroutine
	wg.Wait()
}
// wg is waited on by main and released by worker when it returns.
var (
	wg sync.WaitGroup
)
// worker prints "worker" about once per millisecond until cxt is done
// (cancelled or timed out), then prints a final line with the trace code and
// returns.
//
// If cxt carries a string under the key "TRACE_CODE" it is printed once at
// start-up; otherwise traceCode stays empty.
//
// The pause between iterations is a select over both the ticker and
// cxt.Done(), so the worker reacts to cancellation without first finishing a
// sleep. wg.Done is deferred so the WaitGroup is released on every exit path.
func worker(cxt context.Context) {
	defer wg.Done()

	// NOTE(review): the key is a plain string and must match the one the
	// caller stores with context.WithValue; switching to a private key type
	// would have to be done in the caller at the same time.
	traceCode, ok := cxt.Value("TRACE_CODE").(string)
	if ok {
		fmt.Printf("traceCode=%s\n", traceCode)
	}

	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()

loop:
	for {
		fmt.Printf("worker\n")
		select {
		case <-cxt.Done():
			break loop
		case <-ticker.C:
			// Next iteration.
		}
	}
	fmt.Printf("worker Done,trace_Code:%s\n", traceCode)
}
// main runs one worker under a context that has a 50ms timeout and carries
// the value "212334121" under the key "TRACE_CODE". It sleeps three seconds
// (well past the timeout), calls cancel, and waits for the worker to return.
func main() {
	base, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	ctx := context.WithValue(base, "TRACE_CODE", "212334121")

	wg.Add(1)
	go worker(ctx)

	time.Sleep(3 * time.Second)
	cancel() // release the context's resources
	wg.Wait()
}
高可用kafka集群部署:https://blog.51cto.com/navyaijm/2429959?source=drh
import (
"fmt"
"github.com/Shopify/sarama"
"time"
)
// main publishes the same test message to the "nginx_log" topic once per
// second through a sarama SyncProducer, printing the partition and offset of
// each send. It stops at the first send error.
func main() {
	// Producer configuration.
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true

	// Message to publish.
	msg := &sarama.ProducerMessage{
		Topic: "nginx_log",
		Value: sarama.StringEncoder("this is a test,messsage transfer ok"),
	}

	// Connect to the broker. A failure here is a set-up failure, not a send
	// failure, so it is reported as such.
	client, err := sarama.NewSyncProducer([]string{"192.168.56.11:9092"}, config)
	if err != nil {
		fmt.Printf("create producer failed,error:%v\n", err)
		return
	}
	// Report a failed Close instead of dropping its error.
	defer func() {
		if err := client.Close(); err != nil {
			fmt.Printf("close producer failed,err:%v\n", err)
		}
	}()

	for {
		pid, offset, err := client.SendMessage(msg)
		if err != nil {
			fmt.Printf("send message faild,err:%v\n", err)
			return
		}
		fmt.Printf("pid:%v,offset:%v\n", pid, offset)
		time.Sleep(time.Second)
	}
}
import (
"fmt"
"github.com/hpcloud/tail"
"time"
)
// main follows ./my.log with hpcloud/tail and prints every line value it
// receives from the Lines channel.
func main() {
	filename := "./my.log"

	cfg := tail.Config{
		// NOTE(review): Whence 2 presumably means "relative to end of file"
		// (as in io.SeekEnd) — confirm against the tail.SeekInfo docs.
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
		ReOpen:    true,
		MustExist: false,
		Poll:      true,
		Follow:    true,
	}

	tails, err := tail.TailFile(filename, cfg)
	if err != nil {
		fmt.Printf("tail file error:%v\n", err)
		return
	}

	for {
		line, ok := <-tails.Lines
		if ok {
			fmt.Printf("msg=%v\n", line)
			continue
		}
		// NOTE(review): ok is false only once Lines has been closed, and a
		// closed channel stays closed, so from here on the loop just prints
		// this message every 100ms and never ends. Confirm whether exiting
		// or re-creating the tail is the intended behaviour.
		fmt.Printf("tail file close reopen,filename:%s\n", filename)
		time.Sleep(100 * time.Millisecond)
	}
}
[root@centos7-node1 ~]# cd /opt/application/kafka/bin/
[root@centos7-node1 bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic nginx_log --from-beginning