Python操作Mysql类

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import MySQLdb
from warnings import filterwarnings
filterwarnings('error', category = MySQLdb.Warning)

class MySQL :

    __conn = None
    __cursor = None

    def __init__(self, host, user, passwd, db ,port = 3306, charset = 'utf8'):
        try:
            self.__conn = MySQLdb.connect(host=host, user=user, passwd=passwd, db=db, port=port,charset=charset)
            self.__cursor = self.__conn.cursor()
        except MySQLdb.Warning, w:
            print "警告信息 %s" % str(w)
        except MySQLdb.Error, e:
            print "错误信息 %d %s" % (e.args[0], e.args[1])
    #插入数据
    def insert(self, table, dict):
        if len(dict) > 0 :
            keys = dict.keys()
            keyStr = ""
            valStr = ""
            for key in keys:
                keyStr += "`" + str(key) + '`,'
                valStr += "'" + str(dict[key]) + "',"
            keyStr = keyStr.rstrip(',')
            valStr = valStr.rstrip(',')
            sql = "INSERT INTO `" + str(table) + "` (" + keyStr + ") VALUES " + "("+ valStr +")"
            print '执行语句: %s' % sql
            try :
                self.__cursor.execute(sql)
                #lastId = self.__conn.insert_id() #这个一定要在commit之前!!!
                lastId = self.__cursor.lastrowid
                self.__conn.commit()
                return lastId
            except:
                print '插入数据异常!执行回滚!'
                self.__conn.rollback()
                return False
        else:
            return False

    #查询一条数据
    def find(self, table, fields = "*", where = "", order = 'id desc'):
        sql = "select " + str(fields) + " from `"+ str(table) + "` "
        if where != '':
            sql += " where " + str(where)
        sql += " order by " + str(order) + " limit 1"
        print "执行语句: %s" % sql
        try:
            self.__cursor.execute(sql)
            return self.__cursor.fetchone()
        except :
            print '查询失败'
            return False

    #查询多条数据
    def fetchall(self, table, fields = "*", where = "", order = "id desc", offset = "0", limit = '20'):
        sql = "select " + str(fields) + " from `" + str(table) + "` "
        if where != '':
            sql += " where " + str(where)
        sql += " order by " + str(order) + " limit " + str(offset) + "," + str(limit)
        print "执行语句: %s" % sql
        try:
            self.__cursor.execute(sql)
            return self.__cursor.fetchall()
        except:
            print '查询失败'
            return False
    # 删除数据
    def delete(self, table, where, limit = 1):
        sql = "delete from `" + str(table) + "` where " + str(where)
        if limit > 0 :
            sql += " limit " + str(limit)
        print '执行语句: %s' % sql
        try:
           self.__cursor.execute(sql)
           self.__conn.commit()
           return True
        except:
            print '删除异常!执行回滚!'
            self.__conn.rollback()
            return False

    # 更新数据
    def update(self, table, dict, where):
        keys = dict.keys()
        updateStr = ""
        for key in keys:
            updateStr += "`" + str(key) + "` = '" + str(dict[key]) +"',"
        updateStr = updateStr.rstrip(',')
        sql = "update `"+str(table)+"` set "+updateStr+" where " + where
        print '执行语句: %s' % sql
        try:
            self.__cursor.execute(sql)
            self.__conn.commit()
            return True
        except:
            print '更新异常!执行回滚!'
            self.__conn.rollback()
    # 释放资源
    def __del__(self):
        self.__cursor.close()
        self.__conn.close()

############################################案例##############

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import time
from MySQL import *
mysql = MySQL(host='10.200.10.203', user='test', passwd='test', db='test')
# print mysql.find('test', 'count(*) as t')
# print mysql.fetchall('test')
# print mysql.delete('test', 'id = 69')
create_time = time.strftime('%Y-%m-%d %H:%M:%S')
content ='这里是测试内容'
name = '测试'
dict = {'name':name,'create_time':create_time,'content':content}
lastid = mysql.insert('test', dict)
name = '测试最新的!!!'
dict = {'name':name}
print mysql.update('test',dict,"id = " + str(lastid))



 

