博主辛苦了,我要打赏银两给博主,犒劳犒劳站长。
【摘要】这段时间需要经常使用 python 来操作 mysql 进行数据处理工作,由于之前一直没有对此进行整理,导致每次遇到对数据库的操作都是重新写,没有一套固定的代码,不够规范,为了能够方便以后的工作,能够快速上手,所以本文对此记录,仅供个人参考使用!另外,非常欢迎大家能够提出建议,使得代码更加完善!
完整代码如下:
# -*- coding: utf-8 -*-
__author__ = "mafutian"
__time__ = "2020-03-27"
import sys
import pymysql
class myDb():
objs = dict()
def __new__(cls,db_env = ''):
if db_env not in myDb.objs.keys():
obj = object.__new__(cls)
myDb.objs[db_env] = obj
else:
obj = myDb.objs[db_env]
return obj
def __init__(self,db_env = '',db_charset = 'utf8mb4',db_port = 3306):
self.__conn = None
self.__cursor = None
dev_config = {
'db_host':'127.0.0.1',
'db_username':'root',
'db_password':'root',
'db_name':'test',
'db_charset':db_charset,
'db_port':db_port,
}
test_config = {
'db_host':'',
'db_username':'',
'db_password':'',
'db_name':'',
'db_charset':db_charset,
'db_port':db_port,
}
pro_config = {
'db_host':'',
'db_username':'',
'db_password':'',
'db_name':'',
'db_charset':db_charset,
'db_port':db_port,
}
if db_env == 'pro_db':
db_config = pro_config
elif db_env == 'test_db':
db_config = test_config
elif db_env == 'dev_db':
db_config = dev_config
else:
self.error_msg('database connect failed!')
try:
self.__conn = pymysql.connect(
host = db_config['db_host'],
user = db_config['db_username'],
passwd = db_config['db_password'],
db = db_config['db_name'],
port = db_config['db_port'],
charset = db_config['db_charset'],
)
except Exception as e:
self.error_msg('database connect failed:',e)
else:
self.__cursor = self.__conn.cursor(cursor = pymysql.cursors.DictCursor)
def __del__(self):
try:
self.__cursor.close()
self.__conn.close()
except:
pass
def error_msg(self,msg = '',e = ''):
sys.exit(msg + str(e))
def query(self,sql,values = []):
try:
if not values:
self.__cursor.execute(sql)
else:
self.__cursor.execute(sql,values)
except Exception as e:
self.error_msg('execute failed:',e)
res = self.__cursor.fetchall()
return res
def query_one(self,sql,values = []):
try:
if not values:
self.__cursor.execute(sql)
else:
self.__cursor.execute(sql,values)
except Exception as e:
self.error_msg('execute failed:',e)
res = self.__cursor.fetchone()
return res
def select(self,table_name,columns = ['*'],condition = ''):
condition = ' where ' + condition if condition else None
if condition:
sql = "select %s from %s %s" % (','.join(columns),table_name,condition)
else:
sql = "select %s from %s" % (','.join(columns),table_name)
try:
self.__cursor.execute(sql)
except Exception as e:
self.error_msg('execute failed:',e)
return self.__cursor.fetchall()
def insert(self,table_name,columns = (),val = (),auto_commit = False):
lens = len(columns)
sql = "INSERT INTO " + table_name + "(" + ','.join(columns) + ") VALUES " + "(" + ','.join(['%s'] * lens) + ")"
try:
self.__cursor.executemany(sql,val)
if auto_commit == True:
self.commit()
except Exception as e:
self.error_msg('execute failed:',e)
self.rollback()
return self.rowcount()
def insert_one(self,table_name = '',data = {},auto_commit = False):
lens = len(data)
placeholder = ['%s'] * lens
fields = []
values = []
for field,value in data.items():
fields.append(field)
values.append(value)
sql = "INSERT INTO " + table_name + "(" + ','.join(fields) + ") VALUES (" + ','.join(placeholder) + ")"
try:
self.__cursor.execute(sql,values)
if auto_commit == True:
self.commit()
except Exception as e:
print('execute failed:',e)
return self.rowcount()
def execute(self,sql,values = [],auto_commit = False):
try:
if not values:
self.__cursor.execute(sql)
else:
self.__cursor.execute(sql,values)
if auto_commit == True:
self.commit()
except Exception as e:
self.error_msg('execute failed:',e)
return self.rowcount()
def rowcount(self):
return self.__cursor.rowcount
def last_insert_id(self):
return self.__conn.insert_id()
def commit(self):
self.__conn.commit()
def rollback(self):
self.__conn.rollback()
def close(self):
self.__del__()
if __name__ == '__main__':
'''
用到的表:
CREATE TABLE `test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
'''
db = myDb(db_env = 'dev_db')
table_name = 'test'
columns = ('name','age')
val = (('mafutian',20),('zhangsan',18))
rowcount = db.insert(table_name = table_name,columns = columns,val = val)
db.commit()
if rowcount > 0:
print('insert success')
data = {
'name':'hello world',
'age':16,
}
rowcount = db.insert_one(table_name = table_name,data = data,auto_commit = True)
if rowcount > 0:
print('insert success')
res = db.select(table_name = table_name,columns = columns,condition = '')
# print(res)
sql = 'select id,name,age from test where id >= %s'
res = db.query_one(sql,['5'])
rowcount = db.rowcount()
if rowcount > 0:
print('find success,total is ',rowcount)
# print(res)
res = db.query(sql,['2'])
rowcount = db.rowcount()
if rowcount > 0:
print('find success,total is ',rowcount)
# print(res)
sql = 'update test set name = %s where id = %s'
rowcount = db.execute(sql,['python',1])
db.commit()
if rowcount > 0:
print('data change,update success')
sql = 'delete from test where id = 2'
rowcount = db.execute(sql)
db.commit()
if rowcount > 0:
print('data change,delete success')
代码没有写太多注释,因为主要是给我自己整理、使用的,所以没有写注释啦,但是相信大家都能够看懂,所以期待大家能够给出建议啊~
版权归 马富天个人博客 所有
本文链接地址:http://www.mafutian.com/442.html
转载请务必注明出处,小生将不胜感激,谢谢! 喜欢本文或觉得本文对您有帮助,请分享给您的朋友 ^_^
顶0
踩0
评论审核未开启 |
![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() |
||