Add close() call
[ftsbench.git] / pgdriver.c
1 /*
2  * Copyright (c) 2006 Teodor Sigaev <teodor@sigaev.ru>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *        notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *        notice, this list of conditions and the following disclaimer in the
12  *        documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of any co-contributors
14  *        may be used to endorse or promote products derived from this software
15  *        without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL CONTRIBUTORS BE LIABLE FOR ANY
21  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
23  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
25  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
27  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <unistd.h>
33 #include <string.h>
34 #include <time.h>
35 #include <errno.h>
36 #include <sys/types.h>
37 #include <netinet/in.h>
38
39 #include <libpq-fe.h>
40 #include "ftsbench.h"
41
42 #ifdef HAVE_POLL_H
43 #include <poll.h>
44 #else /* HAVE_POLL */
45 #ifdef HAVE_SYS_POLL_H
46 #include <sys/poll.h>
47 #else
48 #error Not defined HAVE_POLL_H or HAVE_SYS_POLL_H
49 #endif /* HAVE_SYS_POLL_H */
50 #endif /* HAVE_POLL */
51
52 #ifndef INFTIM
53 #define INFTIM (-1)
54 #endif
55
56 typedef enum SocketState {
57     SS_NONE = 0,
58         SS_READ,
59         SS_READYREAD,
60         SS_WRITE,
61         SS_READYWRITE
62 } SocketState;
63
64 typedef enum ResStatus {
65         RS_OK = 0,
66         RS_INPROGRESS
67 } ResStatus;
68
69 typedef struct ftsPG {
70         ftsDB   db;
71         PGconn  *conn;
72         int             flags;
73         PQnoticeReceiver        origreceiver;
74         int             emptyquery;
75         int             prepared;
76
77         int             socket;
78         SocketState     state;
79
80         StringBuf               b;
81 } ftsPG;
82
83 static void
84 NoticeReceiver(void *arg, const PGresult *res) {
85     ftsPG *db = (ftsPG*) arg;
86
87         db->emptyquery = 0;
88
89         if ( strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "00000") == 0 ) {
90                 char    *msg = PQresultErrorMessage(res);
91                 /* NOTICE */
92                 if ( strstr(msg, "does not exist, skipping")!=NULL ) {
93                         /* skip 'NOTICE:  table "wow" does not exist, skipping' */
94                         return;
95                 }
96                 if ( strstr(msg, "query contains only stopword(s) or doesn't contain lexeme(s), ignored")!=NULL ) {
97                         db->emptyquery = 1;
98                         return;
99                 }
100         }
101
102         db->origreceiver(db, res);
103 }
104
105 static void
106 pgflush(ftsPG *db) {
107         int flushres;
108
109         flushres=PQflush(db->conn);
110         if ( flushres > 0 ) {
111                 /* need more write */
112                 db->state = SS_WRITE;
113         } else if ( flushres == 0 ) {
114                 /* success write, waits for read */
115                 db->state = SS_READ;
116         } else {
117                 fprintf(stderr, "PQflush failed: %s\n", PQerrorMessage(db->conn));
118                 exit(1);
119         }
120 }
121
122 static ResStatus
123 checkStatus(ftsPG* db) {
124         switch( db->state ) {
125                 case SS_READYWRITE:
126                         pgflush(db);
127                 case SS_READ:
128                 case SS_WRITE:
129                         return RS_INPROGRESS;
130                 case SS_READYREAD:
131                         PQconsumeInput(db->conn);
132                         if ( PQisBusy(db->conn) != 0 ) {
133                                 db->state = SS_READ;
134                                 return RS_INPROGRESS;
135                         }
136                         break;
137                 case SS_NONE:
138                 default:
139                         fprintf(stderr,"Should not be here!\n");
140                         exit(1);
141                         break;
142         }
143
144         return RS_OK;
145 }
146                         
147 static void
148 waitResult(ftsPG *db) {
149         struct pollfd   pfd;
150         PGresult        *res;
151
152         pfd.fd = db->socket;
153         do {
154                 pfd.events = pfd.revents = 0;
155                 if ( db->state == SS_READ ) {
156                         pfd.events = POLLIN;
157                 } else if (  db->state == SS_WRITE ) {
158                         pfd.events = POLLOUT;
159                 }
160
161                 if ( pfd.events ) {
162                         int ret = poll( &pfd, 1, INFTIM);
163                         if ( ret<0 ) {
164                                 fprintf(stderr,"poll failed: %s\n", strerror(errno));
165                                 exit(1);
166                         }
167
168                         if ( pfd.revents & (POLLHUP | POLLNVAL | POLLERR) ) {
169                                 fprintf(stderr,"Poll report about socket error\n");
170                                 exit(1);
171                         } else if ( pfd.revents & POLLIN ) {
172                                 db->state = SS_READYREAD;
173                         } else if ( pfd.revents & POLLOUT ) {
174                                 db->state = SS_READYWRITE;
175                         }
176                 }
177         } while( checkStatus(db) != RS_OK );
178
179         while ( (res = PQgetResult(db->conn))!= NULL ) {
180                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
181                         fprintf(stderr, "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn));
182                         exit(1);
183                 }
184                 PQclear(res);
185         }
186 }
187
188 static int
189 checkEmptyQuery(ftsPG *db, PGresult *res) {
190         if ( 
191                         db->emptyquery  > 0 &&
192                         PQresultStatus(res) == PGRES_FATAL_ERROR &&
193                         strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "0A000") == 0 && /* FEATURE_NOT_SUPPORTED */
194                         strstr(PQresultErrorField(res, PG_DIAG_SOURCE_FILE), "ginscan.c") != NULL 
195                 ) 
196                 return 1;
197
198         return 0;
199 }
200
201 static void
202 execQuery(ftsDB* adb, char ** words, int flags) {
203         ftsPG *db = (ftsPG*)adb; 
204         const char      *paramValues[1];
205         int i = 0;
206         PGresult *res;
207
208         if ( db->prepared == 0 ) {
209                 /* firsttime */
210                 char buf[1024];
211
212                 db->flags = flags;
213
214                 sprintf(buf, "SELECT count(*) FROM ftsbench WHERE %s @@ to_tsquery( $1 ::text );",
215                         ( flags & FLG_FUNC ) ? "to_tsvector(body)" : "fts" );
216
217                 res = PQprepare( db->conn, "search_ftsbench", buf, 1, NULL );
218                 
219                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
220                         fprintf(stderr, "PREPARE SELECT command failed: %s\n", PQerrorMessage(db->conn));
221                         exit(1);
222                 }
223                 PQclear(res);
224
225                 db->prepared = 1;
226         }
227
228         db->b.strlen = 0;
229
230         while( *words ) {
231                 char *ptr = *words;
232
233                 if ( i!= 0 ) 
234                         sb_add(&db->b, (db->flags & FLG_OR) ? " | '" : " & '", 4);
235                 else
236                         sb_add(&db->b, "'", 1);
237
238                 while( *ptr ) {
239                         if (*ptr == '\'')
240                                 sb_add(&db->b, "'", 1);
241                         sb_add(&db->b, ptr, 1);
242                         ptr++;
243                 }
244                 sb_add(&db->b, "'", 1);
245
246                 i++;
247                         
248                 words++;
249         }
250         paramValues[0] = db->b.str;
251
252         res = PQexecPrepared( db->conn, "search_ftsbench",
253                                                  1, paramValues,
254                                                  NULL, NULL, 0);
255
256         if (PQresultStatus(res) != PGRES_TUPLES_OK) {
257                 /* skip error ' all words are a stop word' for GIN index -
258                    result is empty, in any case */
259                 if ( checkEmptyQuery(db, res) == 0 ) { 
260                         fprintf(stderr, "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn));
261                         exit(1);
262                 }
263         }
264         PQclear(res);
265
266         db->emptyquery = 0;
267         pthread_mutex_lock(&(db->db.nqueryMutex));
268         db->db.nquery ++;
269         pthread_mutex_unlock(&(db->db.nqueryMutex));
270 }
271
272 static void
273 startCreateScheme(ftsDB* adb, int flags) {
274         ftsPG *db = (ftsPG*)adb;
275         char    buf[1024];
276         PGresult *res;
277
278         db->flags = flags;
279
280         res = PQexec(db->conn, "DROP TABLE IF EXISTS ftsbench CASCADE;");
281         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
282                 fprintf(stderr, "DROP TABLE command failed: %s\n", PQerrorMessage(db->conn));
283                         exit(1);
284         }
285         PQclear(res);
286
287         if ( flags & FLG_FUNC ) 
288                 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text );");
289         else
290                 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text, fts     tsvector ); "
291                                         "CREATE TRIGGER tsvectorupdate BEFORE UPDATE OR INSERT ON ftsbench "
292                                         "FOR EACH ROW EXECUTE PROCEDURE tsearch2(fts, body);" );
293
294         res = PQexec(db->conn, buf);
295         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
296                 fprintf(stderr, "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
297                         exit(1);
298         }
299         PQclear(res);
300
301         res = PQexec(db->conn, "BEGIN;");
302         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
303                 fprintf(stderr, "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
304                         exit(1);
305         }
306         PQclear(res);
307
308         return;
309 }
310
311 static void
312 finishCreateScheme(ftsDB* adb) {
313         ftsPG *db = (ftsPG*)adb; 
314         PGresult *res;
315
316         if ( db->db.nquery > 0 ) {
317                 waitResult(db);
318                 
319                 if ( PQsetnonblocking(db->conn, 0) != 0 ) {
320                         fprintf(stderr, "PQsetnonblocking command failed: %s\n", PQerrorMessage(db->conn));
321                         exit(1);
322                 }
323         }
324
325         res = PQexec(db->conn, "COMMIT;");
326         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
327                 fprintf(stderr, "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
328                         exit(1);
329         }
330         PQclear(res);
331
332         if ( (db->flags & (FLG_GIST | FLG_GIN)) != 0 ) {
333                 char    buf[1024];
334
335                 if ( db->flags & FLG_FUNC ) 
336                         sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( to_tsvector(body) );", 
337                                                 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
338                 else
339                         sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( fts );", 
340                                                 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
341
342                 printf("(create index, ");
343                 fflush(stdout);
344
345                 res = PQexec(db->conn, buf);
346                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
347                         fprintf(stderr, "CREATE INDEX command failed: %s\n", PQerrorMessage(db->conn));
348                                 exit(1);
349                 }
350                 PQclear(res);
351         } else {
352                 printf("(");
353                 fflush(stdout);
354         }
355
356         printf("vacuum");
357         fflush(stdout);
358
359         res = PQexec(db->conn, "VACUUM ANALYZE ftsbench;");
360         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
361                 fprintf(stderr, "VACUUM ANALYZE command failed: %s\n", PQerrorMessage(db->conn));
362                 exit(1);
363         }
364         PQclear(res);
365         printf(") ");
366         fflush(stdout);
367
368         return;
369 }
370
371
372 static void
373 InsertRow(ftsDB* adb, int id, char *txt) {
374         ftsPG *db = (ftsPG*)adb;
375         PGresult *res;
376         const char      *paramValues[2];
377         uint32_t    binaryIntVal;
378         int             paramLengths[] = {sizeof(binaryIntVal), 0};
379         int     paramFormats[] = {1, 0};
380
381         if ( db->db.nquery == 0 ) {
382                 /* firsttime */
383                 
384                 res = PQprepare( db->conn, "insert_ftsbench",
385                                                  "INSERT INTO ftsbench (id, body) VALUES ( $1 ::int4, $2 ::text);",
386                                                  2, NULL );
387                 
388                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
389                         fprintf(stderr, "PREPARE INSERT command failed: %s\n", PQerrorMessage(db->conn));
390                         exit(1);
391                 }
392                 PQclear(res);
393
394                 if ( PQsetnonblocking(db->conn, 1) != 0 ) {
395                         fprintf(stderr, "PQsetnonblocking command failed: %s\n", PQerrorMessage(db->conn));
396                         exit(1);
397                 }
398         } else {
399                 waitResult(db);
400         }
401
402         binaryIntVal = htonl((uint32_t) id);
403         paramValues[0] = (char*)&binaryIntVal;
404         paramValues[1] = txt;
405
406         if ( PQsendQueryPrepared( db->conn, "insert_ftsbench",
407                                                  2, paramValues,
408                                                  paramLengths, paramFormats, 0) == 0 ) {
409                  fprintf(stderr, "PQsendQueryPrepared failed: %s\n", PQerrorMessage(db->conn));
410                  exit(1);
411         }
412
413         pgflush(db);
414
415         db->db.nquery++;
416 }
417
418 static void 
419 Close(ftsDB* adb) {
420         ftsPG *db = (ftsPG*)adb;
421
422         PQfinish(db->conn);
423 }
424
425 ftsDB* 
426 PGInit(char * connstr) {
427         ftsPG   *db = (ftsPG*)malloc(sizeof(ftsPG));
428         char    conninfo[1024];
429
430         memset(db,0,sizeof(ftsPG));
431
432         sprintf(conninfo, "dbname=%s", connstr);
433         db->conn = PQconnectdb(conninfo);
434
435         if (PQstatus(db->conn) != CONNECTION_OK) {
436                 fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(db->conn));
437                 exit(1);
438         }
439
440         db->origreceiver = PQsetNoticeReceiver(db->conn, NoticeReceiver, (void *)db);
441
442         db->db.execQuery = execQuery;
443         db->db.startCreateScheme = startCreateScheme;
444         db->db.finishCreateScheme = finishCreateScheme;
445         db->db.InsertRow = InsertRow;
446         db->db.Close = Close;
447         db->socket = PQsocket(db->conn);
448         if ( db->socket < 0 ) {
449                 fprintf(stderr,"Socket error\n");
450                 exit(1);
451         }
452         
453         return (ftsDB*)db;
454 }