Python不同层级目录文件import

 

 

目录层级如图所示QQ截图20160621165358

 

#ex1定义函数
#!/usr/bin/python
# -*- coding: UTF-8 -*-

def test(str):
    print 'This is ex1'
    print str

#ex2导入示例
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# import ex1
# ex1.test('ex2')
from ex1 import *
test('ex2')

#ex4和ex3需要在a目录下建立一个__init__.py文件(内容可以为空) 这个玩意在导入时会执行,可以把项目需要的包写在这里那么其他地方就不用再一个一个导入了
#ex4导入示例
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# import a.ex1
# a.ex1.test('ex4')
from a.ex1 import *
test('ex4')

#ex3导入示例
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import sys
sys.path.append('..')
# import a.ex1
# a.ex1.test('ex3')
from a.ex1 import *
test('ex3')


 

Python列表、元组、字典操作

#!/usr/bin/python
# -*- coding: UTF-8 -*-

list1 = [1,2,'a', 'b']
seq = (1,4,5)
print   list1[2]  #获取第三个元素(下标是从0开始)
print   list1[-3] #获取倒数第三个元素
print   list1[2:] #获取第三个元素到最后一个
print   len(list1) #获取列表元素个数
print   max(list1)  #获取最大的元素
print   min(list1)  #获取最小的元素
print   list(seq)   #将元组转换成列表
list1.append(12)    #列表最后元素追加一个元素
list1.count(1)      #统计元素出现次数
list1.index(1)      #元素第一次出现的位置
list1.insert(1,33)  #将元素插入指定位置
print list1
print list1.pop()   #弹出最后一个元素
list1.remove(2) #移除匹配元素的第一个值
print  list1
list1.reverse()    #反转列表
print list1
list1.sort()
print list1
#元组操作
#!/usr/bin/python
# -*- coding: UTF-8 -*-
#元组操作
seq1 = (1,) #创建一个元素的元组也要带逗号
seq2 = (1,68,3,444,333,55,5)
list1 = [1,4,5,8,9,0]
print seq2
#seq1[0] = 123 #元组元素不可修改
print tuple(list1) #列表转换成元组
print max(seq2) #元组最大元素
print len(seq2) #元组长度
print min(seq2) #元组最小元素

#字典操作
dict = {'Name': 'Joyous', 'Age': 27} #键必须唯一,键可以是数字、字符、元组
dict1 = {'H':5}
print dict
# del dict['Name'] #删除单个
# dict.clear()     #清空字典
# del dict          #删除字典
print len(dict) #打印字典长度
# print str(dict) #打印字典
# print type(dict) #变量类型
print dict.get('sex', 1) #返回键值,如果不存在则用第二个参数做默认值
print dict.has_key('sex') #返回是否存在sex键
print dict.items() #列表返回字典(key,val)
print dict.keys()  #返回所有键,列表
dict.update(dict1) #把dict1加入dict中
print  dict
print dict.values() #返回dict所有val

 

Python文件操作

#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
path = os.getcwd()
file_list = os.listdir(path)
if '2.log' in file_list :
    os.remove(path + '/2.log')

