lod_object.c 191 KB
Newer Older
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1
2
3
4
5
6
7
8
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
9
 *
Alex Zhuravlev's avatar
Alex Zhuravlev committed
10
11
12
13
14
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License version 2 for more details.  A copy is
 * included in the COPYING file that accompanied this code.
15
 *
Alex Zhuravlev's avatar
Alex Zhuravlev committed
16
17
18
19
20
21
22
23
24
25
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * GPL HEADER END
 */
/*
 * Copyright  2009 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 *
26
 * Copyright (c) 2012, 2017, Intel Corporation.
Alex Zhuravlev's avatar
Alex Zhuravlev committed
27
28
29
30
 */
/*
 * lustre/lod/lod_object.c
 *
31
32
33
34
35
 * This file contains implementations of methods for the OSD API
 * for the Logical Object Device (LOD) layer, which provides a virtual
 * local OSD object interface to the MDD layer, and abstracts the
 * addressing of local (OSD) and remote (OSP) objects. The API is
 * described in the file lustre/include/dt_object.h and in
36
 * Documentation/osd-api.txt.
37
 *
Alex Zhuravlev's avatar
Alex Zhuravlev committed
38
39
40
41
42
 * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
 */

#define DEBUG_SUBSYSTEM S_MDS

43
44
#include <linux/random.h>

Alex Zhuravlev's avatar
Alex Zhuravlev committed
45
46
47
48
49
#include <obd.h>
#include <obd_class.h>
#include <obd_support.h>

#include <lustre_fid.h>
50
#include <lustre_linkea.h>
51
#include <lustre_lmv.h>
52
#include <uapi/linux/lustre/lustre_param.h>
53
#include <lustre_swab.h>
54
#include <uapi/linux/lustre/lustre_ver.h>
55
#include <lprocfs_status.h>
56
#include <md_object.h>
Alex Zhuravlev's avatar
Alex Zhuravlev committed
57
58
59

#include "lod_internal.h"

60
61
62
static const char dot[] = ".";
static const char dotdot[] = "..";

63
64
65
66
67
68
69
/**
 * Implementation of dt_index_operations::dio_lookup
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_index_operations::dio_lookup() in the API description for details.
 */
70
71
static int lod_lookup(const struct lu_env *env, struct dt_object *dt,
		      struct dt_rec *rec, const struct dt_key *key)
Alex Zhuravlev's avatar
Alex Zhuravlev committed
72
73
{
	struct dt_object *next = dt_object_child(dt);
74
	return next->do_index_ops->dio_lookup(env, next, rec, key);
Alex Zhuravlev's avatar
Alex Zhuravlev committed
75
76
}

77
78
79
80
81
82
83
84
/**
 * Implementation of dt_index_operations::dio_declare_insert.
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_index_operations::dio_declare_insert() in the API description
 * for details.
 */
85
86
87
static int lod_declare_insert(const struct lu_env *env, struct dt_object *dt,
			      const struct dt_rec *rec,
			      const struct dt_key *key, struct thandle *th)
Alex Zhuravlev's avatar
Alex Zhuravlev committed
88
{
89
	return lod_sub_declare_insert(env, dt_object_child(dt), rec, key, th);
Alex Zhuravlev's avatar
Alex Zhuravlev committed
90
91
}

92
93
94
95
96
97
98
/**
 * Implementation of dt_index_operations::dio_insert.
 *
 * Used with regular (non-striped) objects
 *
 * \see dt_index_operations::dio_insert() in the API description for details.
 */
99
100
static int lod_insert(const struct lu_env *env, struct dt_object *dt,
		      const struct dt_rec *rec, const struct dt_key *key,
101
		      struct thandle *th)
Alex Zhuravlev's avatar
Alex Zhuravlev committed
102
{
103
	return lod_sub_insert(env, dt_object_child(dt), rec, key, th);
Alex Zhuravlev's avatar
Alex Zhuravlev committed
104
105
}

106
107
108
109
110
111
112
113
/**
 * Implementation of dt_index_operations::dio_declare_delete.
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_index_operations::dio_declare_delete() in the API description
 * for details.
 */
114
115
static int lod_declare_delete(const struct lu_env *env, struct dt_object *dt,
			      const struct dt_key *key, struct thandle *th)
Alex Zhuravlev's avatar
Alex Zhuravlev committed
116
{
117
	return lod_sub_declare_delete(env, dt_object_child(dt), key, th);
Alex Zhuravlev's avatar
Alex Zhuravlev committed
118
119
}

120
121
122
123
124
125
126
/**
 * Implementation of dt_index_operations::dio_delete.
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_index_operations::dio_delete() in the API description for details.
 */
127
128
static int lod_delete(const struct lu_env *env, struct dt_object *dt,
		      const struct dt_key *key, struct thandle *th)
Alex Zhuravlev's avatar
Alex Zhuravlev committed
129
{
130
	return lod_sub_delete(env, dt_object_child(dt), key, th);
Alex Zhuravlev's avatar
Alex Zhuravlev committed
131
132
}

133
134
135
136
137
138
139
/**
 * Implementation of dt_it_ops::init.
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_it_ops::init() in the API description for details.
 */
Alex Zhuravlev's avatar
Alex Zhuravlev committed
140
static struct dt_it *lod_it_init(const struct lu_env *env,
141
				 struct dt_object *dt, __u32 attr)
Alex Zhuravlev's avatar
Alex Zhuravlev committed
142
{
143
144
145
146
	struct dt_object	*next = dt_object_child(dt);
	struct lod_it		*it = &lod_env_info(env)->lti_it;
	struct dt_it		*it_next;

147
	it_next = next->do_index_ops->dio_it.init(env, next, attr);
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
	if (IS_ERR(it_next))
		return it_next;

	/* currently we do not use more than one iterator per thread
	 * so we store it in thread info. if at some point we need
	 * more active iterators in a single thread, we can allocate
	 * additional ones */
	LASSERT(it->lit_obj == NULL);

	it->lit_it = it_next;
	it->lit_obj = next;

	return (struct dt_it *)it;
}

