第十六章 一个数据库函数库
来源:互联网 时间: 2006-04-11
      

  
16.1 引言  
    在八十年代早期,Unix环境被认为不适合运行多用户数据库系统(见Stonebr  
aker[1981]和Weinberger[1982])。早期的系统,如Version 7,因为没有提供任  
何形式的IPC机制(除了半双工管道),也没有提供任何形式的记录锁机制,所以  
确实不适合运行多用户数据库。新一些的系统,象SVR4和4.3+BSD,则为运行可靠  
的、多用户的数据库系统提供了一个适合的环境。很多商业公司在许多年前就已提  
供这种系统。  
    在本章中,我们设计一个简单的、多用户数据库的函数库。通过此函数库提供  
的C语言函数,其他程序可以访问数据库中的记录。这个C函数库只是一个完整的数  
据库的很小的一部分,并不包括其他很多部分,如查询语言等,关于其他部分可以  
参阅专门介绍数据库的书。我们感兴趣的是一个数据库函数库与Unix的接口,以及  
这些接口与前面各章节的关系(如12.3节的记录锁)。  
16.2 历史  
    dbm(3)是一个在Unix系统中很流行的数据库函数库,它由Ken Thompson开发  
,使用了动态Hash结构。最初,它与Version 7一起提供,并出现在所有Berkeley  
的版本中,也包含在Berkeley的SVR4兼容函数库中。Seltzer和Yigit[1991]中有关  
于dbm函数库使用的动态Hash算法历史的详细介绍,以及这个库的其它实现方法。  
但是,这些实现的一个致命缺点是它们都不支持多个进程对数据库的并发修改。它  
们都没有提供并发控制(如记录锁)。  
    4.3+BSD提供了一个新的库db(3),这个库支持3种不同的访问方式:(a)面向  
记录,(b)Hash和(c)B树。同样,db也没有提供并发控制(这一点在db手册的  
BUGS栏中说得很清楚)。Seltzer和Olson[1992]中说以后的版本将提供象大部分商  
业数据库系统一样的并发控制功能。  
    绝大部分商业的数据库函数库提供多进程并发修改一个数据库所需要的并发控  
制。这些系统一般都使用我们在12.3节中介绍的建议记录锁,并且用B+树来实现他  
们的数据库。  
16.3 函数库  
    在本节中,我们定义了这个数据库函数库的C语言接口,在下一节中再讨论其  
实现。  
    当打开一个数据库时,通过返回值得到一个DB结构的指针。这一点很象通过f  
open得到一个FILE结构的指针(5.2节),以及通过opendir得到一个DIR结构的指  
针(4.21节)。我们将用此指针作为参数来调用以后的数据库函数。  
  如果db_open成功返回  
,则将建立两个文件:pathname.idx 和pathname.dat,pathname.idx是索引文件  
,pathname.dat是数据文件。oflag被用作第二个参数传递给open(3.3节),表明  
这些文件的打开模式(只读、读写或如果文件不存在则建立等)。如果需要建立新  
的数据库,mode将作为第三个参数传递给open(文件访问权限)。  
    当我们不再使用数据库时,调用db_close来关闭数据库。db_close将关闭索引  
文件和数据文件,并释放数据库使用过程中分配的所有用于内部缓冲的存储空间。  
   
    当我们向数据库中加入一条新的记录时,必须提供一个此记录的主键,以及与  