r_file = open(path + '/1.log')
w_file = open(path + '/2.log', 'w')
r_content = r_file.read()
w_content = r_content.replace('你好','他好')
w_file.writelines(w_content)
r_file.close()
w_file.close()
函数 参数 说明
os.getcwd() 获取当前工作目录
os.chdir(path) 目录路径 切换当前工作目录到path
os.listdir(path) 目录路径 返回path文件夹和文件名的列表
os.makedirs(path  [, mod]) 目录路径,模式 创建文件夹,递归创建
os.mkdir(path [, mode] ) 目录路径,模式 创建文件夹,上层文件夹不存在则抛出异常
os.open(file, flags [,mode]) 文件名,标示,模式 打开文件
os.remove(filename) 文件名 删除文件,如果为路径则抛出异常
os.removedirs(path) 目录路径 删除非空目录
file.close() 关闭文件
file.flush() 内部缓冲区内容写入文件
file.next() 返回文件下一行
file.read([size]) 长度 读取文件制定长度
file.readline([size]) 长度 读取文件整行
file.readlines([size]) 长度 读取文件所有行
file.seek(offset [,size]) 设置文件当前位置
file.tell() 返回文件当前位置
file.write(str) 字符串 将字符串写入文件
file.writelines(seq) 列表 将一个字符串列表写入文本
os.path.altsep 目录分割符
os.path.split(filename) 文件路径 返回目录好文件名组成的二元组
os.path.splitext(filename) 文件路径 分离扩展名,二元组
os.path.dirname(filename) 文件路径 返回文件路径
os.path.basename(filename) 文件路径 返回文件名
os.path.getsize(filename) 文件路径 返回文件大小,字节
os.mknod(filename) 文件名 创建空白文件
os.path.isfile(str) 检查str是否是文件
os.path.isdir(str) 检查str是否是目录

Python字符串

# -*- coding: utf-8 -*-
import re
#字符串替换
str1 = 'hello world  world world abc=123 abc=valu'
#第一参数是被替换的字符串,第二个参数是新字符串,第三个是替换次数(默认替换全部)
print str1.replace('world', 'Pythoner')
old = re.compile("abc=\d{1,}")
print old.sub("abc=234",str1)

 

字符串函数列表
函数名 参数 说明
str.strip() 去除字符串两端空格
str.lstrip() 去除左侧空格
str.rstrip() 默认去除右侧空格,传递参数则去除指定字符
str.index() 字符 查找字符在字符串位置
str.upper() 将字符串转换成大写
str.lower() 将字符串转换成小写
str[::-1] 反转字符串
str.split() 分隔符 分割字符串,返回一个列表
str.join() 列表 将列表元素用str连接
str.swapcase() 将字符串中大小写互换
str.capitalize() 将字符串首字母大写
str.replace() 旧字符串,新字符串,[替换次数] 将字符串当中旧字符串替换成新字符串
str.center(width [, char]) 宽度,字符串 将字符串填充到长度为width,默认用空格
str.count(sub,start=0,end=len(str)) 字符,起始位置,结束位置 查看字符串出现的次数
str.encode(encoding, errors) 编码,错误处理方案默认strict 将字符串编码
str.decode(encoding,errors) 编码,错误处理方案默认strict 将字符串解码
str.isalnum() 如果str至少有一个字符并且所有字符都是字母或者数字返回True,否则False
str.isalpha() 如果str至少有一个字符并且所有字符都是字母则返回True,否则返回False
str.islower() 检查字符串当中能区分大小写的字符,并且这些字符都是小写
str.isnumeric() 检查字符串是否都是数字
str.isspace() 检查字符串是否只包含空格
str.istitle() 检查字符串首字母是否大写
str.isupper() 检查字符串当中能区分大小写的字符,并且这些字符都是大写
str.maketrans(in,out) 要替换的字符,替换后字符 将字符串中要替换的字符串替换成新的字符(from string import maketrans)
max(str) 字符串 返回字符串中最大的字母
min(str) 字符串 返回字符串中最小的字母
str.title() 将字符串所有单子首字母大写
str.splitlines(num=str.count(‘\n’)) 长度 按照行分割,返回列表
str.partition(string) 字符串 按照字符串把str分割成3元的元组

Python调用Shell命令–总有一款适合你


#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import commands
import subprocess
# content = os.system('ls -al')  #执行结果 0或者1
# print '结果:%s' % content
# content = os.popen('ls -al').read()
# print '###结果:%s' % content
# content = os.popen('ls -al').readlines()
# print '###结果:%s' % content
#commands.getstatusoutput(cmd)         返回(status, output)
#commands.getoutput(cmd)               返回输出结果
# (status, content) = commands.getstatusoutput('ls -al')
# print "###状态 %d ###结果 %s" % (status, content)