#define LOD_CHECK_IT(env, it)					\
164
do {								\
165
166
	LASSERT((it)->lit_obj != NULL);				\
	LASSERT((it)->lit_it != NULL);				\
167
} while (0)
168

169
170
171
172
173
174
175
/**
 * Implementation of dt_index_operations::dio_it.fini.
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_index_operations::dio_it.fini() in the API description for details.
 */
176
static void lod_it_fini(const struct lu_env *env, struct dt_it *di)
177
178
179
180
181
182
183
184
185
186
187
{
	struct lod_it *it = (struct lod_it *)di;

	LOD_CHECK_IT(env, it);
	it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);

	/* the iterator not in use any more */
	it->lit_obj = NULL;
	it->lit_it = NULL;
}

188
189
190
191
192
193
194
/**
 * Implementation of dt_it_ops::get.
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_it_ops::get() in the API description for details.
 */
195
196
static int lod_it_get(const struct lu_env *env, struct dt_it *di,
		      const struct dt_key *key)
197
198
199
200
201
202
203
{
	const struct lod_it *it = (const struct lod_it *)di;

	LOD_CHECK_IT(env, it);
	return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
}

204
205
206
207
208
209
210
/**
 * Implementation of dt_it_ops::put.
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_it_ops::put() in the API description for details.
 */
211
static void lod_it_put(const struct lu_env *env, struct dt_it *di)
212
213
214
215
216
217
218
{
	struct lod_it *it = (struct lod_it *)di;

	LOD_CHECK_IT(env, it);
	return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
}

219
220
221
222
223
224
225
/**
 * Implementation of dt_it_ops::next.
 *
 * Used with regular (non-striped) objects
 *
 * \see dt_it_ops::next() in the API description for details.
 */
226
static int lod_it_next(const struct lu_env *env, struct dt_it *di)
227
228
229
230
231
232
233
{
	struct lod_it *it = (struct lod_it *)di;

	LOD_CHECK_IT(env, it);
	return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
}

234
235
236
237
238
239
240
/**
 * Implementation of dt_it_ops::key.
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_it_ops::key() in the API description for details.
 */
241
242
static struct dt_key *lod_it_key(const struct lu_env *env,
				 const struct dt_it *di)
243
244
245
246
247
248
249
{
	const struct lod_it *it = (const struct lod_it *)di;

	LOD_CHECK_IT(env, it);
	return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
}

250
251
252
253
254
255
256
/**
 * Implementation of dt_it_ops::key_size.
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_it_ops::key_size() in the API description for details.
 */
257
static int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
258
259
{
	struct lod_it *it = (struct lod_it *)di;
Alex Zhuravlev's avatar
Alex Zhuravlev committed
260

261
262
263
264
	LOD_CHECK_IT(env, it);
	return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
}

265
266
267
268
269
270
271
/**
 * Implementation of dt_it_ops::rec.
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_it_ops::rec() in the API description for details.
 */
272
273
static int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
		      struct dt_rec *rec, __u32 attr)
274
275
276
277
{
	const struct lod_it *it = (const struct lod_it *)di;

	LOD_CHECK_IT(env, it);
278
279
280
281
	return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
						     attr);
}

282
283
284
285
286
287
288
/**
 * Implementation of dt_it_ops::rec_size.
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_it_ops::rec_size() in the API description for details.
 */
289
290
static int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
			   __u32 attr)
291
292
293
294
295
296
{
	const struct lod_it *it = (const struct lod_it *)di;

	LOD_CHECK_IT(env, it);
	return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
							  attr);
297
298
}

299
300
301
302
303
304
305
/**
 * Implementation of dt_it_ops::store.
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_it_ops::store() in the API description for details.
 */
306
static __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
307
308
309
310
311
312
313
{
	const struct lod_it *it = (const struct lod_it *)di;

	LOD_CHECK_IT(env, it);
	return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
}

314
315
316
317
318
319
320
/**
 * Implementation of dt_it_ops::load.
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_it_ops::load() in the API description for details.
 */
321
322
static int lod_it_load(const struct lu_env *env, const struct dt_it *di,
		       __u64 hash)
323
324
325
326
327
328
329
{
	const struct lod_it *it = (const struct lod_it *)di;

	LOD_CHECK_IT(env, it);
	return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
}

330
331
332
333
334
335
336
/**
 * Implementation of dt_it_ops::key_rec.
 *
 * Used with regular (non-striped) objects.
 *
 * \see dt_it_ops::rec() in the API description for details.
 */
337
338
static int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
			  void *key_rec)
339
340
341
342
{
	const struct lod_it *it = (const struct lod_it *)di;

	LOD_CHECK_IT(env, it);
343
344
	return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
							 key_rec);
Alex Zhuravlev's avatar
Alex Zhuravlev committed
345
346
347
}

static struct dt_index_operations lod_index_ops = {
348
349
350
351
352
	.dio_lookup		= lod_lookup,
	.dio_declare_insert	= lod_declare_insert,
	.dio_insert		= lod_insert,
	.dio_declare_delete	= lod_declare_delete,
	.dio_delete		= lod_delete,
353
354
355
356
357
358
359
360
361
	.dio_it	= {
		.init		= lod_it_init,
		.fini		= lod_it_fini,
		.get		= lod_it_get,
		.put		= lod_it_put,
		.next		= lod_it_next,
		.key		= lod_it_key,
		.key_size	= lod_it_key_size,
		.rec		= lod_it_rec,
362
		.rec_size	= lod_it_rec_size,
363
364
365
		.store		= lod_it_store,
		.load		= lod_it_load,
		.key_rec	= lod_it_key_rec,
Alex Zhuravlev's avatar
Alex Zhuravlev committed
366
367
368
	}
};

