/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2016, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_LMV
#include <linux/slab.h>
#include <linux/module.h>
#include <linux/init.h>
#include <linux/slab.h>
#include <linux/pagemap.h>
39
#include <linux/math64.h>
tappro's avatar
tappro committed
40
41
#include <linux/seq_file.h>
#include <linux/namei.h>
42
#include <lustre_intent.h>
tappro's avatar
tappro committed
43
44
45
46
47

#include <obd_support.h>
#include <lustre_lib.h>
#include <lustre_net.h>
#include <lustre_dlm.h>
48
#include <lustre_mdc.h>
tappro's avatar
tappro committed
49
50
51
52
#include <obd_class.h>
#include <lprocfs_status.h>
#include "lmv_internal.h"

53
54
static int lmv_intent_remote(struct obd_export *exp, struct lookup_intent *it,
			     const struct lu_fid *parent_fid,
55
56
			     struct ptlrpc_request **reqp,
			     ldlm_blocking_callback cb_blocking,
57
58
			     __u64 extra_lock_flags,
			     const char *secctx_name, __u32 secctx_name_size)
tappro's avatar
tappro committed
59
{
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
	struct obd_device	*obd = exp->exp_obd;
	struct lmv_obd		*lmv = &obd->u.lmv;
	struct ptlrpc_request	*req = NULL;
	struct lustre_handle	plock;
	struct md_op_data	*op_data;
	struct lmv_tgt_desc	*tgt;
	struct mdt_body		*body;
	int			pmode;
	int			rc = 0;
	ENTRY;

	body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
	if (body == NULL)
		RETURN(-EPROTO);

75
	LASSERT((body->mbo_valid & OBD_MD_MDS));
76
77
78
79

	/*
	 * We got LOOKUP lock, but we really need attrs.
	 */
80
	pmode = it->it_lock_mode;
81
	if (pmode) {
82
83
		plock.cookie = it->it_lock_handle;
		it->it_lock_mode = 0;
84
		it->it_request = NULL;
85
	}
tappro's avatar
tappro committed
86

87
	LASSERT(fid_is_sane(&body->mbo_fid1));
tappro's avatar
tappro committed
88

89
	tgt = lmv_fid2tgt(lmv, &body->mbo_fid1);
90
91
	if (IS_ERR(tgt))
		GOTO(out, rc = PTR_ERR(tgt));
tappro's avatar
tappro committed
92

93
94
95
	OBD_ALLOC_PTR(op_data);
	if (op_data == NULL)
		GOTO(out, rc = -ENOMEM);
tappro's avatar
tappro committed
96

97
	op_data->op_fid1 = body->mbo_fid1;
98
	/* Sent the parent FID to the remote MDT */
99
100
101
102
103
	if (parent_fid != NULL) {
		/* The parent fid is only for remote open to
		 * check whether the open is from OBF,
		 * see mdt_cross_open */
		LASSERT(it->it_op & IT_OPEN);
104
		op_data->op_fid2 = *parent_fid;
105
	}
tappro's avatar
tappro committed
106

107
	op_data->op_bias = MDS_CROSS_REF;
108
	CDEBUG(D_INODE, "REMOTE_INTENT with fid="DFID" -> mds #%u\n",
109
	       PFID(&body->mbo_fid1), tgt->ltd_index);
tappro's avatar
tappro committed
110

111
112
113
114
115
116
117
118
119
120
	/* ask for security context upon intent */
	if (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_OPEN) &&
	    secctx_name_size != 0 && secctx_name != NULL) {
		op_data->op_file_secctx_name = secctx_name;
		op_data->op_file_secctx_name_size = secctx_name_size;
		CDEBUG(D_SEC, "'%.*s' is security xattr to fetch for "
		       DFID"\n",
		       secctx_name_size, secctx_name, PFID(&body->mbo_fid1));
	}

121
122
	rc = md_intent_lock(tgt->ltd_exp, op_data, it, &req, cb_blocking,
			    extra_lock_flags);
yury's avatar
b=16727    
yury committed
123
124
        if (rc)
                GOTO(out_free_op_data, rc);
tappro's avatar
tappro committed
125

126
127
128
129
130
	/*
	 * LLite needs LOOKUP lock to track dentry revocation in order to
	 * maintain dcache consistency. Thus drop UPDATE|PERM lock here
	 * and put LOOKUP in request.
	 */