###新版调用模块subprocess
#阻塞
# subprocess.call(['ls -al'],shell=True)
# #无阻塞
# pop = subprocess.Popen('ls -al',shell=True)
# #阻塞
# pop.wait()

#获取返回结果
# result = subprocess.Popen('ls -al', shell=True,stdout =subprocess.PIPE)
# content = result.communicate()  #会阻塞主进程等返回结果
# print content #元组(stdoutdata, stderrdata)
s = subprocess.check_output('ls -l', shell=True)
print s

 

Python-MySQL查询函数使用示例

# -*- coding: UTF-8 -*-
import MySQLdb
import time
#警告信息try except是无法捕捉的
from warnings import filterwarnings
filterwarnings(‘error’, category = MySQLdb.Warning)
cursor = conn = delete_id = update_id = 0
try:
conn = MySQLdb.connect(host=’10.200.10.203′, user=’test’, passwd=’test’, db=’test’, port=3306,charset=’utf8′)
cursor = conn.cursor()
#创建表
create_tabl_sql = “””CREATE TABLE IF NOT EXISTS `test` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(200) NOT NULL,
`create_time` datetime NOT NULL,
`content` text NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8″””
#cursor.execute(create_tabl_sql)
except MySQLdb.Warning, w:
print “警告信息 %s” % str(w)
except MySQLdb.Error, e:
print “错误信息 %d %s” % (e.args[0], e.args[1])
# 查询数据
select_data_sql = “select * from test”
print “执行语句: %s” % select_data_sql
try:
cursor.execute(select_data_sql)
result = cursor.fetchone() #读取一行数据
print ‘获取记录数:%d’ % cursor.rowcount #返回总记录数
print u’编号:%d,姓名:%s,时间:%s,内容:%s’ % (result[0], result[1], result[2], result[3])
#读取指定行内容
result_many = cursor.fetchmany(2) #读取2行数据
print ‘获取记录数:%d’ % cursor.rowcount #返回总记录数
print result_many
results = cursor.fetchall() #读取所有数据
print ‘获取记录数:%d’ % cursor.rowcount #返回总记录数
for row in results:
print ‘编号:%d’ % (row[0])
print u’姓名:%s’ % (row[1])
print u’内容:%s’ % (row[3])
except :
# 异常回滚
print ‘查询数据失败’

cursor.close()
conn.close()

Python操作MySQL数据库

数据库python-mysql安装参考:http://blog.phpfs.com/archives/2343.html

# -*- coding: UTF-8 -*-
import MySQLdb
import time
#警告信息try except是无法捕捉的
from warnings import filterwarnings
filterwarnings('error', category = MySQLdb.Warning)
cursor = conn = delete_id = update_id = 0
try:
    conn = MySQLdb.connect(host='10.200.10.203', user='test', passwd='test', db='test', port=3306,charset='utf8')
    cursor = conn.cursor()
    #创建表
    create_tabl_sql = """CREATE TABLE IF NOT EXISTS `test` (
      `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
      `name` varchar(200) NOT NULL,
      `create_time` datetime NOT NULL,
      `content` text NOT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8"""
    #cursor.execute(create_tabl_sql)
except MySQLdb.Warning, w:
    print "警告信息 %s" % str(w)
except MySQLdb.Error, e:
    print "错误信息 %d %s" % (e.args[0], e.args[1])

# 插入数据
name = '张三'
create_time = time.strftime('%Y-%m-%d %H:%M:%S')
content = '这是内容'
insert_data_sql = "insert into `test` (`name`, `create_time`, `content`) values ('" + name + "','" + create_time + "','" + content + "')"
print "执行语句: %s" % insert_data_sql
try:
    cursor.execute(insert_data_sql)
    conn.commit()
except :
    # 异常回滚
    print '插入异常回滚'
    conn.rollback()