369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
/**
 * Implementation of dt_index_operations::dio_lookup
 *
 * Used with striped directories.
 *
 * \see dt_index_operations::dio_lookup() in the API description for details.
 */
static int lod_striped_lookup(const struct lu_env *env, struct dt_object *dt,
		      struct dt_rec *rec, const struct dt_key *key)
{
	struct lod_object *lo = lod_dt_obj(dt);
	struct dt_object *next;
	const char *name = (const char *)key;

	LASSERT(lo->ldo_dir_stripe_count > 0);

	if (strcmp(name, dot) == 0) {
		struct lu_fid *fid = (struct lu_fid *)rec;

		*fid = *lod_object_fid(lo);
		return 1;
	}

	if (strcmp(name, dotdot) == 0) {
		next = dt_object_child(dt);
	} else {
		int index;

		index = lmv_name_to_stripe_index(lo->ldo_dir_hash_type,
						 lo->ldo_dir_stripe_count,
						 name, strlen(name));
		if (index < 0)
			return index;

		next = lo->ldo_stripe[index];
		if (!next || !dt_object_exists(next))
			return -ENODEV;
	}

	return next->do_index_ops->dio_lookup(env, next, rec, key);
}

411
/**
412
 * Implementation of dt_it_ops::init.
413
 *
414
415
 * Used with striped objects. Internally just initializes the iterator
 * on the first stripe.
416
 *
417
 * \see dt_it_ops::init() in the API description for details.
418
419
 */
static struct dt_it *lod_striped_it_init(const struct lu_env *env,
420
					 struct dt_object *dt, __u32 attr)
421
{
422
423
424
425
426
	struct lod_object *lo = lod_dt_obj(dt);
	struct dt_object *next;
	struct lod_it *it = &lod_env_info(env)->lti_it;
	struct dt_it *it_next;
	__u16 index = 0;
427

428
	LASSERT(lo->ldo_dir_stripe_count > 0);
429
430
431
432
433
434
435
436
437
438
439

	do {
		next = lo->ldo_stripe[index];
		if (next && dt_object_exists(next))
			break;
	} while (++index < lo->ldo_dir_stripe_count);

	/* no valid stripe */
	if (!next || !dt_object_exists(next))
		return ERR_PTR(-ENODEV);

440
441
	LASSERT(next->do_index_ops != NULL);

442
	it_next = next->do_index_ops->dio_it.init(env, next, attr);
443
444
445
446
447
448
449
450
451
	if (IS_ERR(it_next))
		return it_next;

	/* currently we do not use more than one iterator per thread
	 * so we store it in thread info. if at some point we need
	 * more active iterators in a single thread, we can allocate
	 * additional ones */
	LASSERT(it->lit_obj == NULL);

452
	it->lit_stripe_index = index;
453
454
455
456
457
458
459
	it->lit_attr = attr;
	it->lit_it = it_next;
	it->lit_obj = dt;

	return (struct dt_it *)it;
}

460
461
462
463
#define LOD_CHECK_STRIPED_IT(env, it, lo)				\
do {									\
	LASSERT((it)->lit_obj != NULL);					\
	LASSERT((it)->lit_it != NULL);					\
464
465
	LASSERT((lo)->ldo_dir_stripe_count > 0);			\
	LASSERT((it)->lit_stripe_index < (lo)->ldo_dir_stripe_count);	\
466
467
468
} while (0)

/**
469
 * Implementation of dt_it_ops::fini.
470
 *
471
 * Used with striped objects.
472
 *
473
 * \see dt_it_ops::fini() in the API description for details.
474
475
476
477
478
479
480
 */
static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
{
	struct lod_it		*it = (struct lod_it *)di;
	struct lod_object	*lo = lod_dt_obj(it->lit_obj);
	struct dt_object	*next;

481
482
483
484
	/* If lit_it == NULL, then it means the sub_it has been finished,
	 * which only happens in failure cases, see lod_striped_it_next() */
	if (it->lit_it != NULL) {
		LOD_CHECK_STRIPED_IT(env, it, lo);
485

486
		next = lo->ldo_stripe[it->lit_stripe_index];
487
488
489
490
		if (next) {
			LASSERT(next->do_index_ops != NULL);
			next->do_index_ops->dio_it.fini(env, it->lit_it);
		}
491
	}
492
493
494
495
496
497
498
499

	/* the iterator not in use any more */
	it->lit_obj = NULL;
	it->lit_it = NULL;
	it->lit_stripe_index = 0;
}

/**
500
 * Implementation of dt_it_ops::get.
501
 *
502
503
504
 * Right now it's not used widely, only to reset the iterator to the
 * initial position. It should be possible to implement a full version
 * which chooses a correct stripe to be able to position with any key.
505
 *
506
 * \see dt_it_ops::get() in the API description for details.
507
508
509
510
 */
static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
			      const struct dt_key *key)
{
511
512
513
	const struct lod_it *it = (const struct lod_it *)di;
	struct lod_object *lo = lod_dt_obj(it->lit_obj);
	struct dt_object *next;
514
515
516
517
518

	LOD_CHECK_STRIPED_IT(env, it, lo);

	next = lo->ldo_stripe[it->lit_stripe_index];
	LASSERT(next != NULL);
519
	LASSERT(dt_object_exists(next));
520
521
522
523
524
525
	LASSERT(next->do_index_ops != NULL);

	return next->do_index_ops->dio_it.get(env, it->lit_it, key);
}

/**
526
 * Implementation of dt_it_ops::put.
527
 *
528
529
530
 * Used with striped objects.
 *
 * \see dt_it_ops::put() in the API description for details.
531
532
533
 */
