add .gitignore
[ftsbench.git] / pgdriver.c
1 /*
2  * Copyright (c) 2006 Teodor Sigaev <teodor@sigaev.ru>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *        notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *        notice, this list of conditions and the following disclaimer in the
12  *        documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of any co-contributors
14  *        may be used to endorse or promote products derived from this software
15  *        without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL CONTRIBUTORS BE LIABLE FOR ANY
21  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
23  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
25  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
27  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <unistd.h>
33 #include <string.h>
34 #include <time.h>
35 #include <errno.h>
36 #include <sys/types.h>
37 #include <netinet/in.h>
38
39 #include <libpq-fe.h>
40 #include "ftsbench.h"
41
42 #ifdef HAVE_POLL_H
43 #include <poll.h>
44 #else /* HAVE_POLL */
45 #ifdef HAVE_SYS_POLL_H
46 #include <sys/poll.h>
47 #else
48 #error Not defined HAVE_POLL_H or HAVE_SYS_POLL_H
49 #endif /* HAVE_SYS_POLL_H */
50 #endif /* HAVE_POLL */
51
52 #ifndef INFTIM
53 #define INFTIM (-1)
54 #endif
55
56 typedef enum SocketState {
57     SS_NONE = 0,
58         SS_READ,
59         SS_READYREAD,
60         SS_WRITE,
61         SS_READYWRITE
62 } SocketState;
63
64 typedef enum ResStatus {
65         RS_OK = 0,
66         RS_INPROGRESS
67 } ResStatus;
68
69 typedef struct ftsPG {
70         ftsDB   db;
71         PGconn  *conn;
72         int             flags;
73         PQnoticeReceiver        origreceiver;
74         int             emptyquery;
75         int             prepared;
76
77         int             socket;
78         SocketState     state;
79
80         StringBuf               b;
81 } ftsPG;
82
83 static void
84 NoticeReceiver(void *arg, const PGresult *res) {
85     ftsPG *db = (ftsPG*) arg;
86
87         db->emptyquery = 0;
88
89         if ( strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "00000") == 0 ) {
90                 char    *msg = PQresultErrorMessage(res);
91                 /* NOTICE */
92                 if ( strstr(msg, "does not exist, skipping")!=NULL ) {
93                         /* skip 'NOTICE:  table "wow" does not exist, skipping' */
94                         return;
95                 }
96                 if ( strstr(msg, "query contains only stopword(s) or doesn't contain lexeme(s), ignored")!=NULL ) {
97                         db->emptyquery = 1;
98                         return;
99                 }
100         }
101
102         db->origreceiver(db, res);
103 }
104
105 static void
106 pgflush(ftsPG *db) {
107         int flushres;
108
109         flushres=PQflush(db->conn);
110         if ( flushres > 0 ) {
111                 /* need more write */
112                 db->state = SS_WRITE;
113         } else if ( flushres == 0 ) {
114                 /* success write, waits for read */
115                 db->state = SS_READ;
116         } else {
117                 fatal( "PQflush failed: %s\n", PQerrorMessage(db->conn));
118         }
119 }
120
121 static ResStatus
122 checkStatus(ftsPG* db) {
123         switch( db->state ) {
124                 case SS_READYWRITE:
125                         pgflush(db);
126                 case SS_READ:
127                 case SS_WRITE:
128                         return RS_INPROGRESS;
129                 case SS_READYREAD:
130                         PQconsumeInput(db->conn);
131                         if ( PQisBusy(db->conn) != 0 ) {
132                                 db->state = SS_READ;
133                                 return RS_INPROGRESS;
134                         }
135                         break;
136                 case SS_NONE:
137                 default:
138                         fatal( "Should not be here!\n");
139                         break;
140         }
141
142         return RS_OK;
143 }
144                         
145 static void
146 waitResult(ftsPG *db) {
147         struct pollfd   pfd;
148         PGresult        *res;
149
150         pfd.fd = db->socket;
151         do {
152                 pfd.events = pfd.revents = 0;
153                 if ( db->state == SS_READ ) {
154                         pfd.events = POLLIN;
155                 } else if (  db->state == SS_WRITE ) {
156                         pfd.events = POLLOUT;
157                 }
158
159                 if ( pfd.events ) {
160                         int ret = poll( &pfd, 1, INFTIM);
161                         if ( ret<0 ) 
162                                 fatal("poll failed: %s\n", strerror(errno));
163
164                         if ( pfd.revents & (POLLHUP | POLLNVAL | POLLERR) ) { 
165                                 fatal("Poll report about socket error\n");
166                         } else if ( pfd.revents & POLLIN ) {
167                                 db->state = SS_READYREAD;
168                         } else if ( pfd.revents & POLLOUT ) {
169                                 db->state = SS_READYWRITE;
170                         }
171                 }
172         } while( checkStatus(db) != RS_OK );
173
174         while ( (res = PQgetResult(db->conn))!= NULL ) {
175                 if (PQresultStatus(res) != PGRES_COMMAND_OK) 
176                         fatal( "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn));
177                 PQclear(res);
178         }
179 }
180
181 static int
182 checkEmptyQuery(ftsPG *db, PGresult *res) {
183         if ( 
184                         db->emptyquery  > 0 &&
185                         PQresultStatus(res) == PGRES_FATAL_ERROR &&
186                         strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "0A000") == 0 && /* FEATURE_NOT_SUPPORTED */
187                         strstr(PQresultErrorField(res, PG_DIAG_SOURCE_FILE), "ginscan.c") != NULL 
188                 ) 
189                 return 1;
190
191         return 0;
192 }
193
194 static void
195 execQuery(ftsDB* adb, char ** words, int flags) {
196         ftsPG *db = (ftsPG*)adb; 
197         int i = 0;
198         PGresult *res;
199
200         if ( db->prepared == 0 ) {
201                 /* firsttime */
202                 char buf[1024];
203
204                 db->flags = flags;
205
206                 sprintf(buf, "SELECT count(*) FROM ftsbench WHERE %s @@ to_tsquery( $1 ::text );",
207                         ( flags & FLG_FUNC ) ? "to_tsvector(body)" : "fts" );
208
209                 res = PQprepare( db->conn, "search_ftsbench", buf, 1, NULL );
210                 
211                 if (PQresultStatus(res) != PGRES_COMMAND_OK) 
212                         fatal( "PREPARE SELECT command failed: %s\n", PQerrorMessage(db->conn));
213                 PQclear(res);
214
215                 db->prepared = 1;
216         }
217
218         db->b.strlen = 0;
219
220         while( *words ) {
221                 char *ptr = *words;
222
223                 if ( i!= 0 ) 
224                         sb_add(&db->b, (db->flags & FLG_OR) ? " | '" : " & '", 4);
225                 else
226                         sb_addchar(&db->b, '\'');
227
228                 while( *ptr ) {
229                         if (*ptr == '\'')
230                                 sb_addchar(&db->b, '\'');
231                         sb_addchar(&db->b, *ptr);
232                         ptr++;
233                 }
234                 sb_addchar(&db->b, '\'');
235
236                 i++;
237                         
238                 words++;
239         }
240         i = 1;
241
242         res = PQexecPrepared( db->conn, "search_ftsbench",
243                                                  1, (const char**)&(db->b.str),
244                                                  &(db->b.strlen), &i, 0);
245
246         if (PQresultStatus(res) != PGRES_TUPLES_OK) {
247                 /* skip error ' all words are a stop word' for GIN index -
248                    result is empty, in any case */
249                 if ( checkEmptyQuery(db, res) == 0 ) 
250                         fatal( "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn));
251         } else if ( PQntuples(res) == 1 ) {
252                 db->db.nres += atoi( PQgetvalue(res,0,0) );
253         } else {
254                 fatal("Bad PQntuples %d\n", PQntuples(res));
255         }
256         
257         PQclear(res);
258
259         db->emptyquery = 0;
260         pthread_mutex_lock(&(db->db.nqueryMutex));
261         db->db.nquery ++;
262         pthread_mutex_unlock(&(db->db.nqueryMutex));
263 }
264
265 static void
266 startCreateScheme(ftsDB* adb, int flags) {
267         ftsPG *db = (ftsPG*)adb;
268         char    buf[1024];
269         PGresult *res;
270
271         db->flags = flags;
272
273         res = PQexec(db->conn, "DROP TABLE IF EXISTS ftsbench CASCADE;");
274         if (PQresultStatus(res) != PGRES_COMMAND_OK) 
275                 fatal( "DROP TABLE command failed: %s\n", PQerrorMessage(db->conn));
276         PQclear(res);
277
278         if ( flags & FLG_FUNC ) 
279                 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text );");
280         else
281                 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text, fts     tsvector ); "
282                                         "CREATE TRIGGER tsvectorupdate BEFORE UPDATE OR INSERT ON ftsbench "
283                                         "FOR EACH ROW EXECUTE PROCEDURE tsearch2(fts, body);" );
284
285         res = PQexec(db->conn, buf);
286         if (PQresultStatus(res) != PGRES_COMMAND_OK) 
287                 fatal( "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
288         PQclear(res);
289
290         res = PQexec(db->conn, "BEGIN;");
291         if (PQresultStatus(res) != PGRES_COMMAND_OK) 
292                 fatal( "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
293         PQclear(res);
294
295         return;
296 }
297
298 static void
299 finishCreateScheme(ftsDB* adb) {
300         ftsPG *db = (ftsPG*)adb; 
301         PGresult *res;
302
303         if ( db->db.nquery > 0 ) {
304                 waitResult(db);
305                 
306                 if ( PQsetnonblocking(db->conn, 0) != 0 ) 
307                         fatal( "PQsetnonblocking command failed: %s\n", PQerrorMessage(db->conn));
308         }
309
310         res = PQexec(db->conn, "COMMIT;");
311         if (PQresultStatus(res) != PGRES_COMMAND_OK) 
312                 fatal( "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
313         PQclear(res);
314
315         if ( (db->flags & (FLG_GIST | FLG_GIN)) != 0 ) {
316                 char    buf[1024];
317
318                 if ( db->flags & FLG_FUNC ) 
319                         sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( to_tsvector(body) );", 
320                                                 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
321                 else
322                         sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( fts );", 
323                                                 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
324
325                 report("(create index, ");
326
327                 res = PQexec(db->conn, buf);
328                 if (PQresultStatus(res) != PGRES_COMMAND_OK) 
329                         fatal( "CREATE INDEX command failed: %s\n", PQerrorMessage(db->conn));
330                 PQclear(res);
331         } else 
332                 report("(");
333
334         report("vacuum");
335
336         res = PQexec(db->conn, "VACUUM ANALYZE ftsbench;");
337         if (PQresultStatus(res) != PGRES_COMMAND_OK) 
338                 fatal( "VACUUM ANALYZE command failed: %s\n", PQerrorMessage(db->conn));
339         PQclear(res);
340
341         report(") ");
342
343         return;
344 }
345
346
347 static void
348 InsertRow(ftsDB* adb, int id, char *txt) {
349         ftsPG *db = (ftsPG*)adb;
350         PGresult *res;
351         const char      *paramValues[2];
352         uint32_t    binaryIntVal;
353         int             paramLengths[] = {sizeof(binaryIntVal), 0};
354         int     paramFormats[] = {1, 0};
355
356         if ( db->db.nquery == 0 ) {
357                 /* firsttime */
358                 
359                 res = PQprepare( db->conn, "insert_ftsbench",
360                                                  "INSERT INTO ftsbench (id, body) VALUES ( $1 ::int4, $2 ::text);",
361                                                  2, NULL );
362                 
363                 if (PQresultStatus(res) != PGRES_COMMAND_OK) 
364                         fatal( "PREPARE INSERT command failed: %s\n", PQerrorMessage(db->conn));
365                 PQclear(res);
366
367                 if ( PQsetnonblocking(db->conn, 1) != 0 ) 
368                         fatal( "PQsetnonblocking command failed: %s\n", PQerrorMessage(db->conn));
369         } else {
370                 waitResult(db);
371         }
372
373         binaryIntVal = htonl((uint32_t) id);
374         paramValues[0] = (char*)&binaryIntVal;
375         paramValues[1] = txt;
376
377         if ( PQsendQueryPrepared( db->conn, "insert_ftsbench",
378                                                  2, paramValues,
379                                                  paramLengths, paramFormats, 0) == 0 ) 
380                  fatal( "PQsendQueryPrepared failed: %s\n", PQerrorMessage(db->conn));
381
382         pgflush(db);
383
384         db->db.nquery++;
385 }
386
387 static void 
388 Close(ftsDB* adb) {
389         ftsPG *db = (ftsPG*)adb;
390
391         PQfinish(db->conn);
392 }
393
394 ftsDB* 
395 PGInit(char * connstr) {
396         ftsPG   *db = (ftsPG*)malloc(sizeof(ftsPG));
397         char    conninfo[1024];
398
399         memset(db,0,sizeof(ftsPG));
400
401         sprintf(conninfo, "dbname=%s", connstr);
402         db->conn = PQconnectdb(conninfo);
403
404         if (PQstatus(db->conn) != CONNECTION_OK) 
405                 fatal( "Connection to database failed: %s\n", PQerrorMessage(db->conn));
406
407         db->origreceiver = PQsetNoticeReceiver(db->conn, NoticeReceiver, (void *)db);
408
409         db->db.execQuery = execQuery;
410         db->db.startCreateScheme = startCreateScheme;
411         db->db.finishCreateScheme = finishCreateScheme;
412         db->db.InsertRow = InsertRow;
413         db->db.Close = Close;
414         db->socket = PQsocket(db->conn);
415         if ( db->socket < 0 ) 
416                 fatal("Socket error\n");
417         
418         return (ftsDB*)db;
419 }