Loading fs/ceph/osd_client.c +45 −93 Original line number Diff line number Diff line Loading @@ -13,7 +13,8 @@ #include "decode.h" #include "auth.h" #define OSD_REPLY_RESERVE_FRONT_LEN 512 #define OSD_OP_FRONT_LEN 4096 #define OSD_OPREPLY_FRONT_LEN 512 const static struct ceph_connection_operations osd_con_ops; Loading Loading @@ -75,17 +76,6 @@ static void calc_layout(struct ceph_osd_client *osdc, req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages); } static void remove_replies(struct ceph_osd_request *req) { int i; int max = ARRAY_SIZE(req->replies); for (i=0; i<max; i++) { if (req->replies[i]) ceph_msg_put(req->replies[i]); } } /* * requests */ Loading @@ -99,7 +89,6 @@ void ceph_osdc_release_request(struct kref *kref) ceph_msg_put(req->r_request); if (req->r_reply) ceph_msg_put(req->r_reply); remove_replies(req); if (req->r_con_filling_msg) { dout("release_request revoking pages %p from con %p\n", req->r_pages, req->r_con_filling_msg); Loading @@ -117,60 +106,6 @@ void ceph_osdc_release_request(struct kref *kref) kfree(req); } static int alloc_replies(struct ceph_osd_request *req, int num_reply) { int i; int max = ARRAY_SIZE(req->replies); BUG_ON(num_reply > max); for (i=0; i<num_reply; i++) { req->replies[i] = ceph_msg_new(0, OSD_REPLY_RESERVE_FRONT_LEN, 0, 0, NULL); if (IS_ERR(req->replies[i])) { int j; int err = PTR_ERR(req->replies[i]); for (j = 0; j<=i; j++) { ceph_msg_put(req->replies[j]); } return err; } } for (; i<max; i++) { req->replies[i] = NULL; } req->cur_reply = 0; return 0; } static struct ceph_msg *__get_next_reply(struct ceph_connection *con, struct ceph_osd_request *req, int front_len) { struct ceph_msg *reply; if (req->r_con_filling_msg) { dout("revoking reply msg %p from old con %p\n", req->r_reply, req->r_con_filling_msg); ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); ceph_con_put(req->r_con_filling_msg); req->cur_reply = 0; } reply = req->replies[req->cur_reply]; if (!reply || front_len > OSD_REPLY_RESERVE_FRONT_LEN) { /* maybe we can allocate it now? */ reply = ceph_msg_new(0, front_len, 0, 0, NULL); if (!reply || IS_ERR(reply)) { pr_err(" reply alloc failed, front_len=%d\n", front_len); return ERR_PTR(-ENOMEM); } } req->r_con_filling_msg = ceph_con_get(con); req->r_reply = ceph_msg_get(reply); /* for duration of read over socket */ return ceph_msg_get(reply); } /* * build new request AND message, calculate layout, and adjust file * extent as needed. Loading Loading @@ -201,7 +136,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, void *p; int num_op = 1 + do_sync; size_t msg_size = sizeof(*head) + num_op*sizeof(*op); int err, i; int i; if (use_mempool) { req = mempool_alloc(osdc->req_mempool, GFP_NOFS); Loading @@ -212,13 +147,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (req == NULL) return ERR_PTR(-ENOMEM); err = alloc_replies(req, num_reply); if (err) { ceph_osdc_put_request(req); return ERR_PTR(-ENOMEM); } req->r_num_prealloc_reply = num_reply; req->r_osdc = osdc; req->r_mempool = use_mempool; kref_init(&req->r_kref); Loading @@ -229,7 +157,19 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); /* create message; allow space for oid */ /* create reply message */ if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); else msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, OSD_OPREPLY_FRONT_LEN, 0, 0, NULL); if (IS_ERR(msg)) { ceph_osdc_put_request(req); return ERR_PTR(PTR_ERR(msg)); } req->r_reply = msg; /* create request message; allow space for oid */ msg_size += 40; if (snapc) msg_size += sizeof(u64) * snapc->num_snaps; Loading Loading @@ -819,21 +759,11 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, * avoid a (safe but slower) revoke later. */ if (req->r_con_filling_msg == con && req->r_reply == msg) { dout(" got pages, dropping con_filling_msg ref %p\n", con); dout(" dropping con_filling_msg ref %p\n", con); req->r_con_filling_msg = NULL; ceph_con_put(con); } if (req->r_reply) { /* * once we see the message has been received, we don't * need a ref (which is only needed for revoking * pages) */ ceph_msg_put(req->r_reply); req->r_reply = NULL; } if (!req->r_got_reply) { unsigned bytes; Loading Loading @@ -1249,11 +1179,17 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) if (!osdc->req_mempool) goto out; err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true); err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true); if (err < 0) goto out_mempool; err = ceph_msgpool_init(&osdc->msgpool_op_reply, OSD_OPREPLY_FRONT_LEN, 10, true); if (err < 0) goto out_msgpool; return 0; out_msgpool: ceph_msgpool_destroy(&osdc->msgpool_op); out_mempool: mempool_destroy(osdc->req_mempool); out: Loading @@ -1271,6 +1207,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) remove_old_osds(osdc, 1); mempool_destroy(osdc->req_mempool); ceph_msgpool_destroy(&osdc->msgpool_op); ceph_msgpool_destroy(&osdc->msgpool_op_reply); } /* Loading Loading @@ -1405,15 +1342,28 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, if (!req) { *skip = 1; m = NULL; pr_info("alloc_msg unknown tid %llu from osd%d\n", tid, pr_info("get_reply unknown tid %llu from osd%d\n", tid, osd->o_osd); goto out; } m = __get_next_reply(con, req, front); if (!m || IS_ERR(m)) { *skip = 1; if (req->r_con_filling_msg) { dout("get_reply revoking msg %p from old con %p\n", req->r_reply, req->r_con_filling_msg); ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); ceph_con_put(req->r_con_filling_msg); } if (front > req->r_reply->front.iov_len) { pr_warning("get_reply front %d > preallocated %d\n", front, (int)req->r_reply->front.iov_len); m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL); if (IS_ERR(m)) goto out; ceph_msg_put(req->r_reply); req->r_reply = m; } m = ceph_msg_get(req->r_reply); if (data_len > 0) { err = __prepare_pages(con, hdr, req, tid, m); Loading @@ -1424,6 +1374,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, } } *skip = 0; req->r_con_filling_msg = ceph_con_get(con); dout("get_reply tid %lld %p\n", tid, m); out: mutex_unlock(&osdc->request_mutex); Loading fs/ceph/osd_client.h +1 −4 Original line number Diff line number Diff line Loading @@ -53,7 +53,6 @@ struct ceph_osd_request { int r_flags; /* any additional flags for the osd */ u32 r_sent; /* >0 if r_request is sending/sent */ int r_got_reply; int r_num_prealloc_reply; struct ceph_osd_client *r_osdc; struct kref r_kref; Loading @@ -77,9 +76,6 @@ struct ceph_osd_request { struct page **r_pages; /* pages for data payload */ int r_pages_from_pool; int r_own_pages; /* if true, i own page list */ struct ceph_msg *replies[2]; int cur_reply; }; struct ceph_osd_client { Loading @@ -106,6 +102,7 @@ struct ceph_osd_client { mempool_t *req_mempool; struct ceph_msgpool msgpool_op; struct ceph_msgpool msgpool_op_reply; }; extern int ceph_osdc_init(struct ceph_osd_client *osdc, Loading Loading
fs/ceph/osd_client.c +45 −93 Original line number Diff line number Diff line Loading @@ -13,7 +13,8 @@ #include "decode.h" #include "auth.h" #define OSD_REPLY_RESERVE_FRONT_LEN 512 #define OSD_OP_FRONT_LEN 4096 #define OSD_OPREPLY_FRONT_LEN 512 const static struct ceph_connection_operations osd_con_ops; Loading Loading @@ -75,17 +76,6 @@ static void calc_layout(struct ceph_osd_client *osdc, req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages); } static void remove_replies(struct ceph_osd_request *req) { int i; int max = ARRAY_SIZE(req->replies); for (i=0; i<max; i++) { if (req->replies[i]) ceph_msg_put(req->replies[i]); } } /* * requests */ Loading @@ -99,7 +89,6 @@ void ceph_osdc_release_request(struct kref *kref) ceph_msg_put(req->r_request); if (req->r_reply) ceph_msg_put(req->r_reply); remove_replies(req); if (req->r_con_filling_msg) { dout("release_request revoking pages %p from con %p\n", req->r_pages, req->r_con_filling_msg); Loading @@ -117,60 +106,6 @@ void ceph_osdc_release_request(struct kref *kref) kfree(req); } static int alloc_replies(struct ceph_osd_request *req, int num_reply) { int i; int max = ARRAY_SIZE(req->replies); BUG_ON(num_reply > max); for (i=0; i<num_reply; i++) { req->replies[i] = ceph_msg_new(0, OSD_REPLY_RESERVE_FRONT_LEN, 0, 0, NULL); if (IS_ERR(req->replies[i])) { int j; int err = PTR_ERR(req->replies[i]); for (j = 0; j<=i; j++) { ceph_msg_put(req->replies[j]); } return err; } } for (; i<max; i++) { req->replies[i] = NULL; } req->cur_reply = 0; return 0; } static struct ceph_msg *__get_next_reply(struct ceph_connection *con, struct ceph_osd_request *req, int front_len) { struct ceph_msg *reply; if (req->r_con_filling_msg) { dout("revoking reply msg %p from old con %p\n", req->r_reply, req->r_con_filling_msg); ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); ceph_con_put(req->r_con_filling_msg); req->cur_reply = 0; } reply = req->replies[req->cur_reply]; if (!reply || front_len > OSD_REPLY_RESERVE_FRONT_LEN) { /* maybe we can allocate it now? */ reply = ceph_msg_new(0, front_len, 0, 0, NULL); if (!reply || IS_ERR(reply)) { pr_err(" reply alloc failed, front_len=%d\n", front_len); return ERR_PTR(-ENOMEM); } } req->r_con_filling_msg = ceph_con_get(con); req->r_reply = ceph_msg_get(reply); /* for duration of read over socket */ return ceph_msg_get(reply); } /* * build new request AND message, calculate layout, and adjust file * extent as needed. Loading Loading @@ -201,7 +136,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, void *p; int num_op = 1 + do_sync; size_t msg_size = sizeof(*head) + num_op*sizeof(*op); int err, i; int i; if (use_mempool) { req = mempool_alloc(osdc->req_mempool, GFP_NOFS); Loading @@ -212,13 +147,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (req == NULL) return ERR_PTR(-ENOMEM); err = alloc_replies(req, num_reply); if (err) { ceph_osdc_put_request(req); return ERR_PTR(-ENOMEM); } req->r_num_prealloc_reply = num_reply; req->r_osdc = osdc; req->r_mempool = use_mempool; kref_init(&req->r_kref); Loading @@ -229,7 +157,19 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); /* create message; allow space for oid */ /* create reply message */ if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); else msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, OSD_OPREPLY_FRONT_LEN, 0, 0, NULL); if (IS_ERR(msg)) { ceph_osdc_put_request(req); return ERR_PTR(PTR_ERR(msg)); } req->r_reply = msg; /* create request message; allow space for oid */ msg_size += 40; if (snapc) msg_size += sizeof(u64) * snapc->num_snaps; Loading Loading @@ -819,21 +759,11 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, * avoid a (safe but slower) revoke later. */ if (req->r_con_filling_msg == con && req->r_reply == msg) { dout(" got pages, dropping con_filling_msg ref %p\n", con); dout(" dropping con_filling_msg ref %p\n", con); req->r_con_filling_msg = NULL; ceph_con_put(con); } if (req->r_reply) { /* * once we see the message has been received, we don't * need a ref (which is only needed for revoking * pages) */ ceph_msg_put(req->r_reply); req->r_reply = NULL; } if (!req->r_got_reply) { unsigned bytes; Loading Loading @@ -1249,11 +1179,17 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) if (!osdc->req_mempool) goto out; err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true); err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true); if (err < 0) goto out_mempool; err = ceph_msgpool_init(&osdc->msgpool_op_reply, OSD_OPREPLY_FRONT_LEN, 10, true); if (err < 0) goto out_msgpool; return 0; out_msgpool: ceph_msgpool_destroy(&osdc->msgpool_op); out_mempool: mempool_destroy(osdc->req_mempool); out: Loading @@ -1271,6 +1207,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) remove_old_osds(osdc, 1); mempool_destroy(osdc->req_mempool); ceph_msgpool_destroy(&osdc->msgpool_op); ceph_msgpool_destroy(&osdc->msgpool_op_reply); } /* Loading Loading @@ -1405,15 +1342,28 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, if (!req) { *skip = 1; m = NULL; pr_info("alloc_msg unknown tid %llu from osd%d\n", tid, pr_info("get_reply unknown tid %llu from osd%d\n", tid, osd->o_osd); goto out; } m = __get_next_reply(con, req, front); if (!m || IS_ERR(m)) { *skip = 1; if (req->r_con_filling_msg) { dout("get_reply revoking msg %p from old con %p\n", req->r_reply, req->r_con_filling_msg); ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); ceph_con_put(req->r_con_filling_msg); } if (front > req->r_reply->front.iov_len) { pr_warning("get_reply front %d > preallocated %d\n", front, (int)req->r_reply->front.iov_len); m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL); if (IS_ERR(m)) goto out; ceph_msg_put(req->r_reply); req->r_reply = m; } m = ceph_msg_get(req->r_reply); if (data_len > 0) { err = __prepare_pages(con, hdr, req, tid, m); Loading @@ -1424,6 +1374,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, } } *skip = 0; req->r_con_filling_msg = ceph_con_get(con); dout("get_reply tid %lld %p\n", tid, m); out: mutex_unlock(&osdc->request_mutex); Loading
fs/ceph/osd_client.h +1 −4 Original line number Diff line number Diff line Loading @@ -53,7 +53,6 @@ struct ceph_osd_request { int r_flags; /* any additional flags for the osd */ u32 r_sent; /* >0 if r_request is sending/sent */ int r_got_reply; int r_num_prealloc_reply; struct ceph_osd_client *r_osdc; struct kref r_kref; Loading @@ -77,9 +76,6 @@ struct ceph_osd_request { struct page **r_pages; /* pages for data payload */ int r_pages_from_pool; int r_own_pages; /* if true, i own page list */ struct ceph_msg *replies[2]; int cur_reply; }; struct ceph_osd_client { Loading @@ -106,6 +102,7 @@ struct ceph_osd_client { mempool_t *req_mempool; struct ceph_msgpool msgpool_op; struct ceph_msgpool msgpool_op_reply; }; extern int ceph_osdc_init(struct ceph_osd_client *osdc, Loading