# 查询数据
select_data_sql = "select * from test order by id desc limit 2"
print "执行语句: %s" % select_data_sql
try:
    cursor.execute(select_data_sql)
    results = cursor.fetchall()
    for row in results:
        print '编号:%d' % (row[0])
        print u'姓名:%s' % (row[1])
        print u'内容:%s' % (row[3])
    update_id = results[0][0]
    delete_id = results[1][0]
except :
    # 异常回滚
    print '查询数据失败'

# 删除数据
delete_data_sql = "delete from `test` WHERE id  = %d " % (delete_id)
print '执行语句: %s' % delete_data_sql
try:
   cursor.execute(delete_data_sql)
   conn.commit()
except:
    print '删除异常回滚'
    conn.rollback()
# 更新数据
name_2 = '李四'
content_2 = '这是新内容'
update_data_sql = "update `test` set name ='" + name_2 + "',content='" + content_2 + "' where id = %d" % (update_id)
print '执行语句: %s' % update_data_sql
try:
    cursor.execute(update_data_sql)
    conn.commit()
except:
    print '更新异常回滚'
    conn.rollback()

cursor.close()
conn.close()

 

Python警告信息捕捉

异常和警告处理案例

# -*- coding: UTF-8 -*-
import MySQLdb
#警告信息try except是无法捕捉的
from warnings import filterwarnings
filterwarnings('error', category = MySQLdb.Warning)
#当然也可以屏蔽警告filterwarnings("ignore")
try:
    conn = MySQLdb.connect(host='127.0.0.1',user='test',passwd='test',db='test',port=3306,charset='utf8')
    cursor = conn.cursor()
    #创建表
    create_tabl_sql = """CREATE TABLE IF NOT EXISTS `test1` (
      `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
      `name` varchar(200) NOT NULL,
      `create_time` datetime NOT NULL,
      `content` text NOT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8"""
    cursor.execute(create_tabl_sql)
    cursor.close()
    conn.close()
except MySQLdb.Warning, w:
    print "警告信息 %s" % str(w)
except MySQLdb.Error, e:
    print "错误信息 %d %s" % (e.args[0], e.args[1])


 

Python上传文件到FTP服务器

代码python2.7下测试通过!写在最前面!

# -*- coding: UTF-8 -*-
from ftplib import FTP
import os
import os.path
#上传文件到FTP服务器
def ftp_upload(filename, save_filename):
    ftp = FTP()
    ftp.set_debuglevel(0)                   # 打开调试级别2,显示详细信息;0为关闭调试信息
    ftp.connect('blog.phpfs.com', '21', 60)  # FTP主机 端口 超时时间
    ftp.login('python', 'python')           # 登录,如果匿名登录则用空串代替即可

    remote_dir = save_filename.split("/")
    newfilename = remote_dir.pop()

    if remote_dir :
        for dir_name in remote_dir :
            if dir_name == '.' or dir_name == '' :
                continue
            else :
                #尝试创建目录
                try:
                    ftp.mkd(dir_name)
                    ftp.cwd(dir_name)
                except:
                    ftp.cwd(dir_name)
    target_path = '/'.join(remote_dir)
    print '保存文件名:', newfilename
    print '上传目录:', target_path
    print '当前目录:', ftp.pwd()
    print '待上传文件名: %s' % os.path.basename(filename)
    bufsize = 1024                       # 设置缓冲块大小
    file_handler = open(filename, 'rb')  # 以读模式在本地打开文件
    ftp.storbinary('STOR %s' % newfilename, file_handler, bufsize)
    ftp.set_debuglevel(0)
    file_handler.close()
    ftp.quit()
    print "本地文件 ", filename, " 成功上传至 ", save_filename

#举个栗子
ftp_upload('D:/python/test/ex1/7.jpg', '/123.jpg')  #上传文件到根目录下
ftp_upload('D:/python/test/ex1/7.jpg', 'test/123/456/123.jpg');