Skip to content

Commit 2de075c

Browse files
author
张俊杰
committed
添加进度条和配置信息
1 parent 150f739 commit 2de075c

File tree

12 files changed

+298
-72
lines changed

12 files changed

+298
-72
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
# Dependency directories (remove the comment below to include it)
1515
vendor/
1616

17+
config/*.json
18+
1719
#MAC
1820
.DS_Store
1921
#IDE

README.md

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
虽然 logstash dts 等工具提供了很好的多数据源同步订阅方案,但是实际项目中并不会把服务器的上机权限或云厂商账号开放给所有成员。
33
这个小工具算对这类场景的一种补充,它可以帮助我们从mysql中取出数据并推送到elasticsearch上
44

5+
![16291791332153](http://pic.phpzjj.com/mweb/2021/08/17/16291791332153.jpg)
6+
57
# 特点
68

79
* 基于 elasticsearch REST APIs 理论无版本上的兼容问题
8-
* 配置简单仅需3步即可完成 1. 配置 mapping 2. 编写获取源数据sql语句 3. 对呀字段映射关系
10+
* 配置简单仅需3步即可完成 1. 配置 mapping 2. 编写获取源数据sql语句 3. 对应字段映射关系
911
* 提供WebGUI配置完成后日常维护无需上机
1012

1113

@@ -14,6 +16,7 @@
1416
下载对应平台运行,或源码编译安装
1517

1618
MAC
19+
1720
```
1821
# Linux
1922
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build
@@ -108,14 +111,3 @@ filePath.json //单个配置文件
108111
109112
110113
```
111-
112-
113-
# 计划
114-
115-
* 加入更多支持的数据关键字
116-
* 错误写日志
117-
* WebGUI 加入 push 时的进度条
118-
* 在线编辑配置文件
119-
* docker版
120-
121-

config/Config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
)
77

88
type Config struct {
9+
Title string
10+
Port int
911
JobList []JobList
1012
}
1113

config/config.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
{
2+
"title" : "es推送工具",
3+
"port" : 9002,
24
"jobList" : [
35
{"name": "测试" , "filePath" : "example.json"}
46
]

go.mod

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,8 @@ require github.com/go-sql-driver/mysql v1.5.0
1010

1111
require github.com/olivere/elastic/v7 v7.0.24
1212

13-
require github.com/Jeffail/gabs/v2 v2.6.1
13+
require (
14+
github.com/Jeffail/gabs/v2 v2.6.1
15+
github.com/fvbock/endless v0.0.0-20170109170031-447134032cb6
16+
github.com/gorilla/websocket v1.4.2
17+
)

go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
5050
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
5151
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
5252
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
53+
github.com/fvbock/endless v0.0.0-20170109170031-447134032cb6 h1:6VSn3hB5U5GeA6kQw4TwWIWbOhtvR2hmbBJnTOtqTWc=
54+
github.com/fvbock/endless v0.0.0-20170109170031-447134032cb6/go.mod h1:YxOVT5+yHzKvwhsiSIWmbAYM3Dr9AEEbER2dVayfBkg=
5355
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
5456
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
5557
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
@@ -111,6 +113,7 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
111113
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
112114
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
113115
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
116+
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
114117
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
115118
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
116119
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=

service/monitor/progress.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package monitor
2+
3+
import "sync"
4+
5+
var ProgressBars = map[string]ProgressBar{}
6+
7+
type ProgressBar struct {
8+
Total int
9+
Progress *sync.Map
10+
}
11+
12+
type ProgressBarJson struct {
13+
Name string
14+
Total int
15+
Progress int
16+
Status int // 101 运行中 200 执行完成
17+
}

service/push/bulkPush.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/olivere/elastic/v7"
99
"main/config"
1010
"main/service"
11+
"main/service/monitor"
1112
"main/service/parse"
1213
"strconv"
1314
"sync"
@@ -21,7 +22,7 @@ type section struct {
2122

2223

2324
func BulkPushRun(esConfig service.EsConfig, name string,
24-
conn config.Content, channel int,) error {
25+
conn config.Content, channel int, configName string) error {
2526

2627
// 最小 最大 启动数 计算区间
2728
var max,min int
@@ -38,11 +39,31 @@ func BulkPushRun(esConfig service.EsConfig, name string,
3839

3940
channelData := generate(max, min, channel)
4041

42+
43+
channelWorkNumbers := (max-min)/conn.Writer.Parameter.BatchSize
44+
monitor.ProgressBars[configName] = monitor.ProgressBar{Total: channelWorkNumbers, Progress: &sync.Map{}}
45+
monitor.ProgressBars[configName].Progress.Store("number", -1)
46+
47+
//fmt.Printf("max:%s, min:%s, = : %s \n", max, min, channelWorkNumbers)
48+
49+
50+
4151
var wg = sync.WaitGroup{}
4252

53+
54+
/* go func() {
55+
for true {
56+
a,_ := monitor.ProgressBars[name].Progress.Load("number")
57+
fmt.Printf("【%s】total:%s, ing:%s \n",name,monitor.ProgressBars[name].Total, a)
58+
time.Sleep(time.Second*10)
59+
}
60+
61+
}()*/
62+
4363
for _,v := range channelData{
4464
wg.Add(1)
45-
go workProcess(v.Max, v.Min, conn, esConfig, name, &wg)
65+
//fmt.Printf("最小:%s, 最大:%s \n",v.Min, v.Max)
66+
go workProcess(v.Max, v.Min, conn, esConfig, name, &wg, configName)
4667
}
4768