此主键相联系的数据。如果此数据库存储的是人事信息,主键可以是雇员号,数据  
可以是此雇员的姓名、地址、电话号码、受聘日等等。我们的实现要求主键必须是  
唯一的(比方说,我们不会有两个雇员记录有同样的雇员号)。  
   
    #include "db.h"  
    int db_store( DB *db, const char *key, const char *data, int flag ) ;  
    返回:0表示成功,非0表示错误(见  
   
    key和data是由NULL结束的字符串。它们可以包含除了NULL外的任何字符,如  
换行符。  
    flag只能是DB_INSERT(加一条新记录)或DB_REPLACE(替换一条已有的记录  
)。这两个常数定义在db.h头文件中。如果我们使用DB_REPLACE,而记录不存在,  
返回值为-1。如果我们使用DB_INSERT,而记录已经存在,返回值为1。  
    通过提供主键key可以从数据库中取出一条记录。  
     #include "db.h"  
    char *db_fetch( DB *db, const char *key ) ;  
    返回:指向数据的指针表示成功,NULL表示记录没有找到。  
   
    如果记录找到了,返回的指针指向与主键联系在一起的数据。  
    通过提供主键key我们也可以从数据库中删除一条记录。  
   
#include "db.h"  
int db_delete( DB *db, const char *key ) ;  
 返回:0表示成功,-1表示记录没有找到。  
   
除了通过主键访问数据库外,也可以一条一条记录地访问数据库。为此,我们首  
先调用db_rewind回到数据库的第一条记录,再调用db_nextrec来读每个顺序的记  
录。  
   
#include "db.h"  
void db_rewind( DB *db ) ;  
char *db_nextrec( DB *db, char *key ) ;  
 返回:如果成功返回指向数据的指针,NULL表示到了数据库的  
   
 如果key是非NULL的指针,db_nextrec将当前记录的主键存入key中。  
    db_nextrec不保证记录访问的次序,只保证每一条记录被访问恰好一次。如果  
我们顺序存储三条主键分别为A、B、C的记录,我们无法确定db_nextrec将按什么  
顺序返回这三条记录。它可能按B、A、C的顺序返回,也可能按其它顺序。实际的  
顺序由数据库的实现决定。  
    这七个函数提供了数据库函数库的接口。接下来介绍我们的实现。  
16.4 实现概述  
    大多数数据库访问的函数库使用两个文件来存储信息:一个索引文件和一个数  
据文件。索引文件包括索引值(主键)和一个指向数据文件中对应数据记录的指针  
。有许多技术可用来组织索引文件以提高按键查询的速度和效率,Hash表和B+树是  
两种常用的技术。我们采用固定大小的Hash表来组织索引文件结构,并采用链表法  
解决Hash冲突。在介绍db_open时,我们曾提到将建立两个文件:一个以.idx为后  
缀的索引文件和一个以.dat为后缀的数据文件。  
    我们将主键和索引以NULL结尾的字符串形式存储--它们不能包含任意的二进制  
数据。有些数据库系统用二进制的形式存储数值数据(如用1、2或4个字节存储一  
个整数)以节省空间,这样一来使函数复杂化,也使数据库文件在不同的平台间移  
植比较困难。比方说,我们在网络上有两个系统使用不同的二进制格式存储整数,  
如果我们想要这两个系统都能够访问数据库就必须解决这个问题(今天不同体系结  
构的系统共享文件已经很常见了)。按照字符串形式存储所有的记录,包括主键和  
数据,能使这一切变得简单。这确实会需要更多的磁盘空间,但随着磁盘技术的发  
展,这渐渐不再构成问题。  
    db_store要求对每个主键,最多只有一个对应的记录。有些数据库系统允许多  
条记录使用同样的主键,并提供方法访问与一个主键相关的所有记录。另外,我们  
只有一个索引文件,这意味着每个数据记录只能有一个主键。有些数据库允许一条  
记录拥有多个键,并且对每一个键使用一个索引文件。当加入或删除一条记录时,  
要对所有的索引文件进行相应的修改。(一个有多个索引的例子是雇员库文件,我  
们可以将雇员号作为键,也可以将雇员的社会保险号作为键。由于一般雇员的名字  
并不保证唯一,所以名字不能作为键。)  
    图16.1是我们的数据库实现的基本结构。索引文件由三部分组成:空闲链表指  
针、Hash表和索引记录。图16.1中所有叫做ptr的字段中实际存储的是以ASCII字符  
串形式记录的在文件中的偏移量。  
图16.1 索引文件和数据文件结构  
    当给定一个主键要在数据库中寻找一条记录时,db_fetch根据主键计算Hash值  
,由此Hash值可确定一条Hash链(chain ptr字段可以为0,表示一条空的Hash链)  
。沿着这条Hash链,我们可以找到所有有同样Hash值的索引记录。当我们遇到一个  
索引记录的chain ptr字段为0时,表示我们到达了此Hash链的末尾。  
    下面让我们来看一个实际的数据库文件。程序16.1建立了一个新的数据库,并  
且加入了三条记录。由于我们将所有的字段都以ASCII字符串的形式存储在数据库  
中,所以我们可以用任何标准的Unix工具来查看索引文件和数据文件。  
$ ls -l db4.*  
-rw-r--r--  1  stevens 28 Oct 30 06:42 db4.dat  
-rw-r--r--  1  stevens 28 Oct 30 06:42 db4.idx  
$ cat db4.idx  
   0  53  35  
0  10Alpha:0:6  
   0  10beta:6:14  
17 11gamma:20:8  
$ cat db4.dat  
data1  
Data for beta  
Record3  
#include "db.h"  
int  
main(void)  
{  
 DB  *db;  
 if ( (db = db_open("db4", O_RDWR | O_CREAT | O_TRUNC,  
                                  FILE_MODE)) == NULL)  
  err_sys("db_open error");  
 if (db_store(db, "Alpha", "data1", DB_INSERT) != 0)  
  err_quit("db_store error for alpha");  
 if (db_store(db, "beta", "Data for beta", DB_INSERT) != 0)  
  err_quit("db_store error for beta");  
 if (db_store(db, "gamma", "record3", DB_INSERT) != 0)  
  err_quit("db_store error for gamma");  
 db_close(db);  
 exit(0);  
}  
程序16.1 建立一个数据库并向其写三条记录  
    为了使这个例子简单,我们将每个ptr字段的大小定为4个ASCII字符,将Hash  
表的大小(Hash链的条数)定为3。由于每一个ptr记录的是一个文件偏移量,所以  
4个ASCII字符限制了一个索引文件或数据文件的大小最多只能为10000字节。在后  
面的16.8节做一些性能方面的测试时,我们将ptr字段的大小设为6(这样文件的大  
小可以达到1000000字节),将Hash表的大小设为100。  
    索引文件的第一行为  
   0  53  35   0  
    分别为空闲链表指针(0,表示空闲链表为空),和三个Hash链表的指针:53  
,35和0。下一行  
   0  10Alpha:0:6  
    显示了一条索引记录的结构。第一个4字符的字段(0)表示这一条记录是此H  
ash链的最后一条。下一个4字符的字段(10)为idx len,表示此索引记录剩下部  
分的长度。我们通过两个read操作来读取一条索引记录:第一个read读取这两个固  
定长度的字段(chain ptr和idx len),然后再根据idx len来读取后面的不定长  
部分。剩下的三个字段为:key(主键)、dat off(在数据文件中的偏移量)和d  
at len(数据记录的长度)。这三个字段用分隔符隔开,在这里我们使用的是分号  
。由于此三个字段都是不定长的,所以我们需要一个专门的分隔符,而且这个分隔  
符不能出现在主键中。最后用一个回车符结束这一条索引记录。由于在idx len中  
已经有了记录的长度,所以这个回车符并不是必须的,加上回车符是为了把各条索  
引记录分开,这样就可以用标准的Unix工具如cat和more来查看索引文件。key是我  
们将记录加入数据库时选择的主键。数据记录在数据文件中的偏移为0,长度为6。  
从数据文件中我们看到数据记录确实从0开始,长度为6个字节。(与索引文件一样  
,我们在这里自动在每条数据记录的后面加上一个回车符,以便于使用Unix工具。  
在调用db_fetch时,此回车符不作为数据返回。)  
    如果在这个例子中跟踪三个Hash链,我们看到第一条Hash链上的第一条记录的  
偏移量是53(gamma)。这条链上的下一条记录的偏移量为17(Alpha),并且是这  
条链上的最后一条记录。第二条Hash链上的第一条记录的偏移量是35(beta),且  
是此链上最后一条记录。第三条Hash链为空。  
    请注意索引文件中索引记录的顺序和数据文件中对应数据记录的顺序与我们在  
程序16.1中调用db_store的顺序一样。由于我们在调用db_open时使用了O_TRUNC标  
志,索引文件和数据文件都被截断,整个数据库相当于从新初始化。在这种情形下  
,db_store将新的索引记录和数据记录添加到对应的文件末尾。在后面我们将看到  
db_store也可以重复使用这两个文件中因删除记录而生成的空间。  
    在这里使用固定大小的Hash表作为索引是一个妥协。当每个Hash链均不太长时  
,这个方法能保证快速的查找。我们的目的是能够快速地查找任意的键,同时又不  
使用太复杂的数据结构,如B树或动态可扩充Hash表。动态可扩充Hash表的优点是  
能保证仅用两次磁盘操作就能找到数据记录(详见Selter和Yigit[1991])。B树能  
够让我们用键的顺序来遍历数据库(采用Hash表的db_nextrec函数就做不到这一点  
)。  
16.5 集中式或非集中式  
    当有多个进程访问数据库时,有两种方法实现库函数。  
    1.集中式  由一个进程作为数据库管理者,所有的数据库访问工作由此进程完  
成。库函数通过IPC机制与此中心进程进行联系。  
    2.非集中式 每个库函数独立的申请并发控制(加锁),然后调用它自己的I/  
O函数。  
    使用这两种技术的数据库都有。在Unix系统中的潮流是使用非集中式方法。如  
果有适当的加锁函数,因为避免使用了IPC,那么非集中式方法一般要快一些。图  
16.2描绘了集中式方法的操作。  
    在图中,我们特意表示出IPC象绝大多数Unix的消息传送一样需要经过操作系  
统内核(在14.9节中的共享内存不需要这种经过内核的拷贝)。我们看到,在集中  
方式下,中心控制进程将记录读出,然后通过IPC机制将数据送给请求进程。注意  
到中心控制进程是唯一的通过I/O操作访问数据库文件的进程。  
图16.2 集中式数据库访问  
图16.3 非集中式数据库访问  
    集中式的优点是能够根据需要来对操作模式进行控制。例如,我们能够通过中  
心进程给不同的进程赋予不同的优先级,而用非集中式方法这是很难做到的。在这  
种情况下我们只能依赖于操作系统核心的磁盘I/O调度策略和加锁策略(如当三个  
进程同时等待一个锁开锁时,哪个进程下一个得到锁)。  
    图16.3描绘了非集中式方法,在本章我们的实现就是采用这种方法。使用库函  
数访问数据库的用户进程是平等的,它们通过使用记录锁机制来实现并发控制。  
16.6 并发  
    由于很多系统的实现都采用两个文件(一个索引文件和一个数据文件)的方法  
,所以我们也使用这种方法,这要求我们能够控制对两个文件的加锁。有很多方法  
可用来对两个文件进行加锁。  
粗锁  
    最简单的加锁方法是将这两个文件中的一个进行作为锁,并要求调用者在对数  
据库进行操作前必须获得这个锁。我们将这称为粗锁。比方说,我们可以认为一个  
进程对索引文件的第一个字节加了读锁后,才能获得读整个数据库的权力。一个进  
程对索引文件的第一个字节加了写锁后,才获得修改整个数据库的权力。我们可以  
使用Unix的记录锁机制来控制每次可以有多个读进程,而只能由一个写进程(见图  
12.2)。db_fetch和db_nextrec函数将获得读锁,而db_delete、db_store以及db  
_open则获得写锁。(db_open要获得写锁的原因是如果要创建新文件的话,要在索  
引文件前端建立空闲区链表以及Hash链表。)  
    粗锁的问题是限制了最大程度的并发。用粗锁时,当一个进程向一条Hash链中  
加一条记录时,其它进程无法访问在另一条Hash链上的记录。  
细锁  
    下面我们用称为细锁的方法来加强粗锁以提高并发度。我们要求一个读进程或  
写进程在操作一条记录前必须先获得此记录所在的Hash链的读锁或写锁。我们允许  
对一条Hash链同时可以有多个读进程,而只能有一个写进程。另外,一个写进程在  
操作空闲区链表(如db_delete或db_store)前,必须获得空闲区链表的写锁。最  
后,当db_store向索引文件或数据文件加一条新记录时,必须获得对应文件相应区  
域的写锁。  
    我们期望细锁能比粗锁提供更高的并发度。在16.8节我们将给出一些实际的比  
较测试结果。16.7节给出我们的采用细锁的实现,并详细讨论了锁的实现(粗锁是  
我们实现的简化)。  
    在源文件中,我们直接调用read,readv,write和writev。我们没有使用标准  
I/O函数库。虽然使用标准I/O函数库也可以使用记录锁,但是需要非常复杂的缓存  
管理。我们不希望例如fgets返回的数据是10分钟之前读入标准I/O缓存的,而被另  
一个进程在5分钟之前修改了。  
    我们对并发讨论依据的是数据库函数库的简单的需求。商业系统一般有更多的  
需要。关于并发的更多的细节可以参看Data[1982]的第三章。  
16.7 源码  
我们从程序16.2中的头文件db.h开始。所有函数以及调用此函数库的用户进程都包  
含这一头文件。  
#include <sys/types.h>  
#include <sys/stat.h> /* open() & db_open() mode */  
#include <fcntl.h>  /* open() & db_open() flags */  
#include <stddef.h>  /* NULL */  
#include "ourhdr.h"  
  /* flags for db_store() */  
#define DB_INSERT 1  /* insert new record only */  
#define DB_REPLACE 2  /* replace existing record */  
  /* magic numbers */  
#define IDXLEN_SZ    4 /* #ascii chars for length of index record */  
#define IDXLEN_MIN    6 /* key, sep, start, sep, length, newline */  
#define IDXLEN_MAX 1024 /* arbitrary */  
#define SEP    ':' /* separator character in index record */  
#define DATLEN_MIN    2 /* data byte, newline */  
#define DATLEN_MAX 1024 /* arbitrary */  
  /* following definitions are for hash chains and free list chain  
     in index file */  
#define PTR_SZ     6 /* size of ptr field in hash chain */  
#define PTR_MAX   999999 /* max offset (file size) = 10**PTR_SZ - 1 */  
#define NHASH_DEF  137 /* default hash table size */  
#define FREE_OFF    0 /* offset of ptr to free list in index file */  
#define HASH_OFF  PTR_SZ /* offset of hash table in index file */  
typedef struct { /* our internal structure */  
  int idxfd; /* fd for index file */  
  int datfd; /* fd for data file */  
  int oflag; /* flags for open()/db_open(): O_xxx */  
  char *idxbuf;/* malloc'ed buffer for index record */  
  char *datbuf;/* malloc'ed buffer for data record*/  
  char *name; /* name db was opened under */  
  off_t idxoff; /* offset in index file of index record */  
    /* actual key is at (idxoff + PTR_SZ + IDXLEN_SZ) */  
  size_t idxlen;/* length of index record */  
    /* excludes IDXLEN_SZ bytes at front of index record */  
    /* includes newline at end of index record */  
  off_t datoff; /* offset in data file of data record */  
  size_t datlen;/* length of data record */  
    /* includes newline at end */  
  off_t ptrval; /* contents of chain ptr in index record */  
  off_t ptroff; /* offset of chain ptr that points to this index record  
 */  
  off_t chainoff;/* offset of hash chain for this index record */  
  off_t hashoff;/* offset in index file of hash table */  
  int nhash; /* current hash table size */  
#define HASH_OFF  PTR_SZ /* offset of hash table in index file */  
typedef struct { /* our internal structure */  
  int idxfd; /* fd for index file */  
  int datfd; /* fd for data file */  
  int oflag; /* flags for open()/db_open(): O_xxx */  
  char *idxbuf;/* malloc'ed buffer for index record */  
  char *datbuf;/* malloc'ed buffer for data record*/  
  char *name; /* name db was opened under */  
  off_t idxoff; /* offset in index file of index record */  
    /* actual key is at (idxoff + PTR_SZ + IDXLEN_SZ) */  
  size_t idxlen;/* length of index record */  
    /* excludes IDXLEN_SZ bytes at front of index record */  
    /* includes newline at end of index record */  
  off_t datoff; /* offset in data file of data record */  
  size_t datlen;/* length of data record */  
    /* includes newline at end */  
  off_t ptrval; /* contents of chain ptr in index record */  
  off_t ptroff; /* offset of chain ptr that points to this index record  
 */  
  off_t chainoff;/* offset of hash chain for this index record */  
  off_t hashoff;/* offset in index file of hash table */  
  int nhash; /* current hash table size */  
  long cnt_delok; /* delete OK */  
  long cnt_delerr; /* delete error */  
  long cnt_fetchok;/* fetch OK */  
  long cnt_fetcherr;/* fetch error */  
  long cnt_nextrec;/* nextrec */  
  long cnt_stor1; /* store: DB_INSERT, no empty, appended */  
  long cnt_stor2; /* store: DB_INSERT, found empty, reused */  
  long cnt_stor3; /* store: DB_REPLACE, diff data len, appended */  
  long cnt_stor4; /* store: DB_REPLACE, same data len, overwrote */  
  long cnt_storerr;/* store error */  
} DB;  
typedef unsigned long hash_t; /* hash values */  
   /* user-callable functions */  
DB  *db_open(const char *, int, int);  
void  db_close(DB *);  
char *db_fetch(DB *, const char *);  
int   db_store(DB *, const char *, const char *, int);  
int   db_delete(DB *, const char *);  
void  db_rewind(DB *);  
char *db_nextrec(DB *, char *);  
void  db_stats(DB *);  
   /* internal functions */  
DB  *_db_alloc(int);  
int   _db_checkfree(DB *);  
int   _db_dodelete(DB *);  
int   _db_emptykey(char *);  
int   _db_find(DB *, const char *, int);  
int   _db_findfree(DB *, int, int);  
int   _db_free(DB *);  
hash_t  _db_hash(DB *, const char *);  
char *_db_nextkey(DB *);  
char *_db_readdat(DB *);  
off_t  _db_readidx(DB *, off_t);  
off_t  _db_readptr(DB *, off_t);  
void  _db_writedat(DB *, const char *, off_t, int);  
void  _db_writeidx(DB *, const char *, off_t, int, off_t);  
void  _db_writeptr(DB *, off_t, off_t);  
程序16.2 db.h头文件  
    这里,我们定义了实现的基本限制。如果要支持更大的数据库的话,这些限制  
也可以修改。其中一些我们定义为常数的值也可定义为变量,只是会使实现复杂一  
些。例如,我们设定Hash表的大小为137,一个也许更好的方法是让db_open的调用  
者根据数据库的大小通过参数来设定这个值,然后将这个值存在索引文件的最前面  
。  
    在DB结构中我们记录一个打开的数据库的所有信息。db_open函数返回一个DB  
结构的指针,这个指针被用于其它的所有函数。  
    我们选择用db_开头来命名用户可调用的库函数,用_db_开头来命名内部函数  
。  
    在程序16.3中定义了函数db_open。它打开索引文件和数据文件,必要的话初  
始化索引文件。通过调用_db_alloc来为DB结构分配空间,并初始化此结构。  
#include "db.h"  
/* Open or create a database.  Same arguments as open(). */  
DB *  
db_open(const char *pathname, int oflag, int mode)  
{  
 DB   *db;  
 int   i, len;  
 char  asciiptr[PTR_SZ + 1],  
    hash[(NHASH_DEF + 1) * PTR_SZ + 2];  
     /* +2 for newline and null */  
 struct stat statbuff;  
  /* Allocate a DB structure, and the buffers it needs */  
 len = strlen(pathname);  
 if ( (db = _db_alloc(len)) == NULL)  
  err_dump("_db_alloc error for DB");  
 db->oflag = oflag;  /* save a copy of the open flags */  
  /* Open index file */  
 strcpy(db->name, pathname);  
 strcat(db->name, ".idx");  
 if ( (db->idxfd = open(db->name, oflag, mode)) < 0) {  
  _db_free(db);  
  return(NULL);  
 }  
  /* Open data file */  
 strcpy(db->name + len, ".dat");  
 if ( (db->datfd = open(db->name, oflag, mode)) < 0) {  
  _db_free(db);  
  return(NULL);  
 }  
  /* If the database was created, we have to initialize it */  
 if ((oflag & (O_CREAT | O_TRUNC)) == (O_CREAT | O_TRUNC)) {  
   /* Write lock the entire file so that we can stat  
      the file, check its size, and initialize it,  
      as an atomic operation. */  
  if (writew_lock(db->idxfd, 0, SEEK_SET, 0) < 0)  
   err_dump("writew_lock error");  
  if (fstat(db->idxfd, &statbuff) < 0)  
   err_sys("fstat error");  
  if (statbuff.st_size == 0) {  
    如果数据库正被建立,则我们必须加锁。考虑两个进程同时试图建立同一个数  
据库的情况。第一个进程运行到调用fstat,并且在fstat返回后被核心切换。这时  
第二个进程调用db_open,发现索引文件的长度为0,并初始化空闲链表和Hash链表  
。第二个进程继续运行,向数据库中添加了一条记录。这时第二个进程被阻塞,第  
一个进程继续运行,并发现索引文件的大小为0(因为第一个进程是在fstat返回后  
才被切换),所以第一个进程重新初始化空闲链表和Hash链表,第二个进程写入的  
记录就被抹去了。要避免发生这种情况的方法是进行加锁,我们使用12.3节中的r  
eadw_lock,writew_lock和un_lock这三个函数。  
db_open调用程序16.4中定义的函数_db_alloc来为DB结构分配空间,包括一个索引  
缓冲和一个数据缓冲。  
#include "db.h"  
/* Allocate & initialize a DB structure, and all the buffers it needs  
*/  
DB *  
_db_alloc(int namelen)  
{  
 DB  *db;  
   /* Use calloc, to init structure to zero */  
 if ( (db = calloc(1, sizeof(DB))) == NULL)  
  err_dump("calloc error for DB");  
 db->idxfd = db->datfd = -1;    /* descriptors */  
  /* Allocate room for the name.  
     +5 for ".idx" or ".dat" plus null at end. */  
 if ( (db->name = malloc(namelen + 5)) == NULL)  
  err_dump("malloc error for name");  
  /* Allocate an index buffer and a data buffer.  
     +2 for newline and null at end. */  
 if ( (db->idxbuf = malloc(IDXLEN_MAX + 2)) == NULL)  
  err_dump("malloc error for index buffer");  
 if ( (db->datbuf = malloc(DATLEN_MAX + 2)) == NULL)  
  err_dump("malloc error for data buffer");  
 return(db);  
}  
程序16.4 _db_alloc函数  
    索引缓冲和数据缓冲的大小在db.h中定义。我们的数据库函数库可以通过让这  
些缓冲按需要动态扩张来得到加强。其方法可以是记录这两个缓冲的大小,然后在  
需要更大的缓冲时调用realloc。  
    在函数_db_free(程序16.5)中,这些缓冲将被释放,同时打开的文件被关闭  
。db_open在打开索引文件和数据文件时如果遇到错误,则调用_db_free释放资源  
。db_close(程序16.6)也调用_db_free。  
    函数db_fetch(程序16.7)根据给定的主键来读取一条记录。它调用_db_fin  
d在数据库中查找一条索引记录,如果找到,再调用_db_readdat来读取对应的数据  
记录。  
   
#include "db.h"  
/* Free up a DB structure, and all the malloc'ed buffers it  
 * may point to.  Also close the file descriptors if still open. */  
int  
_db_free(DB *db)  
{  
 if (db->idxfd >= 0 && close(db->idxfd) < 0)  
  err_dump("index close error");  
 if (db->datfd >= 0 && close(db->datfd) < 0)  
  err_dump("data close error");  
 db->idxfd = db->datfd = -1;  
 if (db->idxbuf != NULL)  
  free(db->idxbuf);  
 if (db->datbuf != NULL)  
  free(db->datbuf);  
 if (db->name != NULL)  
  free(db->name);  
 free(db);  
 return(0);  
}  
程序16.5 _db_free函数  
#include "db.h"  
void  
db_close(DB *db)  
{  
 _db_free(db); /* closes fds, free buffers & struct */  
}  
程序16.6 db_close函数  
#include "db.h"  
/* Fetch a specified record.  
 * We return a pointer to the null-terminated data. */  
char *  
db_fetch(DB *db, const char *key)  
{  
 char *ptr;  
 if (_db_find(db, key, 0) < 0) {  
  ptr = NULL;    /* error, record not found */  
  db->cnt_fetcherr++;  
 } else {  
  ptr = _db_readdat(db); /* return pointer to data */  
  db->cnt_fetchok++;  
 }  
   /* Unlock the hash chain that _db_find() locked */  
 if (un_lock(db->idxfd, db->chainoff, SEEK_SET, 1) < 0)  
  err_dump("un_lock error");  
 return(ptr);  
}  
程序16.7 db_fetch函数  
    函数_db_find(程序16.8)通过遍历Hash链来查找记录。db_fetch,db_dele  
te和db_store这几个需要根据主键找记录的函数都调用它。  
#include "db.h"  
/* Find the specified record.  
 * Called by db_delete(), db_fetch(), and db_store(). */  
int  
_db_find(DB *db, const char *key, int writelock)  
{  
 off_t offset, nextoffset;  
  /* Calculate hash value for this key, then calculate byte  
     offset of corresponding chain ptr in hash table.  
     This is where our search starts. */  
   /* calc offset in hash table for this key */  
 db->chainoff = (_db_hash(db, key) * PTR_SZ) + db->hashoff;  
 db->ptroff = db->chainoff;  
   /* Here's where we lock this hash chain.  It's the  
      caller's responsibility to unlock it when done.  
      Note we lock and unlock only the first byte. */  
 if (writelock) {  
  if (writew_lock(db->idxfd, db->chainoff, SEEK_SET, 1) < 0)  
   err_dump("writew_lock error");  
 } else {  
  if (readw_lock(db->idxfd, db->chainoff, SEEK_SET, 1) < 0)  
   err_dump("readw_lock error");  
 }  
   /* Get the offset in the index file of first record  
      on the hash chain (can be 0) */  
 offset = _db_readptr(db, db->ptroff);  
 while (offset != 0) {  
  nextoffset = _db_readidx(db, offset);  
  if (strcmp(db->idxbuf, key) == 0)  
   break;  /* found a match */  
  db->ptroff = offset; /* offset of this (unequal) record */  
  offset = nextoffset; /* next one to compare */  
 }  
 if (offset == 0)  
  return(-1);  /* error, record not found */  
  /* We have a match.  We're guaranteed that db->ptroff contains  
     the offset of the chain ptr that points to this matching  
     index record.  _db_dodelete() uses this fact.  (The chain  
     ptr that points to this matching record could be in an  
     index record or in the hash table.) */  
 return(0);  
}  
程序16.8 _db_find函数  
    _db_find的最后一个参数指明我们需要加什么样的锁,0表示读锁,1表示写锁  
。我们知道,db_fetch需要加读锁,而db_delete和db_store需要加写锁。_db_fi  
nd在获得需要的锁之前将等待。  
    _db_find中的while循环遍历Hash链中的索引记录,并比较主键。函数_db_re  
adidx用于读取每条索引记录。  
    请注意阅读_db_find中的最后一条注释。当沿着Hash链进行遍历时,必须始终  
跟踪索引前一条索引记录,其中有一个指针指向当前记录。这一点在我们删除一条  
记录时是很有用的,因为我们必须修改当前索引记录的前一条记录的链指针。  
    让我们先来看看_db_find调用的一些比较简单的函数。_db_hash(程序16.9)  
根据给定的主键计算Hash值。它将主键中的每一个ASCII字符乘以这个字符在字符  
串中以1开始的索引号,将这些结果加起来,除以Hash表的大小,将余数作为这个  
主键的Hash值。  
#include "db.h"  
/* Calculate the hash value for a key. */  
hash_t  
_db_hash(DB *db, const char *key)  
{  
 hash_t  hval;  
 const char *ptr;  
 char  c;  
 int   i;  
 hval = 0;  
 for (ptr = key, i = 1; c = *ptr++; i++)  
  hval += c * i;  /* ascii char times its 1-based index */  
 return(hval % db->nhash);  
}  
程序16.9 _db_hash函数  
    _db_find调用的下一个函数是_db_readptr(程序16.10)。这个函数能够读取  
以下三种不同的链指针中的任意一种:(1)索引文件最开始的空闲链表指针。(  
2)Hash表中指向Hash链的第一条索引记录的指针。(3)每条索引记录头的指向下  
一条记录的指针(这里的索引记录既可以处于一条Hash链表中,也可以处于空闲链  
表中)。这个函数的调用者应做好必要的加锁,此函数不进行任何加锁。  
#include "db.h"  
/* Read a chain ptr field from anywhere in the index file:  
 * the free list pointer, a hash table chain ptr, or an  
 * index record chain ptr.  */  
off_t  
_db_readptr(DB *db, off_t offset)  
{  
 char asciiptr[PTR_SZ + 1];  
 if (lseek(db->idxfd, offset, SEEK_SET) == -1)  
  err_dump("lseek error to ptr field");  
 if (read(db->idxfd, asciiptr, PTR_SZ) != PTR_SZ)  
  err_dump("read error of ptr field");  
 asciiptr[PTR_SZ] = 0;  /* null terminate */  
 return(atol(asciiptr));  
}  
程序16.10 _db_readptr函数  
    _db_find中的while循环通过调用_db_readidx来读取各条索引记录。_db_rea  
didx(程序16.11)是一个较长的函数,这个函数读取索引记录,并将索引记录的  
信息存储到适当的字段中。  
#include "db.h"  
#include <sys/uio.h>  /* struct iovec */  
/* Read the next index record.  We start at the specified offset in  
 * the index file.  We read the index record into db->idxbuf and  
 * replace the separators with null bytes.  If all is OK we set  
 * db->datoff and db->datlen to the offset and length of the  
 * corresponding data record in the data file.  */  
off_t  
_db_readidx(DB *db, off_t offset)  
{  
 int    i;  
 char   *ptr1, *ptr2;  
 char   asciiptr[PTR_SZ + 1], asciilen[IDXLEN_SZ + 1];  
 struct iovec iov[2];  
  /* Position index file and record the offset.  db_nextrec()  
     calls us with offset==0, meaning read from current offset.  
     We still need to call lseek() to record the current offset. */  
 if ( (db->idxoff = lseek(db->idxfd, offset,  
        offset == 0 ? SEEK_CUR : SEEK_SET)) == -1)  
  err_dump("lseek error");  
  /* Read the ascii chain ptr and the ascii length at  
     the front of the index record.  This tells us the  
     remaining size of the index record. */  
 iov[0].iov_base = asciiptr;  
 iov[0].iov_len  = PTR_SZ;  
 iov[1].iov_base = asciilen;  
 iov[1].iov_len  = IDXLEN_SZ;  
 if ( (i = readv(db->idxfd, &iov[0], 2)) != PTR_SZ + IDXLEN_SZ) {  
  if (i == 0 && offset == 0)  
   return(-1);  /* EOF for db_nextrec() */  
  err_dump("readv error of index record");  
 }  
 asciiptr[PTR_SZ] = 0;   /* null terminate */  
 db->ptrval = atol(asciiptr); /* offset of next key in chain */  
      /* this is our return value; always >= 0 */  
 asciilen[IDXLEN_SZ] = 0;  /* null terminate */  
 if ( (db->idxlen = atoi(asciilen)) < IDXLEN_MIN ||  
       db->idxlen > IDXLEN_MAX)  
  err_dump("invalid length");  
  /* Now read the actual index record.  We read it into the key  
     buffer that we malloced when we opened the database. */  
 if ( (i = read(db->idxfd, db->idxbuf, db->idxlen)) != db->idxlen)  
  err_dump("read error of indexc record");  
 if (db->idxbuf[db->idxlen-1] != '\n')  
  err_dump("missing newline"); /* sanity checks */  
 db->idxbuf[db->idxlen-1] = 0;  /* replace newline with null */  
  /* Find the separators in the index record */  
 if ( (ptr1 = strchr(db->idxbuf, SEP)) == NULL)  
  err_dump("missing first separator");  
 *ptr1++ = 0;    /* replace SEP with null */  
 if ( (ptr2 = strchr(ptr1, SEP)) == NULL)  
  err_dump("missing second separator");  
 *ptr2++ = 0;    /* replace SEP with null */  
 if (strchr(ptr2, SEP) != NULL)  
 asciiptr[PTR_SZ] = 0;   /* null terminate */  
 db->ptrval = atol(asciiptr); /* offset of next key in chain */  
      /* this is our return value; always >= 0 */  
 asciilen[IDXLEN_SZ] = 0;  /* null terminate */  
 if ( (db->idxlen = atoi(asciilen)) < IDXLEN_MIN ||  
       db->idxlen > IDXLEN_MAX)  
  err_dump("invalid length");  
  /* Now read the actual index record.  We read it into the key  
     buffer that we malloced when we opened the database. */  
 if ( (i = read(db->idxfd, db->idxbuf, db->idxlen)) != db->idxlen)  
  err_dump("read error of indexc record");  
 if (db->idxbuf[db->idxlen-1] != '\n')  
  err_dump("missing newline"); /* sanity checks */  
 db->idxbuf[db->idxlen-1] = 0;  /* replace newline with null */  
  /* Find the separators in the index record */  
 if ( (ptr1 = strchr(db->idxbuf, SEP)) == NULL)  
  err_dump("missing first separator");  
 *ptr1++ = 0;    /* replace SEP with null */  
 if ( (ptr2 = strchr(ptr1, SEP)) == NULL)  
  err_dump("missing second separator");  
 *ptr2++ = 0;    /* replace SEP with null */  
 if (strchr(ptr2, SEP) != NULL)  
  err_dump("too many separators");  
  /* Get the starting offset and length of the data record */  
 if ( (db->datoff = atol(ptr1)) < 0)  
  err_dump("starting offset < 0");  
 if ( (db->datlen = atol(ptr2)) <= 0 || db->datlen > DATLEN_MAX)  
  err_dump("invalid length");  
 return(db->ptrval);  /* return offset of next key in chain */  
}  
程序16.11 _db_readidx函数  
    我们调用readv来读取索引记录开始处的两个固定长度的字段:指向下一条索  
引记录的链指针和索引记录剩下的不定长部分的长度。然后,索引记录的剩下的部  
分被读入:主键、数据记录的偏移量和数据记录的长度。我们并不读数据记录,这  
由调用者自己完成。例如,在db_fetch中,在_db_find按主键找到索引记录前是不  
去读取数据记录的。  
    下面我们回到db_fetch。如果_db_find找到了索引记录,我们调用_db_readd  
at来读取对应的数据记录。这是一个很简单的函数(见程序16.12)。  
#include "db.h"  
/* Read the current data record into the data buffer.  
 * Return a pointer to the null-terminated data buffer. */  
char *  
_db_readdat(DB *db)  
{  
 if (lseek(db->datfd, db->datoff, SEEK_SET) == -1)  
  err_dump("lseek error");  
 if (read(db->datfd, db->datbuf, db->datlen) != db->datlen)  
  err_dump("read error");  
 if (db->datbuf[db->datlen - 1] != '\n')  /* sanity check */  
  err_dump("missing newline");  
 db->datbuf[db->datlen - 1] = 0;  /* replace newline with null */  
 return(db->datbuf);  /* return pointer to data record */  
}  
程序16.12 _de_readdat函数  
    我们从db_fetch开始,现在已经读取了索引记录和对应的数据记录。注意到,  
我们只在_db_find中加了一个读锁。由于我们对这条Hash链加了读锁,所以其他进  
程在这段时间内不可能对这条Hash链进行修改。  
    下面我们来看函数db_delete(程序16.13)。它开始时和db_fetch一样,调用  
_db_find来查找记录,只是这里传递给_db_find的最后一个参数为1,表示我们要  
对这一条Hash链加写锁。  
    db_delete调用_db_dodelete(程序16.14)来完成剩下的工作(在后面我们将  
看到db_store也调用_db_dodelete)。_db_dodelete的大部分工作是修正空闲链以  
及与主键对应的Hash链。  
    当一条记录被删除后,我们将其主键和数据记录设为空。在本章后面我们将提  
到的函数db_nextrec要用到这一点。  
    _db_dodelete对空闲链表加写锁,这样能防止两个进程同时删除不同链表上的  
记录时产生相互影响,因为我们将被删除的记录移到空闲链表上,这将改变空闲链  
表,而一次只能有一个进程能这样做。  
#include "db.h"  
/* Delete the specified record */  
int  
db_delete(DB *db, const char *key)  
{  
 int  rc;  
 if (_db_find(db, key, 1) == 0) {  
  rc = _db_dodelete(db); /* record found */  
  db->cnt_delok++;  
 } else {  
  rc = -1;    /* not found */  
  db->cnt_delerr++;  
 }  
 if (un_lock(db->idxfd, db->chainoff, SEEK_SET, 1) < 0)  
  err_dump("un_lock error");  
 return(rc);  
}  
程序16.13 db_delete函数  
#include "db.h"  
/* Delete the current record specified by the DB structure.  
 * This function is called by db_delete() and db_store(),  
 * after the record has been located by _db_find(). */  
int  
_db_dodelete(DB *db)  
{  
 int  i;  
 char *ptr;  
 off_t freeptr, saveptr;  
  /* Set data buffer to all blanks */  
 for (ptr = db->datbuf, i = 0; i < db->datlen - 1; i++)  
  *ptr++ = ' ';  
 *ptr = 0; /* null terminate for _db_writedat() */  
  /* Set key to blanks */  
 ptr = db->idxbuf;  
 while (*ptr)  
  *ptr++ = ' ';  
  /* We have to lock the free list */  
 if (writew_lock(db->idxfd, FREE_OFF, SEEK_SET, 1) < 0)  
  err_dump("writew_lock error");  
  /* Write the data record with all blanks */  
 _db_writedat(db, db->datbuf, db->datoff, SEEK_SET);  
  /* Read the free list pointer.  Its value becomes the  
     chain ptr field of the deleted index record.  This means  
     the deleted record becomes the head of the free list. */  
 freeptr = _db_readptr(db, FREE_OFF);  
  /* Save the contents of index record chain ptr,  
     before it's rewritten by _db_writeidx(). */  
 saveptr = db->ptrval;  
  /* Rewrite the index record.  This also rewrites the length  
     of the index record, the data offset, and the data length,  
     none of which has changed, but that's OK. */  
 _db_writeidx(db, db->idxbuf, db->idxoff, SEEK_SET, freeptr);  
  /* Write the new free list pointer */  
 _db_writeptr(db, FREE_OFF, db->idxoff);  
  /* Rewrite the chain ptr that pointed to this record  
     being deleted.  Recall that _db_find() sets db->ptroff  
     to point to this chain ptr.  We set this chain ptr  
     to the contents of the deleted record's chain ptr,  
     saveptr, which can be either zero or nonzero. */  
 _db_writeptr(db, db->ptroff, saveptr);  
 if (un_lock(db->idxfd, FREE_OFF, SEEK_SET, 1) < 0)  
  err_dump("un_lock error");  
 return(0);  
}  
程序16.14 _db_dodelete函数  
    _db_dodelete通过调用函数_db_writedat(程序16.15)清空数据记录。这时  
_db_writedat并不对数据文件加写锁,这是因为db_delete对这条记录



相关文章

进入科教论坛

百度搜索更多同类文章

  科教频道