131
132
133
134
	if (it->it_lock_mode != 0) {
		it->it_remote_lock_handle =
					it->it_lock_handle;
		it->it_remote_lock_mode = it->it_lock_mode;
135
136
	}

137
	if (pmode) {
138
139
		it->it_lock_handle = plock.cookie;
		it->it_lock_mode = pmode;
140
	}
141
142

	EXIT;
yury's avatar
b=16727    
yury committed
143
out_free_op_data:
144
	OBD_FREE_PTR(op_data);
tappro's avatar
tappro committed
145
out:
146
147
	if (rc && pmode)
		ldlm_lock_decref(&plock, pmode);
tappro's avatar
tappro committed
148

149
150
151
	ptlrpc_req_finished(*reqp);
	*reqp = req;
	return rc;
tappro's avatar
tappro committed
152
153
}

154
int lmv_revalidate_slaves(struct obd_export *exp,
155
			  const struct lu_fid *pfid,
156
			  const struct lmv_stripe_md *lsm,
157
158
159
			  ldlm_blocking_callback cb_blocking,
			  int extra_lock_flags)
{
160
161
162
163
164
165
166
167
	struct obd_device *obd = exp->exp_obd;
	struct lmv_obd *lmv = &obd->u.lmv;
	struct ptlrpc_request *req = NULL;
	struct mdt_body *body;
	struct md_op_data *op_data;
	int i;
	int valid_stripe_count = 0;
	int rc = 0;
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192

	ENTRY;

	/**
	 * revalidate slaves has some problems, temporarily return,
	 * we may not need that
	 */
	OBD_ALLOC_PTR(op_data);
	if (op_data == NULL)
		RETURN(-ENOMEM);

	/**
	 * Loop over the stripe information, check validity and update them
	 * from MDS if needed.
	 */
	for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
		struct lu_fid		fid;
		struct lookup_intent	it = { .it_op = IT_GETATTR };
		struct lustre_handle	*lockh = NULL;
		struct lmv_tgt_desc	*tgt = NULL;
		struct inode		*inode;

		fid = lsm->lsm_md_oinfo[i].lmo_fid;
		inode = lsm->lsm_md_oinfo[i].lmo_root;

193
194
195
		if (!inode)
			continue;

196
197
198
199
200
201
		/*
		 * Prepare op_data for revalidating. Note that @fid2 shluld be
		 * defined otherwise it will go to server and take new lock
		 * which is not needed here.
		 */
		memset(op_data, 0, sizeof(*op_data));
202
203
204
205
		if (exp_connect_flags2(exp) & OBD_CONNECT2_GETATTR_PFID)
			op_data->op_fid1 = *pfid;
		else
			op_data->op_fid1 = fid;
206
207
		op_data->op_fid2 = fid;

208
209
210
		tgt = lmv_tgt(lmv, lsm->lsm_md_oinfo[i].lmo_mds);
		if (!tgt)
			GOTO(cleanup, rc = -ENODEV);
211

212
		CDEBUG(D_INODE, "Revalidate slave "DFID" -> mds #%u\n",
213
		       PFID(&fid), tgt->ltd_index);
214

215
216
217
218
219
		if (req != NULL) {
			ptlrpc_req_finished(req);
			req = NULL;
		}

220
221
		rc = md_intent_lock(tgt->ltd_exp, op_data, &it, &req,
				    cb_blocking, extra_lock_flags);
222
223
224
225
226
227
		if (rc == -ENOENT) {
			/* skip stripe is not exists */
			rc = 0;
			continue;
		}

228
229
230
		if (rc < 0)
			GOTO(cleanup, rc);

231
		lockh = (struct lustre_handle *)&it.it_lock_handle;
232
233
234
235
236
237
238
239
240
		if (rc > 0 && req == NULL) {
			/* slave inode is still valid */
			CDEBUG(D_INODE, "slave "DFID" is still valid.\n",
			       PFID(&fid));
			rc = 0;
		} else {
			/* refresh slave from server */
			body = req_capsule_server_get(&req->rq_pill,
						      &RMF_MDT_BODY);
241
			if (body == NULL) {
242
				if (it.it_lock_mode && lockh) {
243
					ldlm_lock_decref(lockh,
244
245
						 it.it_lock_mode);
					it.it_lock_mode = 0;
246
				}
247
				GOTO(cleanup, rc = -ENOENT);
248
249
			}

250
			i_size_write(inode, body->mbo_size);
251
			inode->i_blocks = body->mbo_blocks;
252
			set_nlink(inode, body->mbo_nlink);
253
254
255
			inode->i_atime.tv_sec = body->mbo_atime;
			inode->i_ctime.tv_sec = body->mbo_ctime;
			inode->i_mtime.tv_sec = body->mbo_mtime;
256
		}
257

258
		md_set_lock_data(tgt->ltd_exp, lockh, inode, NULL);
259
260
261
		if (it.it_lock_mode != 0 && lockh != NULL) {
			ldlm_lock_decref(lockh, it.it_lock_mode);
			it.it_lock_mode = 0;
262
		}
263
264

		valid_stripe_count++;
265
266
267
	}

