a8e922aa369a7f24ce2d62e9c28d5e179c63d691
[ftsbench.git] / pgdriver.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <string.h>
5 #include <time.h>
6 #include <errno.h>
7 #include <sys/types.h>
8 #include <netinet/in.h>
9
10 #include <libpq-fe.h>
11 #include "ftsbench.h"
12
13 #ifdef HAVE_POLL_H
14 #include <poll.h>
15 #else /* HAVE_POLL */
16 #ifdef HAVE_SYS_POLL_H
17 #include <sys/poll.h>
18 #else
19 #error Not defined HAVE_POLL_H or HAVE_SYS_POLL_H
20 #endif /* HAVE_SYS_POLL_H */
21 #endif /* HAVE_POLL */
22
/*
 * Fallback for platforms whose poll header does not provide INFTIM.
 * The value is passed as the timeout argument of poll() in waitResult().
 */
#if !defined(INFTIM)
#define INFTIM (-1)
#endif
26
/*
 * State of the connection socket while a non-blocking command is in flight.
 * pgflush() and waitResult() set it; checkStatus() acts on it.
 */
typedef enum SocketState {
	SS_NONE       = 0,	/* nothing pending; checkStatus() treats this as an error */
	SS_READ       = 1,	/* output flushed, waiting for the socket to become readable */
	SS_READYREAD  = 2,	/* poll() reported the socket readable */
	SS_WRITE      = 3,	/* output still queued, waiting for the socket to become writable */
	SS_READYWRITE = 4	/* poll() reported the socket writable */
} SocketState;
34
/* Outcome of one checkStatus() step. */
typedef enum ResStatus {
	RS_OK         = 0,	/* the pending command has finished; results can be fetched */
	RS_INPROGRESS = 1	/* still waiting on the socket */
} ResStatus;
39
40 typedef struct ftsPG {
41         ftsDB   db;
42         PGconn  *conn;
43         int             flags;
44         PQnoticeReceiver        origreceiver;
45         int             emptyquery;
46         int             prepared;
47
48         int             socket;
49         SocketState     state;
50
51         StringBuf               b;
52 } ftsPG;
53
54 static void
55 NoticeReceiver(void *arg, const PGresult *res) {
56     ftsPG *db = (ftsPG*) arg;
57
58         db->emptyquery = 0;
59
60         if ( strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "00000") == 0 ) {
61                 char    *msg = PQresultErrorMessage(res);
62                 /* NOTICE */
63                 if ( strstr(msg, "does not exist, skipping")!=NULL ) {
64                         /* skip 'NOTICE:  table "wow" does not exist, skipping' */
65                         return;
66                 }
67                 if ( strstr(msg, "query contains only stopword(s) or doesn't contain lexeme(s), ignored")!=NULL ) {
68                         db->emptyquery = 1;
69                         return;
70                 }
71         }
72
73         db->origreceiver(db, res);
74 }
75
76 static void
77 pgflush(ftsPG *db) {
78         int flushres;
79
80         flushres=PQflush(db->conn);
81         if ( flushres > 0 ) {
82                 /* need more write */
83                 db->state = SS_WRITE;
84         } else if ( flushres == 0 ) {
85                 /* success write, waits for read */
86                 db->state = SS_READ;
87         } else {
88                 fprintf(stderr, "PQflush failed: %s", PQerrorMessage(db->conn));
89                 exit(1);
90         }
91 }
92
93 static ResStatus
94 checkStatus(ftsPG* db) {
95         switch( db->state ) {
96                 case SS_READYWRITE:
97                         pgflush(db);
98                 case SS_READ:
99                 case SS_WRITE:
100                         return RS_INPROGRESS;
101                 case SS_READYREAD:
102                         PQconsumeInput(db->conn);
103                         if ( PQisBusy(db->conn) != 0 ) {
104                                 db->state = SS_READ;
105                                 return RS_INPROGRESS;
106                         }
107                         break;
108                 case SS_NONE:
109                 default:
110                         fprintf(stderr,"Should not be here!\n");
111                         exit(1);
112                         break;
113         }
114
115         return RS_OK;
116 }
117                         
118 static void
119 waitResult(ftsPG *db) {
120         struct pollfd   pfd;
121         PGresult        *res;
122
123         pfd.fd = db->socket;
124         do {
125                 pfd.events = pfd.revents = 0;
126                 if ( db->state == SS_READ ) {
127                         pfd.events = POLLIN;
128                 } else if (  db->state == SS_WRITE ) {
129                         pfd.events = POLLOUT;
130                 }
131
132                 if ( pfd.events ) {
133                         int ret = poll( &pfd, 1, INFTIM);
134                         if ( ret<0 ) {
135                                 fprintf(stderr,"poll failed: %s", strerror(errno));
136                                 exit(1);
137                         }
138
139                         if ( pfd.revents & (POLLHUP | POLLNVAL | POLLERR) ) {
140                                 fprintf(stderr,"Poll report about socket error\n");
141                                 exit(1);
142                         } else if ( pfd.revents & POLLIN ) {
143                                 db->state = SS_READYREAD;
144                         } else if ( pfd.revents & POLLOUT ) {
145                                 db->state = SS_READYWRITE;
146                         }
147                 }
148         } while( checkStatus(db) != RS_OK );
149
150         while ( (res = PQgetResult(db->conn))!= NULL ) {
151                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
152                         fprintf(stderr, "Execution of prepared statement failed: %s", PQerrorMessage(db->conn));
153                         exit(1);
154                 }
155                 PQclear(res);
156         }
157 }
158
159 static int
160 checkEmptyQuery(ftsPG *db, PGresult *res) {
161         if ( 
162                         db->emptyquery  > 0 &&
163                         PQresultStatus(res) == PGRES_FATAL_ERROR &&
164                         strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "0A000") == 0 && /* FEATURE_NOT_SUPPORTED */
165                         strstr(PQresultErrorField(res, PG_DIAG_SOURCE_FILE), "ginscan.c") != NULL 
166                 ) 
167                 return 1;
168
169         return 0;
170 }
171
172 static void
173 execQuery(ftsDB* adb, char ** words, int flags) {
174         ftsPG *db = (ftsPG*)adb; 
175         const char      *paramValues[1];
176         int i = 0;
177         PGresult *res;
178
179         if ( db->prepared == 0 ) {
180                 /* firsttime */
181                 char buf[1024];
182
183                 db->flags = flags;
184
185                 if ( flags & FLG_FUNC )
186                         sprintf(buf, "SELECT count(*) FROM ftsbench WHERE to_tsvector(body) @@ to_tsquery( $1 ::text );");
187                 else
188                         sprintf(buf, "SELECT count(*) FROM ftsbench WHERE fts @@ to_tsquery( $1 ::text );");
189
190                 res = PQprepare( db->conn, "search_ftsbench", buf, 1, NULL );
191                 
192                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
193                         fprintf(stderr, "PREPARE SELECT command failed: %s", PQerrorMessage(db->conn));
194                         exit(1);
195                 }
196                 PQclear(res);
197
198                 db->prepared = 1;
199         }
200
201         db->b.strlen = 0;
202
203         while( *words ) {
204                 char *ptr = *words;
205
206                 if ( i!= 0 ) 
207                         sb_add(&db->b, (db->flags & FLG_OR) ? " | '" : " & '", 4);
208                 else
209                         sb_add(&db->b, "'", 1);
210
211                 while( *ptr ) {
212                         if (*ptr == '\'')
213                                 sb_add(&db->b, "'", 1);
214                         sb_add(&db->b, ptr, 1);
215                         ptr++;
216                 }
217                 sb_add(&db->b, "'", 1);
218
219                 i++;
220                         
221                 words++;
222         }
223         paramValues[0] = db->b.str;
224
225         res = PQexecPrepared( db->conn, "search_ftsbench",
226                                                  1, paramValues,
227                                                  NULL, NULL, 0);
228
229         if (PQresultStatus(res) != PGRES_TUPLES_OK) {
230                 /* skip error ' all words are a stop word' for GIN index -
231                    result is empty, in any case */
232                 if ( checkEmptyQuery(db, res) == 0 ) { 
233                         fprintf(stderr, "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn));
234                         exit(1);
235                 }
236         }
237         PQclear(res);
238
239         db->emptyquery = 0;
240         pthread_mutex_lock(&(db->db.nqueryMutex));
241         db->db.nquery ++;
242         pthread_mutex_unlock(&(db->db.nqueryMutex));
243 }
244
245 static void
246 startCreateScheme(ftsDB* adb, int flags) {
247         ftsPG *db = (ftsPG*)adb;
248         char    buf[1024];
249         PGresult *res;
250
251         db->flags = flags;
252
253         res = PQexec(db->conn, "DROP TABLE IF EXISTS ftsbench CASCADE;");
254         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
255                 fprintf(stderr, "DROP TABLE command failed: %s", PQerrorMessage(db->conn));
256                         exit(1);
257         }
258         PQclear(res);
259
260         if ( flags & FLG_FUNC ) 
261                 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text );");
262         else
263                 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text, fts     tsvector ); "
264                                         "CREATE TRIGGER tsvectorupdate BEFORE UPDATE OR INSERT ON ftsbench "
265                                         "FOR EACH ROW EXECUTE PROCEDURE tsearch2(fts, body);" );
266
267         res = PQexec(db->conn, buf);
268         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
269                 fprintf(stderr, "CREATE TABLE command failed: %s", PQerrorMessage(db->conn));
270                         exit(1);
271         }
272         PQclear(res);
273
274         res = PQexec(db->conn, "BEGIN;");
275         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
276                 fprintf(stderr, "CREATE TABLE command failed: %s", PQerrorMessage(db->conn));
277                         exit(1);
278         }
279         PQclear(res);
280
281         return;
282 }
283
284 static void
285 finishCreateScheme(ftsDB* adb) {
286         ftsPG *db = (ftsPG*)adb; 
287         PGresult *res;
288
289         if ( db->db.nquery > 0 ) {
290                 waitResult(db);
291                 
292                 if ( PQsetnonblocking(db->conn, 0) != 0 ) {
293                         fprintf(stderr, "PQsetnonblocking command failed: %s", PQerrorMessage(db->conn));
294                         exit(1);
295                 }
296         }
297
298         res = PQexec(db->conn, "COMMIT;");
299         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
300                 fprintf(stderr, "CREATE TABLE command failed: %s", PQerrorMessage(db->conn));
301                         exit(1);
302         }
303         PQclear(res);
304
305         if ( (db->flags & (FLG_GIST | FLG_GIN)) != 0 ) {
306                 char    buf[1024];
307
308                 if ( db->flags & FLG_FUNC ) 
309                         sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( to_tsvector(body) );", 
310                                                 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
311                 else
312                         sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( fts );", 
313                                                 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
314
315                 printf("(create index, ");
316                 fflush(stdout);
317
318                 res = PQexec(db->conn, buf);
319                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
320                         fprintf(stderr, "CREATE INDEX command failed: %s", PQerrorMessage(db->conn));
321                                 exit(1);
322                 }
323                 PQclear(res);
324         } else {
325                 printf("(");
326                 fflush(stdout);
327         }
328
329         printf("vacuum");
330         fflush(stdout);
331
332         res = PQexec(db->conn, "VACUUM ANALYZE ftsbench;");
333         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
334                 fprintf(stderr, "VACUUM ANALYZE command failed: %s", PQerrorMessage(db->conn));
335                 exit(1);
336         }
337         PQclear(res);
338         printf(") ");
339         fflush(stdout);
340
341         return;
342 }
343
344
345 static void
346 InsertRow(ftsDB* adb, int id, char *txt) {
347         ftsPG *db = (ftsPG*)adb;
348         PGresult *res;
349         const char      *paramValues[2];
350         uint32_t    binaryIntVal;
351         int             paramLengths[] = {sizeof(binaryIntVal), 0};
352         int     paramFormats[] = {1, 0};
353
354         if ( db->db.nquery == 0 ) {
355                 /* firsttime */
356                 
357                 res = PQprepare( db->conn, "insert_ftsbench",
358                                                  "INSERT INTO ftsbench (id, body) VALUES ( $1 ::int4, $2 ::text);",
359                                                  2, NULL );
360                 
361                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
362                         fprintf(stderr, "PREPARE INSERT command failed: %s", PQerrorMessage(db->conn));
363                         exit(1);
364                 }
365                 PQclear(res);
366
367                 if ( PQsetnonblocking(db->conn, 1) != 0 ) {
368                         fprintf(stderr, "PQsetnonblocking command failed: %s", PQerrorMessage(db->conn));
369                         exit(1);
370                 }
371         } else {
372                 waitResult(db);
373         }
374
375         binaryIntVal = htonl((uint32_t) id);
376         paramValues[0] = (char*)&binaryIntVal;
377         paramValues[1] = txt;
378
379         if ( PQsendQueryPrepared( db->conn, "insert_ftsbench",
380                                                  2, paramValues,
381                                                  paramLengths, paramFormats, 0) == 0 ) {
382                  fprintf(stderr, "PQsendQueryPrepared failed: %s", PQerrorMessage(db->conn));
383                  exit(1);
384         }
385
386         pgflush(db);
387
388         db->db.nquery++;
389 }
390
391 ftsDB* 
392 PGInit(char * connstr) {
393         ftsPG   *db = (ftsPG*)malloc(sizeof(ftsPG));
394         char    conninfo[1024];
395
396         memset(db,0,sizeof(ftsPG));
397
398         sprintf(conninfo, "dbname=%s", connstr);
399         db->conn = PQconnectdb(conninfo);
400
401         if (PQstatus(db->conn) != CONNECTION_OK) {
402                 fprintf(stderr, "Connection to database failed: %s", PQerrorMessage(db->conn));
403                 exit(1);
404         }
405
406         db->origreceiver = PQsetNoticeReceiver(db->conn, NoticeReceiver, (void *)db);
407
408         db->db.execQuery = execQuery;
409         db->db.startCreateScheme = startCreateScheme;
410         db->db.finishCreateScheme = finishCreateScheme;
411         db->db.InsertRow = InsertRow;
412         db->socket = PQsocket(db->conn);
413         if ( db->socket < 0 ) {
414                 fprintf(stderr,"Socket error\n");
415                 exit(1);
416         }
417         
418         return (ftsDB*)db;
419 }