Initial revision
[ftsbench.git] / pgdriver.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <string.h>
5 #include <time.h>
6 #include <errno.h>
7 #include <sys/types.h>
8 #include <netinet/in.h>
9
10 #include <libpq-fe.h>
11 #include "ftsbench.h"
12
13 #ifdef HAVE_POLL_H
14 #include <poll.h>
15 #else /* HAVE_POLL */
16 #ifdef HAVE_SYS_POLL_H
17 #include <sys/poll.h>
18 #else
19 #error Not defined HAVE_POLL_H or HAVE_SYS_POLL_H
20 #endif /* HAVE_SYS_POLL_H */
21 #endif /* HAVE_POLL */
22
23 typedef enum SocketState {
24     SS_NONE = 0,
25         SS_READ,
26         SS_READYREAD,
27         SS_WRITE,
28         SS_READYWRITE
29 } SocketState;
30
31 typedef enum ResStatus {
32         RS_OK = 0,
33         RS_INPROGRESS
34 } ResStatus;
35
36 typedef struct ftsPG {
37         ftsDB   db;
38         PGconn  *conn;
39         int             flags;
40         PQnoticeReceiver        origreceiver;
41         int             emptyquery;
42         int             prepared;
43
44         int             socket;
45         SocketState     state;
46
47         StringBuf               b;
48 } ftsPG;
49
50 static void
51 NoticeReceiver(void *arg, const PGresult *res) {
52     ftsPG *db = (ftsPG*) arg;
53
54         db->emptyquery = 0;
55
56         if ( strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "00000") == 0 ) {
57                 char    *msg = PQresultErrorMessage(res);
58                 /* NOTICE */
59                 if ( strstr(msg, "does not exist, skipping")!=NULL ) {
60                         /* skip 'NOTICE:  table "wow" does not exist, skipping' */
61                         return;
62                 }
63                 if ( strstr(msg, "query contains only stopword(s) or doesn't contain lexeme(s), ignored")!=NULL ) {
64                         db->emptyquery = 1;
65                         return;
66                 }
67         }
68
69         db->origreceiver(db, res);
70 }
71
72 static void
73 pgflush(ftsPG *db) {
74         int flushres;
75
76         flushres=PQflush(db->conn);
77         if ( flushres > 0 ) {
78                 /* need more write */
79                 db->state = SS_WRITE;
80         } else if ( flushres == 0 ) {
81                 /* success write, waits for read */
82                 db->state = SS_READ;
83         } else {
84                 fprintf(stderr, "PQflush failed: %s", PQerrorMessage(db->conn));
85                 exit(1);
86         }
87 }
88
89 static ResStatus
90 checkStatus(ftsPG* db) {
91         switch( db->state ) {
92                 case SS_READYWRITE:
93                         pgflush(db);
94                 case SS_READ:
95                 case SS_WRITE:
96                         return RS_INPROGRESS;
97                 case SS_READYREAD:
98                         PQconsumeInput(db->conn);
99                         if ( PQisBusy(db->conn) != 0 ) {
100                                 db->state = SS_READ;
101                                 return RS_INPROGRESS;
102                         }
103                         break;
104                 case SS_NONE:
105                 default:
106                         fprintf(stderr,"Should not be here!\n");
107                         exit(1);
108                         break;
109         }
110
111         return RS_OK;
112 }
113                         
114 static void
115 waitResult(ftsPG *db) {
116         struct pollfd   pfd;
117         PGresult        *res;
118
119         pfd.fd = db->socket;
120         do {
121                 pfd.events = pfd.revents = 0;
122                 if ( db->state == SS_READ ) {
123                         pfd.events = POLLIN;
124                 } else if (  db->state == SS_WRITE ) {
125                         pfd.events = POLLOUT;
126                 }
127
128                 if ( pfd.events ) {
129                         int ret = poll( &pfd, 1, INFTIM);
130                         if ( ret<0 ) {
131                                 fprintf(stderr,"poll failed: %s", strerror(errno));
132                                 exit(1);
133                         }
134
135                         if ( pfd.revents & (POLLHUP | POLLNVAL | POLLERR) ) {
136                                 fprintf(stderr,"Poll report about socket error\n");
137                                 exit(1);
138                         } else if ( pfd.revents & POLLIN ) {
139                                 db->state = SS_READYREAD;
140                         } else if ( pfd.revents & POLLOUT ) {
141                                 db->state = SS_READYWRITE;
142                         }
143                 }
144         } while( checkStatus(db) != RS_OK );
145
146         while ( (res = PQgetResult(db->conn))!= NULL ) {
147                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
148                         fprintf(stderr, "Execution of prepared statement failed: %s", PQerrorMessage(db->conn));
149                         exit(1);
150                 }
151                 PQclear(res);
152         }
153 }
154
155 static int
156 checkEmptyQuery(ftsPG *db, PGresult *res) {
157         if ( 
158                         db->emptyquery  > 0 &&
159                         PQresultStatus(res) == PGRES_FATAL_ERROR &&
160                         strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "0A000") == 0 && /* FEATURE_NOT_SUPPORTED */
161                         strstr(PQresultErrorField(res, PG_DIAG_SOURCE_FILE), "ginscan.c") != NULL 
162                 ) 
163                 return 1;
164
165         return 0;
166 }
167
168 static void
169 execQuery(ftsDB* adb, char ** words, int flags) {
170         ftsPG *db = (ftsPG*)adb; 
171         const char      *paramValues[1];
172         int i = 0;
173         PGresult *res;
174
175         if ( db->prepared == 0 ) {
176                 /* firsttime */
177                 char buf[1024];
178
179                 db->flags = flags;
180
181                 if ( flags & FLG_FUNC )
182                         sprintf(buf, "SELECT count(*) FROM ftsbench WHERE to_tsvector(body) @@ to_tsquery( $1 ::text );");
183                 else
184                         sprintf(buf, "SELECT count(*) FROM ftsbench WHERE fts @@ to_tsquery( $1 ::text );");
185
186                 res = PQprepare( db->conn, "search_ftsbench", buf, 1, NULL );
187                 
188                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
189                         fprintf(stderr, "PREPARE SELECT command failed: %s", PQerrorMessage(db->conn));
190                         exit(1);
191                 }
192                 PQclear(res);
193
194                 db->prepared = 1;
195         }
196
197         db->b.strlen = 0;
198
199         while( *words ) {
200                 char *ptr = *words;
201
202                 if ( i!= 0 ) 
203                         sb_add(&db->b, (db->flags & FLG_OR) ? " | '" : " & '", 4);
204                 else
205                         sb_add(&db->b, "'", 1);
206
207                 while( *ptr ) {
208                         if (*ptr == '\'')
209                                 sb_add(&db->b, "'", 1);
210                         sb_add(&db->b, ptr, 1);
211                         ptr++;
212                 }
213                 sb_add(&db->b, "'", 1);
214
215                 i++;
216                         
217                 words++;
218         }
219         paramValues[0] = db->b.str;
220
221         res = PQexecPrepared( db->conn, "search_ftsbench",
222                                                  1, paramValues,
223                                                  NULL, NULL, 0);
224
225         if (PQresultStatus(res) != PGRES_TUPLES_OK) {
226                 /* skip error ' all words are a stop word' for GIN index -
227                    result is empty, in any case */
228                 if ( checkEmptyQuery(db, res) == 0 ) { 
229                         fprintf(stderr, "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn));
230                         exit(1);
231                 }
232         }
233         PQclear(res);
234
235         db->emptyquery = 0;
236         pthread_mutex_lock(&(db->db.nqueryMutex));
237         db->db.nquery ++;
238         pthread_mutex_unlock(&(db->db.nqueryMutex));
239 }
240
241 static void
242 startCreateScheme(ftsDB* adb, int flags) {
243         ftsPG *db = (ftsPG*)adb;
244         char    buf[1024];
245         PGresult *res;
246
247         db->flags = flags;
248
249         res = PQexec(db->conn, "DROP TABLE IF EXISTS ftsbench CASCADE;");
250         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
251                 fprintf(stderr, "DROP TABLE command failed: %s", PQerrorMessage(db->conn));
252                         exit(1);
253         }
254         PQclear(res);
255
256         if ( flags & FLG_FUNC ) 
257                 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text );");
258         else
259                 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text, fts     tsvector ); "
260                                         "CREATE TRIGGER tsvectorupdate BEFORE UPDATE OR INSERT ON ftsbench "
261                                         "FOR EACH ROW EXECUTE PROCEDURE tsearch2(fts, body);" );
262
263         res = PQexec(db->conn, buf);
264         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
265                 fprintf(stderr, "CREATE TABLE command failed: %s", PQerrorMessage(db->conn));
266                         exit(1);
267         }
268         PQclear(res);
269
270         res = PQexec(db->conn, "BEGIN;");
271         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
272                 fprintf(stderr, "CREATE TABLE command failed: %s", PQerrorMessage(db->conn));
273                         exit(1);
274         }
275         PQclear(res);
276
277         return;
278 }
279
280 static void
281 finishCreateScheme(ftsDB* adb) {
282         ftsPG *db = (ftsPG*)adb; 
283         PGresult *res;
284
285         if ( db->db.nquery > 0 ) {
286                 waitResult(db);
287                 
288                 if ( PQsetnonblocking(db->conn, 0) != 0 ) {
289                         fprintf(stderr, "PQsetnonblocking command failed: %s", PQerrorMessage(db->conn));
290                         exit(1);
291                 }
292         }
293
294         res = PQexec(db->conn, "COMMIT;");
295         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
296                 fprintf(stderr, "CREATE TABLE command failed: %s", PQerrorMessage(db->conn));
297                         exit(1);
298         }
299         PQclear(res);
300
301         if ( (db->flags & (FLG_GIST | FLG_GIN)) != 0 ) {
302                 char    buf[1024];
303
304                 if ( db->flags & FLG_FUNC ) 
305                         sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( to_tsvector(body) );", 
306                                                 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
307                 else
308                         sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( fts );", 
309                                                 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
310
311                 printf("(create index, ");
312                 fflush(stdout);
313
314                 res = PQexec(db->conn, buf);
315                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
316                         fprintf(stderr, "CREATE INDEX command failed: %s", PQerrorMessage(db->conn));
317                                 exit(1);
318                 }
319                 PQclear(res);
320         } else {
321                 printf("(");
322                 fflush(stdout);
323         }
324
325         printf("vacuum");
326         fflush(stdout);
327
328         res = PQexec(db->conn, "VACUUM ANALYZE ftsbench;");
329         if (PQresultStatus(res) != PGRES_COMMAND_OK) {
330                 fprintf(stderr, "VACUUM ANALYZE command failed: %s", PQerrorMessage(db->conn));
331                 exit(1);
332         }
333         PQclear(res);
334         printf(") ");
335         fflush(stdout);
336
337         return;
338 }
339
340
341 static void
342 InsertRow(ftsDB* adb, int id, char *txt) {
343         ftsPG *db = (ftsPG*)adb;
344         PGresult *res;
345         const char      *paramValues[2];
346         uint32_t    binaryIntVal;
347         int             paramLengths[] = {sizeof(binaryIntVal), 0};
348         int     paramFormats[] = {1, 0};
349
350         if ( db->db.nquery == 0 ) {
351                 /* firsttime */
352                 
353                 res = PQprepare( db->conn, "insert_ftsbench",
354                                                  "INSERT INTO ftsbench (id, body) VALUES ( $1 ::int4, $2 ::text);",
355                                                  2, NULL );
356                 
357                 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
358                         fprintf(stderr, "PREPARE INSERT command failed: %s", PQerrorMessage(db->conn));
359                         exit(1);
360                 }
361                 PQclear(res);
362
363                 if ( PQsetnonblocking(db->conn, 1) != 0 ) {
364                         fprintf(stderr, "PQsetnonblocking command failed: %s", PQerrorMessage(db->conn));
365                         exit(1);
366                 }
367         } else {
368                 waitResult(db);
369         }
370
371         binaryIntVal = htonl((uint32_t) id);
372         paramValues[0] = (char*)&binaryIntVal;
373         paramValues[1] = txt;
374
375         if ( PQsendQueryPrepared( db->conn, "insert_ftsbench",
376                                                  2, paramValues,
377                                                  paramLengths, paramFormats, 0) == 0 ) {
378                  fprintf(stderr, "PQsendQueryPrepared failed: %s", PQerrorMessage(db->conn));
379                  exit(1);
380         }
381
382         pgflush(db);
383
384         db->db.nquery++;
385 }
386
387 ftsDB* 
388 PGInit(char * connstr) {
389         ftsPG   *db = (ftsPG*)malloc(sizeof(ftsPG));
390         char    conninfo[1024];
391
392         memset(db,0,sizeof(ftsPG));
393
394         sprintf(conninfo, "dbname=%s", connstr);
395         db->conn = PQconnectdb(conninfo);
396
397         if (PQstatus(db->conn) != CONNECTION_OK) {
398                 fprintf(stderr, "Connection to database failed: %s", PQerrorMessage(db->conn));
399                 exit(1);
400         }
401
402         db->origreceiver = PQsetNoticeReceiver(db->conn, NoticeReceiver, (void *)db);
403
404         db->db.execQuery = execQuery;
405         db->db.startCreateScheme = startCreateScheme;
406         db->db.finishCreateScheme = finishCreateScheme;
407         db->db.InsertRow = InsertRow;
408         db->socket = PQsocket(db->conn);
409         if ( db->socket < 0 ) {
410                 fprintf(stderr,"Socket error\n");
411                 exit(1);
412         }
413         
414         return (ftsDB*)db;
415 }