cleanup:
268
269
270
	if (req != NULL)
		ptlrpc_req_finished(req);

271
272
273
274
	/* if all stripes are invalid, return -ENOENT to notify user */
	if (!rc && !valid_stripe_count)
		rc = -ENOENT;

275
276
277
278
	OBD_FREE_PTR(op_data);
	RETURN(rc);
}

tappro's avatar
tappro committed
279
280
281
282
/*
 * IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
 * may be split dir.
 */
283
284
285
286
287
static int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
			   struct lookup_intent *it,
			   struct ptlrpc_request **reqp,
			   ldlm_blocking_callback cb_blocking,
			   __u64 extra_lock_flags)
tappro's avatar
tappro committed
288
{
289
290
291
292
293
294
295
	struct obd_device *obd = exp->exp_obd;
	struct lmv_obd *lmv = &obd->u.lmv;
	struct lmv_tgt_desc *tgt;
	struct mdt_body *body;
	__u64 flags = it->it_flags;
	int rc;

296
297
	ENTRY;

298
	/* do not allow file creation in foreign dir */
299
	if ((it->it_op & IT_CREAT) && lmv_dir_foreign(op_data->op_mea1))
300
301
		RETURN(-ENODATA);

302
303
	if ((it->it_op & IT_CREAT) && !(flags & MDS_OPEN_BY_FID)) {
		/* don't allow create under dir with bad hash */
304
		if (lmv_dir_bad_hash(op_data->op_mea1))
305
306
			RETURN(-EBADF);

307
		if (lmv_dir_layout_changing(op_data->op_mea1)) {
308
309
310
311
			if (flags & O_EXCL) {
				/*
				 * open(O_CREAT | O_EXCL) needs to check
				 * existing name, which should be done on both
312
				 * old and new layout, check old layout on
313
314
				 * client side.
				 */
315
				rc = lmv_old_layout_lookup(lmv, op_data);
316
317
318
				if (rc != -ENOENT)
					RETURN(rc);

319
				op_data->op_new_layout = true;
320
321
322
323
324
325
326
327
328
329
330
331
			} else {
				/*
				 * open(O_CREAT) will be sent to MDT in old
				 * layout first, to avoid creating new file
				 * under old layout, clear O_CREAT.
				 */
				it->it_flags &= ~O_CREAT;
			}
		}
	}

retry:
332
333
334
335
336
337
	if (it->it_flags & MDS_OPEN_BY_FID) {
		LASSERT(fid_is_sane(&op_data->op_fid2));

		/* for striped directory, we can't know parent stripe fid
		 * without name, but we can set it to child fid, and MDT
		 * will obtain it from linkea in open in such case. */
338
		if (lmv_dir_striped(op_data->op_mea1))
339
			op_data->op_fid1 = op_data->op_fid2;
340

341
		tgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
342
343
344
		if (IS_ERR(tgt))
			RETURN(PTR_ERR(tgt));

345
		op_data->op_mds = tgt->ltd_index;
346
	} else {
347
		LASSERT(fid_is_sane(&op_data->op_fid1));
348
349
		LASSERT(it->it_flags & MDS_OPEN_PCC ||
			fid_is_zero(&op_data->op_fid2));
350
351
		LASSERT(op_data->op_name != NULL);

352
		tgt = lmv_locate_tgt(lmv, op_data);
353
354
355
		if (IS_ERR(tgt))
			RETURN(PTR_ERR(tgt));
	}
356

357
358
	/* If it is ready to open the file by FID, do not need
	 * allocate FID at all, otherwise it will confuse MDT */
359
360
	if ((it->it_op & IT_CREAT) && !(it->it_flags & MDS_OPEN_BY_FID ||
					it->it_flags & MDS_OPEN_PCC)) {
361
		/*
362
363
		 * For lookup(IT_CREATE) cases allocate new fid and setup FLD
		 * for it.
364
		 */
365
		rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
366
367
368
		if (rc != 0)
			RETURN(rc);
	}
tappro's avatar
tappro committed
369

370
	CDEBUG(D_INODE, "OPEN_INTENT with fid1="DFID", fid2="DFID","
371
	       " name='%s' -> mds #%u\n", PFID(&op_data->op_fid1),
372
	       PFID(&op_data->op_fid2), op_data->op_name, tgt->ltd_index);
373

374
375
	rc = md_intent_lock(tgt->ltd_exp, op_data, it, reqp, cb_blocking,
			    extra_lock_flags);
376
377
378
379
380
381
	if (rc != 0)
		RETURN(rc);
	/*
	 * Nothing is found, do not access body->fid1 as it is zero and thus
	 * pointless.
	 */
382
383
	if ((it->it_disposition & DISP_LOOKUP_NEG) &&
	    !(it->it_disposition & DISP_OPEN_CREATE) &&
384
385
386
387
388
389
390
391
392
393
394
395
396
	    !(it->it_disposition & DISP_OPEN_OPEN)) {
		if (!(it->it_flags & MDS_OPEN_BY_FID) &&
		    lmv_dir_retry_check_update(op_data)) {
			ptlrpc_req_finished(*reqp);
			it->it_request = NULL;
			it->it_disposition = 0;
			*reqp = NULL;

			it->it_flags = flags;
			fid_zero(&op_data->op_fid2);
			goto retry;
		}

397
		RETURN(rc);
398
	}
399
400
401
402
403

	body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
	if (body == NULL)
		RETURN(-EPROTO);

404
	/* Not cross-ref case, just get out of here. */
405
	if (unlikely((body->mbo_valid & OBD_MD_MDS))) {
406
		rc = lmv_intent_remote(exp, it, &op_data->op_fid1, reqp,
407
408
409
				       cb_blocking, extra_lock_flags,
				       op_data->op_file_secctx_name,
				       op_data->op_file_secctx_name_size);
410
411
412
413
414
415
		if (rc != 0)
			RETURN(rc);

		body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
		if (body == NULL)
			RETURN(-EPROTO);
416
	}
tappro's avatar
tappro committed
417

418
	RETURN(rc);
tappro's avatar
tappro committed
419
420
}