static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
{
534
535
536
537
538
539
540
541
542
543
	struct lod_it *it = (struct lod_it *)di;
	struct lod_object *lo = lod_dt_obj(it->lit_obj);
	struct dt_object *next;

	/*
	 * If lit_it == NULL, then it means the sub_it has been finished,
	 * which only happens in failure cases, see lod_striped_it_next()
	 */
	if (!it->lit_it)
		return;
544
545
546
547
548
549
550
551
552
553
554

	LOD_CHECK_STRIPED_IT(env, it, lo);

	next = lo->ldo_stripe[it->lit_stripe_index];
	LASSERT(next != NULL);
	LASSERT(next->do_index_ops != NULL);

	return next->do_index_ops->dio_it.put(env, it->lit_it);
}

/**
555
 * Implementation of dt_it_ops::next.
556
 *
557
558
 * Used with striped objects. When the end of the current stripe is
 * reached, the method takes the next stripe's iterator.
559
 *
560
 * \see dt_it_ops::next() in the API description for details.
561
562
563
 */
static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
{
564
565
566
567
568
569
570
	struct lod_it *it = (struct lod_it *)di;
	struct lod_object *lo = lod_dt_obj(it->lit_obj);
	struct dt_object *next;
	struct dt_it *it_next;
	__u32 index;
	int rc;

571
572
573
574
575
576
	ENTRY;

	LOD_CHECK_STRIPED_IT(env, it, lo);

	next = lo->ldo_stripe[it->lit_stripe_index];
	LASSERT(next != NULL);
577
	LASSERT(dt_object_exists(next));
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
	LASSERT(next->do_index_ops != NULL);
again:
	rc = next->do_index_ops->dio_it.next(env, it->lit_it);
	if (rc < 0)
		RETURN(rc);

	if (rc == 0 && it->lit_stripe_index == 0)
		RETURN(rc);

	if (rc == 0 && it->lit_stripe_index > 0) {
		struct lu_dirent *ent;

		ent = (struct lu_dirent *)lod_env_info(env)->lti_key;

		rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
						    (struct dt_rec *)ent,
						    it->lit_attr);
		if (rc != 0)
			RETURN(rc);

		/* skip . and .. for slave stripe */
		if ((strncmp(ent->lde_name, ".",
			     le16_to_cpu(ent->lde_namelen)) == 0 &&
		     le16_to_cpu(ent->lde_namelen) == 1) ||
		    (strncmp(ent->lde_name, "..",
			     le16_to_cpu(ent->lde_namelen)) == 0 &&
		     le16_to_cpu(ent->lde_namelen) == 2))
			goto again;

		RETURN(rc);
	}

	next->do_index_ops->dio_it.put(env, it->lit_it);
	next->do_index_ops->dio_it.fini(env, it->lit_it);
612
	it->lit_it = NULL;
613

614
615
616
617
618
619
	/* go to next stripe */
	index = it->lit_stripe_index;
	while (++index < lo->ldo_dir_stripe_count) {
		next = lo->ldo_stripe[index];
		if (!next)
			continue;
620

621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
		if (!dt_object_exists(next))
			continue;

		rc = next->do_ops->do_index_try(env, next,
						&dt_directory_features);
		if (rc != 0)
			RETURN(rc);

		LASSERT(next->do_index_ops != NULL);

		it_next = next->do_index_ops->dio_it.init(env, next,
							  it->lit_attr);
		if (IS_ERR(it_next))
			RETURN(PTR_ERR(it_next));

		rc = next->do_index_ops->dio_it.get(env, it_next,
						    (const struct dt_key *)"");
		if (rc <= 0)
			RETURN(rc == 0 ? -EIO : rc);
640

641
		it->lit_it = it_next;
642
		it->lit_stripe_index = index;
643
		goto again;
644

645
646
	}

647
	RETURN(1);
648
649
650
}

/**
651
 * Implementation of dt_it_ops::key.
652
 *
653
 * Used with striped objects.
654
 *
655
 * \see dt_it_ops::key() in the API description for details.
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
 */
static struct dt_key *lod_striped_it_key(const struct lu_env *env,
					 const struct dt_it *di)
{
	const struct lod_it	*it = (const struct lod_it *)di;
	struct lod_object	*lo = lod_dt_obj(it->lit_obj);
	struct dt_object	*next;

	LOD_CHECK_STRIPED_IT(env, it, lo);

	next = lo->ldo_stripe[it->lit_stripe_index];
	LASSERT(next != NULL);
	LASSERT(next->do_index_ops != NULL);

	return next->do_index_ops->dio_it.key(env, it->lit_it);
}

/**
674
 * Implementation of dt_it_ops::key_size.
675
 *
676
 * Used with striped objects.
677
 *
678
 * \see dt_it_ops::size() in the API description for details.
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
 */
static int lod_striped_it_key_size(const struct lu_env *env,
				   const struct dt_it *di)
{
	struct lod_it		*it = (struct lod_it *)di;
	struct lod_object	*lo = lod_dt_obj(it->lit_obj);
	struct dt_object	*next;

	LOD_CHECK_STRIPED_IT(env, it, lo);

	next = lo->ldo_stripe[it->lit_stripe_index];
	LASSERT(next != NULL);
	LASSERT(next->do_index_ops != NULL);

	return next->do_index_ops->dio_it.key_size(env, it->lit_it);
}

/**
697
 * Implementation of dt_it_ops::rec.
698
 *
699
 * Used with striped objects.
700
 *
701
 * \see dt_it_ops::rec() in the API description for details.
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
 */
static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
			      struct dt_rec *rec, __u32 attr)
{
	const struct lod_it	*it = (const struct lod_it *)di;
	struct lod_object	*lo = lod_dt_obj(it->lit_obj);
	struct dt_object	*next;

	LOD_CHECK_STRIPED_IT(env, it, lo);

	next = lo->ldo_stripe[it->lit_stripe_index];
	LASSERT(next != NULL);
	LASSERT(next->do_index_ops != NULL);

