Skip to content

Commit 9e61efc

Browse files
author
张俊杰
committed
第一版完整功能提交
1 parent 1d35984 commit 9e61efc

37 files changed

+27563
-2
lines changed

README.md

Lines changed: 121 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,121 @@
1-
# mysql2elasticsearch
2-
一款带有WebGUI的elasticsearch数据同步小工具
1+
# 简介
2+
虽然 logstash dts 等工具提供了很好的多数据源同步订阅方案,但是实际项目中并不会把服务器的上机权限或云厂商账号开放给所有成员。
3+
这个小工具算对这类场景的一种补充,它可以帮助我们从mysql中取出数据并推送到elasticsearch上
4+
5+
# 特点
6+
7+
* 基于 elasticsearch REST APIs 理论无版本上的兼容问题
8+
* 配置简单仅需3步即可完成 1. 配置 mapping 2. 编写获取源数据sql语句 3. 对呀字段映射关系
9+
* 提供WebGUI配置完成后日常维护无需上机
10+
11+
12+
# 安装
13+
14+
下载对应平台运行,或源码编译安装
15+
16+
MAC
17+
```
18+
# Linux
19+
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build
20+
21+
# Windows
22+
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build
23+
```
24+
25+
Linux
26+
```
27+
# Mac
28+
CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build
29+
30+
# Windows
31+
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build
32+
```
33+
34+
Windows
35+
```
36+
# Mac
37+
SET CGO_ENABLED=0
38+
SET GOOS=darwin
39+
SET GOARCH=amd64
40+
go build
41+
42+
# Linux
43+
SET CGO_ENABLED=0
44+
SET GOOS=linux
45+
SET GOARCH=amd64
46+
go build
47+
48+
```
49+
50+
# 配置
51+
52+
```
53+
config.json //需要推送的索引列表和配置文件对应关系
54+
55+
{
56+
"jobList" : [
57+
{"name": "本地" , "filePath" : "MysqlReader.json"},
58+
{"name": "aaa" , "filePath" : "T1.json"}
59+
]
60+
}
61+
62+
-------------
63+
filePath.json //单个配置文件
64+
65+
{
66+
"job": {
67+
"setting": {
68+
"speed": {
69+
"channel" : 8 //worker 数量
70+
}
71+
},
72+
"content": {
73+
"reader": {
74+
"name": "mysqlreader",
75+
"parameter": {
76+
"connection": {
77+
"jdbcUrl" : "账号:密码@tcp(ip)/库",
78+
"querySql": "获取源数据sql",
79+
"boundarySql" : "SELECT min(id) as min,max(id) as max FROM t"
80+
}
81+
}
82+
},
83+
"writer": {
84+
"name": "elasticsearchwriter",
85+
"parameter": {
86+
"endpoint": "http://ip:9200",
87+
"accessId": "账号",
88+
"accessKey": "密码",
89+
"index": "test", //索引名称
90+
"type": "_doc",
91+
"batchSize": 10000,//一次bulk的数量
92+
"splitter": ",",
93+
"column" : [
94+
{"name": "id", "type": "id"},
95+
{"name": "pic_id", "type": "text"},
96+
{"name": "dl_num", "type": "integer"}
97+
],
98+
"dsl" : "{\n \"settings\": {\n \"index\": {\n \"sort.field\": \"pr\",\n \"sort.order\": \"desc\",\n \"store.type\": \"hybridfs\",\n \"number_of_shards\": 1, \n \"number_of_replicas\": 1,\n \"similarity\" : {\n \"default\" : {\n \"type\" : \"BM25\",\n \"b\": 0,\n \"k1\": 1.2\n }\n }\n }\n },\n \"mappings\": {\n \"properties\": {\n \"pic_id\": {\n \"type\": \"keyword\"\n },\n \"author\": {\n \"type\": \"keyword\"\n },\n \"title\": {\n \"type\": \"text\",\n \"analyzer\": \"ik_max_word\",\n \"search_analyzer\": \"ik_smart\"\n },\n \"title_smart\": {\n \"type\": \"text\",\n \"analyzer\": \"ik_smart\"\n },\n \"keyword\": {\n \"type\": \"text\",\n \"analyzer\": \"ik_smart\"\n },\n \"source\": {\n \"type\": \"keyword\"\n },\n \"tags\": {\n \"type\": \"text\",\n \"analyzer\": \"ik_smart\"\n },\n \"c1\": {\n \"type\": \"keyword\"\n },\n \"dl_num\": {\n \"type\": \"integer\",\n \"doc_values\": true\n },\n \"cl_num\": {\n \"type\": \"integer\"\n },\n \"format\": {\n \"type\": \"keyword\"\n },\n \"picwidth\": {\n \"type\": \"integer\"\n },\n \"picheight\": {\n \"type\": \"integer\"\n },\n \"format_type\": {\n \"type\": \"keyword\"\n },\n \"filesize\": {\n \"type\": \"integer\"\n },\n \"pr\": {\n \"type\": \"integer\",\n \"doc_values\": true\n },\n \"down\": {\n \"type\": \"integer\",\n \"doc_values\": true\n },\n \"t_id\": {\n \"type\": \"text\"\n },\n \"create_time\": {\n \"type\": \"integer\",\n \"doc_values\": true\n },\n \"hide_sort_by_type\": {\n \"type\": \"keyword\"\n },\n \"picurl\": {\n \"type\": \"keyword\"\n }\n }\n }\n}" //mapping 语句
99+
}
100+
}
101+
}
102+
103+
}
104+
}
105+
106+
107+
108+
109+
110+
```
111+
112+
113+
# 计划
114+
115+
* 加入更多支持的数据关键字
116+
* 错误写日志
117+
* WebGUI 加入 push 时的进度条
118+
* 在线编辑配置文件
119+
* docker版
120+
121+

