Fix examples
[ftsbench.git] / pgdriver.c
1 /*
2  * Copyright (c) 2006 Teodor Sigaev <teodor@sigaev.ru>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *        notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *        notice, this list of conditions and the following disclaimer in the
12  *        documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of any co-contributors
14  *        may be used to endorse or promote products derived from this software
15  *        without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL CONTRIBUTORS BE LIABLE FOR ANY
21  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
23  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
25  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
27  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <unistd.h>
33 #include <string.h>
34 #include <time.h>
35 #include <errno.h>
36 #include <sys/types.h>
37 #include <netinet/in.h>
38
39 #include <libpq-fe.h>
40 #include "ftsbench.h"
41
42 #ifdef HAVE_POLL_H
43 #include <poll.h>
44 #else /* HAVE_POLL */
45 #ifdef HAVE_SYS_POLL_H
46 #include <sys/poll.h>
47 #else
48 #error Not defined HAVE_POLL_H or HAVE_SYS_POLL_H
49 #endif /* HAVE_SYS_POLL_H */
50 #endif /* HAVE_POLL */
51
52 #ifndef INFTIM
53 #define INFTIM (-1)
54 #endif
55
56 typedef enum SocketState {
57     SS_NONE = 0,
58         SS_READ,
59         SS_READYREAD,
60         SS_WRITE,
61         SS_READYWRITE
62 } SocketState;
63
64 typedef enum ResStatus {
65         RS_OK = 0,
66         RS_INPROGRESS
67 } ResStatus;
68
69 typedef struct ftsPG {
70         ftsDB   db;
71         PGconn  *conn;
72         int             flags;
73         PQnoticeReceiver        origreceiver;
74         int             emptyquery;
75         int             prepared;
76
77         int             socket;
78         SocketState     state;
79
80         StringBuf               b;
81 } ftsPG;
82
83 static void
84 NoticeReceiver(void *arg, const PGresult *res) {
85     ftsPG *db = (ftsPG*) arg;
86
87         db->emptyquery = 0;
88
89         if ( strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "00000") == 0 ) {
90                 char    *msg = PQresultErrorMessage(res);
91                 /* NOTICE */
92                 if ( strstr(msg, "does not exist, skipping")!=NULL ) {
93                         /* skip 'NOTICE:  table "wow" does not exist, skipping' */
94                         return;
95                 }
96                 if ( strstr(msg, "query contains only stopword(s) or doesn't contain lexeme(s), ignored")!=NULL ) {
97                         db->emptyquery = 1;
98                         return;
99                 }
100         }
101
102         db->origreceiver(db, res);
103 }
104
105 static void
106 pgflush(ftsPG *db) {
107         int flushres;
108
109         flushres=PQflush(db->conn);
110         if ( flushres > 0 ) {
111                 /* need more write */
112                 db->state = SS_WRITE;
113         } else if ( flushres == 0 ) {
114                 /* success write, waits for read */
115                 db->state = SS_READ;
116         } else {
117                 fprintf(stderr, "PQflush failed: %s", PQerrorMessage(db->conn));
118                 exit(1);
119         }
120 }
121
122 static ResStatus
123 checkStatus(ftsPG* db) {
124         switch( db->state ) {
125                 case SS_READYWRITE:
126                         pgflush(db);
127                 case SS_READ:
128                 case SS_WRITE:
129                         return RS_INPROGRESS;
130                 case SS_READYREAD:
131                         PQconsumeInput(db->conn);
132                         if ( PQisBusy(db->conn) != 0 ) {
133                                 db->state = SS_READ;
134                                 return RS_INPROGRESS;
135                         }
136                         break;
137                 case SS_NONE:
138                 default:
139                         fprintf(stderr,"Should not be here!\n");
140                         exit(1);
141                         break;
142         }
143
144         return RS_OK;
145 }
146                         
147 static void
148 waitResult(ftsPG *db) {
149         struct pollfd   pfd;
150         PGresult        *res;
151
152         pfd.fd = db->socket;
153         do {
154                 pfd.events = pfd.revents = 0;
155                 if ( db->state == SS_READ ) {
156                         pfd.events = POLLIN;
157                 } else if (  db->state == SS_WRITE ) {
158                         pfd.events = POLLOUT;
159                 }
160
161                 if ( pfd.events ) {
162                         int ret = poll( &pfd, 1, INFTIM);
163                         if ( ret<0 ) {
164                                 fprintf(stderr,"poll failed: %s", strerror(errno));
165                                 exit(1);
166                         }
167
168                         if ( pfd.revents & (POLLHUP | POLLNVAL | POLLERR) ) {
169                                 fprintf(stderr,"Poll report about socket error\n");
170                                 exit(1);
171                         } else if ( pfd.revents & POLLIN ) {
172                                 db->state = SS_READYREAD;
173                         } else if ( pfd.revents & POLLOUT ) {
174                                 db->state = SS_READYWRITE;
175                         }
176                 }
177         } while( checkStatus(db) != RS_OK );
178
179         while ( (res = PQgetResult(db->conn))!= NULL ) {
180                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
181                         fprintf(stderr, "Execution of prepared statement failed: %s", PQerrorMessage(db->conn));
182                         exit(1);
183                 }
184                 PQclear(res);
185         }
186 }
187
188 static int
189 checkEmptyQuery(ftsPG *db, PGresult *res) {
190         if ( 
191                         db->emptyquery  > 0 &&
192                         PQresultStatus(res) == PGRES_FATAL_ERROR &&
193                         strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "0A000") == 0 && /* FEATURE_NOT_SUPPORTED */
194                         strstr(PQresultErrorField(res, PG_DIAG_SOURCE_FILE), "ginscan.c") != NULL 
195                 ) 
196                 return 1;
197
198         return 0;
199 }
200
201 static void
202 execQuery(ftsDB* adb, char ** words, int flags) {
203         ftsPG *db = (ftsPG*)adb; 
204         const char      *paramValues[1];
205         int i = 0;
206         PGresult *res;
207
208         if ( db->prepared == 0 ) {
209                 /* firsttime */
210                 char buf[1024];
211
212                 db->flags = flags;
213
214                 if ( flags & FLG_FUNC )
215                         sprintf(buf, "SELECT count(*) FROM ftsbench WHERE to_tsvector(body) @@ to_tsquery( $1 ::text );");
216                 else
217                         sprintf(buf, "SELECT count(*) FROM ftsbench WHERE fts @@ to_tsquery( $1 ::text );");
218
219                 res = PQprepare( db->conn, "search_ftsbench", buf, 1, NULL );
220                 
221                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
222                         fprintf(stderr, "PREPARE SELECT command failed: %s", PQerrorMessage(db->conn));
223                         exit(1);
224                 }
225                 PQclear(res);
226
227                 db->prepared = 1;
228         }
229
230         db->b.strlen = 0;
231
232         while( *words ) {
233                 char *ptr = *words;
234
235                 if ( i!= 0 ) 
236                         sb_add(&db->b, (db->flags & FLG_OR) ? " | '" : " & '", 4);
237                 else
238                         sb_add(&db->b, "'", 1);
239
240                 while( *ptr ) {
241                         if (*ptr == '\'')
242                                 sb_add(&db->b, "'", 1);
243                         sb_add(&db->b, ptr, 1);
244                         ptr++;
245                 }
246                 sb_add(&db->b, "'", 1);
247
248                 i++;
249                         
250                 words++;
251         }
252         paramValues[0] = db->b.str;
253
254         res = PQexecPrepared( db->conn, "search_ftsbench",
255                                                  1, paramValues,
256                                                  NULL, NULL, 0);
257
258         if (PQresultStatus(res) != PGRES_TUPLES_OK) {
259                 /* skip error ' all words are a stop word' for GIN index -
260                    result is empty, in any case */
261                 if ( checkEmptyQuery(db, res) == 0 ) { 
262                         fprintf(stderr, "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn));
263                         exit(1);
264                 }
265         }
266         PQclear(res);
267
268         db->emptyquery = 0;
269         pthread_mutex_lock(&(db->db.nqueryMutex));
270         db->db.nquery ++;
271         pthread_mutex_unlock(&(db->db.nqueryMutex));
272 }
273
274 static void
275 startCreateScheme(ftsDB* adb, int flags) {
276         ftsPG *db = (ftsPG*)adb;
277         char    buf[1024];
278         PGresult *res;
279
280         db->flags = flags;
281
282         res = PQexec(db->conn, "DROP TABLE IF EXISTS ftsbench CASCADE;");
283         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
284                 fprintf(stderr, "DROP TABLE command failed: %s", PQerrorMessage(db->conn));
285                         exit(1);
286         }
287         PQclear(res);
288
289         if ( flags & FLG_FUNC ) 
290                 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text );");
291         else
292                 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text, fts     tsvector ); "
293                                         "CREATE TRIGGER tsvectorupdate BEFORE UPDATE OR INSERT ON ftsbench "
294                                         "FOR EACH ROW EXECUTE PROCEDURE tsearch2(fts, body);" );
295
296         res = PQexec(db->conn, buf);
297         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
298                 fprintf(stderr, "CREATE TABLE command failed: %s", PQerrorMessage(db->conn));
299                         exit(1);
300         }
301         PQclear(res);
302
303         res = PQexec(db->conn, "BEGIN;");
304         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
305                 fprintf(stderr, "CREATE TABLE command failed: %s", PQerrorMessage(db->conn));
306                         exit(1);
307         }
308         PQclear(res);
309
310         return;
311 }
312
313 static void
314 finishCreateScheme(ftsDB* adb) {
315         ftsPG *db = (ftsPG*)adb; 
316         PGresult *res;
317
318         if ( db->db.nquery > 0 ) {
319                 waitResult(db);
320                 
321                 if ( PQsetnonblocking(db->conn, 0) != 0 ) {
322                         fprintf(stderr, "PQsetnonblocking command failed: %s", PQerrorMessage(db->conn));
323                         exit(1);
324                 }
325         }
326
327         res = PQexec(db->conn, "COMMIT;");
328         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
329                 fprintf(stderr, "CREATE TABLE command failed: %s", PQerrorMessage(db->conn));
330                         exit(1);
331         }
332         PQclear(res);
333
334         if ( (db->flags & (FLG_GIST | FLG_GIN)) != 0 ) {
335                 char    buf[1024];
336
337                 if ( db->flags & FLG_FUNC ) 
338                         sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( to_tsvector(body) );", 
339                                                 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
340                 else
341                         sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( fts );", 
342                                                 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
343
344                 printf("(create index, ");
345                 fflush(stdout);
346
347                 res = PQexec(db->conn, buf);
348                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
349                         fprintf(stderr, "CREATE INDEX command failed: %s", PQerrorMessage(db->conn));
350                                 exit(1);
351                 }
352                 PQclear(res);
353         } else {
354                 printf("(");
355                 fflush(stdout);
356         }
357
358         printf("vacuum");
359         fflush(stdout);
360
361         res = PQexec(db->conn, "VACUUM ANALYZE ftsbench;");
362         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
363                 fprintf(stderr, "VACUUM ANALYZE command failed: %s", PQerrorMessage(db->conn));
364                 exit(1);
365         }
366         PQclear(res);
367         printf(") ");
368         fflush(stdout);
369
370         return;
371 }
372
373
374 static void
375 InsertRow(ftsDB* adb, int id, char *txt) {
376         ftsPG *db = (ftsPG*)adb;
377         PGresult *res;
378         const char      *paramValues[2];
379         uint32_t    binaryIntVal;
380         int             paramLengths[] = {sizeof(binaryIntVal), 0};
381         int     paramFormats[] = {1, 0};
382
383         if ( db->db.nquery == 0 ) {
384                 /* firsttime */
385                 
386                 res = PQprepare( db->conn, "insert_ftsbench",
387                                                  "INSERT INTO ftsbench (id, body) VALUES ( $1 ::int4, $2 ::text);",
388                                                  2, NULL );
389                 
390                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
391                         fprintf(stderr, "PREPARE INSERT command failed: %s", PQerrorMessage(db->conn));
392                         exit(1);
393                 }
394                 PQclear(res);
395
396                 if ( PQsetnonblocking(db->conn, 1) != 0 ) {
397                         fprintf(stderr, "PQsetnonblocking command failed: %s", PQerrorMessage(db->conn));
398                         exit(1);
399                 }
400         } else {
401                 waitResult(db);
402         }
403
404         binaryIntVal = htonl((uint32_t) id);
405         paramValues[0] = (char*)&binaryIntVal;
406         paramValues[1] = txt;
407
408         if ( PQsendQueryPrepared( db->conn, "insert_ftsbench",
409                                                  2, paramValues,
410                                                  paramLengths, paramFormats, 0) == 0 ) {
411                  fprintf(stderr, "PQsendQueryPrepared failed: %s", PQerrorMessage(db->conn));
412                  exit(1);
413         }
414
415         pgflush(db);
416
417         db->db.nquery++;
418 }
419
420 ftsDB* 
421 PGInit(char * connstr) {
422         ftsPG   *db = (ftsPG*)malloc(sizeof(ftsPG));
423         char    conninfo[1024];
424
425         memset(db,0,sizeof(ftsPG));
426
427         sprintf(conninfo, "dbname=%s", connstr);
428         db->conn = PQconnectdb(conninfo);
429
430         if (PQstatus(db->conn) != CONNECTION_OK) {
431                 fprintf(stderr, "Connection to database failed: %s", PQerrorMessage(db->conn));
432                 exit(1);
433         }
434
435         db->origreceiver = PQsetNoticeReceiver(db->conn, NoticeReceiver, (void *)db);
436
437         db->db.execQuery = execQuery;
438         db->db.startCreateScheme = startCreateScheme;
439         db->db.finishCreateScheme = finishCreateScheme;
440         db->db.InsertRow = InsertRow;
441         db->socket = PQsocket(db->conn);
442         if ( db->socket < 0 ) {
443                 fprintf(stderr,"Socket error\n");
444                 exit(1);
445         }
446         
447         return (ftsDB*)db;
448 }