python3 分批讀寫mysql

分批讀取

import pandas as pd
import pymysql
from sqlalchemy import create_engine
pymysql.install_as_MySQLdb()

第一種讀取數(shù)據(jù)的方式

#建立數(shù)據(jù)庫連接
con=pymysql.connect(
  host="10.***.***.***",      #ip地址
  database="db1",        #需讀取的數(shù)據(jù)表所在數(shù)據(jù)庫的庫名
  user="user",               #mysql用戶名
  password="password",    #密碼
  port=3306,       #端口號
  charset='utf8'    
)

cur = con.cursor()     #創(chuàng)建游標(biāo)

#數(shù)據(jù)讀取函數(shù)
def read_table(cur, sql): 
    try:
        cur.execute(sql) 
        d  = cur.fetchall()
        df = pd.DataFrame(list(d))
    except Exception as e:
        df = pd.DataFrame()
        print('read data from mysql failed:',sqli)
        print(e)
    return df

#設(shè)置分批的節(jié)點(diǎn)
def batch_generate(n, b):  #n 數(shù)據(jù)條數(shù),b分批數(shù)目
    batch = []
    for i in range(0, n, int(n/b)): 
        batch.append(i)
    batch.append(n)
    return batch

batch_i = batch_generate(n=10050, b=10)

#分批讀取數(shù)據(jù),保存成dataframe。示例中mysql數(shù)據(jù)表中的數(shù)據(jù)可根據(jù)id分批
dat = pd.DataFrame()
for i in range(1,len(batch_i)):  
   # print(batch_i[i-1],batch_i[i])
    sqli = 'select * from db1.test where id>%d and id<=%d' % (batch_i[i-1], batch_i[i])       #sql語句
    d = read_table(cur=cur, sql=sqli)          #讀取數(shù)據(jù)
    dat = pd.concat([dat,d], axis=0)            #按行合并數(shù)據(jù)

cur.close()    #關(guān)閉游標(biāo)
con.close()    #關(guān)閉連接

第二種讀取數(shù)據(jù)的方式

con_engine = create_engine('mysql://user:password@10.***.***.***:3306/db1?charset=utf8') 

sql = 'select * from db1.test'
d = pd.read_sql(sql=sql, con=con_engine, chunksize=10)    #分10批讀取數(shù)據(jù),語句返回的是生成器

dat = pd.DataFrame()
for i in d:
    dat = pd.concat([dat,  i], axis = 0)
dat.index = range(len(dat))    #上述方法輸出的index會有重復(fù),更新index

分批寫入mysql

第一種方式數(shù)據(jù)寫入數(shù)據(jù)庫

#建立連接
con=pymysql.connect(
  host="10.***.***.***",      #ip地址
  database="db1",        #需讀取的數(shù)據(jù)表所在數(shù)據(jù)庫的庫名
  user="user",               #mysql用戶名
  password="password",    #密碼
  port=3306,       #端口號
  charset='utf8'    
)

cur = con.cursor()

#寫入數(shù)據(jù)
batch_insert_i = batch_generate(n=1000,b=10)
for i in range(1,len(batch_insert_i)):
    sqli = 'insert into db1.test_py(id, w_no, c_code, s_code, c_dt, l_dt)  values(%s,%s,%s,%s,%s,%s)'
    dati = (dat.iloc[batch_insert_i[i-1]:batch_insert_i[i], :]).values.tolist()
    #print(batch_insert_i[i-1], batch_insert_i[i])
    try:
        # 執(zhí)行sql語句
        cur.executemany(sqli,dati)
        con.commit()     # 提交到數(shù)據(jù)庫執(zhí)行
    except Exception as e:
        # 如果發(fā)生錯誤則退出,也可以不退出,回滾con.rollback()
        print(e)
        break   #con.rollback()

cur.close()    # 關(guān)閉游標(biāo)
con.close()    # 關(guān)閉數(shù)據(jù)庫連接

第二種方式寫入數(shù)據(jù)庫

con_engine = create_engine('mysql://user:password@10.***.***.***:3306/db1?charset=utf8') 

dat.to_sql(con=con_engine, name='test_py', if_exists='append', index=False, chunksize=10) 
#name數(shù)據(jù)表名; if_exists='append'  若不存在test_py表則新建,若存在則追加寫入。

注:
第二種讀取和寫入方式直接調(diào)用pandas連接mysql,在讀寫大規(guī)模數(shù)據(jù)時(shí)效率更高。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容