通常在我们进行多进程应用开发的过程中,不可避免的会遇到多个进程访问同一个资源(临界资源)的状况,这时候必须通过加一个全局性的锁,来实现资源的同步访问(即:同一时间里只能有一个进程访问资源)。
举个例子如下:
假设我们用mysql来实现一个任务队列,实现的过程如下:
1. 在Mysql中创建Job表,用于储存队列任务,如下:
create table jobs( id auto_increment not null primary key, message text not null, job_status not null default 0 );
message 用来存储任务信息,job_status用来标识任务状态,假设只有两种状态,0:在队列中, 1:已出队列
2. 有一个生产者进程,往job表中放新的数据,进行排队:
insert into jobs(message) values(\'msg1\');
3.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:
select * from jobs where job_status=0 order by id asc limit 1; update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id
4. 如果没有跨进程的锁,两个消费者进程有可能同时取到重复的消息,导致一个消息被消费多次。这种情况是我们不希望看到的,于是,我们需要实现一个跨进程的锁。
=========================分割线=======================================
说到跨进程的锁实现,我们主要有几种实现方式:
(1)信号量
(2)文件锁fcntl
(3)socket(端口号绑定)
(4)signal
这几种方式各有利弊,总体来说前2种方式可能多一点,这里我就不详细说了,大家可以去查阅资料。
查资料的时候发现mysql中有锁的实现,适用于对于性能要求不是很高的应用场景,大并发的分布式访问可能会有瓶颈.
对此用python实现了一个demo,如下:
文件名:glock.py
#!/usr/bin/env python2.7
#
# -*- coding:utf-8 -*-
#
# Desc :
#
import logging, time
import MySQLdb
class Glock:
def __init__(self, db):
self.db = db
def _execute(self, sql):
cursor = self.db.cursor()
try:
ret = None
cursor.execute(sql)
if cursor.rowcount != 1:
logging.error(\"Multiple rows returned in mysql lock function.\")
ret = None
else:
ret = cursor.fetchone()
cursor.close()
return ret
except Exception, ex:
logging.error(\"Execute sql \\\"%s\\\" failed! Exception: %s\", sql, str(ex))
cursor.close()
return None
def lock(self, lockstr, timeout):
sql = \"SELECT GET_LOCK(\'%s\', %s)\" % (lockstr, timeout)
ret = self._execute(sql)
if ret[0] == 0:
logging.debug(\"Another client has previously locked \'%s\'.\", lockstr)
return False
elif ret[0] == 1:
logging.debug(\"The lock \'%s\' was obtained successfully.\", lockstr)
return True
else:
logging.error(\"Error occurred!\")
return None
def unlock(self, lockstr):
sql = \"SELECT RELEASE_LOCK(\'%s\')\" % (lockstr)
ret = self._execute(sql)
if ret[0] == 0:
logging.debug(\"The lock \'%s\' the lock is not released(the lock was not established by this thread).\", lockstr)
return False
elif ret[0] == 1:
logging.debug(\"The lock \'%s\' the lock was released.\", lockstr)
return True
else:
logging.error(\"The lock \'%s\' did not exist.\", lockstr)
return None
#Init logging
def init_logging():
sh = logging.StreamHandler()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(\'%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s\')
sh.setFormatter(formatter)
logger.addHandler(sh)
logging.info(\"Current log level is : %s\",logging.getLevelName(logger.getEffectiveLevel()))
def main():
init_logging()
db = MySQLdb.connect(host=\'localhost\', user=\'root\', passwd=\'\')
lock_name = \'queue\'
l = Glock(db)
ret = l.lock(lock_name, 10)
if ret != True:
logging.error(\"Can\'t get lock! exit!\")
quit()
time.sleep(10)
logging.info(\"You can do some synchronization work across processes!\")
##TODO
## you can do something in here ##
l.unlock(lock_name)
if __name__ == \"__main__\":
main()
在main函数里:
l.lock(lock_name, 10) 中,10是表示timeout的时间是10秒,如果10秒还获取不了锁,就返回,执行后面的操作。
在这个demo中,在标记TODO的地方,可以将消费者从job表中取消息的逻辑放在这里。即分割线以上的.
2.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:
select * from jobs where job_status=0 order by id asc limit 1; update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id
这样,就能保证多个进程访问临界资源时同步进行了,保证数据的一致性。
测试的时候,启动两个glock.py, 结果如下:
[@tj-10-47 test]# ./glock.py 2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG 2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock \'queue\' was obtained successfully. 2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes! 2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock \'queue\' the lock was released.
可以看到第一个glock.py是 17:08:50解锁的,下面的glock.py是在17:08:50获取锁的,可以证实这样是完全可行的。
[@tj-10-47 test]# ./glock.py 2014-03-14 17:08:46,873 -glock:glock.py-L70-INFO: Current log level is : DEBUG 2014-03-14 17:08:50,299 -glock:glock.py-L43-DEBUG: The lock \'queue\' was obtained successfully. 2014-03-14 17:09:00,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes! 2014-03-14 17:09:00,300 -glock:glock.py-L56-DEBUG: The lock \'queue\' the lock was released. [@tj-10-47 test]#