yury's avatar
b=16727    
yury committed
421
422
423
/*
 * Handler for: getattr, lookup and revalidate cases.
 */
424
425
426
427
428
static int
lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
		  struct lookup_intent *it, struct ptlrpc_request **reqp,
		  ldlm_blocking_callback cb_blocking,
		  __u64 extra_lock_flags)
tappro's avatar
tappro committed
429
{
430
431
432
433
434
	struct obd_device *obd = exp->exp_obd;
	struct lmv_obd *lmv = &obd->u.lmv;
	struct lmv_tgt_desc *tgt = NULL;
	struct mdt_body *body;
	int rc;
435
436
	ENTRY;

437
	/* foreign dir is not striped */
438
	if (lmv_dir_foreign(op_data->op_mea1)) {
439
440
441
442
443
444
		/* only allow getattr/lookup for itself */
		if (op_data->op_name != NULL)
			RETURN(-ENODATA);
		RETURN(0);
	}

445
retry:
446
447
448
449
450
451
452
453
454
	if (op_data->op_name) {
		tgt = lmv_locate_tgt(lmv, op_data);
		if (!fid_is_sane(&op_data->op_fid2))
			fid_zero(&op_data->op_fid2);
	} else if (fid_is_sane(&op_data->op_fid2)) {
		tgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
	} else {
		tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
	}
455
	if (IS_ERR(tgt))
456
457
458
		RETURN(PTR_ERR(tgt));

	CDEBUG(D_INODE, "LOOKUP_INTENT with fid1="DFID", fid2="DFID
459
	       ", name='%s' -> mds #%u\n",
460
	       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2),
461
	       op_data->op_name ? op_data->op_name : "<NULL>",
462
	       tgt->ltd_index);