4869
wg.Wait()
@@ -59,7 +80,7 @@ func generate(max int, min int, channel int) map[int]section {
5980
channelPip := make(map[int]section)
6081
extent := (max - min) / channel
6182

62-
for i:=0; i <= channel; i++ {
83+
for i:=1; i <= channel; i++ {
6384
channelPip[i] = section{min,min + extent}
6485
min = min + extent
6586
}
@@ -72,6 +93,7 @@ func generate(max int, min int, channel int) map[int]section {
7293
func workProcess (max int , min int, conn config.Content,
7394
esConfig service.EsConfig, name string,
7495
wg *sync.WaitGroup,
96+
configName string,
7597
) (error) {
7698

7799
client := service.NewEsObj(esConfig)
@@ -102,10 +124,14 @@ func workProcess (max int , min int, conn config.Content,
102124
return error
103125
}
104126

105-
for i:=min ; i <= max; i = i + conn.Writer.Parameter.BatchSize {
127+
for i:=min; i <= max; i = i + conn.Writer.Parameter.BatchSize {
128+
129+
a,_ := monitor.ProgressBars[configName].Progress.Load("number")
130+
monitor.ProgressBars[configName].Progress.Store("number", a.(int)+1)
106131

107132
temp := conn.Writer.Parameter.BatchSize + i
108133

134+
109135
if temp > max {
110136
temp = max
111137
}

webGUI/controllers/IndexController.go

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,22 @@ package controllers
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"github.com/gin-gonic/gin"
8+
"github.com/gorilla/websocket"
9+
"log"
710
"main/config"
811
"main/service"
912
"main/service/errno"
13+
"main/service/monitor"
1014
"main/service/push"
15+
"net/http"
16+
"os"
17+
"os/exec"
18+
"time"
1119
)
1220

13-
1421
func Index(c *gin.Context) {
1522

1623
configFile := config.NewConfig()
@@ -45,7 +52,7 @@ func Index(c *gin.Context) {
4552
}
4653

4754
c.HTML(200, "index.tmpl", gin.H{
48-
"title" : "包图网es推送service",
55+
"title" : configFile.Title,
4956
"jobList" : job,
5057
})
5158
}
@@ -121,6 +128,7 @@ func Push(c *gin.Context) {
121128
synchronousConfig.Job.Content.Writer.Parameter.Index+ "_a",
122129
synchronousConfig.Job.Content,
123130
synchronousConfig.Job.Setting.Speed.Channel,
131+
name,
124132
)
125133

126134
if db_error != nil {
@@ -167,6 +175,7 @@ func Push(c *gin.Context) {
167175
new_index_suffix,
168176
synchronousConfig.Job.Content,
169177
synchronousConfig.Job.Setting.Speed.Channel,
178+
name,
170179
)
171180

172181
if db_error != nil {
@@ -188,7 +197,8 @@ func Push(c *gin.Context) {
188197

189198
client.DeleteIndex(now_index_suffix).Do(ctx)
190199

191-
200+
monitor.ProgressBars[name].Progress.Delete("number")
201+
delete(monitor.ProgressBars, name)
192202

193203
c.JSON(200, gin.H{
194204
"code": 200,
@@ -197,3 +207,85 @@ func Push(c *gin.Context) {
197207
}
198208

199209
}
210+
211+
func Progress(c *gin.Context) {
212+
213+
var upgrader = websocket.Upgrader{
214+
ReadBufferSize: 1024,
215+
WriteBufferSize: 1024,
216+
CheckOrigin: func(r *http.Request) bool {
217+
return true
218+
},
219+
}
220+
221+
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) // 升级协议
222+
223+
defer conn.Close()
224+
225+
if err != nil {
226+
log.Println(err)
227+
return
228+
}
229+
230+
for {
231+
messageType, message, err := conn.ReadMessage()
232+
233+
if err != nil {
234+
log.Println(err)
235+
return
236+
}
237+
238+
var progressBarJson []byte
239+
var IsClose = false
240+
for {
241+
progress,ok := monitor.ProgressBars[string(message[:])]
242+
243+
if ok == true {
244+
progress,_ := progress.Progress.Load("number")
245+
246+
progressBarJson,_ = json.Marshal(monitor.ProgressBarJson{
247+
Total: monitor.ProgressBars[string(message[:])].Total,
248+
Progress: progress.(int),
249+
Name: string(message[:]),
250+
Status: 101,
251+
})
252+
253+
} else {
254+
255+
progressBarJson,_ = json.Marshal(monitor.ProgressBarJson{
256+
Name: string(message[:]),
257+
Status: 200,
258+
})
259+
260+
IsClose = true
261+
262+
}
263+
264+
if err := conn.WriteMessage(messageType, progressBarJson); err != nil {
265+
log.Println(err)
266+
return
267+
}
268+
269+
if IsClose {
270+
conn.Close()
271+
}
272+
273+
274+
time.Sleep(time.Second*1)
275+
}
276+
277+
278+
}
279+
280+
}
281+
282+
func Restart(c *gin.Context) {
283+
284+
//todo 重新启动还要找其他的解决方法
285+
str,_ :=os.Getwd()
286+
fmt.Println(str)
287+
s := exec.Command("bash", "-c", "kill `lsof -t -i:9100` && " + str +"/main")
288+
output, _ := s.CombinedOutput()
289+
fmt.Println(string(output))
290+
291+
}

0 commit comments

Comments
 (0)