Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * inv_api.c
4 : : * routines for manipulating inversion fs large objects. This file
5 : : * contains the user-level large object application interface routines.
6 : : *
7 : : *
8 : : * Note: we access pg_largeobject.data using its C struct declaration.
9 : : * This is safe because it immediately follows pageno which is an int4 field,
10 : : * and therefore the data field will always be 4-byte aligned, even if it
11 : : * is in the short 1-byte-header format. We have to detoast it since it's
12 : : * quite likely to be in compressed or short format. We also need to check
13 : : * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14 : : *
15 : : * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16 : : * does most of the backend code. We expect that CurrentMemoryContext will
17 : : * be a short-lived context. Data that must persist across function calls
18 : : * is kept either in CacheMemoryContext (the Relation structs) or in the
19 : : * memory context given to inv_open (for LargeObjectDesc structs).
20 : : *
21 : : *
22 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
23 : : * Portions Copyright (c) 1994, Regents of the University of California
24 : : *
25 : : *
26 : : * IDENTIFICATION
27 : : * src/backend/storage/large_object/inv_api.c
28 : : *
29 : : *-------------------------------------------------------------------------
30 : : */
31 : : #include "postgres.h"
32 : :
33 : : #include <limits.h>
34 : :
35 : : #include "access/detoast.h"
36 : : #include "access/genam.h"
37 : : #include "access/htup_details.h"
38 : : #include "access/table.h"
39 : : #include "access/xact.h"
40 : : #include "catalog/dependency.h"
41 : : #include "catalog/indexing.h"
42 : : #include "catalog/objectaccess.h"
43 : : #include "catalog/pg_largeobject.h"
44 : : #include "libpq/libpq-fs.h"
45 : : #include "miscadmin.h"
46 : : #include "storage/large_object.h"
47 : : #include "utils/acl.h"
48 : : #include "utils/fmgroids.h"
49 : : #include "utils/rel.h"
50 : : #include "utils/snapmgr.h"
51 : :
52 : :
53 : : /*
54 : : * GUC: backwards-compatibility flag to suppress LO permission checks
55 : : */
56 : : bool lo_compat_privileges;
57 : :
58 : : /*
59 : : * All accesses to pg_largeobject and its index make use of a single
60 : : * Relation reference. To guarantee that the relcache entry remains
61 : : * in the cache, on the first reference inside a subtransaction, we
62 : : * execute a slightly klugy maneuver to assign ownership of the
63 : : * Relation reference to TopTransactionResourceOwner.
64 : : */
65 : : static Relation lo_heap_r = NULL;
66 : : static Relation lo_index_r = NULL;
67 : :
68 : :
69 : : /*
70 : : * Open pg_largeobject and its index, if not already done in current xact
71 : : */
72 : : static void
7951 tgl@sss.pgh.pa.us 73 :CBC 2084 : open_lo_relation(void)
74 : : {
75 : : ResourceOwner currentOwner;
76 : :
77 [ + + + - ]: 2084 : if (lo_heap_r && lo_index_r)
78 : 1843 : return; /* already open in current xact */
79 : :
80 : : /* Arrange for the top xact to own these relation references */
81 : 241 : currentOwner = CurrentResourceOwner;
3128 82 : 241 : CurrentResourceOwner = TopTransactionResourceOwner;
83 : :
84 : : /* Use RowExclusiveLock since we might either read or write */
85 [ + - ]: 241 : if (lo_heap_r == NULL)
2661 andres@anarazel.de 86 : 241 : lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
3128 tgl@sss.pgh.pa.us 87 [ + - ]: 241 : if (lo_index_r == NULL)
88 : 241 : lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
89 : :
7951 90 : 241 : CurrentResourceOwner = currentOwner;
91 : : }
92 : :
93 : : /*
94 : : * Clean up at main transaction end
95 : : */
96 : : void
97 : 360 : close_lo_relation(bool isCommit)
98 : : {
99 [ + + - + ]: 360 : if (lo_heap_r || lo_index_r)
100 : : {
101 : : /*
102 : : * Only bother to close if committing; else abort cleanup will handle
103 : : * it
104 : : */
105 [ + + ]: 241 : if (isCommit)
106 : : {
107 : : ResourceOwner currentOwner;
108 : :
109 : 191 : currentOwner = CurrentResourceOwner;
3128 110 : 191 : CurrentResourceOwner = TopTransactionResourceOwner;
111 : :
112 [ + - ]: 191 : if (lo_index_r)
113 : 191 : index_close(lo_index_r, NoLock);
114 [ + - ]: 191 : if (lo_heap_r)
2661 andres@anarazel.de 115 : 191 : table_close(lo_heap_r, NoLock);
116 : :
7951 tgl@sss.pgh.pa.us 117 : 191 : CurrentResourceOwner = currentOwner;
118 : : }
119 : 241 : lo_heap_r = NULL;
120 : 241 : lo_index_r = NULL;
121 : : }
122 : 360 : }
123 : :
124 : :
125 : : /*
126 : : * Extract data field from a pg_largeobject tuple, detoasting if needed
127 : : * and verifying that the length is sane. Returns data pointer (a bytea *),
128 : : * data length, and an indication of whether to pfree the data pointer.
129 : : */
130 : : static void
4352 131 : 6858 : getdatafield(Form_pg_largeobject tuple,
132 : : bytea **pdatafield,
133 : : int *plen,
134 : : bool *pfreeit)
135 : : {
136 : : bytea *datafield;
137 : : int len;
138 : : bool freeit;
139 : :
140 : 6858 : datafield = &(tuple->data); /* see note at top of file */
141 : 6858 : freeit = false;
142 [ + + ]: 6858 : if (VARATT_IS_EXTENDED(datafield))
143 : : {
144 : : datafield = (bytea *)
83 michael@paquier.xyz 145 :GNC 6750 : detoast_attr((varlena *) datafield);
4352 tgl@sss.pgh.pa.us 146 :CBC 6750 : freeit = true;
147 : : }
148 : 6858 : len = VARSIZE(datafield) - VARHDRSZ;
149 [ + - - + ]: 6858 : if (len < 0 || len > LOBLKSIZE)
4352 tgl@sss.pgh.pa.us 150 [ # # ]:UBC 0 : ereport(ERROR,
151 : : (errcode(ERRCODE_DATA_CORRUPTED),
152 : : errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
153 : : tuple->loid, tuple->pageno, len)));
4352 tgl@sss.pgh.pa.us 154 :CBC 6858 : *pdatafield = datafield;
155 : 6858 : *plen = len;
156 : 6858 : *pfreeit = freeit;
9324 157 : 6858 : }
158 : :
159 : :
160 : : /*
161 : : * inv_create -- create a new large object
162 : : *
163 : : * Arguments:
164 : : * lobjId - OID to use for new large object, or InvalidOid to pick one
165 : : *
166 : : * Returns:
167 : : * OID of new object
168 : : *
169 : : * If lobjId is not InvalidOid, then an error occurs if the OID is already
170 : : * in use.
171 : : */
172 : : Oid
7631 173 : 104 : inv_create(Oid lobjId)
174 : : {
175 : : Oid lobjId_new;
176 : :
177 : : /*
178 : : * Create a new largeobject with empty data pages
179 : : */
5989 itagaki.takahiro@gma 180 : 104 : lobjId_new = LargeObjectCreate(lobjId);
181 : :
182 : : /*
183 : : * dependency on the owner of largeobject
184 : : *
185 : : * Note that LO dependencies are recorded using classId
186 : : * LargeObjectRelationId for backwards-compatibility reasons. Using
187 : : * LargeObjectMetadataRelationId instead would simplify matters for the
188 : : * backend, but it'd complicate pg_dump and possibly break other clients.
189 : : */
190 : 104 : recordDependencyOnOwner(LargeObjectRelationId,
191 : : lobjId_new, GetUserId());
192 : :
193 : : /* Post creation hook for new large object */
4808 rhaas@postgresql.org 194 [ - + ]: 104 : InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
195 : :
196 : : /*
197 : : * Advance command counter to make new tuple visible to later operations.
198 : : */
9326 bruce@momjian.us 199 : 104 : CommandCounterIncrement();
200 : :
5989 itagaki.takahiro@gma 201 : 104 : return lobjId_new;
202 : : }
203 : :
204 : : /*
205 : : * inv_open -- access an existing large object.
206 : : *
207 : : * Returns a large object descriptor, appropriately filled in.
208 : : * The descriptor and subsidiary data are allocated in the specified
209 : : * memory context, which must be suitably long-lived for the caller's
210 : : * purposes. If the returned descriptor has a snapshot associated
211 : : * with it, the caller must ensure that it also lives long enough,
212 : : * e.g. by calling RegisterSnapshotOnOwner
213 : : */
214 : : LargeObjectDesc *
7314 tgl@sss.pgh.pa.us 215 : 360 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
216 : : {
217 : : LargeObjectDesc *retval;
4600 heikki.linnakangas@i 218 : 360 : Snapshot snapshot = NULL;
219 : 360 : int descflags = 0;
220 : :
221 : : /*
222 : : * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
223 : : * | INV_READ), the caller being allowed to read the large object
224 : : * descriptor in either case.
225 : : */
9175 bruce@momjian.us 226 [ + + ]: 360 : if (flags & INV_WRITE)
3099 tgl@sss.pgh.pa.us 227 : 142 : descflags |= IFS_WRLOCK | IFS_RDLOCK;
228 [ + + ]: 360 : if (flags & INV_READ)
229 : 238 : descflags |= IFS_RDLOCK;
230 : :
231 [ - + ]: 360 : if (descflags == 0)
4957 tgl@sss.pgh.pa.us 232 [ # # ]:UBC 0 : ereport(ERROR,
233 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
234 : : errmsg("invalid flags for opening a large object: %d",
235 : : flags)));
236 : :
237 : : /* Get snapshot. If write is requested, use an instantaneous snapshot. */
3099 tgl@sss.pgh.pa.us 238 [ + + ]:CBC 360 : if (descflags & IFS_WRLOCK)
239 : 142 : snapshot = NULL;
240 : : else
241 : 218 : snapshot = GetActiveSnapshot();
242 : :
243 : : /* Can't use LargeObjectExists here because we need to specify snapshot */
600 fujii@postgresql.org 244 [ + + ]: 360 : if (!LargeObjectExistsWithSnapshot(lobjId, snapshot))
7631 tgl@sss.pgh.pa.us 245 [ + - ]: 5 : ereport(ERROR,
246 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
247 : : errmsg("large object %u does not exist", lobjId)));
248 : :
249 : : /* Apply permission checks, again specifying snapshot */
3099 250 [ + - ]: 355 : if ((descflags & IFS_RDLOCK) != 0)
251 : : {
252 [ + + + + ]: 698 : if (!lo_compat_privileges &&
253 : 343 : pg_largeobject_aclcheck_snapshot(lobjId,
254 : : GetUserId(),
255 : : ACL_SELECT,
256 : : snapshot) != ACLCHECK_OK)
257 [ + - ]: 28 : ereport(ERROR,
258 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
259 : : errmsg("permission denied for large object %u",
260 : : lobjId)));
261 : : }
262 [ + + ]: 327 : if ((descflags & IFS_WRLOCK) != 0)
263 : : {
264 [ + + + + ]: 244 : if (!lo_compat_privileges &&
265 : 118 : pg_largeobject_aclcheck_snapshot(lobjId,
266 : : GetUserId(),
267 : : ACL_UPDATE,
268 : : snapshot) != ACLCHECK_OK)
269 [ + - ]: 20 : ereport(ERROR,
270 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
271 : : errmsg("permission denied for large object %u",
272 : : lobjId)));
273 : : }
274 : :
275 : : /* OK to create a descriptor */
276 : 307 : retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
277 : : sizeof(LargeObjectDesc));
278 : 307 : retval->id = lobjId;
279 : 307 : retval->offset = 0;
280 : 307 : retval->flags = descflags;
281 : :
282 : : /* caller sets if needed, not used by the functions in this file */
1644 heikki.linnakangas@i 283 : 307 : retval->subid = InvalidSubTransactionId;
284 : :
285 : : /*
286 : : * The snapshot (if any) is just the currently active snapshot. The
287 : : * caller will replace it with a longer-lived copy if needed.
288 : : */
4600 289 : 307 : retval->snapshot = snapshot;
290 : :
10108 bruce@momjian.us 291 : 307 : return retval;
292 : : }
293 : :
294 : : /*
295 : : * Closes a large object descriptor previously made by inv_open(), and
296 : : * releases the long-term memory used by it.
297 : : */
298 : : void
10466 299 : 287 : inv_close(LargeObjectDesc *obj_desc)
300 : : {
223 peter@eisentraut.org 301 [ - + ]:GNC 287 : Assert(obj_desc);
10467 bruce@momjian.us 302 :CBC 287 : pfree(obj_desc);
10892 scrappy@hub.org 303 : 287 : }
304 : :
305 : : /*
306 : : * Destroys an existing large object (not to be confused with a descriptor!)
307 : : *
308 : : * Note we expect caller to have done any required permissions check.
309 : : */
310 : : int
9643 bruce@momjian.us 311 : 57 : inv_drop(Oid lobjId)
312 : : {
313 : : ObjectAddress object;
314 : :
315 : : /*
316 : : * Delete any comments and dependencies on the large object
317 : : */
5989 itagaki.takahiro@gma 318 : 57 : object.classId = LargeObjectRelationId;
319 : 57 : object.objectId = lobjId;
320 : 57 : object.objectSubId = 0;
5213 rhaas@postgresql.org 321 : 57 : performDeletion(&object, DROP_CASCADE, 0);
322 : :
323 : : /*
324 : : * Advance command counter so that tuple removal will be seen by later
325 : : * large-object operations in this transaction.
326 : : */
9324 tgl@sss.pgh.pa.us 327 : 57 : CommandCounterIncrement();
328 : :
329 : : /* For historical reasons, we always return 1 on success. */
10467 bruce@momjian.us 330 : 57 : return 1;
331 : : }
332 : :
333 : : /*
334 : : * Determine size of a large object
335 : : *
336 : : * NOTE: LOs can contain gaps, just like Unix files. We actually return
337 : : * the offset of the last byte + 1.
338 : : */
339 : : static uint64
9324 tgl@sss.pgh.pa.us 340 : 89 : inv_getsize(LargeObjectDesc *obj_desc)
341 : : {
4958 ishii@postgresql.org 342 : 89 : uint64 lastbyte = 0;
343 : : ScanKeyData skey[1];
344 : : SysScanDesc sd;
345 : : HeapTuple tuple;
346 : :
223 peter@eisentraut.org 347 [ - + ]:GNC 89 : Assert(obj_desc);
348 : :
7951 tgl@sss.pgh.pa.us 349 :CBC 89 : open_lo_relation();
350 : :
8210 351 : 89 : ScanKeyInit(&skey[0],
352 : : Anum_pg_largeobject_loid,
353 : : BTEqualStrategyNumber, F_OIDEQ,
354 : : ObjectIdGetDatum(obj_desc->id));
355 : :
6597 356 : 89 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
357 : : obj_desc->snapshot, 1, skey);
358 : :
359 : : /*
360 : : * Because the pg_largeobject index is on both loid and pageno, but we
361 : : * constrain only loid, a backwards scan should visit all pages of the
362 : : * large object in reverse pageno order. So, it's sufficient to examine
363 : : * the first valid tuple (== last valid page).
364 : : */
5989 itagaki.takahiro@gma 365 : 89 : tuple = systable_getnext_ordered(sd, BackwardScanDirection);
366 [ + + ]: 89 : if (HeapTupleIsValid(tuple))
367 : : {
368 : : Form_pg_largeobject data;
369 : : bytea *datafield;
370 : : int len;
371 : : bool pfreeit;
372 : :
6746 bruce@momjian.us 373 [ - + ]: 76 : if (HeapTupleHasNulls(tuple)) /* paranoia */
6902 tgl@sss.pgh.pa.us 374 [ # # ]:UBC 0 : elog(ERROR, "null field found in pg_largeobject");
8751 tgl@sss.pgh.pa.us 375 :CBC 76 : data = (Form_pg_largeobject) GETSTRUCT(tuple);
4352 376 : 76 : getdatafield(data, &datafield, &len, &pfreeit);
377 : 76 : lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
9324 378 [ + + ]: 76 : if (pfreeit)
379 : 24 : pfree(datafield);
380 : : }
381 : :
6597 382 : 89 : systable_endscan_ordered(sd);
383 : :
9324 384 : 89 : return lastbyte;
385 : : }
386 : :
387 : : int64
4958 ishii@postgresql.org 388 : 190 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
389 : : {
390 : : int64 newoffset;
391 : :
223 peter@eisentraut.org 392 [ - + ]:GNC 190 : Assert(obj_desc);
393 : :
394 : : /*
395 : : * We allow seek/tell if you have either read or write permission, so no
396 : : * need for a permission check here.
397 : : */
398 : :
399 : : /*
400 : : * Note: overflow in the additions is possible, but since we will reject
401 : : * negative results, we don't need any extra test for that.
402 : : */
9324 tgl@sss.pgh.pa.us 403 [ + + + - ]:CBC 190 : switch (whence)
404 : : {
405 : 89 : case SEEK_SET:
4957 406 : 89 : newoffset = offset;
9324 407 : 89 : break;
408 : 12 : case SEEK_CUR:
4957 409 : 12 : newoffset = obj_desc->offset + offset;
9324 410 : 12 : break;
411 : 89 : case SEEK_END:
4957 412 : 89 : newoffset = inv_getsize(obj_desc) + offset;
9324 413 : 89 : break;
9324 tgl@sss.pgh.pa.us 414 :UBC 0 : default:
4957 415 [ # # ]: 0 : ereport(ERROR,
416 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
417 : : errmsg("invalid whence setting: %d", whence)));
418 : : newoffset = 0; /* keep compiler quiet */
419 : : break;
420 : : }
421 : :
422 : : /*
423 : : * use errmsg_internal here because we don't want to expose INT64_FORMAT
424 : : * in translatable strings; doing better is not worth the trouble
425 : : */
4957 tgl@sss.pgh.pa.us 426 [ + - - + ]:CBC 190 : if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
4957 tgl@sss.pgh.pa.us 427 [ # # ]:UBC 0 : ereport(ERROR,
428 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
429 : : errmsg_internal("invalid large object seek target: " INT64_FORMAT,
430 : : newoffset)));
431 : :
4957 tgl@sss.pgh.pa.us 432 :CBC 190 : obj_desc->offset = newoffset;
433 : 190 : return newoffset;
434 : : }
435 : :
436 : : int64
10466 bruce@momjian.us 437 : 32 : inv_tell(LargeObjectDesc *obj_desc)
438 : : {
223 peter@eisentraut.org 439 [ - + ]:GNC 32 : Assert(obj_desc);
440 : :
441 : : /*
442 : : * We allow seek/tell if you have either read or write permission, so no
443 : : * need for a permission check here.
444 : : */
445 : :
10108 bruce@momjian.us 446 :CBC 32 : return obj_desc->offset;
447 : : }
448 : :
449 : : int
10466 450 : 918 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
451 : : {
9175 452 : 918 : int nread = 0;
453 : : int64 n;
454 : : int64 off;
455 : : int len;
456 : 918 : int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
457 : : uint64 pageoff;
458 : : ScanKeyData skey[2];
459 : : SysScanDesc sd;
460 : : HeapTuple tuple;
461 : :
223 peter@eisentraut.org 462 [ - + ]:GNC 918 : Assert(obj_desc);
10467 bruce@momjian.us 463 [ - + ]:CBC 918 : Assert(buf != NULL);
464 : :
3099 tgl@sss.pgh.pa.us 465 [ - + ]: 918 : if ((obj_desc->flags & IFS_RDLOCK) == 0)
3099 tgl@sss.pgh.pa.us 466 [ # # ]:UBC 0 : ereport(ERROR,
467 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
468 : : errmsg("permission denied for large object %u",
469 : : obj_desc->id)));
470 : :
9324 tgl@sss.pgh.pa.us 471 [ + + ]:CBC 918 : if (nbytes <= 0)
9326 bruce@momjian.us 472 : 13 : return 0;
473 : :
7951 tgl@sss.pgh.pa.us 474 : 905 : open_lo_relation();
475 : :
8210 476 : 905 : ScanKeyInit(&skey[0],
477 : : Anum_pg_largeobject_loid,
478 : : BTEqualStrategyNumber, F_OIDEQ,
479 : : ObjectIdGetDatum(obj_desc->id));
480 : :
481 : 905 : ScanKeyInit(&skey[1],
482 : : Anum_pg_largeobject_pageno,
483 : : BTGreaterEqualStrategyNumber, F_INT4GE,
484 : : Int32GetDatum(pageno));
485 : :
6597 486 : 905 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
487 : : obj_desc->snapshot, 2, skey);
488 : :
489 [ + + ]: 6935 : while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
490 : : {
491 : : Form_pg_largeobject data;
492 : : bytea *datafield;
493 : : bool pfreeit;
494 : :
6746 bruce@momjian.us 495 [ - + ]: 6750 : if (HeapTupleHasNulls(tuple)) /* paranoia */
6902 tgl@sss.pgh.pa.us 496 [ # # ]:UBC 0 : elog(ERROR, "null field found in pg_largeobject");
8751 tgl@sss.pgh.pa.us 497 :CBC 6750 : data = (Form_pg_largeobject) GETSTRUCT(tuple);
498 : :
499 : : /*
500 : : * We expect the indexscan will deliver pages in order. However,
501 : : * there may be missing pages if the LO contains unwritten "holes". We
502 : : * want missing sections to read out as zeroes.
503 : : */
4958 ishii@postgresql.org 504 : 6750 : pageoff = ((uint64) data->pageno) * LOBLKSIZE;
9324 tgl@sss.pgh.pa.us 505 [ + + ]: 6750 : if (pageoff > obj_desc->offset)
506 : : {
507 : 8 : n = pageoff - obj_desc->offset;
508 : 8 : n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
509 [ - + - - : 8 : MemSet(buf + nread, 0, n);
- - - - -
- ]
510 : 8 : nread += n;
511 : 8 : obj_desc->offset += n;
512 : : }
513 : :
514 [ + + ]: 6750 : if (nread < nbytes)
515 : : {
516 [ - + ]: 6746 : Assert(obj_desc->offset >= pageoff);
517 : 6746 : off = (int) (obj_desc->offset - pageoff);
518 [ + - - + ]: 6746 : Assert(off >= 0 && off < LOBLKSIZE);
519 : :
4352 520 : 6746 : getdatafield(data, &datafield, &len, &pfreeit);
9324 521 [ + + ]: 6746 : if (len > off)
522 : : {
523 : 6681 : n = len - off;
524 : 6681 : n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
525 : 6681 : memcpy(buf + nread, VARDATA(datafield) + off, n);
526 : 6681 : nread += n;
527 : 6681 : obj_desc->offset += n;
528 : : }
529 [ + + ]: 6746 : if (pfreeit)
530 : 6698 : pfree(datafield);
531 : : }
532 : :
533 [ + + ]: 6750 : if (nread >= nbytes)
534 : 720 : break;
535 : : }
536 : :
6597 537 : 905 : systable_endscan_ordered(sd);
538 : :
10108 bruce@momjian.us 539 : 905 : return nread;
540 : : }
541 : :
542 : : int
7180 543 : 1058 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
544 : : {
9175 545 : 1058 : int nwritten = 0;
546 : : int n;
547 : : int off;
548 : : int len;
549 : 1058 : int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
550 : : ScanKeyData skey[2];
551 : : SysScanDesc sd;
552 : : HeapTuple oldtuple;
553 : : Form_pg_largeobject olddata;
554 : : bool neednextpage;
555 : : bytea *datafield;
556 : : bool pfreeit;
557 : : union
558 : : {
559 : : alignas(int32) bytea hdr;
560 : : /* this is to make the union big enough for a LO data chunk: */
561 : : char data[LOBLKSIZE + VARHDRSZ];
273 msawada@postgresql.o 562 :GNC 1058 : } workbuf = {0};
7007 tgl@sss.pgh.pa.us 563 :CBC 1058 : char *workb = VARDATA(&workbuf.hdr);
564 : : HeapTuple newtup;
565 : : Datum values[Natts_pg_largeobject];
566 : : bool nulls[Natts_pg_largeobject];
567 : : bool replace[Natts_pg_largeobject];
568 : : CatalogIndexState indstate;
569 : :
223 peter@eisentraut.org 570 [ - + ]:GNC 1058 : Assert(obj_desc);
10467 bruce@momjian.us 571 [ - + ]:CBC 1058 : Assert(buf != NULL);
572 : :
573 : : /* enforce writability because snapshot is probably wrong otherwise */
3099 tgl@sss.pgh.pa.us 574 [ - + ]: 1058 : if ((obj_desc->flags & IFS_WRLOCK) == 0)
3099 tgl@sss.pgh.pa.us 575 [ # # ]:UBC 0 : ereport(ERROR,
576 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
577 : : errmsg("permission denied for large object %u",
578 : : obj_desc->id)));
579 : :
9324 tgl@sss.pgh.pa.us 580 [ - + ]:CBC 1058 : if (nbytes <= 0)
9324 tgl@sss.pgh.pa.us 581 :UBC 0 : return 0;
582 : :
583 : : /* this addition can't overflow because nbytes is only int32 */
4958 ishii@postgresql.org 584 [ - + ]:CBC 1058 : if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
4957 tgl@sss.pgh.pa.us 585 [ # # ]:UBC 0 : ereport(ERROR,
586 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
587 : : errmsg("invalid large object write request size: %d",
588 : : nbytes)));
589 : :
7951 tgl@sss.pgh.pa.us 590 :CBC 1058 : open_lo_relation();
591 : :
592 : 1058 : indstate = CatalogOpenIndexes(lo_heap_r);
593 : :
8210 594 : 1058 : ScanKeyInit(&skey[0],
595 : : Anum_pg_largeobject_loid,
596 : : BTEqualStrategyNumber, F_OIDEQ,
597 : : ObjectIdGetDatum(obj_desc->id));
598 : :
599 : 1058 : ScanKeyInit(&skey[1],
600 : : Anum_pg_largeobject_pageno,
601 : : BTGreaterEqualStrategyNumber, F_INT4GE,
602 : : Int32GetDatum(pageno));
603 : :
6597 604 : 1058 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
605 : : obj_desc->snapshot, 2, skey);
606 : :
8751 607 : 1058 : oldtuple = NULL;
9324 608 : 1058 : olddata = NULL;
609 : 1058 : neednextpage = true;
610 : :
9326 bruce@momjian.us 611 [ + + ]: 6380 : while (nwritten < nbytes)
612 : : {
613 : : /*
614 : : * If possible, get next pre-existing page of the LO. We expect the
615 : : * indexscan will deliver these in order --- but there may be holes.
616 : : */
9324 tgl@sss.pgh.pa.us 617 [ + + ]: 5322 : if (neednextpage)
618 : : {
6597 619 [ + + ]: 1062 : if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
620 : : {
3240 621 [ - + ]: 24 : if (HeapTupleHasNulls(oldtuple)) /* paranoia */
6902 tgl@sss.pgh.pa.us 622 [ # # ]:UBC 0 : elog(ERROR, "null field found in pg_largeobject");
8751 tgl@sss.pgh.pa.us 623 :CBC 24 : olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
624 [ - + ]: 24 : Assert(olddata->pageno >= pageno);
625 : : }
9324 626 : 1062 : neednextpage = false;
627 : : }
628 : :
629 : : /*
630 : : * If we have a pre-existing page, see if it is the page we want to
631 : : * write, or a later one.
632 : : */
633 [ + + + - ]: 5322 : if (olddata != NULL && olddata->pageno == pageno)
634 : : {
635 : : /*
636 : : * Update an existing page with fresh data.
637 : : *
638 : : * First, load old data into workbuf
639 : : */
4352 640 : 24 : getdatafield(olddata, &datafield, &len, &pfreeit);
9324 641 : 24 : memcpy(workb, VARDATA(datafield), len);
642 [ + + ]: 24 : if (pfreeit)
643 : 20 : pfree(datafield);
644 : :
645 : : /*
646 : : * Fill any hole
647 : : */
648 : 24 : off = (int) (obj_desc->offset % LOBLKSIZE);
649 [ - + ]: 24 : if (off > len)
9324 tgl@sss.pgh.pa.us 650 [ # # # # :UBC 0 : MemSet(workb + len, 0, off - len);
# # # # #
# ]
651 : :
652 : : /*
653 : : * Insert appropriate portion of new data
654 : : */
9324 tgl@sss.pgh.pa.us 655 :CBC 24 : n = LOBLKSIZE - off;
656 : 24 : n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
657 : 24 : memcpy(workb + off, buf + nwritten, n);
658 : 24 : nwritten += n;
659 : 24 : obj_desc->offset += n;
660 : 24 : off += n;
661 : : /* compute valid length of new page */
662 : 24 : len = (len >= off) ? len : off;
7007 663 : 24 : SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
664 : :
665 : : /*
666 : : * Form and insert updated tuple
667 : : */
9324 668 : 24 : memset(values, 0, sizeof(values));
6393 669 : 24 : memset(nulls, false, sizeof(nulls));
670 : 24 : memset(replace, false, sizeof(replace));
9172 671 : 24 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
6393 672 : 24 : replace[Anum_pg_largeobject_data - 1] = true;
673 : 24 : newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
674 : : values, nulls, replace);
3380 675 : 24 : CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
676 : : indstate);
9324 677 : 24 : heap_freetuple(newtup);
678 : :
679 : : /*
680 : : * We're done with this old page.
681 : : */
8751 682 : 24 : oldtuple = NULL;
9324 683 : 24 : olddata = NULL;
684 : 24 : neednextpage = true;
685 : : }
686 : : else
687 : : {
688 : : /*
689 : : * Write a brand new page.
690 : : *
691 : : * First, fill any hole
692 : : */
693 : 5298 : off = (int) (obj_desc->offset % LOBLKSIZE);
694 [ + + ]: 5298 : if (off > 0)
695 [ - + - - : 4 : MemSet(workb, 0, off);
- - - - -
- ]
696 : :
697 : : /*
698 : : * Insert appropriate portion of new data
699 : : */
700 : 5298 : n = LOBLKSIZE - off;
701 : 5298 : n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
702 : 5298 : memcpy(workb + off, buf + nwritten, n);
703 : 5298 : nwritten += n;
704 : 5298 : obj_desc->offset += n;
705 : : /* compute valid length of new page */
706 : 5298 : len = off + n;
7007 707 : 5298 : SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
708 : :
709 : : /*
710 : : * Form and insert updated tuple
711 : : */
9324 712 : 5298 : memset(values, 0, sizeof(values));
6393 713 : 5298 : memset(nulls, false, sizeof(nulls));
9324 714 : 5298 : values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
715 : 5298 : values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
9172 716 : 5298 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
6393 717 : 5298 : newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
3380 718 : 5298 : CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
9324 719 : 5298 : heap_freetuple(newtup);
720 : : }
721 : 5322 : pageno++;
722 : : }
723 : :
6597 724 : 1058 : systable_endscan_ordered(sd);
725 : :
8674 726 : 1058 : CatalogCloseIndexes(indstate);
727 : :
728 : : /*
729 : : * Advance command counter so that my tuple updates will be seen by later
730 : : * large-object operations in this transaction.
731 : : */
9324 732 : 1058 : CommandCounterIncrement();
733 : :
9326 bruce@momjian.us 734 : 1058 : return nwritten;
735 : : }
736 : :
737 : : void
4958 ishii@postgresql.org 738 : 32 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
739 : : {
7003 bruce@momjian.us 740 : 32 : int32 pageno = (int32) (len / LOBLKSIZE);
741 : : int32 off;
742 : : ScanKeyData skey[2];
743 : : SysScanDesc sd;
744 : : HeapTuple oldtuple;
745 : : Form_pg_largeobject olddata;
746 : : union
747 : : {
748 : : alignas(int32) bytea hdr;
749 : : /* this is to make the union big enough for a LO data chunk: */
750 : : char data[LOBLKSIZE + VARHDRSZ];
273 msawada@postgresql.o 751 :GNC 32 : } workbuf = {0};
6746 bruce@momjian.us 752 :CBC 32 : char *workb = VARDATA(&workbuf.hdr);
753 : : HeapTuple newtup;
754 : : Datum values[Natts_pg_largeobject];
755 : : bool nulls[Natts_pg_largeobject];
756 : : bool replace[Natts_pg_largeobject];
757 : : CatalogIndexState indstate;
758 : :
223 peter@eisentraut.org 759 [ - + ]:GNC 32 : Assert(obj_desc);
760 : :
761 : : /* enforce writability because snapshot is probably wrong otherwise */
3099 tgl@sss.pgh.pa.us 762 [ - + ]:CBC 32 : if ((obj_desc->flags & IFS_WRLOCK) == 0)
3099 tgl@sss.pgh.pa.us 763 [ # # ]:UBC 0 : ereport(ERROR,
764 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
765 : : errmsg("permission denied for large object %u",
766 : : obj_desc->id)));
767 : :
768 : : /*
769 : : * use errmsg_internal here because we don't want to expose INT64_FORMAT
770 : : * in translatable strings; doing better is not worth the trouble
771 : : */
4957 tgl@sss.pgh.pa.us 772 [ + - - + ]:CBC 32 : if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
4957 tgl@sss.pgh.pa.us 773 [ # # ]:UBC 0 : ereport(ERROR,
774 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
775 : : errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
776 : : len)));
777 : :
7003 bruce@momjian.us 778 :CBC 32 : open_lo_relation();
779 : :
780 : 32 : indstate = CatalogOpenIndexes(lo_heap_r);
781 : :
782 : : /*
783 : : * Set up to find all pages with desired loid and pageno >= target
784 : : */
785 : 32 : ScanKeyInit(&skey[0],
786 : : Anum_pg_largeobject_loid,
787 : : BTEqualStrategyNumber, F_OIDEQ,
788 : : ObjectIdGetDatum(obj_desc->id));
789 : :
790 : 32 : ScanKeyInit(&skey[1],
791 : : Anum_pg_largeobject_pageno,
792 : : BTGreaterEqualStrategyNumber, F_INT4GE,
793 : : Int32GetDatum(pageno));
794 : :
6597 tgl@sss.pgh.pa.us 795 : 32 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
796 : : obj_desc->snapshot, 2, skey);
797 : :
798 : : /*
799 : : * If possible, get the page the truncation point is in. The truncation
800 : : * point may be beyond the end of the LO or in a hole.
801 : : */
7003 bruce@momjian.us 802 : 32 : olddata = NULL;
6597 tgl@sss.pgh.pa.us 803 [ + + ]: 32 : if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
804 : : {
3240 805 [ - + ]: 20 : if (HeapTupleHasNulls(oldtuple)) /* paranoia */
6902 tgl@sss.pgh.pa.us 806 [ # # ]:UBC 0 : elog(ERROR, "null field found in pg_largeobject");
7003 bruce@momjian.us 807 :CBC 20 : olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
808 [ - + ]: 20 : Assert(olddata->pageno >= pageno);
809 : : }
810 : :
811 : : /*
812 : : * If we found the page of the truncation point we need to truncate the
813 : : * data in it. Otherwise if we're in a hole, we need to create a page to
814 : : * mark the end of data.
815 : : */
816 [ + + + + ]: 32 : if (olddata != NULL && olddata->pageno == pageno)
817 : 12 : {
818 : : /* First, load old data into workbuf */
819 : : bytea *datafield;
820 : : int pagelen;
821 : : bool pfreeit;
822 : :
4352 tgl@sss.pgh.pa.us 823 : 12 : getdatafield(olddata, &datafield, &pagelen, &pfreeit);
7003 bruce@momjian.us 824 : 12 : memcpy(workb, VARDATA(datafield), pagelen);
825 [ + + ]: 12 : if (pfreeit)
6746 826 : 8 : pfree(datafield);
827 : :
828 : : /*
829 : : * Fill any hole
830 : : */
7003 831 : 12 : off = len % LOBLKSIZE;
832 [ + + ]: 12 : if (off > pagelen)
6746 833 [ + - - + : 4 : MemSet(workb + pagelen, 0, off - pagelen);
- - - - -
- ]
834 : :
835 : : /* compute length of new page */
7003 836 : 12 : SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
837 : :
838 : : /*
839 : : * Form and insert updated tuple
840 : : */
841 : 12 : memset(values, 0, sizeof(values));
6393 tgl@sss.pgh.pa.us 842 : 12 : memset(nulls, false, sizeof(nulls));
843 : 12 : memset(replace, false, sizeof(replace));
7003 bruce@momjian.us 844 : 12 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
6393 tgl@sss.pgh.pa.us 845 : 12 : replace[Anum_pg_largeobject_data - 1] = true;
846 : 12 : newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
847 : : values, nulls, replace);
3380 848 : 12 : CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
849 : : indstate);
7003 bruce@momjian.us 850 : 12 : heap_freetuple(newtup);
851 : : }
852 : : else
853 : : {
854 : : /*
855 : : * If the first page we found was after the truncation point, we're in
856 : : * a hole that we'll fill, but we need to delete the later page
857 : : * because the loop below won't visit it again.
858 : : */
5578 tgl@sss.pgh.pa.us 859 [ + + ]: 20 : if (olddata != NULL)
860 : : {
861 [ - + ]: 8 : Assert(olddata->pageno > pageno);
3380 862 : 8 : CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
863 : : }
864 : :
865 : : /*
866 : : * Write a brand new page.
867 : : *
868 : : * Fill the hole up to the truncation point
869 : : */
7003 bruce@momjian.us 870 : 20 : off = len % LOBLKSIZE;
871 [ + - ]: 20 : if (off > 0)
872 [ - + - - : 20 : MemSet(workb, 0, off);
- - - - -
- ]
873 : :
874 : : /* compute length of new page */
875 : 20 : SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
876 : :
877 : : /*
878 : : * Form and insert new tuple
879 : : */
880 : 20 : memset(values, 0, sizeof(values));
6393 tgl@sss.pgh.pa.us 881 : 20 : memset(nulls, false, sizeof(nulls));
7003 bruce@momjian.us 882 : 20 : values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
883 : 20 : values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
884 : 20 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
6393 tgl@sss.pgh.pa.us 885 : 20 : newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
3380 886 : 20 : CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
7003 bruce@momjian.us 887 : 20 : heap_freetuple(newtup);
888 : : }
889 : :
890 : : /*
891 : : * Delete any pages after the truncation point. If the initial search
892 : : * didn't find a page, then of course there's nothing more to do.
893 : : */
5578 tgl@sss.pgh.pa.us 894 [ + + ]: 32 : if (olddata != NULL)
895 : : {
896 [ + + ]: 24 : while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
897 : : {
3380 898 : 4 : CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
899 : : }
900 : : }
901 : :
6597 902 : 32 : systable_endscan_ordered(sd);
903 : :
7003 bruce@momjian.us 904 : 32 : CatalogCloseIndexes(indstate);
905 : :
906 : : /*
907 : : * Advance command counter so that tuple updates will be seen by later
908 : : * large-object operations in this transaction.
909 : : */
910 : 32 : CommandCounterIncrement();
911 : 32 : }
|