Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * copyto.c
4 : : * COPY <table> TO file/program/client
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/commands/copyto.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : : #include "postgres.h"
16 : :
17 : : #include <ctype.h>
18 : : #include <unistd.h>
19 : : #include <sys/stat.h>
20 : :
21 : : #include "access/table.h"
22 : : #include "access/tableam.h"
23 : : #include "catalog/pg_inherits.h"
24 : : #include "commands/copyapi.h"
25 : : #include "commands/progress.h"
26 : : #include "executor/execdesc.h"
27 : : #include "executor/executor.h"
28 : : #include "executor/tuptable.h"
29 : : #include "libpq/libpq.h"
30 : : #include "libpq/pqformat.h"
31 : : #include "mb/pg_wchar.h"
32 : : #include "miscadmin.h"
33 : : #include "pgstat.h"
34 : : #include "storage/fd.h"
35 : : #include "tcop/tcopprot.h"
36 : : #include "utils/lsyscache.h"
37 : : #include "utils/memutils.h"
38 : : #include "utils/rel.h"
39 : : #include "utils/snapmgr.h"
40 : : #include "utils/wait_event.h"
41 : :
42 : : /*
43 : : * Represents the different dest cases we need to worry about at
44 : : * the bottom level
45 : : */
46 : : typedef enum CopyDest
47 : : {
48 : : COPY_FILE, /* to file (or a piped program) */
49 : : COPY_FRONTEND, /* to frontend */
50 : : COPY_CALLBACK, /* to callback function */
51 : : } CopyDest;
52 : :
53 : : /*
54 : : * This struct contains all the state variables used throughout a COPY TO
55 : : * operation.
56 : : *
57 : : * Multi-byte encodings: all supported client-side encodings encode multi-byte
58 : : * characters by having the first byte's high bit set. Subsequent bytes of the
59 : : * character can have the high bit not set. When scanning data in such an
60 : : * encoding to look for a match to a single-byte (ie ASCII) character, we must
61 : : * use the full pg_encoding_mblen() machinery to skip over multibyte
62 : : * characters, else we might find a false match to a trailing byte. In
63 : : * supported server encodings, there is no possibility of a false match, and
64 : : * it's faster to make useless comparisons to trailing bytes than it is to
65 : : * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
66 : : * when we have to do it the hard way.
67 : : */
68 : : typedef struct CopyToStateData
69 : : {
70 : : /* format-specific routines */
71 : : const CopyToRoutine *routine;
72 : :
73 : : /* low-level state data */
74 : : CopyDest copy_dest; /* type of copy source/destination */
75 : : FILE *copy_file; /* used if copy_dest == COPY_FILE */
76 : : StringInfo fe_msgbuf; /* used for all dests during COPY TO */
77 : :
78 : : int file_encoding; /* file or remote side's character encoding */
79 : : bool need_transcoding; /* file encoding diff from server? */
80 : : bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
81 : :
82 : : /* parameters from the COPY command */
83 : : Relation rel; /* relation to copy to */
84 : : QueryDesc *queryDesc; /* executable query to copy from */
85 : : List *attnumlist; /* integer list of attnums to copy */
86 : : char *filename; /* filename, or NULL for STDOUT */
87 : : bool is_program; /* is 'filename' a program to popen? */
88 : : copy_data_dest_cb data_dest_cb; /* function for writing data */
89 : :
90 : : CopyFormatOptions opts;
91 : : Node *whereClause; /* WHERE condition (or NULL) */
92 : : List *partitions; /* OID list of partitions to copy data from */
93 : :
94 : : /*
95 : : * Working state
96 : : */
97 : : MemoryContext copycontext; /* per-copy execution context */
98 : :
99 : : FmgrInfo *out_functions; /* lookup info for output functions */
100 : : MemoryContext rowcontext; /* per-row evaluation context */
101 : : uint64 bytes_processed; /* number of bytes processed so far */
102 : : } CopyToStateData;
103 : :
104 : : /* DestReceiver for COPY (query) TO */
105 : : typedef struct
106 : : {
107 : : DestReceiver pub; /* publicly-known function pointers */
108 : : CopyToState cstate; /* CopyToStateData for the command */
109 : : uint64 processed; /* # of tuples processed */
110 : : } DR_copy;
111 : :
112 : : /* NOTE: there's a copy of this in copyfromparse.c */
113 : : static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
114 : :
115 : :
116 : : /* non-export function prototypes */
117 : : static void EndCopy(CopyToState cstate);
118 : : static void ClosePipeToProgram(CopyToState cstate);
119 : : static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
120 : : static void CopyAttributeOutText(CopyToState cstate, const char *string);
121 : : static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
122 : : bool use_quote);
123 : : static void CopyRelationTo(CopyToState cstate, Relation rel, Relation root_rel,
124 : : uint64 *processed);
125 : :
126 : : /* built-in format-specific routines */
127 : : static void CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc);
128 : : static void CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
129 : : static void CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot);
130 : : static void CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot);
131 : : static void CopyToTextLikeOneRow(CopyToState cstate, TupleTableSlot *slot,
132 : : bool is_csv);
133 : : static void CopyToTextLikeEnd(CopyToState cstate);
134 : : static void CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc);
135 : : static void CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
136 : : static void CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot);
137 : : static void CopyToBinaryEnd(CopyToState cstate);
138 : :
139 : : /* Low-level communications functions */
140 : : static void SendCopyBegin(CopyToState cstate);
141 : : static void SendCopyEnd(CopyToState cstate);
142 : : static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
143 : : static void CopySendString(CopyToState cstate, const char *str);
144 : : static void CopySendChar(CopyToState cstate, char c);
145 : : static void CopySendEndOfRow(CopyToState cstate);
146 : : static void CopySendTextLikeEndOfRow(CopyToState cstate);
147 : : static void CopySendInt32(CopyToState cstate, int32 val);
148 : : static void CopySendInt16(CopyToState cstate, int16 val);
149 : :
150 : : /*
151 : : * COPY TO routines for built-in formats.
152 : : *
153 : : * CSV and text formats share the same TextLike routines except for the
154 : : * one-row callback.
155 : : */
156 : :
157 : : /* text format */
158 : : static const CopyToRoutine CopyToRoutineText = {
159 : : .CopyToStart = CopyToTextLikeStart,
160 : : .CopyToOutFunc = CopyToTextLikeOutFunc,
161 : : .CopyToOneRow = CopyToTextOneRow,
162 : : .CopyToEnd = CopyToTextLikeEnd,
163 : : };
164 : :
165 : : /* CSV format */
166 : : static const CopyToRoutine CopyToRoutineCSV = {
167 : : .CopyToStart = CopyToTextLikeStart,
168 : : .CopyToOutFunc = CopyToTextLikeOutFunc,
169 : : .CopyToOneRow = CopyToCSVOneRow,
170 : : .CopyToEnd = CopyToTextLikeEnd,
171 : : };
172 : :
173 : : /* binary format */
174 : : static const CopyToRoutine CopyToRoutineBinary = {
175 : : .CopyToStart = CopyToBinaryStart,
176 : : .CopyToOutFunc = CopyToBinaryOutFunc,
177 : : .CopyToOneRow = CopyToBinaryOneRow,
178 : : .CopyToEnd = CopyToBinaryEnd,
179 : : };
180 : :
181 : : /* Return a COPY TO routine for the given options */
182 : : static const CopyToRoutine *
376 msawada@postgresql.o 183 :CBC 4861 : CopyToGetRoutine(const CopyFormatOptions *opts)
184 : : {
185 [ + + ]: 4861 : if (opts->csv_mode)
381 186 : 66 : return &CopyToRoutineCSV;
376 187 [ + + ]: 4795 : else if (opts->binary)
381 188 : 8 : return &CopyToRoutineBinary;
189 : :
190 : : /* default is text */
191 : 4787 : return &CopyToRoutineText;
192 : : }
193 : :
194 : : /* Implementation of the start callback for text and CSV formats */
195 : : static void
196 : 4786 : CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc)
197 : : {
198 : : /*
199 : : * For non-binary copy, we need to convert null_print to file encoding,
200 : : * because it will be sent directly with CopySendString.
201 : : */
202 [ + + ]: 4786 : if (cstate->need_transcoding)
203 : 7 : cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
204 : : cstate->opts.null_print_len,
205 : : cstate->file_encoding);
206 : :
207 : : /* if a header has been requested send the line */
255 fujii@postgresql.org 208 [ + + ]:GNC 4786 : if (cstate->opts.header_line == COPY_HEADER_TRUE)
209 : : {
210 : : ListCell *cur;
381 msawada@postgresql.o 211 :CBC 18 : bool hdr_delim = false;
212 : :
213 [ + - + + : 48 : foreach(cur, cstate->attnumlist)
+ + ]
214 : : {
215 : 30 : int attnum = lfirst_int(cur);
216 : : char *colname;
217 : :
218 [ + + ]: 30 : if (hdr_delim)
219 : 12 : CopySendChar(cstate, cstate->opts.delim[0]);
220 : 30 : hdr_delim = true;
221 : :
222 : 30 : colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
223 : :
224 [ + + ]: 30 : if (cstate->opts.csv_mode)
225 : 12 : CopyAttributeOutCSV(cstate, colname, false);
226 : : else
227 : 18 : CopyAttributeOutText(cstate, colname);
228 : : }
229 : :
230 : 18 : CopySendTextLikeEndOfRow(cstate);
231 : : }
232 : 4786 : }
233 : :
234 : : /*
235 : : * Implementation of the outfunc callback for text and CSV formats. Assign
236 : : * the output function data to the given *finfo.
237 : : */
238 : : static void
239 : 16292 : CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
240 : : {
241 : : Oid func_oid;
242 : : bool is_varlena;
243 : :
244 : : /* Set output function for an attribute */
245 : 16292 : getTypeOutputInfo(atttypid, &func_oid, &is_varlena);
246 : 16292 : fmgr_info(func_oid, finfo);
247 : 16292 : }
248 : :
249 : : /* Implementation of the per-row callback for text format */
250 : : static void
251 : 1834412 : CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)
252 : : {
253 : 1834412 : CopyToTextLikeOneRow(cstate, slot, false);
254 : 1834412 : }
255 : :
256 : : /* Implementation of the per-row callback for CSV format */
257 : : static void
258 : 168 : CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot)
259 : : {
260 : 168 : CopyToTextLikeOneRow(cstate, slot, true);
261 : 168 : }
262 : :
263 : : /*
264 : : * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow().
265 : : *
266 : : * We use pg_attribute_always_inline to reduce function call overhead
267 : : * and to help compilers to optimize away the 'is_csv' condition.
268 : : */
269 : : static pg_attribute_always_inline void
270 : 1834580 : CopyToTextLikeOneRow(CopyToState cstate,
271 : : TupleTableSlot *slot,
272 : : bool is_csv)
273 : : {
274 : 1834580 : bool need_delim = false;
275 : 1834580 : FmgrInfo *out_functions = cstate->out_functions;
276 : :
277 [ + + + + : 10781575 : foreach_int(attnum, cstate->attnumlist)
+ + ]
278 : : {
279 : 7112415 : Datum value = slot->tts_values[attnum - 1];
280 : 7112415 : bool isnull = slot->tts_isnull[attnum - 1];
281 : :
282 [ + + ]: 7112415 : if (need_delim)
283 : 5277900 : CopySendChar(cstate, cstate->opts.delim[0]);
284 : 7112415 : need_delim = true;
285 : :
286 [ + + ]: 7112415 : if (isnull)
287 : : {
288 : 600618 : CopySendString(cstate, cstate->opts.null_print_client);
289 : : }
290 : : else
291 : : {
292 : : char *string;
293 : :
294 : 6511797 : string = OutputFunctionCall(&out_functions[attnum - 1],
295 : : value);
296 : :
297 [ + + ]: 6511797 : if (is_csv)
298 : 300 : CopyAttributeOutCSV(cstate, string,
299 : 300 : cstate->opts.force_quote_flags[attnum - 1]);
300 : : else
301 : 6511497 : CopyAttributeOutText(cstate, string);
302 : : }
303 : : }
304 : :
305 : 1834580 : CopySendTextLikeEndOfRow(cstate);
306 : 1834580 : }
307 : :
308 : : /* Implementation of the end callback for text and CSV formats */
309 : : static void
310 : 4786 : CopyToTextLikeEnd(CopyToState cstate)
311 : : {
312 : : /* Nothing to do here */
313 : 4786 : }
314 : :
315 : : /*
316 : : * Implementation of the start callback for binary format. Send a header
317 : : * for a binary copy.
318 : : */
319 : : static void
320 : 7 : CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)
321 : : {
322 : : int32 tmp;
323 : :
324 : : /* Signature */
325 : 7 : CopySendData(cstate, BinarySignature, 11);
326 : : /* Flags field */
327 : 7 : tmp = 0;
328 : 7 : CopySendInt32(cstate, tmp);
329 : : /* No header extension */
330 : 7 : tmp = 0;
331 : 7 : CopySendInt32(cstate, tmp);
332 : 7 : }
333 : :
334 : : /*
335 : : * Implementation of the outfunc callback for binary format. Assign
336 : : * the binary output function to the given *finfo.
337 : : */
338 : : static void
339 : 31 : CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
340 : : {
341 : : Oid func_oid;
342 : : bool is_varlena;
343 : :
344 : : /* Set output function for an attribute */
345 : 31 : getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena);
346 : 30 : fmgr_info(func_oid, finfo);
347 : 30 : }
348 : :
349 : : /* Implementation of the per-row callback for binary format */
350 : : static void
351 : 16 : CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
352 : : {
353 : 16 : FmgrInfo *out_functions = cstate->out_functions;
354 : :
355 : : /* Binary per-tuple header */
356 : 16 : CopySendInt16(cstate, list_length(cstate->attnumlist));
357 : :
358 [ + - + + : 112 : foreach_int(attnum, cstate->attnumlist)
+ + ]
359 : : {
360 : 80 : Datum value = slot->tts_values[attnum - 1];
361 : 80 : bool isnull = slot->tts_isnull[attnum - 1];
362 : :
363 [ + + ]: 80 : if (isnull)
364 : : {
365 : 15 : CopySendInt32(cstate, -1);
366 : : }
367 : : else
368 : : {
369 : : bytea *outputbytes;
370 : :
371 : 65 : outputbytes = SendFunctionCall(&out_functions[attnum - 1],
372 : : value);
373 : 65 : CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
374 : 65 : CopySendData(cstate, VARDATA(outputbytes),
375 : 65 : VARSIZE(outputbytes) - VARHDRSZ);
376 : : }
377 : : }
378 : :
379 : 16 : CopySendEndOfRow(cstate);
380 : 16 : }
381 : :
382 : : /* Implementation of the end callback for binary format */
383 : : static void
384 : 7 : CopyToBinaryEnd(CopyToState cstate)
385 : : {
386 : : /* Generate trailer for a binary copy */
387 : 7 : CopySendInt16(cstate, -1);
388 : : /* Need to flush out the trailer */
389 : 7 : CopySendEndOfRow(cstate);
390 : 7 : }
391 : :
392 : : /*
393 : : * Send copy start/stop messages for frontend copies. These have changed
394 : : * in past protocol redesigns.
395 : : */
396 : : static void
1938 heikki.linnakangas@i 397 : 4756 : SendCopyBegin(CopyToState cstate)
398 : : {
399 : : StringInfoData buf;
1837 400 : 4756 : int natts = list_length(cstate->attnumlist);
401 : 4756 : int16 format = (cstate->opts.binary ? 1 : 0);
402 : : int i;
403 : :
936 nathan@postgresql.or 404 : 4756 : pq_beginmessage(&buf, PqMsg_CopyOutResponse);
1837 heikki.linnakangas@i 405 : 4756 : pq_sendbyte(&buf, format); /* overall format */
406 : 4756 : pq_sendint16(&buf, natts);
407 [ + + ]: 20957 : for (i = 0; i < natts; i++)
408 : 16201 : pq_sendint16(&buf, format); /* per-column formats */
409 : 4756 : pq_endmessage(&buf);
410 : 4756 : cstate->copy_dest = COPY_FRONTEND;
1938 411 : 4756 : }
412 : :
413 : : static void
414 : 4755 : SendCopyEnd(CopyToState cstate)
415 : : {
416 : : /* Shouldn't have any unsent data */
1837 417 [ - + ]: 4755 : Assert(cstate->fe_msgbuf->len == 0);
418 : : /* Send Copy Done message */
936 nathan@postgresql.or 419 : 4755 : pq_putemptymessage(PqMsg_CopyDone);
1938 heikki.linnakangas@i 420 : 4755 : }
421 : :
422 : : /*----------
423 : : * CopySendData sends output data to the destination (file or frontend)
424 : : * CopySendString does the same for null-terminated strings
425 : : * CopySendChar does the same for single characters
426 : : * CopySendEndOfRow does the appropriate thing at end of each data row
427 : : * (data is not actually flushed except by CopySendEndOfRow)
428 : : *
429 : : * NB: no data conversion is applied by these functions
430 : : *----------
431 : : */
432 : : static void
433 : 6411731 : CopySendData(CopyToState cstate, const void *databuf, int datasize)
434 : : {
435 : 6411731 : appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
436 : 6411731 : }
437 : :
438 : : static void
439 : 600765 : CopySendString(CopyToState cstate, const char *str)
440 : : {
441 : 600765 : appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
442 : 600765 : }
443 : :
444 : : static void
445 : 7129064 : CopySendChar(CopyToState cstate, char c)
446 : : {
447 [ + + ]: 7129064 : appendStringInfoCharMacro(cstate->fe_msgbuf, c);
448 : 7129064 : }
449 : :
450 : : static void
451 : 1834621 : CopySendEndOfRow(CopyToState cstate)
452 : : {
453 : 1834621 : StringInfo fe_msgbuf = cstate->fe_msgbuf;
454 : :
455 [ + + + - ]: 1834621 : switch (cstate->copy_dest)
456 : : {
457 : 6147 : case COPY_FILE:
40 michael@paquier.xyz 458 :GNC 6147 : pgstat_report_wait_start(WAIT_EVENT_COPY_TO_WRITE);
1938 heikki.linnakangas@i 459 [ + - ]:CBC 6147 : if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
460 [ - + ]: 6147 : cstate->copy_file) != 1 ||
461 : 6147 : ferror(cstate->copy_file))
462 : : {
1938 heikki.linnakangas@i 463 [ # # ]:UBC 0 : if (cstate->is_program)
464 : : {
465 [ # # ]: 0 : if (errno == EPIPE)
466 : : {
467 : : /*
468 : : * The pipe will be closed automatically on error at
469 : : * the end of transaction, but we might get a better
470 : : * error message from the subprocess' exit code than
471 : : * just "Broken Pipe"
472 : : */
473 : 0 : ClosePipeToProgram(cstate);
474 : :
475 : : /*
476 : : * If ClosePipeToProgram() didn't throw an error, the
477 : : * program terminated normally, but closed the pipe
478 : : * first. Restore errno, and throw an error.
479 : : */
480 : 0 : errno = EPIPE;
481 : : }
482 [ # # ]: 0 : ereport(ERROR,
483 : : (errcode_for_file_access(),
484 : : errmsg("could not write to COPY program: %m")));
485 : : }
486 : : else
487 [ # # ]: 0 : ereport(ERROR,
488 : : (errcode_for_file_access(),
489 : : errmsg("could not write to COPY file: %m")));
490 : : }
40 michael@paquier.xyz 491 :GNC 6147 : pgstat_report_wait_end();
1938 heikki.linnakangas@i 492 :CBC 6147 : break;
1837 493 : 1828471 : case COPY_FRONTEND:
494 : : /* Dump the accumulated row as one CopyData message */
936 nathan@postgresql.or 495 : 1828471 : (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
1938 heikki.linnakangas@i 496 : 1828471 : break;
1251 michael@paquier.xyz 497 : 3 : case COPY_CALLBACK:
498 : 3 : cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
499 : 3 : break;
500 : : }
501 : :
502 : : /* Update the progress */
1894 tomas.vondra@postgre 503 : 1834621 : cstate->bytes_processed += fe_msgbuf->len;
504 : 1834621 : pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
505 : :
1938 heikki.linnakangas@i 506 : 1834621 : resetStringInfo(fe_msgbuf);
507 : 1834621 : }
508 : :
509 : : /*
510 : : * Wrapper function of CopySendEndOfRow for text and CSV formats. Sends the
511 : : * line termination and do common appropriate things for the end of row.
512 : : */
513 : : static inline void
381 msawada@postgresql.o 514 : 1834598 : CopySendTextLikeEndOfRow(CopyToState cstate)
515 : : {
516 [ + + + ]: 1834598 : switch (cstate->copy_dest)
517 : : {
518 : 6135 : case COPY_FILE:
519 : : /* Default line termination depends on platform */
520 : : #ifndef WIN32
521 : 6135 : CopySendChar(cstate, '\n');
522 : : #else
523 : : CopySendString(cstate, "\r\n");
524 : : #endif
525 : 6135 : break;
526 : 1828460 : case COPY_FRONTEND:
527 : : /* The FE/BE protocol uses \n as newline for all platforms */
528 : 1828460 : CopySendChar(cstate, '\n');
529 : 1828460 : break;
530 : 3 : default:
531 : 3 : break;
532 : : }
533 : :
534 : : /* Now take the actions related to the end of a row */
535 : 1834598 : CopySendEndOfRow(cstate);
536 : 1834598 : }
537 : :
538 : : /*
539 : : * These functions do apply some data conversion
540 : : */
541 : :
542 : : /*
543 : : * CopySendInt32 sends an int32 in network byte order
544 : : */
545 : : static inline void
1938 heikki.linnakangas@i 546 : 94 : CopySendInt32(CopyToState cstate, int32 val)
547 : : {
548 : : uint32 buf;
549 : :
550 : 94 : buf = pg_hton32((uint32) val);
551 : 94 : CopySendData(cstate, &buf, sizeof(buf));
552 : 94 : }
553 : :
554 : : /*
555 : : * CopySendInt16 sends an int16 in network byte order
556 : : */
557 : : static inline void
558 : 23 : CopySendInt16(CopyToState cstate, int16 val)
559 : : {
560 : : uint16 buf;
561 : :
562 : 23 : buf = pg_hton16((uint16) val);
563 : 23 : CopySendData(cstate, &buf, sizeof(buf));
564 : 23 : }
565 : :
566 : : /*
567 : : * Closes the pipe to an external program, checking the pclose() return code.
568 : : */
569 : : static void
1938 heikki.linnakangas@i 570 :UBC 0 : ClosePipeToProgram(CopyToState cstate)
571 : : {
572 : : int pclose_rc;
573 : :
574 [ # # ]: 0 : Assert(cstate->is_program);
575 : :
576 : 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
577 [ # # ]: 0 : if (pclose_rc == -1)
578 [ # # ]: 0 : ereport(ERROR,
579 : : (errcode_for_file_access(),
580 : : errmsg("could not close pipe to external command: %m")));
581 [ # # ]: 0 : else if (pclose_rc != 0)
582 : : {
583 [ # # ]: 0 : ereport(ERROR,
584 : : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
585 : : errmsg("program \"%s\" failed",
586 : : cstate->filename),
587 : : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
588 : : }
589 : 0 : }
590 : :
591 : : /*
592 : : * Release resources allocated in a cstate for COPY TO.
593 : : */
594 : : static void
1938 heikki.linnakangas@i 595 :CBC 4793 : EndCopy(CopyToState cstate)
596 : : {
597 [ - + ]: 4793 : if (cstate->is_program)
598 : : {
1938 heikki.linnakangas@i 599 :UBC 0 : ClosePipeToProgram(cstate);
600 : : }
601 : : else
602 : : {
1938 heikki.linnakangas@i 603 [ + + - + ]:CBC 4793 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1938 heikki.linnakangas@i 604 [ # # ]:UBC 0 : ereport(ERROR,
605 : : (errcode_for_file_access(),
606 : : errmsg("could not close file \"%s\": %m",
607 : : cstate->filename)));
608 : : }
609 : :
1894 tomas.vondra@postgre 610 :CBC 4793 : pgstat_progress_end_command();
611 : :
1938 heikki.linnakangas@i 612 : 4793 : MemoryContextDelete(cstate->copycontext);
613 : :
146 msawada@postgresql.o 614 [ + + ]:GNC 4793 : if (cstate->partitions)
615 : 17 : list_free(cstate->partitions);
616 : :
1938 heikki.linnakangas@i 617 :CBC 4793 : pfree(cstate);
618 : 4793 : }
619 : :
620 : : /*
621 : : * Setup CopyToState to read tuples from a table or a query for COPY TO.
622 : : *
623 : : * 'rel': Relation to be copied
624 : : * 'raw_query': Query whose results are to be copied
625 : : * 'queryRelId': OID of base relation to convert to a query (for RLS)
626 : : * 'filename': Name of server-local file to write, NULL for STDOUT
627 : : * 'is_program': true if 'filename' is program to execute
628 : : * 'data_dest_cb': Callback that processes the output data
629 : : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
630 : : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
631 : : *
632 : : * Returns a CopyToState, to be passed to DoCopyTo() and related functions.
633 : : */
634 : : CopyToState
635 : 4904 : BeginCopyTo(ParseState *pstate,
636 : : Relation rel,
637 : : RawStmt *raw_query,
638 : : Oid queryRelId,
639 : : const char *filename,
640 : : bool is_program,
641 : : copy_data_dest_cb data_dest_cb,
642 : : List *attnamelist,
643 : : List *options)
644 : : {
645 : : CopyToState cstate;
1251 michael@paquier.xyz 646 [ + + + + ]: 4904 : bool pipe = (filename == NULL && data_dest_cb == NULL);
647 : : TupleDesc tupDesc;
648 : : int num_phys_attrs;
649 : : MemoryContext oldcontext;
1832 650 : 4904 : const int progress_cols[] = {
651 : : PROGRESS_COPY_COMMAND,
652 : : PROGRESS_COPY_TYPE
653 : : };
654 : 4904 : int64 progress_vals[] = {
655 : : PROGRESS_COPY_COMMAND_TO,
656 : : 0
657 : : };
146 msawada@postgresql.o 658 :GNC 4904 : List *children = NIL;
659 : :
1938 heikki.linnakangas@i 660 [ + + + + ]:CBC 4904 : if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
661 : : {
662 [ + + ]: 30 : if (rel->rd_rel->relkind == RELKIND_VIEW)
663 [ + - ]: 6 : ereport(ERROR,
664 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
665 : : errmsg("cannot copy from view \"%s\"",
666 : : RelationGetRelationName(rel)),
667 : : errhint("Try the COPY (SELECT ...) TO variant.")));
668 [ + + ]: 24 : else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
669 : : {
345 fujii@postgresql.org 670 [ + + ]: 6 : if (!RelationIsPopulated(rel))
671 [ + - ]: 3 : ereport(ERROR,
672 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
673 : : errmsg("cannot copy from unpopulated materialized view \"%s\"",
674 : : RelationGetRelationName(rel)),
675 : : errhint("Use the REFRESH MATERIALIZED VIEW command."));
676 : : }
1938 heikki.linnakangas@i 677 [ - + ]:GBC 18 : else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1938 heikki.linnakangas@i 678 [ # # ]:UBC 0 : ereport(ERROR,
679 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
680 : : errmsg("cannot copy from foreign table \"%s\"",
681 : : RelationGetRelationName(rel)),
682 : : errhint("Try the COPY (SELECT ...) TO variant.")));
1938 heikki.linnakangas@i 683 [ - + ]:GBC 18 : else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1938 heikki.linnakangas@i 684 [ # # ]:UBC 0 : ereport(ERROR,
685 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
686 : : errmsg("cannot copy from sequence \"%s\"",
687 : : RelationGetRelationName(rel))));
1938 heikki.linnakangas@i 688 [ + - ]:GBC 18 : else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
689 : : {
690 : : /*
691 : : * Collect OIDs of relation containing data, so that later
692 : : * DoCopyTo can copy the data from them.
693 : : */
146 msawada@postgresql.o 694 :GNC 18 : children = find_all_inheritors(RelationGetRelid(rel), AccessShareLock, NULL);
695 : :
696 [ + - + + : 91 : foreach_oid(child, children)
+ + ]
697 : : {
698 : 57 : char relkind = get_rel_relkind(child);
699 : :
700 [ + + ]: 57 : if (relkind == RELKIND_FOREIGN_TABLE)
701 : : {
702 : 1 : char *relation_name = get_rel_name(child);
703 : :
704 [ + - ]: 1 : ereport(ERROR,
705 : : errcode(ERRCODE_WRONG_OBJECT_TYPE),
706 : : errmsg("cannot copy from foreign table \"%s\"", relation_name),
707 : : errdetail("Partition \"%s\" is a foreign table in partitioned table \"%s\"",
708 : : relation_name, RelationGetRelationName(rel)),
709 : : errhint("Try the COPY (SELECT ...) TO variant."));
710 : : }
711 : :
712 : : /* Exclude tables with no data */
713 [ + + - + ]: 56 : if (RELKIND_HAS_PARTITIONS(relkind))
714 : 26 : children = foreach_delete_current(children, child);
715 : : }
716 : : }
717 : : else
1938 heikki.linnakangas@i 718 [ # # ]:UBC 0 : ereport(ERROR,
719 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
720 : : errmsg("cannot copy from non-table relation \"%s\"",
721 : : RelationGetRelationName(rel))));
722 : : }
723 : :
724 : :
725 : : /* Allocate workspace and zero all fields */
95 michael@paquier.xyz 726 :GNC 4894 : cstate = palloc0_object(CopyToStateData);
727 : :
728 : : /*
729 : : * We allocate everything used by a cstate in a new memory context. This
730 : : * avoids memory leaks during repeated use of COPY in a query.
731 : : */
1938 heikki.linnakangas@i 732 :CBC 4894 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
733 : : "COPY",
734 : : ALLOCSET_DEFAULT_SIZES);
735 : :
736 : 4894 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
737 : :
738 : : /* Extract options from the statement node tree */
1768 tgl@sss.pgh.pa.us 739 : 4894 : ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
740 : :
741 : : /* Set format routine */
376 msawada@postgresql.o 742 : 4861 : cstate->routine = CopyToGetRoutine(&cstate->opts);
743 : :
744 : : /* Process the source/target relation or query */
1938 heikki.linnakangas@i 745 [ + + ]: 4861 : if (rel)
746 : : {
747 [ - + ]: 4566 : Assert(!raw_query);
748 : :
749 : 4566 : cstate->rel = rel;
750 : :
751 : 4566 : tupDesc = RelationGetDescr(cstate->rel);
146 msawada@postgresql.o 752 :GNC 4566 : cstate->partitions = children;
753 : : }
754 : : else
755 : : {
756 : : List *rewritten;
757 : : Query *query;
758 : : PlannedStmt *plan;
759 : : DestReceiver *dest;
760 : :
1938 heikki.linnakangas@i 761 :CBC 295 : cstate->rel = NULL;
146 msawada@postgresql.o 762 :GNC 295 : cstate->partitions = NIL;
763 : :
764 : : /*
765 : : * Run parse analysis and rewrite. Note this also acquires sufficient
766 : : * locks on the source table(s).
767 : : */
1472 peter@eisentraut.org 768 :CBC 295 : rewritten = pg_analyze_and_rewrite_fixedparams(raw_query,
769 : : pstate->p_sourcetext, NULL, 0,
770 : : NULL);
771 : :
772 : : /* check that we got back something we can work with */
1938 heikki.linnakangas@i 773 [ + + ]: 289 : if (rewritten == NIL)
774 : : {
775 [ + - ]: 9 : ereport(ERROR,
776 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
777 : : errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
778 : : }
779 [ + + ]: 280 : else if (list_length(rewritten) > 1)
780 : : {
781 : : ListCell *lc;
782 : :
783 : : /* examine queries to determine which error message to issue */
784 [ + - + + : 51 : foreach(lc, rewritten)
+ + ]
785 : : {
786 : 42 : Query *q = lfirst_node(Query, lc);
787 : :
788 [ + + ]: 42 : if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
789 [ + - ]: 9 : ereport(ERROR,
790 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
791 : : errmsg("conditional DO INSTEAD rules are not supported for COPY")));
792 [ + + ]: 33 : if (q->querySource == QSRC_NON_INSTEAD_RULE)
793 [ + - ]: 9 : ereport(ERROR,
794 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
795 : : errmsg("DO ALSO rules are not supported for COPY")));
796 : : }
797 : :
798 [ + - ]: 9 : ereport(ERROR,
799 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
800 : : errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
801 : : }
802 : :
803 : 253 : query = linitial_node(Query, rewritten);
804 : :
805 : : /* The grammar allows SELECT INTO, but we don't support that */
806 [ + + ]: 253 : if (query->utilityStmt != NULL &&
807 [ + + ]: 9 : IsA(query->utilityStmt, CreateTableAsStmt))
808 [ + - ]: 6 : ereport(ERROR,
809 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
810 : : errmsg("COPY (SELECT INTO) is not supported")));
811 : :
812 : : /* The only other utility command we could see is NOTIFY */
510 tgl@sss.pgh.pa.us 813 [ + + ]: 247 : if (query->utilityStmt != NULL)
814 [ + - ]: 3 : ereport(ERROR,
815 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
816 : : errmsg("COPY query must not be a utility command")));
817 : :
818 : : /*
819 : : * Similarly the grammar doesn't enforce the presence of a RETURNING
820 : : * clause, but this is required here.
821 : : */
1938 heikki.linnakangas@i 822 [ + + ]: 244 : if (query->commandType != CMD_SELECT &&
823 [ + + ]: 55 : query->returningList == NIL)
824 : : {
825 [ + + + + : 12 : Assert(query->commandType == CMD_INSERT ||
+ + - + ]
826 : : query->commandType == CMD_UPDATE ||
827 : : query->commandType == CMD_DELETE ||
828 : : query->commandType == CMD_MERGE);
829 : :
830 [ + - ]: 12 : ereport(ERROR,
831 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
832 : : errmsg("COPY query must have a RETURNING clause")));
833 : : }
834 : :
835 : : /* plan the query */
836 : 232 : plan = pg_plan_query(query, pstate->p_sourcetext,
837 : : CURSOR_OPT_PARALLEL_OK, NULL, NULL);
838 : :
839 : : /*
840 : : * With row-level security and a user using "COPY relation TO", we
841 : : * have to convert the "COPY relation TO" to a query-based COPY (eg:
842 : : * "COPY (SELECT * FROM ONLY relation) TO"), to allow the rewriter to
843 : : * add in any RLS clauses.
844 : : *
845 : : * When this happens, we are passed in the relid of the originally
846 : : * found relation (which we have locked). As the planner will look up
847 : : * the relation again, we double-check here to make sure it found the
848 : : * same one that we have locked.
849 : : */
850 [ + + ]: 231 : if (queryRelId != InvalidOid)
851 : : {
852 : : /*
853 : : * Note that with RLS involved there may be multiple relations,
854 : : * and while the one we need is almost certainly first, we don't
855 : : * make any guarantees of that in the planner, so check the whole
856 : : * list and make sure we find the original relation.
857 : : */
858 [ - + ]: 39 : if (!list_member_oid(plan->relationOids, queryRelId))
1938 heikki.linnakangas@i 859 [ # # ]:UBC 0 : ereport(ERROR,
860 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
861 : : errmsg("relation referenced by COPY statement has changed")));
862 : : }
863 : :
864 : : /*
865 : : * Use a snapshot with an updated command ID to ensure this query sees
866 : : * results of any previously executed queries.
867 : : */
1938 heikki.linnakangas@i 868 :CBC 231 : PushCopiedSnapshot(GetActiveSnapshot());
869 : 231 : UpdateActiveSnapshotCommandId();
870 : :
871 : : /* Create dest receiver for COPY OUT */
872 : 231 : dest = CreateDestReceiver(DestCopyOut);
873 : 231 : ((DR_copy *) dest)->cstate = cstate;
874 : :
875 : : /* Create a QueryDesc requesting no output */
297 amitlan@postgresql.o 876 : 231 : cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
877 : : GetActiveSnapshot(),
878 : : InvalidSnapshot,
879 : : dest, NULL, NULL, 0);
880 : :
881 : : /*
882 : : * Call ExecutorStart to prepare the plan for execution.
883 : : *
884 : : * ExecutorStart computes a result tupdesc for us
885 : : */
886 : 231 : ExecutorStart(cstate->queryDesc, 0);
887 : :
1938 heikki.linnakangas@i 888 : 228 : tupDesc = cstate->queryDesc->tupDesc;
889 : : }
890 : :
891 : : /* Generate or convert list of attributes to process */
892 : 4794 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
893 : :
894 : 4794 : num_phys_attrs = tupDesc->natts;
895 : :
896 : : /* Convert FORCE_QUOTE name list to per-column flags, check validity */
897 : 4794 : cstate->opts.force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
898 [ + + ]: 4794 : if (cstate->opts.force_quote_all)
899 : : {
897 andrew@dunslane.net 900 [ + - - + : 9 : MemSet(cstate->opts.force_quote_flags, true, num_phys_attrs * sizeof(bool));
- - - - -
- ]
901 : : }
1938 heikki.linnakangas@i 902 [ + + ]: 4785 : else if (cstate->opts.force_quote)
903 : : {
904 : : List *attnums;
905 : : ListCell *cur;
906 : :
907 : 12 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_quote);
908 : :
909 [ + - + + : 24 : foreach(cur, attnums)
+ + ]
910 : : {
911 : 12 : int attnum = lfirst_int(cur);
912 : 12 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
913 : :
914 [ - + ]: 12 : if (!list_member_int(cstate->attnumlist, attnum))
1938 heikki.linnakangas@i 915 [ # # ]:UBC 0 : ereport(ERROR,
916 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
917 : : /*- translator: %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
918 : : errmsg("%s column \"%s\" not referenced by COPY",
919 : : "FORCE_QUOTE", NameStr(attr->attname))));
1938 heikki.linnakangas@i 920 :CBC 12 : cstate->opts.force_quote_flags[attnum - 1] = true;
921 : : }
922 : : }
923 : :
924 : : /* Use client encoding when ENCODING option is not specified. */
925 [ + + ]: 4794 : if (cstate->opts.file_encoding < 0)
926 : 4779 : cstate->file_encoding = pg_get_client_encoding();
927 : : else
928 : 15 : cstate->file_encoding = cstate->opts.file_encoding;
929 : :
930 : : /*
931 : : * Set up encoding conversion info if the file and server encodings differ
932 : : * (see also pg_server_to_any).
933 : : */
765 michael@paquier.xyz 934 [ + + ]: 4794 : if (cstate->file_encoding == GetDatabaseEncoding() ||
935 [ + + ]: 10 : cstate->file_encoding == PG_SQL_ASCII)
936 : 4787 : cstate->need_transcoding = false;
937 : : else
938 : 7 : cstate->need_transcoding = true;
939 : :
940 : : /* See Multibyte encoding comment above */
1938 heikki.linnakangas@i 941 [ + + + - ]: 4794 : cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
942 : :
943 : 4794 : cstate->copy_dest = COPY_FILE; /* default */
944 : :
1251 michael@paquier.xyz 945 [ + + ]: 4794 : if (data_dest_cb)
946 : : {
947 : 1 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
948 : 1 : cstate->copy_dest = COPY_CALLBACK;
949 : 1 : cstate->data_dest_cb = data_dest_cb;
950 : : }
951 [ + + ]: 4793 : else if (pipe)
952 : : {
1832 953 : 4756 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
954 : :
1938 heikki.linnakangas@i 955 [ - + ]: 4756 : Assert(!is_program); /* the grammar does not allow this */
956 [ - + ]: 4756 : if (whereToSendOutput != DestRemote)
1938 heikki.linnakangas@i 957 :UBC 0 : cstate->copy_file = stdout;
958 : : }
959 : : else
960 : : {
1938 heikki.linnakangas@i 961 :CBC 37 : cstate->filename = pstrdup(filename);
962 : 37 : cstate->is_program = is_program;
963 : :
964 [ - + ]: 37 : if (is_program)
965 : : {
1832 michael@paquier.xyz 966 :UBC 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1938 heikki.linnakangas@i 967 : 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
968 [ # # ]: 0 : if (cstate->copy_file == NULL)
969 [ # # ]: 0 : ereport(ERROR,
970 : : (errcode_for_file_access(),
971 : : errmsg("could not execute command \"%s\": %m",
972 : : cstate->filename)));
973 : : }
974 : : else
975 : : {
976 : : mode_t oumask; /* Pre-existing umask value */
977 : : struct stat st;
978 : :
1832 michael@paquier.xyz 979 :CBC 37 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
980 : :
981 : : /*
982 : : * Prevent write to relative path ... too easy to shoot oneself in
983 : : * the foot by overwriting a database file ...
984 : : */
1938 heikki.linnakangas@i 985 [ - + ]: 37 : if (!is_absolute_path(filename))
1938 heikki.linnakangas@i 986 [ # # ]:UBC 0 : ereport(ERROR,
987 : : (errcode(ERRCODE_INVALID_NAME),
988 : : errmsg("relative path not allowed for COPY to file")));
989 : :
1938 heikki.linnakangas@i 990 :CBC 37 : oumask = umask(S_IWGRP | S_IWOTH);
991 [ + - ]: 37 : PG_TRY();
992 : : {
993 : 37 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
994 : : }
1938 heikki.linnakangas@i 995 :UBC 0 : PG_FINALLY();
996 : : {
1938 heikki.linnakangas@i 997 :CBC 37 : umask(oumask);
998 : : }
999 [ - + ]: 37 : PG_END_TRY();
1000 [ - + ]: 37 : if (cstate->copy_file == NULL)
1001 : : {
1002 : : /* copy errno because ereport subfunctions might change it */
1938 heikki.linnakangas@i 1003 :UBC 0 : int save_errno = errno;
1004 : :
1005 [ # # # # : 0 : ereport(ERROR,
# # ]
1006 : : (errcode_for_file_access(),
1007 : : errmsg("could not open file \"%s\" for writing: %m",
1008 : : cstate->filename),
1009 : : (save_errno == ENOENT || save_errno == EACCES) ?
1010 : : errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1011 : : "You may want a client-side facility such as psql's \\copy.") : 0));
1012 : : }
1013 : :
1938 heikki.linnakangas@i 1014 [ - + ]:CBC 37 : if (fstat(fileno(cstate->copy_file), &st))
1938 heikki.linnakangas@i 1015 [ # # ]:UBC 0 : ereport(ERROR,
1016 : : (errcode_for_file_access(),
1017 : : errmsg("could not stat file \"%s\": %m",
1018 : : cstate->filename)));
1019 : :
1938 heikki.linnakangas@i 1020 [ - + ]:CBC 37 : if (S_ISDIR(st.st_mode))
1938 heikki.linnakangas@i 1021 [ # # ]:UBC 0 : ereport(ERROR,
1022 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1023 : : errmsg("\"%s\" is a directory", cstate->filename)));
1024 : : }
1025 : : }
1026 : :
1027 : : /* initialize progress */
1894 tomas.vondra@postgre 1028 :CBC 4794 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1029 [ + + ]: 4794 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1832 michael@paquier.xyz 1030 : 4794 : pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
1031 : :
1894 tomas.vondra@postgre 1032 : 4794 : cstate->bytes_processed = 0;
1033 : :
1938 heikki.linnakangas@i 1034 : 4794 : MemoryContextSwitchTo(oldcontext);
1035 : :
1036 : 4794 : return cstate;
1037 : : }
1038 : :
1039 : : /*
1040 : : * Clean up storage and release resources for COPY TO.
1041 : : */
1042 : : void
1043 : 4793 : EndCopyTo(CopyToState cstate)
1044 : : {
1045 [ + + ]: 4793 : if (cstate->queryDesc != NULL)
1046 : : {
1047 : : /* Close down the query and free resources. */
1048 : 228 : ExecutorFinish(cstate->queryDesc);
1049 : 228 : ExecutorEnd(cstate->queryDesc);
1050 : 228 : FreeQueryDesc(cstate->queryDesc);
1051 : 228 : PopActiveSnapshot();
1052 : : }
1053 : :
1054 : : /* Clean up storage */
1055 : 4793 : EndCopy(cstate);
1056 : 4793 : }
1057 : :
1058 : : /*
1059 : : * Copy from relation or query TO file.
1060 : : *
1061 : : * Returns the number of rows processed.
1062 : : */
1063 : : uint64
1837 1064 : 4794 : DoCopyTo(CopyToState cstate)
1065 : : {
1251 michael@paquier.xyz 1066 [ + + + + ]: 4794 : bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
1837 heikki.linnakangas@i 1067 [ + + + - ]: 4794 : bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1068 : : TupleDesc tupDesc;
1069 : : int num_phys_attrs;
1070 : : ListCell *cur;
146 msawada@postgresql.o 1071 :GNC 4794 : uint64 processed = 0;
1072 : :
1837 heikki.linnakangas@i 1073 [ + + ]:CBC 4794 : if (fe_copy)
1074 : 4756 : SendCopyBegin(cstate);
1075 : :
1938 1076 [ + + ]: 4794 : if (cstate->rel)
1077 : 4566 : tupDesc = RelationGetDescr(cstate->rel);
1078 : : else
1079 : 228 : tupDesc = cstate->queryDesc->tupDesc;
1080 : 4794 : num_phys_attrs = tupDesc->natts;
1768 tgl@sss.pgh.pa.us 1081 : 4794 : cstate->opts.null_print_client = cstate->opts.null_print; /* default */
1082 : :
1083 : : /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1938 heikki.linnakangas@i 1084 : 4794 : cstate->fe_msgbuf = makeStringInfo();
1085 : :
1086 : : /* Get info about the columns we need to process. */
1087 : 4794 : cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1088 [ + + + + : 21116 : foreach(cur, cstate->attnumlist)
+ + ]
1089 : : {
1090 : 16323 : int attnum = lfirst_int(cur);
1091 : 16323 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1092 : :
381 msawada@postgresql.o 1093 : 16323 : cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
1094 : 16323 : &cstate->out_functions[attnum - 1]);
1095 : : }
1096 : :
1097 : : /*
1098 : : * Create a temporary memory context that we can reset once per row to
1099 : : * recover palloc'd memory. This avoids any problems with leaks inside
1100 : : * datatype output routines, and should be faster than retail pfree's
1101 : : * anyway. (We don't need a whole econtext as CopyFrom does.)
1102 : : */
1938 heikki.linnakangas@i 1103 : 4793 : cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1104 : : "COPY TO",
1105 : : ALLOCSET_DEFAULT_SIZES);
1106 : :
381 msawada@postgresql.o 1107 : 4793 : cstate->routine->CopyToStart(cstate, tupDesc);
1108 : :
1938 heikki.linnakangas@i 1109 [ + + ]: 4793 : if (cstate->rel)
1110 : : {
1111 : : /*
1112 : : * If COPY TO source table is a partitioned table, then open each
1113 : : * partition and process each individual partition.
1114 : : */
146 msawada@postgresql.o 1115 [ + + ]:GNC 4565 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1116 : : {
1117 [ + - + + : 64 : foreach_oid(child, cstate->partitions)
+ + ]
1118 : : {
1119 : : Relation scan_rel;
1120 : :
1121 : : /* We already got the lock in BeginCopyTo */
1122 : 30 : scan_rel = table_open(child, NoLock);
1123 : 30 : CopyRelationTo(cstate, scan_rel, cstate->rel, &processed);
1124 : 30 : table_close(scan_rel, NoLock);
1125 : : }
1126 : : }
1127 : : else
1128 : 4548 : CopyRelationTo(cstate, cstate->rel, NULL, &processed);
1129 : : }
1130 : : else
1131 : : {
1132 : : /* run the plan --- the dest receiver will send tuples */
461 tgl@sss.pgh.pa.us 1133 :CBC 228 : ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0);
1938 heikki.linnakangas@i 1134 : 228 : processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
1135 : : }
1136 : :
381 msawada@postgresql.o 1137 : 4793 : cstate->routine->CopyToEnd(cstate);
1138 : :
1938 heikki.linnakangas@i 1139 : 4793 : MemoryContextDelete(cstate->rowcontext);
1140 : :
1837 1141 [ + + ]: 4793 : if (fe_copy)
1142 : 4755 : SendCopyEnd(cstate);
1143 : :
1938 1144 : 4793 : return processed;
1145 : : }
1146 : :
1147 : : /*
1148 : : * Scans a single table and exports its rows to the COPY destination.
1149 : : *
1150 : : * root_rel can be set to the root table of rel if rel is a partition
1151 : : * table so that we can send tuples in root_rel's rowtype, which might
1152 : : * differ from individual partitions.
1153 : : */
1154 : : static void
146 msawada@postgresql.o 1155 :GNC 4578 : CopyRelationTo(CopyToState cstate, Relation rel, Relation root_rel, uint64 *processed)
1156 : : {
1157 : : TupleTableSlot *slot;
1158 : : TableScanDesc scandesc;
1159 : 4578 : AttrMap *map = NULL;
1160 : 4578 : TupleTableSlot *root_slot = NULL;
1161 : :
1162 : 4578 : scandesc = table_beginscan(rel, GetActiveSnapshot(), 0, NULL);
1163 : 4578 : slot = table_slot_create(rel, NULL);
1164 : :
1165 : : /*
1166 : : * If we are exporting partition data here, we check if converting tuples
1167 : : * to the root table's rowtype, because a partition might have column
1168 : : * order different than its root table.
1169 : : */
1170 [ + + ]: 4578 : if (root_rel != NULL)
1171 : : {
1172 : 30 : root_slot = table_slot_create(root_rel, NULL);
1173 : 30 : map = build_attrmap_by_name_if_req(RelationGetDescr(root_rel),
1174 : : RelationGetDescr(rel),
1175 : : false);
1176 : : }
1177 : :
1178 [ + + ]: 1835617 : while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
1179 : : {
1180 : : TupleTableSlot *copyslot;
1181 : :
1182 [ - + ]: 1831039 : CHECK_FOR_INTERRUPTS();
1183 : :
1184 [ + + ]: 1831039 : if (map != NULL)
1185 : 14 : copyslot = execute_attr_map_slot(map, slot, root_slot);
1186 : : else
1187 : : {
1188 : : /* Deconstruct the tuple */
1189 : 1831025 : slot_getallattrs(slot);
1190 : 1831025 : copyslot = slot;
1191 : : }
1192 : :
1193 : : /* Format and send the data */
1194 : 1831039 : CopyOneRowTo(cstate, copyslot);
1195 : :
1196 : : /*
1197 : : * Increment the number of processed tuples, and report the progress.
1198 : : */
1199 : 1831039 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1200 : 1831039 : ++(*processed));
1201 : : }
1202 : :
1203 : 4578 : ExecDropSingleTupleTableSlot(slot);
1204 : :
1205 [ + + ]: 4578 : if (root_slot != NULL)
1206 : 30 : ExecDropSingleTupleTableSlot(root_slot);
1207 : :
1208 [ + + ]: 4578 : if (map != NULL)
1209 : 6 : free_attrmap(map);
1210 : :
1211 : 4578 : table_endscan(scandesc);
1212 : 4578 : }
1213 : :
1214 : : /*
1215 : : * Emit one row during DoCopyTo().
1216 : : */
1217 : : static inline void
1938 heikki.linnakangas@i 1218 :CBC 1834596 : CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
1219 : : {
1220 : : MemoryContext oldcontext;
1221 : :
1222 : 1834596 : MemoryContextReset(cstate->rowcontext);
1223 : 1834596 : oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
1224 : :
1225 : : /* Make sure the tuple is fully deconstructed */
1226 : 1834596 : slot_getallattrs(slot);
1227 : :
381 msawada@postgresql.o 1228 : 1834596 : cstate->routine->CopyToOneRow(cstate, slot);
1229 : :
1938 heikki.linnakangas@i 1230 : 1834596 : MemoryContextSwitchTo(oldcontext);
1231 : 1834596 : }
1232 : :
1233 : : /*
1234 : : * Send text representation of one attribute, with conversion and escaping
1235 : : */
1236 : : #define DUMPSOFAR() \
1237 : : do { \
1238 : : if (ptr > start) \
1239 : : CopySendData(cstate, start, ptr - start); \
1240 : : } while (0)
1241 : :
1242 : : static void
767 michael@paquier.xyz 1243 : 6511515 : CopyAttributeOutText(CopyToState cstate, const char *string)
1244 : : {
1245 : : const char *ptr;
1246 : : const char *start;
1247 : : char c;
1938 heikki.linnakangas@i 1248 : 6511515 : char delimc = cstate->opts.delim[0];
1249 : :
1250 [ + + ]: 6511515 : if (cstate->need_transcoding)
1938 heikki.linnakangas@i 1251 :GBC 3 : ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
1252 : : else
1938 heikki.linnakangas@i 1253 :CBC 6511512 : ptr = string;
1254 : :
1255 : : /*
1256 : : * We have to grovel through the string searching for control characters
1257 : : * and instances of the delimiter character. In most cases, though, these
1258 : : * are infrequent. To avoid overhead from calling CopySendData once per
1259 : : * character, we dump out all characters between escaped characters in a
1260 : : * single call. The loop invariant is that the data from "start" to "ptr"
1261 : : * can be sent literally, but hasn't yet been.
1262 : : *
1263 : : * We can skip pg_encoding_mblen() overhead when encoding is safe, because
1264 : : * in valid backend encodings, extra bytes of a multibyte character never
1265 : : * look like ASCII. This loop is sufficiently performance-critical that
1266 : : * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
1267 : : * of the normal safe-encoding path.
1268 : : */
1269 [ + + ]: 6511515 : if (cstate->encoding_embeds_ascii)
1270 : : {
1938 heikki.linnakangas@i 1271 :GBC 3 : start = ptr;
1272 [ + + ]: 9 : while ((c = *ptr) != '\0')
1273 : : {
1274 [ - + ]: 6 : if ((unsigned char) c < (unsigned char) 0x20)
1275 : : {
1276 : : /*
1277 : : * \r and \n must be escaped, the others are traditional. We
1278 : : * prefer to dump these using the C-like notation, rather than
1279 : : * a backslash and the literal character, because it makes the
1280 : : * dump file a bit more proof against Microsoftish data
1281 : : * mangling.
1282 : : */
1938 heikki.linnakangas@i 1283 [ # # # # :UBC 0 : switch (c)
# # # ]
1284 : : {
1285 : 0 : case '\b':
1286 : 0 : c = 'b';
1287 : 0 : break;
1288 : 0 : case '\f':
1289 : 0 : c = 'f';
1290 : 0 : break;
1291 : 0 : case '\n':
1292 : 0 : c = 'n';
1293 : 0 : break;
1294 : 0 : case '\r':
1295 : 0 : c = 'r';
1296 : 0 : break;
1297 : 0 : case '\t':
1298 : 0 : c = 't';
1299 : 0 : break;
1300 : 0 : case '\v':
1301 : 0 : c = 'v';
1302 : 0 : break;
1303 : 0 : default:
1304 : : /* If it's the delimiter, must backslash it */
1305 [ # # ]: 0 : if (c == delimc)
1306 : 0 : break;
1307 : : /* All ASCII control chars are length 1 */
1308 : 0 : ptr++;
1309 : 0 : continue; /* fall to end of loop */
1310 : : }
1311 : : /* if we get here, we need to convert the control char */
1312 [ # # ]: 0 : DUMPSOFAR();
1313 : 0 : CopySendChar(cstate, '\\');
1314 : 0 : CopySendChar(cstate, c);
1315 : 0 : start = ++ptr; /* do not include char in next run */
1316 : : }
1938 heikki.linnakangas@i 1317 [ + - - + ]:GBC 6 : else if (c == '\\' || c == delimc)
1318 : : {
1938 heikki.linnakangas@i 1319 [ # # ]:UBC 0 : DUMPSOFAR();
1320 : 0 : CopySendChar(cstate, '\\');
1321 : 0 : start = ptr++; /* we include char in next run */
1322 : : }
1938 heikki.linnakangas@i 1323 [ + + ]:GBC 6 : else if (IS_HIGHBIT_SET(c))
1324 : 3 : ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
1325 : : else
1326 : 3 : ptr++;
1327 : : }
1328 : : }
1329 : : else
1330 : : {
1938 heikki.linnakangas@i 1331 :CBC 6511512 : start = ptr;
1332 [ + + ]: 71295853 : while ((c = *ptr) != '\0')
1333 : : {
1334 [ + + ]: 64784341 : if ((unsigned char) c < (unsigned char) 0x20)
1335 : : {
1336 : : /*
1337 : : * \r and \n must be escaped, the others are traditional. We
1338 : : * prefer to dump these using the C-like notation, rather than
1339 : : * a backslash and the literal character, because it makes the
1340 : : * dump file a bit more proof against Microsoftish data
1341 : : * mangling.
1342 : : */
1343 [ - - + - : 6938 : switch (c)
+ - - ]
1344 : : {
1938 heikki.linnakangas@i 1345 :UBC 0 : case '\b':
1346 : 0 : c = 'b';
1347 : 0 : break;
1348 : 0 : case '\f':
1349 : 0 : c = 'f';
1350 : 0 : break;
1938 heikki.linnakangas@i 1351 :CBC 5865 : case '\n':
1352 : 5865 : c = 'n';
1353 : 5865 : break;
1938 heikki.linnakangas@i 1354 :UBC 0 : case '\r':
1355 : 0 : c = 'r';
1356 : 0 : break;
1938 heikki.linnakangas@i 1357 :CBC 1073 : case '\t':
1358 : 1073 : c = 't';
1359 : 1073 : break;
1938 heikki.linnakangas@i 1360 :UBC 0 : case '\v':
1361 : 0 : c = 'v';
1362 : 0 : break;
1363 : 0 : default:
1364 : : /* If it's the delimiter, must backslash it */
1365 [ # # ]: 0 : if (c == delimc)
1366 : 0 : break;
1367 : : /* All ASCII control chars are length 1 */
1368 : 0 : ptr++;
1369 : 0 : continue; /* fall to end of loop */
1370 : : }
1371 : : /* if we get here, we need to convert the control char */
1938 heikki.linnakangas@i 1372 [ + + ]:CBC 6938 : DUMPSOFAR();
1373 : 6938 : CopySendChar(cstate, '\\');
1374 : 6938 : CopySendChar(cstate, c);
1375 : 6938 : start = ++ptr; /* do not include char in next run */
1376 : : }
1377 [ + + - + ]: 64777403 : else if (c == '\\' || c == delimc)
1378 : : {
1379 [ + + ]: 2273 : DUMPSOFAR();
1380 : 2273 : CopySendChar(cstate, '\\');
1381 : 2273 : start = ptr++; /* we include char in next run */
1382 : : }
1383 : : else
1384 : 64775130 : ptr++;
1385 : : }
1386 : : }
1387 : :
1388 [ + + ]: 6511515 : DUMPSOFAR();
1389 : 6511515 : }
1390 : :
1391 : : /*
1392 : : * Send text representation of one attribute, with conversion and
1393 : : * CSV-style escaping
1394 : : */
1395 : : static void
1507 peter@eisentraut.org 1396 : 312 : CopyAttributeOutCSV(CopyToState cstate, const char *string,
1397 : : bool use_quote)
1398 : : {
1399 : : const char *ptr;
1400 : : const char *start;
1401 : : char c;
1938 heikki.linnakangas@i 1402 : 312 : char delimc = cstate->opts.delim[0];
1403 : 312 : char quotec = cstate->opts.quote[0];
1404 : 312 : char escapec = cstate->opts.escape[0];
767 michael@paquier.xyz 1405 : 312 : bool single_attr = (list_length(cstate->attnumlist) == 1);
1406 : :
1407 : : /* force quoting if it matches null_print (before conversion!) */
1938 heikki.linnakangas@i 1408 [ + + + + ]: 312 : if (!use_quote && strcmp(string, cstate->opts.null_print) == 0)
1409 : 27 : use_quote = true;
1410 : :
1411 [ + + ]: 312 : if (cstate->need_transcoding)
1938 heikki.linnakangas@i 1412 :GBC 3 : ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
1413 : : else
1938 heikki.linnakangas@i 1414 :CBC 309 : ptr = string;
1415 : :
1416 : : /*
1417 : : * Make a preliminary pass to discover if it needs quoting
1418 : : */
1419 [ + + ]: 312 : if (!use_quote)
1420 : : {
1421 : : /*
1422 : : * Quote '\.' if it appears alone on a line, so that it will not be
1423 : : * interpreted as an end-of-data marker. (PG 18 and up will not
1424 : : * interpret '\.' in CSV that way, except in embedded-in-SQL data; but
1425 : : * we want the data to be loadable by older versions too. Also, this
1426 : : * avoids breaking clients that are still using PQgetline().)
1427 : : */
1428 [ + + + + ]: 219 : if (single_attr && strcmp(ptr, "\\.") == 0)
1429 : 3 : use_quote = true;
1430 : : else
1431 : : {
1507 peter@eisentraut.org 1432 : 216 : const char *tptr = ptr;
1433 : :
1938 heikki.linnakangas@i 1434 [ + + ]: 1110 : while ((c = *tptr) != '\0')
1435 : : {
1436 [ + + + + : 963 : if (c == delimc || c == quotec || c == '\n' || c == '\r')
+ + + + ]
1437 : : {
1438 : 69 : use_quote = true;
1439 : 69 : break;
1440 : : }
1441 [ + + + + ]: 894 : if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
1938 heikki.linnakangas@i 1442 :GBC 3 : tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
1443 : : else
1938 heikki.linnakangas@i 1444 :CBC 891 : tptr++;
1445 : : }
1446 : : }
1447 : : }
1448 : :
1449 [ + + ]: 312 : if (use_quote)
1450 : : {
1451 : 165 : CopySendChar(cstate, quotec);
1452 : :
1453 : : /*
1454 : : * We adopt the same optimization strategy as in CopyAttributeOutText
1455 : : */
1456 : 165 : start = ptr;
1457 [ + + ]: 1278 : while ((c = *ptr) != '\0')
1458 : : {
1459 [ + + + + ]: 1113 : if (c == quotec || c == escapec)
1460 : : {
1461 [ + + ]: 78 : DUMPSOFAR();
1462 : 78 : CopySendChar(cstate, escapec);
1463 : 78 : start = ptr; /* we include char in next run */
1464 : : }
1465 [ + + + - ]: 1113 : if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
1938 heikki.linnakangas@i 1466 :GBC 3 : ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
1467 : : else
1938 heikki.linnakangas@i 1468 :CBC 1110 : ptr++;
1469 : : }
1470 [ + + ]: 165 : DUMPSOFAR();
1471 : :
1472 : 165 : CopySendChar(cstate, quotec);
1473 : : }
1474 : : else
1475 : : {
1476 : : /* If it doesn't need quoting, we can just dump it as-is */
1477 : 147 : CopySendString(cstate, ptr);
1478 : : }
1479 : 312 : }
1480 : :
1481 : : /*
1482 : : * copy_dest_startup --- executor startup
1483 : : */
1484 : : static void
1485 : 228 : copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
1486 : : {
1487 : : /* no-op */
1488 : 228 : }
1489 : :
1490 : : /*
1491 : : * copy_dest_receive --- receive one tuple
1492 : : */
1493 : : static bool
1494 : 3557 : copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
1495 : : {
1496 : 3557 : DR_copy *myState = (DR_copy *) self;
1768 tgl@sss.pgh.pa.us 1497 : 3557 : CopyToState cstate = myState->cstate;
1498 : :
1499 : : /* Send the data */
1938 heikki.linnakangas@i 1500 : 3557 : CopyOneRowTo(cstate, slot);
1501 : :
1502 : : /* Increment the number of processed tuples, and report the progress */
1832 michael@paquier.xyz 1503 : 3557 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1504 : 3557 : ++myState->processed);
1505 : :
1938 heikki.linnakangas@i 1506 : 3557 : return true;
1507 : : }
1508 : :
1509 : : /*
1510 : : * copy_dest_shutdown --- executor end
1511 : : */
1512 : : static void
1513 : 228 : copy_dest_shutdown(DestReceiver *self)
1514 : : {
1515 : : /* no-op */
1516 : 228 : }
1517 : :
1518 : : /*
1519 : : * copy_dest_destroy --- release DestReceiver object
1520 : : */
1521 : : static void
1938 heikki.linnakangas@i 1522 :UBC 0 : copy_dest_destroy(DestReceiver *self)
1523 : : {
1524 : 0 : pfree(self);
1525 : 0 : }
1526 : :
1527 : : /*
1528 : : * CreateCopyDestReceiver -- create a suitable DestReceiver object
1529 : : */
1530 : : DestReceiver *
1938 heikki.linnakangas@i 1531 :CBC 231 : CreateCopyDestReceiver(void)
1532 : : {
95 michael@paquier.xyz 1533 :GNC 231 : DR_copy *self = palloc_object(DR_copy);
1534 : :
1938 heikki.linnakangas@i 1535 :CBC 231 : self->pub.receiveSlot = copy_dest_receive;
1536 : 231 : self->pub.rStartup = copy_dest_startup;
1537 : 231 : self->pub.rShutdown = copy_dest_shutdown;
1538 : 231 : self->pub.rDestroy = copy_dest_destroy;
1539 : 231 : self->pub.mydest = DestCopyOut;
1540 : :
1541 : 231 : self->cstate = NULL; /* will be set later */
1542 : 231 : self->processed = 0;
1543 : :
1544 : 231 : return (DestReceiver *) self;
1545 : : }
|