Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * copyfrom.c
4 : : * COPY <table> FROM file/program/client
5 : : *
6 : : * This file contains routines needed to efficiently load tuples into a
7 : : * table. That includes looking up the correct partition, firing triggers,
8 : : * calling the table AM function to insert the data, and updating indexes.
9 : : * Reading data from the input file or client and parsing it into Datums
10 : : * is handled in copyfromparse.c.
11 : : *
12 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
13 : : * Portions Copyright (c) 1994, Regents of the University of California
14 : : *
15 : : *
16 : : * IDENTIFICATION
17 : : * src/backend/commands/copyfrom.c
18 : : *
19 : : *-------------------------------------------------------------------------
20 : : */
21 : : #include "postgres.h"
22 : :
23 : : #include <ctype.h>
24 : : #include <unistd.h>
25 : : #include <sys/stat.h>
26 : :
27 : : #include "access/heapam.h"
28 : : #include "access/tableam.h"
29 : : #include "access/xact.h"
30 : : #include "catalog/namespace.h"
31 : : #include "commands/copyapi.h"
32 : : #include "commands/copyfrom_internal.h"
33 : : #include "commands/progress.h"
34 : : #include "commands/trigger.h"
35 : : #include "executor/execPartition.h"
36 : : #include "executor/executor.h"
37 : : #include "executor/nodeModifyTable.h"
38 : : #include "executor/tuptable.h"
39 : : #include "foreign/fdwapi.h"
40 : : #include "mb/pg_wchar.h"
41 : : #include "miscadmin.h"
42 : : #include "nodes/miscnodes.h"
43 : : #include "optimizer/optimizer.h"
44 : : #include "pgstat.h"
45 : : #include "rewrite/rewriteHandler.h"
46 : : #include "storage/fd.h"
47 : : #include "tcop/tcopprot.h"
48 : : #include "utils/lsyscache.h"
49 : : #include "utils/memutils.h"
50 : : #include "utils/portal.h"
51 : : #include "utils/rel.h"
52 : : #include "utils/snapmgr.h"
53 : : #include "utils/typcache.h"
54 : :
55 : : /*
56 : : * No more than this many tuples per CopyMultiInsertBuffer
57 : : *
58 : : * Caution: Don't make this too big, as we could end up with this many
59 : : * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
60 : : * multiInsertBuffers list. Increasing this can cause quadratic growth in
61 : : * memory requirements during copies into partitioned tables with a large
62 : : * number of partitions.
63 : : */
64 : : #define MAX_BUFFERED_TUPLES 1000
65 : :
66 : : /*
67 : : * Flush buffers if there are >= this many bytes, as counted by the input
68 : : * size, of tuples stored.
69 : : */
70 : : #define MAX_BUFFERED_BYTES 65535
71 : :
72 : : /*
73 : : * Trim the list of buffers back down to this number after flushing. This
74 : : * must be >= 2.
75 : : */
76 : : #define MAX_PARTITION_BUFFERS 32
77 : :
78 : : /* Stores multi-insert data related to a single relation in CopyFrom. */
79 : : typedef struct CopyMultiInsertBuffer
80 : : {
81 : : TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
82 : : ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
83 : : BulkInsertState bistate; /* BulkInsertState for this rel if plain
84 : : * table; NULL if foreign table */
85 : : int nused; /* number of 'slots' containing tuples */
86 : : uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
87 : : * stream */
88 : : } CopyMultiInsertBuffer;
89 : :
90 : : /*
91 : : * Stores one or many CopyMultiInsertBuffers and details about the size and
92 : : * number of tuples which are stored in them. This allows multiple buffers to
93 : : * exist at once when COPYing into a partitioned table.
94 : : */
95 : : typedef struct CopyMultiInsertInfo
96 : : {
97 : : List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
98 : : int bufferedTuples; /* number of tuples buffered over all buffers */
99 : : int bufferedBytes; /* number of bytes from all buffered tuples */
100 : : CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */
101 : : EState *estate; /* Executor state used for COPY */
102 : : CommandId mycid; /* Command Id used for COPY */
103 : : int ti_options; /* table insert options */
104 : : } CopyMultiInsertInfo;
105 : :
106 : :
107 : : /* non-export function prototypes */
108 : : static void ClosePipeFromProgram(CopyFromState cstate);
109 : :
110 : : /*
111 : : * Built-in format-specific routines. One-row callbacks are defined in
112 : : * copyfromparse.c.
113 : : */
114 : : static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
115 : : Oid *typioparam);
116 : : static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
117 : : static void CopyFromTextLikeEnd(CopyFromState cstate);
118 : : static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
119 : : FmgrInfo *finfo, Oid *typioparam);
120 : : static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
121 : : static void CopyFromBinaryEnd(CopyFromState cstate);
122 : :
123 : :
124 : : /*
125 : : * COPY FROM routines for built-in formats.
126 : : *
127 : : * CSV and text formats share the same TextLike routines except for the
128 : : * one-row callback.
129 : : */
130 : :
131 : : /* text format */
132 : : static const CopyFromRoutine CopyFromRoutineText = {
133 : : .CopyFromInFunc = CopyFromTextLikeInFunc,
134 : : .CopyFromStart = CopyFromTextLikeStart,
135 : : .CopyFromOneRow = CopyFromTextOneRow,
136 : : .CopyFromEnd = CopyFromTextLikeEnd,
137 : : };
138 : :
139 : : /* CSV format */
140 : : static const CopyFromRoutine CopyFromRoutineCSV = {
141 : : .CopyFromInFunc = CopyFromTextLikeInFunc,
142 : : .CopyFromStart = CopyFromTextLikeStart,
143 : : .CopyFromOneRow = CopyFromCSVOneRow,
144 : : .CopyFromEnd = CopyFromTextLikeEnd,
145 : : };
146 : :
147 : : /* binary format */
148 : : static const CopyFromRoutine CopyFromRoutineBinary = {
149 : : .CopyFromInFunc = CopyFromBinaryInFunc,
150 : : .CopyFromStart = CopyFromBinaryStart,
151 : : .CopyFromOneRow = CopyFromBinaryOneRow,
152 : : .CopyFromEnd = CopyFromBinaryEnd,
153 : : };
154 : :
155 : : /* Return a COPY FROM routine for the given options */
156 : : static const CopyFromRoutine *
376 msawada@postgresql.o 157 :CBC 991 : CopyFromGetRoutine(const CopyFormatOptions *opts)
158 : : {
159 [ + + ]: 991 : if (opts->csv_mode)
380 160 : 149 : return &CopyFromRoutineCSV;
376 161 [ + + ]: 842 : else if (opts->binary)
380 162 : 8 : return &CopyFromRoutineBinary;
163 : :
164 : : /* default is text */
165 : 834 : return &CopyFromRoutineText;
166 : : }
167 : :
168 : : /* Implementation of the start callback for text and CSV formats */
169 : : static void
170 : 971 : CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
171 : : {
172 : : AttrNumber attr_count;
173 : :
174 : : /*
175 : : * If encoding conversion is needed, we need another buffer to hold the
176 : : * converted input data. Otherwise, we can just point input_buf to the
177 : : * same buffer as raw_buf.
178 : : */
179 [ + + ]: 971 : if (cstate->need_transcoding)
180 : : {
181 : 18 : cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
182 : 18 : cstate->input_buf_index = cstate->input_buf_len = 0;
183 : : }
184 : : else
185 : 953 : cstate->input_buf = cstate->raw_buf;
186 : 971 : cstate->input_reached_eof = false;
187 : :
188 : 971 : initStringInfo(&cstate->line_buf);
189 : :
190 : : /*
191 : : * Create workspace for CopyReadAttributes results; used by CSV and text
192 : : * format.
193 : : */
194 : 971 : attr_count = list_length(cstate->attnumlist);
195 : 971 : cstate->max_fields = attr_count;
196 : 971 : cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
197 : 971 : }
198 : :
199 : : /*
200 : : * Implementation of the infunc callback for text and CSV formats. Assign
201 : : * the input function data to the given *finfo.
202 : : */
203 : : static void
204 : 2822 : CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
205 : : Oid *typioparam)
206 : : {
207 : : Oid func_oid;
208 : :
209 : 2822 : getTypeInputInfo(atttypid, &func_oid, typioparam);
210 : 2822 : fmgr_info(func_oid, finfo);
211 : 2822 : }
212 : :
213 : : /* Implementation of the end callback for text and CSV formats */
214 : : static void
215 : 635 : CopyFromTextLikeEnd(CopyFromState cstate)
216 : : {
217 : : /* nothing to do */
218 : 635 : }
219 : :
220 : : /* Implementation of the start callback for binary format */
221 : : static void
222 : 7 : CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
223 : : {
224 : : /* Read and verify binary header */
225 : 7 : ReceiveCopyBinaryHeader(cstate);
226 : 7 : }
227 : :
228 : : /*
229 : : * Implementation of the infunc callback for binary format. Assign
230 : : * the binary input function to the given *finfo.
231 : : */
232 : : static void
233 : 31 : CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
234 : : FmgrInfo *finfo, Oid *typioparam)
235 : : {
236 : : Oid func_oid;
237 : :
238 : 31 : getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
239 : 30 : fmgr_info(func_oid, finfo);
240 : 30 : }
241 : :
242 : : /* Implementation of the end callback for binary format */
243 : : static void
244 : 3 : CopyFromBinaryEnd(CopyFromState cstate)
245 : : {
246 : : /* nothing to do */
247 : 3 : }
248 : :
249 : : /*
250 : : * error context callback for COPY FROM
251 : : *
252 : : * The argument for the error context must be CopyFromState.
253 : : */
254 : : void
1938 heikki.linnakangas@i 255 : 190 : CopyFromErrorCallback(void *arg)
256 : : {
1809 257 : 190 : CopyFromState cstate = (CopyFromState) arg;
258 : :
1249 efujita@postgresql.o 259 [ + + ]: 190 : if (cstate->relname_only)
260 : : {
261 : 34 : errcontext("COPY %s",
262 : : cstate->cur_relname);
263 : 34 : return;
264 : : }
1938 heikki.linnakangas@i 265 [ + + ]: 156 : if (cstate->opts.binary)
266 : : {
267 : : /* can't usefully display the data */
268 [ + - ]: 1 : if (cstate->cur_attname)
351 peter@eisentraut.org 269 : 1 : errcontext("COPY %s, line %" PRIu64 ", column %s",
270 : : cstate->cur_relname,
271 : : cstate->cur_lineno,
272 : : cstate->cur_attname);
273 : : else
351 peter@eisentraut.org 274 :UBC 0 : errcontext("COPY %s, line %" PRIu64,
275 : : cstate->cur_relname,
276 : : cstate->cur_lineno);
277 : : }
278 : : else
279 : : {
1938 heikki.linnakangas@i 280 [ + + + + ]:CBC 155 : if (cstate->cur_attname && cstate->cur_attval)
281 : 26 : {
282 : : /* error is relevant to a particular column */
283 : : char *attval;
284 : :
713 msawada@postgresql.o 285 : 26 : attval = CopyLimitPrintoutLength(cstate->cur_attval);
351 peter@eisentraut.org 286 : 26 : errcontext("COPY %s, line %" PRIu64 ", column %s: \"%s\"",
287 : : cstate->cur_relname,
288 : : cstate->cur_lineno,
289 : : cstate->cur_attname,
290 : : attval);
1938 heikki.linnakangas@i 291 : 26 : pfree(attval);
292 : : }
293 [ + + ]: 129 : else if (cstate->cur_attname)
294 : : {
295 : : /* error is relevant to a particular column, value is NULL */
351 peter@eisentraut.org 296 : 6 : errcontext("COPY %s, line %" PRIu64 ", column %s: null input",
297 : : cstate->cur_relname,
298 : : cstate->cur_lineno,
299 : : cstate->cur_attname);
300 : : }
301 : : else
302 : : {
303 : : /*
304 : : * Error is relevant to a particular line.
305 : : *
306 : : * If line_buf still contains the correct line, print it.
307 : : */
1809 heikki.linnakangas@i 308 [ + + ]: 123 : if (cstate->line_buf_valid)
309 : : {
310 : : char *lineval;
311 : :
713 msawada@postgresql.o 312 : 100 : lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
351 peter@eisentraut.org 313 : 100 : errcontext("COPY %s, line %" PRIu64 ": \"%s\"",
314 : : cstate->cur_relname,
315 : : cstate->cur_lineno, lineval);
1938 heikki.linnakangas@i 316 : 100 : pfree(lineval);
317 : : }
318 : : else
319 : : {
351 peter@eisentraut.org 320 : 23 : errcontext("COPY %s, line %" PRIu64,
321 : : cstate->cur_relname,
322 : : cstate->cur_lineno);
323 : : }
324 : : }
325 : : }
326 : : }
327 : :
328 : : /*
329 : : * Make sure we don't print an unreasonable amount of COPY data in a message.
330 : : *
331 : : * Returns a pstrdup'd copy of the input.
332 : : */
333 : : char *
713 msawada@postgresql.o 334 : 156 : CopyLimitPrintoutLength(const char *str)
335 : : {
336 : : #define MAX_COPY_DATA_DISPLAY 100
337 : :
1938 heikki.linnakangas@i 338 : 156 : int slen = strlen(str);
339 : : int len;
340 : : char *res;
341 : :
342 : : /* Fast path if definitely okay */
343 [ + - ]: 156 : if (slen <= MAX_COPY_DATA_DISPLAY)
344 : 156 : return pstrdup(str);
345 : :
346 : : /* Apply encoding-dependent truncation */
1938 heikki.linnakangas@i 347 :UBC 0 : len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
348 : :
349 : : /*
350 : : * Truncate, and add "..." to show we truncated the input.
351 : : */
352 : 0 : res = (char *) palloc(len + 4);
353 : 0 : memcpy(res, str, len);
354 : 0 : strcpy(res + len, "...");
355 : :
356 : 0 : return res;
357 : : }
358 : :
359 : : /*
360 : : * Allocate memory and initialize a new CopyMultiInsertBuffer for this
361 : : * ResultRelInfo.
362 : : */
363 : : static CopyMultiInsertBuffer *
1938 heikki.linnakangas@i 364 :CBC 850 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
365 : : {
366 : : CopyMultiInsertBuffer *buffer;
367 : :
95 michael@paquier.xyz 368 :GNC 850 : buffer = palloc_object(CopyMultiInsertBuffer);
1938 heikki.linnakangas@i 369 :CBC 850 : memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
370 : 850 : buffer->resultRelInfo = rri;
1249 efujita@postgresql.o 371 [ + + ]: 850 : buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
1938 heikki.linnakangas@i 372 : 850 : buffer->nused = 0;
373 : :
374 : 850 : return buffer;
375 : : }
376 : :
377 : : /*
378 : : * Make a new buffer for this ResultRelInfo.
379 : : */
380 : : static inline void
381 : 850 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
382 : : ResultRelInfo *rri)
383 : : {
384 : : CopyMultiInsertBuffer *buffer;
385 : :
386 : 850 : buffer = CopyMultiInsertBufferInit(rri);
387 : :
388 : : /* Setup back-link so we can easily find this buffer again */
389 : 850 : rri->ri_CopyMultiInsertBuffer = buffer;
390 : : /* Record that we're tracking this buffer */
391 : 850 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
392 : 850 : }
393 : :
394 : : /*
395 : : * Initialize an already allocated CopyMultiInsertInfo.
396 : : *
397 : : * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
398 : : * for that table.
399 : : */
400 : : static void
401 : 847 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
402 : : CopyFromState cstate, EState *estate, CommandId mycid,
403 : : int ti_options)
404 : : {
405 : 847 : miinfo->multiInsertBuffers = NIL;
406 : 847 : miinfo->bufferedTuples = 0;
407 : 847 : miinfo->bufferedBytes = 0;
408 : 847 : miinfo->cstate = cstate;
409 : 847 : miinfo->estate = estate;
410 : 847 : miinfo->mycid = mycid;
411 : 847 : miinfo->ti_options = ti_options;
412 : :
413 : : /*
414 : : * Only setup the buffer when not dealing with a partitioned table.
415 : : * Buffers for partitioned tables will just be setup when we need to send
416 : : * tuples their way for the first time.
417 : : */
418 [ + + ]: 847 : if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
419 : 807 : CopyMultiInsertInfoSetupBuffer(miinfo, rri);
420 : 847 : }
421 : :
422 : : /*
423 : : * Returns true if the buffers are full
424 : : */
425 : : static inline bool
426 : 703467 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
427 : : {
428 [ + + ]: 703467 : if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
429 [ + + ]: 702869 : miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
430 : 651 : return true;
431 : 702816 : return false;
432 : : }
433 : :
434 : : /*
435 : : * Returns true if we have no buffered tuples
436 : : */
437 : : static inline bool
438 : 761 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
439 : : {
440 : 761 : return miinfo->bufferedTuples == 0;
441 : : }
442 : :
443 : : /*
444 : : * Write the tuples stored in 'buffer' out to the table.
445 : : */
446 : : static inline void
447 : 1350 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
448 : : CopyMultiInsertBuffer *buffer,
449 : : int64 *processed)
450 : : {
1809 451 : 1350 : CopyFromState cstate = miinfo->cstate;
1938 452 : 1350 : EState *estate = miinfo->estate;
453 : 1350 : int nused = buffer->nused;
454 : 1350 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
455 : 1350 : TupleTableSlot **slots = buffer->slots;
456 : : int i;
457 : :
1249 efujita@postgresql.o 458 [ + + ]: 1350 : if (resultRelInfo->ri_FdwRoutine)
459 : : {
460 : 7 : int batch_size = resultRelInfo->ri_BatchSize;
461 : 7 : int sent = 0;
462 : :
463 [ - + ]: 7 : Assert(buffer->bistate == NULL);
464 : :
465 : : /* Ensure that the FDW supports batching and it's enabled */
466 [ - + ]: 7 : Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
467 [ - + ]: 7 : Assert(batch_size > 1);
468 : :
469 : : /*
470 : : * We suppress error context information other than the relation name,
471 : : * if one of the operations below fails.
472 : : */
473 [ - + ]: 7 : Assert(!cstate->relname_only);
474 : 7 : cstate->relname_only = true;
475 : :
476 [ + + ]: 19 : while (sent < nused)
477 : : {
478 : 13 : int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
479 : 13 : int inserted = size;
480 : : TupleTableSlot **rslots;
481 : :
482 : : /* insert into foreign table: let the FDW do it */
483 : : rslots =
484 : 13 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
485 : : resultRelInfo,
486 : 13 : &slots[sent],
487 : : NULL,
488 : : &inserted);
489 : :
490 : 12 : sent += size;
491 : :
492 : : /* No need to do anything if there are no inserted rows */
493 [ + + ]: 12 : if (inserted <= 0)
494 : 2 : continue;
495 : :
496 : : /* Triggers on foreign tables should not have transition tables */
497 [ - + - - ]: 10 : Assert(resultRelInfo->ri_TrigDesc == NULL ||
498 : : resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
499 : :
500 : : /* Run AFTER ROW INSERT triggers */
501 [ - + ]: 10 : if (resultRelInfo->ri_TrigDesc != NULL &&
1249 efujita@postgresql.o 502 [ # # ]:UBC 0 : resultRelInfo->ri_TrigDesc->trig_insert_after_row)
503 : : {
504 : 0 : Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
505 : :
506 [ # # ]: 0 : for (i = 0; i < inserted; i++)
507 : : {
508 : 0 : TupleTableSlot *slot = rslots[i];
509 : :
510 : : /*
511 : : * AFTER ROW Triggers might reference the tableoid column,
512 : : * so (re-)initialize tts_tableOid before evaluating them.
513 : : */
514 : 0 : slot->tts_tableOid = relid;
515 : :
516 : 0 : ExecARInsertTriggers(estate, resultRelInfo,
517 : : slot, NIL,
518 : : cstate->transition_capture);
519 : : }
520 : : }
521 : :
522 : : /* Update the row counter and progress of the COPY command */
1249 efujita@postgresql.o 523 :CBC 10 : *processed += inserted;
524 : 10 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
525 : : *processed);
526 : : }
527 : :
528 [ + + ]: 24 : for (i = 0; i < nused; i++)
529 : 18 : ExecClearTuple(slots[i]);
530 : :
531 : : /* reset relname_only */
532 : 6 : cstate->relname_only = false;
533 : : }
534 : : else
535 : : {
536 : 1343 : CommandId mycid = miinfo->mycid;
537 : 1343 : int ti_options = miinfo->ti_options;
538 : 1343 : bool line_buf_valid = cstate->line_buf_valid;
539 : 1343 : uint64 save_cur_lineno = cstate->cur_lineno;
540 : : MemoryContext oldcontext;
541 : :
542 [ - + ]: 1343 : Assert(buffer->bistate != NULL);
543 : :
544 : : /*
545 : : * Print error context information correctly, if one of the operations
546 : : * below fails.
547 : : */
548 : 1343 : cstate->line_buf_valid = false;
549 : :
550 : : /*
551 : : * table_multi_insert may leak memory, so switch to short-lived memory
552 : : * context before calling it.
553 : : */
554 [ + - ]: 1343 : oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
555 : 1343 : table_multi_insert(resultRelInfo->ri_RelationDesc,
556 : : slots,
557 : : nused,
558 : : mycid,
559 : : ti_options,
703 akorotkov@postgresql 560 :GIC 1343 : buffer->bistate);
1249 efujita@postgresql.o 561 :CBC 1343 : MemoryContextSwitchTo(oldcontext);
562 : :
563 [ + + ]: 704716 : for (i = 0; i < nused; i++)
564 : : {
565 : : /*
566 : : * If there are any indexes, update them for all the inserted
567 : : * tuples, and run AFTER ROW INSERT triggers.
568 : : */
703 akorotkov@postgresql 569 [ + + ]: 703382 : if (resultRelInfo->ri_NumIndices > 0)
570 : : {
571 : : List *recheckIndexes;
572 : :
1249 efujita@postgresql.o 573 : 200955 : cstate->cur_lineno = buffer->linenos[i];
574 : : recheckIndexes =
575 : 200955 : ExecInsertIndexTuples(resultRelInfo,
576 : : estate, 0, buffer->slots[i],
577 : : NIL, NULL);
578 : 200946 : ExecARInsertTriggers(estate, resultRelInfo,
579 : 200946 : slots[i], recheckIndexes,
580 : : cstate->transition_capture);
581 : 200946 : list_free(recheckIndexes);
582 : : }
583 : :
584 : : /*
585 : : * There's no indexes, but see if we need to run AFTER ROW INSERT
586 : : * triggers anyway.
587 : : */
588 [ + + ]: 502427 : else if (resultRelInfo->ri_TrigDesc != NULL &&
589 [ + + ]: 42 : (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
590 [ + + ]: 33 : resultRelInfo->ri_TrigDesc->trig_insert_new_table))
591 : : {
592 : 18 : cstate->cur_lineno = buffer->linenos[i];
593 : 18 : ExecARInsertTriggers(estate, resultRelInfo,
594 : 18 : slots[i], NIL,
595 : : cstate->transition_capture);
596 : : }
597 : :
598 : 703373 : ExecClearTuple(slots[i]);
599 : : }
600 : :
601 : : /* Update the row counter and progress of the COPY command */
602 : 1334 : *processed += nused;
603 : 1334 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
604 : : *processed);
605 : :
606 : : /* reset cur_lineno and line_buf_valid to what they were */
607 : 1334 : cstate->line_buf_valid = line_buf_valid;
608 : 1334 : cstate->cur_lineno = save_cur_lineno;
609 : : }
610 : :
611 : : /* Mark that all slots are free */
1938 heikki.linnakangas@i 612 : 1340 : buffer->nused = 0;
613 : 1340 : }
614 : :
615 : : /*
616 : : * Drop used slots and free member for this buffer.
617 : : *
618 : : * The buffer must be flushed before cleanup.
619 : : */
620 : : static inline void
621 : 740 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
622 : : CopyMultiInsertBuffer *buffer)
623 : : {
1249 efujita@postgresql.o 624 : 740 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
625 : : int i;
626 : :
627 : : /* Ensure buffer was flushed */
1938 heikki.linnakangas@i 628 [ - + ]: 740 : Assert(buffer->nused == 0);
629 : :
630 : : /* Remove back-link to ourself */
1249 efujita@postgresql.o 631 : 740 : resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
632 : :
633 [ + + ]: 740 : if (resultRelInfo->ri_FdwRoutine == NULL)
634 : : {
635 [ - + ]: 734 : Assert(buffer->bistate != NULL);
636 : 734 : FreeBulkInsertState(buffer->bistate);
637 : : }
638 : : else
639 [ - + ]: 6 : Assert(buffer->bistate == NULL);
640 : :
641 : : /* Since we only create slots on demand, just drop the non-null ones. */
1938 heikki.linnakangas@i 642 [ + + + + ]: 124683 : for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
643 : 123943 : ExecDropSingleTupleTableSlot(buffer->slots[i]);
644 : :
1249 efujita@postgresql.o 645 [ + + ]: 740 : if (resultRelInfo->ri_FdwRoutine == NULL)
646 : 734 : table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
647 : : miinfo->ti_options);
648 : :
1938 heikki.linnakangas@i 649 : 740 : pfree(buffer);
650 : 740 : }
651 : :
652 : : /*
653 : : * Write out all stored tuples in all buffers out to the tables.
654 : : *
655 : : * Once flushed we also trim the tracked buffers list down to size by removing
656 : : * the buffers created earliest first.
657 : : *
658 : : * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
659 : : * used. When cleaning up old buffers we'll never remove the one for
660 : : * 'curr_rri'.
661 : : */
662 : : static inline void
1249 efujita@postgresql.o 663 : 1237 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
664 : : int64 *processed)
665 : : {
666 : : ListCell *lc;
667 : :
1938 heikki.linnakangas@i 668 [ + - + + : 2577 : foreach(lc, miinfo->multiInsertBuffers)
+ + ]
669 : : {
670 : 1350 : CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
671 : :
1249 efujita@postgresql.o 672 : 1350 : CopyMultiInsertBufferFlush(miinfo, buffer, processed);
673 : : }
674 : :
1938 heikki.linnakangas@i 675 : 1227 : miinfo->bufferedTuples = 0;
676 : 1227 : miinfo->bufferedBytes = 0;
677 : :
678 : : /*
679 : : * Trim the list of tracked buffers down if it exceeds the limit. Here we
680 : : * remove buffers starting with the ones we created first. It seems less
681 : : * likely that these older ones will be needed than the ones that were
682 : : * just created.
683 : : */
684 [ - + ]: 1227 : while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
685 : : {
686 : : CopyMultiInsertBuffer *buffer;
687 : :
1938 heikki.linnakangas@i 688 :UBC 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
689 : :
690 : : /*
691 : : * We never want to remove the buffer that's currently being used, so
692 : : * if we happen to find that then move it to the end of the list.
693 : : */
694 [ # # ]: 0 : if (buffer->resultRelInfo == curr_rri)
695 : : {
696 : : /*
697 : : * The code below would misbehave if we were trying to reduce the
698 : : * list to less than two items.
699 : : */
700 : : StaticAssertDecl(MAX_PARTITION_BUFFERS >= 2,
701 : : "MAX_PARTITION_BUFFERS must be >= 2");
702 : :
703 : 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
704 : 0 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
705 : 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
706 : : }
707 : :
708 : 0 : CopyMultiInsertBufferCleanup(miinfo, buffer);
709 : 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
710 : : }
1938 heikki.linnakangas@i 711 :CBC 1227 : }
712 : :
713 : : /*
714 : : * Cleanup allocated buffers and free memory
715 : : */
716 : : static inline void
717 : 737 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
718 : : {
719 : : ListCell *lc;
720 : :
721 [ + + + + : 1477 : foreach(lc, miinfo->multiInsertBuffers)
+ + ]
722 : 740 : CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
723 : :
724 : 737 : list_free(miinfo->multiInsertBuffers);
725 : 737 : }
726 : :
727 : : /*
728 : : * Get the next TupleTableSlot that the next tuple should be stored in.
729 : : *
730 : : * Callers must ensure that the buffer is not full.
731 : : *
732 : : * Note: 'miinfo' is unused but has been included for consistency with the
733 : : * other functions in this area.
734 : : */
735 : : static inline TupleTableSlot *
736 : 704328 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
737 : : ResultRelInfo *rri)
738 : : {
739 : 704328 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
740 : : int nused;
741 : :
742 [ - + ]: 704328 : Assert(buffer != NULL);
575 drowley@postgresql.o 743 [ - + ]: 704328 : Assert(buffer->nused < MAX_BUFFERED_TUPLES);
744 : :
745 : 704328 : nused = buffer->nused;
746 : :
1938 heikki.linnakangas@i 747 [ + + ]: 704328 : if (buffer->slots[nused] == NULL)
748 : 124128 : buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
749 : 704328 : return buffer->slots[nused];
750 : : }
751 : :
752 : : /*
753 : : * Record the previously reserved TupleTableSlot that was reserved by
754 : : * CopyMultiInsertInfoNextFreeSlot as being consumed.
755 : : */
756 : : static inline void
757 : 703467 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
758 : : TupleTableSlot *slot, int tuplen, uint64 lineno)
759 : : {
760 : 703467 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
761 : :
762 [ - + ]: 703467 : Assert(buffer != NULL);
763 [ - + ]: 703467 : Assert(slot == buffer->slots[buffer->nused]);
764 : :
765 : : /* Store the line number so we can properly report any errors later */
766 : 703467 : buffer->linenos[buffer->nused] = lineno;
767 : :
768 : : /* Record this slot as being used */
769 : 703467 : buffer->nused++;
770 : :
771 : : /* Update how many tuples are stored and their size */
772 : 703467 : miinfo->bufferedTuples++;
773 : 703467 : miinfo->bufferedBytes += tuplen;
774 : 703467 : }
775 : :
776 : : /*
777 : : * Copy FROM file to relation.
778 : : */
779 : : uint64
780 : 941 : CopyFrom(CopyFromState cstate)
781 : : {
782 : : ResultRelInfo *resultRelInfo;
783 : : ResultRelInfo *target_resultRelInfo;
784 : 941 : ResultRelInfo *prevResultRelInfo = NULL;
785 : 941 : EState *estate = CreateExecutorState(); /* for ExecConstraints() */
786 : : ModifyTableState *mtstate;
787 : : ExprContext *econtext;
788 : 941 : TupleTableSlot *singleslot = NULL;
789 : 941 : MemoryContext oldcontext = CurrentMemoryContext;
790 : :
791 : 941 : PartitionTupleRouting *proute = NULL;
792 : : ErrorContextCallback errcallback;
793 : 941 : CommandId mycid = GetCurrentCommandId(true);
794 : 941 : int ti_options = 0; /* start with default options for insert */
795 : 941 : BulkInsertState bistate = NULL;
796 : : CopyInsertMethod insertMethod;
797 : 941 : CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
1832 michael@paquier.xyz 798 : 941 : int64 processed = 0;
799 : 941 : int64 excluded = 0;
800 : : bool has_before_insert_row_trig;
801 : : bool has_instead_insert_row_trig;
1938 heikki.linnakangas@i 802 : 941 : bool leafpart_use_multi_insert = false;
803 : :
804 [ - + ]: 941 : Assert(cstate->rel);
805 [ - + ]: 941 : Assert(list_length(cstate->range_table) == 1);
806 : :
786 akorotkov@postgresql 807 [ + + ]: 941 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
789 808 [ - + ]: 45 : Assert(cstate->escontext);
809 : :
810 : : /*
811 : : * The target must be a plain, foreign, or partitioned relation, or have
812 : : * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
813 : : * allowed on views, so we only hint about them in the view case.)
814 : : */
1938 heikki.linnakangas@i 815 [ + + ]: 941 : if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
816 [ + + ]: 88 : cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
817 [ + + ]: 66 : cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
818 [ + + ]: 9 : !(cstate->rel->trigdesc &&
819 [ - + ]: 6 : cstate->rel->trigdesc->trig_insert_instead_row))
820 : : {
821 [ + - ]: 3 : if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
822 [ + - ]: 3 : ereport(ERROR,
823 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
824 : : errmsg("cannot copy to view \"%s\"",
825 : : RelationGetRelationName(cstate->rel)),
826 : : errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
1938 heikki.linnakangas@i 827 [ # # ]:UBC 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
828 [ # # ]: 0 : ereport(ERROR,
829 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
830 : : errmsg("cannot copy to materialized view \"%s\"",
831 : : RelationGetRelationName(cstate->rel))));
832 [ # # ]: 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
833 [ # # ]: 0 : ereport(ERROR,
834 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
835 : : errmsg("cannot copy to sequence \"%s\"",
836 : : RelationGetRelationName(cstate->rel))));
837 : : else
838 [ # # ]: 0 : ereport(ERROR,
839 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
840 : : errmsg("cannot copy to non-table relation \"%s\"",
841 : : RelationGetRelationName(cstate->rel))));
842 : : }
843 : :
844 : : /*
845 : : * If the target file is new-in-transaction, we assume that checking FSM
846 : : * for free space is a waste of time. This could possibly be wrong, but
847 : : * it's unlikely.
848 : : */
1938 heikki.linnakangas@i 849 [ + + + - :CBC 938 : if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
+ - + - -
+ ]
850 [ + + ]: 853 : (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
1348 rhaas@postgresql.org 851 [ + + ]: 847 : cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
1938 heikki.linnakangas@i 852 : 44 : ti_options |= TABLE_INSERT_SKIP_FSM;
853 : :
854 : : /*
855 : : * Optimize if new relation storage was created in this subxact or one of
856 : : * its committed children and we won't see those rows later as part of an
857 : : * earlier scan or command. The subxact test ensures that if this subxact
858 : : * aborts then the frozen rows won't be visible after xact cleanup. Note
859 : : * that the stronger test of exactly which subtransaction created it is
860 : : * crucial for correctness of this optimization. The test for an earlier
861 : : * scan or command tolerates false negatives. FREEZE causes other sessions
862 : : * to see rows they would not see under MVCC, and a false negative merely
863 : : * spreads that anomaly to the current session.
864 : : */
865 [ + + ]: 938 : if (cstate->opts.freeze)
866 : : {
867 : : /*
868 : : * We currently disallow COPY FREEZE on partitioned tables. The
869 : : * reason for this is that we've simply not yet opened the partitions
870 : : * to determine if the optimization can be applied to them. We could
871 : : * go and open them all here, but doing so may be quite a costly
872 : : * overhead for small copies. In any case, we may just end up routing
873 : : * tuples to a small number of partitions. It seems better just to
874 : : * raise an ERROR for partitioned tables.
875 : : */
876 [ + + ]: 36 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
877 : : {
878 [ + - ]: 3 : ereport(ERROR,
879 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
880 : : errmsg("cannot perform COPY FREEZE on a partitioned table")));
881 : : }
882 : :
883 : : /* There's currently no support for COPY FREEZE on foreign tables. */
402 nathan@postgresql.or 884 [ + + ]: 33 : if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
885 [ + - ]: 3 : ereport(ERROR,
886 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
887 : : errmsg("cannot perform COPY FREEZE on a foreign table")));
888 : :
889 : : /*
890 : : * Tolerate one registration for the benefit of FirstXactSnapshot.
891 : : * Scan-bearing queries generally create at least two registrations,
892 : : * though relying on that is fragile, as is ignoring ActiveSnapshot.
893 : : * Clear CatalogSnapshot to avoid counting its registration. We'll
894 : : * still detect ongoing catalog scans, each of which separately
895 : : * registers the snapshot it uses.
896 : : */
1938 heikki.linnakangas@i 897 : 30 : InvalidateCatalogSnapshot();
898 [ + - - + ]: 30 : if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
1938 heikki.linnakangas@i 899 [ # # ]:UBC 0 : ereport(ERROR,
900 : : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
901 : : errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
902 : :
1938 heikki.linnakangas@i 903 [ + - + + ]:CBC 60 : if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
1348 rhaas@postgresql.org 904 : 30 : cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
1938 heikki.linnakangas@i 905 [ + - ]: 9 : ereport(ERROR,
906 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
907 : : errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
908 : :
909 : 21 : ti_options |= TABLE_INSERT_FROZEN;
910 : : }
911 : :
912 : : /*
913 : : * We need a ResultRelInfo so we can use the regular executor's
914 : : * index-entry-making machinery. (There used to be a huge amount of code
915 : : * here that basically duplicated execUtils.c ...)
916 : : */
401 amitlan@postgresql.o 917 : 923 : ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos,
918 : : bms_make_singleton(1));
1938 heikki.linnakangas@i 919 : 923 : resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
920 : 923 : ExecInitResultRelation(estate, resultRelInfo, 1);
921 : :
922 : : /* Verify the named relation is a valid target for INSERT */
192 dean.a.rasheed@gmail 923 : 923 : CheckValidResultRel(resultRelInfo, CMD_INSERT, ONCONFLICT_NONE, NIL);
924 : :
1938 heikki.linnakangas@i 925 : 922 : ExecOpenIndices(resultRelInfo, false);
926 : :
927 : : /*
928 : : * Set up a ModifyTableState so we can let FDW(s) init themselves for
929 : : * foreign-table result relation(s).
930 : : */
931 : 922 : mtstate = makeNode(ModifyTableState);
932 : 922 : mtstate->ps.plan = NULL;
933 : 922 : mtstate->ps.state = estate;
934 : 922 : mtstate->operation = CMD_INSERT;
1810 tgl@sss.pgh.pa.us 935 : 922 : mtstate->mt_nrels = 1;
1938 heikki.linnakangas@i 936 : 922 : mtstate->resultRelInfo = resultRelInfo;
1861 937 : 922 : mtstate->rootResultRelInfo = resultRelInfo;
938 : :
1938 939 [ + + ]: 922 : if (resultRelInfo->ri_FdwRoutine != NULL &&
940 [ + - ]: 18 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
941 : 18 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
942 : : resultRelInfo);
943 : :
944 : : /*
945 : : * Also, if the named relation is a foreign table, determine if the FDW
946 : : * supports batch insert and determine the batch size (a FDW may support
947 : : * batching, but it may be disabled for the server/table).
948 : : *
949 : : * If the FDW does not support batching, we set the batch size to 1.
950 : : */
1249 efujita@postgresql.o 951 [ + + ]: 922 : if (resultRelInfo->ri_FdwRoutine != NULL &&
952 [ + - ]: 18 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
953 [ + - ]: 18 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
954 : 18 : resultRelInfo->ri_BatchSize =
955 : 18 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
956 : : else
957 : 904 : resultRelInfo->ri_BatchSize = 1;
958 : :
959 [ - + ]: 922 : Assert(resultRelInfo->ri_BatchSize >= 1);
960 : :
961 : : /* Prepare to catch AFTER triggers. */
1938 heikki.linnakangas@i 962 : 922 : AfterTriggerBeginQuery();
963 : :
964 : : /*
965 : : * If there are any triggers with transition tables on the named relation,
966 : : * we need to be prepared to capture transition tuples.
967 : : *
968 : : * Because partition tuple routing would like to know about whether
969 : : * transition capture is active, we also set it in mtstate, which is
970 : : * passed to ExecFindPartition() below.
971 : : */
972 : 922 : cstate->transition_capture = mtstate->mt_transition_capture =
973 : 922 : MakeTransitionCaptureState(cstate->rel->trigdesc,
974 : 922 : RelationGetRelid(cstate->rel),
975 : : CMD_INSERT);
976 : :
977 : : /*
978 : : * If the named relation is a partitioned table, initialize state for
979 : : * CopyFrom tuple routing.
980 : : */
981 [ + + ]: 922 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1804 tgl@sss.pgh.pa.us 982 : 54 : proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
983 : :
1938 heikki.linnakangas@i 984 [ + + ]: 922 : if (cstate->whereClause)
985 : 9 : cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
986 : : &mtstate->ps);
987 : :
988 : : /*
989 : : * It's generally more efficient to prepare a bunch of tuples for
990 : : * insertion, and insert them in one
991 : : * table_multi_insert()/ExecForeignBatchInsert() call, than call
992 : : * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
993 : : * However, there are a number of reasons why we might not be able to do
994 : : * this. These are explained below.
995 : : */
996 [ + + ]: 922 : if (resultRelInfo->ri_TrigDesc != NULL &&
997 [ + + ]: 94 : (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
998 [ + + ]: 48 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
999 : : {
1000 : : /*
1001 : : * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
1002 : : * triggers on the table. Such triggers might query the table we're
1003 : : * inserting into and act differently if the tuples that have already
1004 : : * been processed and prepared for insertion are not there.
1005 : : */
1006 : 52 : insertMethod = CIM_SINGLE;
1007 : : }
1249 efujita@postgresql.o 1008 [ + + ]: 870 : else if (resultRelInfo->ri_FdwRoutine != NULL &&
1009 [ + + ]: 14 : resultRelInfo->ri_BatchSize == 1)
1010 : : {
1011 : : /*
1012 : : * Can't support multi-inserts to a foreign table if the FDW does not
1013 : : * support batching, or it's disabled for the server or foreign table.
1014 : : */
1015 : 9 : insertMethod = CIM_SINGLE;
1016 : : }
1938 heikki.linnakangas@i 1017 [ + + + + ]: 861 : else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
1018 [ + + ]: 16 : resultRelInfo->ri_TrigDesc->trig_insert_new_table)
1019 : : {
1020 : : /*
1021 : : * For partitioned tables we can't support multi-inserts when there
1022 : : * are any statement level insert triggers. It might be possible to
1023 : : * allow partitioned tables with such triggers in the future, but for
1024 : : * now, CopyMultiInsertInfoFlush expects that any after row insert and
1025 : : * statement level insert triggers are on the same relation.
1026 : : */
1027 : 11 : insertMethod = CIM_SINGLE;
1028 : : }
1249 efujita@postgresql.o 1029 [ + + ]: 850 : else if (cstate->volatile_defexprs)
1030 : : {
1031 : : /*
1032 : : * Can't support multi-inserts if there are any volatile default
1033 : : * expressions in the table. Similarly to the trigger case above,
1034 : : * such expressions may query the table we're inserting into.
1035 : : *
1036 : : * Note: It does not matter if any partitions have any volatile
1037 : : * default expressions as we use the defaults from the target of the
1038 : : * COPY command.
1039 : : */
1938 heikki.linnakangas@i 1040 : 3 : insertMethod = CIM_SINGLE;
1041 : : }
1042 [ - + ]: 847 : else if (contain_volatile_functions(cstate->whereClause))
1043 : : {
1044 : : /*
1045 : : * Can't support multi-inserts if there are any volatile function
1046 : : * expressions in WHERE clause. Similarly to the trigger case above,
1047 : : * such expressions may query the table we're inserting into.
1048 : : *
1049 : : * Note: the whereClause was already preprocessed in DoCopy(), so it's
1050 : : * okay to use contain_volatile_functions() directly.
1051 : : */
1938 heikki.linnakangas@i 1052 :UBC 0 : insertMethod = CIM_SINGLE;
1053 : : }
1054 : : else
1055 : : {
1056 : : /*
1057 : : * For partitioned tables, we may still be able to perform bulk
1058 : : * inserts. However, the possibility of this depends on which types
1059 : : * of triggers exist on the partition. We must disable bulk inserts
1060 : : * if the partition is a foreign table that can't use batching or it
1061 : : * has any before row insert or insert instead triggers (same as we
1062 : : * checked above for the parent table). Since the partition's
1063 : : * resultRelInfos are initialized only when we actually need to insert
1064 : : * the first tuple into them, we must have the intermediate insert
1065 : : * method of CIM_MULTI_CONDITIONAL to flag that we must later
1066 : : * determine if we can use bulk-inserts for the partition being
1067 : : * inserted into.
1068 : : */
1938 heikki.linnakangas@i 1069 [ + + ]:CBC 847 : if (proute)
1070 : 40 : insertMethod = CIM_MULTI_CONDITIONAL;
1071 : : else
1072 : 807 : insertMethod = CIM_MULTI;
1073 : :
1074 : 847 : CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
1075 : : estate, mycid, ti_options);
1076 : : }
1077 : :
1078 : : /*
1079 : : * If not using batch mode (which allocates slots as needed) set up a
1080 : : * tuple slot too. When inserting into a partitioned table, we also need
1081 : : * one, even if we might batch insert, to read the tuple in the root
1082 : : * partition's form.
1083 : : */
1084 [ + + + + ]: 922 : if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
1085 : : {
1086 : 115 : singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
1087 : : &estate->es_tupleTable);
1088 : 115 : bistate = GetBulkInsertState();
1089 : : }
1090 : :
1091 [ + + ]: 1016 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1092 [ + + ]: 94 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1093 : :
1094 [ + + ]: 1016 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1095 [ + + ]: 94 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1096 : :
1097 : : /*
1098 : : * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
1099 : : * should do this for COPY, since it's not really an "INSERT" statement as
1100 : : * such. However, executing these triggers maintains consistency with the
1101 : : * EACH ROW triggers that we already fire on COPY.
1102 : : */
1103 : 922 : ExecBSInsertTriggers(estate, resultRelInfo);
1104 : :
1105 [ + + ]: 922 : econtext = GetPerTupleExprContext(estate);
1106 : :
1107 : : /* Set up callback to identify error line number */
1108 : 922 : errcallback.callback = CopyFromErrorCallback;
472 peter@eisentraut.org 1109 : 922 : errcallback.arg = cstate;
1938 heikki.linnakangas@i 1110 : 922 : errcallback.previous = error_context_stack;
1111 : 922 : error_context_stack = &errcallback;
1112 : :
1113 : : for (;;)
1114 : 733721 : {
1115 : : TupleTableSlot *myslot;
1116 : : bool skip_tuple;
1117 : :
1118 [ - + ]: 734643 : CHECK_FOR_INTERRUPTS();
1119 : :
1120 : : /*
1121 : : * Reset the per-tuple exprcontext. We do this after every tuple, to
1122 : : * clean-up after expression evaluations etc.
1123 : : */
1124 [ + - ]: 734643 : ResetPerTupleExprContext(estate);
1125 : :
1126 : : /* select slot to (initially) load row into */
1127 [ + + + + ]: 734643 : if (insertMethod == CIM_SINGLE || proute)
1128 : : {
1129 : 137441 : myslot = singleslot;
1130 [ - + ]: 137441 : Assert(myslot != NULL);
1131 : : }
1132 : : else
1133 : : {
1134 [ - + ]: 597202 : Assert(resultRelInfo == target_resultRelInfo);
1135 [ - + ]: 597202 : Assert(insertMethod == CIM_MULTI);
1136 : :
1137 : 597202 : myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1138 : : resultRelInfo);
1139 : : }
1140 : :
1141 : : /*
1142 : : * Switch to per-tuple context before calling NextCopyFrom, which does
1143 : : * evaluate default expressions etc. and requires per-tuple context.
1144 : : */
1145 [ + - ]: 734643 : MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1146 : :
1147 : 734643 : ExecClearTuple(myslot);
1148 : :
1149 : : /* Directly store the values/nulls array in the slot */
1150 [ + + ]: 734643 : if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
1151 : 807 : break;
1152 : :
528 fujii@postgresql.org 1153 [ + + ]: 733743 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
789 akorotkov@postgresql 1154 [ + + ]: 81 : cstate->escontext->error_occurred)
1155 : : {
1156 : : /*
1157 : : * Soft error occurred, skip this tuple and just make
1158 : : * ErrorSaveContext ready for the next NextCopyFrom. Since we
1159 : : * don't set details_wanted and error_data is not to be filled,
1160 : : * just resetting error_occurred is enough.
1161 : : */
528 fujii@postgresql.org 1162 : 54 : cstate->escontext->error_occurred = false;
1163 : :
1164 : : /* Report that this tuple was skipped by the ON_ERROR clause */
780 msawada@postgresql.o 1165 : 54 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
528 fujii@postgresql.org 1166 : 54 : cstate->num_errors);
1167 : :
484 1168 [ + + ]: 54 : if (cstate->opts.reject_limit > 0 &&
523 1169 [ + + ]: 24 : cstate->num_errors > cstate->opts.reject_limit)
1170 [ + - ]: 3 : ereport(ERROR,
1171 : : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1172 : : errmsg("skipped more than REJECT_LIMIT (%" PRId64 ") rows due to data type incompatibility",
1173 : : cstate->opts.reject_limit)));
1174 : :
1175 : : /* Repeat NextCopyFrom() until no soft error occurs */
789 akorotkov@postgresql 1176 : 51 : continue;
1177 : : }
1178 : :
1938 heikki.linnakangas@i 1179 : 733689 : ExecStoreVirtualTuple(myslot);
1180 : :
1181 : : /*
1182 : : * Constraints and where clause might reference the tableoid column,
1183 : : * so (re-)initialize tts_tableOid before evaluating them.
1184 : : */
1185 : 733689 : myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
1186 : :
1187 : : /* Triggers and stuff need to be invoked in query context. */
1188 : 733689 : MemoryContextSwitchTo(oldcontext);
1189 : :
1190 [ + + ]: 733689 : if (cstate->whereClause)
1191 : : {
1192 : 33 : econtext->ecxt_scantuple = myslot;
1193 : : /* Skip items that don't match COPY's WHERE clause */
1194 [ + + ]: 33 : if (!ExecQual(cstate->qualexpr, econtext))
1195 : : {
1196 : : /*
1197 : : * Report that this tuple was filtered out by the WHERE
1198 : : * clause.
1199 : : */
1832 michael@paquier.xyz 1200 : 18 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
1201 : : ++excluded);
1938 heikki.linnakangas@i 1202 : 18 : continue;
1203 : : }
1204 : : }
1205 : :
1206 : : /* Determine the partition to insert the tuple into */
1207 [ + + ]: 733671 : if (proute)
1208 : : {
1209 : : TupleConversionMap *map;
1210 : :
1211 : : /*
1212 : : * Attempt to find a partition suitable for this tuple.
1213 : : * ExecFindPartition() will raise an error if none can be found or
1214 : : * if the found partition is not suitable for INSERTs.
1215 : : */
1216 : 137197 : resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
1217 : : proute, myslot, estate);
1218 : :
1219 [ + + ]: 137196 : if (prevResultRelInfo != resultRelInfo)
1220 : : {
1221 : : /* Determine which triggers exist on this partition */
1222 [ + + ]: 80785 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1223 [ + + ]: 27 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1224 : :
1225 [ + + ]: 80785 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1226 [ - + ]: 27 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1227 : :
1228 : : /*
1229 : : * Disable multi-inserts when the partition has BEFORE/INSTEAD
1230 : : * OF triggers, or if the partition is a foreign table that
1231 : : * can't use batching.
1232 : : */
1233 : 131487 : leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
1234 [ + + ]: 50729 : !has_before_insert_row_trig &&
1235 [ + + + - ]: 182204 : !has_instead_insert_row_trig &&
1249 efujita@postgresql.o 1236 [ + + ]: 50717 : (resultRelInfo->ri_FdwRoutine == NULL ||
1237 [ + + ]: 6 : resultRelInfo->ri_BatchSize > 1);
1238 : :
1239 : : /* Set the multi-insert buffer to use for this partition. */
1938 heikki.linnakangas@i 1240 [ + + ]: 80758 : if (leafpart_use_multi_insert)
1241 : : {
1242 [ + + ]: 50715 : if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
1243 : 43 : CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
1244 : : resultRelInfo);
1245 : : }
1246 [ + + ]: 30043 : else if (insertMethod == CIM_MULTI_CONDITIONAL &&
1247 [ - + ]: 14 : !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1248 : : {
1249 : : /*
1250 : : * Flush pending inserts if this partition can't use
1251 : : * batching, so rows are visible to triggers etc.
1252 : : */
1249 efujita@postgresql.o 1253 :UBC 0 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1254 : : resultRelInfo,
1255 : : &processed);
1256 : : }
1257 : :
1938 heikki.linnakangas@i 1258 [ + - ]:CBC 80758 : if (bistate != NULL)
1259 : 80758 : ReleaseBulkInsertStatePin(bistate);
1260 : 80758 : prevResultRelInfo = resultRelInfo;
1261 : : }
1262 : :
1263 : : /*
1264 : : * If we're capturing transition tuples, we might need to convert
1265 : : * from the partition rowtype to root rowtype. But if there are no
1266 : : * BEFORE triggers on the partition that could change the tuple,
1267 : : * we can just remember the original unconverted tuple to avoid a
1268 : : * needless round trip conversion.
1269 : : */
1270 [ + + ]: 137196 : if (cstate->transition_capture != NULL)
1271 : 29 : cstate->transition_capture->tcs_original_insert_tuple =
1272 [ + + ]: 29 : !has_before_insert_row_trig ? myslot : NULL;
1273 : :
1274 : : /*
1275 : : * We might need to convert from the root rowtype to the partition
1276 : : * rowtype.
1277 : : */
1199 alvherre@alvh.no-ip. 1278 : 137196 : map = ExecGetRootToChildMap(resultRelInfo, estate);
1938 heikki.linnakangas@i 1279 [ + + + + ]: 137196 : if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
1280 : : {
1281 : : /* non batch insert */
1282 [ + + ]: 30070 : if (map != NULL)
1283 : : {
1284 : : TupleTableSlot *new_slot;
1285 : :
1286 : 55 : new_slot = resultRelInfo->ri_PartitionTupleSlot;
1287 : 55 : myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
1288 : : }
1289 : : }
1290 : : else
1291 : : {
1292 : : /*
1293 : : * Prepare to queue up tuple for later batch insert into
1294 : : * current partition.
1295 : : */
1296 : : TupleTableSlot *batchslot;
1297 : :
1298 : : /* no other path available for partitioned table */
1299 [ - + ]: 107126 : Assert(insertMethod == CIM_MULTI_CONDITIONAL);
1300 : :
1301 : 107126 : batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1302 : : resultRelInfo);
1303 : :
1304 [ + + ]: 107126 : if (map != NULL)
1305 : 6100 : myslot = execute_attr_map_slot(map->attrMap, myslot,
1306 : : batchslot);
1307 : : else
1308 : : {
1309 : : /*
1310 : : * This looks more expensive than it is (Believe me, I
1311 : : * optimized it away. Twice.). The input is in virtual
1312 : : * form, and we'll materialize the slot below - for most
1313 : : * slot types the copy performs the work materialization
1314 : : * would later require anyway.
1315 : : */
1316 : 101026 : ExecCopySlot(batchslot, myslot);
1317 : 101026 : myslot = batchslot;
1318 : : }
1319 : : }
1320 : :
1321 : : /* ensure that triggers etc see the right relation */
1322 : 137196 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1323 : : }
1324 : :
1325 : 733670 : skip_tuple = false;
1326 : :
1327 : : /* BEFORE ROW INSERT Triggers */
1328 [ + + ]: 733670 : if (has_before_insert_row_trig)
1329 : : {
1330 [ + + ]: 137 : if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1331 : 8 : skip_tuple = true; /* "do nothing" */
1332 : : }
1333 : :
1334 [ + + ]: 733670 : if (!skip_tuple)
1335 : : {
1336 : : /*
1337 : : * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1338 : : * tuple. Otherwise, proceed with inserting the tuple into the
1339 : : * table or foreign table.
1340 : : */
1341 [ + + ]: 733662 : if (has_instead_insert_row_trig)
1342 : : {
1343 : 6 : ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1344 : : }
1345 : : else
1346 : : {
1347 : : /* Compute stored generated columns */
1348 [ + + ]: 733656 : if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1349 [ + + ]: 330970 : resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
1350 : 17 : ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1351 : : CMD_INSERT);
1352 : :
1353 : : /*
1354 : : * If the target is a plain table, check the constraints of
1355 : : * the tuple.
1356 : : */
1357 [ + + ]: 733656 : if (resultRelInfo->ri_FdwRoutine == NULL &&
1358 [ + + ]: 733610 : resultRelInfo->ri_RelationDesc->rd_att->constr)
1359 : 330952 : ExecConstraints(resultRelInfo, myslot, estate);
1360 : :
1361 : : /*
1362 : : * Also check the tuple against the partition constraint, if
1363 : : * there is one; except that if we got here via tuple-routing,
1364 : : * we don't need to if there's no BR trigger defined on the
1365 : : * partition.
1366 : : */
1367 [ + + + + ]: 733641 : if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1368 [ + + ]: 137190 : (proute == NULL || has_before_insert_row_trig))
1369 : 1154 : ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1370 : :
1371 : : /* Store the slot in the multi-insert buffer, when enabled. */
1372 [ + + + + ]: 733641 : if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1373 : : {
1374 : : /*
1375 : : * The slot previously might point into the per-tuple
1376 : : * context. For batching it needs to be longer lived.
1377 : : */
1378 : 703467 : ExecMaterializeSlot(myslot);
1379 : :
1380 : : /* Add this tuple to the tuple buffer */
1381 : 703467 : CopyMultiInsertInfoStore(&multiInsertInfo,
1382 : : resultRelInfo, myslot,
1383 : : cstate->line_buf.len,
1384 : : cstate->cur_lineno);
1385 : :
1386 : : /*
1387 : : * If enough inserts have queued up, then flush all
1388 : : * buffers out to their tables.
1389 : : */
1390 [ + + ]: 703467 : if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
1249 efujita@postgresql.o 1391 : 651 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1392 : : resultRelInfo,
1393 : : &processed);
1394 : :
1395 : : /*
1396 : : * We delay updating the row counter and progress of the
1397 : : * COPY command until after writing the tuples stored in
1398 : : * the buffer out to the table, as in single insert mode.
1399 : : * See CopyMultiInsertBufferFlush().
1400 : : */
1401 : 703467 : continue; /* next tuple please */
1402 : : }
1403 : : else
1404 : : {
1938 heikki.linnakangas@i 1405 : 30174 : List *recheckIndexes = NIL;
1406 : :
1407 : : /* OK, store the tuple */
1408 [ + + ]: 30174 : if (resultRelInfo->ri_FdwRoutine != NULL)
1409 : : {
1410 : 27 : myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1411 : : resultRelInfo,
1412 : : myslot,
1413 : : NULL);
1414 : :
1415 [ + + ]: 26 : if (myslot == NULL) /* "do nothing" */
1416 : 2 : continue; /* next tuple please */
1417 : :
1418 : : /*
1419 : : * AFTER ROW Triggers might reference the tableoid
1420 : : * column, so (re-)initialize tts_tableOid before
1421 : : * evaluating them.
1422 : : */
1423 : 24 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1424 : : }
1425 : : else
1426 : : {
1427 : : /* OK, store the tuple and create index entries for it */
1428 : 30147 : table_tuple_insert(resultRelInfo->ri_RelationDesc,
1429 : : myslot, mycid, ti_options, bistate);
1430 : :
703 akorotkov@postgresql 1431 [ - + ]: 30147 : if (resultRelInfo->ri_NumIndices > 0)
1938 heikki.linnakangas@i 1432 :UBC 0 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1433 : : estate, 0,
1434 : : myslot, NIL,
1435 : : NULL);
1436 : : }
1437 : :
1438 : : /* AFTER ROW INSERT Triggers */
1938 heikki.linnakangas@i 1439 :CBC 30171 : ExecARInsertTriggers(estate, resultRelInfo, myslot,
1440 : : recheckIndexes, cstate->transition_capture);
1441 : :
1442 : 30169 : list_free(recheckIndexes);
1443 : : }
1444 : : }
1445 : :
1446 : : /*
1447 : : * We count only tuples not suppressed by a BEFORE INSERT trigger
1448 : : * or FDW; this is the same definition used by nodeModifyTable.c
1449 : : * for counting tuples inserted by an INSERT command. Update
1450 : : * progress of the COPY command as well.
1451 : : */
1832 michael@paquier.xyz 1452 : 30175 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1453 : : ++processed);
1454 : : }
1455 : : }
1456 : :
1457 : : /* Flush any remaining buffered tuples */
1938 heikki.linnakangas@i 1458 [ + + ]: 807 : if (insertMethod != CIM_SINGLE)
1459 : : {
1460 [ + + ]: 747 : if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1249 efujita@postgresql.o 1461 : 586 : CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
1462 : : }
1463 : :
1464 : : /* Done, clean up */
1938 heikki.linnakangas@i 1465 : 797 : error_context_stack = errcallback.previous;
1466 : :
12 peter@eisentraut.org 1467 [ + + ]:GNC 797 : if (cstate->num_errors > 0 &&
528 fujii@postgresql.org 1468 [ + + ]:CBC 18 : cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
1469 : : {
12 peter@eisentraut.org 1470 [ + + ]:GNC 15 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
1471 [ + - ]: 12 : ereport(NOTICE,
1472 : : errmsg_plural("%" PRIu64 " row was skipped due to data type incompatibility",
1473 : : "%" PRIu64 " rows were skipped due to data type incompatibility",
1474 : : cstate->num_errors,
1475 : : cstate->num_errors));
1476 [ + - ]: 3 : else if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
1477 [ + - ]: 3 : ereport(NOTICE,
1478 : : errmsg_plural("in %" PRIu64 " row, columns were set to null due to data type incompatibility",
1479 : : "in %" PRIu64 " rows, columns were set to null due to data type incompatibility",
1480 : : cstate->num_errors,
1481 : : cstate->num_errors));
1482 : : }
1483 : :
1938 heikki.linnakangas@i 1484 [ + + ]:CBC 797 : if (bistate != NULL)
1485 : 99 : FreeBulkInsertState(bistate);
1486 : :
1487 : 797 : MemoryContextSwitchTo(oldcontext);
1488 : :
1489 : : /* Execute AFTER STATEMENT insertion triggers */
1490 : 797 : ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1491 : :
1492 : : /* Handle queued AFTER triggers */
1493 : 797 : AfterTriggerEndQuery(estate);
1494 : :
1495 : 797 : ExecResetTupleTable(estate->es_tupleTable, false);
1496 : :
1497 : : /* Allow the FDW to shut down */
1498 [ + + ]: 797 : if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1499 [ + - ]: 16 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1500 : 16 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1501 : : target_resultRelInfo);
1502 : :
1503 : : /* Tear down the multi-insert buffer data */
1504 [ + + ]: 797 : if (insertMethod != CIM_SINGLE)
1505 : 737 : CopyMultiInsertInfoCleanup(&multiInsertInfo);
1506 : :
1507 : : /* Close all the partitioned tables, leaf partitions, and their indices */
1508 [ + + ]: 797 : if (proute)
1509 : 51 : ExecCleanupTupleRouting(mtstate, proute);
1510 : :
1511 : : /* Close the result relations, including any trigger target relations */
1512 : 797 : ExecCloseResultRelations(estate);
1513 : 797 : ExecCloseRangeTableRelations(estate);
1514 : :
1515 : 797 : FreeExecutorState(estate);
1516 : :
1517 : 797 : return processed;
1518 : : }
1519 : :
1520 : : /*
1521 : : * Setup to read tuples from a file for COPY FROM.
1522 : : *
1523 : : * 'rel': Used as a template for the tuples
1524 : : * 'whereClause': WHERE clause from the COPY FROM command
1525 : : * 'filename': Name of server-local file to read, NULL for STDIN
1526 : : * 'is_program': true if 'filename' is program to execute
1527 : : * 'data_source_cb': callback that provides the input data
1528 : : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
1529 : : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
1530 : : *
1531 : : * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
1532 : : */
1533 : : CopyFromState
1534 : 1124 : BeginCopyFrom(ParseState *pstate,
1535 : : Relation rel,
1536 : : Node *whereClause,
1537 : : const char *filename,
1538 : : bool is_program,
1539 : : copy_data_source_cb data_source_cb,
1540 : : List *attnamelist,
1541 : : List *options)
1542 : : {
1543 : : CopyFromState cstate;
1544 : 1124 : bool pipe = (filename == NULL);
1545 : : TupleDesc tupDesc;
1546 : : AttrNumber num_phys_attrs,
1547 : : num_defaults;
1548 : : FmgrInfo *in_functions;
1549 : : Oid *typioparams;
1550 : : int *defmap;
1551 : : ExprState **defexprs;
1552 : : MemoryContext oldcontext;
1553 : : bool volatile_defexprs;
1832 michael@paquier.xyz 1554 : 1124 : const int progress_cols[] = {
1555 : : PROGRESS_COPY_COMMAND,
1556 : : PROGRESS_COPY_TYPE,
1557 : : PROGRESS_COPY_BYTES_TOTAL
1558 : : };
1559 : 1124 : int64 progress_vals[] = {
1560 : : PROGRESS_COPY_COMMAND_FROM,
1561 : : 0,
1562 : : 0
1563 : : };
1564 : :
1565 : : /* Allocate workspace and zero all fields */
95 michael@paquier.xyz 1566 :GNC 1124 : cstate = palloc0_object(CopyFromStateData);
1567 : :
1568 : : /*
1569 : : * We allocate everything used by a cstate in a new memory context. This
1570 : : * avoids memory leaks during repeated use of COPY in a query.
1571 : : */
1938 heikki.linnakangas@i 1572 :CBC 1124 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1573 : : "COPY",
1574 : : ALLOCSET_DEFAULT_SIZES);
1575 : :
1576 : 1124 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1577 : :
1578 : : /* Extract options from the statement node tree */
1809 1579 : 1124 : ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1580 : :
1581 : : /* Set the format routine */
376 msawada@postgresql.o 1582 : 991 : cstate->routine = CopyFromGetRoutine(&cstate->opts);
1583 : :
1584 : : /* Process the target relation */
1938 heikki.linnakangas@i 1585 : 991 : cstate->rel = rel;
1586 : :
1587 : 991 : tupDesc = RelationGetDescr(cstate->rel);
1588 : :
1589 : : /* process common options or initialization */
1590 : :
1591 : : /* Generate or convert list of attributes to process */
1592 : 991 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1593 : :
1594 : 991 : num_phys_attrs = tupDesc->natts;
1595 : :
1596 : : /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1597 : 991 : cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
897 andrew@dunslane.net 1598 [ + + ]: 991 : if (cstate->opts.force_notnull_all)
1599 [ + - - + : 6 : MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
- - - - -
- ]
1600 [ + + ]: 985 : else if (cstate->opts.force_notnull)
1601 : : {
1602 : : List *attnums;
1603 : : ListCell *cur;
1604 : :
1938 heikki.linnakangas@i 1605 : 13 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1606 : :
1607 [ + - + + : 26 : foreach(cur, attnums)
+ + ]
1608 : : {
1609 : 16 : int attnum = lfirst_int(cur);
1610 : 16 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1611 : :
1612 [ + + ]: 16 : if (!list_member_int(cstate->attnumlist, attnum))
1613 [ + - ]: 3 : ereport(ERROR,
1614 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1615 : : /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1616 : : errmsg("%s column \"%s\" not referenced by COPY",
1617 : : "FORCE_NOT_NULL", NameStr(attr->attname))));
1618 : 13 : cstate->opts.force_notnull_flags[attnum - 1] = true;
1619 : : }
1620 : : }
1621 : :
1622 : : /* Set up soft error handler for ON_ERROR */
786 akorotkov@postgresql 1623 [ + + ]: 988 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
1624 : : {
789 1625 : 50 : cstate->escontext = makeNode(ErrorSaveContext);
1626 : 50 : cstate->escontext->type = T_ErrorSaveContext;
1627 : 50 : cstate->escontext->error_occurred = false;
1628 : :
12 peter@eisentraut.org 1629 [ + + ]:GNC 50 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE ||
1630 [ + - ]: 18 : cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
789 akorotkov@postgresql 1631 :CBC 50 : cstate->escontext->details_wanted = false;
1632 : : }
1633 : : else
1634 : 938 : cstate->escontext = NULL;
1635 : :
12 peter@eisentraut.org 1636 [ + + ]:GNC 988 : if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
1637 : : {
1638 : 18 : int attr_count = list_length(cstate->attnumlist);
1639 : :
1640 : : /*
1641 : : * When data type conversion fails and ON_ERROR is SET_NULL, we need
1642 : : * ensure that the input column allow null values. ExecConstraints()
1643 : : * will cover most of the cases, but it does not verify domain
1644 : : * constraints. Therefore, for constrained domains, the null value
1645 : : * check must be performed during the initial string-to-datum
1646 : : * conversion (see CopyFromTextLikeOneRow()).
1647 : : */
1648 : 18 : cstate->domain_with_constraint = palloc0_array(bool, attr_count);
1649 : :
1650 [ + - + + : 90 : foreach_int(attno, cstate->attnumlist)
+ + ]
1651 : : {
1652 : 54 : int i = foreach_current_index(attno);
1653 : :
1654 : 54 : Form_pg_attribute att = TupleDescAttr(tupDesc, attno - 1);
1655 : :
3 andrew@dunslane.net 1656 : 54 : cstate->domain_with_constraint[i] = DomainHasConstraints(att->atttypid, NULL);
1657 : : }
1658 : : }
1659 : :
1660 : : /* Convert FORCE_NULL name list to per-column flags, check validity */
1938 heikki.linnakangas@i 1661 :CBC 988 : cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
897 andrew@dunslane.net 1662 [ + + ]: 988 : if (cstate->opts.force_null_all)
1663 [ + - - + : 6 : MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
- - - - -
- ]
1664 [ + + ]: 982 : else if (cstate->opts.force_null)
1665 : : {
1666 : : List *attnums;
1667 : : ListCell *cur;
1668 : :
1938 heikki.linnakangas@i 1669 : 13 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1670 : :
1671 [ + - + + : 26 : foreach(cur, attnums)
+ + ]
1672 : : {
1673 : 16 : int attnum = lfirst_int(cur);
1674 : 16 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1675 : :
1676 [ + + ]: 16 : if (!list_member_int(cstate->attnumlist, attnum))
1677 [ + - ]: 3 : ereport(ERROR,
1678 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1679 : : /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1680 : : errmsg("%s column \"%s\" not referenced by COPY",
1681 : : "FORCE_NULL", NameStr(attr->attname))));
1682 : 13 : cstate->opts.force_null_flags[attnum - 1] = true;
1683 : : }
1684 : : }
1685 : :
1686 : : /* Convert convert_selectively name list to per-column flags */
1687 [ + + ]: 985 : if (cstate->opts.convert_selectively)
1688 : : {
1689 : : List *attnums;
1690 : : ListCell *cur;
1691 : :
1692 : 3 : cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1693 : :
1694 : 3 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1695 : :
1696 [ + - + + : 7 : foreach(cur, attnums)
+ + ]
1697 : : {
1698 : 4 : int attnum = lfirst_int(cur);
1699 : 4 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1700 : :
1701 [ - + ]: 4 : if (!list_member_int(cstate->attnumlist, attnum))
1938 heikki.linnakangas@i 1702 [ # # ]:UBC 0 : ereport(ERROR,
1703 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1704 : : errmsg_internal("selected column \"%s\" not referenced by COPY",
1705 : : NameStr(attr->attname))));
1938 heikki.linnakangas@i 1706 :CBC 4 : cstate->convert_select_flags[attnum - 1] = true;
1707 : : }
1708 : : }
1709 : :
1710 : : /* Use client encoding when ENCODING option is not specified. */
1711 [ + + ]: 985 : if (cstate->opts.file_encoding < 0)
1712 : 970 : cstate->file_encoding = pg_get_client_encoding();
1713 : : else
1714 : 15 : cstate->file_encoding = cstate->opts.file_encoding;
1715 : :
1716 : : /*
1717 : : * Look up encoding conversion function.
1718 : : */
1809 1719 [ + + ]: 985 : if (cstate->file_encoding == GetDatabaseEncoding() ||
1720 [ + + - + ]: 39 : cstate->file_encoding == PG_SQL_ASCII ||
1721 : 18 : GetDatabaseEncoding() == PG_SQL_ASCII)
1722 : : {
1723 : 967 : cstate->need_transcoding = false;
1724 : : }
1725 : : else
1726 : : {
1727 : 18 : cstate->need_transcoding = true;
1728 : 18 : cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
1729 : : GetDatabaseEncoding());
896 tgl@sss.pgh.pa.us 1730 [ - + ]: 18 : if (!OidIsValid(cstate->conversion_proc))
896 tgl@sss.pgh.pa.us 1731 [ # # ]:UBC 0 : ereport(ERROR,
1732 : : (errcode(ERRCODE_UNDEFINED_FUNCTION),
1733 : : errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
1734 : : pg_encoding_to_char(cstate->file_encoding),
1735 : : pg_encoding_to_char(GetDatabaseEncoding()))));
1736 : : }
1737 : :
1938 heikki.linnakangas@i 1738 :CBC 985 : cstate->copy_src = COPY_FILE; /* default */
1739 : :
1740 : 985 : cstate->whereClause = whereClause;
1741 : :
1742 : : /* Initialize state variables */
1743 : 985 : cstate->eol_type = EOL_UNKNOWN;
1744 : 985 : cstate->cur_relname = RelationGetRelationName(cstate->rel);
1745 : 985 : cstate->cur_lineno = 0;
1746 : 985 : cstate->cur_attname = NULL;
1747 : 985 : cstate->cur_attval = NULL;
1249 efujita@postgresql.o 1748 : 985 : cstate->relname_only = false;
2 nathan@postgresql.or 1749 :GNC 985 : cstate->simd_enabled = true;
1750 : :
1751 : : /*
1752 : : * Allocate buffers for the input pipeline.
1753 : : *
1754 : : * attribute_buf and raw_buf are used in both text and binary modes, but
1755 : : * input_buf and line_buf only in text mode.
1756 : : */
1809 heikki.linnakangas@i 1757 :CBC 985 : cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
1938 1758 : 985 : cstate->raw_buf_index = cstate->raw_buf_len = 0;
1809 1759 : 985 : cstate->raw_reached_eof = false;
1760 : :
1761 : 985 : initStringInfo(&cstate->attribute_buf);
1762 : :
1763 : : /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
1938 1764 [ + + ]: 985 : if (pstate)
1765 : : {
1766 : 948 : cstate->range_table = pstate->p_rtable;
1195 alvherre@alvh.no-ip. 1767 : 948 : cstate->rteperminfos = pstate->p_rteperminfos;
1768 : : }
1769 : :
1938 heikki.linnakangas@i 1770 : 985 : num_defaults = 0;
1771 : 985 : volatile_defexprs = false;
1772 : :
1773 : : /*
1774 : : * Pick up the required catalog information for each attribute in the
1775 : : * relation, including the input function, the element type (to pass to
1776 : : * the input function), and info about defaults and constraints. (Which
1777 : : * input function we use depends on text/binary format choice.)
1778 : : */
1779 : 985 : in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1780 : 985 : typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1781 : 985 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1782 : 985 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1783 : :
1299 drowley@postgresql.o 1784 [ + + ]: 3893 : for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
1785 : : {
1938 heikki.linnakangas@i 1786 : 2915 : Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1787 : :
1788 : : /* We don't need info for dropped attributes */
1789 [ + + ]: 2915 : if (att->attisdropped)
1790 : 62 : continue;
1791 : :
1792 : : /* Fetch the input function and typioparam info */
380 msawada@postgresql.o 1793 : 2853 : cstate->routine->CopyFromInFunc(cstate, att->atttypid,
1794 : 2853 : &in_functions[attnum - 1],
1795 : 2853 : &typioparams[attnum - 1]);
1796 : :
1797 : : /* Get default info if available */
1098 andrew@dunslane.net 1798 : 2852 : defexprs[attnum - 1] = NULL;
1799 : :
1800 : : /*
1801 : : * We only need the default values for columns that do not appear in
1802 : : * the column list, unless the DEFAULT option was given. We never need
1803 : : * default values for generated columns.
1804 : : */
896 1805 [ + + ]: 2852 : if ((cstate->opts.default_print != NULL ||
1806 [ + + ]: 2789 : !list_member_int(cstate->attnumlist, attnum)) &&
1807 [ + + ]: 341 : !att->attgenerated)
1808 : : {
1938 heikki.linnakangas@i 1809 : 323 : Expr *defexpr = (Expr *) build_column_default(cstate->rel,
1810 : : attnum);
1811 : :
1812 [ + + ]: 323 : if (defexpr != NULL)
1813 : : {
1814 : : /* Run the expression through planner */
1815 : 132 : defexpr = expression_planner(defexpr);
1816 : :
1817 : : /* Initialize executable expression in copycontext */
1098 andrew@dunslane.net 1818 : 126 : defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
1819 : :
1820 : : /* if NOT copied from input */
1821 : : /* use default value if one exists */
1822 [ + + ]: 126 : if (!list_member_int(cstate->attnumlist, attnum))
1823 : : {
1824 : 86 : defmap[num_defaults] = attnum - 1;
1825 : 86 : num_defaults++;
1826 : : }
1827 : :
1828 : : /*
1829 : : * If a default expression looks at the table being loaded,
1830 : : * then it could give the wrong answer when using
1831 : : * multi-insert. Since database access can be dynamic this is
1832 : : * hard to test for exactly, so we use the much wider test of
1833 : : * whether the default expression is volatile. We allow for
1834 : : * the special case of when the default expression is the
1835 : : * nextval() of a sequence which in this specific case is
1836 : : * known to be safe for use with the multi-insert
1837 : : * optimization. Hence we use this special case function
1838 : : * checker rather than the standard check for
1839 : : * contain_volatile_functions(). Note also that we already
1840 : : * ran the expression through expression_planner().
1841 : : */
1938 heikki.linnakangas@i 1842 [ + - ]: 126 : if (!volatile_defexprs)
1843 : 126 : volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1844 : : }
1845 : : }
1846 : : }
1847 : :
962 drowley@postgresql.o 1848 : 978 : cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
1849 : :
1850 : : /* initialize progress */
1894 tomas.vondra@postgre 1851 : 978 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1852 [ + - ]: 978 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1853 : 978 : cstate->bytes_processed = 0;
1854 : :
1855 : : /* We keep those variables in cstate. */
1938 heikki.linnakangas@i 1856 : 978 : cstate->in_functions = in_functions;
1857 : 978 : cstate->typioparams = typioparams;
1858 : 978 : cstate->defmap = defmap;
1859 : 978 : cstate->defexprs = defexprs;
1860 : 978 : cstate->volatile_defexprs = volatile_defexprs;
1861 : 978 : cstate->num_defaults = num_defaults;
1862 : 978 : cstate->is_program = is_program;
1863 : :
1864 [ + + ]: 978 : if (data_source_cb)
1865 : : {
1832 michael@paquier.xyz 1866 : 203 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1938 heikki.linnakangas@i 1867 : 203 : cstate->copy_src = COPY_CALLBACK;
1868 : 203 : cstate->data_source_cb = data_source_cb;
1869 : : }
1870 [ + + ]: 775 : else if (pipe)
1871 : : {
1832 michael@paquier.xyz 1872 : 542 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1938 heikki.linnakangas@i 1873 [ - + ]: 542 : Assert(!is_program); /* the grammar does not allow this */
1874 [ + - ]: 542 : if (whereToSendOutput == DestRemote)
1875 : 542 : ReceiveCopyBegin(cstate);
1876 : : else
1938 heikki.linnakangas@i 1877 :UBC 0 : cstate->copy_file = stdin;
1878 : : }
1879 : : else
1880 : : {
1938 heikki.linnakangas@i 1881 :CBC 233 : cstate->filename = pstrdup(filename);
1882 : :
1883 [ - + ]: 233 : if (cstate->is_program)
1884 : : {
1832 michael@paquier.xyz 1885 :UBC 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1938 heikki.linnakangas@i 1886 : 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1887 [ # # ]: 0 : if (cstate->copy_file == NULL)
1888 [ # # ]: 0 : ereport(ERROR,
1889 : : (errcode_for_file_access(),
1890 : : errmsg("could not execute command \"%s\": %m",
1891 : : cstate->filename)));
1892 : : }
1893 : : else
1894 : : {
1895 : : struct stat st;
1896 : :
1832 michael@paquier.xyz 1897 :CBC 233 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1938 heikki.linnakangas@i 1898 : 233 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1899 [ - + ]: 233 : if (cstate->copy_file == NULL)
1900 : : {
1901 : : /* copy errno because ereport subfunctions might change it */
1938 heikki.linnakangas@i 1902 :UBC 0 : int save_errno = errno;
1903 : :
1904 [ # # # # : 0 : ereport(ERROR,
# # ]
1905 : : (errcode_for_file_access(),
1906 : : errmsg("could not open file \"%s\" for reading: %m",
1907 : : cstate->filename),
1908 : : (save_errno == ENOENT || save_errno == EACCES) ?
1909 : : errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1910 : : "You may want a client-side facility such as psql's \\copy.") : 0));
1911 : : }
1912 : :
1938 heikki.linnakangas@i 1913 [ - + ]:CBC 233 : if (fstat(fileno(cstate->copy_file), &st))
1938 heikki.linnakangas@i 1914 [ # # ]:UBC 0 : ereport(ERROR,
1915 : : (errcode_for_file_access(),
1916 : : errmsg("could not stat file \"%s\": %m",
1917 : : cstate->filename)));
1918 : :
1938 heikki.linnakangas@i 1919 [ - + ]:CBC 233 : if (S_ISDIR(st.st_mode))
1938 heikki.linnakangas@i 1920 [ # # ]:UBC 0 : ereport(ERROR,
1921 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1922 : : errmsg("\"%s\" is a directory", cstate->filename)));
1923 : :
1832 michael@paquier.xyz 1924 :CBC 233 : progress_vals[2] = st.st_size;
1925 : : }
1926 : : }
1927 : :
1928 : 978 : pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1929 : :
380 msawada@postgresql.o 1930 : 978 : cstate->routine->CopyFromStart(cstate, tupDesc);
1931 : :
1938 heikki.linnakangas@i 1932 : 978 : MemoryContextSwitchTo(oldcontext);
1933 : :
1934 : 978 : return cstate;
1935 : : }
1936 : :
1937 : : /*
1938 : : * Clean up storage and release resources for COPY FROM.
1939 : : */
1940 : : void
1941 : 638 : EndCopyFrom(CopyFromState cstate)
1942 : : {
1943 : : /* Invoke the end callback */
380 msawada@postgresql.o 1944 : 638 : cstate->routine->CopyFromEnd(cstate);
1945 : :
1946 : : /* No COPY FROM related resources except memory. */
1938 heikki.linnakangas@i 1947 [ - + ]: 638 : if (cstate->is_program)
1948 : : {
1938 heikki.linnakangas@i 1949 :UBC 0 : ClosePipeFromProgram(cstate);
1950 : : }
1951 : : else
1952 : : {
1938 heikki.linnakangas@i 1953 [ + + - + ]:CBC 638 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1938 heikki.linnakangas@i 1954 [ # # ]:UBC 0 : ereport(ERROR,
1955 : : (errcode_for_file_access(),
1956 : : errmsg("could not close file \"%s\": %m",
1957 : : cstate->filename)));
1958 : : }
1959 : :
1894 tomas.vondra@postgre 1960 :CBC 638 : pgstat_progress_end_command();
1961 : :
1938 heikki.linnakangas@i 1962 : 638 : MemoryContextDelete(cstate->copycontext);
1963 : 638 : pfree(cstate);
1964 : 638 : }
1965 : :
1966 : : /*
1967 : : * Closes the pipe from an external program, checking the pclose() return code.
1968 : : */
1969 : : static void
1938 heikki.linnakangas@i 1970 :UBC 0 : ClosePipeFromProgram(CopyFromState cstate)
1971 : : {
1972 : : int pclose_rc;
1973 : :
1974 [ # # ]: 0 : Assert(cstate->is_program);
1975 : :
1976 : 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
1977 [ # # ]: 0 : if (pclose_rc == -1)
1978 [ # # ]: 0 : ereport(ERROR,
1979 : : (errcode_for_file_access(),
1980 : : errmsg("could not close pipe to external command: %m")));
1981 [ # # ]: 0 : else if (pclose_rc != 0)
1982 : : {
1983 : : /*
1984 : : * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1985 : : * expectable for the called program to fail with SIGPIPE, and we
1986 : : * should not report that as an error. Otherwise, SIGPIPE indicates a
1987 : : * problem.
1988 : : */
1809 1989 [ # # # # ]: 0 : if (!cstate->raw_reached_eof &&
1938 1990 : 0 : wait_result_is_signal(pclose_rc, SIGPIPE))
1991 : 0 : return;
1992 : :
1993 [ # # ]: 0 : ereport(ERROR,
1994 : : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1995 : : errmsg("program \"%s\" failed",
1996 : : cstate->filename),
1997 : : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1998 : : }
1999 : : }
|