config/Config.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package config
2+
3+
import (
4+
"fmt"
5+
"github.com/spf13/viper"
6+
)
7+
8+
type Config struct {
9+
JobList []JobList
10+
}
11+
12+
type JobList struct {
13+
Name string
14+
FilePath string
15+
}
16+
17+
func NewConfig () Config{
18+
19+
config := viper.New()
20+
config.SetConfigName("config.json")
21+
config.SetConfigType("json")
22+
config.AddConfigPath("./config")
23+
err := config.ReadInConfig()
24+
25+
if err != nil {
26+
panic(fmt.Errorf("Fatal error config file: %s \n", err))
27+
}
28+
configjson := Config{}
29+
config.Unmarshal(&configjson)
30+
return configjson
31+
}
32+

config/SynchronousConfig.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package config
2+
3+
import (
4+
"fmt"
5+
"github.com/spf13/viper"
6+
"main/service"
7+
)
8+
9+
type SynchronousConfig struct {
10+
Job Job
11+
}
12+
13+
type Job struct {
14+
Setting Setting
15+
Content Content
16+
}
17+
type Setting struct {
18+
Speed Speed
19+
}
20+
type Speed struct {
21+
Channel int
22+
}
23+
24+
type Content struct {
25+
Reader Reader
26+
Writer Writer
27+
}
28+
29+
30+
//----- Reader ------
31+
type Reader struct {
32+
Name string
33+
Parameter ReaderParameter
34+
}
35+
36+
type ReaderParameter struct {
37+
Username string
38+
Password string
39+
Host string
40+
DbName string
41+
Connection Connection
42+
}
43+
44+
type Connection struct {
45+
QuerySql string
46+
JdbcUrl string
47+
BoundarySql string
48+
}
49+
//----- Reader ------
50+
51+
52+
//----- writer ------
53+
54+
type Writer struct {
55+
Name string
56+
Parameter WriterParameter
57+
}
58+
59+
type WriterParameter struct {
60+
Endpoint string
61+
AccessId string
62+
AccessKey string
63+
Index string
64+
Types string
65+
BatchSize int
66+
Splitter string
67+
Column []Column
68+
Dsl string
69+
}
70+
71+
type Column struct {
72+
Name string
73+
Type string
74+
Analyzer string
75+
Search_analyzer string
76+
Format string
77+
Array string
78+
}
79+
80+
//----- writer ------
81+
82+
83+
84+
func NewSynchronousConfig(configName string) SynchronousConfig {
85+
config := viper.New()
86+
config.SetConfigName(configName)
87+
config.SetConfigType("json")
88+
config.AddConfigPath("./config")
89+
90+
err := config.ReadInConfig()
91+
92+
if err != nil {
93+
panic(fmt.Errorf("Fatal error config file: %s \n", err))
94+
}
95+
synchronousConfig := SynchronousConfig{}
96+
config.Unmarshal(&synchronousConfig)
97+
98+
return synchronousConfig
99+
}
100+
101+
func GetMysqlConfig(job Job) string {
102+
return job.Content.Reader.Parameter.Username + ":" + job.Content.Reader.Parameter.Password + "@tcp(" + job.Content.Reader.Parameter.Host+ ")/" + job.Content.Reader.Parameter.DbName
103+
}
104+
105+
func GetEsConfig(job Job) service.EsConfig {
106+
return service.EsConfig{
107+
Addresses: job.Content.Writer.Parameter.Endpoint,
108+
Username: job.Content.Writer.Parameter.AccessId,
109+
Password: job.Content.Writer.Parameter.AccessKey,
110+
}
111+
}
112+
113+
114+
/**
115+
根据顶层配置文件 配置名称 返回 详细的配置文件信息
116+
*/
117+
func JobNameGetSynchronousConfig(jobName string) (SynchronousConfig, bool) {
118+
configFile := NewConfig()
119+
synchronousConfig := SynchronousConfig{}
120+
for _,v := range configFile.JobList {
121+
if v.Name == jobName {
122+
synchronousConfig = NewSynchronousConfig(v.FilePath)
123+
return synchronousConfig, true
124+
}
125+
}
126+
127+
return synchronousConfig, false
128+
}
129+
130+
/**
131+
根据顶层配置文件 配置名称 返回 es 配置信息
132+
*/
133+
func JobNameGetESConfig(jobName string) (service.EsConfig, SynchronousConfig , bool) {
134+
135+
synchronousConfig, status := JobNameGetSynchronousConfig(jobName)
136+
137+
if status == false {
138+
return service.EsConfig{}, synchronousConfig , false
139+
}
140+
141+
return GetEsConfig(synchronousConfig.Job), synchronousConfig, true
142+
}
143+

