脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服务器之家 - 脚本之家 - Python - python实现mysql的读写分离及负载均衡

python实现mysql的读写分离及负载均衡

2021-01-12 00:04lc19861217 Python

这篇文章主要介绍了python实现mysql的读写分离及负载均衡 ,需要的朋友可以参考下

Oracle数据库有其公司开发的配套rac来实现负载均衡,目前已知的最大节点数能到128个,但是其带来的维护成本无疑是很高的,并且rac的稳定性也并不是特别理想,尤其是节点很多的时候。

       但是,相对mysql来说,rac的实用性要比mysql的配套集群软件mysql-cluster要高很多。因为从网上了解到情况来看,很少公司在使用mysql-cluster,大多数企业都会选择第三方代理软件,例如MySQL Proxy、Mycat、haproxy等,但是这会引起另外一个问题:单点故障(包括mysql-cluster:管理节点)。如果要解决这个问题,就需要给代理软件搭建集群,在访问量很大的情况下,代理软件的双机或三机集群会成为访问瓶颈,继续增加其节点数,无疑会带来各方面的成本。

那么,如何可以解决这个问题呢?

          解决上述问题,最好的方式个人认为应该是在程序中实现。通过和其他mysql DBA的沟通,也证实了这个想法。但是由此带来的疑问也就产生了:会不会增加开发成本?对现有的应用系统做修改会不会改动很大?会不会增加后期版本升级的难度?等等。

        对于一个架构设计良好的应用系统可以很肯定的回答:不会。

        那么怎么算一个架构设计良好的应用系统呢?

       简单来说,就是分层合理、功能模块之间耦合性底。以本人的经验来说,系统设计基本上可以划分为以下四层:

       1.  实体层:主要定义一些实体类

       2.  数据层:也可以叫SQL处理层。主要负责跟数据库交互取得数据

       3.  业务处:主要是根据业务流程及功能区分模块(或者说定义不同的业务类)

       4.  表现层:呈现最终结果给用户

       实现上述功能(mysql的读写分离及负载均衡),在这四个层次中,仅仅涉及到数据层。

严格来说,对于设计良好的系统,只涉及到一个类的一个函数:在数据层中,一般都会单独划分出一个连接类,并且这个连接类中会有一个连接函数,需要改动的就是这个函数:在读取连接字符串之前加一个功能函数返回需要的主机、ip、端口号等信息(没有开发经历的同学可能理解这段话有点费劲)。

       流程图如下:

python实现mysql的读写分离及负载均衡

           代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import mmap
import json
import random
import mysql.connector
import time
##公有变量
#dbinfos={
#   "db0":{'host':'192.168.42.60','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,"database":"","role":"RW","weight":10,"status":1},
#   "db1":{'host':'192.168.42.61','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,,"database":"":"R","weight":20,"status":1}
#   }
dbinfos={}
mmap_file = None
mmap_time=None
##这个函数返回json格式的字符串,也是实现初始化数据库信息的地方
##使用json格式是为了方便数据转换,从字符串---》二进制--》字符串---》字典
##如果采用其它方式共享dbinfos的方法,可以不用此方式
##配置库的地址
def get_json_str1():
 return json.dumps(dbinfos)
##读取配置库中的内容
def get_json_str():
 try:
  global dbinfos
  cnx = mysql.connector.connect(user='root', password='Abcd.1234',
        host='192.168.42.60',
        database='rwlb')
  cursor = cnx.cursor()
  cmdString="select * from rwlb"
  cnt=-1
  cursor.execute(cmdString)
  for (host,user,pwd,my_user,my_pwd,role,weight,status,port,db ) in cursor:
   cnt=cnt+1
   dict_db={'host':host,'user':user,'pwd':pwd,'my_user':my_user,'my_pwd':my_pwd,"port":port,"database":db,"role":role,"weight":weight,"status":status}
   dbinfos["db"+str(cnt)]=dict_db
  cursor.close()
  cnx.close()
  return json.dumps(dbinfos)
 except:
  cursor.close()
  cnx.close()
  return ""
##判断是否能正常连接到数据库
def check_conn_host():
 try:
  cnx = mysql.connector.connect(user='root', password='Abcd.1234',
        host='192.168.42.60',
        database='rwlb')
  cursor = cnx.cursor()
  cmdString="select user()"
  cnt=-1
  cursor.execute(cmdString)
  for user in cursor:
   cnt=len(user)
  cursor.close()
  cnx.close()
  return cnt
 except :
  return -1;
##select 属于读操作,其他属于写操作-----这里可以划分的更详细,比如执行存储过程等
def analyze_sql_state(sql):
 if "select" in sql:
  return "R"
 else:
  return "W"
##读取时间信息
def read_mmap_time():
 global mmap_time,mmap_file
 mmap_time.seek(0)
 ##初始时间
 inittime=int(mmap_time.read().translate(None, b'\x00').decode())
 ##当前时间
 endtime=int(time.time())
 ##时间差
 dis_time=endtime-inittime
 print("dis_time:"+str(dis_time))
 #重新读取数据
 if dis_time>10:
  ##当配置库正常的情况下才重新读取数据
  print(str(check_conn_host()))
  if check_conn_host()>0:  
   print("read data again")
   mmap_time.seek(0)
   mmap_file.seek(0)
   mmap_time.write(b'\x00')
   mmap_file.write(b'\x00')
   get_mmap_time()
   get_mmap_info()
  else:
   print("can not connect to host")  
 #不重新读取数据
 else:
  print("do not read data again")