	return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
}

/**
720
 * Implementation of dt_it_ops::rec_size.
721
 *
722
 * Used with striped objects.
723
 *
724
 * \see dt_it_ops::rec_size() in the API description for details.
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
 */
static int lod_striped_it_rec_size(const struct lu_env *env,
				   const struct dt_it *di, __u32 attr)
{
	struct lod_it		*it = (struct lod_it *)di;
	struct lod_object	*lo = lod_dt_obj(it->lit_obj);
	struct dt_object	*next;

	LOD_CHECK_STRIPED_IT(env, it, lo);

	next = lo->ldo_stripe[it->lit_stripe_index];
	LASSERT(next != NULL);
	LASSERT(next->do_index_ops != NULL);

	return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
}

/**
743
 * Implementation of dt_it_ops::store.
744
 *
745
 * Used with striped objects.
746
 *
747
 * \see dt_it_ops::store() in the API description for details.
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
 */
static __u64 lod_striped_it_store(const struct lu_env *env,
				  const struct dt_it *di)
{
	const struct lod_it	*it = (const struct lod_it *)di;
	struct lod_object	*lo = lod_dt_obj(it->lit_obj);
	struct dt_object	*next;

	LOD_CHECK_STRIPED_IT(env, it, lo);

	next = lo->ldo_stripe[it->lit_stripe_index];
	LASSERT(next != NULL);
	LASSERT(next->do_index_ops != NULL);

	return next->do_index_ops->dio_it.store(env, it->lit_it);
}

/**
766
 * Implementation of dt_it_ops::load.
767
 *
768
 * Used with striped objects.
769
 *
770
 * \see dt_it_ops::load() in the API description for details.
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
 */
static int lod_striped_it_load(const struct lu_env *env,
			       const struct dt_it *di, __u64 hash)
{
	const struct lod_it	*it = (const struct lod_it *)di;
	struct lod_object	*lo = lod_dt_obj(it->lit_obj);
	struct dt_object	*next;

	LOD_CHECK_STRIPED_IT(env, it, lo);

	next = lo->ldo_stripe[it->lit_stripe_index];
	LASSERT(next != NULL);
	LASSERT(next->do_index_ops != NULL);

	return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
}

static struct dt_index_operations lod_striped_index_ops = {
789
	.dio_lookup		= lod_striped_lookup,
790
791
792
793
	.dio_declare_insert	= lod_declare_insert,
	.dio_insert		= lod_insert,
	.dio_declare_delete	= lod_declare_delete,
	.dio_delete		= lod_delete,
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
	.dio_it	= {
		.init		= lod_striped_it_init,
		.fini		= lod_striped_it_fini,
		.get		= lod_striped_it_get,
		.put		= lod_striped_it_put,
		.next		= lod_striped_it_next,
		.key		= lod_striped_it_key,
		.key_size	= lod_striped_it_key_size,
		.rec		= lod_striped_it_rec,
		.rec_size	= lod_striped_it_rec_size,
		.store		= lod_striped_it_store,
		.load		= lod_striped_it_load,
	}
};

809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
/**
 * Append the FID for each shard of the striped directory after the
 * given LMV EA header.
 *
 * To simplify striped directory and the consistency verification,
 * we only store the LMV EA header on disk, for both master object
 * and slave objects. When someone wants to know the whole LMV EA,
 * such as client readdir(), we can build the entrie LMV EA on the
 * MDT side (in RAM) via iterating the sub-directory entries that
 * are contained in the master object of the stripe directory.
 *
 * For the master object of the striped directroy, the valid name
 * for each shard is composed of the ${shard_FID}:${shard_idx}.
 *
 * There may be holes in the LMV EA if some shards' name entries
 * are corrupted or lost.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] lo	pointer to the master object of the striped directory
 * \param[in] buf	pointer to the lu_buf which will hold the LMV EA
 * \param[in] resize	whether re-allocate the buffer if it is not big enough
 *
 * \retval		positive size of the LMV EA
 * \retval		0 for nothing to be loaded
 * \retval		negative error number on failure
 */
int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
			struct lu_buf *buf, bool resize)
{
	struct lu_dirent	*ent	=
			(struct lu_dirent *)lod_env_info(env)->lti_key;
	struct lod_device	*lod	= lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
	struct dt_object	*obj	= dt_object_child(&lo->ldo_obj);
	struct lmv_mds_md_v1	*lmv1	= buf->lb_buf;
	struct dt_it		*it;
	const struct dt_it_ops	*iops;
	__u32			 stripes;
	__u32			 magic	= le32_to_cpu(lmv1->lmv_magic);
847
	size_t			 lmv1_size;
848
849
850
851
852
853
854
855
856
857
	int			 rc;
	ENTRY;

	if (magic != LMV_MAGIC_V1)
		RETURN(0);

	stripes = le32_to_cpu(lmv1->lmv_stripe_count);
	if (stripes < 1)
		RETURN(0);

858
859
860
861
862
	rc = lmv_mds_md_size(stripes, magic);
	if (rc < 0)
		RETURN(rc);
	lmv1_size = rc;
	if (buf->lb_len < lmv1_size) {
863
864
865
866
867
868
869
870
		struct lu_buf tbuf;

		if (!resize)
			RETURN(-ERANGE);

		tbuf = *buf;
		buf->lb_buf = NULL;
		buf->lb_len = 0;
871
		lu_buf_alloc(buf, lmv1_size);
872
873
874
875
876
877
878
879
880
881
882
883
		lmv1 = buf->lb_buf;
		if (lmv1 == NULL)
			RETURN(-ENOMEM);

		memcpy(buf->lb_buf, tbuf.lb_buf, tbuf.lb_len);
	}

	if (unlikely(!dt_try_as_dir(env, obj)))
		RETURN(-ENOTDIR);

	memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid));
	iops = &obj->do_index_ops->dio_it;
884
	it = iops->init(env, obj, LUDA_64BITHASH);
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
	if (IS_ERR(it))
		RETURN(PTR_ERR(it));

	rc = iops->load(env, it, 0);
	if (rc == 0)
		rc = iops->next(env, it);
	else if (rc > 0)
		rc = 0;

	while (rc == 0) {
		char		 name[FID_LEN + 2] = "";
		struct lu_fid	 fid;
		__u32		 index;
		int		 len;

		rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
		if (rc != 0)
			break;

		rc = -EIO;

		fid_le_to_cpu(&fid, &ent->lde_fid);
		ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
		if (ent->lde_name[0] == '.') {
			if (ent->lde_namelen == 1)
				goto next;

			if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
				goto next;
		}

916
917
		len = snprintf(name, sizeof(name),
			       DFID":", PFID(&ent->lde_fid));
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
		/* The ent->lde_name is composed of ${FID}:${index} */
		if (ent->lde_namelen < len + 1 ||
		    memcmp(ent->lde_name, name, len) != 0) {
			CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
			       "%s: invalid shard name %.*s with the FID "DFID
			       " for the striped directory "DFID", %s\n",
			       lod2obd(lod)->obd_name, ent->lde_namelen,
			       ent->lde_name, PFID(&fid),
			       PFID(lu_object_fid(&obj->do_lu)),
			       lod->lod_lmv_failout ? "failout" : "skip");

			if (lod->lod_lmv_failout)
				break;

			goto next;
		}

		index = 0;
		do {
			if (ent->lde_name[len] < '0' ||
			    ent->lde_name[len] > '9') {
				CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
				       "%s: invalid shard name %.*s with the "
				       "FID "DFID" for the striped directory "
				       DFID", %s\n",
				       lod2obd(lod)->obd_name, ent->lde_namelen,
				       ent->lde_name, PFID(&fid),
				       PFID(lu_object_fid(&obj->do_lu)),
				       lod->lod_lmv_failout ?
				       "failout" : "skip");

				if (lod->lod_lmv_failout)
					break;

				goto next;
			}

			index = index * 10 + ent->lde_name[len++] - '0';
		} while (len < ent->lde_namelen);

		if (len == ent->lde_namelen) {
			/* Out of LMV EA range. */
			if (index >= stripes) {
				CERROR("%s: the shard %.*s for the striped "
				       "directory "DFID" is out of the known "
				       "LMV EA range [0 - %u], failout\n",
				       lod2obd(lod)->obd_name, ent->lde_namelen,
				       ent->lde_name,
				       PFID(lu_object_fid(&obj->do_lu)),
				       stripes - 1);

				break;
			}

			/* The slot has been occupied. */
			if (!fid_is_zero(&lmv1->lmv_stripe_fids[index])) {
				struct lu_fid fid0;

				fid_le_to_cpu(&fid0,
					&lmv1->lmv_stripe_fids[index]);
				CERROR("%s: both the shard "DFID" and "DFID
				       " for the striped directory "DFID
				       " claim the same LMV EA slot at the "
				       "index %d, failout\n",
				       lod2obd(lod)->obd_name,
				       PFID(&fid0), PFID(&fid),
				       PFID(lu_object_fid(&obj->do_lu)), index);

				break;
			}

			/* stored as LE mode */
			lmv1->lmv_stripe_fids[index] = ent->lde_fid;

next:
			rc = iops->next(env, it);
		}
	}

	iops->put(env, it);
	iops->fini(env, it);

	RETURN(rc > 0 ? lmv_mds_md_size(stripes, magic) : rc);
}

1003
/**
1004
 * Implementation of dt_object_operations::do_index_try.
1005
 *
1006
 * \see dt_object_operations::do_index_try() in the API description for details.
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
 */
static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
			 const struct dt_index_features *feat)
{
	struct lod_object	*lo = lod_dt_obj(dt);
	struct dt_object	*next = dt_object_child(dt);
	int			rc;
	ENTRY;

	LASSERT(next->do_ops);
	LASSERT(next->do_ops->do_index_try);

1019
	rc = lod_striping_load(env, lo);
1020
1021
1022
1023
1024
1025
1026
	if (rc != 0)
		RETURN(rc);

	rc = next->do_ops->do_index_try(env, next, feat);
	if (rc != 0)
		RETURN(rc);

1027
	if (lo->ldo_dir_stripe_count > 0) {
1028
1029
		int i;

1030
		for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
1031
1032
1033
			if (!lo->ldo_stripe[i])
				continue;
			if (!dt_object_exists(lo->ldo_stripe[i]))
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
				continue;
			rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
						lo->ldo_stripe[i], feat);
			if (rc != 0)
				RETURN(rc);
		}
		dt->do_index_ops = &lod_striped_index_ops;
	} else {
		dt->do_index_ops = &lod_index_ops;
	}

	RETURN(rc);
}

1048
1049
1050
1051
1052
/**
 * Implementation of dt_object_operations::do_read_lock.
 *
 * \see dt_object_operations::do_read_lock() in the API description for details.
 */
1053
1054
static void lod_read_lock(const struct lu_env *env, struct dt_object *dt,
			  unsigned role)
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1055
1056
1057
1058
{
	dt_read_lock(env, dt_object_child(dt), role);
}

1059
1060
1061
1062
1063
1064
/**
 * Implementation of dt_object_operations::do_write_lock.
 *
 * \see dt_object_operations::do_write_lock() in the API description for
 * details.
 */
1065
1066
static void lod_write_lock(const struct lu_env *env, struct dt_object *dt,
			   unsigned role)
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1067
1068
1069
1070
{
	dt_write_lock(env, dt_object_child(dt), role);
}

