分批讀取
import pandas as pd
import pymysql
from sqlalchemy import create_engine
pymysql.install_as_MySQLdb()
第一種讀取數(shù)據(jù)的方式
#建立數(shù)據(jù)庫連接
con=pymysql.connect(
host="10.***.***.***", #ip地址
database="db1", #需讀取的數(shù)據(jù)表所在數(shù)據(jù)庫的庫名
user="user", #mysql用戶名
password="password", #密碼
port=3306, #端口號
charset='utf8'
)
cur = con.cursor() #創(chuàng)建游標(biāo)
#數(shù)據(jù)讀取函數(shù)
def read_table(cur, sql):
try:
cur.execute(sql)
d = cur.fetchall()
df = pd.DataFrame(list(d))
except Exception as e:
df = pd.DataFrame()
print('read data from mysql failed:',sqli)
print(e)
return df
#設(shè)置分批的節(jié)點(diǎn)
def batch_generate(n, b): #n 數(shù)據(jù)條數(shù),b分批數(shù)目
batch = []
for i in range(0, n, int(n/b)):
batch.append(i)
batch.append(n)
return batch
batch_i = batch_generate(n=10050, b=10)
#分批讀取數(shù)據(jù),保存成dataframe。示例中mysql數(shù)據(jù)表中的數(shù)據(jù)可根據(jù)id分批
dat = pd.DataFrame()
for i in range(1,len(batch_i)):
# print(batch_i[i-1],batch_i[i])
sqli = 'select * from db1.test where id>%d and id<=%d' % (batch_i[i-1], batch_i[i]) #sql語句
d = read_table(cur=cur, sql=sqli) #讀取數(shù)據(jù)
dat = pd.concat([dat,d], axis=0) #按行合并數(shù)據(jù)
cur.close() #關(guān)閉游標(biāo)
con.close() #關(guān)閉連接
第二種讀取數(shù)據(jù)的方式
con_engine = create_engine('mysql://user:password@10.***.***.***:3306/db1?charset=utf8')
sql = 'select * from db1.test'
d = pd.read_sql(sql=sql, con=con_engine, chunksize=10) #分10批讀取數(shù)據(jù),語句返回的是生成器
dat = pd.DataFrame()
for i in d:
dat = pd.concat([dat, i], axis = 0)
dat.index = range(len(dat)) #上述方法輸出的index會有重復(fù),更新index
分批寫入mysql
第一種方式數(shù)據(jù)寫入數(shù)據(jù)庫
#建立連接
con=pymysql.connect(
host="10.***.***.***", #ip地址
database="db1", #需讀取的數(shù)據(jù)表所在數(shù)據(jù)庫的庫名
user="user", #mysql用戶名
password="password", #密碼
port=3306, #端口號
charset='utf8'
)
cur = con.cursor()
#寫入數(shù)據(jù)
batch_insert_i = batch_generate(n=1000,b=10)
for i in range(1,len(batch_insert_i)):
sqli = 'insert into db1.test_py(id, w_no, c_code, s_code, c_dt, l_dt) values(%s,%s,%s,%s,%s,%s)'
dati = (dat.iloc[batch_insert_i[i-1]:batch_insert_i[i], :]).values.tolist()
#print(batch_insert_i[i-1], batch_insert_i[i])
try:
# 執(zhí)行sql語句
cur.executemany(sqli,dati)
con.commit() # 提交到數(shù)據(jù)庫執(zhí)行
except Exception as e:
# 如果發(fā)生錯誤則退出,也可以不退出,回滾con.rollback()
print(e)
break #con.rollback()
cur.close() # 關(guān)閉游標(biāo)
con.close() # 關(guān)閉數(shù)據(jù)庫連接
第二種方式寫入數(shù)據(jù)庫
con_engine = create_engine('mysql://user:password@10.***.***.***:3306/db1?charset=utf8')
dat.to_sql(con=con_engine, name='test_py', if_exists='append', index=False, chunksize=10)
#name數(shù)據(jù)表名; if_exists='append' 若不存在test_py表則新建,若存在則追加寫入。
注:
第二種讀取和寫入方式直接調(diào)用pandas連接mysql,在讀寫大規(guī)模數(shù)據(jù)時(shí)效率更高。