463
464
465

	op_data->op_bias &= ~MDS_CROSS_REF;

466
467
	rc = md_intent_lock(tgt->ltd_exp, op_data, it, reqp, cb_blocking,
			    extra_lock_flags);
468
	if (rc < 0)
469
470
		RETURN(rc);

471
472
473
	if (*reqp == NULL) {
		/* If RPC happens, lsm information will be revalidated
		 * during update_inode process (see ll_update_lsm_md) */
474
		if (lmv_dir_striped(op_data->op_mea2)) {
475
476
			rc = lmv_revalidate_slaves(exp, &op_data->op_fid2,
						   op_data->op_mea2,
477
478
						   cb_blocking,
						   extra_lock_flags);
479
480
481
482
			if (rc != 0)
				RETURN(rc);
		}
		RETURN(rc);
483
484
485
486
487
488
	} else if (it_disposition(it, DISP_LOOKUP_NEG) &&
		   lmv_dir_retry_check_update(op_data)) {
		ptlrpc_req_finished(*reqp);
		it->it_request = NULL;
		it->it_disposition = 0;
		*reqp = NULL;
489

490
		goto retry;
491
	}
492

493
494
495
	if (!it_has_reply_body(it))
		RETURN(0);

496
497
498
499
500
501
502
503
	/*
	 * MDS has returned success. Probably name has been resolved in
	 * remote inode. Let's check this.
	 */
	body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
	if (body == NULL)
		RETURN(-EPROTO);

504
	/* Not cross-ref case, just get out of here. */
505
	if (unlikely((body->mbo_valid & OBD_MD_MDS))) {
506
		rc = lmv_intent_remote(exp, it, NULL, reqp, cb_blocking,
507
508
509
				       extra_lock_flags,
				       op_data->op_file_secctx_name,
				       op_data->op_file_secctx_name_size);
510
511
512
513
514
515
		if (rc != 0)
			RETURN(rc);
		body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
		if (body == NULL)
			RETURN(-EPROTO);
	}
516
517

	RETURN(rc);
tappro's avatar
tappro committed
518
519
520
}

int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
521
522
		    struct lookup_intent *it, struct ptlrpc_request **reqp,
		    ldlm_blocking_callback cb_blocking,
523
		    __u64 extra_lock_flags)
tappro's avatar
tappro committed
524
{
525
	int rc;
526
	ENTRY;
tappro's avatar
tappro committed
527

528
529
	LASSERT(it != NULL);
	LASSERT(fid_is_sane(&op_data->op_fid1));
tappro's avatar
tappro committed
530

531
	CDEBUG(D_INODE, "INTENT LOCK '%s' for "DFID" '%.*s' on "DFID"\n",
532
533
534
		LL_IT2STR(it), PFID(&op_data->op_fid2),
		(int)op_data->op_namelen, op_data->op_name,
		PFID(&op_data->op_fid1));
tappro's avatar
tappro committed
535

536
	if (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT | IT_GETXATTR))
537
538
539
540
541
542
543
544
		rc = lmv_intent_lookup(exp, op_data, it, reqp, cb_blocking,
				       extra_lock_flags);
	else if (it->it_op & IT_OPEN)
		rc = lmv_intent_open(exp, op_data, it, reqp, cb_blocking,
				     extra_lock_flags);
	else
		LBUG();

545
546
547
	if (rc < 0) {
		struct lustre_handle lock_handle;

548
549
		if (it->it_lock_mode != 0) {
			lock_handle.cookie = it->it_lock_handle;
550
551
			ldlm_lock_decref_and_cancel(&lock_handle,
						    it->it_lock_mode);
552
553
		}

554
555
		it->it_lock_handle = 0;
		it->it_lock_mode = 0;
556

557
558
		if (it->it_remote_lock_mode != 0) {
			lock_handle.cookie = it->it_remote_lock_handle;
559
560
			ldlm_lock_decref_and_cancel(&lock_handle,
						    it->it_remote_lock_mode);
561
562
		}

563
564
		it->it_remote_lock_handle = 0;
		it->it_remote_lock_mode = 0;
565
566
	}

567
	RETURN(rc);
tappro's avatar
tappro committed
568
}