Clean up: use separate function for fatal error
[ftsbench.git] / pgdriver.c
1 /*
2  * Copyright (c) 2006 Teodor Sigaev <teodor@sigaev.ru>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *        notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *        notice, this list of conditions and the following disclaimer in the
12  *        documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of any co-contributors
14  *        may be used to endorse or promote products derived from this software
15  *        without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL CONTRIBUTORS BE LIABLE FOR ANY
21  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
23  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
25  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
27  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <unistd.h>
33 #include <string.h>
34 #include <time.h>
35 #include <errno.h>
36 #include <sys/types.h>
37 #include <netinet/in.h>
38
39 #include <libpq-fe.h>
40 #include "ftsbench.h"
41
42 #ifdef HAVE_POLL_H
43 #include <poll.h>
44 #else /* HAVE_POLL */
45 #ifdef HAVE_SYS_POLL_H
46 #include <sys/poll.h>
47 #else
48 #error Not defined HAVE_POLL_H or HAVE_SYS_POLL_H
49 #endif /* HAVE_SYS_POLL_H */
50 #endif /* HAVE_POLL */
51
52 #ifndef INFTIM
53 #define INFTIM (-1)
54 #endif
55
56 typedef enum SocketState {
57     SS_NONE = 0,
58         SS_READ,
59         SS_READYREAD,
60         SS_WRITE,
61         SS_READYWRITE
62 } SocketState;
63
64 typedef enum ResStatus {
65         RS_OK = 0,
66         RS_INPROGRESS
67 } ResStatus;
68
69 typedef struct ftsPG {
70         ftsDB   db;
71         PGconn  *conn;
72         int             flags;
73         PQnoticeReceiver        origreceiver;
74         int             emptyquery;
75         int             prepared;
76
77         int             socket;
78         SocketState     state;
79
80         StringBuf               b;
81 } ftsPG;
82
83 static void
84 NoticeReceiver(void *arg, const PGresult *res) {
85     ftsPG *db = (ftsPG*) arg;
86
87         db->emptyquery = 0;
88
89         if ( strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "00000") == 0 ) {
90                 char    *msg = PQresultErrorMessage(res);
91                 /* NOTICE */
92                 if ( strstr(msg, "does not exist, skipping")!=NULL ) {
93                         /* skip 'NOTICE:  table "wow" does not exist, skipping' */
94                         return;
95                 }
96                 if ( strstr(msg, "query contains only stopword(s) or doesn't contain lexeme(s), ignored")!=NULL ) {
97                         db->emptyquery = 1;
98                         return;
99                 }
100         }
101
102         db->origreceiver(db, res);
103 }
104
105 static void
106 pgflush(ftsPG *db) {
107         int flushres;
108
109         flushres=PQflush(db->conn);
110         if ( flushres > 0 ) {
111                 /* need more write */
112                 db->state = SS_WRITE;
113         } else if ( flushres == 0 ) {
114                 /* success write, waits for read */
115                 db->state = SS_READ;
116         } else {
117                 fatal( "PQflush failed: %s\n", PQerrorMessage(db->conn));
118         }
119 }
120
121 static ResStatus
122 checkStatus(ftsPG* db) {
123         switch( db->state ) {
124                 case SS_READYWRITE:
125                         pgflush(db);
126                 case SS_READ:
127                 case SS_WRITE:
128                         return RS_INPROGRESS;
129                 case SS_READYREAD:
130                         PQconsumeInput(db->conn);
131                         if ( PQisBusy(db->conn) != 0 ) {
132                                 db->state = SS_READ;
133                                 return RS_INPROGRESS;
134                         }
135                         break;
136                 case SS_NONE:
137                 default:
138                         fatal( "Should not be here!\n");
139                         break;
140         }
141
142         return RS_OK;
143 }
144                         
145 static void
146 waitResult(ftsPG *db) {
147         struct pollfd   pfd;
148         PGresult        *res;
149
150         pfd.fd = db->socket;
151         do {
152                 pfd.events = pfd.revents = 0;
153                 if ( db->state == SS_READ ) {
154                         pfd.events = POLLIN;
155                 } else if (  db->state == SS_WRITE ) {
156                         pfd.events = POLLOUT;
157                 }
158
159                 if ( pfd.events ) {
160                         int ret = poll( &pfd, 1, INFTIM);
161                         if ( ret<0 ) 
162                                 fatal("poll failed: %s\n", strerror(errno));
163
164                         if ( pfd.revents & (POLLHUP | POLLNVAL | POLLERR) ) { 
165                                 fatal("Poll report about socket error\n");
166                         } else if ( pfd.revents & POLLIN ) {
167                                 db->state = SS_READYREAD;
168                         } else if ( pfd.revents & POLLOUT ) {
169                                 db->state = SS_READYWRITE;
170                         }
171                 }
172         } while( checkStatus(db) != RS_OK );
173
174         while ( (res = PQgetResult(db->conn))!= NULL ) {
175                 if (PQresultStatus(res) != PGRES_COMMAND_OK) 
176                         fatal( "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn));
177                 PQclear(res);
178         }
179 }
180
181 static int
182 checkEmptyQuery(ftsPG *db, PGresult *res) {
183         if ( 
184                         db->emptyquery  > 0 &&
185                         PQresultStatus(res) == PGRES_FATAL_ERROR &&
186                         strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "0A000") == 0 && /* FEATURE_NOT_SUPPORTED */
187                         strstr(PQresultErrorField(res, PG_DIAG_SOURCE_FILE), "ginscan.c") != NULL 
188                 ) 
189                 return 1;
190
191         return 0;
192 }
193
194 static void
195 execQuery(ftsDB* adb, char ** words, int flags) {
196         ftsPG *db = (ftsPG*)adb; 
197         const char      *paramValues[1];
198         int i = 0;
199         PGresult *res;
200
201         if ( db->prepared == 0 ) {
202                 /* firsttime */
203                 char buf[1024];
204
205                 db->flags = flags;
206
207                 sprintf(buf, "SELECT count(*) FROM ftsbench WHERE %s @@ to_tsquery( $1 ::text );",
208                         ( flags & FLG_FUNC ) ? "to_tsvector(body)" : "fts" );
209
210                 res = PQprepare( db->conn, "search_ftsbench", buf, 1, NULL );
211                 
212                 if (PQresultStatus(res) != PGRES_COMMAND_OK) 
213                         fatal( "PREPARE SELECT command failed: %s\n", PQerrorMessage(db->conn));
214                 PQclear(res);
215
216                 db->prepared = 1;
217         }
218
219         db->b.strlen = 0;
220
221         while( *words ) {
222                 char *ptr = *words;
223
224                 if ( i!= 0 ) 
225                         sb_add(&db->b, (db->flags & FLG_OR) ? " | '" : " & '", 4);
226                 else
227                         sb_add(&db->b, "'", 1);
228
229                 while( *ptr ) {
230                         if (*ptr == '\'')
231                                 sb_add(&db->b, "'", 1);
232                         sb_add(&db->b, ptr, 1);
233                         ptr++;
234                 }
235                 sb_add(&db->b, "'", 1);
236
237                 i++;
238                         
239                 words++;
240         }
241         paramValues[0] = db->b.str;
242
243         res = PQexecPrepared( db->conn, "search_ftsbench",
244                                                  1, paramValues,
245                                                  NULL, NULL, 0);
246
247         if (PQresultStatus(res) != PGRES_TUPLES_OK) {
248                 /* skip error ' all words are a stop word' for GIN index -
249                    result is empty, in any case */
250                 if ( checkEmptyQuery(db, res) == 0 ) 
251                         fatal( "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn));
252         } else if ( PQntuples(res) == 1 ) {
253                 db->db.nres += atoi( PQgetvalue(res,0,0) );
254         } else {
255                 fatal("Bad PQntuples %d\n", PQntuples(res));
256         }
257         
258         PQclear(res);
259
260         db->emptyquery = 0;
261         pthread_mutex_lock(&(db->db.nqueryMutex));
262         db->db.nquery ++;
263         pthread_mutex_unlock(&(db->db.nqueryMutex));
264 }
265
266 static void
267 startCreateScheme(ftsDB* adb, int flags) {
268         ftsPG *db = (ftsPG*)adb;
269         char    buf[1024];
270         PGresult *res;
271
272         db->flags = flags;
273
274         res = PQexec(db->conn, "DROP TABLE IF EXISTS ftsbench CASCADE;");
275         if (PQresultStatus(res) != PGRES_COMMAND_OK) 
276                 fatal( "DROP TABLE command failed: %s\n", PQerrorMessage(db->conn));
277         PQclear(res);
278
279         if ( flags & FLG_FUNC ) 
280                 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text );");
281         else
282                 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text, fts     tsvector ); "
283                                         "CREATE TRIGGER tsvectorupdate BEFORE UPDATE OR INSERT ON ftsbench "
284                                         "FOR EACH ROW EXECUTE PROCEDURE tsearch2(fts, body);" );
285
286         res = PQexec(db->conn, buf);
287         if (PQresultStatus(res) != PGRES_COMMAND_OK) 
288                 fatal( "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
289         PQclear(res);
290
291         res = PQexec(db->conn, "BEGIN;");
292         if (PQresultStatus(res) != PGRES_COMMAND_OK) 
293                 fatal( "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
294         PQclear(res);
295
296         return;
297 }
298
299 static void
300 finishCreateScheme(ftsDB* adb) {
301         ftsPG *db = (ftsPG*)adb; 
302         PGresult *res;
303
304         if ( db->db.nquery > 0 ) {
305                 waitResult(db);
306                 
307                 if ( PQsetnonblocking(db->conn, 0) != 0 ) 
308                         fatal( "PQsetnonblocking command failed: %s\n", PQerrorMessage(db->conn));
309         }
310
311         res = PQexec(db->conn, "COMMIT;");
312         if (PQresultStatus(res) != PGRES_COMMAND_OK) 
313                 fatal( "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
314         PQclear(res);
315
316         if ( (db->flags & (FLG_GIST | FLG_GIN)) != 0 ) {
317                 char    buf[1024];
318
319                 if ( db->flags & FLG_FUNC ) 
320                         sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( to_tsvector(body) );", 
321                                                 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
322                 else
323                         sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( fts );", 
324                                                 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
325
326                 report("(create index, ");
327
328                 res = PQexec(db->conn, buf);
329                 if (PQresultStatus(res) != PGRES_COMMAND_OK) 
330                         fatal( "CREATE INDEX command failed: %s\n", PQerrorMessage(db->conn));
331                 PQclear(res);
332         } else 
333                 report("(");
334
335         report("vacuum");
336
337         res = PQexec(db->conn, "VACUUM ANALYZE ftsbench;");
338         if (PQresultStatus(res) != PGRES_COMMAND_OK) 
339                 fatal( "VACUUM ANALYZE command failed: %s\n", PQerrorMessage(db->conn));
340         PQclear(res);
341
342         report(") ");
343
344         return;
345 }
346
347
348 static void
349 InsertRow(ftsDB* adb, int id, char *txt) {
350         ftsPG *db = (ftsPG*)adb;
351         PGresult *res;
352         const char      *paramValues[2];
353         uint32_t    binaryIntVal;
354         int             paramLengths[] = {sizeof(binaryIntVal), 0};
355         int     paramFormats[] = {1, 0};
356
357         if ( db->db.nquery == 0 ) {
358                 /* firsttime */
359                 
360                 res = PQprepare( db->conn, "insert_ftsbench",
361                                                  "INSERT INTO ftsbench (id, body) VALUES ( $1 ::int4, $2 ::text);",
362                                                  2, NULL );
363                 
364                 if (PQresultStatus(res) != PGRES_COMMAND_OK) 
365                         fatal( "PREPARE INSERT command failed: %s\n", PQerrorMessage(db->conn));
366                 PQclear(res);
367
368                 if ( PQsetnonblocking(db->conn, 1) != 0 ) 
369                         fatal( "PQsetnonblocking command failed: %s\n", PQerrorMessage(db->conn));
370         } else {
371                 waitResult(db);
372         }
373
374         binaryIntVal = htonl((uint32_t) id);
375         paramValues[0] = (char*)&binaryIntVal;
376         paramValues[1] = txt;
377
378         if ( PQsendQueryPrepared( db->conn, "insert_ftsbench",
379                                                  2, paramValues,
380                                                  paramLengths, paramFormats, 0) == 0 ) 
381                  fatal( "PQsendQueryPrepared failed: %s\n", PQerrorMessage(db->conn));
382
383         pgflush(db);
384
385         db->db.nquery++;
386 }
387
388 static void 
389 Close(ftsDB* adb) {
390         ftsPG *db = (ftsPG*)adb;
391
392         PQfinish(db->conn);
393 }
394
395 ftsDB* 
396 PGInit(char * connstr) {
397         ftsPG   *db = (ftsPG*)malloc(sizeof(ftsPG));
398         char    conninfo[1024];
399
400         memset(db,0,sizeof(ftsPG));
401
402         sprintf(conninfo, "dbname=%s", connstr);
403         db->conn = PQconnectdb(conninfo);
404
405         if (PQstatus(db->conn) != CONNECTION_OK) 
406                 fatal( "Connection to database failed: %s\n", PQerrorMessage(db->conn));
407
408         db->origreceiver = PQsetNoticeReceiver(db->conn, NoticeReceiver, (void *)db);
409
410         db->db.execQuery = execQuery;
411         db->db.startCreateScheme = startCreateScheme;
412         db->db.finishCreateScheme = finishCreateScheme;
413         db->db.InsertRow = InsertRow;
414         db->db.Close = Close;
415         db->socket = PQsocket(db->conn);
416         if ( db->socket < 0 ) 
417                 fatal("Socket error\n");
418         
419         return (ftsDB*)db;
420 }