1071
1072
1073
1074
1075
1076
/**
 * Implementation of dt_object_operations::do_read_unlock.
 *
 * \see dt_object_operations::do_read_unlock() in the API description for
 * details.
 */
1077
static void lod_read_unlock(const struct lu_env *env, struct dt_object *dt)
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1078
1079
1080
1081
{
	dt_read_unlock(env, dt_object_child(dt));
}

1082
1083
1084
1085
1086
1087
/**
 * Implementation of dt_object_operations::do_write_unlock.
 *
 * \see dt_object_operations::do_write_unlock() in the API description for
 * details.
 */
1088
static void lod_write_unlock(const struct lu_env *env, struct dt_object *dt)
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1089
1090
1091
1092
{
	dt_write_unlock(env, dt_object_child(dt));
}

1093
1094
1095
1096
1097
1098
/**
 * Implementation of dt_object_operations::do_write_locked.
 *
 * \see dt_object_operations::do_write_locked() in the API description for
 * details.
 */
1099
static int lod_write_locked(const struct lu_env *env, struct dt_object *dt)
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1100
1101
1102
1103
{
	return dt_write_locked(env, dt_object_child(dt));
}

1104
1105
1106
1107
1108
/**
 * Implementation of dt_object_operations::do_attr_get.
 *
 * \see dt_object_operations::do_attr_get() in the API description for details.
 */
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1109
1110
static int lod_attr_get(const struct lu_env *env,
			struct dt_object *dt,
1111
			struct lu_attr *attr)
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1112
{
1113
1114
1115
1116
	/* Note: for striped directory, client will merge attributes
	 * from all of the sub-stripes see lmv_merge_attr(), and there
	 * no MDD logic depend on directory nlink/size/time, so we can
	 * always use master inode nlink and size for now. */
1117
	return dt_attr_get(env, dt_object_child(dt), attr);
1118
1119
}

1120
static inline void lod_adjust_stripe_info(struct lod_layout_component *comp,
1121
1122
					  struct lov_desc *desc,
					  int append_stripes)
1123
1124
{
	if (comp->llc_pattern != LOV_PATTERN_MDT) {
1125
1126
1127
		if (append_stripes) {
			comp->llc_stripe_count = append_stripes;
		} else if (!comp->llc_stripe_count) {
1128
1129
			comp->llc_stripe_count =
				desc->ld_default_stripe_count;
1130
		}
1131
1132
1133
1134
1135
	}
	if (comp->llc_stripe_size <= 0)
		comp->llc_stripe_size = desc->ld_default_stripe_size;
}

1136
int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
1137
			    struct thandle *th,
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
			    struct lod_obj_stripe_cb_data *data)
{
	struct lod_layout_component *lod_comp;
	int i, j, rc;
	ENTRY;

	LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL);
	for (i = 0; i < lo->ldo_comp_cnt; i++) {
		lod_comp = &lo->ldo_comp_entries[i];

		if (lod_comp->llc_stripe == NULL)
			continue;

1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
		/* has stripe but not inited yet, this component has been
		 * declared to be created, but hasn't created yet.
		 */
		if (!lod_comp_inited(lod_comp))
			continue;

		if (data->locd_comp_skip_cb &&
		    data->locd_comp_skip_cb(env, lo, i, data))
			continue;

1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
		if (data->locd_comp_cb) {
			rc = data->locd_comp_cb(env, lo, i, data);
			if (rc)
				RETURN(rc);
		}

		/* could used just to do sth about component, not each
		 * stripes
		 */
		if (!data->locd_stripe_cb)
			continue;

1173
1174
		LASSERT(lod_comp->llc_stripe_count > 0);
		for (j = 0; j < lod_comp->llc_stripe_count; j++) {
1175
1176
1177
1178
			struct dt_object *dt = lod_comp->llc_stripe[j];

			if (dt == NULL)
				continue;
1179
			rc = data->locd_stripe_cb(env, lo, dt, th, i, j, data);
1180
1181
1182
1183
1184
1185
1186
			if (rc != 0)
				RETURN(rc);
		}
	}
	RETURN(0);
}

1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env,
		struct lod_object *lo, int comp_idx,
		struct lod_obj_stripe_cb_data *data)
{
	struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[comp_idx];
	bool skipped = false;

	if (!(data->locd_attr->la_valid & LA_LAYOUT_VERSION))
		return skipped;

	switch (lo->ldo_flr_state) {
	case LCM_FL_WRITE_PENDING: {
		int i;

		/* skip stale components */
		if (lod_comp->llc_flags & LCME_FL_STALE) {
			skipped = true;
			break;
		}

		/* skip valid and overlapping components, therefore any
		 * attempts to write overlapped components will never succeed
		 * because client will get EINPROGRESS. */
		for (i = 0; i < lo->ldo_comp_cnt; i++) {
			if (i == comp_idx)
				continue;

			if (lo->ldo_comp_entries[i].llc_flags & LCME_FL_STALE)
				continue;

			if (lu_extent_is_overlapped(&lod_comp->llc_extent,
					&lo->ldo_comp_entries[i].llc_extent)) {
				skipped = true;
				break;
			}
		}
		break;
	}
	default:
		LASSERTF(0, "impossible: %d\n", lo->ldo_flr_state);
	case LCM_FL_SYNC_PENDING:
		break;
	}

	CDEBUG(D_LAYOUT, DFID": %s to set component %x to version: %u\n",
	       PFID(lu_object_fid(&lo->ldo_obj.do_lu)),
	       skipped ? "skipped" : "chose", lod_comp->llc_id,
	       data->locd_attr->la_layout_version);

	return skipped;
}

1239
1240
1241
static inline int
lod_obj_stripe_attr_set_cb(const struct lu_env *env, struct lod_object *lo,
			   struct dt_object *dt, struct thandle *th,
1242
1243
			   int comp_idx, int stripe_idx,
			   struct lod_obj_stripe_cb_data *data)