config/config.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"jobList" : [
3+
{"name": "测试" , "filePath" : "example.json"},
4+
]
5+
}

config/example.json

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
{
2+
"job": {
3+
"setting": {
4+
"speed": {
5+
"channel" : 8
6+
}
7+
},
8+
"content": {
9+
"reader": {
10+
"name": "mysqlreader",
11+
"parameter": {
12+
"connection": {
13+
"jdbcUrl" : "root:pass@tcp(ip)/db_name",
14+
"querySql" : "SELECT i.id,i.id AS a_id,'palnA' AS author,i.title,i.title as title_smart,i.keyword,i.source,REPLACE(GROUP_CONCAT(tn.name), ',', ' ') AS tags,p.c1,p.dl_num,p.cl_num,i.format,i.awidth,i.aheight,i.format_type,i.filesize,GREATEST(ifnull(pr.pr, 0), 1) AS pr,ifnull(d.down, 0) AS down,REPLACE(GROUP_CONCAT(tn.id), ',', ' ') AS t_id,UNIX_TIMESTAMP(p.createtime) AS create_time,ifnull(h.hide_sort_by_type, 0) AS hide_sort_by_type,i.aurl FROM t1_a p LEFT JOIN t1_ainfo i ON i.id = p.id LEFT JOIN t1_prinfo pr ON pr.id = p.id LEFT JOIN t1_tags t ON p.id = t.a_id LEFT JOIN t1_tags_name tn ON t.tname_id = tn.id LEFT JOIN t1_yesterday_download d ON p.id = d.pid LEFT JOIN t1_a_sort_by_hide h ON h.a_id = p.id WHERE p.sh = 1 and p.id >= ? and p.id <= ? GROUP BY i.id",
15+
"boundarySql" : "SELECT min(id) as min,max(id) as max FROM t1_a"
16+
}
17+
}
18+
},
19+
"writer": {
20+
"name": "elasticsearchwriter",
21+
"parameter": {
22+
"endpoint": "http://192.168.193.35:9200",
23+
"accessId": "root",
24+
"accessKey": "123123",
25+
"index": "t1",
26+
"type": "_doc",
27+
"batchSize": 10000,
28+
"splitter": ",",
29+
"column" : [
30+
{"name": "id", "type": "id"},
31+
{"name": "a_id", "type": "text"},
32+
{"name": "author", "type": "text"},
33+
{"name": "title", "type": "text"},
34+
{"name": "title_smart", "type": "text"},
35+
{"name": "keyword", "type": "text"},
36+
{"name": "source", "type": "text"},
37+
{"name": "tags", "type": "text"},
38+
{"name": "c1", "type": "text"},
39+
{"name": "dl_num", "type": "integer"},
40+
{"name": "cl_num", "type": "integer"},
41+
{"name": "format", "type": "text"},
42+
{"name": "awidth", "type": "integer"},
43+
{"name": "aheight", "type": "integer"},
44+
{"name": "format_type", "type":"text"},
45+
{"name": "filesize", "type": "integer"},
46+
{"name": "pr", "type": "integer"},
47+
{"name": "down", "type": "integer"},
48+
{"name": "t_id", "type": "text"},
49+
{"name": "create_time", "type": "integer"},
50+
{"name": "hide_sort_by_type", "type": "text"},
51+
{"name": "aurl", "type": "text"}
52+
],
53+
"dsl" : "{\n \"settings\": {\n \"index\": {\n \"sort.field\": \"pr\",\n \"sort.order\": \"desc\",\n \"store.type\": \"hybridfs\",\n \"number_of_shards\": 1, \n \"number_of_replicas\": 1,\n \"similarity\" : {\n \"default\" : {\n \"type\" : \"BM25\",\n \"b\": 0,\n \"k1\": 1.2\n }\n }\n }\n },\n \"mappings\": {\n \"properties\": {\n \"a_id\": {\n \"type\": \"keyword\"\n },\n \"author\": {\n \"type\": \"keyword\"\n },\n \"title\": {\n \"type\": \"text\",\n \"analyzer\": \"ik_max_word\",\n \"search_analyzer\": \"ik_smart\"\n },\n \"title_smart\": {\n \"type\": \"text\",\n \"analyzer\": \"ik_smart\"\n },\n \"keyword\": {\n \"type\": \"text\",\n \"analyzer\": \"ik_smart\"\n },\n \"source\": {\n \"type\": \"keyword\"\n },\n \"tags\": {\n \"type\": \"text\",\n \"analyzer\": \"ik_smart\"\n },\n \"c1\": {\n \"type\": \"keyword\"\n },\n \"dl_num\": {\n \"type\": \"integer\",\n \"doc_values\": true\n },\n \"cl_num\": {\n \"type\": \"integer\"\n },\n \"format\": {\n \"type\": \"keyword\"\n },\n \"awidth\": {\n \"type\": \"integer\"\n },\n \"aheight\": {\n \"type\": \"integer\"\n },\n \"format_type\": {\n \"type\": \"keyword\"\n },\n \"filesize\": {\n \"type\": \"integer\"\n },\n \"pr\": {\n \"type\": \"integer\",\n \"doc_values\": true\n },\n \"down\": {\n \"type\": \"integer\",\n \"doc_values\": true\n },\n \"t_id\": {\n \"type\": \"text\"\n },\n \"create_time\": {\n \"type\": \"integer\",\n \"doc_values\": true\n },\n \"hide_sort_by_type\": {\n \"type\": \"keyword\"\n },\n \"aurl\": {\n \"type\": \"keyword\"\n }\n }\n }\n}"
54+
}
55+
}
56+
}
57+
58+
}
59+
}
60+
61+

go.mod

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
module main
2+
3+
go 1.13
4+
5+
require github.com/spf13/viper v1.7.1
6+
7+
require github.com/gin-gonic/gin v1.7.1
8+
9+
require github.com/go-sql-driver/mysql v1.5.0
10+
11+
require github.com/olivere/elastic/v7 v7.0.24
12+
13+
require github.com/Jeffail/gabs/v2 v2.6.1

0 commit comments

Comments
 (0)