remove FLG_SQL flag, use sqlMode instead
[ftsbench.git] / pgdriver.c
1 /*
2  * Copyright (c) 2006 Teodor Sigaev <teodor@sigaev.ru>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *        notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *        notice, this list of conditions and the following disclaimer in the
12  *        documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of any co-contributors
14  *        may be used to endorse or promote products derived from this software
15  *        without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL CONTRIBUTORS BE LIABLE FOR ANY
21  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
23  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
25  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
27  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <unistd.h>
33 #include <string.h>
34 #include <time.h>
35 #include <errno.h>
36 #include <sys/types.h>
37 #include <netinet/in.h>
38
39 #include <libpq-fe.h>
40 #include "ftsbench.h"
41
42 #ifdef HAVE_POLL_H
43 #include <poll.h>
44 #else /* HAVE_POLL */
45 #ifdef HAVE_SYS_POLL_H
46 #include <sys/poll.h>
47 #else
48 #error Not defined HAVE_POLL_H or HAVE_SYS_POLL_H
49 #endif /* HAVE_SYS_POLL_H */
50 #endif /* HAVE_POLL */
51
52 #ifndef INFTIM
53 #define INFTIM (-1)
54 #endif
55
56 typedef enum SocketState {
57     SS_NONE = 0,
58         SS_READ,
59         SS_READYREAD,
60         SS_WRITE,
61         SS_READYWRITE
62 } SocketState;
63
64 typedef enum ResStatus {
65         RS_OK = 0,
66         RS_INPROGRESS
67 } ResStatus;
68
69 typedef struct ftsPG {
70         ftsDB   db;
71         PGconn  *conn;
72         int             flags;
73         PQnoticeReceiver        origreceiver;
74         int             emptyquery;
75         int             prepared;
76
77         int             socket;
78         SocketState     state;
79
80         StringBuf               b;
81 } ftsPG;
82
83 static void
84 NoticeReceiver(void *arg, const PGresult *res) {
85     ftsPG *db = (ftsPG*) arg;
86
87         db->emptyquery = 0;
88
89         if ( strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "00000") == 0 ) {
90                 char    *msg = PQresultErrorMessage(res);
91                 /* NOTICE */
92                 if ( strstr(msg, "does not exist, skipping")!=NULL ) {
93                         /* skip 'NOTICE:  table "wow" does not exist, skipping' */
94                         return;
95                 }
96                 if ( strstr(msg, "query contains only stopword(s) or doesn't contain lexeme(s), ignored")!=NULL ) {
97                         db->emptyquery = 1;
98                         return;
99                 }
100         }
101
102         db->origreceiver(db, res);
103 }
104
105 static void
106 pgflush(ftsPG *db) {
107         int flushres;
108
109         flushres=PQflush(db->conn);
110         if ( flushres > 0 ) {
111                 /* need more write */
112                 db->state = SS_WRITE;
113         } else if ( flushres == 0 ) {
114                 /* success write, waits for read */
115                 db->state = SS_READ;
116         } else {
117                 fprintf(stderr, "PQflush failed: %s\n", PQerrorMessage(db->conn));
118                 exit(1);
119         }
120 }
121
122 static ResStatus
123 checkStatus(ftsPG* db) {
124         switch( db->state ) {
125                 case SS_READYWRITE:
126                         pgflush(db);
127                 case SS_READ:
128                 case SS_WRITE:
129                         return RS_INPROGRESS;
130                 case SS_READYREAD:
131                         PQconsumeInput(db->conn);
132                         if ( PQisBusy(db->conn) != 0 ) {
133                                 db->state = SS_READ;
134                                 return RS_INPROGRESS;
135                         }
136                         break;
137                 case SS_NONE:
138                 default:
139                         fprintf(stderr,"Should not be here!\n");
140                         exit(1);
141                         break;
142         }
143
144         return RS_OK;
145 }
146                         
147 static void
148 waitResult(ftsPG *db) {
149         struct pollfd   pfd;
150         PGresult        *res;
151
152         pfd.fd = db->socket;
153         do {
154                 pfd.events = pfd.revents = 0;
155                 if ( db->state == SS_READ ) {
156                         pfd.events = POLLIN;
157                 } else if (  db->state == SS_WRITE ) {
158                         pfd.events = POLLOUT;
159                 }
160
161                 if ( pfd.events ) {
162                         int ret = poll( &pfd, 1, INFTIM);
163                         if ( ret<0 ) {
164                                 fprintf(stderr,"poll failed: %s\n", strerror(errno));
165                                 exit(1);
166                         }
167
168                         if ( pfd.revents & (POLLHUP | POLLNVAL | POLLERR) ) {
169                                 fprintf(stderr,"Poll report about socket error\n");
170                                 exit(1);
171                         } else if ( pfd.revents & POLLIN ) {
172                                 db->state = SS_READYREAD;
173                         } else if ( pfd.revents & POLLOUT ) {
174                                 db->state = SS_READYWRITE;
175                         }
176                 }
177         } while( checkStatus(db) != RS_OK );
178
179         while ( (res = PQgetResult(db->conn))!= NULL ) {
180                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
181                         fprintf(stderr, "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn));
182                         exit(1);
183                 }
184                 PQclear(res);
185         }
186 }
187
188 static int
189 checkEmptyQuery(ftsPG *db, PGresult *res) {
190         if ( 
191                         db->emptyquery  > 0 &&
192                         PQresultStatus(res) == PGRES_FATAL_ERROR &&
193                         strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "0A000") == 0 && /* FEATURE_NOT_SUPPORTED */
194                         strstr(PQresultErrorField(res, PG_DIAG_SOURCE_FILE), "ginscan.c") != NULL 
195                 ) 
196                 return 1;
197
198         return 0;
199 }
200
201 static void
202 execQuery(ftsDB* adb, char ** words, int flags) {
203         ftsPG *db = (ftsPG*)adb; 
204         const char      *paramValues[1];
205         int i = 0;
206         PGresult *res;
207
208         if ( db->prepared == 0 ) {
209                 /* firsttime */
210                 char buf[1024];
211
212                 db->flags = flags;
213
214                 sprintf(buf, "SELECT count(*) FROM ftsbench WHERE %s @@ to_tsquery( $1 ::text );",
215                         ( flags & FLG_FUNC ) ? "to_tsvector(body)" : "fts" );
216
217                 res = PQprepare( db->conn, "search_ftsbench", buf, 1, NULL );
218                 
219                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
220                         fprintf(stderr, "PREPARE SELECT command failed: %s\n", PQerrorMessage(db->conn));
221                         exit(1);
222                 }
223                 PQclear(res);
224
225                 db->prepared = 1;
226         }
227
228         db->b.strlen = 0;
229
230         while( *words ) {
231                 char *ptr = *words;
232
233                 if ( i!= 0 ) 
234                         sb_add(&db->b, (db->flags & FLG_OR) ? " | '" : " & '", 4);
235                 else
236                         sb_add(&db->b, "'", 1);
237
238                 while( *ptr ) {
239                         if (*ptr == '\'')
240                                 sb_add(&db->b, "'", 1);
241                         sb_add(&db->b, ptr, 1);
242                         ptr++;
243                 }
244                 sb_add(&db->b, "'", 1);
245
246                 i++;
247                         
248                 words++;
249         }
250         paramValues[0] = db->b.str;
251
252         res = PQexecPrepared( db->conn, "search_ftsbench",
253                                                  1, paramValues,
254                                                  NULL, NULL, 0);
255
256         if (PQresultStatus(res) != PGRES_TUPLES_OK) {
257                 /* skip error ' all words are a stop word' for GIN index -
258                    result is empty, in any case */
259                 if ( checkEmptyQuery(db, res) == 0 ) { 
260                         fprintf(stderr, "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn));
261                         exit(1);
262                 }
263         } else if ( PQntuples(res) == 1 ) {
264                 db->db.nres += atoi( PQgetvalue(res,0,0) );
265         } else {
266                 fprintf(stderr,"Bad PQntuples %d\n", PQntuples(res));
267                 exit(1);
268         }
269         
270         PQclear(res);
271
272         db->emptyquery = 0;
273         pthread_mutex_lock(&(db->db.nqueryMutex));
274         db->db.nquery ++;
275         pthread_mutex_unlock(&(db->db.nqueryMutex));
276 }
277
278 static void
279 startCreateScheme(ftsDB* adb, int flags) {
280         ftsPG *db = (ftsPG*)adb;
281         char    buf[1024];
282         PGresult *res;
283
284         db->flags = flags;
285
286         res = PQexec(db->conn, "DROP TABLE IF EXISTS ftsbench CASCADE;");
287         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
288                 fprintf(stderr, "DROP TABLE command failed: %s\n", PQerrorMessage(db->conn));
289                         exit(1);
290         }
291         PQclear(res);
292
293         if ( flags & FLG_FUNC ) 
294                 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text );");
295         else
296                 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text, fts     tsvector ); "
297                                         "CREATE TRIGGER tsvectorupdate BEFORE UPDATE OR INSERT ON ftsbench "
298                                         "FOR EACH ROW EXECUTE PROCEDURE tsearch2(fts, body);" );
299
300         res = PQexec(db->conn, buf);
301         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
302                 fprintf(stderr, "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
303                         exit(1);
304         }
305         PQclear(res);
306
307         res = PQexec(db->conn, "BEGIN;");
308         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
309                 fprintf(stderr, "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
310                         exit(1);
311         }
312         PQclear(res);
313
314         return;
315 }
316
317 static void
318 finishCreateScheme(ftsDB* adb) {
319         ftsPG *db = (ftsPG*)adb; 
320         PGresult *res;
321
322         if ( db->db.nquery > 0 ) {
323                 waitResult(db);
324                 
325                 if ( PQsetnonblocking(db->conn, 0) != 0 ) {
326                         fprintf(stderr, "PQsetnonblocking command failed: %s\n", PQerrorMessage(db->conn));
327                         exit(1);
328                 }
329         }
330
331         res = PQexec(db->conn, "COMMIT;");
332         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
333                 fprintf(stderr, "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
334                         exit(1);
335         }
336         PQclear(res);
337
338         if ( (db->flags & (FLG_GIST | FLG_GIN)) != 0 ) {
339                 char    buf[1024];
340
341                 if ( db->flags & FLG_FUNC ) 
342                         sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( to_tsvector(body) );", 
343                                                 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
344                 else
345                         sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( fts );", 
346                                                 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
347
348                 report("(create index, ");
349
350                 res = PQexec(db->conn, buf);
351                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
352                         fprintf(stderr, "CREATE INDEX command failed: %s\n", PQerrorMessage(db->conn));
353                                 exit(1);
354                 }
355                 PQclear(res);
356         } else 
357                 report("(");
358
359         report("vacuum");
360
361         res = PQexec(db->conn, "VACUUM ANALYZE ftsbench;");
362         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
363                 fprintf(stderr, "VACUUM ANALYZE command failed: %s\n", PQerrorMessage(db->conn));
364                 exit(1);
365         }
366         PQclear(res);
367
368         report(") ");
369
370         return;
371 }
372
373
374 static void
375 InsertRow(ftsDB* adb, int id, char *txt) {
376         ftsPG *db = (ftsPG*)adb;
377         PGresult *res;
378         const char      *paramValues[2];
379         uint32_t    binaryIntVal;
380         int             paramLengths[] = {sizeof(binaryIntVal), 0};
381         int     paramFormats[] = {1, 0};
382
383         if ( db->db.nquery == 0 ) {
384                 /* firsttime */
385                 
386                 res = PQprepare( db->conn, "insert_ftsbench",
387                                                  "INSERT INTO ftsbench (id, body) VALUES ( $1 ::int4, $2 ::text);",
388                                                  2, NULL );
389                 
390                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
391                         fprintf(stderr, "PREPARE INSERT command failed: %s\n", PQerrorMessage(db->conn));
392                         exit(1);
393                 }
394                 PQclear(res);
395
396                 if ( PQsetnonblocking(db->conn, 1) != 0 ) {
397                         fprintf(stderr, "PQsetnonblocking command failed: %s\n", PQerrorMessage(db->conn));
398                         exit(1);
399                 }
400         } else {
401                 waitResult(db);
402         }
403
404         binaryIntVal = htonl((uint32_t) id);
405         paramValues[0] = (char*)&binaryIntVal;
406         paramValues[1] = txt;
407
408         if ( PQsendQueryPrepared( db->conn, "insert_ftsbench",
409                                                  2, paramValues,
410                                                  paramLengths, paramFormats, 0) == 0 ) {
411                  fprintf(stderr, "PQsendQueryPrepared failed: %s\n", PQerrorMessage(db->conn));
412                  exit(1);
413         }
414
415         pgflush(db);
416
417         db->db.nquery++;
418 }
419
420 static void 
421 Close(ftsDB* adb) {
422         ftsPG *db = (ftsPG*)adb;
423
424         PQfinish(db->conn);
425 }
426
427 ftsDB* 
428 PGInit(char * connstr) {
429         ftsPG   *db = (ftsPG*)malloc(sizeof(ftsPG));
430         char    conninfo[1024];
431
432         memset(db,0,sizeof(ftsPG));
433
434         sprintf(conninfo, "dbname=%s", connstr);
435         db->conn = PQconnectdb(conninfo);
436
437         if (PQstatus(db->conn) != CONNECTION_OK) {
438                 fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(db->conn));
439                 exit(1);
440         }
441
442         db->origreceiver = PQsetNoticeReceiver(db->conn, NoticeReceiver, (void *)db);
443
444         db->db.execQuery = execQuery;
445         db->db.startCreateScheme = startCreateScheme;
446         db->db.finishCreateScheme = finishCreateScheme;
447         db->db.InsertRow = InsertRow;
448         db->db.Close = Close;
449         db->socket = PQsocket(db->conn);
450         if ( db->socket < 0 ) {
451                 fprintf(stderr,"Socket error\n");
452                 exit(1);
453         }
454         
455         return (ftsDB*)db;
456 }