Skip to content

gist 记录一些代码片段

print的时候顺带带上时间

使用这种方式的好处不仅是显示时间,而且可以很方便地将往屏幕输出改为写文件;更优雅的方式是用logging

import time
def myprint(*args, **kwargs):
    args = list(args)
    args[0] = "["+time.strftime("%Y-%m-%d %H:%M:%S")+"] " + str(args[0])
    print(*args, **kwargs)

使用的时候就和print一样使用,可以传入多个参数

myprint("aha myprint", 666)

连接mysql批量插入、查询

config.py:

MYSQL_DB = "web"
MYSQL_USER = "web"
MYSQL_PASSWORD = "sEcret_strOng_passw0rd"
MYSQL_HOST = "localhost"
MYSQL_PORT = 3306
import pymysql
import time
from config import MYSQL_DB, MYSQL_USER, MYSQL_PASSWORD, MYSQL_HOST, MYSQL_PORT

def db():
    conn = pymysql.connect(
                user=MYSQL_USER, passwd=MYSQL_PASSWORD,
                host=MYSQL_HOST, port=MYSQL_PORT,
                db=MYSQL_DB, charset='utf8',
                init_command="set NAMES utf8mb4", use_unicode=True)
    conn.encoding = "utf8"
    return conn

conn = db()
def runsql(sql, *args, onerror='raise'):
    global conn
    if not conn.open:
        conn = db()
    cur = conn.cursor()
    try:
        cur.execute(sql, args) #参数化查询 不要自己拼接sql
    except:
        if onerror=="ignore":
            return False
        else:
            raise
    result = list(cur)
    conn.commit()
    cur.close()
    return result

def tags_tagname2ids_set(tagname):
    """
    select查询示例
    返回包含标签tagname的帖子id集合
    """
    sql = "select topicid from tags where tagname=%s"
    sqlresult = runsql(sql, tagname)
    return set([i[0] for i in sqlresult])

def usersettings_write_bool(userid, username, settingname, settingvalue):
    """
    replace查询示例
    写入用户配置信息至usersetting表
    返回bool
    """
    sql = "replace into usersettings values (%s, %s, %s, %s)"
    try:
        runsql(sql, userid, username, settingname, settingvalue)
        return True
    except:
        traceback.print_exc()
        return False

大小写不敏感字典

from requests.structures import CaseInsensitiveDict
mydict = CaseInsensitiveDict(mydict)

mpms多线程下每个线程单独变量

自己写的类不是线程安全的,所以在多线程下要做到每个线程自己一个变量互不干扰

mpms下使用EasyLogin完整示例代码模板, 先要使用我fork的版本 加上了len支持:

wget https://d.py3.io/mpms.py
from mpms import MPMS
from EasyLogin import EasyLogin
import threading
thread_data = threading.local()

import time
myprint = lambda s: print("[{showtime}] {s}".format(showtime=time.strftime("%Y-%m-%d %H:%M:%S"), s=s))

def worker(id):
    global thread_data
    a = thread_data.__dict__.get("a")
    if not a:
        a = EasyLogin()
        thread_data.__dict__["a"] = a
    pass # do the stuff, like a.get
    return result

def handler(meta, result):
    # meta["fp"].write ...
    pass # do the stuff

if __name__ == "__main__":
    meta = {"fp": open("result.txt","w",encoding="utf-8")}
    m = MPMS(worker, handler, 2, 2, meta=meta)
    m.start()
    for i in range(...):
        m.put(i)
    while len(m)>10:
        myprint("Remaning "+str(len(m)))
        time.sleep(2)
    m.join()
    myprint("Done!")

将值存入字典中的列表 dict_add_list

判断key是否存在 应该有更好的方法?这个实现还是很naive的

def dict_add_list(dictname, key, value):
    if key not in dictname:
        dictname[key] = [value]
    else:
        dictname[key].append(value)

# 如果字典是存储count计数的话 用这个
def dict_incr(dictname, key):
    if key not in dictname:
        dictname[key] = 1
    else:
        dictname[key] += 1

使用AES加密字符串

https://github.com/ricmoo/pyaes

首先 pip install pyaes -t . 注意每次加密的时候都需要重新初始化aes

加密:

plaintext = "hello world"

import pyaes,base64
aes = pyaes.AESModeOfOperationCTR(b"This_key_for_demo_purposes_only!")
encrypted_text = base64.b64encode(aes.encrypt(plaintext.encode("utf-8")))

print(encrypted_text) # ipkEJevbnsfbEm4=

解密:

encrypted_text = "ipkEJevbnsfbEm4="

import pyaes, base64
aes = pyaes.AESModeOfOperationCTR(b"This_key_for_demo_purposes_only!")
plaintext = aes.decrypt(base64.b64decode(encrypted_text)).decode()

print(plaintext) # hello world