packageを細かく分けている(アンチパターンかも…)
main()
設定ファイル読み込んで、ロガー設定して、start()
に処理を渡す
func main() {
conf, printVersion := resolveConfig()
// (snip)
logger.Infof("Starting mackerel-agent version:%s, rev:%s", version.VERSION, version.GITCOMMIT)
// (snip)
if err := start(conf); err != nil {
exit(1, conf)
}
}
start()
command.Run()
に処理を渡すc := make(chan os.Signal, 1)
termChan := make(chan chan int) // メインの処理との情報のやりとり
signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP)
go func() { // シグナルハンドリング用のgoroutine
for sig := range c {
if sig == syscall.SIGHUP { // sighupを受け取ったらhost情報を読み込み直す
command.UpdateHostSpecs(conf, api, host)
} else { // 他のシグナルを受け取ったら処理の終了を待ってexit
exitChan := make(chan int)
termChan <- exitChan // チャンネルにチャンネルを渡す
go func() { // 但し処理に時間が掛かるようなら強制的に終了
time.Sleep(MAX_TERMINATING_INTERVAL * time.Second)
exitChan <- 1
}()
exitCode := <-exitChan // 渡したチャンネルからexitCodeの返却を待つ
exit(exitCode, conf)
}
}
}()
command.Run(conf, api, host, termChan) // メインの処理
Run()
メトリクス収集項目をロードしてAgentオブジェクトの作成した後、loop()
(メイン処理)に処理を渡す
func Run(conf *config.Config, api *mackerel.API, host *mackerel.Host, termChan chan chan int) {
logger.Infof("Start: apibase = %s, hostName = %s, hostId = %s", conf.Apibase, host.Name, host.Id)
ag := &agent.Agent{
MetricsGenerators: metricsGenerators(conf),
PluginGenerators: pluginGenerators(conf),
}
ag.InitPluginGenerators(api)
loop(ag, conf, api, host, termChan)
}
loop()
queueStateというtypeを定義
type queueState int
const (
queueStateFirst queueState = iota // 初期状態
queueStateDefault // 通常
queueStateQueued // キューが溜まっている場合
queueStateTerminated // シグナルを受け取っている場合
)
// 十分なバッファを溜め込めるように
postQueue := make(chan []*mackerel.CreatingMetricsValue, conf.Connection.Post_Metrics_Buffer_Size)
go func() {
postDelaySeconds := delayByHost(host) // あとで解説
qState := queueStateFirst // キューの状態
exitChan := make(chan int) // 処理を抜ける際にexitCodeを送るチャンネル
for {
select {
case exitChan = <-termChan: // シグナルを受け取ったら割り込まれる
if len(postQueue) <= 0 {
exitChan <- 0 // キューにデータが残ってなかったら抜ける
} else { // 残っている場合はキューの状態を更新して処理続行
qState = queueStateTerminated
}
case values := <-postQueue:
case values := <-postQueue:
if len(postQueue) > 0 {
// 送り過ぎないように最大2件
logger.Debugf("Merging datapoints with next queued ones")
nextValues := <-postQueue
values = append(values, nextValues...)
}
// 状態に応じてdelayさせるタイミングを調整
delaySeconds := 0
switch qState {
case queueStateTerminated:
delaySeconds = 1
case queueStateFirst:
// nop
case queueStateQueued:
delaySeconds = conf.Connection.Post_Metrics_Dequeue_Delay_Seconds
default:
// 通常状態で00秒にAPIへのサーバーがアクセスすると困るので投稿時間をhashingしている
elapsedSeconds := time.Now().Second() % int(config.PostMetricsInterval.Seconds())
if postDelaySeconds > elapsedSeconds {
delaySeconds = postDelaySeconds - elapsedSeconds
}
}
if qState != queueStateTerminated {
if len(postQueue) > 0 {
qState = queueStateQueued
} else {
qState = queueStateDefault
}
}
sleep中にシグナル受け取っても大丈夫なように待ち受け(汚い…)
sleepCh := make(chan struct{})
go func() {
time.Sleep(delaySeconds * time.Second)
sleepCh <- struct{}{}
}()
sleepLoop:
for {
select {
case <-sleepCh:
break sleepLoop
case exitChan = <-termChan:
qState = queueStateTerminated
break sleepLoop
}
}
泥臭い。ここは要改善。
tries := conf.Connection.Post_Metrics_Retry_Max
for {
err := api.PostMetricsValues(values)
if err == nil {
logger.Debugf("Posting metrics succeeded.")
break
}
tries -= 1
if tries <= 0 {
logger.Errorf("Give up retrying to post metrics.")
break
}
time.Sleep(conf.Connection.Post_Metrics_Retry_Delay_Seconds * time.Second)
}
最後です。
// シグナル受信状態でキューにデータが残ってなかったらexit
if qState == queueStateTerminated && len(postQueue) <= 0 {
exitChan <- 0
}