1244
1245
{
	if (data->locd_declare)
1246
1247
		return lod_sub_declare_attr_set(env, dt, data->locd_attr, th);

1248
1249
1250
1251
1252
1253
	if (data->locd_attr->la_valid & LA_LAYOUT_VERSION) {
		CDEBUG(D_LAYOUT, DFID": set layout version: %u, comp_idx: %d\n",
		       PFID(lu_object_fid(&dt->do_lu)),
		       data->locd_attr->la_layout_version, comp_idx);
	}

1254
	return lod_sub_attr_set(env, dt, data->locd_attr, th);
1255
1256
}

1257
1258
1259
1260
1261
1262
1263
1264
/**
 * Implementation of dt_object_operations::do_declare_attr_set.
 *
 * If the object is striped, then apply the changes to all the stripes.
 *
 * \see dt_object_operations::do_declare_attr_set() in the API description
 * for details.
 */
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1265
1266
1267
static int lod_declare_attr_set(const struct lu_env *env,
				struct dt_object *dt,
				const struct lu_attr *attr,
1268
				struct thandle *th)
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1269
1270
{
	struct dt_object  *next = dt_object_child(dt);
1271
1272
	struct lod_object *lo = lod_dt_obj(dt);
	int                rc, i;
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1273
1274
1275
1276
1277
	ENTRY;

	/*
	 * declare setattr on the local object
	 */
1278
	rc = lod_sub_declare_attr_set(env, next, attr, th);
1279
1280
1281
	if (rc)
		RETURN(rc);

1282
	/* osp_declare_attr_set() ignores all attributes other than
1283
1284
1285
1286
1287
1288
1289
	 * UID, GID, PROJID, and size, and osp_attr_set() ignores all
	 * but UID, GID and PROJID. Declaration of size attr setting
	 * happens through lod_declare_init_size(), and not through
	 * this function. Therefore we need not load striping unless
	 * ownership is changing.  This should save memory and (we hope)
	 * speed up rename().
	 */
1290
	if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
1291
		if (!(attr->la_valid & LA_REMOTE_ATTR_SET))
1292
			RETURN(rc);
1293
1294
1295

		if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
			RETURN(0);
1296
	} else {
1297
		if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID | LA_MODE |
1298
1299
					LA_ATIME | LA_MTIME | LA_CTIME |
					LA_FLAGS)))
1300
1301
			RETURN(rc);
	}
1302
1303
1304
1305
1306
	/*
	 * load striping information, notice we don't do this when object
	 * is being initialized as we don't need this information till
	 * few specific cases like destroy, chown
	 */
1307
	rc = lod_striping_load(env, lo);
1308
1309
1310
	if (rc)
		RETURN(rc);

1311
	if (!lod_obj_is_striped(dt))
1312
1313
		RETURN(0);

1314
1315
1316
	/*
	 * if object is striped declare changes on the stripes
	 */
1317
1318
	if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
		LASSERT(lo->ldo_stripe);
1319
		for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
1320
1321
			if (lo->ldo_stripe[i] == NULL)
				continue;
1322
1323
			if (!dt_object_exists(lo->ldo_stripe[i]))
				continue;
1324
1325
			rc = lod_sub_declare_attr_set(env, lo->ldo_stripe[i],
						      attr, th);
1326
1327
1328
1329
			if (rc != 0)
				RETURN(rc);
		}
	} else {
1330
		struct lod_obj_stripe_cb_data data = { { 0 } };
1331
1332
1333

		data.locd_attr = attr;
		data.locd_declare = true;
1334
1335
		data.locd_stripe_cb = lod_obj_stripe_attr_set_cb;
		rc = lod_obj_for_each_stripe(env, lo, th, &data);
1336
	}
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1337

1338
1339
1340
1341
1342
1343
1344
1345
	if (rc)
		RETURN(rc);

	if (!dt_object_exists(next) || dt_object_remote(next) ||
	    !S_ISREG(attr->la_mode))
		RETURN(0);

	if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) {
1346
		rc = lod_sub_declare_xattr_del(env, next, XATTR_NAME_LOV, th);
1347
1348
1349
1350
1351
		RETURN(rc);
	}

	if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) ||
	    OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) {
1352
1353
1354
1355
1356
		struct lod_thread_info *info = lod_env_info(env);
		struct lu_buf *buf = &info->lti_buf;

		buf->lb_buf = info->lti_ea_store;
		buf->lb_len = info->lti_ea_store_size;
1357
1358
		rc = lod_sub_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
					       LU_XATTR_REPLACE, th);
1359
1360
	}

Alex Zhuravlev's avatar
Alex Zhuravlev committed
1361
1362
1363
	RETURN(rc);
}

1364
1365
1366
1367
1368
1369
1370
1371
/**
 * Implementation of dt_object_operations::do_attr_set.
 *
 * If the object is striped, then apply the changes to all or subset of
 * the stripes depending on the object type and specific attributes.
 *
 * \see dt_object_operations::do_attr_set() in the API description for details.
 */
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1372
1373
1374
static int lod_attr_set(const struct lu_env *env,
			struct dt_object *dt,
			const struct lu_attr *attr,
1375
			struct thandle *th)
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1376
{
1377
1378
1379
	struct dt_object	*next = dt_object_child(dt);
	struct lod_object	*lo = lod_dt_obj(dt);
	int			rc, i;
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1380
1381
1382
1383
1384
	ENTRY;

	/*
	 * apply changes to the local object
	 */
1385
	rc = lod_sub_attr_set(env, next, attr, th);
Alex Zhuravlev's avatar
Alex Zhuravlev committed
1386
1387
1388
	if (rc)
		RETURN(rc);

1389
	if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
1390
		if (!(attr->la_valid & LA_REMOTE_ATTR_SET))
1391
			RETURN(rc);
1392
1393