##从内存中读取信息,
def read_mmap_info(sql):
 read_mmap_time()
 print("The data is in memory")
 global mmap_file,dict_db
 mmap_file.seek(0)
 ##把二进制转换为字符串
 info_str=mmap_file.read().translate(None, b'\x00').decode()
 #3把字符串转成json格式,方便后面转换为字典使用
 infos=json.loads(info_str)
 host_count=len(infos)
 ##权重列表
 listw=[]
 ##总的权重数量
 wtotal=0
 ##数据库角色
 dbrole=analyze_sql_state(sql)
 ##根据权重初始化一个列表。这个是比较简单的算法,所以权重和控制在100以内比较好----这里可以选择其他比较好的算法
 for i in range(host_count):
  db="db"+str(i)
  if dbrole in infos[db]["role"]:
   if int(infos[db]["status"])==1:
    w=infos[db]["weight"]
    wtotal=wtotal+w
    for j in range(w):
     listw.append(i)
 if wtotal >0:
  ##产生一个随机数
  rad=random.randint(0,wtotal-1)
  ##读取随机数所在的列表位置的数据
  dbindex=listw[rad]
  ##确定选择的是哪个db
  db="db"+str(dbindex)
  ##为dict_db赋值,即选取的db的信息
  dict_db=infos[db]
  return dict_db
 else :
  return {}
##如果内存中没有时间信息,则向内存红写入时间信息
def get_mmap_time():
 global mmap_time
 ##第二个参数1024是设定的内存大小,单位:字节。如果内容较多,可以调大一点
 mmap_time = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time')
 ##读取有效比特数,不包括空比特
 cnt=mmap_time.read_byte()
 if cnt==0:
  print("Load time to memory")
  mmap_time = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time')
  inittime=str(int(time.time()))
  mmap_time.write(inittime.encode())
##如果内存中没有对应信息,则向内存中写信息以供下次调用使用
def get_mmap_info():
 global mmap_file
 ##第二个参数1024是设定的内存大小,单位:字节。如果内容较多,可以调大一点
 mmap_file = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap')
 ##读取有效比特数,不包括空比特
 cnt=mmap_file.read_byte()
 if cnt==0:
  print("Load data to memory")
  mmap_file = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap')
  mmap_file.write(get_json_str().encode())
##测试函数
def test1():
 get_mmap_time()
 get_mmap_info()
 for i in range(10):
  sql="select * from db"
  #sql="update t set col1=a where b=2"
  dbrole=analyze_sql_state(sql)
  dict_db=read_mmap_info(sql)
  print(dict_db["host"])
def test2():
 sql="select * from db"
 res=analyze_sql_state(sql)
 print("select:"+res)
 sql="update t set col1=a where b=2"
 res=analyze_sql_state(sql)
 print("update:"+res)
 sql="insert into t values(1,2)"
 res=analyze_sql_state(sql)
 print("insert:"+res)
 sql="delete from t where b=2"
 res=analyze_sql_state(sql)
 print("delete:"+res)
##类似主函数
if __name__=="__main__":
 test2()

测试结果:

python实现mysql的读写分离及负载均衡

从结果可以看出,只有第一次向内存加载数据,并且按照权重实现了负载均衡。

因为测试函数test1()写的是固定语句,所以读写分离的结果没有显示出来。

另外:测试使用的数据库表结构及数据:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
desc rwlb;
+---------+-------------+------+-----+---------+-------+
| Field | Type  | Null | Key | Default | Extra |
+---------+-------------+------+-----+---------+-------+
| host | varchar(50) | YES |  | NULL |  |
| user | varchar(50) | YES |  | NULL |  |
| pwd  | varchar(50) | YES |  | NULL |  |
| my_user | varchar(50) | YES |  | NULL |  |
| my_pwd | varchar(50) | YES |  | NULL |  |
| role | varchar(10) | YES |  | NULL |  |
| weight | int(11)  | YES |  | NULL |  |
| status | int(11)  | YES |  | NULL |  |
| port | int(11)  | YES |  | NULL |  |
| db  | varchar(50) | YES |  | NULL |  |
+---------+-------------+------+-----+---------+-------+
select * from rwlb;
+---------------+------+----------+---------+-----------+------+--------+--------+------+------+
| host   | user | pwd  | my_user | my_pwd | role | weight | status | port | db |
+---------------+------+----------+---------+-----------+------+--------+--------+------+------+
| 192.168.42.60 | root | Abcd1234 | root | Abcd.1234 | RW |  10 1 | NULL | NULL |
| 192.168.42.61 | root | Abcd1234 | root | Abcd.1234 | R |  20 1 | NULL | NULL |
+---------------+------+----------+---------+-----------+------+--------+--------+------+------+

总结

以上所述是小编给大家介绍的python实现mysql的读写分离及负载均衡,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!

原文链接:https://www.cnblogs.com/lc1217/p/6514349.html

延伸 · 阅读

精彩推荐